TTs whisper
This commit is contained in:
286
raspi/quibot.py
Normal file
286
raspi/quibot.py
Normal file
@@ -0,0 +1,286 @@
|
||||
"""
|
||||
quibot.py — Programa principal del robot QuiBot H2O.
|
||||
Inicialitza tots els mòduls, executa el homing i arrenca els threads.
|
||||
Equivalent a QuiBot.ino del codi Arduino/ESP32.
|
||||
|
||||
Threads permanents:
|
||||
- task_read_blocks → llegeix blocs i executa accions (aquest fitxer)
|
||||
- task_read_gestures → llegeix gestos i executa accions (aquest fitxer)
|
||||
- _stepper_loop → genera polsos STEP dels motors (motion.py)
|
||||
- _task_update_leds → parpelleig dels LEDs (eyes.py)
|
||||
- _poll_loop → polling del sensor de gestos (gesture.py)
|
||||
"""
|
||||
|
||||
import time
|
||||
import threading
|
||||
import signal
|
||||
import sys
|
||||
import pigpio
|
||||
|
||||
from motion import (
|
||||
motion_setup, motion_cleanup,
|
||||
arms_home, syringe_home,
|
||||
task_move_to, task_rotate, task_take_or_leave_something, task_idle,
|
||||
distance_to_object,
|
||||
CROSSING, TAKE, LEAVE, CW, CCW,
|
||||
)
|
||||
from blocks import (
|
||||
blocks_setup,
|
||||
read_block_color, servo_move_to,
|
||||
OPEN_POSITION, EJECT_POSITION,
|
||||
BK, RD, GN, BU, YE, OG, VT,
|
||||
)
|
||||
from eyes import (
|
||||
eyes_setup, eyes_cleanup,
|
||||
eyes_turn_on, eyes_turn_off,
|
||||
eyes_gesture_mode_on, eyes_gesture_mode_off,
|
||||
EYES_OPEN, EYES_FW, EYES_DOWN,
|
||||
RED, GREEN, BLUE, YELLOW, ORANGE, CYAN,
|
||||
)
|
||||
from gesture import (
|
||||
gesture_setup, gesture_cleanup,
|
||||
read_gesture,
|
||||
GS_NONE, GS_FORWARD, GS_LEFT, GS_RIGHT,
|
||||
GS_UP, GS_DOWN, GS_CLOCKWISE, GS_ANTICLOCKWISE, GS_WAVE,
|
||||
)
|
||||
|
||||
# ==================
|
||||
# Colors addicionals (equivalents CRGB del C++)
|
||||
# ==================
|
||||
|
||||
GRAY = (128, 128, 128) # CRGB::Gray → estat normal
|
||||
DARK_RED = (139, 0, 0) # CRGB::DarkRed → avançar
|
||||
|
||||
# ==================
|
||||
# Timeouts
|
||||
# ==================
|
||||
|
||||
INSERT_BLOCK_MS = 2.0 # s — espera que l'infant insereixi el bloc
|
||||
EJECT_BLOCK_MS = 2.0 # s — espera que el bloc caigui
|
||||
CHECK_COLOR_MS = 1.0 # s — interval entre lectures de color
|
||||
|
||||
# ==================
|
||||
# Estat global
|
||||
# ==================
|
||||
|
||||
_pi: pigpio.pi = None
|
||||
|
||||
# Mutex que evita que tasca de blocs i tasca de gestos llancin accions simultànies.
|
||||
# En el C++ original compartien TaskHandle sense mutex explícit (possible race condition).
|
||||
# Aquí ho fem correctament.
|
||||
_action_lock = threading.Lock()
|
||||
|
||||
_shutdown_event = threading.Event()
|
||||
_gesture_mode_active = False # False = mode blocs, True = mode gestos
|
||||
_mode_lock = threading.Lock()
|
||||
|
||||
|
||||
# ==================
|
||||
# Helper d'execució d'accions
|
||||
# ==================
|
||||
|
||||
def _execute_action(fn, *args):
|
||||
"""
|
||||
Adquireix el lock d'acció, executa fn(*args) de forma bloquejant i l'allibera.
|
||||
Garanteix que mai s'executen dues accions de moviment simultànies.
|
||||
"""
|
||||
with _action_lock:
|
||||
fn(*args)
|
||||
|
||||
|
||||
# ==================
|
||||
# Tasca de blocs
|
||||
# ==================
|
||||
|
||||
def task_read_blocks():
|
||||
"""
|
||||
Llegeix blocs contínuament, executa l'acció corresponent al color i expulsa el bloc.
|
||||
Equivalent a task_read_blocks() del FreeRTOS.
|
||||
"""
|
||||
while not _shutdown_event.is_set():
|
||||
eyes_state = False
|
||||
eyes_turn_on(EYES_OPEN, GRAY)
|
||||
|
||||
# Obre el servo per permetre la inserció del bloc
|
||||
servo_move_to(OPEN_POSITION)
|
||||
time.sleep(INSERT_BLOCK_MS)
|
||||
|
||||
# Espera que hi hagi un bloc i llegeix el seu color
|
||||
color_id = BK
|
||||
while not _shutdown_event.is_set():
|
||||
if distance_to_object() < 80:
|
||||
# Objecte a menys de 80mm — esperem que es retiri
|
||||
if not eyes_state:
|
||||
eyes_turn_on(EYES_DOWN, GRAY)
|
||||
eyes_state = True
|
||||
else:
|
||||
color_id = read_block_color()
|
||||
if eyes_state:
|
||||
eyes_turn_on(EYES_OPEN, GRAY, 1, False)
|
||||
eyes_state = False
|
||||
if color_id != BK:
|
||||
break
|
||||
time.sleep(CHECK_COLOR_MS)
|
||||
|
||||
# Si estem en mode gestos, ignora el bloc
|
||||
with _mode_lock:
|
||||
if _gesture_mode_active:
|
||||
continue
|
||||
|
||||
# Executa l'acció corresponent al color del bloc
|
||||
if color_id == RD:
|
||||
eyes_turn_on(EYES_FW, DARK_RED, 2)
|
||||
time.sleep(1.0)
|
||||
_execute_action(task_move_to, CROSSING)
|
||||
|
||||
elif color_id == GN:
|
||||
eyes_turn_on(EYES_OPEN, GREEN, 2)
|
||||
time.sleep(1.0)
|
||||
_execute_action(task_rotate, CW)
|
||||
|
||||
elif color_id == BU:
|
||||
eyes_turn_on(EYES_OPEN, BLUE, 2)
|
||||
time.sleep(1.0)
|
||||
_execute_action(task_rotate, CCW)
|
||||
|
||||
elif color_id == YE:
|
||||
eyes_turn_on(EYES_OPEN, YELLOW, 2)
|
||||
time.sleep(1.0)
|
||||
_execute_action(task_take_or_leave_something, TAKE)
|
||||
|
||||
elif color_id == OG:
|
||||
eyes_turn_on(EYES_OPEN, ORANGE, 2)
|
||||
time.sleep(1.0)
|
||||
_execute_action(task_take_or_leave_something, LEAVE)
|
||||
|
||||
elif color_id == VT:
|
||||
_execute_action(task_idle)
|
||||
|
||||
eyes_turn_on(EYES_OPEN, GRAY)
|
||||
|
||||
# Expulsa el bloc
|
||||
servo_move_to(EJECT_POSITION)
|
||||
time.sleep(EJECT_BLOCK_MS)
|
||||
|
||||
|
||||
# ==================
|
||||
# Tasca de gestos
|
||||
# ==================
|
||||
|
||||
def task_read_gestures():
|
||||
"""
|
||||
Llegeix gestos contínuament.
|
||||
GS_WAVE activa/desactiva el mode gestos.
|
||||
Equivalent a task_read_gestures() del FreeRTOS.
|
||||
"""
|
||||
gesture_mode_active = False
|
||||
|
||||
while not _shutdown_event.is_set():
|
||||
gesture_id = read_gesture()
|
||||
|
||||
# WAVE: toggle entre mode blocs i mode gestos
|
||||
if gesture_id == GS_WAVE:
|
||||
with _mode_lock:
|
||||
_gesture_mode_active = not _gesture_mode_active
|
||||
active = _gesture_mode_active
|
||||
if active:
|
||||
eyes_gesture_mode_on()
|
||||
print("Gesture mode ON")
|
||||
else:
|
||||
eyes_gesture_mode_off()
|
||||
print("Gesture mode OFF")
|
||||
time.sleep(1.0)
|
||||
continue
|
||||
|
||||
with _mode_lock:
|
||||
active = _gesture_mode_active
|
||||
if not active or gesture_id == GS_NONE:
|
||||
time.sleep(0.1)
|
||||
continue
|
||||
|
||||
# Executa l'acció corresponent al gest
|
||||
if gesture_id == GS_FORWARD:
|
||||
eyes_turn_on(EYES_FW, DARK_RED, 2)
|
||||
_execute_action(task_move_to, CROSSING)
|
||||
|
||||
elif gesture_id == GS_RIGHT:
|
||||
eyes_turn_on(EYES_OPEN, GREEN, 2)
|
||||
_execute_action(task_rotate, CW)
|
||||
|
||||
elif gesture_id == GS_LEFT:
|
||||
eyes_turn_on(EYES_OPEN, BLUE, 2)
|
||||
_execute_action(task_rotate, CCW)
|
||||
|
||||
elif gesture_id == GS_UP:
|
||||
eyes_turn_on(EYES_OPEN, YELLOW, 2)
|
||||
_execute_action(task_take_or_leave_something, TAKE)
|
||||
|
||||
elif gesture_id == GS_DOWN:
|
||||
eyes_turn_on(EYES_OPEN, ORANGE, 2)
|
||||
_execute_action(task_take_or_leave_something, LEAVE)
|
||||
|
||||
elif gesture_id in (GS_CLOCKWISE, GS_ANTICLOCKWISE):
|
||||
_execute_action(task_idle)
|
||||
|
||||
eyes_turn_on(EYES_OPEN, CYAN)
|
||||
time.sleep(0.5)
|
||||
|
||||
|
||||
# ==================
|
||||
# Shutdown
|
||||
# ==================
|
||||
|
||||
def _shutdown(sig, frame):
|
||||
print("\nAturant QuiBot...")
|
||||
_shutdown_event.set()
|
||||
|
||||
|
||||
# ==================
|
||||
# Main
|
||||
# ==================
|
||||
|
||||
def main():
|
||||
global _pi
|
||||
|
||||
# Connecta amb pigpiod (ha d'estar en marxa amb: sudo pigpiod -s 1)
|
||||
_pi = pigpio.pi()
|
||||
if not _pi.connected:
|
||||
print("ERROR: No s'ha pogut connectar a pigpiod. Executa: sudo pigpiod -s 1")
|
||||
sys.exit(1)
|
||||
|
||||
# Inicialitza tots els mòduls
|
||||
blocks_setup(_pi)
|
||||
motion_setup(_pi)
|
||||
eyes_setup(_pi)
|
||||
gesture_setup()
|
||||
|
||||
# Homing (bloquejant)
|
||||
arms_home()
|
||||
syringe_home()
|
||||
|
||||
# Registra els senyals de sortida
|
||||
signal.signal(signal.SIGINT, _shutdown)
|
||||
signal.signal(signal.SIGTERM, _shutdown)
|
||||
|
||||
# Arrenca els threads principals
|
||||
t_blocks = threading.Thread(target=task_read_blocks, daemon=True, name="blocks")
|
||||
t_gestures = threading.Thread(target=task_read_gestures, daemon=True, name="gestures")
|
||||
|
||||
t_blocks.start()
|
||||
t_gestures.start()
|
||||
|
||||
print("QuiBot llest.")
|
||||
|
||||
# Espera senyal de sortida
|
||||
_shutdown_event.wait()
|
||||
|
||||
# Cleanup ordenat
|
||||
motion_cleanup()
|
||||
eyes_cleanup()
|
||||
gesture_cleanup()
|
||||
_pi.stop()
|
||||
print("QuiBot aturat.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user