Files
quibot/raspi/motion.py
2026-06-18 13:45:32 +02:00

620 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
motion.py — Control de motors (rodes, braços, xeringa), sensor de distància
VL53L0X i seguidor de línia TCRT5000 via ADS1115.
Equivalent a motion.cpp del codi Arduino/ESP32.
"""
import math
import time
import threading
import pigpio
import adafruit_extended_bus
import adafruit_vl53l0x
import adafruit_ads1x15.ads1115 as ADS
from adafruit_ads1x15.analog_in import AnalogIn
from pins import (
STEP_R_W, DIR_R_W, STEP_L_W, DIR_L_W, EN_W,
STEP_R_A, DIR_R_A, STEP_L_A, DIR_L_A, EN_A,
STEP_SY, DIR_SY, EN_SERVO,
END_SY, END_RA, END_LA,
)
# ==================
# Constants
# ==================
CW = True
CCW = False
ON = 0 # A4988/TB6600: enable actiu en LOW
OFF = 1
TAKE = True
LEAVE = False
# Tipus de resposta del seguidor de línia
CLEAR = 0
CROSSING = 1
OBJECT = 2
# Noms de tasques (equivalent a les constants string del C++)
MOVE_TO_CROSSING = "Move to crossing"
TURN_90_CW = "Turn 90 CW"
TURN_90_CCW = "Turn 90 CCW"
MOVE_TO_OBJECT = "Move to object"
TAKE_SOMETHING = "Take something"
LEAVE_SOMETHING = "Leave something"
DO_NOTHING = "Do nothing"
# Paràmetres de moviment
WHEELS_MAX_SPEED = 130.0 # steps/s
WHEELS_ACCEL = 190.0 # steps/s²
ARMS_MAX_SPEED = 250.0 # steps/s
ARMS_ACCEL = 125.0 # steps/s²
SYRINGE_MAX_SPEED = 800.0 # steps/s
SYRINGE_ACCEL = 500.0 # steps/s²
WHEEL_MECH_REDUCTION = 5
WHEEL_STEPS_PER_REVOLUTION = 200 * WHEEL_MECH_REDUCTION # 1000 passos/volta de roda
MM_TO_CROSSING_CENTER = 62 # mm des del creuament detectat fins al centre
MM_TO_OBJECT = 20 # mm addicionals un cop detectat l'objecte
# Llindar de negre per ADS1115 (GAIN_ONE ±4.096V, single-ended 026400 per a 3.3V).
# Equivalent a 1500/4095 de l'ESP32 de 12 bits → ~9700 en ADS1115.
BLACK_THRESHOLD = 9700
# Llindar d'error de rotació (equivalent a 20/4095 de l'ESP32 → ~130 en ADS1115).
ROTATION_ERROR_THRESHOLD = 130
# Posicions dels braços en passos des del home
ARM_LOWER_POSITION = 120
ARM_L_UPPER_POSITION = 900
ARM_R_UPPER_POSITION = 550
# Xeringa: 10 rev * 200 passos/rev * microstepping x4 = 8000 passos estesa del tot
_SY_FULL_EXTENDED_STEPS = 10 * 200 * 4
LINE_FOLLOWER_FREQ = 100 # Hz
LINE_FOLLOWER_PERIOD = 1.0 / LINE_FOLLOWER_FREQ # s
# ==================
# Classe Stepper
# ==================
class Stepper:
"""
Motor pas a pas en mode DRIVER (STEP/DIR).
Equivalent a AccelStepper(DRIVER, step_pin, dir_pin).
Genera polsos STEP via pigpio.gpio_trigger().
"""
PULSE_US = 10 # Amplada del pols STEP en µs (A4988 requereix ≥1µs)
def __init__(self, pi: pigpio.pi, step_pin: int, dir_pin: int):
self._pi = pi
self._step_pin = step_pin
self._dir_pin = dir_pin
self._pos = 0 # posició actual (passos)
self._target = 0 # posició objectiu
self._speed = 0.0 # velocitat actual (passos/s, signada)
self._max_speed = 1.0
self._accel = 1.0
self._last_step_us = self._now_us()
self._step_interval_us = 0 # 0 = aturat
pi.set_mode(step_pin, pigpio.OUTPUT)
pi.set_mode(dir_pin, pigpio.OUTPUT)
pi.write(step_pin, 0)
pi.write(dir_pin, 0)
@staticmethod
def _now_us() -> int:
return time.monotonic_ns() // 1000
def set_max_speed(self, speed: float):
self._max_speed = abs(speed)
def set_acceleration(self, accel: float):
self._accel = abs(accel)
def move_to(self, position: int):
self._target = int(position)
def move(self, relative: int):
self._target = self._pos + int(relative)
def set_current_position(self, pos: int):
self._pos = int(pos)
self._target = int(pos)
self._speed = 0.0
self._step_interval_us = 0
def current_position(self) -> int:
return self._pos
def distance_to_go(self) -> int:
return self._target - self._pos
def is_running(self) -> bool:
return self._target != self._pos
def stop(self):
self._target = self._pos
self._speed = 0.0
self._step_interval_us = 0
def set_speed(self, speed: float):
"""Estableix velocitat constant per a run_speed()."""
self._speed = float(speed)
self._step_interval_us = int(1_000_000 / abs(speed)) if speed != 0.0 else 0
def _do_step(self, direction: int):
self._pi.write(self._dir_pin, 1 if direction > 0 else 0)
self._pi.gpio_trigger(self._step_pin, self.PULSE_US, 1)
self._pos += direction
def run_speed(self) -> bool:
"""Fa un pas a velocitat constant. No bloquejant — cridar des del bucle de steppers."""
if self._step_interval_us == 0:
return False
now = self._now_us()
if now - self._last_step_us >= self._step_interval_us:
self._do_step(1 if self._speed > 0 else -1)
self._last_step_us = now
return True
return False
def run(self) -> bool:
"""
Fa un pas cap a _target amb acceleració/desacceleració.
No bloquejant — cridar des del bucle de steppers.
Implementa l'algorisme de AccelStepper: Δv = accel / v per pas.
"""
dtg = self.distance_to_go()
if dtg == 0:
self._speed = 0.0
self._step_interval_us = 0
return False
abs_speed = abs(self._speed)
if abs_speed < 1.0:
abs_speed = math.sqrt(self._accel / 2.0) # velocitat inicial AccelStepper
now = self._now_us()
if now - self._last_step_us < int(1_000_000 / abs_speed):
return False # no és hora del proper pas
# Actualitza la velocitat per al proper pas
direction = 1 if dtg > 0 else -1
stop_dist = (abs_speed ** 2) / (2.0 * self._accel) if self._accel > 0 else 0
if abs(dtg) <= max(stop_dist, 1):
new_speed = abs_speed - (self._accel / abs_speed)
new_speed = max(new_speed, 1.0)
else:
new_speed = abs_speed + (self._accel / abs_speed)
new_speed = min(new_speed, self._max_speed)
self._speed = new_speed * direction
self._do_step(direction)
self._last_step_us = now
return True
# ==================
# Instàncies globals
# ==================
_pi: pigpio.pi = None
_i2c = None
_dist_sensor = None
_ads = None
_chan_r: AnalogIn = None
_chan_l: AnalogIn = None
wheel_R: Stepper = None
wheel_L: Stepper = None
arm_R: Stepper = None
arm_L: Stepper = None
syringe: Stepper = None
wheels_speed_mode: bool = False # True → run_speed(), False → run() (posició)
_stepper_stop = threading.Event()
_stepper_thread: threading.Thread = None
# ==================
# Setup i cleanup
# ==================
def motion_setup_steppers(pi: pigpio.pi):
"""
Inicialitza GPIOs i steppers sense els sensors I2C.
Útil per a tests de motors quan VL53L0X/ADS1115 no estan connectats.
"""
global _pi, wheel_R, wheel_L, arm_R, arm_L, syringe, _stepper_thread
_pi = pi
for pin in (EN_W, EN_A, EN_SERVO):
pi.set_mode(pin, pigpio.OUTPUT)
for pin in (END_LA, END_RA, END_SY):
pi.set_mode(pin, pigpio.INPUT)
pi.set_pull_up_down(pin, pigpio.PUD_UP) # Hall actiu-baix, pull-up intern
enable_arms(OFF)
enable_wheels(OFF)
wheel_R = Stepper(pi, STEP_R_W, DIR_R_W)
wheel_L = Stepper(pi, STEP_L_W, DIR_L_W)
arm_R = Stepper(pi, STEP_R_A, DIR_R_A)
arm_L = Stepper(pi, STEP_L_A, DIR_L_A)
syringe = Stepper(pi, STEP_SY, DIR_SY)
wheel_R.set_max_speed(WHEELS_MAX_SPEED); wheel_R.set_acceleration(WHEELS_ACCEL)
wheel_L.set_max_speed(WHEELS_MAX_SPEED); wheel_L.set_acceleration(WHEELS_ACCEL)
arm_R.set_max_speed(ARMS_MAX_SPEED); arm_R.set_acceleration(ARMS_ACCEL)
arm_L.set_max_speed(ARMS_MAX_SPEED); arm_L.set_acceleration(ARMS_ACCEL)
syringe.set_max_speed(SYRINGE_MAX_SPEED); syringe.set_acceleration(SYRINGE_ACCEL)
_stepper_stop.clear()
_stepper_thread = threading.Thread(target=_stepper_loop, daemon=True, name="steppers")
_stepper_thread.start()
def motion_setup_sensors(pi: pigpio.pi):
"""
Inicialitza únicament els sensors I2C (VL53L0X, ADS1115). Sense steppers.
Útil per a tests de sensors quan els motors no estan connectats.
"""
global _pi, _i2c, _dist_sensor, _ads, _chan_r, _chan_l
_pi = pi
_i2c = adafruit_extended_bus.ExtendedI2C(3)
_dist_sensor = adafruit_vl53l0x.VL53L0X(_i2c)
_ads = ADS.ADS1115(_i2c)
_chan_r = AnalogIn(_ads, ADS.P0)
_chan_l = AnalogIn(_ads, ADS.P1)
def motion_setup(pi: pigpio.pi):
"""
Inicialitza GPIOs, steppers, sensor de distància VL53L0X,
ADC ADS1115 per als sensors de línia, i arrenca el bucle de steppers.
Requereix /boot/config.txt: dtoverlay=i2c-gpio,bus=3,i2c_gpio_sda=2,i2c_gpio_scl=1
"""
motion_setup_steppers(pi)
motion_setup_sensors(pi)
def motion_cleanup():
"""Atura el bucle de steppers i desactiva tots els motors."""
_stepper_stop.set()
if _stepper_thread:
_stepper_thread.join(timeout=1.0)
enable_wheels(OFF)
enable_arms(OFF)
enable_syringe(OFF)
# ==================
# Enable / disable
# ==================
def enable_wheels(state: bool):
_pi.write(EN_W, state)
def enable_arms(state: bool):
_pi.write(EN_A, state)
def enable_syringe(state: bool):
_pi.write(EN_SERVO, state)
def is_endstop_detecting(pin: int) -> bool:
return not _pi.read(pin) # efecte Hall actiu en baix
# ==================
# Helpers de moviment
# ==================
def mm_to_steps(mm: int) -> int:
# Perímetre roda Ø152mm = 2·π·76 ≈ 477mm
return (mm * WHEEL_STEPS_PER_REVOLUTION * 2) // 1000
def wheels_set_position(position: int = 0):
wheel_L.set_current_position(position)
wheel_R.set_current_position(position)
def wheels_set_speed(speed: float, rotate: bool = False, direction: bool = CW):
if rotate:
wheel_L.set_speed( speed if direction == CW else -speed)
wheel_R.set_speed(-speed if direction == CW else speed)
else:
wheel_L.set_speed(speed)
wheel_R.set_speed(speed)
def move_arms_to(position: int):
arm_L.move_to(position)
arm_R.move_to(position)
while arm_L.distance_to_go() != 0 and arm_R.distance_to_go() != 0:
time.sleep(0.1)
def move_arms_up():
arm_L.move_to(ARM_L_UPPER_POSITION)
arm_R.move_to(ARM_R_UPPER_POSITION)
while arm_L.distance_to_go() != 0 and arm_R.distance_to_go() != 0:
time.sleep(0.1)
def move_wheels_to(position: int, invert: bool = False):
wheel_L.move_to(position)
wheel_R.move_to(-position if invert else position)
while wheel_L.is_running() and wheel_R.is_running():
time.sleep(0.1)
# ==================
# Homing
# ==================
def arms_home():
"""Cicle de homing dels braços (bloquejant). El bucle de steppers fa el moviment."""
enable_arms(ON)
arm_L.move(-1250)
arm_R.move(-1250)
while True:
if is_endstop_detecting(END_LA):
arm_L.stop()
if is_endstop_detecting(END_RA):
arm_R.stop()
l_done = is_endstop_detecting(END_LA) or arm_L.distance_to_go() == 0
r_done = is_endstop_detecting(END_RA) or arm_R.distance_to_go() == 0
if l_done and r_done:
break
time.sleep(0.005)
arm_L.set_current_position(0)
arm_R.set_current_position(0)
arm_L.move(ARM_L_UPPER_POSITION)
arm_R.move(ARM_R_UPPER_POSITION)
while arm_L.distance_to_go() != 0 or arm_R.distance_to_go() != 0:
time.sleep(0.01)
def syringe_home():
"""Cicle de homing de la xeringa (bloquejant). El bucle de steppers fa el moviment."""
enable_syringe(ON)
syringe.move(-11000)
while not is_endstop_detecting(END_SY):
if syringe.distance_to_go() == 0:
break
time.sleep(0.005)
syringe.stop()
syringe.set_current_position(0)
enable_syringe(OFF)
# ==================
# Sensor de distància
# ==================
def distance_to_object() -> int:
"""Retorna la distància en mm a l'objecte més proper. 65535 si fora de rang."""
try:
return _dist_sensor.range
except Exception:
return 65535
# ==================
# Seguidor de línia
# ==================
def _read_line_sensors() -> tuple:
"""Llegeix sensors de línia via ADS1115. Retorna (dreta, esquerra), 032767."""
return _chan_r.value, _chan_l.value
def compute_new_speed(speed: float) -> float:
accel = WHEELS_ACCEL / LINE_FOLLOWER_FREQ
return min(speed + accel, WHEELS_MAX_SPEED)
def follow_line_loop(speed: float, forward: bool = True) -> int:
"""
Seguidor de línia no bloquejant. Retorna CLEAR, CROSSING o OBJECT.
Si CLEAR, aplica correcció proporcional a les velocitats de les rodes.
"""
distance_threshold = 50 # mm
lf_r, lf_l = _read_line_sensors()
if lf_l > BLACK_THRESHOLD and lf_r > BLACK_THRESHOLD and forward:
return CROSSING
elif distance_to_object() < distance_threshold and forward:
return OBJECT
else:
p_factor = 5
error = lf_r - lf_l
correction_r = error // p_factor
correction_l = -correction_r
wheel_L.set_speed((speed + correction_r) if forward else (-speed + correction_r))
wheel_R.set_speed((speed + correction_l) if forward else (-speed + correction_l))
return CLEAR
def run_to_crossing_center(speed: float) -> float:
"""
Avança per centrar el robot sobre el creuament detectat.
Retorna la nova velocitat (reduïda a la meitat).
"""
wheels_set_position(0 - mm_to_steps(MM_TO_CROSSING_CENTER))
while follow_line_loop(speed) == CROSSING:
wheels_set_speed(speed)
time.sleep(0.1)
speed /= 2
while wheel_L.current_position() < 0 and wheel_R.current_position() < 0:
wheels_set_speed(speed)
time.sleep(0.1)
wheels_set_speed(0)
return speed
# ==================
# Bucle de steppers (thread permanent)
# ==================
def _stepper_loop():
"""
Thread que genera els polsos STEP per a tots els motors.
Equivalent a task_update_steppers() del FreeRTOS.
S'inicia automàticament a motion_setup().
"""
while not _stepper_stop.is_set():
if wheels_speed_mode:
wheel_R.run_speed()
wheel_L.run_speed()
else:
wheel_R.run()
wheel_L.run()
arm_R.run()
arm_L.run()
syringe.run()
# ==================
# Tasques (FreeRTOS tasks → funcions bloquejants, cridades des de threads)
# ==================
def task_move_to(expected_target: int):
"""
Segueix la línia fins arribar a expected_target (CROSSING o OBJECT).
Equivalent a task_move_to() del FreeRTOS.
"""
global wheels_speed_mode
speed = 0.0
wheels_speed_mode = True
enable_wheels(ON)
lf_response = CLEAR
while True:
speed = compute_new_speed(speed)
lf_response = follow_line_loop(speed)
if lf_response != CLEAR:
break
time.sleep(LINE_FOLLOWER_PERIOD)
if lf_response == CROSSING and expected_target == CROSSING:
run_to_crossing_center(speed)
elif lf_response == OBJECT and expected_target == OBJECT:
wheel_L.stop()
wheel_R.stop()
wheels_speed_mode = False
enable_wheels(OFF)
def task_rotate(direction: bool):
"""
Gira el robot 90° (CW o CCW).
Fase 1: acceleració fins al 90% dels passos de rotació.
Fase 2: desacceleració i ajust fi sobre la línia via sensors analògics.
"""
global wheels_speed_mode
# Arc 90° a Ø250mm → 2·π·125/4 ≈ 196mm; roda Ø152mm → 196/477 ≈ 0.41 voltes
ROTATION_STEPS = (WHEEL_STEPS_PER_REVOLUTION * 42) // 100
positive_wheel = wheel_L if direction == CW else wheel_R
speed = 0.0
wheels_speed_mode = True
enable_wheels(ON)
wheels_set_position(0)
while positive_wheel.current_position() < (ROTATION_STEPS * 90) // 100:
speed = compute_new_speed(speed)
wheels_set_speed(speed, rotate=True, direction=direction)
time.sleep(0.01)
speed /= 2
while True:
lf_r, lf_l = _read_line_sensors()
error = abs(lf_l - lf_r)
if error < ROTATION_ERROR_THRESHOLD or \
positive_wheel.current_position() > (ROTATION_STEPS * 110) // 100:
break
wheels_set_speed(speed - (speed / error), rotate=True, direction=direction)
time.sleep(0.01)
wheels_set_speed(0)
enable_wheels(OFF)
wheels_speed_mode = False
def task_take_or_leave_something(take: bool):
"""
Avança fins a l'objecte, baixa braços, opera la xeringa i torna al creuament.
take=True → xucla; take=False → buida.
"""
global wheels_speed_mode
speed = 0.0
wheels_speed_mode = True
enable_wheels(ON)
wheels_set_position(0) # guardem la posició home per tornar-hi
lf_response = CLEAR
while True:
speed = compute_new_speed(speed)
lf_response = follow_line_loop(speed)
if lf_response != CLEAR:
break
time.sleep(LINE_FOLLOWER_PERIOD)
if lf_response == OBJECT:
target_l = wheel_L.current_position() + mm_to_steps(MM_TO_OBJECT)
target_r = wheel_R.current_position() + mm_to_steps(MM_TO_OBJECT)
while wheel_L.current_position() < target_l and wheel_R.current_position() < target_r:
wheels_set_speed(speed / 2)
time.sleep(0.1)
wheels_set_speed(0)
enable_wheels(OFF)
enable_arms(ON)
move_arms_to(ARM_LOWER_POSITION)
enable_syringe(ON)
syringe.move_to(_SY_FULL_EXTENDED_STEPS if take else 0)
while syringe.distance_to_go() != 0:
if not take and is_endstop_detecting(END_SY):
syringe.stop()
syringe.set_current_position(0)
break
time.sleep(0.1)
enable_syringe(OFF)
move_arms_up()
enable_wheels(ON)
wheels_speed_mode = False
wheels_set_speed(WHEELS_MAX_SPEED)
move_wheels_to(0)
elif lf_response == CROSSING:
run_to_crossing_center(speed)
wheels_speed_mode = False
enable_wheels(OFF)
def task_idle():
"""Pausa breu fins que quibot.py assigni una nova tasca."""
time.sleep(0.5)