diff --git a/AGENTS.md b/AGENTS.md index 9a3974f..d5d679d 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -2,16 +2,17 @@ ## Overview -QuiBot is an educational robotics platform (UPC/UNE collaboration) consisting of a programmable robot with color-block recognition, gesture control, stepper motors, RGB LED eyes, and multiple input methods (web dashboard, Android voice app). The codebase comprises four independent application layers communicating via HTTP JSON APIs. +QuiBot is an educational robotics platform (UPC/UNE collaboration) consisting of a programmable robot with color-block recognition, gesture control, stepper motors, RGB LED eyes, and multiple input methods (web dashboard, Android voice app). The codebase comprises two independent application layers communicating via HTTP JSON APIs. ``` -[quibot-web Nuxt SPA] ──HTTP──> [backend Express] ──HTTP──> [raspi FastAPI (Pi)] - │ │ - ▼ ▼ - [apk Expo RN app] ──HTTP──> (same backend) [Python hardware drivers] +[quibot-web Nuxt SPA] ──HTTP──> [backend Express] + │ │ + ▼ ├──▶ Raspberry Pi (port 8000) — motor/audio endpoints + [apk Expo RN app] ──HTTP──> ├──▶ LLM (llamacpp) + └──▶ TTS (Piper) ``` -**Tech stack**: Python (pigpio, FastAPI) | TypeScript/Express | Nuxt 4/Vue 3 | Expo/React Native +**Tech stack**: TypeScript/Express | Nuxt 4/Vue 3 | Expo/React Native **No database**: All state is in-memory, file-based (`/tmp/quibot-audio/`), or localStorage. --- @@ -20,30 +21,22 @@ QuiBot is an educational robotics platform (UPC/UNE collaboration) consisting of ``` quibot/ -├── raspi/ # Raspberry Pi brain — Python, controls hardware -│ ├── main.py # FastAPI server (port 8000) on the Pi -│ ├── quibot.py # Main program: block/gesture threads (like Arduino QuiBot.ino) -│ ├── motion.py # Stepper class, homing, line-following, high-level tasks -│ ├── gesture.py # PAJ7620U2 gesture sensor (I2C, polled at 50ms) -│ ├── blocks.py # TCS34725 color sensor + servo block ejection -│ ├── eyes.py # WS2811 LED matrix (128 LEDs, pigpio waveforms, breathing animation) -│ ├── pins.py # BCM GPIO pin map for all hardware -│ └── tests/ # Manual diagnostic scripts (not automated) -├── backend/ # Local Express server (port 3000), proxy to Pi +├── backend/ # Express server (port 5000), proxy to Pi + LLM/TTS │ ├── src/ -│ │ ├── index.ts # Express entry: CORS, JSON parser, /health -│ │ ├── config.ts # Env config: RASPBERRY_PI_HOST, PORT, QUIBOT_TOKEN +│ │ ├── index.ts # Express entry: CORS, JSON parser, /health, routes +│ │ ├── config.ts # Env config: raspi, PIPER_URL, LLAMA_CPP_URL, PORT │ │ ├── routes/router.ts # Mounts all controllers -│ │ ├── services/raspi.service.ts # Axios proxy layer to Pi FastAPI +│ │ ├── services/raspi.service.ts # Axios proxy layer to Pi endpoints │ │ └── controllers/ │ │ ├── motor.controller.ts # Motor step/stop/upload │ │ ├── audio.controller.ts # Audio file lifecycle (incoming/locked/processed) │ │ ├── command.controller.ts # POST /commands proxy to raspi /run -│ │ └── settings.controller.ts # GET/PUT /settings runtime config +│ │ ├── settings.controller.ts # GET/PUT /settings runtime config +│ │ └── tts.controller.ts # TTS synthesis via Piper │ └── dist/ # Compiled output (generated) ├── quibot-web/ # Nuxt 4 dashboard SPA │ ├── app/app.vue # Single-page control panel: block queue, D-pad, eye controls, gesture log -│ ├── server/api/ # Nitro server routes proxying to raspi +│ ├── server/api/ # Nitro server routes proxying to backend Express │ │ ├── motor/step/[direction].post.ts │ │ └── motor/stop.post.ts │ ├── nuxt.config.ts # Runtime config: QUIBOT_BASE_URL, QUIBOT_TOKEN @@ -61,7 +54,9 @@ quibot/ --- -## Raspberry Pi Layer (`raspi/`) +## Raspberry Pi Layer (remote, port 8000) + +The Raspberry Pi runs a lightweight HTTP server exposing hardware control endpoints. The `raspi/` source directory is no longer part of this repository — it lives on the Pi itself. **Hardware target**: Raspberry Pi Zero 2W controlling a robot with: - 5 NEMA-style stepper motors (wheels x2, arms x2, syringe) via A4988/TB600 drivers in STEP/DIR mode @@ -74,7 +69,7 @@ quibot/ - Hall-effect endstops on GPIOs 12, 16, 17 - Optional: I2S audio amp (MAX98357A) + mic (SPH0645) -### Key files +### Hardware source files (on Pi) - **`pins.py`** — BCM GPIO pin numbering for every component (STEP, DIR, EN pins, I2C lines, endstops, LED_DATA on GPIO26) - **`motion.py`** — `Stepper` class with AccelStepper-style acceleration profiling via `pigpio.gpio_trigger()`. 5 motor instances (`wheel_R`, `wheel_L`, `arm_R`, `arm_L`, `syringe`). Continuous stepper daemon thread (`_stepper_loop`) at ~100Hz. Homing routines read Hall-effect endstops. Line-following with proportional correction on TCRT5000 values via ADS1115. @@ -104,7 +99,7 @@ quibot/ ## Backend Layer (`backend/`) -**Role**: Express.js HTTP proxy sitting between frontend/mobile and the Raspberry Pi's FastAPI server. Token passthrough, no business logic. +**Role**: Express.js HTTP server providing frontend/mobile API, proxying hardware commands to the Raspberry Pi, and managing TTS/LLM integration. ### Configuration (`.env`, loaded by `config.ts`) @@ -113,15 +108,19 @@ quibot/ | `RASPBERRY_PI_HOST` | `http://raspberrypi.local` | Pi API URL | | `RASPBERRY_PI_PORT` | `8000` | Pi API port | | `QUIBOT_TOKEN` | `MY_SECRET_TOKEN` | Auth token for all Pi endpoints | -| `PORT` | `3000` | Backend listen port | +| `PORT` | `5000` | Backend listen port | +| `PIPER_URL` | `''` | Piper TTS service URL | +| `LLAMA_CPP_URL` | `''` | LLM inference service URL | +| `LLAMA_API_KEY` | `''` | LLM API key | +| `LLAMA_PREAMBLE` | `''` | Path or content for LLM preamble | ### Architecture ``` index.ts → Express app, CORS, JSON parser, /health endpoint -routes/router.ts → Mounts all controllers under /motor, /audio, /commands, /settings +routes/router.ts → Mounts all controllers under /motor, /audio, /commands, /settings, /tts config.ts → Mutable getter/setter env vars (runtime update via PUT /settings) -raspi.service.ts → Axios proxy methods for every Pi endpoint + multipart file upload handling +raspi.service.ts → Axios proxy methods for Pi endpoints + multipart file upload handling ``` ### Controllers @@ -130,6 +129,7 @@ raspi.service.ts → Axios proxy methods for every Pi endpoint + multipart file - **`audio.controller.ts`** — `GET /audio/incoming`, `POST /audio/lock/:filename`, `/unlock/:filename`, `/cancel/:filename`, `/process/:filename`. All proxy to raspi audio file lifecycle endpoints. - **`command.controller.ts`** — `POST /commands { task }` → proxied to raspi `/run?task=...&token=...` - **`settings.controller.ts`** — `GET /settings` returns config; `PUT /settings` updates `raspberryPi.host`, `raspberryPi.port`, `token` at runtime. +- **`tts.controller.ts`** — `POST /tts { text, lang }` → Synthesizes audio via Piper TTS service. Saves WAV files to `/tmp/quibot-audio/tts/`. ### Build/Run @@ -150,7 +150,7 @@ node dist/index.js # Or use tsx/nodemon for dev | Key | Default | Purpose | |-----|---------|---------| -| `QUIBOT_BASE_URL` | `http://quibot:8000` | Base URL for raspi FastAPI | +| `QUIBOT_BASE_URL` | `http://quibot:8000` | Base URL for backend Express (or Pi) | | `QUIBOT_TOKEN` | `MY_SECRET_TOKEN` | Auth token | ### UI Panels (`app/app.vue` — single-file SPA, 1369 lines) @@ -170,8 +170,8 @@ node dist/index.js # Or use tsx/nodemon for dev | Method | Path | Description | |--------|------|-------------| -| POST | `/api/motor/stop` | Proxies to raspi `/motor/stop` | -| POST | `/api/motor/step/:direction` | Proxies to raspi `/motor/step/forward\|backwards` | +| POST | `/api/motor/stop` | Proxies to backend Express `/motor/stop` | +| POST | `/api/motor/step/:direction` | Proxies to backend Express `/motor/step/forward\|backwards` | **Note**: The frontend also calls `POST /api/eye/shape`, `/api/eye/color`, `/api/eye/on`, `/api/eye/off`, `/api/gesture/on`, `/api/gesture/off` — server routes for these may need to be created (frontend references them but they don't have explicit server handlers yet). @@ -221,7 +221,26 @@ CI: `build-apk.yml` runs expo prebuild, decodes keystore from secrets, builds si ## Complete API Reference -### Raspberry Pi FastAPI (`raspi/main.py`) — port 8000 +### Backend Express (`backend/`) — port 5000 + +| Method | Path | Body | Description | +|--------|------|------|-------------| +| GET | `/health` | — | Returns settings object | +| POST | `/commands` | `{ task }` | Proxy to raspi `/run` | +| POST | `/motor/step/forward` | — | Motor forward proxy to Pi | +| POST | `/motor/step/backward` | — | Motor backward proxy to Pi (maps to `/backwards`) | +| POST | `/motor/stop` | — | Motor stop proxy to Pi | +| POST | `/motor/upload` | multipart file | Audio upload via multer → proxied to Pi | +| GET | `/audio/incoming` | — | List incoming audio files from Pi | +| POST | `/audio/lock/:filename` | — | Lock audio file on Pi | +| POST | `/audio/unlock/:filename` | — | Unlock audio file on Pi | +| POST | `/audio/cancel/:filename` | — | Cancel locked audio on Pi | +| POST | `/audio/process/:filename` | — | Mark processed on Pi | +| GET | `/settings` | — | Returns config | +| PUT | `/settings` | `{ raspberryPi: { host, port }, token }` | Update runtime config | +| POST | `/tts` | query: `text`, `lang` | Synthesize speech via Piper TTS | + +### Raspberry Pi HTTP Server (remote, port 8000) | Method | Path | Params/Body | Description | |--------|------|-------------|-------------| @@ -238,24 +257,6 @@ CI: `build-apk.yml` runs expo prebuild, decodes keystore from secrets, builds si **Auth**: Query parameter `token` matching `QUIBOT_TOKEN` env var (default: `MY_SECRET_TOKEN`). -### Backend Express (`backend/`) — port 3000 - -| Method | Path | Body | Description | -|--------|------|------|-------------| -| GET | `/health` | — | Returns settings object | -| POST | `/commands` | `{ task }` | Proxy to raspi `/run` | -| POST | `/motor/step/forward` | — | Motor forward proxy | -| POST | `/motor/step/backward` | — | Motor backward proxy (maps to raspi `/backwards`) | -| POST | `/motor/stop` | — | Motor stop proxy | -| POST | `/motor/upload` | multipart file | Audio upload via multer in-memory buffer | -| GET | `/audio/incoming` | — | List incoming audio files | -| POST | `/audio/lock/:filename` | — | Lock audio file | -| POST | `/audio/unlock/:filename` | — | Unlock audio file | -| POST | `/audio/cancel/:filename` | — | Cancel locked audio | -| POST | `/audio/process/:filename` | — | Mark processed | -| GET | `/settings` | — | Returns config | -| PUT | `/settings` | `{ raspberryPi: { host, port }, token }` | Update runtime config | - --- ## Command Flow Examples @@ -266,9 +267,11 @@ User clicks "Forward" in D-pad → $fetch('/api/motor/step/forward', { method: 'POST' }) → Nuxt Nitro route: server/api/motor/step/[direction].post.ts → $fetch(config.quibotBaseUrl + '/motor/step/forward', { query: { token } }) - → raspi FastAPI /motor/step/forward - → motor_step("forward") in daemon thread - → step_motor(200, DIR, 1ms pulses) + → Backend Express /motor/step/forward + → raspi.service.motorStepForward() + → Pi FastAPI /motor/step/forward?token=... + → motor_step("forward") in daemon thread on Pi + → step_motor(200, DIR, 1ms pulses) ``` ### Block Processing (internal to Pi) @@ -278,8 +281,18 @@ Child inserts colored block → quibot.py task_read_blocks() polls distance sens → Manhattan distance classification against color lookup table → RED: eyes_turn_on(EYES_FW, DARK_RED, 2) _execute_action(task_move_to, CROSSING) - → enable_wheels(ON) → follow_line_loop(speed) (proportional on TCRT5000) - → After action: servo_move_to(EJECT_POSITION) + → enable_wheels(ON) → follow_line_loop(speed) (proportional on TCRT5000) + → After action: servo_move_to(EJECT_POSITION) +``` + +### TTS Synthesis +``` +User triggers speech in web UI + → POST /tts?text=hello&lang=ca&token=... + → Backend Express tts.controller.ts + → piperService.synthesize() → Piper TTS service + → WAV file saved to /tmp/quibot-audio/tts/{uuid}.wav + → Returns audioUrl + filename ``` ### APK → Audio Upload @@ -317,8 +330,9 @@ User toggles mode in web UI ## Testing -All tests in `raspi/tests/` are **manual diagnostic scripts** (not automated frameworks). Each test is run independently by uncommenting the desired function call at the bottom of the file. Requirements: +Tests reside on the Pi alongside the hardware source code, not in this repository. +Requirements: - `sudo pigpiod -s 1` daemon running - Python venv activated with hardware dependencies installed - Pi connected to robot hardware @@ -337,8 +351,8 @@ All tests in `raspi/tests/` are **manual diagnostic scripts** (not automated fra 1. **Token auth** is always a query parameter matching `QUIBOT_TOKEN` (default: `MY_SECRET_TOKEN`) 2. **No database** — use filesystem for persistence, localStorage for web client state -3. **Backend is a dumb proxy** — no business logic, just forwards HTTP requests with token passthrough -4. **Motor commands are fire-and-forget** — motor runs in daemon thread until `/motor/stop` +3. **Backend proxies Pi endpoints** — motor/audio commands forwarded via raspi.service.ts +4. **Motor commands are fire-and-forget** — motor runs in daemon thread on Pi until `/motor/stop` 5. **Audio lifecycle**: incoming → locked (claim) → processed OR unlocked (release) / cancelled 6. **Eyes breathing** runs continuously at MIN_BR(80)-MAX_BR(170) brightness in background 7. **`quibot.py` owns block/gesture autonomy** — blocks are processed internally on the Pi without backend/web involvement diff --git a/apk/app.json b/apk/app.json index 7c45d9c..c12fff3 100644 --- a/apk/app.json +++ b/apk/app.json @@ -49,6 +49,14 @@ "backgroundColor": "#000000" } } + ], + [ + "expo-build-properties", + { + "android": { + "usesCleartextTraffic": true + } + } ] ], "experiments": { diff --git a/apk/app/index.tsx b/apk/app/index.tsx index eebca44..42e9e10 100644 --- a/apk/app/index.tsx +++ b/apk/app/index.tsx @@ -1,5 +1,4 @@ import { Audio, InterruptionModeAndroid, InterruptionModeIOS } from "expo-av"; -import * as Speech from "expo-speech"; import { router, useFocusEffect } from "expo-router"; import { useCallback, useEffect, useRef, useState } from "react"; import Svg, { Path } from "react-native-svg"; @@ -63,7 +62,9 @@ export default function RecorderScreen() { const [transcriptionText, setTranscriptionText] = useState(""); const [isUploading, setIsUploading] = useState(false); const [isHolding, setIsHolding] = useState(false); + const [isPlaying, setIsPlaying] = useState(false); const recordingRef = useRef(null); + const soundRef = useRef(null); const refreshSettings = useCallback(() => { let isMounted = true; @@ -123,15 +124,29 @@ export default function RecorderScreen() { }; }, [recording]); - async function speak(text: string) { - if (!text || !text.trim()) { - console.log("[TTS] Skipping empty text"); - return; - } + useEffect(() => { + return () => { + void unloadSound(); + }; + }, []); - console.log("[TTS] ===== START speak ====="); - Speech.stop(); - await new Promise((r) => setTimeout(r, 100)); + async function unloadSound() { + if (soundRef.current) { + try { + await soundRef.current.stopAsync(); + await soundRef.current.unloadAsync(); + } catch (err) { + console.log("[TTS] Error unloading sound:", err); + } + soundRef.current = null; + } + setIsPlaying(false); + } + + async function speakWithAudio(audioUrl: string, backendBase: string) { + if (!audioUrl) return false; + + await unloadSound(); try { await Audio.setAudioModeAsync({ @@ -142,38 +157,103 @@ export default function RecorderScreen() { shouldDuckAndroid: true, staysActiveInBackground: false, }); - console.log("[TTS] Audio mode reset OK"); } catch (err) { console.log("[TTS] Audio mode error:", err); } - const lang = locale === "ca" ? "ca-ES" : "en-US"; - await new Promise((r) => setTimeout(r, 800)); - - console.log("[TTS] Calling Speech.speak. Text length:", text.length, "Lang:", lang); try { - Speech.speak(text, { - language: lang, - onDone: () => console.log("[TTS] ✅ onDone fired"), - onError: (error) => console.log("[TTS] ❌ onError:", error), - }); - console.log("[TTS] Speech.speak() call returned OK"); + const fullUrl = audioUrl.startsWith("http") + ? audioUrl + : `${backendBase.replace(/\/+$/, "")}/${audioUrl.replace(/^\/+/, "")}`; + + console.log("[TTS] Loading audio from:", fullUrl); + setIsPlaying(true); + setStatusMessage(strings.playing); + + const { sound } = await Audio.Sound.createAsync( + { uri: fullUrl }, + { shouldPlay: true, volume: 1.0 }, + (status) => { + if (status.isLoaded && status.didJustFinish) { + console.log("[TTS] Audio playback finished"); + void unloadSound(); + } + }, + ); + + soundRef.current = sound; + const status = await sound.getStatusAsync(); + const durationMs = status.isLoaded ? (status.durationMillis ?? 0) : 0; + console.log("[TTS] Playing audio, duration:", durationMs, "ms"); + + return true; } catch (err) { - console.log("[TTS] Speech.speak() threw:", err); + console.log("[TTS] Audio playback error:", err); + setIsPlaying(false); + return false; } } async function speakSequentially(texts: string[]) { if (texts.length === 0) return; + + const trimmedUrl = backendUrl.trim().replace(/\/+$/, ""); for (let i = 0; i < texts.length; i++) { - await speak(texts[i]); - await new Promise((r) => setTimeout(r, 1500)); + const text = texts[i]; + if (!text || !text.trim()) continue; + + try { + setStatusMessage(strings.playing); + console.log("[TTS] Generating TTS audio for text:", text.substring(0, 50)); + + const localeLang = locale === "ca" ? "ca" : "en"; + const ttsParams = new URLSearchParams({ + text: text.trim(), + language: localeLang, + }); + + if (authToken.trim()) { + ttsParams.append("token", authToken.trim()); + } + + const ttsUrl = `${trimmedUrl}/tts?${ttsParams.toString()}`; + const ttsResponse = await fetch(ttsUrl, { method: "POST" }); + + if (!ttsResponse.ok) { + const errText = await ttsResponse.text(); + console.log("[TTS] TTS endpoint error:", ttsResponse.status, errText); + continue; + } + + const ttsData = await ttsResponse.json(); + + if (!ttsData.audioUrl) { + console.log("[TTS] No audioUrl in response:", ttsData); + continue; + } + + const played = await speakWithAudio(ttsData.audioUrl, trimmedUrl); + if (!played) { + setStatusMessage(strings.uploadFailed); + } + + if (i < texts.length - 1) { + await new Promise((r) => setTimeout(r, 800)); + } + } catch (err) { + console.log("[TTS] speakSequentially error:", err); + } } } + async function speak(text: string) { + const texts = [text].filter(Boolean); + await speakSequentially(texts); + } + async function startRecording() { try { - Speech.stop(); + await unloadSound(); setTranscriptionText(""); setResponsePreview(""); setLlmResponseText(""); @@ -367,7 +447,7 @@ export default function RecorderScreen() { try { setIsUploading(true); setStatusMessage(strings.uploadingRecording); - Speech.stop(); + await unloadSound(); setTranscriptionText(""); setResponsePreview(""); setLlmResponseText(""); diff --git a/apk/package.json b/apk/package.json index aba2b50..b0c5f54 100644 --- a/apk/package.json +++ b/apk/package.json @@ -15,7 +15,6 @@ "expo": "~54.0.35", "expo-av": "~16.0.8", "expo-router": "~6.0.24", - "expo-speech": "~14.0.8", "expo-splash-screen": "~31.0.13", "expo-status-bar": "~3.0.9", "react": "19.1.0", diff --git a/backend/src/config.ts b/backend/src/config.ts index 7126d50..7f6fccf 100644 --- a/backend/src/config.ts +++ b/backend/src/config.ts @@ -1,5 +1,6 @@ import dotenv from 'dotenv'; import { readFileSync } from 'fs'; +import { join } from 'path'; dotenv.config(); @@ -7,6 +8,7 @@ let _raspberryHost = process.env.RASPBERRY_PI_HOST ?? 'http://raspberrypi.local' let _raspberryPort = Number(process.env.RASPBERRY_PI_PORT) || 8000; let _token = process.env.QUIBOT_TOKEN ?? 'MY_SECRET_TOKEN'; const APP_PORT = Number(process.env.PORT) || 5000; +const piperUrl = process.env.PIPER_URL ?? ''; const llamacppUrl = process.env.LLAMA_CPP_URL ?? ''; const llamacppApiKey = process.env.LLAMA_API_KEY ?? ''; const llamaPreambleRaw = process.env.LLAMA_PREAMBLE ?? ''; @@ -42,4 +44,9 @@ export const getLlamacppUrl = () => llamacppUrl; export const getLlamacppApiKey = () => llamacppApiKey; export const getLlamacppPreamble = () => llamacppPreamble; +export const getPiperUrl = () => piperUrl; +export const getPiperModel = () => + process.env.PIPER_MODEL || + join('/tmp', 'quibot-piper-models', 'ca_ES-upc_ona-medium.onnx'); + export const getAppPort = () => APP_PORT; diff --git a/backend/src/controllers/tts.controller.ts b/backend/src/controllers/tts.controller.ts new file mode 100644 index 0000000..1e81940 --- /dev/null +++ b/backend/src/controllers/tts.controller.ts @@ -0,0 +1,52 @@ +import { Router } from 'express'; +import { randomUUID } from 'crypto'; +import { join } from 'path'; +import { mkdirSync, writeFileSync } from 'fs'; +import { piperService } from '../services/piper.service.js'; +import { getToken } from '../config.js'; + +const router = Router(); + +const TTS_AUDIO_DIR = join('/tmp', 'quibot-audio', 'tts'); +try { mkdirSync(TTS_AUDIO_DIR, { recursive: true }); } catch { /* ignore */ } + +router.post('/', async (req, res) => { + try { + const text = (req.query.text as string) || (req.body?.text as string); + // Accept 'lang' or 'language' — APK sends 'language', old tests use 'lang' + const lang = (req.query.lang as string) || (req.query.language as string) || 'ca'; + const token = (req.query.token as string) || ''; + + if (!text?.trim()) { + return res.status(400).json({ error: 'Missing query parameter: text' }); + } + + const expectedToken = getToken(); + if (token && token !== expectedToken) { + return res.status(401).json({ error: 'Unauthorized: invalid token' }); + } + + console.log(`[tts] Generating audio for text (${lang}): "${text.substring(0, 60)}..."`); + + // Ensure Piper subprocess is initialized before synthesis + await piperService.initWav(); + + const wavBuffer = await piperService.synthWav(text.trim()); + + const filename = `${randomUUID()}.wav`; + writeFileSync(join(TTS_AUDIO_DIR, filename), wavBuffer); + + console.log(`[tts] Audio saved: ${filename}`); + + return res.json({ + audioUrl: `/tts-audio/${filename}`, + filename, + }); + } catch (err: unknown) { + const message = err instanceof Error ? err.message : 'Unknown error'; + console.error(`[tts] Error: ${message}`); + return res.status(500).json({ error: `TTS synthesis failed: ${message}` }); + } +}); + +export default router; diff --git a/backend/src/index.ts b/backend/src/index.ts index f83c8a2..b7d8f41 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -3,6 +3,7 @@ import cors from 'cors'; import router from './routes/router.js'; import { getAppPort, getConfig } from './config.js'; import { whisperService } from './services/whisper.service.js'; +import { piperService as piperWorker } from './services/piper.service.js'; const app = express(); @@ -14,6 +15,9 @@ app.use('/audio', express.json()); app.use('/motor', express.json()); app.use('/commands', express.json()); +// Serve generated TTS audio files to the APK +app.use('/tts-audio', express.static('/tmp/quibot-audio/tts')); + app.use(router); app.get('/health', (_req, res) => { @@ -24,12 +28,13 @@ app.get('/health', (_req, res) => { const server = app.listen(getAppPort(), () => { console.log(`QuiBot backend listening on port ${getAppPort()}`); whisperService.spawn(); + piperWorker.initWav().catch(() => { /* model may not exist yet → lazy init on first TTS call */ }); }); async function shutdown(signal: string) { console.log(`[server] ${signal} received, shutting down...`); server.close(async () => { - await whisperService.shutdown(); + await Promise.all([whisperService.shutdown(), piperWorker.shutdown()]); process.exit(0); }); } diff --git a/backend/src/piper-worker.py b/backend/src/piper-worker.py new file mode 100644 index 0000000..f1417a0 --- /dev/null +++ b/backend/src/piper-worker.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python3 +"""Persistent Piper TTS worker – single subprocess, model loaded once.""" + +import sys +import json +import wave +import io + + +class PiperStdoutSink: + """Wraps stdout so we write exactly N bytes.""" + + def __init__(self): + self._remaining = 0 + + def set_length(self, length: int): + self._remaining = length + + def write(self, data: bytes): + to_write = min(len(data), self._remaining) + if to_write > 0: + sys.stdout.buffer.write(data[:to_write]) + sys.stdout.buffer.flush() + self._remaining -= to_write + + +def main(): + from piper import PiperVoice + + model_path = "" + config_path = None + voice = None + + # Defaults (will be overridden by init message) + DEFAULT_MODEL = "/tmp/quibot-piper-models/ca_ES-upc_ona-medium.onnx" + DEFAULT_CONFIG = "/tmp/quibot-piper-models/ca_ES-upc_ona-medium.onnx.json" + + # Signal node that the process is alive and listening + print(json.dumps({"type": "ready"}), flush=True) + + for line in sys.stdin: + line = line.strip() + if not line: + continue + + try: + msg = json.loads(line) + except json.JSONDecodeError: + continue + + if msg.get("type") == "init": + model_path = msg.get("model", DEFAULT_MODEL) or DEFAULT_MODEL + config_path = msg.get("config", None) + print(f"[piper-worker] Loading model='{model_path}'", file=sys.stderr, flush=True) + try: + voice = PiperVoice.load(model_path, config_path=config_path) + print(json.dumps({"type": "init_ok"}), flush=True) + except Exception as exc: + err_msg = str(exc).replace('"', '\\"') + print(json.dumps({"type": "init_error", "error": err_msg}), flush=True) + + elif msg.get("type") == "synthesize": + text = msg.get("text", "") + msg_id = msg.get("msgId", "") + if not voice: + print(json.dumps({ + "type": "error", + "text": "Model not loaded, send init first", + "msgId": msg_id, + }), flush=True) + continue + + try: + buf = io.BytesIO() + wf = wave.open(buf, 'wb') + # set_wav_config is called inside synthesize_wav via set_wav_format=True (default) + voice.synthesize_wav(text, wf) + wf.close() + wav_bytes = buf.getvalue() + + # Frame: send length prefix as ASCII number + newline, then raw bytes + header = f"{len(wav_bytes)}\n".encode('ascii') + sys.stdout.buffer.write(header) + sys.stdout.buffer.write(wav_bytes) + sys.stdout.buffer.flush() + + print(json.dumps({ + "type": "synthesized", + "bytes": len(wav_bytes), + "msgId": msg_id, + }), flush=True) + except Exception as exc: + err_msg = str(exc).replace('"', '\\"') + print(json.dumps({ + "type": "error", + "text": err_msg, + "msgId": msg_id, + }), flush=True) + + +if __name__ == "__main__": + main() diff --git a/backend/src/routes/router.ts b/backend/src/routes/router.ts index 8fe454a..e519862 100644 --- a/backend/src/routes/router.ts +++ b/backend/src/routes/router.ts @@ -3,6 +3,7 @@ import motorController from '../controllers/motor.controller.js'; import audioController from '../controllers/audio.controller.js'; import commandController from '../controllers/command.controller.js'; import settingsController from '../controllers/settings.controller.js'; +import ttsController from '../controllers/tts.controller.js'; const router = Router(); @@ -10,5 +11,6 @@ router.use('/motor', motorController); router.use('/audio', audioController); router.use('/commands', commandController); router.use('/settings', settingsController); +router.use('/tts', ttsController); export default router; diff --git a/backend/src/services/piper.http.service.ts b/backend/src/services/piper.http.service.ts new file mode 100644 index 0000000..fea20f3 --- /dev/null +++ b/backend/src/services/piper.http.service.ts @@ -0,0 +1,26 @@ +import axios from 'axios'; +import { getPiperUrl } from '../config.js'; + +export interface PiperSynthesisParams { + text: string; + lang?: string; +} + +class PiperHttpService { + async synthesize(params: PiperSynthesisParams): Promise { + const piperUrl = getPiperUrl(); + if (!piperUrl) throw new Error('PIPER_URL not configured'); + + const speakerId = params.lang === 'en' ? 1 : 0; + + const response = await axios.post( + `${piperUrl}/api/tts`, + { text: params.text, speaker_id: speakerId }, + { responseType: 'arraybuffer', timeout: 30_000 }, + ); + + return Buffer.from(response.data, 'binary'); + } +} + +export const piperService = new PiperHttpService(); diff --git a/backend/src/services/piper.service.ts b/backend/src/services/piper.service.ts new file mode 100644 index 0000000..fdc8df6 --- /dev/null +++ b/backend/src/services/piper.service.ts @@ -0,0 +1,210 @@ +import { spawn, ChildProcess } from 'child_process'; +import { join } from 'path'; +import { fileURLToPath } from 'url'; +import { randomUUID } from 'crypto'; +import { readFileSync, rmSync, readdirSync, statSync, unlinkSync, mkdirSync } from 'fs'; +import { piperService as httpPiperService } from './piper.http.service.js'; +import { getPiperUrl, getPiperModel } from '../config.js'; + +const __filename = fileURLToPath(import.meta.url); +const __dirname = join(__filename, '..'); +const SCRIPT_DIR = join(__dirname, '..'); + +const TTS_DIR = join('/tmp', 'quibot-audio', 'tts-piper'); +mkdirSync(TTS_DIR, { recursive: true }); + +// ─── type-guard for JSON messages from piper-worker ─── +type PiperMsg = + | { type: 'ready' } + | { type: 'init_ok' } + | { type: 'init_error'; error: string } + | { type: 'synthesized'; wavPath: string; bytes?: number; msgId: string } + | { type: 'error'; text: string; msgId: string }; + +class PiperLocalService { + private proc: ChildProcess | null = null; + private pendingInit: Promise | null = null; + private cleanupTimer: ReturnType | null = null; + + // ── spawn + stdout parser (simple: pure JSON on stdout, WAV on disk) ── + + private setupStdout(): void { + if (!this.proc?.stdout) return; + let buf = ''; + this.proc.stdout.on('data', (chunk: Buffer) => { + buf += chunk.toString(); + while (true) { + const nl = buf.indexOf('\n'); + if (nl === -1) break; + const line = buf.slice(0, nl).trim(); + buf = buf.slice(nl + 1); + if (!line) continue; + this.handleLine(line); + } + }); + } + + private handleLine(line: string): void { + try { const msg = JSON.parse(line) as PiperMsg; /* handled below */ } catch { return; } + const msg = JSON.parse(line) as PiperMsg; + if (msg.type === 'ready') return; + if (msg.type === 'init_ok') { this.resolveInit(); } + if (msg.type === 'init_error') { + if (this.pendingInit) { this.pendingInit = null; this.initResolve = null; } + this.initReject(new Error(msg.error)); + } + if (msg.type === 'synthesized') { + this.resolveResponse(msg.msgId, msg.wavPath); + } + } + + private async writeStdin(line: string): Promise { + if (!this.proc?.stdin) throw new Error('piper-svc: stdin unavailable'); + this.proc.stdin.write(line + '\n'); + } + + // ── pending-init promises (simplest possible design) ── + private initResolve: (() => void) | null = null; + private initReject: (e: Error) => void = () => { throw new Error('no reject handler'); }; + + private resolveInit(): void { + if (!this.pendingInit) return; + this.initResolve?.(); this.pendingInit = null; + this.initResolve = null; + } + + // ── pending synth responses ── + private respMap = new Map void>(); + + private resolveResponse(msgId: string, wavPath: string): void { + const fn = this.respMap.get(msgId); + this.respMap.delete(msgId); + if (fn) fn(wavPath); + } + + // ── public spawn / initWav / synthWav ── + + private async _spawn(): Promise { + if (this.proc) return; + + const workerPath = join(SCRIPT_DIR, 'src', 'piper-worker.py'); + const venv = process.env.VIRTUAL_ENV || join(SCRIPT_DIR, '.venv', 'bin', 'python3'); + this.proc = spawn(venv, [workerPath], { stdio: ['pipe', 'pipe', 'pipe'] }); + + this.setupStdout(); + + this.proc.on('exit', () => { + // reject all pending + for (const [, fn] of this.respMap) fn(''); + this.respMap.clear(); + if (this.pendingInit) { this.initReject(new Error('piper process exited')); this.pendingInit = null; } + }); + + // ── cleanup old WAV files every 5 min ── + const timer = setInterval(() => { + try { + const now = Date.now(); + for (const entry of readdirSync(TTS_DIR)) { + const fp = join(TTS_DIR, entry); + try { + const s = statSync(fp); + if (s.isFile() && now - s.mtimeMs > 300_000) unlinkSync(fp); + } catch { /* ignore */ } + } + } catch { /* ignore */ } + }, 5 * 60 * 1000); + } + + async initWav(): Promise { + if (!this.proc) await this._spawn(); + if (this.pendingInit) return this.pendingInit; + + this.pendingInit = new Promise((resolve, reject) => { + // override the global reject + const origReject = this.initReject; + this.initResolve = resolve; + this.initReject = (e: Error) => { + // cleanup + for (const [, fn] of this.respMap) fn(''); + this.respMap.clear(); + reject(e); + }; + }); + + const modelPath = getPiperModel() || join('/tmp', 'quibot-piper-models', 'ca_ES-upc_ona-medium.onnx'); + const cfgPath = modelPath.replace(/\.onnx$/, '.onnx.json'); + await this.writeStdin( + JSON.stringify({ type: 'init', model: modelPath, config: cfgPath }), + ); + return this.pendingInit; + } + + /** Synthesize with local Piper subprocess (primary) and HTTP Piper fallback */ + async synthesize(params: { text: string }): Promise { + try { + await this.initWav(); + return await this.synthWav(params.text); + } catch (localErr) { + const url = getPiperUrl(); + if (url) { + console.log(`[tts] Local Piper failed: ${localErr instanceof Error ? localErr.message : localErr}. Falling back to remote.`); + return await httpPiperService.synthesize({ ...params, lang: 'ca' }); + } + throw localErr; + } + } + + async synthWav(text: string): Promise { + if (!this.proc) await this._spawn(); // auto-spawn; init runs concurrently + const msgId = randomUUID() + '-' + Date.now(); + + return new Promise((resolve, reject) => { + let cleared = false; + const timer = setTimeout(() => { + if (cleared) return; + cleared = true; + this.respMap.delete(msgId); + reject(new Error('piper-svc: synthesis timed out')); + }, 30_000); + + this.respMap.set(msgId, (wavPath: string) => { + if (cleared) return; + cleared = true; + clearTimeout(timer); + try { + const buf = readFileSync(wavPath); + rmSync(wavPath, { force: true }); + resolve(buf); + } catch (err: unknown) { + reject(err instanceof Error ? err : new Error('read WAV failed')); + } + }); + + this.writeStdin( + JSON.stringify({ type: 'synthesize', text, msgId }), + ).catch((e) => { + if (cleared) return; + cleared = true; + this.respMap.delete(msgId); + reject(e instanceof Error ? e : new Error(String(e))); + }); + }); + } + + // ── shutdown ── + + async shutdown(): Promise { + this.cleanupTimer?.refresh(); this.cleanupTimer = null; + if (!this.proc) return; + const p = this.proc; this.proc = null; + await new Promise((res) => { + p.on('exit', res); + setTimeout(res, 3000); + if (!p.killed) { try { p.stdin?.end(); } catch {} p.kill('SIGTERM'); } + }); + } +} + +// ─── singleton ──────────────────────────────────────────────── + +export const piperService = new PiperLocalService(); diff --git a/raspi/.gitignore b/raspi/.gitignore deleted file mode 100644 index 0540009..0000000 --- a/raspi/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -__pycache__/ -venv/ \ No newline at end of file diff --git a/raspi/blocks.py b/raspi/blocks.py deleted file mode 100644 index 39ed457..0000000 --- a/raspi/blocks.py +++ /dev/null @@ -1,160 +0,0 @@ -""" -blocks.py — Lectura del sensor de color TCS34725 i servo d'expulsió de blocs. -Equivalent a blocks.cpp del codi Arduino/ESP32. - -Requereix /boot/config.txt: - dtoverlay=i2c-gpio,bus=4,i2c_gpio_sda=22,i2c_gpio_scl=27 -""" - -import time -import pigpio -import adafruit_extended_bus -import adafruit_tcs34725 - -from pins import SERVO_PWM - -# ================== -# IDs de color -# ================== - -BK = 0 # Negre (no reconegut) -RD = 1 # Vermell → avançar -GN = 2 # Verd → girar dreta -BU = 3 # Blau → girar esquerra -YE = 4 # Groc → xuclar líquid -OG = 5 # Taronja → buidar líquid -VT = 6 # Violeta → sorpresa - -NUM_COLORS = 7 - -# Taula de colors de referència (valors RGB 0–255, calibrats amb el sensor) -_COLORS = [ - {"name": "BK", "r": 80, "g": 80, "b": 80}, - {"name": "RD", "r": 202, "g": 32, "b": 34}, - {"name": "GN", "r": 107, "g": 90, "b": 57}, - {"name": "BU", "r": 104, "g": 83, "b": 66}, - {"name": "YE", "r": 150, "g": 69, "b": 33}, - {"name": "OG", "r": 185, "g": 44, "b": 32}, - {"name": "VT", "r": 129, "g": 70, "b": 55}, -] - -# ================== -# Paràmetres del servo -# ================== - -# Valors en µs de pulse width (pigpio set_servo_pulsewidth). -# Conversió des de l'ESP32 (16 bits, 50Hz): valor/65535 * 20000µs -# MIN = 3277/65535 * 20000 ≈ 1000µs -# MAX = 8000/65535 * 20000 ≈ 2440µs -# EJECT= 6450/65535 * 20000 ≈ 1968µs -MIN_SERVO_US = 1000 # µs → ~0° -MAX_SERVO_US = 2440 # µs → ~180° -OPEN_POSITION = 2440 # µs (equivalent a 8000 ESP32) -EJECT_POSITION = 1968 # µs (equivalent a 6450 ESP32) -_INCREMENT_US = 3 # µs per iteració de 1ms (equivalent a increment de 10 ESP32) - -# ================== -# Instàncies globals -# ================== - -_pi: pigpio.pi = None -_color_sensor = None -_current_servo_pos: int = OPEN_POSITION - - -# ================== -# Setup -# ================== - -def blocks_setup(pi: pigpio.pi): - """ - Inicialitza el servo i el sensor de color TCS34725. - El servo arranca en OPEN_POSITION. - """ - global _pi, _color_sensor, _current_servo_pos - - _pi = pi - - # Servo - pi.set_mode(SERVO_PWM, pigpio.OUTPUT) - _current_servo_pos = OPEN_POSITION - pi.set_servo_pulsewidth(SERVO_PWM, _current_servo_pos) - - # Sensor de color TCS34725 via bus I2C 4 (bit-bang GPIO22=SDA, GPIO27=SCL) - i2c = adafruit_extended_bus.ExtendedI2C(4) - _color_sensor = adafruit_tcs34725.TCS34725(i2c) - _color_sensor.integration_time = 50 # ms - _color_sensor.gain = 4 # 4x (equivalent a TCS34725::Gain::X04) - - -# ================== -# Servo -# ================== - -def servo_move_to(target_us: int): - """ - Mou el servo fins a target_us de forma suau. - Incrementa/decrementa _INCREMENT_US cada mil·lisegon. - """ - global _current_servo_pos - - if not (MIN_SERVO_US <= target_us <= MAX_SERVO_US): - return - - while True: - if _current_servo_pos < target_us - _INCREMENT_US: - _current_servo_pos += _INCREMENT_US - elif _current_servo_pos > target_us + _INCREMENT_US: - _current_servo_pos -= _INCREMENT_US - else: - _current_servo_pos = target_us - - _pi.set_servo_pulsewidth(SERVO_PWM, _current_servo_pos) - time.sleep(0.001) - - if _current_servo_pos == target_us: - return - - -# ================== -# Sensor de color -# ================== - -def _calc_colors_difference(measured: tuple, reference: dict) -> int: - """Distància Manhattan entre el color mesurat i un color de referència.""" - return (abs(measured[0] - reference["r"]) + - abs(measured[1] - reference["g"]) + - abs(measured[2] - reference["b"])) - - -def read_color_raw() -> tuple: - """Retorna (r, g, b) en bytes 0-255 sense classificació. Útil per calibrar.""" - try: - return _color_sensor.color_rgb_bytes - except Exception: - return (0, 0, 0) - - -def read_block_color() -> int: - """ - Llegeix el color del bloc des del sensor TCS34725. - Compara contra la taula de referència per distància Manhattan. - Retorna l'ID del color més proper, o BK si cap supera el llindar. - """ - MAX_DIFFERENCE = 15 - - try: - r, g, b = _color_sensor.color_rgb_bytes - except Exception: - return BK - - min_difference = MAX_DIFFERENCE - min_diff_color = BK - - for color_id, ref in enumerate(_COLORS): - diff = _calc_colors_difference((r, g, b), ref) - if diff < min_difference: - min_difference = diff - min_diff_color = color_id - - return min_diff_color diff --git a/raspi/eyes.py b/raspi/eyes.py deleted file mode 100644 index 5a534d0..0000000 --- a/raspi/eyes.py +++ /dev/null @@ -1,273 +0,0 @@ -""" -eyes.py — Control de les matrius LED 8x8 RGB WS2811 (ulls del robot). -Equivalent a eyes.cpp del codi Arduino/ESP32. - -FastLED → pigpio waveforms (GPIO26, qualsevol pin). - -REQUISIT: iniciar el dimoni pigpio amb resolució d'1µs: - sudo pigpiod -s 1 -Si s'inicia sense -s 1 (defecte 5µs), els LEDs no funcionaran correctament. - -Si en el futur es fa una modificació hardware (GPIO26 → GPIO18 o GPIO21), -es pot substituir _send_ws2811() per rpi_ws281x sense canviar cap altra funció. -""" - -import time -import threading -import pigpio - -from pins import LED_DATA - -# ================== -# Constants -# ================== - -ROW_NUM = 8 -COL_NUM = 8 -NUM_LEDS = ROW_NUM * COL_NUM * 2 # 128 LEDs (2 matrius 8x8) - -MAX_BR = 170 # Brillantor màxima del parpelleig (0–255) -MIN_BR = 80 # Brillantor mínima del parpelleig - -# Colors predefinits (R, G, B) -WHITE = (255, 255, 255) -RED = (255, 0, 0) -GREEN = ( 0, 255, 0) -BLUE = ( 0, 0, 255) -YELLOW = (255, 200, 0) -ORANGE = (255, 80, 0) -PURPLE = (180, 0, 255) -CYAN = ( 0, 255, 255) # Color del mode gestos -BLACK = ( 0, 0, 0) - -# ================== -# Formes dels ulls (índexs dels LEDs actius) -# ================== - -class EyeShape: - """Conjunt de LEDs que formen una expressió dels ulls.""" - def __init__(self, leds: list): - self.leds = leds - self.len = len(leds) - - -EYES_OPEN = EyeShape([ - 102, 89, 38, 25, 106, 101, 90, 85, 42, 37, 26, 21, - 107, 100, 91, 84, 43, 36, 27, 20, 108, 99, 92, 83, - 44, 35, 28, 19, 109, 98, 93, 82, 45, 34, 29, 18, - 97, 94, 33, 30 -]) - -EYES_FW = EyeShape([ - 103, 88, 39, 24, 105, 102, 89, 86, 41, 38, 25, 22, - 117, 106, 101, 90, 85, 74, 53, 42, 37, 26, 21, 10, - 123, 116, 100, 91, 75, 68, 59, 52, 36, 27, 11, 4, - 99, 92, 35, 28, 98, 93, 34, 29, 97, 94, 33, 30, 96, - 95, 32, 31 -]) - -EYES_DOWN = EyeShape([97, 94, 33, 30]) - -# Nova forma per al mode gestos: marc extern dels dos ulls (expressió "atenta") -EYES_GESTURE = EyeShape([ - 96, 97, 98, 99, 100, 101, 102, 103, - 104, 111, 112, 119, 120, 127, - 31, 32, 33, 34, 35, 36, 37, 38, 39, - 0, 7, 8, 15, 16, 23 -]) - -# ================== -# Estat global -# ================== - -_pi: pigpio.pi = None -_leds = [[0, 0, 0] for _ in range(NUM_LEDS)] -_brightness = MAX_BR -_leds_lock = threading.Lock() - -_update_stop = threading.Event() -_update_thread: threading.Thread = None - -# Màscara GPIO per a les waveforms de pigpio -_GPIO_MASK: int = 0 - - -# ================== -# WS2811 via pigpio waveforms -# ================== - -def _send_ws2811(data: bytes): - """ - Envia dades RGB als LEDs WS2811 via pigpio waveforms. - Ordre de color: GRB (igual que FastLED amb WS2811, GRB). - Timing a 1µs de resolució (requereix sudo pigpiod -s 1): - - Bit 0: 1µs HIGH + 2µs LOW (spec: 0.5µs + 2.0µs) - - Bit 1: 2µs HIGH + 1µs LOW (spec: 1.2µs + 1.3µs) - - Reset: 80µs LOW - """ - pulses = [] - for byte_val in data: - for bit in range(7, -1, -1): - if byte_val & (1 << bit): - pulses.append(pigpio.pulse(_GPIO_MASK, 0, 2)) - pulses.append(pigpio.pulse(0, _GPIO_MASK, 1)) - else: - pulses.append(pigpio.pulse(_GPIO_MASK, 0, 1)) - pulses.append(pigpio.pulse(0, _GPIO_MASK, 2)) - pulses.append(pigpio.pulse(0, _GPIO_MASK, 80)) # reset - - _pi.wave_add_new() - _pi.wave_add_generic(pulses) - wid = _pi.wave_create() - if wid >= 0: - _pi.wave_send_once(wid) - while _pi.wave_tx_busy(): - pass - _pi.wave_delete(wid) - - -def _eyes_show(brightness: int): - """Renderitza l'estat actual de _leds amb la brillantor indicada.""" - data = bytearray(NUM_LEDS * 3) - scale = brightness / 255 - for i, (r, g, b) in enumerate(_leds): - data[i * 3 + 0] = int(g * scale) # WS2811 GRB: primer G - data[i * 3 + 1] = int(r * scale) - data[i * 3 + 2] = int(b * scale) - _send_ws2811(bytes(data)) - - -# ================== -# Setup i cleanup -# ================== - -def eyes_setup(pi: pigpio.pi): - """Inicialitza el GPIO i arrenca el thread de parpelleig.""" - global _pi, _GPIO_MASK, _update_thread - - _pi = pi - _GPIO_MASK = 1 << LED_DATA - - pi.set_mode(LED_DATA, pigpio.OUTPUT) - pi.write(LED_DATA, 0) - - _update_stop.clear() - _update_thread = threading.Thread( - target=_task_update_leds, daemon=True, name="eyes" - ) - _update_thread.start() - - -def eyes_cleanup(): - """Atura el thread de parpelleig i apaga els LEDs.""" - _update_stop.set() - if _update_thread: - _update_thread.join(timeout=1.0) - with _leds_lock: - for i in range(NUM_LEDS): - _leds[i] = [0, 0, 0] - _eyes_show(255) - - -# ================== -# Thread de parpelleig (equivalent a task_update_leds del FreeRTOS) -# ================== - -def _task_update_leds(): - """ - Bucle continu que fa respirar la brillantor dels ulls. - Equivalent a task_update_leds() del FreeRTOS. - """ - global _brightness - going_up = False # Al C++ comença a MAX_BR i baixa - _brightness = MAX_BR - - while not _update_stop.is_set(): - if going_up: - if _brightness < MAX_BR: - _brightness += 2 - else: - going_up = False - else: - if _brightness > MIN_BR: - _brightness -= 2 - else: - going_up = True - - with _leds_lock: - _eyes_show(_brightness) - time.sleep(0.05) - - -# ================== -# Animacions (equivalent a les funcions de eyes.cpp) -# ================== - -def eyes_turn_off(): - """Apaga tots els LEDs amb un fos progressiu.""" - for _ in range(50): - with _leds_lock: - for i in range(NUM_LEDS): - r, g, b = _leds[i] - _leds[i] = [int(r * 245 / 255), - int(g * 245 / 255), - int(b * 245 / 255)] - time.sleep(0.01) - with _leds_lock: - for i in range(NUM_LEDS): - _leds[i] = [0, 0, 0] - - -def eyes_turn_on(shape: EyeShape, color: tuple, - repeat: int = 1, forward: bool = True): - """ - Encén els LEDs d'una forma un per un, amb animació. - shape: forma a dibuixar (EYES_OPEN, EYES_FW, EYES_DOWN…) - color: color RGB com a tupla (r, g, b) - repeat: nombre de vegades que es repeteix l'animació - forward: True = ordre normal, False = ordre invers - """ - r, g, b = color - for rep in range(repeat): - eyes_turn_off() - for i in range(shape.len): - idx = shape.leds[i if forward else (shape.len - 1 - i)] - with _leds_lock: - _leds[idx] = [r, g, b] - time.sleep(0.008) - - if rep < repeat - 1: - for i in range(shape.len): - idx = shape.leds[i if forward else (shape.len - 1 - i)] - with _leds_lock: - _leds[idx] = [0, 0, 0] - time.sleep(0.008) - - -# ================== -# Animacions noves — mode gestos (TFG) -# ================== - -def eyes_gesture_mode_on(): - """ - Animació d'activació del mode gestos. - Parpelleig doble en cian per indicar que el robot escolta gestos. - """ - eyes_turn_on(EYES_OPEN, CYAN, repeat=2) - - -def eyes_gesture_mode_off(): - """ - Animació de desactivació del mode gestos. - Torna als ulls oberts en blanc. - """ - eyes_turn_off() - eyes_turn_on(EYES_OPEN, WHITE) - - -def eyes_listening(): - """ - Expressió "escoltant": marc extern dels ulls en cian. - Es mostra mentre el robot espera un gest. - """ - eyes_turn_on(EYES_GESTURE, CYAN) diff --git a/raspi/gesture.py b/raspi/gesture.py deleted file mode 100644 index 04bd6e6..0000000 --- a/raspi/gesture.py +++ /dev/null @@ -1,238 +0,0 @@ -""" -gesture.py — Lectura del sensor de gestos PAJ7620U2 via I2C raw (smbus2). -Equivalent a gesture.cpp del codi Arduino/ESP32. - -No hi ha pin INT disponible al PCB → polling cada 50ms en un thread. -Bus I2C 3 (GPIO2=SDA, GPIO1=SCL, compartit amb VL53L0X i ADS1115). - -Llibreria C++ equivalent: RevEng_PAJ7620 (Aaron S. Crandall) -""" - -import time -import threading -import smbus2 - -# ================== -# IDs de gest -# ================== - -GS_NONE = 0 -GS_FORWARD = 1 -GS_LEFT = 2 -GS_RIGHT = 3 -GS_UP = 4 -GS_DOWN = 5 -GS_CLOCKWISE = 6 -GS_ANTICLOCKWISE = 7 -GS_WAVE = 8 - -# Àlies per al quibot.py -GS_CW = GS_CLOCKWISE -GS_CCW = GS_ANTICLOCKWISE - -# ================== -# Registres PAJ7620U2 -# ================== - -_PAJ7620_ADDR = 0x73 -_REG_BANK_SEL = 0xEF # Registre de selecció de banc (0x00=banc0, 0x01=banc1) -_REG_PART_ID_LSB = 0x00 # Ha de retornar 0x20 -_REG_PART_ID_MSB = 0x01 # Ha de retornar 0x76 -_REG_GESTURE_0 = 0x43 # Bits 0–7: Right, Left, Up, Down, Forward, Backward, CW, CCW -_REG_GESTURE_1 = 0x44 # Bit 0: Wave - -# Bits de gest al registre 0x43 -_BIT_RIGHT = 0x01 -_BIT_LEFT = 0x02 -_BIT_UP = 0x04 -_BIT_DOWN = 0x08 -_BIT_FORWARD = 0x10 -_BIT_BACKWARD = 0x20 -_BIT_CW = 0x40 -_BIT_CCW = 0x80 - -# Bit de gest al registre 0x44 -_BIT_WAVE = 0x01 - -# ================== -# Seqüències d'inicialització -# ================== - -_INIT_BANK0 = [ - (0x32, 0x29), (0x33, 0x01), (0x34, 0x00), (0x35, 0x01), (0x36, 0x00), - (0x37, 0x07), (0x38, 0x17), (0x39, 0x06), (0x3A, 0x12), (0x3F, 0x00), - (0x40, 0x02), (0x41, 0xFF), (0x42, 0x01), (0x46, 0x2D), (0x47, 0x0F), - (0x48, 0x3C), (0x49, 0x00), (0x4A, 0x1E), (0x4B, 0x00), (0x4C, 0x20), - (0x4D, 0x00), (0x4E, 0x1A), (0x4F, 0x14), (0x50, 0x00), (0x51, 0x10), - (0x52, 0x00), (0x5C, 0x02), (0x5D, 0x00), (0x5E, 0x10), (0x5F, 0x3F), - (0x60, 0x27), (0x61, 0x28), (0x62, 0x00), (0x63, 0x03), (0x64, 0xF7), - (0x65, 0x03), (0x66, 0xD9), (0x67, 0x03), (0x68, 0x01), (0x69, 0xC8), - (0x6A, 0x40), (0x6D, 0x04), (0x6E, 0x00), (0x6F, 0x00), (0x70, 0x80), - (0x71, 0x00), (0x72, 0x00), (0x73, 0x00), (0x74, 0xF0), (0x75, 0x00), - (0x80, 0x42), (0x81, 0x44), (0x82, 0x04), (0x83, 0x20), (0x84, 0x20), - (0x85, 0x00), (0x86, 0x10), (0x87, 0x00), (0x88, 0x05), (0x89, 0x18), - (0x8A, 0x10), (0x8B, 0x01), (0x8C, 0x37), (0x8D, 0x00), (0x8E, 0xF0), - (0x8F, 0x81), (0x90, 0x06), (0x91, 0x06), (0x92, 0x1E), (0x93, 0x0D), - (0x94, 0x0A), (0x95, 0x0A), (0x96, 0x0C), (0x97, 0x05), (0x98, 0x0A), - (0x99, 0x41), (0x9A, 0x14), (0x9B, 0x0A), (0x9C, 0x3F), (0x9D, 0x33), - (0x9E, 0xAE), (0x9F, 0xF9), (0xA0, 0x48), (0xA1, 0x13), (0xA2, 0x10), - (0xA3, 0x08), (0xA4, 0x30), (0xA5, 0x19), (0xA6, 0x10), (0xA7, 0x08), - (0xA8, 0x24), (0xA9, 0x04), (0xAA, 0x1E), (0xAB, 0x1E), (0xCC, 0x19), - (0xCD, 0x0B), (0xCE, 0x13), (0xCF, 0x64), (0xD0, 0x21), (0xD1, 0x0F), - (0xD2, 0x88), (0xE0, 0x01), (0xE1, 0x04), (0xE2, 0x41), (0xE3, 0xD6), - (0xE4, 0x00), (0xE5, 0x0C), (0xE6, 0x0A), (0xE7, 0x00), (0xE8, 0x00), - (0xE9, 0x00), (0xEE, 0x07), -] - -_INIT_BANK1 = [ - (0x00, 0x1E), (0x01, 0x1E), (0x02, 0x0F), (0x03, 0x10), (0x04, 0x02), - (0x05, 0x00), (0x06, 0xB0), (0x07, 0x04), (0x08, 0x0D), (0x09, 0x0E), - (0x0A, 0x9C), (0x0B, 0x04), (0x0C, 0x05), (0x0D, 0x0F), (0x0E, 0x02), - (0x0F, 0x12), (0x10, 0x02), (0x11, 0x02), (0x12, 0x00), (0x13, 0x01), - (0x14, 0x05), (0x15, 0x07), (0x16, 0x05), (0x17, 0x07), (0x18, 0x01), - (0x19, 0x04), (0x1A, 0x05), (0x1B, 0x0C), (0x1C, 0x2A), (0x1D, 0x01), - (0x1E, 0x00), (0x21, 0x00), (0x22, 0x00), (0x23, 0x00), (0x25, 0x01), - (0x26, 0x00), (0x27, 0x39), (0x28, 0x7F), (0x29, 0x08), (0x30, 0x03), - (0x31, 0x00), (0x32, 0x1A), (0x33, 0x1A), (0x34, 0x07), (0x35, 0x07), - (0x36, 0x01), (0x37, 0xFF), (0x38, 0x36), (0x39, 0x07), (0x3A, 0x00), - (0x3E, 0xFF), (0x3F, 0x00), (0x40, 0x77), (0x41, 0x40), (0x42, 0x00), - (0x43, 0x30), (0x44, 0xA0), (0x45, 0x5C), (0x46, 0x00), (0x47, 0x00), - (0x48, 0x58), (0x4A, 0x1E), (0x4B, 0x1E), (0x4C, 0x00), (0x4D, 0x00), - (0x4E, 0xA0), (0x4F, 0x80), (0x50, 0x00), (0x51, 0x00), (0x52, 0x00), - (0x53, 0x00), (0x54, 0x00), (0x57, 0x80), (0x59, 0x10), (0x5A, 0x08), - (0x5B, 0x94), (0x5C, 0xE8), (0x5D, 0x08), (0x5E, 0x3D), (0x5F, 0x99), - (0x60, 0x45), (0x61, 0x40), (0x63, 0x2D), (0x64, 0x02), (0x65, 0x96), - (0x66, 0x00), (0x67, 0x97), (0x68, 0x01), (0x69, 0xCD), (0x6A, 0x01), - (0x6B, 0xB0), (0x6C, 0x04), (0x6D, 0x2C), (0x6E, 0x01), (0x6F, 0x32), - (0x71, 0x00), (0x72, 0x01), (0x73, 0x35), (0x74, 0x00), (0x75, 0x33), - (0x76, 0x31), (0x77, 0x01), (0x7C, 0x84), (0x7D, 0x03), (0x7E, 0x01), -] - -# ================== -# Estat global -# ================== - -_bus: smbus2.SMBus = None -_gesture: int = GS_NONE -_gesture_lock = threading.Lock() -_poll_stop = threading.Event() - - -# ================== -# Helpers I2C -# ================== - -def _select_bank(bank: int): - _bus.write_byte_data(_PAJ7620_ADDR, _REG_BANK_SEL, bank) - -def _write(reg: int, val: int): - _bus.write_byte_data(_PAJ7620_ADDR, reg, val) - -def _read(reg: int) -> int: - return _bus.read_byte_data(_PAJ7620_ADDR, reg) - - -# ================== -# Setup -# ================== - -def gesture_setup(): - """ - Inicialitza el PAJ7620U2 via I2C raw (smbus2, bus 3). - Arrenca el thread de polling (equivalent al ISR del C++). - """ - global _bus - - _bus = smbus2.SMBus(3) - - # Desperta el sensor (primer accés I2C) - try: - _bus.write_byte(_PAJ7620_ADDR, 0) - except Exception: - pass - time.sleep(0.001) # 700µs d'espera per wake-up - - # Verifica ID del dispositiu - _select_bank(0) - id_lsb = _read(_REG_PART_ID_LSB) - id_msb = _read(_REG_PART_ID_MSB) - if id_lsb != 0x20 or id_msb != 0x76: - print(f"ERROR: PAJ7620U2 NOT FOUND (ID: {id_msb:02X}{id_lsb:02X})") - else: - print("Gesture sensor init OK") - - # Escriu registres d'inicialització (banc 0) - _select_bank(0) - for reg, val in _INIT_BANK0: - _write(reg, val) - - # Escriu registres d'inicialització (banc 1) - _select_bank(1) - for reg, val in _INIT_BANK1: - _write(reg, val) - - # Torna al banc 0 per a la lectura de gestos - _select_bank(0) - - # Arrenca el thread de polling - _poll_stop.clear() - threading.Thread(target=_poll_loop, daemon=True, name="gesture").start() - - -def gesture_cleanup(): - """Atura el polling i tanca el bus I2C.""" - _poll_stop.set() - if _bus: - _bus.close() - - -# ================== -# Thread de polling (equivalent al ISR + flag del C++) -# ================== - -def _poll_loop(): - """ - Llegeix els registres de gest cada 50ms. - Equivalent a on_gesture_interrupt() + gesture_available flag del C++. - """ - global _gesture - - while not _poll_stop.is_set(): - try: - g0 = _read(_REG_GESTURE_0) - g1 = _read(_REG_GESTURE_1) - - detected = GS_NONE - if g0 & _BIT_FORWARD: detected = GS_FORWARD - elif g0 & _BIT_LEFT: detected = GS_LEFT - elif g0 & _BIT_RIGHT: detected = GS_RIGHT - elif g0 & _BIT_UP: detected = GS_UP - elif g0 & _BIT_DOWN: detected = GS_DOWN - elif g0 & _BIT_CW: detected = GS_CLOCKWISE - elif g0 & _BIT_CCW: detected = GS_ANTICLOCKWISE - elif g1 & _BIT_WAVE: detected = GS_WAVE - - if detected != GS_NONE: - with _gesture_lock: - _gesture = detected - - except Exception: - pass # error I2C puntual → ignora i continua - - time.sleep(0.05) # 50ms de polling (20Hz) - - -# ================== -# Lectura de gest -# ================== - -def read_gesture() -> int: - """ - Retorna l'últim gest detectat i el reseteja a GS_NONE. - No bloquejant — equivalent a read_gesture() del C++. - """ - global _gesture - with _gesture_lock: - gest = _gesture - _gesture = GS_NONE - return gest diff --git a/raspi/main.py b/raspi/main.py deleted file mode 100644 index b209091..0000000 --- a/raspi/main.py +++ /dev/null @@ -1,210 +0,0 @@ -from fastapi import FastAPI, File, Form, UploadFile, HTTPException, Query -from fastapi.middleware.cors import CORSMiddleware -import subprocess -import threading -import time -import os -import json -import uuid -import hashlib -from pathlib import Path -from pydantic import BaseModel -import RPi.GPIO as GPIO - -app = FastAPI() - -app.add_middleware( - CORSMiddleware, - allow_origins=["*"], - allow_methods=["*"], - allow_headers=["*"], -) - -INCOMING_DIR = Path("/tmp/quibot-audio/incoming") -LOCKED_DIR = Path("/tmp/quibot-audio/locked") -PROCESSED_DIR = Path("/tmp/quibot-audio/processed") -INCOMING_DIR.mkdir(parents=True, exist_ok=True) -LOCKED_DIR.mkdir(parents=True, exist_ok=True) -PROCESSED_DIR.mkdir(parents=True, exist_ok=True) - -# ------------------------- -# GPIO SETUP -# ------------------------- -STEP = 23 -DIR = 24 -EN = 25 - -GPIO.setmode(GPIO.BCM) -GPIO.setup(STEP, GPIO.OUT) -GPIO.setup(DIR, GPIO.OUT) -GPIO.setup(EN, GPIO.OUT) - -GPIO.output(EN, GPIO.LOW) - - -motor_thread = None - -def step_motor(steps, direction, delay=0.001): - GPIO.output(DIR, direction) - - for _ in range(steps): - GPIO.output(STEP, GPIO.HIGH) - time.sleep(delay) - GPIO.output(STEP, GPIO.LOW) - time.sleep(delay) - -def motor_step(dir): - dir_pin = GPIO.HIGH if dir == "forward" else GPIO.LOW - time.sleep(0.02) # small delay before starting - print("Motor running...") - step_motor(200, dir_pin, 0.001) - - -# ------------------------- -# SAFE COMMAND WHITELIST -# ------------------------- -COMMANDS = { - "restart_nginx": ["sudo", "systemctl", "restart", "nginx"], - "uptime": ["uptime"], - "update": ["sudo", "apt", "update"] -} - - -# ------------------------- -# API ENDPOINTS -# ------------------------- - -@app.post("/run") -def run_task(task: str, token: str): - if token != "MY_SECRET_TOKEN": - raise HTTPException(status_code=403, detail="Unauthorized") - - if task not in COMMANDS: - raise HTTPException(status_code=400, detail="Invalid task") - - try: - result = subprocess.check_output(COMMANDS[task], text=True) - return {"output": result} - except subprocess.CalledProcessError as e: - return {"error": e.output} - - -@app.post("/motor/step/forward") -def start_motor(token: str): - global motor_thread - - if token != "MY_SECRET_TOKEN": - raise HTTPException(status_code=403, detail="Unauthorized") - - - motor_thread = threading.Thread(target=motor_step, args=("forward",), daemon=True) - motor_thread.start() - - return {"status": "motor started"} - -@app.post("/motor/step/backwards") -def start_motor(token: str): - global motor_thread - - if token != "MY_SECRET_TOKEN": - raise HTTPException(status_code=403, detail="Unauthorized") - - - motor_thread = threading.Thread(target=motor_step, args=("backwards",), daemon=True) - motor_thread.start() - - return {"status": "motor started"} - -@app.post("/motor/stop") -def stop_motor(token: str): - if token != "MY_SECRET_TOKEN": - raise HTTPException(status_code=403, detail="Unauthorized") - - GPIO.output(EN, GPIO.HIGH) # disable driver - - return {"status": "motor stopped"} - - -@app.post("/audio/upload") -async def upload_audio(file: UploadFile = File(...), format: str = "wav"): - raw_content = await file.read() - - checksum = hashlib.sha256(raw_content).hexdigest()[:16] - filename = f"{checksum[:10]}-{uuid.uuid4().hex[:8]}.wav" - - filepath = INCOMING_DIR / filename - filepath.write_bytes(raw_content) - - return {"status": "received", "filename": str(filepath), "lock_url": f"/audio/lock/{filepath.name}"} - - -@app.get("/audio/incoming") -def list_incoming(): - files = [] - for f in sorted(INCOMING_DIR.iterdir()): - meta = f.stat() - files.append({ - "filename": f.name, - "size_bytes": meta.st_size, - "modified_iso": time.ctime(meta.st_mtime), - }) - return {"count": len(files), "files": files} - - -@app.post("/audio/lock/{filename}") -def lock_audio(filename: str): - src = INCOMING_DIR / filename - dst = LOCKED_DIR / filename - - if not src.exists(): - raise HTTPException(status_code=404, detail=f"File {filename} not found") - - if dst.exists(): - return {"status": "already_locked", "filename": filename} - - os.rename(str(src), str(dst)) - return {"status": "locked", "filename": filename} - - -@app.post("/audio/unlock/{filename}") -def unlock_audio(filename: str): - src = LOCKED_DIR / filename - dst = INCOMING_DIR / filename - - if not src.exists(): - raise HTTPException(status_code=404, detail=f"File {filename} not found") - - os.rename(str(src), str(dst)) - return {"status": "unlocked", "filename": filename} - - -@app.post("/audio/cancel/{filename}") -def cancel_audio(filename: str): - src = LOCKED_DIR / filename - dst = INCOMING_DIR / filename - - if not src.exists(): - raise HTTPException(status_code=404, detail=f"File {filename} not found") - - os.rename(str(src), str(dst)) - return {"status": "cancelled", "filename": filename} - - -@app.post("/audio/process/{filename}") -def process_audio(filename: str): - locked = LOCKED_DIR / filename - processed = PROCESSED_DIR / filename - - if not locked.exists(): - raise HTTPException(status_code=404, detail=f"File {filename} not found") - - os.rename(str(locked), str(processed)) - return {"status": "processed", "filename": filename} - - -@app.on_event("shutdown") -def shutdown(): - global motor_running - motor_running = False - GPIO.output(EN, GPIO.HIGH) - GPIO.cleanup() \ No newline at end of file diff --git a/raspi/motion.py b/raspi/motion.py deleted file mode 100644 index 6dbd37b..0000000 --- a/raspi/motion.py +++ /dev/null @@ -1,619 +0,0 @@ -""" -motion.py — Control de motors (rodes, braços, xeringa), sensor de distància - VL53L0X i seguidor de línia TCRT5000 via ADS1115. -Equivalent a motion.cpp del codi Arduino/ESP32. -""" - -import math -import time -import threading -import pigpio -import adafruit_extended_bus -import adafruit_vl53l0x -import adafruit_ads1x15.ads1115 as ADS -from adafruit_ads1x15.analog_in import AnalogIn - -from pins import ( - STEP_R_W, DIR_R_W, STEP_L_W, DIR_L_W, EN_W, - STEP_R_A, DIR_R_A, STEP_L_A, DIR_L_A, EN_A, - STEP_SY, DIR_SY, EN_SERVO, - END_SY, END_RA, END_LA, -) - -# ================== -# Constants -# ================== - -CW = True -CCW = False -ON = 0 # A4988/TB6600: enable actiu en LOW -OFF = 1 -TAKE = True -LEAVE = False - -# Tipus de resposta del seguidor de línia -CLEAR = 0 -CROSSING = 1 -OBJECT = 2 - -# Noms de tasques (equivalent a les constants string del C++) -MOVE_TO_CROSSING = "Move to crossing" -TURN_90_CW = "Turn 90 CW" -TURN_90_CCW = "Turn 90 CCW" -MOVE_TO_OBJECT = "Move to object" -TAKE_SOMETHING = "Take something" -LEAVE_SOMETHING = "Leave something" -DO_NOTHING = "Do nothing" - -# Paràmetres de moviment -WHEELS_MAX_SPEED = 130.0 # steps/s -WHEELS_ACCEL = 190.0 # steps/s² -ARMS_MAX_SPEED = 250.0 # steps/s -ARMS_ACCEL = 125.0 # steps/s² -SYRINGE_MAX_SPEED = 800.0 # steps/s -SYRINGE_ACCEL = 500.0 # steps/s² - -WHEEL_MECH_REDUCTION = 5 -WHEEL_STEPS_PER_REVOLUTION = 200 * WHEEL_MECH_REDUCTION # 1000 passos/volta de roda - -MM_TO_CROSSING_CENTER = 62 # mm des del creuament detectat fins al centre -MM_TO_OBJECT = 20 # mm addicionals un cop detectat l'objecte - -# Llindar de negre per ADS1115 (GAIN_ONE ±4.096V, single-ended 0–26400 per a 3.3V). -# Equivalent a 1500/4095 de l'ESP32 de 12 bits → ~9700 en ADS1115. -BLACK_THRESHOLD = 9700 - -# Llindar d'error de rotació (equivalent a 20/4095 de l'ESP32 → ~130 en ADS1115). -ROTATION_ERROR_THRESHOLD = 130 - -# Posicions dels braços en passos des del home -ARM_LOWER_POSITION = 120 -ARM_L_UPPER_POSITION = 900 -ARM_R_UPPER_POSITION = 550 - -# Xeringa: 10 rev * 200 passos/rev * microstepping x4 = 8000 passos estesa del tot -_SY_FULL_EXTENDED_STEPS = 10 * 200 * 4 - -LINE_FOLLOWER_FREQ = 100 # Hz -LINE_FOLLOWER_PERIOD = 1.0 / LINE_FOLLOWER_FREQ # s - - -# ================== -# Classe Stepper -# ================== - -class Stepper: - """ - Motor pas a pas en mode DRIVER (STEP/DIR). - Equivalent a AccelStepper(DRIVER, step_pin, dir_pin). - Genera polsos STEP via pigpio.gpio_trigger(). - """ - - PULSE_US = 10 # Amplada del pols STEP en µs (A4988 requereix ≥1µs) - - def __init__(self, pi: pigpio.pi, step_pin: int, dir_pin: int): - self._pi = pi - self._step_pin = step_pin - self._dir_pin = dir_pin - self._pos = 0 # posició actual (passos) - self._target = 0 # posició objectiu - self._speed = 0.0 # velocitat actual (passos/s, signada) - self._max_speed = 1.0 - self._accel = 1.0 - self._last_step_us = self._now_us() - self._step_interval_us = 0 # 0 = aturat - - pi.set_mode(step_pin, pigpio.OUTPUT) - pi.set_mode(dir_pin, pigpio.OUTPUT) - pi.write(step_pin, 0) - pi.write(dir_pin, 0) - - @staticmethod - def _now_us() -> int: - return time.monotonic_ns() // 1000 - - def set_max_speed(self, speed: float): - self._max_speed = abs(speed) - - def set_acceleration(self, accel: float): - self._accel = abs(accel) - - def move_to(self, position: int): - self._target = int(position) - - def move(self, relative: int): - self._target = self._pos + int(relative) - - def set_current_position(self, pos: int): - self._pos = int(pos) - self._target = int(pos) - self._speed = 0.0 - self._step_interval_us = 0 - - def current_position(self) -> int: - return self._pos - - def distance_to_go(self) -> int: - return self._target - self._pos - - def is_running(self) -> bool: - return self._target != self._pos - - def stop(self): - self._target = self._pos - self._speed = 0.0 - self._step_interval_us = 0 - - def set_speed(self, speed: float): - """Estableix velocitat constant per a run_speed().""" - self._speed = float(speed) - self._step_interval_us = int(1_000_000 / abs(speed)) if speed != 0.0 else 0 - - def _do_step(self, direction: int): - self._pi.write(self._dir_pin, 1 if direction > 0 else 0) - self._pi.gpio_trigger(self._step_pin, self.PULSE_US, 1) - self._pos += direction - - def run_speed(self) -> bool: - """Fa un pas a velocitat constant. No bloquejant — cridar des del bucle de steppers.""" - if self._step_interval_us == 0: - return False - now = self._now_us() - if now - self._last_step_us >= self._step_interval_us: - self._do_step(1 if self._speed > 0 else -1) - self._last_step_us = now - return True - return False - - def run(self) -> bool: - """ - Fa un pas cap a _target amb acceleració/desacceleració. - No bloquejant — cridar des del bucle de steppers. - Implementa l'algorisme de AccelStepper: Δv = accel / v per pas. - """ - dtg = self.distance_to_go() - if dtg == 0: - self._speed = 0.0 - self._step_interval_us = 0 - return False - - abs_speed = abs(self._speed) - if abs_speed < 1.0: - abs_speed = math.sqrt(self._accel / 2.0) # velocitat inicial AccelStepper - - now = self._now_us() - if now - self._last_step_us < int(1_000_000 / abs_speed): - return False # no és hora del proper pas - - # Actualitza la velocitat per al proper pas - direction = 1 if dtg > 0 else -1 - stop_dist = (abs_speed ** 2) / (2.0 * self._accel) if self._accel > 0 else 0 - if abs(dtg) <= max(stop_dist, 1): - new_speed = abs_speed - (self._accel / abs_speed) - new_speed = max(new_speed, 1.0) - else: - new_speed = abs_speed + (self._accel / abs_speed) - new_speed = min(new_speed, self._max_speed) - - self._speed = new_speed * direction - self._do_step(direction) - self._last_step_us = now - return True - - -# ================== -# Instàncies globals -# ================== - -_pi: pigpio.pi = None -_i2c = None -_dist_sensor = None -_ads = None -_chan_r: AnalogIn = None -_chan_l: AnalogIn = None - -wheel_R: Stepper = None -wheel_L: Stepper = None -arm_R: Stepper = None -arm_L: Stepper = None -syringe: Stepper = None - -wheels_speed_mode: bool = False # True → run_speed(), False → run() (posició) - -_stepper_stop = threading.Event() -_stepper_thread: threading.Thread = None - - -# ================== -# Setup i cleanup -# ================== - -def motion_setup_steppers(pi: pigpio.pi): - """ - Inicialitza GPIOs i steppers sense els sensors I2C. - Útil per a tests de motors quan VL53L0X/ADS1115 no estan connectats. - """ - global _pi, wheel_R, wheel_L, arm_R, arm_L, syringe, _stepper_thread - - _pi = pi - - for pin in (EN_W, EN_A, EN_SERVO): - pi.set_mode(pin, pigpio.OUTPUT) - for pin in (END_LA, END_RA, END_SY): - pi.set_mode(pin, pigpio.INPUT) - pi.set_pull_up_down(pin, pigpio.PUD_UP) # Hall actiu-baix, pull-up intern - - enable_arms(OFF) - enable_wheels(OFF) - - wheel_R = Stepper(pi, STEP_R_W, DIR_R_W) - wheel_L = Stepper(pi, STEP_L_W, DIR_L_W) - arm_R = Stepper(pi, STEP_R_A, DIR_R_A) - arm_L = Stepper(pi, STEP_L_A, DIR_L_A) - syringe = Stepper(pi, STEP_SY, DIR_SY) - - wheel_R.set_max_speed(WHEELS_MAX_SPEED); wheel_R.set_acceleration(WHEELS_ACCEL) - wheel_L.set_max_speed(WHEELS_MAX_SPEED); wheel_L.set_acceleration(WHEELS_ACCEL) - arm_R.set_max_speed(ARMS_MAX_SPEED); arm_R.set_acceleration(ARMS_ACCEL) - arm_L.set_max_speed(ARMS_MAX_SPEED); arm_L.set_acceleration(ARMS_ACCEL) - syringe.set_max_speed(SYRINGE_MAX_SPEED); syringe.set_acceleration(SYRINGE_ACCEL) - - _stepper_stop.clear() - _stepper_thread = threading.Thread(target=_stepper_loop, daemon=True, name="steppers") - _stepper_thread.start() - - -def motion_setup_sensors(pi: pigpio.pi): - """ - Inicialitza únicament els sensors I2C (VL53L0X, ADS1115). Sense steppers. - Útil per a tests de sensors quan els motors no estan connectats. - """ - global _pi, _i2c, _dist_sensor, _ads, _chan_r, _chan_l - - _pi = pi - - _i2c = adafruit_extended_bus.ExtendedI2C(3) - _dist_sensor = adafruit_vl53l0x.VL53L0X(_i2c) - _ads = ADS.ADS1115(_i2c) - _chan_r = AnalogIn(_ads, ADS.P0) - _chan_l = AnalogIn(_ads, ADS.P1) - - -def motion_setup(pi: pigpio.pi): - """ - Inicialitza GPIOs, steppers, sensor de distància VL53L0X, - ADC ADS1115 per als sensors de línia, i arrenca el bucle de steppers. - Requereix /boot/config.txt: dtoverlay=i2c-gpio,bus=3,i2c_gpio_sda=2,i2c_gpio_scl=1 - """ - motion_setup_steppers(pi) - motion_setup_sensors(pi) - - -def motion_cleanup(): - """Atura el bucle de steppers i desactiva tots els motors.""" - _stepper_stop.set() - if _stepper_thread: - _stepper_thread.join(timeout=1.0) - enable_wheels(OFF) - enable_arms(OFF) - enable_syringe(OFF) - - -# ================== -# Enable / disable -# ================== - -def enable_wheels(state: bool): - _pi.write(EN_W, state) - -def enable_arms(state: bool): - _pi.write(EN_A, state) - -def enable_syringe(state: bool): - _pi.write(EN_SERVO, state) - -def is_endstop_detecting(pin: int) -> bool: - return not _pi.read(pin) # efecte Hall actiu en baix - - -# ================== -# Helpers de moviment -# ================== - -def mm_to_steps(mm: int) -> int: - # Perímetre roda Ø152mm = 2·π·76 ≈ 477mm - return (mm * WHEEL_STEPS_PER_REVOLUTION * 2) // 1000 - -def wheels_set_position(position: int = 0): - wheel_L.set_current_position(position) - wheel_R.set_current_position(position) - -def wheels_set_speed(speed: float, rotate: bool = False, direction: bool = CW): - if rotate: - wheel_L.set_speed( speed if direction == CW else -speed) - wheel_R.set_speed(-speed if direction == CW else speed) - else: - wheel_L.set_speed(speed) - wheel_R.set_speed(speed) - -def move_arms_to(position: int): - arm_L.move_to(position) - arm_R.move_to(position) - while arm_L.distance_to_go() != 0 and arm_R.distance_to_go() != 0: - time.sleep(0.1) - -def move_arms_up(): - arm_L.move_to(ARM_L_UPPER_POSITION) - arm_R.move_to(ARM_R_UPPER_POSITION) - while arm_L.distance_to_go() != 0 and arm_R.distance_to_go() != 0: - time.sleep(0.1) - -def move_wheels_to(position: int, invert: bool = False): - wheel_L.move_to(position) - wheel_R.move_to(-position if invert else position) - while wheel_L.is_running() and wheel_R.is_running(): - time.sleep(0.1) - - -# ================== -# Homing -# ================== - -def arms_home(): - """Cicle de homing dels braços (bloquejant). El bucle de steppers fa el moviment.""" - enable_arms(ON) - arm_L.move(-1250) - arm_R.move(-1250) - - while True: - if is_endstop_detecting(END_LA): - arm_L.stop() - if is_endstop_detecting(END_RA): - arm_R.stop() - l_done = is_endstop_detecting(END_LA) or arm_L.distance_to_go() == 0 - r_done = is_endstop_detecting(END_RA) or arm_R.distance_to_go() == 0 - if l_done and r_done: - break - time.sleep(0.005) - - arm_L.set_current_position(0) - arm_R.set_current_position(0) - arm_L.move(ARM_L_UPPER_POSITION) - arm_R.move(ARM_R_UPPER_POSITION) - - while arm_L.distance_to_go() != 0 or arm_R.distance_to_go() != 0: - time.sleep(0.01) - - -def syringe_home(): - """Cicle de homing de la xeringa (bloquejant). El bucle de steppers fa el moviment.""" - enable_syringe(ON) - syringe.move(-11000) - - while not is_endstop_detecting(END_SY): - if syringe.distance_to_go() == 0: - break - time.sleep(0.005) - - syringe.stop() - syringe.set_current_position(0) - enable_syringe(OFF) - - -# ================== -# Sensor de distància -# ================== - -def distance_to_object() -> int: - """Retorna la distància en mm a l'objecte més proper. 65535 si fora de rang.""" - try: - return _dist_sensor.range - except Exception: - return 65535 - - -# ================== -# Seguidor de línia -# ================== - -def _read_line_sensors() -> tuple: - """Llegeix sensors de línia via ADS1115. Retorna (dreta, esquerra), 0–32767.""" - return _chan_r.value, _chan_l.value - -def compute_new_speed(speed: float) -> float: - accel = WHEELS_ACCEL / LINE_FOLLOWER_FREQ - return min(speed + accel, WHEELS_MAX_SPEED) - -def follow_line_loop(speed: float, forward: bool = True) -> int: - """ - Seguidor de línia no bloquejant. Retorna CLEAR, CROSSING o OBJECT. - Si CLEAR, aplica correcció proporcional a les velocitats de les rodes. - """ - distance_threshold = 50 # mm - - lf_r, lf_l = _read_line_sensors() - - if lf_l > BLACK_THRESHOLD and lf_r > BLACK_THRESHOLD and forward: - return CROSSING - elif distance_to_object() < distance_threshold and forward: - return OBJECT - else: - p_factor = 5 - error = lf_r - lf_l - correction_r = error // p_factor - correction_l = -correction_r - - wheel_L.set_speed((speed + correction_r) if forward else (-speed + correction_r)) - wheel_R.set_speed((speed + correction_l) if forward else (-speed + correction_l)) - return CLEAR - -def run_to_crossing_center(speed: float) -> float: - """ - Avança per centrar el robot sobre el creuament detectat. - Retorna la nova velocitat (reduïda a la meitat). - """ - wheels_set_position(0 - mm_to_steps(MM_TO_CROSSING_CENTER)) - - while follow_line_loop(speed) == CROSSING: - wheels_set_speed(speed) - time.sleep(0.1) - - speed /= 2 - while wheel_L.current_position() < 0 and wheel_R.current_position() < 0: - wheels_set_speed(speed) - time.sleep(0.1) - wheels_set_speed(0) - return speed - - -# ================== -# Bucle de steppers (thread permanent) -# ================== - -def _stepper_loop(): - """ - Thread que genera els polsos STEP per a tots els motors. - Equivalent a task_update_steppers() del FreeRTOS. - S'inicia automàticament a motion_setup(). - """ - while not _stepper_stop.is_set(): - if wheels_speed_mode: - wheel_R.run_speed() - wheel_L.run_speed() - else: - wheel_R.run() - wheel_L.run() - arm_R.run() - arm_L.run() - syringe.run() - - -# ================== -# Tasques (FreeRTOS tasks → funcions bloquejants, cridades des de threads) -# ================== - -def task_move_to(expected_target: int): - """ - Segueix la línia fins arribar a expected_target (CROSSING o OBJECT). - Equivalent a task_move_to() del FreeRTOS. - """ - global wheels_speed_mode - - speed = 0.0 - wheels_speed_mode = True - enable_wheels(ON) - - lf_response = CLEAR - while True: - speed = compute_new_speed(speed) - lf_response = follow_line_loop(speed) - if lf_response != CLEAR: - break - time.sleep(LINE_FOLLOWER_PERIOD) - - if lf_response == CROSSING and expected_target == CROSSING: - run_to_crossing_center(speed) - elif lf_response == OBJECT and expected_target == OBJECT: - wheel_L.stop() - wheel_R.stop() - - wheels_speed_mode = False - enable_wheels(OFF) - - -def task_rotate(direction: bool): - """ - Gira el robot 90° (CW o CCW). - Fase 1: acceleració fins al 90% dels passos de rotació. - Fase 2: desacceleració i ajust fi sobre la línia via sensors analògics. - """ - global wheels_speed_mode - - # Arc 90° a Ø250mm → 2·π·125/4 ≈ 196mm; roda Ø152mm → 196/477 ≈ 0.41 voltes - ROTATION_STEPS = (WHEEL_STEPS_PER_REVOLUTION * 42) // 100 - positive_wheel = wheel_L if direction == CW else wheel_R - - speed = 0.0 - wheels_speed_mode = True - enable_wheels(ON) - wheels_set_position(0) - - while positive_wheel.current_position() < (ROTATION_STEPS * 90) // 100: - speed = compute_new_speed(speed) - wheels_set_speed(speed, rotate=True, direction=direction) - time.sleep(0.01) - - speed /= 2 - while True: - lf_r, lf_l = _read_line_sensors() - error = abs(lf_l - lf_r) - if error < ROTATION_ERROR_THRESHOLD or \ - positive_wheel.current_position() > (ROTATION_STEPS * 110) // 100: - break - wheels_set_speed(speed - (speed / error), rotate=True, direction=direction) - time.sleep(0.01) - - wheels_set_speed(0) - enable_wheels(OFF) - wheels_speed_mode = False - - -def task_take_or_leave_something(take: bool): - """ - Avança fins a l'objecte, baixa braços, opera la xeringa i torna al creuament. - take=True → xucla; take=False → buida. - """ - global wheels_speed_mode - - speed = 0.0 - wheels_speed_mode = True - enable_wheels(ON) - wheels_set_position(0) # guardem la posició home per tornar-hi - - lf_response = CLEAR - while True: - speed = compute_new_speed(speed) - lf_response = follow_line_loop(speed) - if lf_response != CLEAR: - break - time.sleep(LINE_FOLLOWER_PERIOD) - - if lf_response == OBJECT: - target_l = wheel_L.current_position() + mm_to_steps(MM_TO_OBJECT) - target_r = wheel_R.current_position() + mm_to_steps(MM_TO_OBJECT) - while wheel_L.current_position() < target_l and wheel_R.current_position() < target_r: - wheels_set_speed(speed / 2) - time.sleep(0.1) - wheels_set_speed(0) - enable_wheels(OFF) - - enable_arms(ON) - move_arms_to(ARM_LOWER_POSITION) - - enable_syringe(ON) - syringe.move_to(_SY_FULL_EXTENDED_STEPS if take else 0) - while syringe.distance_to_go() != 0: - if not take and is_endstop_detecting(END_SY): - syringe.stop() - syringe.set_current_position(0) - break - time.sleep(0.1) - enable_syringe(OFF) - - move_arms_up() - - enable_wheels(ON) - wheels_speed_mode = False - wheels_set_speed(WHEELS_MAX_SPEED) - move_wheels_to(0) - - elif lf_response == CROSSING: - run_to_crossing_center(speed) - - wheels_speed_mode = False - enable_wheels(OFF) - - -def task_idle(): - """Pausa breu fins que quibot.py assigni una nova tasca.""" - time.sleep(0.5) diff --git a/raspi/pins.py b/raspi/pins.py deleted file mode 100644 index dca6307..0000000 --- a/raspi/pins.py +++ /dev/null @@ -1,84 +0,0 @@ -""" -pins.py — Definició de pins GPIO (BCM) de la Raspberry Pi Zero 2W. -Equivalent a io.h del codi Arduino/ESP32. -Tots els números fan referència a la numeració BCM. -""" - -# ================== -# MOTORS -# ================== - -# Servo d'expulsió de blocs -EN_SERVO = 9 -SERVO_PWM = 10 - -# Motor pas a pas xeringa -STEP_SY = 8 -DIR_SY = 5 - -# Motor pas a pas roda dreta -STEP_R_W = 25 -DIR_R_W = 23 - -# Motor pas a pas roda esquerra -STEP_L_W = 7 -DIR_L_W = 11 - -# Enable motors de rodes (compartit) -EN_W = 6 - -# Motor pas a pas braç dret -STEP_R_A = 3 -DIR_R_A = 4 - -# Motor pas a pas braç esquerre -STEP_L_A = 13 -DIR_L_A = 0 - -# Enable motors de braços (compartit) -EN_A = 21 - -# ================== -# SENSORS -# ================== - -# Bus I2C principal — VL53L0X (distància) i PAJ7620U2 (gestos), compartit -SDA_DIST = 2 -SCL_DIST = 1 -SDA_GEST = SDA_DIST # mateixa línia -SCL_GEST = SCL_DIST # mateixa línia -# INT_GEST no connectat a la PCB — el driver usa polling - -# Bus I2C sensor de color TCS34725 (bit-bang) -SDA_COL = 22 -SCL_COL = 27 - -# Final de carrera xeringa (efecte Hall) -END_SY = 12 - -# Final de carrera braç dret (efecte Hall) -END_RA = 16 - -# Final de carrera braç esquerre (efecte Hall) -END_LA = 17 - -# Sensors seguidors de línia (TCRT5000) -LINES_R = 14 -LINES_L = 15 - -# ================== -# DISPLAY -# ================== - -# Dades matriu LED 8x8 RGB WS2811 (2x ull) -LED_DATA = 26 - -# ================== -# ÀUDIO (afegit per company, no usat pel robot) -# ================== - -# I2S — amplificador MAX98357A + micròfon SPH0645 -I2C_BCLK = 18 -I2C_LRCLK = 19 -AMP_DIN = 24 -MIC = 20 diff --git a/raspi/quibot.py b/raspi/quibot.py deleted file mode 100644 index c696159..0000000 --- a/raspi/quibot.py +++ /dev/null @@ -1,286 +0,0 @@ -""" -quibot.py — Programa principal del robot QuiBot H2O. -Inicialitza tots els mòduls, executa el homing i arrenca els threads. -Equivalent a QuiBot.ino del codi Arduino/ESP32. - -Threads permanents: - - task_read_blocks → llegeix blocs i executa accions (aquest fitxer) - - task_read_gestures → llegeix gestos i executa accions (aquest fitxer) - - _stepper_loop → genera polsos STEP dels motors (motion.py) - - _task_update_leds → parpelleig dels LEDs (eyes.py) - - _poll_loop → polling del sensor de gestos (gesture.py) -""" - -import time -import threading -import signal -import sys -import pigpio - -from motion import ( - motion_setup, motion_cleanup, - arms_home, syringe_home, - task_move_to, task_rotate, task_take_or_leave_something, task_idle, - distance_to_object, - CROSSING, TAKE, LEAVE, CW, CCW, -) -from blocks import ( - blocks_setup, - read_block_color, servo_move_to, - OPEN_POSITION, EJECT_POSITION, - BK, RD, GN, BU, YE, OG, VT, -) -from eyes import ( - eyes_setup, eyes_cleanup, - eyes_turn_on, eyes_turn_off, - eyes_gesture_mode_on, eyes_gesture_mode_off, - EYES_OPEN, EYES_FW, EYES_DOWN, - RED, GREEN, BLUE, YELLOW, ORANGE, CYAN, -) -from gesture import ( - gesture_setup, gesture_cleanup, - read_gesture, - GS_NONE, GS_FORWARD, GS_LEFT, GS_RIGHT, - GS_UP, GS_DOWN, GS_CLOCKWISE, GS_ANTICLOCKWISE, GS_WAVE, -) - -# ================== -# Colors addicionals (equivalents CRGB del C++) -# ================== - -GRAY = (128, 128, 128) # CRGB::Gray → estat normal -DARK_RED = (139, 0, 0) # CRGB::DarkRed → avançar - -# ================== -# Timeouts -# ================== - -INSERT_BLOCK_MS = 2.0 # s — espera que l'infant insereixi el bloc -EJECT_BLOCK_MS = 2.0 # s — espera que el bloc caigui -CHECK_COLOR_MS = 1.0 # s — interval entre lectures de color - -# ================== -# Estat global -# ================== - -_pi: pigpio.pi = None - -# Mutex que evita que tasca de blocs i tasca de gestos llancin accions simultànies. -# En el C++ original compartien TaskHandle sense mutex explícit (possible race condition). -# Aquí ho fem correctament. -_action_lock = threading.Lock() - -_shutdown_event = threading.Event() -_gesture_mode_active = False # False = mode blocs, True = mode gestos -_mode_lock = threading.Lock() - - -# ================== -# Helper d'execució d'accions -# ================== - -def _execute_action(fn, *args): - """ - Adquireix el lock d'acció, executa fn(*args) de forma bloquejant i l'allibera. - Garanteix que mai s'executen dues accions de moviment simultànies. - """ - with _action_lock: - fn(*args) - - -# ================== -# Tasca de blocs -# ================== - -def task_read_blocks(): - """ - Llegeix blocs contínuament, executa l'acció corresponent al color i expulsa el bloc. - Equivalent a task_read_blocks() del FreeRTOS. - """ - while not _shutdown_event.is_set(): - eyes_state = False - eyes_turn_on(EYES_OPEN, GRAY) - - # Obre el servo per permetre la inserció del bloc - servo_move_to(OPEN_POSITION) - time.sleep(INSERT_BLOCK_MS) - - # Espera que hi hagi un bloc i llegeix el seu color - color_id = BK - while not _shutdown_event.is_set(): - if distance_to_object() < 80: - # Objecte a menys de 80mm — esperem que es retiri - if not eyes_state: - eyes_turn_on(EYES_DOWN, GRAY) - eyes_state = True - else: - color_id = read_block_color() - if eyes_state: - eyes_turn_on(EYES_OPEN, GRAY, 1, False) - eyes_state = False - if color_id != BK: - break - time.sleep(CHECK_COLOR_MS) - - # Si estem en mode gestos, ignora el bloc - with _mode_lock: - if _gesture_mode_active: - continue - - # Executa l'acció corresponent al color del bloc - if color_id == RD: - eyes_turn_on(EYES_FW, DARK_RED, 2) - time.sleep(1.0) - _execute_action(task_move_to, CROSSING) - - elif color_id == GN: - eyes_turn_on(EYES_OPEN, GREEN, 2) - time.sleep(1.0) - _execute_action(task_rotate, CW) - - elif color_id == BU: - eyes_turn_on(EYES_OPEN, BLUE, 2) - time.sleep(1.0) - _execute_action(task_rotate, CCW) - - elif color_id == YE: - eyes_turn_on(EYES_OPEN, YELLOW, 2) - time.sleep(1.0) - _execute_action(task_take_or_leave_something, TAKE) - - elif color_id == OG: - eyes_turn_on(EYES_OPEN, ORANGE, 2) - time.sleep(1.0) - _execute_action(task_take_or_leave_something, LEAVE) - - elif color_id == VT: - _execute_action(task_idle) - - eyes_turn_on(EYES_OPEN, GRAY) - - # Expulsa el bloc - servo_move_to(EJECT_POSITION) - time.sleep(EJECT_BLOCK_MS) - - -# ================== -# Tasca de gestos -# ================== - -def task_read_gestures(): - """ - Llegeix gestos contínuament. - GS_WAVE activa/desactiva el mode gestos. - Equivalent a task_read_gestures() del FreeRTOS. - """ - gesture_mode_active = False - - while not _shutdown_event.is_set(): - gesture_id = read_gesture() - - # WAVE: toggle entre mode blocs i mode gestos - if gesture_id == GS_WAVE: - with _mode_lock: - _gesture_mode_active = not _gesture_mode_active - active = _gesture_mode_active - if active: - eyes_gesture_mode_on() - print("Gesture mode ON") - else: - eyes_gesture_mode_off() - print("Gesture mode OFF") - time.sleep(1.0) - continue - - with _mode_lock: - active = _gesture_mode_active - if not active or gesture_id == GS_NONE: - time.sleep(0.1) - continue - - # Executa l'acció corresponent al gest - if gesture_id == GS_FORWARD: - eyes_turn_on(EYES_FW, DARK_RED, 2) - _execute_action(task_move_to, CROSSING) - - elif gesture_id == GS_RIGHT: - eyes_turn_on(EYES_OPEN, GREEN, 2) - _execute_action(task_rotate, CW) - - elif gesture_id == GS_LEFT: - eyes_turn_on(EYES_OPEN, BLUE, 2) - _execute_action(task_rotate, CCW) - - elif gesture_id == GS_UP: - eyes_turn_on(EYES_OPEN, YELLOW, 2) - _execute_action(task_take_or_leave_something, TAKE) - - elif gesture_id == GS_DOWN: - eyes_turn_on(EYES_OPEN, ORANGE, 2) - _execute_action(task_take_or_leave_something, LEAVE) - - elif gesture_id in (GS_CLOCKWISE, GS_ANTICLOCKWISE): - _execute_action(task_idle) - - eyes_turn_on(EYES_OPEN, CYAN) - time.sleep(0.5) - - -# ================== -# Shutdown -# ================== - -def _shutdown(sig, frame): - print("\nAturant QuiBot...") - _shutdown_event.set() - - -# ================== -# Main -# ================== - -def main(): - global _pi - - # Connecta amb pigpiod (ha d'estar en marxa amb: sudo pigpiod -s 1) - _pi = pigpio.pi() - if not _pi.connected: - print("ERROR: No s'ha pogut connectar a pigpiod. Executa: sudo pigpiod -s 1") - sys.exit(1) - - # Inicialitza tots els mòduls - blocks_setup(_pi) - motion_setup(_pi) - eyes_setup(_pi) - gesture_setup() - - # Homing (bloquejant) - arms_home() - syringe_home() - - # Registra els senyals de sortida - signal.signal(signal.SIGINT, _shutdown) - signal.signal(signal.SIGTERM, _shutdown) - - # Arrenca els threads principals - t_blocks = threading.Thread(target=task_read_blocks, daemon=True, name="blocks") - t_gestures = threading.Thread(target=task_read_gestures, daemon=True, name="gestures") - - t_blocks.start() - t_gestures.start() - - print("QuiBot llest.") - - # Espera senyal de sortida - _shutdown_event.wait() - - # Cleanup ordenat - motion_cleanup() - eyes_cleanup() - gesture_cleanup() - _pi.stop() - print("QuiBot aturat.") - - -if __name__ == "__main__": - main() diff --git a/raspi/tests/test_blocks.py b/raspi/tests/test_blocks.py deleted file mode 100644 index 3ffe773..0000000 --- a/raspi/tests/test_blocks.py +++ /dev/null @@ -1,147 +0,0 @@ -""" -test_blocks.py — Tests individuals del mòdul blocks.py. -Executa des del directori Rasp/: python tests/test_blocks.py - -Descomenta la funció que vols provar al final del fitxer. -Assegura't que el venv està activat i pigpiod en marxa (sudo pigpiod -s 1). -""" - -import sys -import os -sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) - -import time -import pigpio -from blocks import ( - blocks_setup, - servo_move_to, OPEN_POSITION, EJECT_POSITION, MIN_SERVO_US, MAX_SERVO_US, - read_block_color, read_color_raw, - _COLORS, - BK, RD, GN, BU, YE, OG, VT, -) - -_COLOR_NAMES = { - BK: "Negre (BK)", - RD: "Vermell (RD)", - GN: "Verd (GN)", - BU: "Blau (BU)", - YE: "Groc (YE)", - OG: "Taronja (OG)", - VT: "Violeta (VT)", -} - -def _setup(): - pi = pigpio.pi() - if not pi.connected: - print("ERROR: pigpiod no està en marxa. Executa: sudo pigpiod -s 1") - sys.exit(1) - blocks_setup(pi) - return pi - -def _teardown(pi): - pi.stop() - - -# ================== -# TEST 1 — Servo -# ================== - -def test_servo(): - """ - Mou el servo a les posicions principals: oberta, expulsió i torna a oberta. - Hauries de veure/sentir el servo moure's suaument entre posicions. - """ - print("=== TEST SERVO ===") - pi = _setup() - - print(f"Movent a OPEN_POSITION ({OPEN_POSITION} µs)...") - servo_move_to(OPEN_POSITION) - time.sleep(1.0) - print("Open: OK") - - print(f"Movent a EJECT_POSITION ({EJECT_POSITION} µs)...") - servo_move_to(EJECT_POSITION) - time.sleep(1.0) - print("Eject: OK") - - print(f"Tornant a OPEN_POSITION...") - servo_move_to(OPEN_POSITION) - time.sleep(1.0) - print("Torna a open: OK") - - print(f"Provant posició mínima ({MIN_SERVO_US} µs)...") - servo_move_to(MIN_SERVO_US) - time.sleep(1.0) - - print(f"Tornant a OPEN_POSITION...") - servo_move_to(OPEN_POSITION) - time.sleep(1.0) - - _teardown(pi) - print("Test servo completat.\n") - - -# ================== -# TEST 2 — Sensor de color TCS34725 -# ================== - -def test_color_sensor(): - """ - Llegeix el color 15 vegades cada segon. - Posa davant del sensor els blocs de colors per verificar que els reconeix. - """ - print("=== TEST SENSOR DE COLOR TCS34725 ===") - pi = _setup() - - print("Llegint color durant 15 segons (posa els blocs davant del sensor)...") - for i in range(15): - color_id = read_block_color() - name = _COLOR_NAMES.get(color_id, "Desconegut") - print(f" Lectura {i+1:2d}: {name}") - time.sleep(1.0) - - _teardown(pi) - print("Test sensor de color completat.\n") - - -# ================== -# TEST 3 — Calibració del sensor de color (valors RGB crus) -# ================== - -def test_color_raw(): - """ - Mostra valors RGB crus i la classificació actual durant 30 segons. - Útil per calibrar la taula _COLORS a blocks.py. - Format: R=xxx G=xxx B=xxx → classificat com 'XX' (diff=xx) - Si el diff és gran (>10), els valors de referència necessiten ajust. - """ - print("=== TEST COLOR RAW (calibració) ===") - pi = _setup() - - print("Llegint valors RGB crus durant 30 segons (posa cada bloc davant del sensor)...") - print(f" {'R':>5} {'G':>5} {'B':>5} classificat diff") - for i in range(30): - r, g, b = read_color_raw() - best_name = "??" - best_diff = 9999 - for ref in _COLORS: - diff = abs(r - ref["r"]) + abs(g - ref["g"]) + abs(b - ref["b"]) - if diff < best_diff: - best_diff = diff - best_name = ref["name"] - print(f" R={r:3d} G={g:3d} B={b:3d} → {best_name} (diff={best_diff})") - time.sleep(1.0) - - _teardown(pi) - print("Test color raw completat.\n") - - -# ================== -# Execució -# ================== - -if __name__ == "__main__": - # Descomenta el test que vols executar: - test_servo() - # test_color_sensor() - # test_color_raw() # Per calibrar la taula de colors diff --git a/raspi/tests/test_eyes.py b/raspi/tests/test_eyes.py deleted file mode 100644 index 3b13d04..0000000 --- a/raspi/tests/test_eyes.py +++ /dev/null @@ -1,156 +0,0 @@ -""" -test_eyes.py — Tests individuals del mòdul eyes.py. -Executa des del directori Rasp/: python tests/test_eyes.py - -Descomenta la funció que vols provar al final del fitxer. -Assegura't que el venv està activat i pigpiod en marxa (sudo pigpiod -s 1). -""" - -import sys -import os -sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) - -import time -import pigpio -from eyes import ( - eyes_setup, eyes_cleanup, - eyes_turn_on, eyes_turn_off, - eyes_gesture_mode_on, eyes_gesture_mode_off, eyes_listening, - EYES_OPEN, EYES_FW, EYES_DOWN, EYES_GESTURE, - WHITE, RED, GREEN, BLUE, YELLOW, ORANGE, PURPLE, CYAN, BLACK, -) - -def _setup(): - pi = pigpio.pi() - if not pi.connected: - print("ERROR: pigpiod no està en marxa. Executa: sudo pigpiod -s 1") - sys.exit(1) - eyes_setup(pi) - return pi - -def _teardown(pi): - eyes_cleanup() - pi.stop() - - -# ================== -# TEST 1 — Formes i colors bàsics -# ================== - -def test_shapes(): - """ - Mostra totes les formes existents en colors diferents. - Hauries de veure les formes als ulls LED del robot. - """ - print("=== TEST FORMES I COLORS ===") - pi = _setup() - - print("EYES_OPEN en blanc...") - eyes_turn_on(EYES_OPEN, WHITE) - time.sleep(2.0) - - print("EYES_FW en vermell...") - eyes_turn_on(EYES_FW, RED) - time.sleep(2.0) - - print("EYES_DOWN en blau...") - eyes_turn_on(EYES_DOWN, BLUE) - time.sleep(2.0) - - print("EYES_OPEN en verd...") - eyes_turn_on(EYES_OPEN, GREEN) - time.sleep(2.0) - - print("Apagant...") - eyes_turn_off() - time.sleep(1.0) - - _teardown(pi) - print("Test formes completat.\n") - - -# ================== -# TEST 2 — Animació de repeat i direcció -# ================== - -def test_animation(): - """ - Prova l'animació amb repeat i les dues direccions. - Hauries de veure els LEDs encenent-se un per un en ordre normal i invers. - """ - print("=== TEST ANIMACIÓ ===") - pi = _setup() - - print("EYES_OPEN groc, repeat=2, endavant...") - eyes_turn_on(EYES_OPEN, YELLOW, repeat=2, forward=True) - time.sleep(1.0) - - print("EYES_FW taronja, repeat=2, enrere...") - eyes_turn_on(EYES_FW, ORANGE, repeat=2, forward=False) - time.sleep(1.0) - - eyes_turn_off() - _teardown(pi) - print("Test animació completat.\n") - - -# ================== -# TEST 3 — Animacions mode gestos (TFG) -# ================== - -def test_gesture_animations(): - """ - Prova les animacions específiques del mode gestos. - Hauries de veure: doble parpelleig cian, tornada a blanc, marc cian. - """ - print("=== TEST ANIMACIONS MODE GESTOS ===") - pi = _setup() - - print("Activant mode gestos (doble parpelleig cian)...") - eyes_gesture_mode_on() - time.sleep(2.0) - - print("Escoltant gest (marc cian)...") - eyes_listening() - time.sleep(2.0) - - print("Desactivant mode gestos (torna a blanc)...") - eyes_gesture_mode_off() - time.sleep(2.0) - - eyes_turn_off() - _teardown(pi) - print("Test animacions gestos completat.\n") - - -# ================== -# TEST 4 — Parpelleig (breathing) -# ================== - -def test_breathing(): - """ - Verifica que el thread de parpelleig funciona correctament. - Hauries de veure la brillantor dels LEDs pujant i baixant suaument. - """ - print("=== TEST PARPELLEIG (BREATHING) ===") - pi = _setup() - - print("EYES_OPEN blanc — observa el parpelleig durant 10 segons...") - eyes_turn_on(EYES_OPEN, WHITE) - time.sleep(10.0) - - eyes_turn_off() - _teardown(pi) - print("Test parpelleig completat.\n") - - -# ================== -# Execució -# ================== - -if __name__ == "__main__": - # Descomenta el test que vols executar: - test_shapes() - # test_animation() - # test_gesture_animations() - # test_breathing() diff --git a/raspi/tests/test_gesture.py b/raspi/tests/test_gesture.py deleted file mode 100644 index e10bf97..0000000 --- a/raspi/tests/test_gesture.py +++ /dev/null @@ -1,94 +0,0 @@ -""" -test_gesture.py — Tests del mòdul gesture.py (sensor PAJ7620U2). -Executa des del directori Rasp/: python tests/test_gesture.py - -Descomenta la funció que vols provar al final del fitxer. -Assegura't que el venv està activat i pigpiod en marxa (sudo pigpiod -s 1). -""" - -import sys -import os -sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) - -import time -from gesture import ( - gesture_setup, gesture_cleanup, - read_gesture, - GS_NONE, GS_FORWARD, GS_LEFT, GS_RIGHT, - GS_UP, GS_DOWN, GS_CLOCKWISE, GS_ANTICLOCKWISE, GS_WAVE, -) - -_GESTURE_NAMES = { - GS_NONE: "Cap (GS_NONE)", - GS_FORWARD: "Endavant (GS_FORWARD)", - GS_LEFT: "Esquerra (GS_LEFT)", - GS_RIGHT: "Dreta (GS_RIGHT)", - GS_UP: "Amunt (GS_UP)", - GS_DOWN: "Avall (GS_DOWN)", - GS_CLOCKWISE: "Horari (GS_CLOCKWISE)", - GS_ANTICLOCKWISE: "Antihorari (GS_ANTICLOCKWISE)", - GS_WAVE: "Wave (GS_WAVE)", -} - - -# ================== -# TEST 1 — Connexió i inicialització -# ================== - -def test_connection(): - """ - Comprova que el sensor PAJ7620U2 és accessible via I2C. - Ha de mostrar 'Gesture sensor init OK' sense errors. - """ - print("=== TEST CONNEXIÓ PAJ7620U2 ===") - - gesture_setup() - time.sleep(0.5) - - gesture_cleanup() - print("Test connexió completat.\n") - - -# ================== -# TEST 2 — Lectura de gestos -# ================== - -def test_read_gestures(): - """ - Llegeix gestos durant 30 segons i els mostra per pantalla. - Fes gestos davant del sensor per verificar que els detecta correctament: - - Mà cap endavant/enrere → GS_FORWARD - - Mà cap a l'esquerra → GS_LEFT - - Mà cap a la dreta → GS_RIGHT - - Mà cap amunt → GS_UP - - Mà cap avall → GS_DOWN - - Rotació horària → GS_CLOCKWISE - - Rotació antihorària → GS_ANTICLOCKWISE - - Sacsejada (wave) → GS_WAVE - """ - print("=== TEST LECTURA GESTOS PAJ7620U2 ===") - print("Fes gestos davant del sensor durant 30 segons...") - - gesture_setup() - time.sleep(0.5) - - inici = time.time() - while time.time() - inici < 30: - gest = read_gesture() - if gest != GS_NONE: - nom = _GESTURE_NAMES.get(gest, f"Desconegut ({gest})") - print(f" Gest detectat: {nom}") - time.sleep(0.05) - - gesture_cleanup() - print("Test lectura gestos completat.\n") - - -# ================== -# Execució -# ================== - -if __name__ == "__main__": - # Descomenta el test que vols executar: - test_connection() - # test_read_gestures() diff --git a/raspi/tests/test_motion.py b/raspi/tests/test_motion.py deleted file mode 100644 index 470364b..0000000 --- a/raspi/tests/test_motion.py +++ /dev/null @@ -1,238 +0,0 @@ -""" -test_motion.py — Tests individuals del mòdul motion.py. -Executa des del directori Rasp/: python tests/test_motion.py - -Descomenta la funció que vols provar al final del fitxer. -Assegura't que el venv està activat i pigpiod en marxa (sudo pigpiod -s 1). -""" - -import sys -import os -sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) - -import time -import pigpio -import motion -from motion import ( - motion_setup, motion_setup_steppers, motion_setup_sensors, motion_cleanup, - enable_wheels, enable_arms, enable_syringe, - arms_home, syringe_home, - distance_to_object, - ON, OFF, CW, CCW, -) - -def _pi_connect(): - pi = pigpio.pi() - if not pi.connected: - print("ERROR: pigpiod no està en marxa. Executa: sudo pigpiod -s 1") - sys.exit(1) - return pi - -def _setup_motors(): - """Setup mínim per a tests de motors (sense sensors I2C).""" - pi = _pi_connect() - motion_setup_steppers(pi) - return pi - -def _setup_sensors(): - """Setup mínim per a tests de sensors I2C (sense steppers).""" - pi = _pi_connect() - motion_setup_sensors(pi) - return pi - -def _teardown_motors(pi): - motion_cleanup() - pi.stop() - -def _teardown_sensors(pi): - pi.stop() - - -# ================== -# TEST 1 — Motors pas a pas (rodes) -# ================== - -def test_motor_unic(): - """ - Prova únicament la roda DRETA: 200 passos endavant i enrere. - Primer test a fer — verifica que un sol motor funciona correctament. - Hauries de veure la roda girar ~1/5 de volta (reducció 1:5). - """ - print("=== TEST MOTOR ÚNIC (RODA DRETA) ===") - pi = _setup_motors() - - PASSOS = 200 - - print(f"Movent roda DRETA {PASSOS} passos endavant...") - enable_wheels(ON) - motion.wheel_R.move_to(PASSOS) - while motion.wheel_R.is_running(): - time.sleep(0.05) - print("Roda dreta endavant: OK") - - time.sleep(0.5) - - print(f"Movent roda DRETA {PASSOS} passos enrere...") - motion.wheel_R.move_to(0) - while motion.wheel_R.is_running(): - time.sleep(0.05) - print("Roda dreta enrere: OK") - - enable_wheels(OFF) - _teardown_motors(pi) - print("Test motor únic completat.\n") - - -def test_motors(): - """ - Prova les dues rodes: avança 200 passos i torna enrere. - Executa test_motor_unic() primer per verificar que un motor funciona. - Hauries de veure cada roda girar ~1/5 de volta (reducció 1:5). - """ - print("=== TEST MOTORS (RODES) ===") - pi = _setup_motors() - - PASSOS = 200 - - print(f"Movent roda DRETA {PASSOS} passos endavant...") - enable_wheels(ON) - motion.wheel_R.move_to(PASSOS) - while motion.wheel_R.is_running(): - time.sleep(0.05) - print("Roda dreta: OK") - - time.sleep(0.5) - - print(f"Movent roda DRETA {PASSOS} passos enrere...") - motion.wheel_R.move_to(0) - while motion.wheel_R.is_running(): - time.sleep(0.05) - print("Roda dreta enrere: OK") - - time.sleep(0.5) - - print(f"Movent roda ESQUERRA {PASSOS} passos endavant...") - motion.wheel_L.move_to(PASSOS) - while motion.wheel_L.is_running(): - time.sleep(0.05) - print("Roda esquerra: OK") - - time.sleep(0.5) - - print(f"Movent roda ESQUERRA {PASSOS} passos enrere...") - motion.wheel_L.move_to(0) - while motion.wheel_L.is_running(): - time.sleep(0.05) - print("Roda esquerra enrere: OK") - - enable_wheels(OFF) - _teardown_motors(pi) - print("Test motors completat.\n") - - -# ================== -# TEST 2 — Homing (finals de carrera) -# ================== - -def test_homing_brac(): - """ - Executa el homing únicament dels BRAÇOS. - Primer test de homing — verifica que un sol conjunt de finals de carrera funciona. - ATENCIÓ: assegura't que els braços tinguin espai per moure's. - """ - print("=== TEST HOMING BRAÇOS ===") - pi = _setup_motors() - - print("Iniciant homing dels BRAÇOS...") - arms_home() - print("Homing braços: OK") - - _teardown_motors(pi) - print("Test homing braços completat.\n") - - -def test_homing(): - """ - Executa el homing complet: braços i xeringa. - Executa test_homing_brac() primer per verificar els finals de carrera dels braços. - ATENCIÓ: assegura't que els braços i la xeringa tinguin espai per moure's. - """ - print("=== TEST HOMING COMPLET ===") - pi = _setup_motors() - - print("Iniciant homing dels BRAÇOS...") - arms_home() - print("Homing braços: OK") - - time.sleep(1.0) - - print("Iniciant homing de la XERINGA...") - syringe_home() - print("Homing xeringa: OK") - - _teardown_motors(pi) - print("Test homing completat.\n") - - -# ================== -# TEST 3 — Sensor de distància VL53L0X -# ================== - -def test_distance_sensor(): - """ - Llegeix la distància 10 vegades cada 500ms. - Posa la mà davant del sensor per verificar que canvia el valor. - """ - print("=== TEST SENSOR DISTÀNCIA VL53L0X ===") - pi = _setup_sensors() - - print("Llegint distància durant 5 segons (posa la mà davant del sensor)...") - for i in range(10): - dist = distance_to_object() - if dist == 65535: - print(f" Lectura {i+1:2d}: fora de rang") - else: - print(f" Lectura {i+1:2d}: {dist} mm") - time.sleep(0.5) - - _teardown_sensors(pi) - print("Test sensor distància completat.\n") - - -# ================== -# TEST 4 — Sensors de línia ADS1115 -# ================== - -def test_line_sensors(): - """ - Llegeix els dos sensors de línia 10 vegades cada 500ms. - Posa el sensor sobre superfícies de diferent color per veure la variació. - Valor alt (~9700+) = negre. Valor baix = blanc/clar. - """ - print("=== TEST SENSORS DE LÍNIA ADS1115 ===") - pi = _setup_sensors() - - print("Llegint sensors de línia durant 5 segons...") - print(" (posa els sensors sobre blanc i negre per veure la diferència)") - for i in range(10): - r = motion._chan_r.value - l = motion._chan_l.value - print(f" Lectura {i+1:2d}: DRETA={r:5d} ESQUERRA={l:5d} error={r-l:+6d}") - time.sleep(0.5) - - _teardown_sensors(pi) - print("Test sensors de línia completat.\n") - - -# ================== -# Execució -# ================== - -if __name__ == "__main__": - # Descomenta el test que vols executar: - test_motor_unic() # Primer: prova un sol motor - # test_motors() # Segon: prova les dues rodes - # test_homing_brac() # Primer homing: només els braços - # test_homing() # Homing complet: braços i xeringa - # test_distance_sensor() - # test_line_sensors() diff --git a/raspi/tests/test_simple_motor.py b/raspi/tests/test_simple_motor.py deleted file mode 100644 index fb7ec06..0000000 --- a/raspi/tests/test_simple_motor.py +++ /dev/null @@ -1,53 +0,0 @@ -""" -test_simple_motor.py — Test mínim de la roda dreta per diagnosticar problemes. -Executa des del directori Rasp/tests/: python3 test_simple_motor.py - -Diferències respecte a test_motion.py: -- Usa pi.write() directe en lloc de gpio_trigger() -- Bucle bloquejant amb time.sleep() en lloc de thread -- Sense acceleració, velocitat constant -""" - -import pigpio -import time - -STEP = 25 # STEP_R_W -DIR = 23 # DIR_R_W -EN = 6 # EN_W (actiu LOW) - -pi = pigpio.pi() -if not pi.connected: - print("ERROR: pigpiod no està en marxa") - exit(1) - -pi.set_mode(STEP, pigpio.OUTPUT) -pi.set_mode(DIR, pigpio.OUTPUT) -pi.set_mode(EN, pigpio.OUTPUT) - -pi.write(EN, 0) # Activa el driver (LOW = ON) -pi.write(DIR, 1) # Endavant - -# 500 passos/s → període = 1/500 = 2ms → mig període = 1ms -DELAY = 0.001 - -print("Movent 200 passos endavant...") -for _ in range(200): - pi.write(STEP, 1) - time.sleep(DELAY) - pi.write(STEP, 0) - time.sleep(DELAY) - -print("Fet. Esperant 1s...") -time.sleep(1) - -pi.write(DIR, 0) # Enrere -print("Movent 200 passos enrere...") -for _ in range(200): - pi.write(STEP, 1) - time.sleep(DELAY) - pi.write(STEP, 0) - time.sleep(DELAY) - -print("Fet.") -pi.write(EN, 1) # Desactiva el driver -pi.stop()