""" motion.py — Control de motors (rodes, braços, xeringa), sensor de distància VL53L0X i seguidor de línia TCRT5000 via ADS1115. Equivalent a motion.cpp del codi Arduino/ESP32. """ import math import time import threading import pigpio import adafruit_extended_bus import adafruit_vl53l0x import adafruit_ads1x15.ads1115 as ADS from adafruit_ads1x15.analog_in import AnalogIn from pins import ( STEP_R_W, DIR_R_W, STEP_L_W, DIR_L_W, EN_W, STEP_R_A, DIR_R_A, STEP_L_A, DIR_L_A, EN_A, STEP_SY, DIR_SY, EN_SERVO, END_SY, END_RA, END_LA, ) # ================== # Constants # ================== CW = True CCW = False ON = 0 # A4988/TB6600: enable actiu en LOW OFF = 1 TAKE = True LEAVE = False # Tipus de resposta del seguidor de línia CLEAR = 0 CROSSING = 1 OBJECT = 2 # Noms de tasques (equivalent a les constants string del C++) MOVE_TO_CROSSING = "Move to crossing" TURN_90_CW = "Turn 90 CW" TURN_90_CCW = "Turn 90 CCW" MOVE_TO_OBJECT = "Move to object" TAKE_SOMETHING = "Take something" LEAVE_SOMETHING = "Leave something" DO_NOTHING = "Do nothing" # Paràmetres de moviment WHEELS_MAX_SPEED = 130.0 # steps/s WHEELS_ACCEL = 190.0 # steps/s² ARMS_MAX_SPEED = 250.0 # steps/s ARMS_ACCEL = 125.0 # steps/s² SYRINGE_MAX_SPEED = 800.0 # steps/s SYRINGE_ACCEL = 500.0 # steps/s² WHEEL_MECH_REDUCTION = 5 WHEEL_STEPS_PER_REVOLUTION = 200 * WHEEL_MECH_REDUCTION # 1000 passos/volta de roda MM_TO_CROSSING_CENTER = 62 # mm des del creuament detectat fins al centre MM_TO_OBJECT = 20 # mm addicionals un cop detectat l'objecte # Llindar de negre per ADS1115 (GAIN_ONE ±4.096V, single-ended 0–26400 per a 3.3V). # Equivalent a 1500/4095 de l'ESP32 de 12 bits → ~9700 en ADS1115. BLACK_THRESHOLD = 9700 # Llindar d'error de rotació (equivalent a 20/4095 de l'ESP32 → ~130 en ADS1115). ROTATION_ERROR_THRESHOLD = 130 # Posicions dels braços en passos des del home ARM_LOWER_POSITION = 120 ARM_L_UPPER_POSITION = 900 ARM_R_UPPER_POSITION = 550 # Xeringa: 10 rev * 200 passos/rev * microstepping x4 = 8000 passos estesa del tot _SY_FULL_EXTENDED_STEPS = 10 * 200 * 4 LINE_FOLLOWER_FREQ = 100 # Hz LINE_FOLLOWER_PERIOD = 1.0 / LINE_FOLLOWER_FREQ # s # ================== # Classe Stepper # ================== class Stepper: """ Motor pas a pas en mode DRIVER (STEP/DIR). Equivalent a AccelStepper(DRIVER, step_pin, dir_pin). Genera polsos STEP via pigpio.gpio_trigger(). """ PULSE_US = 10 # Amplada del pols STEP en µs (A4988 requereix ≥1µs) def __init__(self, pi: pigpio.pi, step_pin: int, dir_pin: int): self._pi = pi self._step_pin = step_pin self._dir_pin = dir_pin self._pos = 0 # posició actual (passos) self._target = 0 # posició objectiu self._speed = 0.0 # velocitat actual (passos/s, signada) self._max_speed = 1.0 self._accel = 1.0 self._last_step_us = self._now_us() self._step_interval_us = 0 # 0 = aturat pi.set_mode(step_pin, pigpio.OUTPUT) pi.set_mode(dir_pin, pigpio.OUTPUT) pi.write(step_pin, 0) pi.write(dir_pin, 0) @staticmethod def _now_us() -> int: return time.monotonic_ns() // 1000 def set_max_speed(self, speed: float): self._max_speed = abs(speed) def set_acceleration(self, accel: float): self._accel = abs(accel) def move_to(self, position: int): self._target = int(position) def move(self, relative: int): self._target = self._pos + int(relative) def set_current_position(self, pos: int): self._pos = int(pos) self._target = int(pos) self._speed = 0.0 self._step_interval_us = 0 def current_position(self) -> int: return self._pos def distance_to_go(self) -> int: return self._target - self._pos def is_running(self) -> bool: return self._target != self._pos def stop(self): self._target = self._pos self._speed = 0.0 self._step_interval_us = 0 def set_speed(self, speed: float): """Estableix velocitat constant per a run_speed().""" self._speed = float(speed) self._step_interval_us = int(1_000_000 / abs(speed)) if speed != 0.0 else 0 def _do_step(self, direction: int): self._pi.write(self._dir_pin, 1 if direction > 0 else 0) self._pi.gpio_trigger(self._step_pin, self.PULSE_US, 1) self._pos += direction def run_speed(self) -> bool: """Fa un pas a velocitat constant. No bloquejant — cridar des del bucle de steppers.""" if self._step_interval_us == 0: return False now = self._now_us() if now - self._last_step_us >= self._step_interval_us: self._do_step(1 if self._speed > 0 else -1) self._last_step_us = now return True return False def run(self) -> bool: """ Fa un pas cap a _target amb acceleració/desacceleració. No bloquejant — cridar des del bucle de steppers. Implementa l'algorisme de AccelStepper: Δv = accel / v per pas. """ dtg = self.distance_to_go() if dtg == 0: self._speed = 0.0 self._step_interval_us = 0 return False abs_speed = abs(self._speed) if abs_speed < 1.0: abs_speed = math.sqrt(self._accel / 2.0) # velocitat inicial AccelStepper now = self._now_us() if now - self._last_step_us < int(1_000_000 / abs_speed): return False # no és hora del proper pas # Actualitza la velocitat per al proper pas direction = 1 if dtg > 0 else -1 stop_dist = (abs_speed ** 2) / (2.0 * self._accel) if self._accel > 0 else 0 if abs(dtg) <= max(stop_dist, 1): new_speed = abs_speed - (self._accel / abs_speed) new_speed = max(new_speed, 1.0) else: new_speed = abs_speed + (self._accel / abs_speed) new_speed = min(new_speed, self._max_speed) self._speed = new_speed * direction self._do_step(direction) self._last_step_us = now return True # ================== # Instàncies globals # ================== _pi: pigpio.pi = None _i2c = None _dist_sensor = None _ads = None _chan_r: AnalogIn = None _chan_l: AnalogIn = None wheel_R: Stepper = None wheel_L: Stepper = None arm_R: Stepper = None arm_L: Stepper = None syringe: Stepper = None wheels_speed_mode: bool = False # True → run_speed(), False → run() (posició) _stepper_stop = threading.Event() _stepper_thread: threading.Thread = None # ================== # Setup i cleanup # ================== def motion_setup_steppers(pi: pigpio.pi): """ Inicialitza GPIOs i steppers sense els sensors I2C. Útil per a tests de motors quan VL53L0X/ADS1115 no estan connectats. """ global _pi, wheel_R, wheel_L, arm_R, arm_L, syringe, _stepper_thread _pi = pi for pin in (EN_W, EN_A, EN_SERVO): pi.set_mode(pin, pigpio.OUTPUT) for pin in (END_LA, END_RA, END_SY): pi.set_mode(pin, pigpio.INPUT) pi.set_pull_up_down(pin, pigpio.PUD_UP) # Hall actiu-baix, pull-up intern enable_arms(OFF) enable_wheels(OFF) wheel_R = Stepper(pi, STEP_R_W, DIR_R_W) wheel_L = Stepper(pi, STEP_L_W, DIR_L_W) arm_R = Stepper(pi, STEP_R_A, DIR_R_A) arm_L = Stepper(pi, STEP_L_A, DIR_L_A) syringe = Stepper(pi, STEP_SY, DIR_SY) wheel_R.set_max_speed(WHEELS_MAX_SPEED); wheel_R.set_acceleration(WHEELS_ACCEL) wheel_L.set_max_speed(WHEELS_MAX_SPEED); wheel_L.set_acceleration(WHEELS_ACCEL) arm_R.set_max_speed(ARMS_MAX_SPEED); arm_R.set_acceleration(ARMS_ACCEL) arm_L.set_max_speed(ARMS_MAX_SPEED); arm_L.set_acceleration(ARMS_ACCEL) syringe.set_max_speed(SYRINGE_MAX_SPEED); syringe.set_acceleration(SYRINGE_ACCEL) _stepper_stop.clear() _stepper_thread = threading.Thread(target=_stepper_loop, daemon=True, name="steppers") _stepper_thread.start() def motion_setup_sensors(pi: pigpio.pi): """ Inicialitza únicament els sensors I2C (VL53L0X, ADS1115). Sense steppers. Útil per a tests de sensors quan els motors no estan connectats. """ global _pi, _i2c, _dist_sensor, _ads, _chan_r, _chan_l _pi = pi _i2c = adafruit_extended_bus.ExtendedI2C(3) _dist_sensor = adafruit_vl53l0x.VL53L0X(_i2c) _ads = ADS.ADS1115(_i2c) _chan_r = AnalogIn(_ads, ADS.P0) _chan_l = AnalogIn(_ads, ADS.P1) def motion_setup(pi: pigpio.pi): """ Inicialitza GPIOs, steppers, sensor de distància VL53L0X, ADC ADS1115 per als sensors de línia, i arrenca el bucle de steppers. Requereix /boot/config.txt: dtoverlay=i2c-gpio,bus=3,i2c_gpio_sda=2,i2c_gpio_scl=1 """ motion_setup_steppers(pi) motion_setup_sensors(pi) def motion_cleanup(): """Atura el bucle de steppers i desactiva tots els motors.""" _stepper_stop.set() if _stepper_thread: _stepper_thread.join(timeout=1.0) enable_wheels(OFF) enable_arms(OFF) enable_syringe(OFF) # ================== # Enable / disable # ================== def enable_wheels(state: bool): _pi.write(EN_W, state) def enable_arms(state: bool): _pi.write(EN_A, state) def enable_syringe(state: bool): _pi.write(EN_SERVO, state) def is_endstop_detecting(pin: int) -> bool: return not _pi.read(pin) # efecte Hall actiu en baix # ================== # Helpers de moviment # ================== def mm_to_steps(mm: int) -> int: # Perímetre roda Ø152mm = 2·π·76 ≈ 477mm return (mm * WHEEL_STEPS_PER_REVOLUTION * 2) // 1000 def wheels_set_position(position: int = 0): wheel_L.set_current_position(position) wheel_R.set_current_position(position) def wheels_set_speed(speed: float, rotate: bool = False, direction: bool = CW): if rotate: wheel_L.set_speed( speed if direction == CW else -speed) wheel_R.set_speed(-speed if direction == CW else speed) else: wheel_L.set_speed(speed) wheel_R.set_speed(speed) def move_arms_to(position: int): arm_L.move_to(position) arm_R.move_to(position) while arm_L.distance_to_go() != 0 and arm_R.distance_to_go() != 0: time.sleep(0.1) def move_arms_up(): arm_L.move_to(ARM_L_UPPER_POSITION) arm_R.move_to(ARM_R_UPPER_POSITION) while arm_L.distance_to_go() != 0 and arm_R.distance_to_go() != 0: time.sleep(0.1) def move_wheels_to(position: int, invert: bool = False): wheel_L.move_to(position) wheel_R.move_to(-position if invert else position) while wheel_L.is_running() and wheel_R.is_running(): time.sleep(0.1) # ================== # Homing # ================== def arms_home(): """Cicle de homing dels braços (bloquejant). El bucle de steppers fa el moviment.""" enable_arms(ON) arm_L.move(-1250) arm_R.move(-1250) while True: if is_endstop_detecting(END_LA): arm_L.stop() if is_endstop_detecting(END_RA): arm_R.stop() l_done = is_endstop_detecting(END_LA) or arm_L.distance_to_go() == 0 r_done = is_endstop_detecting(END_RA) or arm_R.distance_to_go() == 0 if l_done and r_done: break time.sleep(0.005) arm_L.set_current_position(0) arm_R.set_current_position(0) arm_L.move(ARM_L_UPPER_POSITION) arm_R.move(ARM_R_UPPER_POSITION) while arm_L.distance_to_go() != 0 or arm_R.distance_to_go() != 0: time.sleep(0.01) def syringe_home(): """Cicle de homing de la xeringa (bloquejant). El bucle de steppers fa el moviment.""" enable_syringe(ON) syringe.move(-11000) while not is_endstop_detecting(END_SY): if syringe.distance_to_go() == 0: break time.sleep(0.005) syringe.stop() syringe.set_current_position(0) enable_syringe(OFF) # ================== # Sensor de distància # ================== def distance_to_object() -> int: """Retorna la distància en mm a l'objecte més proper. 65535 si fora de rang.""" try: return _dist_sensor.range except Exception: return 65535 # ================== # Seguidor de línia # ================== def _read_line_sensors() -> tuple: """Llegeix sensors de línia via ADS1115. Retorna (dreta, esquerra), 0–32767.""" return _chan_r.value, _chan_l.value def compute_new_speed(speed: float) -> float: accel = WHEELS_ACCEL / LINE_FOLLOWER_FREQ return min(speed + accel, WHEELS_MAX_SPEED) def follow_line_loop(speed: float, forward: bool = True) -> int: """ Seguidor de línia no bloquejant. Retorna CLEAR, CROSSING o OBJECT. Si CLEAR, aplica correcció proporcional a les velocitats de les rodes. """ distance_threshold = 50 # mm lf_r, lf_l = _read_line_sensors() if lf_l > BLACK_THRESHOLD and lf_r > BLACK_THRESHOLD and forward: return CROSSING elif distance_to_object() < distance_threshold and forward: return OBJECT else: p_factor = 5 error = lf_r - lf_l correction_r = error // p_factor correction_l = -correction_r wheel_L.set_speed((speed + correction_r) if forward else (-speed + correction_r)) wheel_R.set_speed((speed + correction_l) if forward else (-speed + correction_l)) return CLEAR def run_to_crossing_center(speed: float) -> float: """ Avança per centrar el robot sobre el creuament detectat. Retorna la nova velocitat (reduïda a la meitat). """ wheels_set_position(0 - mm_to_steps(MM_TO_CROSSING_CENTER)) while follow_line_loop(speed) == CROSSING: wheels_set_speed(speed) time.sleep(0.1) speed /= 2 while wheel_L.current_position() < 0 and wheel_R.current_position() < 0: wheels_set_speed(speed) time.sleep(0.1) wheels_set_speed(0) return speed # ================== # Bucle de steppers (thread permanent) # ================== def _stepper_loop(): """ Thread que genera els polsos STEP per a tots els motors. Equivalent a task_update_steppers() del FreeRTOS. S'inicia automàticament a motion_setup(). """ while not _stepper_stop.is_set(): if wheels_speed_mode: wheel_R.run_speed() wheel_L.run_speed() else: wheel_R.run() wheel_L.run() arm_R.run() arm_L.run() syringe.run() # ================== # Tasques (FreeRTOS tasks → funcions bloquejants, cridades des de threads) # ================== def task_move_to(expected_target: int): """ Segueix la línia fins arribar a expected_target (CROSSING o OBJECT). Equivalent a task_move_to() del FreeRTOS. """ global wheels_speed_mode speed = 0.0 wheels_speed_mode = True enable_wheels(ON) lf_response = CLEAR while True: speed = compute_new_speed(speed) lf_response = follow_line_loop(speed) if lf_response != CLEAR: break time.sleep(LINE_FOLLOWER_PERIOD) if lf_response == CROSSING and expected_target == CROSSING: run_to_crossing_center(speed) elif lf_response == OBJECT and expected_target == OBJECT: wheel_L.stop() wheel_R.stop() wheels_speed_mode = False enable_wheels(OFF) def task_rotate(direction: bool): """ Gira el robot 90° (CW o CCW). Fase 1: acceleració fins al 90% dels passos de rotació. Fase 2: desacceleració i ajust fi sobre la línia via sensors analògics. """ global wheels_speed_mode # Arc 90° a Ø250mm → 2·π·125/4 ≈ 196mm; roda Ø152mm → 196/477 ≈ 0.41 voltes ROTATION_STEPS = (WHEEL_STEPS_PER_REVOLUTION * 42) // 100 positive_wheel = wheel_L if direction == CW else wheel_R speed = 0.0 wheels_speed_mode = True enable_wheels(ON) wheels_set_position(0) while positive_wheel.current_position() < (ROTATION_STEPS * 90) // 100: speed = compute_new_speed(speed) wheels_set_speed(speed, rotate=True, direction=direction) time.sleep(0.01) speed /= 2 while True: lf_r, lf_l = _read_line_sensors() error = abs(lf_l - lf_r) if error < ROTATION_ERROR_THRESHOLD or \ positive_wheel.current_position() > (ROTATION_STEPS * 110) // 100: break wheels_set_speed(speed - (speed / error), rotate=True, direction=direction) time.sleep(0.01) wheels_set_speed(0) enable_wheels(OFF) wheels_speed_mode = False def task_take_or_leave_something(take: bool): """ Avança fins a l'objecte, baixa braços, opera la xeringa i torna al creuament. take=True → xucla; take=False → buida. """ global wheels_speed_mode speed = 0.0 wheels_speed_mode = True enable_wheels(ON) wheels_set_position(0) # guardem la posició home per tornar-hi lf_response = CLEAR while True: speed = compute_new_speed(speed) lf_response = follow_line_loop(speed) if lf_response != CLEAR: break time.sleep(LINE_FOLLOWER_PERIOD) if lf_response == OBJECT: target_l = wheel_L.current_position() + mm_to_steps(MM_TO_OBJECT) target_r = wheel_R.current_position() + mm_to_steps(MM_TO_OBJECT) while wheel_L.current_position() < target_l and wheel_R.current_position() < target_r: wheels_set_speed(speed / 2) time.sleep(0.1) wheels_set_speed(0) enable_wheels(OFF) enable_arms(ON) move_arms_to(ARM_LOWER_POSITION) enable_syringe(ON) syringe.move_to(_SY_FULL_EXTENDED_STEPS if take else 0) while syringe.distance_to_go() != 0: if not take and is_endstop_detecting(END_SY): syringe.stop() syringe.set_current_position(0) break time.sleep(0.1) enable_syringe(OFF) move_arms_up() enable_wheels(ON) wheels_speed_mode = False wheels_set_speed(WHEELS_MAX_SPEED) move_wheels_to(0) elif lf_response == CROSSING: run_to_crossing_center(speed) wheels_speed_mode = False enable_wheels(OFF) def task_idle(): """Pausa breu fins que quibot.py assigni una nova tasca.""" time.sleep(0.5)