287 lines
8.0 KiB
Python
287 lines
8.0 KiB
Python
"""
|
|
quibot.py — Programa principal del robot QuiBot H2O.
|
|
Inicialitza tots els mòduls, executa el homing i arrenca els threads.
|
|
Equivalent a QuiBot.ino del codi Arduino/ESP32.
|
|
|
|
Threads permanents:
|
|
- task_read_blocks → llegeix blocs i executa accions (aquest fitxer)
|
|
- task_read_gestures → llegeix gestos i executa accions (aquest fitxer)
|
|
- _stepper_loop → genera polsos STEP dels motors (motion.py)
|
|
- _task_update_leds → parpelleig dels LEDs (eyes.py)
|
|
- _poll_loop → polling del sensor de gestos (gesture.py)
|
|
"""
|
|
|
|
import time
|
|
import threading
|
|
import signal
|
|
import sys
|
|
import pigpio
|
|
|
|
from motion import (
|
|
motion_setup, motion_cleanup,
|
|
arms_home, syringe_home,
|
|
task_move_to, task_rotate, task_take_or_leave_something, task_idle,
|
|
distance_to_object,
|
|
CROSSING, TAKE, LEAVE, CW, CCW,
|
|
)
|
|
from blocks import (
|
|
blocks_setup,
|
|
read_block_color, servo_move_to,
|
|
OPEN_POSITION, EJECT_POSITION,
|
|
BK, RD, GN, BU, YE, OG, VT,
|
|
)
|
|
from eyes import (
|
|
eyes_setup, eyes_cleanup,
|
|
eyes_turn_on, eyes_turn_off,
|
|
eyes_gesture_mode_on, eyes_gesture_mode_off,
|
|
EYES_OPEN, EYES_FW, EYES_DOWN,
|
|
RED, GREEN, BLUE, YELLOW, ORANGE, CYAN,
|
|
)
|
|
from gesture import (
|
|
gesture_setup, gesture_cleanup,
|
|
read_gesture,
|
|
GS_NONE, GS_FORWARD, GS_LEFT, GS_RIGHT,
|
|
GS_UP, GS_DOWN, GS_CLOCKWISE, GS_ANTICLOCKWISE, GS_WAVE,
|
|
)
|
|
|
|
# ==================
|
|
# Colors addicionals (equivalents CRGB del C++)
|
|
# ==================
|
|
|
|
GRAY = (128, 128, 128) # CRGB::Gray → estat normal
|
|
DARK_RED = (139, 0, 0) # CRGB::DarkRed → avançar
|
|
|
|
# ==================
|
|
# Timeouts
|
|
# ==================
|
|
|
|
INSERT_BLOCK_MS = 2.0 # s — espera que l'infant insereixi el bloc
|
|
EJECT_BLOCK_MS = 2.0 # s — espera que el bloc caigui
|
|
CHECK_COLOR_MS = 1.0 # s — interval entre lectures de color
|
|
|
|
# ==================
|
|
# Estat global
|
|
# ==================
|
|
|
|
_pi: pigpio.pi = None
|
|
|
|
# Mutex que evita que tasca de blocs i tasca de gestos llancin accions simultànies.
|
|
# En el C++ original compartien TaskHandle sense mutex explícit (possible race condition).
|
|
# Aquí ho fem correctament.
|
|
_action_lock = threading.Lock()
|
|
|
|
_shutdown_event = threading.Event()
|
|
_gesture_mode_active = False # False = mode blocs, True = mode gestos
|
|
_mode_lock = threading.Lock()
|
|
|
|
|
|
# ==================
|
|
# Helper d'execució d'accions
|
|
# ==================
|
|
|
|
def _execute_action(fn, *args):
|
|
"""
|
|
Adquireix el lock d'acció, executa fn(*args) de forma bloquejant i l'allibera.
|
|
Garanteix que mai s'executen dues accions de moviment simultànies.
|
|
"""
|
|
with _action_lock:
|
|
fn(*args)
|
|
|
|
|
|
# ==================
|
|
# Tasca de blocs
|
|
# ==================
|
|
|
|
def task_read_blocks():
|
|
"""
|
|
Llegeix blocs contínuament, executa l'acció corresponent al color i expulsa el bloc.
|
|
Equivalent a task_read_blocks() del FreeRTOS.
|
|
"""
|
|
while not _shutdown_event.is_set():
|
|
eyes_state = False
|
|
eyes_turn_on(EYES_OPEN, GRAY)
|
|
|
|
# Obre el servo per permetre la inserció del bloc
|
|
servo_move_to(OPEN_POSITION)
|
|
time.sleep(INSERT_BLOCK_MS)
|
|
|
|
# Espera que hi hagi un bloc i llegeix el seu color
|
|
color_id = BK
|
|
while not _shutdown_event.is_set():
|
|
if distance_to_object() < 80:
|
|
# Objecte a menys de 80mm — esperem que es retiri
|
|
if not eyes_state:
|
|
eyes_turn_on(EYES_DOWN, GRAY)
|
|
eyes_state = True
|
|
else:
|
|
color_id = read_block_color()
|
|
if eyes_state:
|
|
eyes_turn_on(EYES_OPEN, GRAY, 1, False)
|
|
eyes_state = False
|
|
if color_id != BK:
|
|
break
|
|
time.sleep(CHECK_COLOR_MS)
|
|
|
|
# Si estem en mode gestos, ignora el bloc
|
|
with _mode_lock:
|
|
if _gesture_mode_active:
|
|
continue
|
|
|
|
# Executa l'acció corresponent al color del bloc
|
|
if color_id == RD:
|
|
eyes_turn_on(EYES_FW, DARK_RED, 2)
|
|
time.sleep(1.0)
|
|
_execute_action(task_move_to, CROSSING)
|
|
|
|
elif color_id == GN:
|
|
eyes_turn_on(EYES_OPEN, GREEN, 2)
|
|
time.sleep(1.0)
|
|
_execute_action(task_rotate, CW)
|
|
|
|
elif color_id == BU:
|
|
eyes_turn_on(EYES_OPEN, BLUE, 2)
|
|
time.sleep(1.0)
|
|
_execute_action(task_rotate, CCW)
|
|
|
|
elif color_id == YE:
|
|
eyes_turn_on(EYES_OPEN, YELLOW, 2)
|
|
time.sleep(1.0)
|
|
_execute_action(task_take_or_leave_something, TAKE)
|
|
|
|
elif color_id == OG:
|
|
eyes_turn_on(EYES_OPEN, ORANGE, 2)
|
|
time.sleep(1.0)
|
|
_execute_action(task_take_or_leave_something, LEAVE)
|
|
|
|
elif color_id == VT:
|
|
_execute_action(task_idle)
|
|
|
|
eyes_turn_on(EYES_OPEN, GRAY)
|
|
|
|
# Expulsa el bloc
|
|
servo_move_to(EJECT_POSITION)
|
|
time.sleep(EJECT_BLOCK_MS)
|
|
|
|
|
|
# ==================
|
|
# Tasca de gestos
|
|
# ==================
|
|
|
|
def task_read_gestures():
|
|
"""
|
|
Llegeix gestos contínuament.
|
|
GS_WAVE activa/desactiva el mode gestos.
|
|
Equivalent a task_read_gestures() del FreeRTOS.
|
|
"""
|
|
gesture_mode_active = False
|
|
|
|
while not _shutdown_event.is_set():
|
|
gesture_id = read_gesture()
|
|
|
|
# WAVE: toggle entre mode blocs i mode gestos
|
|
if gesture_id == GS_WAVE:
|
|
with _mode_lock:
|
|
_gesture_mode_active = not _gesture_mode_active
|
|
active = _gesture_mode_active
|
|
if active:
|
|
eyes_gesture_mode_on()
|
|
print("Gesture mode ON")
|
|
else:
|
|
eyes_gesture_mode_off()
|
|
print("Gesture mode OFF")
|
|
time.sleep(1.0)
|
|
continue
|
|
|
|
with _mode_lock:
|
|
active = _gesture_mode_active
|
|
if not active or gesture_id == GS_NONE:
|
|
time.sleep(0.1)
|
|
continue
|
|
|
|
# Executa l'acció corresponent al gest
|
|
if gesture_id == GS_FORWARD:
|
|
eyes_turn_on(EYES_FW, DARK_RED, 2)
|
|
_execute_action(task_move_to, CROSSING)
|
|
|
|
elif gesture_id == GS_RIGHT:
|
|
eyes_turn_on(EYES_OPEN, GREEN, 2)
|
|
_execute_action(task_rotate, CW)
|
|
|
|
elif gesture_id == GS_LEFT:
|
|
eyes_turn_on(EYES_OPEN, BLUE, 2)
|
|
_execute_action(task_rotate, CCW)
|
|
|
|
elif gesture_id == GS_UP:
|
|
eyes_turn_on(EYES_OPEN, YELLOW, 2)
|
|
_execute_action(task_take_or_leave_something, TAKE)
|
|
|
|
elif gesture_id == GS_DOWN:
|
|
eyes_turn_on(EYES_OPEN, ORANGE, 2)
|
|
_execute_action(task_take_or_leave_something, LEAVE)
|
|
|
|
elif gesture_id in (GS_CLOCKWISE, GS_ANTICLOCKWISE):
|
|
_execute_action(task_idle)
|
|
|
|
eyes_turn_on(EYES_OPEN, CYAN)
|
|
time.sleep(0.5)
|
|
|
|
|
|
# ==================
|
|
# Shutdown
|
|
# ==================
|
|
|
|
def _shutdown(sig, frame):
|
|
print("\nAturant QuiBot...")
|
|
_shutdown_event.set()
|
|
|
|
|
|
# ==================
|
|
# Main
|
|
# ==================
|
|
|
|
def main():
|
|
global _pi
|
|
|
|
# Connecta amb pigpiod (ha d'estar en marxa amb: sudo pigpiod -s 1)
|
|
_pi = pigpio.pi()
|
|
if not _pi.connected:
|
|
print("ERROR: No s'ha pogut connectar a pigpiod. Executa: sudo pigpiod -s 1")
|
|
sys.exit(1)
|
|
|
|
# Inicialitza tots els mòduls
|
|
blocks_setup(_pi)
|
|
motion_setup(_pi)
|
|
eyes_setup(_pi)
|
|
gesture_setup()
|
|
|
|
# Homing (bloquejant)
|
|
arms_home()
|
|
syringe_home()
|
|
|
|
# Registra els senyals de sortida
|
|
signal.signal(signal.SIGINT, _shutdown)
|
|
signal.signal(signal.SIGTERM, _shutdown)
|
|
|
|
# Arrenca els threads principals
|
|
t_blocks = threading.Thread(target=task_read_blocks, daemon=True, name="blocks")
|
|
t_gestures = threading.Thread(target=task_read_gestures, daemon=True, name="gestures")
|
|
|
|
t_blocks.start()
|
|
t_gestures.start()
|
|
|
|
print("QuiBot llest.")
|
|
|
|
# Espera senyal de sortida
|
|
_shutdown_event.wait()
|
|
|
|
# Cleanup ordenat
|
|
motion_cleanup()
|
|
eyes_cleanup()
|
|
gesture_cleanup()
|
|
_pi.stop()
|
|
print("QuiBot aturat.")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|