Jkdsjksj
Some checks failed
Build / build-web (push) Failing after 17s
Build / build-backend (push) Successful in 2s
Build / release (push) Has been skipped
Build APK / build (push) Failing after 41s
Build APK / release (push) Has been skipped

This commit is contained in:
2026-06-19 09:34:52 +02:00
parent 023d3c04b9
commit 432df63298
24 changed files with 589 additions and 2644 deletions

120
AGENTS.md
View File

@@ -2,16 +2,17 @@
## Overview
QuiBot is an educational robotics platform (UPC/UNE collaboration) consisting of a programmable robot with color-block recognition, gesture control, stepper motors, RGB LED eyes, and multiple input methods (web dashboard, Android voice app). The codebase comprises four independent application layers communicating via HTTP JSON APIs.
QuiBot is an educational robotics platform (UPC/UNE collaboration) consisting of a programmable robot with color-block recognition, gesture control, stepper motors, RGB LED eyes, and multiple input methods (web dashboard, Android voice app). The codebase comprises two independent application layers communicating via HTTP JSON APIs.
```
[quibot-web Nuxt SPA] ──HTTP──> [backend Express] ──HTTP──> [raspi FastAPI (Pi)]
[quibot-web Nuxt SPA] ──HTTP──> [backend Express]
│ │
[apk Expo RN app] ──HTTP──> (same backend) [Python hardware drivers]
├──▶ Raspberry Pi (port 8000) — motor/audio endpoints
[apk Expo RN app] ──HTTP──> ├──▶ LLM (llamacpp)
└──▶ TTS (Piper)
```
**Tech stack**: Python (pigpio, FastAPI) | TypeScript/Express | Nuxt 4/Vue 3 | Expo/React Native
**Tech stack**: TypeScript/Express | Nuxt 4/Vue 3 | Expo/React Native
**No database**: All state is in-memory, file-based (`/tmp/quibot-audio/`), or localStorage.
---
@@ -20,30 +21,22 @@ QuiBot is an educational robotics platform (UPC/UNE collaboration) consisting of
```
quibot/
├── raspi/ # Raspberry Pi brain — Python, controls hardware
│ ├── main.py # FastAPI server (port 8000) on the Pi
│ ├── quibot.py # Main program: block/gesture threads (like Arduino QuiBot.ino)
│ ├── motion.py # Stepper class, homing, line-following, high-level tasks
│ ├── gesture.py # PAJ7620U2 gesture sensor (I2C, polled at 50ms)
│ ├── blocks.py # TCS34725 color sensor + servo block ejection
│ ├── eyes.py # WS2811 LED matrix (128 LEDs, pigpio waveforms, breathing animation)
│ ├── pins.py # BCM GPIO pin map for all hardware
│ └── tests/ # Manual diagnostic scripts (not automated)
├── backend/ # Local Express server (port 3000), proxy to Pi
├── backend/ # Express server (port 5000), proxy to Pi + LLM/TTS
│ ├── src/
│ │ ├── index.ts # Express entry: CORS, JSON parser, /health
│ │ ├── config.ts # Env config: RASPBERRY_PI_HOST, PORT, QUIBOT_TOKEN
│ │ ├── index.ts # Express entry: CORS, JSON parser, /health, routes
│ │ ├── config.ts # Env config: raspi, PIPER_URL, LLAMA_CPP_URL, PORT
│ │ ├── routes/router.ts # Mounts all controllers
│ │ ├── services/raspi.service.ts # Axios proxy layer to Pi FastAPI
│ │ ├── services/raspi.service.ts # Axios proxy layer to Pi endpoints
│ │ └── controllers/
│ │ ├── motor.controller.ts # Motor step/stop/upload
│ │ ├── audio.controller.ts # Audio file lifecycle (incoming/locked/processed)
│ │ ├── command.controller.ts # POST /commands proxy to raspi /run
│ │ ── settings.controller.ts # GET/PUT /settings runtime config
│ │ ── settings.controller.ts # GET/PUT /settings runtime config
│ │ └── tts.controller.ts # TTS synthesis via Piper
│ └── dist/ # Compiled output (generated)
├── quibot-web/ # Nuxt 4 dashboard SPA
│ ├── app/app.vue # Single-page control panel: block queue, D-pad, eye controls, gesture log
│ ├── server/api/ # Nitro server routes proxying to raspi
│ ├── server/api/ # Nitro server routes proxying to backend Express
│ │ ├── motor/step/[direction].post.ts
│ │ └── motor/stop.post.ts
│ ├── nuxt.config.ts # Runtime config: QUIBOT_BASE_URL, QUIBOT_TOKEN
@@ -61,7 +54,9 @@ quibot/
---
## Raspberry Pi Layer (`raspi/`)
## Raspberry Pi Layer (remote, port 8000)
The Raspberry Pi runs a lightweight HTTP server exposing hardware control endpoints. The `raspi/` source directory is no longer part of this repository — it lives on the Pi itself.
**Hardware target**: Raspberry Pi Zero 2W controlling a robot with:
- 5 NEMA-style stepper motors (wheels x2, arms x2, syringe) via A4988/TB600 drivers in STEP/DIR mode
@@ -74,7 +69,7 @@ quibot/
- Hall-effect endstops on GPIOs 12, 16, 17
- Optional: I2S audio amp (MAX98357A) + mic (SPH0645)
### Key files
### Hardware source files (on Pi)
- **`pins.py`** — BCM GPIO pin numbering for every component (STEP, DIR, EN pins, I2C lines, endstops, LED_DATA on GPIO26)
- **`motion.py`** — `Stepper` class with AccelStepper-style acceleration profiling via `pigpio.gpio_trigger()`. 5 motor instances (`wheel_R`, `wheel_L`, `arm_R`, `arm_L`, `syringe`). Continuous stepper daemon thread (`_stepper_loop`) at ~100Hz. Homing routines read Hall-effect endstops. Line-following with proportional correction on TCRT5000 values via ADS1115.
@@ -104,7 +99,7 @@ quibot/
## Backend Layer (`backend/`)
**Role**: Express.js HTTP proxy sitting between frontend/mobile and the Raspberry Pi's FastAPI server. Token passthrough, no business logic.
**Role**: Express.js HTTP server providing frontend/mobile API, proxying hardware commands to the Raspberry Pi, and managing TTS/LLM integration.
### Configuration (`.env`, loaded by `config.ts`)
@@ -113,15 +108,19 @@ quibot/
| `RASPBERRY_PI_HOST` | `http://raspberrypi.local` | Pi API URL |
| `RASPBERRY_PI_PORT` | `8000` | Pi API port |
| `QUIBOT_TOKEN` | `MY_SECRET_TOKEN` | Auth token for all Pi endpoints |
| `PORT` | `3000` | Backend listen port |
| `PORT` | `5000` | Backend listen port |
| `PIPER_URL` | `''` | Piper TTS service URL |
| `LLAMA_CPP_URL` | `''` | LLM inference service URL |
| `LLAMA_API_KEY` | `''` | LLM API key |
| `LLAMA_PREAMBLE` | `''` | Path or content for LLM preamble |
### Architecture
```
index.ts → Express app, CORS, JSON parser, /health endpoint
routes/router.ts → Mounts all controllers under /motor, /audio, /commands, /settings
routes/router.ts → Mounts all controllers under /motor, /audio, /commands, /settings, /tts
config.ts → Mutable getter/setter env vars (runtime update via PUT /settings)
raspi.service.ts → Axios proxy methods for every Pi endpoint + multipart file upload handling
raspi.service.ts → Axios proxy methods for Pi endpoints + multipart file upload handling
```
### Controllers
@@ -130,6 +129,7 @@ raspi.service.ts → Axios proxy methods for every Pi endpoint + multipart file
- **`audio.controller.ts`** — `GET /audio/incoming`, `POST /audio/lock/:filename`, `/unlock/:filename`, `/cancel/:filename`, `/process/:filename`. All proxy to raspi audio file lifecycle endpoints.
- **`command.controller.ts`** — `POST /commands { task }` → proxied to raspi `/run?task=...&token=...`
- **`settings.controller.ts`** — `GET /settings` returns config; `PUT /settings` updates `raspberryPi.host`, `raspberryPi.port`, `token` at runtime.
- **`tts.controller.ts`** — `POST /tts { text, lang }` → Synthesizes audio via Piper TTS service. Saves WAV files to `/tmp/quibot-audio/tts/`.
### Build/Run
@@ -150,7 +150,7 @@ node dist/index.js # Or use tsx/nodemon for dev
| Key | Default | Purpose |
|-----|---------|---------|
| `QUIBOT_BASE_URL` | `http://quibot:8000` | Base URL for raspi FastAPI |
| `QUIBOT_BASE_URL` | `http://quibot:8000` | Base URL for backend Express (or Pi) |
| `QUIBOT_TOKEN` | `MY_SECRET_TOKEN` | Auth token |
### UI Panels (`app/app.vue` — single-file SPA, 1369 lines)
@@ -170,8 +170,8 @@ node dist/index.js # Or use tsx/nodemon for dev
| Method | Path | Description |
|--------|------|-------------|
| POST | `/api/motor/stop` | Proxies to raspi `/motor/stop` |
| POST | `/api/motor/step/:direction` | Proxies to raspi `/motor/step/forward\|backwards` |
| POST | `/api/motor/stop` | Proxies to backend Express `/motor/stop` |
| POST | `/api/motor/step/:direction` | Proxies to backend Express `/motor/step/forward\|backwards` |
**Note**: The frontend also calls `POST /api/eye/shape`, `/api/eye/color`, `/api/eye/on`, `/api/eye/off`, `/api/gesture/on`, `/api/gesture/off` — server routes for these may need to be created (frontend references them but they don't have explicit server handlers yet).
@@ -221,7 +221,26 @@ CI: `build-apk.yml` runs expo prebuild, decodes keystore from secrets, builds si
## Complete API Reference
### Raspberry Pi FastAPI (`raspi/main.py`) — port 8000
### Backend Express (`backend/`) — port 5000
| Method | Path | Body | Description |
|--------|------|------|-------------|
| GET | `/health` | — | Returns settings object |
| POST | `/commands` | `{ task }` | Proxy to raspi `/run` |
| POST | `/motor/step/forward` | — | Motor forward proxy to Pi |
| POST | `/motor/step/backward` | — | Motor backward proxy to Pi (maps to `/backwards`) |
| POST | `/motor/stop` | — | Motor stop proxy to Pi |
| POST | `/motor/upload` | multipart file | Audio upload via multer → proxied to Pi |
| GET | `/audio/incoming` | — | List incoming audio files from Pi |
| POST | `/audio/lock/:filename` | — | Lock audio file on Pi |
| POST | `/audio/unlock/:filename` | — | Unlock audio file on Pi |
| POST | `/audio/cancel/:filename` | — | Cancel locked audio on Pi |
| POST | `/audio/process/:filename` | — | Mark processed on Pi |
| GET | `/settings` | — | Returns config |
| PUT | `/settings` | `{ raspberryPi: { host, port }, token }` | Update runtime config |
| POST | `/tts` | query: `text`, `lang` | Synthesize speech via Piper TTS |
### Raspberry Pi HTTP Server (remote, port 8000)
| Method | Path | Params/Body | Description |
|--------|------|-------------|-------------|
@@ -238,24 +257,6 @@ CI: `build-apk.yml` runs expo prebuild, decodes keystore from secrets, builds si
**Auth**: Query parameter `token` matching `QUIBOT_TOKEN` env var (default: `MY_SECRET_TOKEN`).
### Backend Express (`backend/`) — port 3000
| Method | Path | Body | Description |
|--------|------|------|-------------|
| GET | `/health` | — | Returns settings object |
| POST | `/commands` | `{ task }` | Proxy to raspi `/run` |
| POST | `/motor/step/forward` | — | Motor forward proxy |
| POST | `/motor/step/backward` | — | Motor backward proxy (maps to raspi `/backwards`) |
| POST | `/motor/stop` | — | Motor stop proxy |
| POST | `/motor/upload` | multipart file | Audio upload via multer in-memory buffer |
| GET | `/audio/incoming` | — | List incoming audio files |
| POST | `/audio/lock/:filename` | — | Lock audio file |
| POST | `/audio/unlock/:filename` | — | Unlock audio file |
| POST | `/audio/cancel/:filename` | — | Cancel locked audio |
| POST | `/audio/process/:filename` | — | Mark processed |
| GET | `/settings` | — | Returns config |
| PUT | `/settings` | `{ raspberryPi: { host, port }, token }` | Update runtime config |
---
## Command Flow Examples
@@ -266,8 +267,10 @@ User clicks "Forward" in D-pad
→ $fetch('/api/motor/step/forward', { method: 'POST' })
→ Nuxt Nitro route: server/api/motor/step/[direction].post.ts
→ $fetch(config.quibotBaseUrl + '/motor/step/forward', { query: { token } })
raspi FastAPI /motor/step/forward
→ motor_step("forward") in daemon thread
Backend Express /motor/step/forward
raspi.service.motorStepForward()
→ Pi FastAPI /motor/step/forward?token=...
→ motor_step("forward") in daemon thread on Pi
→ step_motor(200, DIR, 1ms pulses)
```
@@ -282,6 +285,16 @@ Child inserts colored block → quibot.py task_read_blocks() polls distance sens
→ After action: servo_move_to(EJECT_POSITION)
```
### TTS Synthesis
```
User triggers speech in web UI
→ POST /tts?text=hello&lang=ca&token=...
→ Backend Express tts.controller.ts
→ piperService.synthesize() → Piper TTS service
→ WAV file saved to /tmp/quibot-audio/tts/{uuid}.wav
→ Returns audioUrl + filename
```
### APK → Audio Upload
```
User stops recording in VoiceDrop → expo-av .m4a file
@@ -317,8 +330,9 @@ User toggles mode in web UI
## Testing
All tests in `raspi/tests/` are **manual diagnostic scripts** (not automated frameworks). Each test is run independently by uncommenting the desired function call at the bottom of the file. Requirements:
Tests reside on the Pi alongside the hardware source code, not in this repository.
Requirements:
- `sudo pigpiod -s 1` daemon running
- Python venv activated with hardware dependencies installed
- Pi connected to robot hardware
@@ -337,8 +351,8 @@ All tests in `raspi/tests/` are **manual diagnostic scripts** (not automated fra
1. **Token auth** is always a query parameter matching `QUIBOT_TOKEN` (default: `MY_SECRET_TOKEN`)
2. **No database** — use filesystem for persistence, localStorage for web client state
3. **Backend is a dumb proxy** — no business logic, just forwards HTTP requests with token passthrough
4. **Motor commands are fire-and-forget** — motor runs in daemon thread until `/motor/stop`
3. **Backend proxies Pi endpoints** — motor/audio commands forwarded via raspi.service.ts
4. **Motor commands are fire-and-forget** — motor runs in daemon thread on Pi until `/motor/stop`
5. **Audio lifecycle**: incoming → locked (claim) → processed OR unlocked (release) / cancelled
6. **Eyes breathing** runs continuously at MIN_BR(80)-MAX_BR(170) brightness in background
7. **`quibot.py` owns block/gesture autonomy** — blocks are processed internally on the Pi without backend/web involvement

View File

@@ -49,6 +49,14 @@
"backgroundColor": "#000000"
}
}
],
[
"expo-build-properties",
{
"android": {
"usesCleartextTraffic": true
}
}
]
],
"experiments": {

View File

@@ -1,5 +1,4 @@
import { Audio, InterruptionModeAndroid, InterruptionModeIOS } from "expo-av";
import * as Speech from "expo-speech";
import { router, useFocusEffect } from "expo-router";
import { useCallback, useEffect, useRef, useState } from "react";
import Svg, { Path } from "react-native-svg";
@@ -63,7 +62,9 @@ export default function RecorderScreen() {
const [transcriptionText, setTranscriptionText] = useState("");
const [isUploading, setIsUploading] = useState(false);
const [isHolding, setIsHolding] = useState(false);
const [isPlaying, setIsPlaying] = useState(false);
const recordingRef = useRef<Audio.Recording | null>(null);
const soundRef = useRef<Audio.Sound | null>(null);
const refreshSettings = useCallback(() => {
let isMounted = true;
@@ -123,15 +124,29 @@ export default function RecorderScreen() {
};
}, [recording]);
async function speak(text: string) {
if (!text || !text.trim()) {
console.log("[TTS] Skipping empty text");
return;
useEffect(() => {
return () => {
void unloadSound();
};
}, []);
async function unloadSound() {
if (soundRef.current) {
try {
await soundRef.current.stopAsync();
await soundRef.current.unloadAsync();
} catch (err) {
console.log("[TTS] Error unloading sound:", err);
}
soundRef.current = null;
}
setIsPlaying(false);
}
console.log("[TTS] ===== START speak =====");
Speech.stop();
await new Promise((r) => setTimeout(r, 100));
async function speakWithAudio(audioUrl: string, backendBase: string) {
if (!audioUrl) return false;
await unloadSound();
try {
await Audio.setAudioModeAsync({
@@ -142,38 +157,103 @@ export default function RecorderScreen() {
shouldDuckAndroid: true,
staysActiveInBackground: false,
});
console.log("[TTS] Audio mode reset OK");
} catch (err) {
console.log("[TTS] Audio mode error:", err);
}
const lang = locale === "ca" ? "ca-ES" : "en-US";
await new Promise((r) => setTimeout(r, 800));
console.log("[TTS] Calling Speech.speak. Text length:", text.length, "Lang:", lang);
try {
Speech.speak(text, {
language: lang,
onDone: () => console.log("[TTS] ✅ onDone fired"),
onError: (error) => console.log("[TTS] ❌ onError:", error),
});
console.log("[TTS] Speech.speak() call returned OK");
const fullUrl = audioUrl.startsWith("http")
? audioUrl
: `${backendBase.replace(/\/+$/, "")}/${audioUrl.replace(/^\/+/, "")}`;
console.log("[TTS] Loading audio from:", fullUrl);
setIsPlaying(true);
setStatusMessage(strings.playing);
const { sound } = await Audio.Sound.createAsync(
{ uri: fullUrl },
{ shouldPlay: true, volume: 1.0 },
(status) => {
if (status.isLoaded && status.didJustFinish) {
console.log("[TTS] Audio playback finished");
void unloadSound();
}
},
);
soundRef.current = sound;
const status = await sound.getStatusAsync();
const durationMs = status.isLoaded ? (status.durationMillis ?? 0) : 0;
console.log("[TTS] Playing audio, duration:", durationMs, "ms");
return true;
} catch (err) {
console.log("[TTS] Speech.speak() threw:", err);
console.log("[TTS] Audio playback error:", err);
setIsPlaying(false);
return false;
}
}
async function speakSequentially(texts: string[]) {
if (texts.length === 0) return;
const trimmedUrl = backendUrl.trim().replace(/\/+$/, "");
for (let i = 0; i < texts.length; i++) {
await speak(texts[i]);
await new Promise((r) => setTimeout(r, 1500));
const text = texts[i];
if (!text || !text.trim()) continue;
try {
setStatusMessage(strings.playing);
console.log("[TTS] Generating TTS audio for text:", text.substring(0, 50));
const localeLang = locale === "ca" ? "ca" : "en";
const ttsParams = new URLSearchParams({
text: text.trim(),
language: localeLang,
});
if (authToken.trim()) {
ttsParams.append("token", authToken.trim());
}
const ttsUrl = `${trimmedUrl}/tts?${ttsParams.toString()}`;
const ttsResponse = await fetch(ttsUrl, { method: "POST" });
if (!ttsResponse.ok) {
const errText = await ttsResponse.text();
console.log("[TTS] TTS endpoint error:", ttsResponse.status, errText);
continue;
}
const ttsData = await ttsResponse.json();
if (!ttsData.audioUrl) {
console.log("[TTS] No audioUrl in response:", ttsData);
continue;
}
const played = await speakWithAudio(ttsData.audioUrl, trimmedUrl);
if (!played) {
setStatusMessage(strings.uploadFailed);
}
if (i < texts.length - 1) {
await new Promise((r) => setTimeout(r, 800));
}
} catch (err) {
console.log("[TTS] speakSequentially error:", err);
}
}
}
async function speak(text: string) {
const texts = [text].filter(Boolean);
await speakSequentially(texts);
}
async function startRecording() {
try {
Speech.stop();
await unloadSound();
setTranscriptionText("");
setResponsePreview("");
setLlmResponseText("");
@@ -367,7 +447,7 @@ export default function RecorderScreen() {
try {
setIsUploading(true);
setStatusMessage(strings.uploadingRecording);
Speech.stop();
await unloadSound();
setTranscriptionText("");
setResponsePreview("");
setLlmResponseText("");

View File

@@ -15,7 +15,6 @@
"expo": "~54.0.35",
"expo-av": "~16.0.8",
"expo-router": "~6.0.24",
"expo-speech": "~14.0.8",
"expo-splash-screen": "~31.0.13",
"expo-status-bar": "~3.0.9",
"react": "19.1.0",

View File

@@ -1,5 +1,6 @@
import dotenv from 'dotenv';
import { readFileSync } from 'fs';
import { join } from 'path';
dotenv.config();
@@ -7,6 +8,7 @@ let _raspberryHost = process.env.RASPBERRY_PI_HOST ?? 'http://raspberrypi.local'
let _raspberryPort = Number(process.env.RASPBERRY_PI_PORT) || 8000;
let _token = process.env.QUIBOT_TOKEN ?? 'MY_SECRET_TOKEN';
const APP_PORT = Number(process.env.PORT) || 5000;
const piperUrl = process.env.PIPER_URL ?? '';
const llamacppUrl = process.env.LLAMA_CPP_URL ?? '';
const llamacppApiKey = process.env.LLAMA_API_KEY ?? '';
const llamaPreambleRaw = process.env.LLAMA_PREAMBLE ?? '';
@@ -42,4 +44,9 @@ export const getLlamacppUrl = () => llamacppUrl;
export const getLlamacppApiKey = () => llamacppApiKey;
export const getLlamacppPreamble = () => llamacppPreamble;
export const getPiperUrl = () => piperUrl;
export const getPiperModel = () =>
process.env.PIPER_MODEL ||
join('/tmp', 'quibot-piper-models', 'ca_ES-upc_ona-medium.onnx');
export const getAppPort = () => APP_PORT;

View File

@@ -0,0 +1,52 @@
import { Router } from 'express';
import { randomUUID } from 'crypto';
import { join } from 'path';
import { mkdirSync, writeFileSync } from 'fs';
import { piperService } from '../services/piper.service.js';
import { getToken } from '../config.js';
const router = Router();
const TTS_AUDIO_DIR = join('/tmp', 'quibot-audio', 'tts');
try { mkdirSync(TTS_AUDIO_DIR, { recursive: true }); } catch { /* ignore */ }
router.post('/', async (req, res) => {
try {
const text = (req.query.text as string) || (req.body?.text as string);
// Accept 'lang' or 'language' — APK sends 'language', old tests use 'lang'
const lang = (req.query.lang as string) || (req.query.language as string) || 'ca';
const token = (req.query.token as string) || '';
if (!text?.trim()) {
return res.status(400).json({ error: 'Missing query parameter: text' });
}
const expectedToken = getToken();
if (token && token !== expectedToken) {
return res.status(401).json({ error: 'Unauthorized: invalid token' });
}
console.log(`[tts] Generating audio for text (${lang}): "${text.substring(0, 60)}..."`);
// Ensure Piper subprocess is initialized before synthesis
await piperService.initWav();
const wavBuffer = await piperService.synthWav(text.trim());
const filename = `${randomUUID()}.wav`;
writeFileSync(join(TTS_AUDIO_DIR, filename), wavBuffer);
console.log(`[tts] Audio saved: ${filename}`);
return res.json({
audioUrl: `/tts-audio/${filename}`,
filename,
});
} catch (err: unknown) {
const message = err instanceof Error ? err.message : 'Unknown error';
console.error(`[tts] Error: ${message}`);
return res.status(500).json({ error: `TTS synthesis failed: ${message}` });
}
});
export default router;

View File

@@ -3,6 +3,7 @@ import cors from 'cors';
import router from './routes/router.js';
import { getAppPort, getConfig } from './config.js';
import { whisperService } from './services/whisper.service.js';
import { piperService as piperWorker } from './services/piper.service.js';
const app = express();
@@ -14,6 +15,9 @@ app.use('/audio', express.json());
app.use('/motor', express.json());
app.use('/commands', express.json());
// Serve generated TTS audio files to the APK
app.use('/tts-audio', express.static('/tmp/quibot-audio/tts'));
app.use(router);
app.get('/health', (_req, res) => {
@@ -24,12 +28,13 @@ app.get('/health', (_req, res) => {
const server = app.listen(getAppPort(), () => {
console.log(`QuiBot backend listening on port ${getAppPort()}`);
whisperService.spawn();
piperWorker.initWav().catch(() => { /* model may not exist yet → lazy init on first TTS call */ });
});
async function shutdown(signal: string) {
console.log(`[server] ${signal} received, shutting down...`);
server.close(async () => {
await whisperService.shutdown();
await Promise.all([whisperService.shutdown(), piperWorker.shutdown()]);
process.exit(0);
});
}

102
backend/src/piper-worker.py Normal file
View File

@@ -0,0 +1,102 @@
#!/usr/bin/env python3
"""Persistent Piper TTS worker single subprocess, model loaded once."""
import sys
import json
import wave
import io
class PiperStdoutSink:
"""Wraps stdout so we write exactly N bytes."""
def __init__(self):
self._remaining = 0
def set_length(self, length: int):
self._remaining = length
def write(self, data: bytes):
to_write = min(len(data), self._remaining)
if to_write > 0:
sys.stdout.buffer.write(data[:to_write])
sys.stdout.buffer.flush()
self._remaining -= to_write
def main():
from piper import PiperVoice
model_path = ""
config_path = None
voice = None
# Defaults (will be overridden by init message)
DEFAULT_MODEL = "/tmp/quibot-piper-models/ca_ES-upc_ona-medium.onnx"
DEFAULT_CONFIG = "/tmp/quibot-piper-models/ca_ES-upc_ona-medium.onnx.json"
# Signal node that the process is alive and listening
print(json.dumps({"type": "ready"}), flush=True)
for line in sys.stdin:
line = line.strip()
if not line:
continue
try:
msg = json.loads(line)
except json.JSONDecodeError:
continue
if msg.get("type") == "init":
model_path = msg.get("model", DEFAULT_MODEL) or DEFAULT_MODEL
config_path = msg.get("config", None)
print(f"[piper-worker] Loading model='{model_path}'", file=sys.stderr, flush=True)
try:
voice = PiperVoice.load(model_path, config_path=config_path)
print(json.dumps({"type": "init_ok"}), flush=True)
except Exception as exc:
err_msg = str(exc).replace('"', '\\"')
print(json.dumps({"type": "init_error", "error": err_msg}), flush=True)
elif msg.get("type") == "synthesize":
text = msg.get("text", "")
msg_id = msg.get("msgId", "")
if not voice:
print(json.dumps({
"type": "error",
"text": "Model not loaded, send init first",
"msgId": msg_id,
}), flush=True)
continue
try:
buf = io.BytesIO()
wf = wave.open(buf, 'wb')
# set_wav_config is called inside synthesize_wav via set_wav_format=True (default)
voice.synthesize_wav(text, wf)
wf.close()
wav_bytes = buf.getvalue()
# Frame: send length prefix as ASCII number + newline, then raw bytes
header = f"{len(wav_bytes)}\n".encode('ascii')
sys.stdout.buffer.write(header)
sys.stdout.buffer.write(wav_bytes)
sys.stdout.buffer.flush()
print(json.dumps({
"type": "synthesized",
"bytes": len(wav_bytes),
"msgId": msg_id,
}), flush=True)
except Exception as exc:
err_msg = str(exc).replace('"', '\\"')
print(json.dumps({
"type": "error",
"text": err_msg,
"msgId": msg_id,
}), flush=True)
if __name__ == "__main__":
main()

View File

@@ -3,6 +3,7 @@ import motorController from '../controllers/motor.controller.js';
import audioController from '../controllers/audio.controller.js';
import commandController from '../controllers/command.controller.js';
import settingsController from '../controllers/settings.controller.js';
import ttsController from '../controllers/tts.controller.js';
const router = Router();
@@ -10,5 +11,6 @@ router.use('/motor', motorController);
router.use('/audio', audioController);
router.use('/commands', commandController);
router.use('/settings', settingsController);
router.use('/tts', ttsController);
export default router;

View File

@@ -0,0 +1,26 @@
import axios from 'axios';
import { getPiperUrl } from '../config.js';
export interface PiperSynthesisParams {
text: string;
lang?: string;
}
class PiperHttpService {
async synthesize(params: PiperSynthesisParams): Promise<Buffer> {
const piperUrl = getPiperUrl();
if (!piperUrl) throw new Error('PIPER_URL not configured');
const speakerId = params.lang === 'en' ? 1 : 0;
const response = await axios.post(
`${piperUrl}/api/tts`,
{ text: params.text, speaker_id: speakerId },
{ responseType: 'arraybuffer', timeout: 30_000 },
);
return Buffer.from(response.data, 'binary');
}
}
export const piperService = new PiperHttpService();

View File

@@ -0,0 +1,210 @@
import { spawn, ChildProcess } from 'child_process';
import { join } from 'path';
import { fileURLToPath } from 'url';
import { randomUUID } from 'crypto';
import { readFileSync, rmSync, readdirSync, statSync, unlinkSync, mkdirSync } from 'fs';
import { piperService as httpPiperService } from './piper.http.service.js';
import { getPiperUrl, getPiperModel } from '../config.js';
const __filename = fileURLToPath(import.meta.url);
const __dirname = join(__filename, '..');
const SCRIPT_DIR = join(__dirname, '..');
const TTS_DIR = join('/tmp', 'quibot-audio', 'tts-piper');
mkdirSync(TTS_DIR, { recursive: true });
// ─── type-guard for JSON messages from piper-worker ───
type PiperMsg =
| { type: 'ready' }
| { type: 'init_ok' }
| { type: 'init_error'; error: string }
| { type: 'synthesized'; wavPath: string; bytes?: number; msgId: string }
| { type: 'error'; text: string; msgId: string };
class PiperLocalService {
private proc: ChildProcess | null = null;
private pendingInit: Promise<void> | null = null;
private cleanupTimer: ReturnType<typeof setInterval> | null = null;
// ── spawn + stdout parser (simple: pure JSON on stdout, WAV on disk) ──
private setupStdout(): void {
if (!this.proc?.stdout) return;
let buf = '';
this.proc.stdout.on('data', (chunk: Buffer) => {
buf += chunk.toString();
while (true) {
const nl = buf.indexOf('\n');
if (nl === -1) break;
const line = buf.slice(0, nl).trim();
buf = buf.slice(nl + 1);
if (!line) continue;
this.handleLine(line);
}
});
}
private handleLine(line: string): void {
try { const msg = JSON.parse(line) as PiperMsg; /* handled below */ } catch { return; }
const msg = JSON.parse(line) as PiperMsg;
if (msg.type === 'ready') return;
if (msg.type === 'init_ok') { this.resolveInit(); }
if (msg.type === 'init_error') {
if (this.pendingInit) { this.pendingInit = null; this.initResolve = null; }
this.initReject(new Error(msg.error));
}
if (msg.type === 'synthesized') {
this.resolveResponse(msg.msgId, msg.wavPath);
}
}
private async writeStdin(line: string): Promise<void> {
if (!this.proc?.stdin) throw new Error('piper-svc: stdin unavailable');
this.proc.stdin.write(line + '\n');
}
// ── pending-init promises (simplest possible design) ──
private initResolve: (() => void) | null = null;
private initReject: (e: Error) => void = () => { throw new Error('no reject handler'); };
private resolveInit(): void {
if (!this.pendingInit) return;
this.initResolve?.(); this.pendingInit = null;
this.initResolve = null;
}
// ── pending synth responses ──
private respMap = new Map<string, (wavPath: string) => void>();
private resolveResponse(msgId: string, wavPath: string): void {
const fn = this.respMap.get(msgId);
this.respMap.delete(msgId);
if (fn) fn(wavPath);
}
// ── public spawn / initWav / synthWav ──
private async _spawn(): Promise<void> {
if (this.proc) return;
const workerPath = join(SCRIPT_DIR, 'src', 'piper-worker.py');
const venv = process.env.VIRTUAL_ENV || join(SCRIPT_DIR, '.venv', 'bin', 'python3');
this.proc = spawn(venv, [workerPath], { stdio: ['pipe', 'pipe', 'pipe'] });
this.setupStdout();
this.proc.on('exit', () => {
// reject all pending
for (const [, fn] of this.respMap) fn('');
this.respMap.clear();
if (this.pendingInit) { this.initReject(new Error('piper process exited')); this.pendingInit = null; }
});
// ── cleanup old WAV files every 5 min ──
const timer = setInterval(() => {
try {
const now = Date.now();
for (const entry of readdirSync(TTS_DIR)) {
const fp = join(TTS_DIR, entry);
try {
const s = statSync(fp);
if (s.isFile() && now - s.mtimeMs > 300_000) unlinkSync(fp);
} catch { /* ignore */ }
}
} catch { /* ignore */ }
}, 5 * 60 * 1000);
}
async initWav(): Promise<void> {
if (!this.proc) await this._spawn();
if (this.pendingInit) return this.pendingInit;
this.pendingInit = new Promise<void>((resolve, reject) => {
// override the global reject
const origReject = this.initReject;
this.initResolve = resolve;
this.initReject = (e: Error) => {
// cleanup
for (const [, fn] of this.respMap) fn('');
this.respMap.clear();
reject(e);
};
});
const modelPath = getPiperModel() || join('/tmp', 'quibot-piper-models', 'ca_ES-upc_ona-medium.onnx');
const cfgPath = modelPath.replace(/\.onnx$/, '.onnx.json');
await this.writeStdin(
JSON.stringify({ type: 'init', model: modelPath, config: cfgPath }),
);
return this.pendingInit;
}
/** Synthesize with local Piper subprocess (primary) and HTTP Piper fallback */
async synthesize(params: { text: string }): Promise<Buffer> {
try {
await this.initWav();
return await this.synthWav(params.text);
} catch (localErr) {
const url = getPiperUrl();
if (url) {
console.log(`[tts] Local Piper failed: ${localErr instanceof Error ? localErr.message : localErr}. Falling back to remote.`);
return await httpPiperService.synthesize({ ...params, lang: 'ca' });
}
throw localErr;
}
}
async synthWav(text: string): Promise<Buffer> {
if (!this.proc) await this._spawn(); // auto-spawn; init runs concurrently
const msgId = randomUUID() + '-' + Date.now();
return new Promise((resolve, reject) => {
let cleared = false;
const timer = setTimeout(() => {
if (cleared) return;
cleared = true;
this.respMap.delete(msgId);
reject(new Error('piper-svc: synthesis timed out'));
}, 30_000);
this.respMap.set(msgId, (wavPath: string) => {
if (cleared) return;
cleared = true;
clearTimeout(timer);
try {
const buf = readFileSync(wavPath);
rmSync(wavPath, { force: true });
resolve(buf);
} catch (err: unknown) {
reject(err instanceof Error ? err : new Error('read WAV failed'));
}
});
this.writeStdin(
JSON.stringify({ type: 'synthesize', text, msgId }),
).catch((e) => {
if (cleared) return;
cleared = true;
this.respMap.delete(msgId);
reject(e instanceof Error ? e : new Error(String(e)));
});
});
}
// ── shutdown ──
async shutdown(): Promise<void> {
this.cleanupTimer?.refresh(); this.cleanupTimer = null;
if (!this.proc) return;
const p = this.proc; this.proc = null;
await new Promise<void>((res) => {
p.on('exit', res);
setTimeout(res, 3000);
if (!p.killed) { try { p.stdin?.end(); } catch {} p.kill('SIGTERM'); }
});
}
}
// ─── singleton ────────────────────────────────────────────────
export const piperService = new PiperLocalService();

2
raspi/.gitignore vendored
View File

@@ -1,2 +0,0 @@
__pycache__/
venv/

View File

@@ -1,160 +0,0 @@
"""
blocks.py — Lectura del sensor de color TCS34725 i servo d'expulsió de blocs.
Equivalent a blocks.cpp del codi Arduino/ESP32.
Requereix /boot/config.txt:
dtoverlay=i2c-gpio,bus=4,i2c_gpio_sda=22,i2c_gpio_scl=27
"""
import time
import pigpio
import adafruit_extended_bus
import adafruit_tcs34725
from pins import SERVO_PWM
# ==================
# IDs de color
# ==================
BK = 0 # Negre (no reconegut)
RD = 1 # Vermell → avançar
GN = 2 # Verd → girar dreta
BU = 3 # Blau → girar esquerra
YE = 4 # Groc → xuclar líquid
OG = 5 # Taronja → buidar líquid
VT = 6 # Violeta → sorpresa
NUM_COLORS = 7
# Taula de colors de referència (valors RGB 0255, calibrats amb el sensor)
_COLORS = [
{"name": "BK", "r": 80, "g": 80, "b": 80},
{"name": "RD", "r": 202, "g": 32, "b": 34},
{"name": "GN", "r": 107, "g": 90, "b": 57},
{"name": "BU", "r": 104, "g": 83, "b": 66},
{"name": "YE", "r": 150, "g": 69, "b": 33},
{"name": "OG", "r": 185, "g": 44, "b": 32},
{"name": "VT", "r": 129, "g": 70, "b": 55},
]
# ==================
# Paràmetres del servo
# ==================
# Valors en µs de pulse width (pigpio set_servo_pulsewidth).
# Conversió des de l'ESP32 (16 bits, 50Hz): valor/65535 * 20000µs
# MIN = 3277/65535 * 20000 ≈ 1000µs
# MAX = 8000/65535 * 20000 ≈ 2440µs
# EJECT= 6450/65535 * 20000 ≈ 1968µs
MIN_SERVO_US = 1000 # µs → ~0°
MAX_SERVO_US = 2440 # µs → ~180°
OPEN_POSITION = 2440 # µs (equivalent a 8000 ESP32)
EJECT_POSITION = 1968 # µs (equivalent a 6450 ESP32)
_INCREMENT_US = 3 # µs per iteració de 1ms (equivalent a increment de 10 ESP32)
# ==================
# Instàncies globals
# ==================
_pi: pigpio.pi = None
_color_sensor = None
_current_servo_pos: int = OPEN_POSITION
# ==================
# Setup
# ==================
def blocks_setup(pi: pigpio.pi):
"""
Inicialitza el servo i el sensor de color TCS34725.
El servo arranca en OPEN_POSITION.
"""
global _pi, _color_sensor, _current_servo_pos
_pi = pi
# Servo
pi.set_mode(SERVO_PWM, pigpio.OUTPUT)
_current_servo_pos = OPEN_POSITION
pi.set_servo_pulsewidth(SERVO_PWM, _current_servo_pos)
# Sensor de color TCS34725 via bus I2C 4 (bit-bang GPIO22=SDA, GPIO27=SCL)
i2c = adafruit_extended_bus.ExtendedI2C(4)
_color_sensor = adafruit_tcs34725.TCS34725(i2c)
_color_sensor.integration_time = 50 # ms
_color_sensor.gain = 4 # 4x (equivalent a TCS34725::Gain::X04)
# ==================
# Servo
# ==================
def servo_move_to(target_us: int):
"""
Mou el servo fins a target_us de forma suau.
Incrementa/decrementa _INCREMENT_US cada mil·lisegon.
"""
global _current_servo_pos
if not (MIN_SERVO_US <= target_us <= MAX_SERVO_US):
return
while True:
if _current_servo_pos < target_us - _INCREMENT_US:
_current_servo_pos += _INCREMENT_US
elif _current_servo_pos > target_us + _INCREMENT_US:
_current_servo_pos -= _INCREMENT_US
else:
_current_servo_pos = target_us
_pi.set_servo_pulsewidth(SERVO_PWM, _current_servo_pos)
time.sleep(0.001)
if _current_servo_pos == target_us:
return
# ==================
# Sensor de color
# ==================
def _calc_colors_difference(measured: tuple, reference: dict) -> int:
"""Distància Manhattan entre el color mesurat i un color de referència."""
return (abs(measured[0] - reference["r"]) +
abs(measured[1] - reference["g"]) +
abs(measured[2] - reference["b"]))
def read_color_raw() -> tuple:
"""Retorna (r, g, b) en bytes 0-255 sense classificació. Útil per calibrar."""
try:
return _color_sensor.color_rgb_bytes
except Exception:
return (0, 0, 0)
def read_block_color() -> int:
"""
Llegeix el color del bloc des del sensor TCS34725.
Compara contra la taula de referència per distància Manhattan.
Retorna l'ID del color més proper, o BK si cap supera el llindar.
"""
MAX_DIFFERENCE = 15
try:
r, g, b = _color_sensor.color_rgb_bytes
except Exception:
return BK
min_difference = MAX_DIFFERENCE
min_diff_color = BK
for color_id, ref in enumerate(_COLORS):
diff = _calc_colors_difference((r, g, b), ref)
if diff < min_difference:
min_difference = diff
min_diff_color = color_id
return min_diff_color

View File

@@ -1,273 +0,0 @@
"""
eyes.py — Control de les matrius LED 8x8 RGB WS2811 (ulls del robot).
Equivalent a eyes.cpp del codi Arduino/ESP32.
FastLED → pigpio waveforms (GPIO26, qualsevol pin).
REQUISIT: iniciar el dimoni pigpio amb resolució d'1µs:
sudo pigpiod -s 1
Si s'inicia sense -s 1 (defecte 5µs), els LEDs no funcionaran correctament.
Si en el futur es fa una modificació hardware (GPIO26 → GPIO18 o GPIO21),
es pot substituir _send_ws2811() per rpi_ws281x sense canviar cap altra funció.
"""
import time
import threading
import pigpio
from pins import LED_DATA
# ==================
# Constants
# ==================
ROW_NUM = 8
COL_NUM = 8
NUM_LEDS = ROW_NUM * COL_NUM * 2 # 128 LEDs (2 matrius 8x8)
MAX_BR = 170 # Brillantor màxima del parpelleig (0255)
MIN_BR = 80 # Brillantor mínima del parpelleig
# Colors predefinits (R, G, B)
WHITE = (255, 255, 255)
RED = (255, 0, 0)
GREEN = ( 0, 255, 0)
BLUE = ( 0, 0, 255)
YELLOW = (255, 200, 0)
ORANGE = (255, 80, 0)
PURPLE = (180, 0, 255)
CYAN = ( 0, 255, 255) # Color del mode gestos
BLACK = ( 0, 0, 0)
# ==================
# Formes dels ulls (índexs dels LEDs actius)
# ==================
class EyeShape:
"""Conjunt de LEDs que formen una expressió dels ulls."""
def __init__(self, leds: list):
self.leds = leds
self.len = len(leds)
EYES_OPEN = EyeShape([
102, 89, 38, 25, 106, 101, 90, 85, 42, 37, 26, 21,
107, 100, 91, 84, 43, 36, 27, 20, 108, 99, 92, 83,
44, 35, 28, 19, 109, 98, 93, 82, 45, 34, 29, 18,
97, 94, 33, 30
])
EYES_FW = EyeShape([
103, 88, 39, 24, 105, 102, 89, 86, 41, 38, 25, 22,
117, 106, 101, 90, 85, 74, 53, 42, 37, 26, 21, 10,
123, 116, 100, 91, 75, 68, 59, 52, 36, 27, 11, 4,
99, 92, 35, 28, 98, 93, 34, 29, 97, 94, 33, 30, 96,
95, 32, 31
])
EYES_DOWN = EyeShape([97, 94, 33, 30])
# Nova forma per al mode gestos: marc extern dels dos ulls (expressió "atenta")
EYES_GESTURE = EyeShape([
96, 97, 98, 99, 100, 101, 102, 103,
104, 111, 112, 119, 120, 127,
31, 32, 33, 34, 35, 36, 37, 38, 39,
0, 7, 8, 15, 16, 23
])
# ==================
# Estat global
# ==================
_pi: pigpio.pi = None
_leds = [[0, 0, 0] for _ in range(NUM_LEDS)]
_brightness = MAX_BR
_leds_lock = threading.Lock()
_update_stop = threading.Event()
_update_thread: threading.Thread = None
# Màscara GPIO per a les waveforms de pigpio
_GPIO_MASK: int = 0
# ==================
# WS2811 via pigpio waveforms
# ==================
def _send_ws2811(data: bytes):
"""
Envia dades RGB als LEDs WS2811 via pigpio waveforms.
Ordre de color: GRB (igual que FastLED amb WS2811, GRB).
Timing a 1µs de resolució (requereix sudo pigpiod -s 1):
- Bit 0: 1µs HIGH + 2µs LOW (spec: 0.5µs + 2.0µs)
- Bit 1: 2µs HIGH + 1µs LOW (spec: 1.2µs + 1.3µs)
- Reset: 80µs LOW
"""
pulses = []
for byte_val in data:
for bit in range(7, -1, -1):
if byte_val & (1 << bit):
pulses.append(pigpio.pulse(_GPIO_MASK, 0, 2))
pulses.append(pigpio.pulse(0, _GPIO_MASK, 1))
else:
pulses.append(pigpio.pulse(_GPIO_MASK, 0, 1))
pulses.append(pigpio.pulse(0, _GPIO_MASK, 2))
pulses.append(pigpio.pulse(0, _GPIO_MASK, 80)) # reset
_pi.wave_add_new()
_pi.wave_add_generic(pulses)
wid = _pi.wave_create()
if wid >= 0:
_pi.wave_send_once(wid)
while _pi.wave_tx_busy():
pass
_pi.wave_delete(wid)
def _eyes_show(brightness: int):
"""Renderitza l'estat actual de _leds amb la brillantor indicada."""
data = bytearray(NUM_LEDS * 3)
scale = brightness / 255
for i, (r, g, b) in enumerate(_leds):
data[i * 3 + 0] = int(g * scale) # WS2811 GRB: primer G
data[i * 3 + 1] = int(r * scale)
data[i * 3 + 2] = int(b * scale)
_send_ws2811(bytes(data))
# ==================
# Setup i cleanup
# ==================
def eyes_setup(pi: pigpio.pi):
"""Inicialitza el GPIO i arrenca el thread de parpelleig."""
global _pi, _GPIO_MASK, _update_thread
_pi = pi
_GPIO_MASK = 1 << LED_DATA
pi.set_mode(LED_DATA, pigpio.OUTPUT)
pi.write(LED_DATA, 0)
_update_stop.clear()
_update_thread = threading.Thread(
target=_task_update_leds, daemon=True, name="eyes"
)
_update_thread.start()
def eyes_cleanup():
"""Atura el thread de parpelleig i apaga els LEDs."""
_update_stop.set()
if _update_thread:
_update_thread.join(timeout=1.0)
with _leds_lock:
for i in range(NUM_LEDS):
_leds[i] = [0, 0, 0]
_eyes_show(255)
# ==================
# Thread de parpelleig (equivalent a task_update_leds del FreeRTOS)
# ==================
def _task_update_leds():
"""
Bucle continu que fa respirar la brillantor dels ulls.
Equivalent a task_update_leds() del FreeRTOS.
"""
global _brightness
going_up = False # Al C++ comença a MAX_BR i baixa
_brightness = MAX_BR
while not _update_stop.is_set():
if going_up:
if _brightness < MAX_BR:
_brightness += 2
else:
going_up = False
else:
if _brightness > MIN_BR:
_brightness -= 2
else:
going_up = True
with _leds_lock:
_eyes_show(_brightness)
time.sleep(0.05)
# ==================
# Animacions (equivalent a les funcions de eyes.cpp)
# ==================
def eyes_turn_off():
"""Apaga tots els LEDs amb un fos progressiu."""
for _ in range(50):
with _leds_lock:
for i in range(NUM_LEDS):
r, g, b = _leds[i]
_leds[i] = [int(r * 245 / 255),
int(g * 245 / 255),
int(b * 245 / 255)]
time.sleep(0.01)
with _leds_lock:
for i in range(NUM_LEDS):
_leds[i] = [0, 0, 0]
def eyes_turn_on(shape: EyeShape, color: tuple,
repeat: int = 1, forward: bool = True):
"""
Encén els LEDs d'una forma un per un, amb animació.
shape: forma a dibuixar (EYES_OPEN, EYES_FW, EYES_DOWN…)
color: color RGB com a tupla (r, g, b)
repeat: nombre de vegades que es repeteix l'animació
forward: True = ordre normal, False = ordre invers
"""
r, g, b = color
for rep in range(repeat):
eyes_turn_off()
for i in range(shape.len):
idx = shape.leds[i if forward else (shape.len - 1 - i)]
with _leds_lock:
_leds[idx] = [r, g, b]
time.sleep(0.008)
if rep < repeat - 1:
for i in range(shape.len):
idx = shape.leds[i if forward else (shape.len - 1 - i)]
with _leds_lock:
_leds[idx] = [0, 0, 0]
time.sleep(0.008)
# ==================
# Animacions noves — mode gestos (TFG)
# ==================
def eyes_gesture_mode_on():
"""
Animació d'activació del mode gestos.
Parpelleig doble en cian per indicar que el robot escolta gestos.
"""
eyes_turn_on(EYES_OPEN, CYAN, repeat=2)
def eyes_gesture_mode_off():
"""
Animació de desactivació del mode gestos.
Torna als ulls oberts en blanc.
"""
eyes_turn_off()
eyes_turn_on(EYES_OPEN, WHITE)
def eyes_listening():
"""
Expressió "escoltant": marc extern dels ulls en cian.
Es mostra mentre el robot espera un gest.
"""
eyes_turn_on(EYES_GESTURE, CYAN)

View File

@@ -1,238 +0,0 @@
"""
gesture.py — Lectura del sensor de gestos PAJ7620U2 via I2C raw (smbus2).
Equivalent a gesture.cpp del codi Arduino/ESP32.
No hi ha pin INT disponible al PCB → polling cada 50ms en un thread.
Bus I2C 3 (GPIO2=SDA, GPIO1=SCL, compartit amb VL53L0X i ADS1115).
Llibreria C++ equivalent: RevEng_PAJ7620 (Aaron S. Crandall)
"""
import time
import threading
import smbus2
# ==================
# IDs de gest
# ==================
GS_NONE = 0
GS_FORWARD = 1
GS_LEFT = 2
GS_RIGHT = 3
GS_UP = 4
GS_DOWN = 5
GS_CLOCKWISE = 6
GS_ANTICLOCKWISE = 7
GS_WAVE = 8
# Àlies per al quibot.py
GS_CW = GS_CLOCKWISE
GS_CCW = GS_ANTICLOCKWISE
# ==================
# Registres PAJ7620U2
# ==================
_PAJ7620_ADDR = 0x73
_REG_BANK_SEL = 0xEF # Registre de selecció de banc (0x00=banc0, 0x01=banc1)
_REG_PART_ID_LSB = 0x00 # Ha de retornar 0x20
_REG_PART_ID_MSB = 0x01 # Ha de retornar 0x76
_REG_GESTURE_0 = 0x43 # Bits 07: Right, Left, Up, Down, Forward, Backward, CW, CCW
_REG_GESTURE_1 = 0x44 # Bit 0: Wave
# Bits de gest al registre 0x43
_BIT_RIGHT = 0x01
_BIT_LEFT = 0x02
_BIT_UP = 0x04
_BIT_DOWN = 0x08
_BIT_FORWARD = 0x10
_BIT_BACKWARD = 0x20
_BIT_CW = 0x40
_BIT_CCW = 0x80
# Bit de gest al registre 0x44
_BIT_WAVE = 0x01
# ==================
# Seqüències d'inicialització
# ==================
_INIT_BANK0 = [
(0x32, 0x29), (0x33, 0x01), (0x34, 0x00), (0x35, 0x01), (0x36, 0x00),
(0x37, 0x07), (0x38, 0x17), (0x39, 0x06), (0x3A, 0x12), (0x3F, 0x00),
(0x40, 0x02), (0x41, 0xFF), (0x42, 0x01), (0x46, 0x2D), (0x47, 0x0F),
(0x48, 0x3C), (0x49, 0x00), (0x4A, 0x1E), (0x4B, 0x00), (0x4C, 0x20),
(0x4D, 0x00), (0x4E, 0x1A), (0x4F, 0x14), (0x50, 0x00), (0x51, 0x10),
(0x52, 0x00), (0x5C, 0x02), (0x5D, 0x00), (0x5E, 0x10), (0x5F, 0x3F),
(0x60, 0x27), (0x61, 0x28), (0x62, 0x00), (0x63, 0x03), (0x64, 0xF7),
(0x65, 0x03), (0x66, 0xD9), (0x67, 0x03), (0x68, 0x01), (0x69, 0xC8),
(0x6A, 0x40), (0x6D, 0x04), (0x6E, 0x00), (0x6F, 0x00), (0x70, 0x80),
(0x71, 0x00), (0x72, 0x00), (0x73, 0x00), (0x74, 0xF0), (0x75, 0x00),
(0x80, 0x42), (0x81, 0x44), (0x82, 0x04), (0x83, 0x20), (0x84, 0x20),
(0x85, 0x00), (0x86, 0x10), (0x87, 0x00), (0x88, 0x05), (0x89, 0x18),
(0x8A, 0x10), (0x8B, 0x01), (0x8C, 0x37), (0x8D, 0x00), (0x8E, 0xF0),
(0x8F, 0x81), (0x90, 0x06), (0x91, 0x06), (0x92, 0x1E), (0x93, 0x0D),
(0x94, 0x0A), (0x95, 0x0A), (0x96, 0x0C), (0x97, 0x05), (0x98, 0x0A),
(0x99, 0x41), (0x9A, 0x14), (0x9B, 0x0A), (0x9C, 0x3F), (0x9D, 0x33),
(0x9E, 0xAE), (0x9F, 0xF9), (0xA0, 0x48), (0xA1, 0x13), (0xA2, 0x10),
(0xA3, 0x08), (0xA4, 0x30), (0xA5, 0x19), (0xA6, 0x10), (0xA7, 0x08),
(0xA8, 0x24), (0xA9, 0x04), (0xAA, 0x1E), (0xAB, 0x1E), (0xCC, 0x19),
(0xCD, 0x0B), (0xCE, 0x13), (0xCF, 0x64), (0xD0, 0x21), (0xD1, 0x0F),
(0xD2, 0x88), (0xE0, 0x01), (0xE1, 0x04), (0xE2, 0x41), (0xE3, 0xD6),
(0xE4, 0x00), (0xE5, 0x0C), (0xE6, 0x0A), (0xE7, 0x00), (0xE8, 0x00),
(0xE9, 0x00), (0xEE, 0x07),
]
_INIT_BANK1 = [
(0x00, 0x1E), (0x01, 0x1E), (0x02, 0x0F), (0x03, 0x10), (0x04, 0x02),
(0x05, 0x00), (0x06, 0xB0), (0x07, 0x04), (0x08, 0x0D), (0x09, 0x0E),
(0x0A, 0x9C), (0x0B, 0x04), (0x0C, 0x05), (0x0D, 0x0F), (0x0E, 0x02),
(0x0F, 0x12), (0x10, 0x02), (0x11, 0x02), (0x12, 0x00), (0x13, 0x01),
(0x14, 0x05), (0x15, 0x07), (0x16, 0x05), (0x17, 0x07), (0x18, 0x01),
(0x19, 0x04), (0x1A, 0x05), (0x1B, 0x0C), (0x1C, 0x2A), (0x1D, 0x01),
(0x1E, 0x00), (0x21, 0x00), (0x22, 0x00), (0x23, 0x00), (0x25, 0x01),
(0x26, 0x00), (0x27, 0x39), (0x28, 0x7F), (0x29, 0x08), (0x30, 0x03),
(0x31, 0x00), (0x32, 0x1A), (0x33, 0x1A), (0x34, 0x07), (0x35, 0x07),
(0x36, 0x01), (0x37, 0xFF), (0x38, 0x36), (0x39, 0x07), (0x3A, 0x00),
(0x3E, 0xFF), (0x3F, 0x00), (0x40, 0x77), (0x41, 0x40), (0x42, 0x00),
(0x43, 0x30), (0x44, 0xA0), (0x45, 0x5C), (0x46, 0x00), (0x47, 0x00),
(0x48, 0x58), (0x4A, 0x1E), (0x4B, 0x1E), (0x4C, 0x00), (0x4D, 0x00),
(0x4E, 0xA0), (0x4F, 0x80), (0x50, 0x00), (0x51, 0x00), (0x52, 0x00),
(0x53, 0x00), (0x54, 0x00), (0x57, 0x80), (0x59, 0x10), (0x5A, 0x08),
(0x5B, 0x94), (0x5C, 0xE8), (0x5D, 0x08), (0x5E, 0x3D), (0x5F, 0x99),
(0x60, 0x45), (0x61, 0x40), (0x63, 0x2D), (0x64, 0x02), (0x65, 0x96),
(0x66, 0x00), (0x67, 0x97), (0x68, 0x01), (0x69, 0xCD), (0x6A, 0x01),
(0x6B, 0xB0), (0x6C, 0x04), (0x6D, 0x2C), (0x6E, 0x01), (0x6F, 0x32),
(0x71, 0x00), (0x72, 0x01), (0x73, 0x35), (0x74, 0x00), (0x75, 0x33),
(0x76, 0x31), (0x77, 0x01), (0x7C, 0x84), (0x7D, 0x03), (0x7E, 0x01),
]
# ==================
# Estat global
# ==================
_bus: smbus2.SMBus = None
_gesture: int = GS_NONE
_gesture_lock = threading.Lock()
_poll_stop = threading.Event()
# ==================
# Helpers I2C
# ==================
def _select_bank(bank: int):
_bus.write_byte_data(_PAJ7620_ADDR, _REG_BANK_SEL, bank)
def _write(reg: int, val: int):
_bus.write_byte_data(_PAJ7620_ADDR, reg, val)
def _read(reg: int) -> int:
return _bus.read_byte_data(_PAJ7620_ADDR, reg)
# ==================
# Setup
# ==================
def gesture_setup():
"""
Inicialitza el PAJ7620U2 via I2C raw (smbus2, bus 3).
Arrenca el thread de polling (equivalent al ISR del C++).
"""
global _bus
_bus = smbus2.SMBus(3)
# Desperta el sensor (primer accés I2C)
try:
_bus.write_byte(_PAJ7620_ADDR, 0)
except Exception:
pass
time.sleep(0.001) # 700µs d'espera per wake-up
# Verifica ID del dispositiu
_select_bank(0)
id_lsb = _read(_REG_PART_ID_LSB)
id_msb = _read(_REG_PART_ID_MSB)
if id_lsb != 0x20 or id_msb != 0x76:
print(f"ERROR: PAJ7620U2 NOT FOUND (ID: {id_msb:02X}{id_lsb:02X})")
else:
print("Gesture sensor init OK")
# Escriu registres d'inicialització (banc 0)
_select_bank(0)
for reg, val in _INIT_BANK0:
_write(reg, val)
# Escriu registres d'inicialització (banc 1)
_select_bank(1)
for reg, val in _INIT_BANK1:
_write(reg, val)
# Torna al banc 0 per a la lectura de gestos
_select_bank(0)
# Arrenca el thread de polling
_poll_stop.clear()
threading.Thread(target=_poll_loop, daemon=True, name="gesture").start()
def gesture_cleanup():
"""Atura el polling i tanca el bus I2C."""
_poll_stop.set()
if _bus:
_bus.close()
# ==================
# Thread de polling (equivalent al ISR + flag del C++)
# ==================
def _poll_loop():
"""
Llegeix els registres de gest cada 50ms.
Equivalent a on_gesture_interrupt() + gesture_available flag del C++.
"""
global _gesture
while not _poll_stop.is_set():
try:
g0 = _read(_REG_GESTURE_0)
g1 = _read(_REG_GESTURE_1)
detected = GS_NONE
if g0 & _BIT_FORWARD: detected = GS_FORWARD
elif g0 & _BIT_LEFT: detected = GS_LEFT
elif g0 & _BIT_RIGHT: detected = GS_RIGHT
elif g0 & _BIT_UP: detected = GS_UP
elif g0 & _BIT_DOWN: detected = GS_DOWN
elif g0 & _BIT_CW: detected = GS_CLOCKWISE
elif g0 & _BIT_CCW: detected = GS_ANTICLOCKWISE
elif g1 & _BIT_WAVE: detected = GS_WAVE
if detected != GS_NONE:
with _gesture_lock:
_gesture = detected
except Exception:
pass # error I2C puntual → ignora i continua
time.sleep(0.05) # 50ms de polling (20Hz)
# ==================
# Lectura de gest
# ==================
def read_gesture() -> int:
"""
Retorna l'últim gest detectat i el reseteja a GS_NONE.
No bloquejant — equivalent a read_gesture() del C++.
"""
global _gesture
with _gesture_lock:
gest = _gesture
_gesture = GS_NONE
return gest

View File

@@ -1,210 +0,0 @@
from fastapi import FastAPI, File, Form, UploadFile, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
import subprocess
import threading
import time
import os
import json
import uuid
import hashlib
from pathlib import Path
from pydantic import BaseModel
import RPi.GPIO as GPIO
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
INCOMING_DIR = Path("/tmp/quibot-audio/incoming")
LOCKED_DIR = Path("/tmp/quibot-audio/locked")
PROCESSED_DIR = Path("/tmp/quibot-audio/processed")
INCOMING_DIR.mkdir(parents=True, exist_ok=True)
LOCKED_DIR.mkdir(parents=True, exist_ok=True)
PROCESSED_DIR.mkdir(parents=True, exist_ok=True)
# -------------------------
# GPIO SETUP
# -------------------------
STEP = 23
DIR = 24
EN = 25
GPIO.setmode(GPIO.BCM)
GPIO.setup(STEP, GPIO.OUT)
GPIO.setup(DIR, GPIO.OUT)
GPIO.setup(EN, GPIO.OUT)
GPIO.output(EN, GPIO.LOW)
motor_thread = None
def step_motor(steps, direction, delay=0.001):
GPIO.output(DIR, direction)
for _ in range(steps):
GPIO.output(STEP, GPIO.HIGH)
time.sleep(delay)
GPIO.output(STEP, GPIO.LOW)
time.sleep(delay)
def motor_step(dir):
dir_pin = GPIO.HIGH if dir == "forward" else GPIO.LOW
time.sleep(0.02) # small delay before starting
print("Motor running...")
step_motor(200, dir_pin, 0.001)
# -------------------------
# SAFE COMMAND WHITELIST
# -------------------------
COMMANDS = {
"restart_nginx": ["sudo", "systemctl", "restart", "nginx"],
"uptime": ["uptime"],
"update": ["sudo", "apt", "update"]
}
# -------------------------
# API ENDPOINTS
# -------------------------
@app.post("/run")
def run_task(task: str, token: str):
if token != "MY_SECRET_TOKEN":
raise HTTPException(status_code=403, detail="Unauthorized")
if task not in COMMANDS:
raise HTTPException(status_code=400, detail="Invalid task")
try:
result = subprocess.check_output(COMMANDS[task], text=True)
return {"output": result}
except subprocess.CalledProcessError as e:
return {"error": e.output}
@app.post("/motor/step/forward")
def start_motor(token: str):
global motor_thread
if token != "MY_SECRET_TOKEN":
raise HTTPException(status_code=403, detail="Unauthorized")
motor_thread = threading.Thread(target=motor_step, args=("forward",), daemon=True)
motor_thread.start()
return {"status": "motor started"}
@app.post("/motor/step/backwards")
def start_motor(token: str):
global motor_thread
if token != "MY_SECRET_TOKEN":
raise HTTPException(status_code=403, detail="Unauthorized")
motor_thread = threading.Thread(target=motor_step, args=("backwards",), daemon=True)
motor_thread.start()
return {"status": "motor started"}
@app.post("/motor/stop")
def stop_motor(token: str):
if token != "MY_SECRET_TOKEN":
raise HTTPException(status_code=403, detail="Unauthorized")
GPIO.output(EN, GPIO.HIGH) # disable driver
return {"status": "motor stopped"}
@app.post("/audio/upload")
async def upload_audio(file: UploadFile = File(...), format: str = "wav"):
raw_content = await file.read()
checksum = hashlib.sha256(raw_content).hexdigest()[:16]
filename = f"{checksum[:10]}-{uuid.uuid4().hex[:8]}.wav"
filepath = INCOMING_DIR / filename
filepath.write_bytes(raw_content)
return {"status": "received", "filename": str(filepath), "lock_url": f"/audio/lock/{filepath.name}"}
@app.get("/audio/incoming")
def list_incoming():
files = []
for f in sorted(INCOMING_DIR.iterdir()):
meta = f.stat()
files.append({
"filename": f.name,
"size_bytes": meta.st_size,
"modified_iso": time.ctime(meta.st_mtime),
})
return {"count": len(files), "files": files}
@app.post("/audio/lock/{filename}")
def lock_audio(filename: str):
src = INCOMING_DIR / filename
dst = LOCKED_DIR / filename
if not src.exists():
raise HTTPException(status_code=404, detail=f"File {filename} not found")
if dst.exists():
return {"status": "already_locked", "filename": filename}
os.rename(str(src), str(dst))
return {"status": "locked", "filename": filename}
@app.post("/audio/unlock/{filename}")
def unlock_audio(filename: str):
src = LOCKED_DIR / filename
dst = INCOMING_DIR / filename
if not src.exists():
raise HTTPException(status_code=404, detail=f"File {filename} not found")
os.rename(str(src), str(dst))
return {"status": "unlocked", "filename": filename}
@app.post("/audio/cancel/{filename}")
def cancel_audio(filename: str):
src = LOCKED_DIR / filename
dst = INCOMING_DIR / filename
if not src.exists():
raise HTTPException(status_code=404, detail=f"File {filename} not found")
os.rename(str(src), str(dst))
return {"status": "cancelled", "filename": filename}
@app.post("/audio/process/{filename}")
def process_audio(filename: str):
locked = LOCKED_DIR / filename
processed = PROCESSED_DIR / filename
if not locked.exists():
raise HTTPException(status_code=404, detail=f"File {filename} not found")
os.rename(str(locked), str(processed))
return {"status": "processed", "filename": filename}
@app.on_event("shutdown")
def shutdown():
global motor_running
motor_running = False
GPIO.output(EN, GPIO.HIGH)
GPIO.cleanup()

View File

@@ -1,619 +0,0 @@
"""
motion.py — Control de motors (rodes, braços, xeringa), sensor de distància
VL53L0X i seguidor de línia TCRT5000 via ADS1115.
Equivalent a motion.cpp del codi Arduino/ESP32.
"""
import math
import time
import threading
import pigpio
import adafruit_extended_bus
import adafruit_vl53l0x
import adafruit_ads1x15.ads1115 as ADS
from adafruit_ads1x15.analog_in import AnalogIn
from pins import (
STEP_R_W, DIR_R_W, STEP_L_W, DIR_L_W, EN_W,
STEP_R_A, DIR_R_A, STEP_L_A, DIR_L_A, EN_A,
STEP_SY, DIR_SY, EN_SERVO,
END_SY, END_RA, END_LA,
)
# ==================
# Constants
# ==================
CW = True
CCW = False
ON = 0 # A4988/TB6600: enable actiu en LOW
OFF = 1
TAKE = True
LEAVE = False
# Tipus de resposta del seguidor de línia
CLEAR = 0
CROSSING = 1
OBJECT = 2
# Noms de tasques (equivalent a les constants string del C++)
MOVE_TO_CROSSING = "Move to crossing"
TURN_90_CW = "Turn 90 CW"
TURN_90_CCW = "Turn 90 CCW"
MOVE_TO_OBJECT = "Move to object"
TAKE_SOMETHING = "Take something"
LEAVE_SOMETHING = "Leave something"
DO_NOTHING = "Do nothing"
# Paràmetres de moviment
WHEELS_MAX_SPEED = 130.0 # steps/s
WHEELS_ACCEL = 190.0 # steps/s²
ARMS_MAX_SPEED = 250.0 # steps/s
ARMS_ACCEL = 125.0 # steps/s²
SYRINGE_MAX_SPEED = 800.0 # steps/s
SYRINGE_ACCEL = 500.0 # steps/s²
WHEEL_MECH_REDUCTION = 5
WHEEL_STEPS_PER_REVOLUTION = 200 * WHEEL_MECH_REDUCTION # 1000 passos/volta de roda
MM_TO_CROSSING_CENTER = 62 # mm des del creuament detectat fins al centre
MM_TO_OBJECT = 20 # mm addicionals un cop detectat l'objecte
# Llindar de negre per ADS1115 (GAIN_ONE ±4.096V, single-ended 026400 per a 3.3V).
# Equivalent a 1500/4095 de l'ESP32 de 12 bits → ~9700 en ADS1115.
BLACK_THRESHOLD = 9700
# Llindar d'error de rotació (equivalent a 20/4095 de l'ESP32 → ~130 en ADS1115).
ROTATION_ERROR_THRESHOLD = 130
# Posicions dels braços en passos des del home
ARM_LOWER_POSITION = 120
ARM_L_UPPER_POSITION = 900
ARM_R_UPPER_POSITION = 550
# Xeringa: 10 rev * 200 passos/rev * microstepping x4 = 8000 passos estesa del tot
_SY_FULL_EXTENDED_STEPS = 10 * 200 * 4
LINE_FOLLOWER_FREQ = 100 # Hz
LINE_FOLLOWER_PERIOD = 1.0 / LINE_FOLLOWER_FREQ # s
# ==================
# Classe Stepper
# ==================
class Stepper:
"""
Motor pas a pas en mode DRIVER (STEP/DIR).
Equivalent a AccelStepper(DRIVER, step_pin, dir_pin).
Genera polsos STEP via pigpio.gpio_trigger().
"""
PULSE_US = 10 # Amplada del pols STEP en µs (A4988 requereix ≥1µs)
def __init__(self, pi: pigpio.pi, step_pin: int, dir_pin: int):
self._pi = pi
self._step_pin = step_pin
self._dir_pin = dir_pin
self._pos = 0 # posició actual (passos)
self._target = 0 # posició objectiu
self._speed = 0.0 # velocitat actual (passos/s, signada)
self._max_speed = 1.0
self._accel = 1.0
self._last_step_us = self._now_us()
self._step_interval_us = 0 # 0 = aturat
pi.set_mode(step_pin, pigpio.OUTPUT)
pi.set_mode(dir_pin, pigpio.OUTPUT)
pi.write(step_pin, 0)
pi.write(dir_pin, 0)
@staticmethod
def _now_us() -> int:
return time.monotonic_ns() // 1000
def set_max_speed(self, speed: float):
self._max_speed = abs(speed)
def set_acceleration(self, accel: float):
self._accel = abs(accel)
def move_to(self, position: int):
self._target = int(position)
def move(self, relative: int):
self._target = self._pos + int(relative)
def set_current_position(self, pos: int):
self._pos = int(pos)
self._target = int(pos)
self._speed = 0.0
self._step_interval_us = 0
def current_position(self) -> int:
return self._pos
def distance_to_go(self) -> int:
return self._target - self._pos
def is_running(self) -> bool:
return self._target != self._pos
def stop(self):
self._target = self._pos
self._speed = 0.0
self._step_interval_us = 0
def set_speed(self, speed: float):
"""Estableix velocitat constant per a run_speed()."""
self._speed = float(speed)
self._step_interval_us = int(1_000_000 / abs(speed)) if speed != 0.0 else 0
def _do_step(self, direction: int):
self._pi.write(self._dir_pin, 1 if direction > 0 else 0)
self._pi.gpio_trigger(self._step_pin, self.PULSE_US, 1)
self._pos += direction
def run_speed(self) -> bool:
"""Fa un pas a velocitat constant. No bloquejant — cridar des del bucle de steppers."""
if self._step_interval_us == 0:
return False
now = self._now_us()
if now - self._last_step_us >= self._step_interval_us:
self._do_step(1 if self._speed > 0 else -1)
self._last_step_us = now
return True
return False
def run(self) -> bool:
"""
Fa un pas cap a _target amb acceleració/desacceleració.
No bloquejant — cridar des del bucle de steppers.
Implementa l'algorisme de AccelStepper: Δv = accel / v per pas.
"""
dtg = self.distance_to_go()
if dtg == 0:
self._speed = 0.0
self._step_interval_us = 0
return False
abs_speed = abs(self._speed)
if abs_speed < 1.0:
abs_speed = math.sqrt(self._accel / 2.0) # velocitat inicial AccelStepper
now = self._now_us()
if now - self._last_step_us < int(1_000_000 / abs_speed):
return False # no és hora del proper pas
# Actualitza la velocitat per al proper pas
direction = 1 if dtg > 0 else -1
stop_dist = (abs_speed ** 2) / (2.0 * self._accel) if self._accel > 0 else 0
if abs(dtg) <= max(stop_dist, 1):
new_speed = abs_speed - (self._accel / abs_speed)
new_speed = max(new_speed, 1.0)
else:
new_speed = abs_speed + (self._accel / abs_speed)
new_speed = min(new_speed, self._max_speed)
self._speed = new_speed * direction
self._do_step(direction)
self._last_step_us = now
return True
# ==================
# Instàncies globals
# ==================
_pi: pigpio.pi = None
_i2c = None
_dist_sensor = None
_ads = None
_chan_r: AnalogIn = None
_chan_l: AnalogIn = None
wheel_R: Stepper = None
wheel_L: Stepper = None
arm_R: Stepper = None
arm_L: Stepper = None
syringe: Stepper = None
wheels_speed_mode: bool = False # True → run_speed(), False → run() (posició)
_stepper_stop = threading.Event()
_stepper_thread: threading.Thread = None
# ==================
# Setup i cleanup
# ==================
def motion_setup_steppers(pi: pigpio.pi):
"""
Inicialitza GPIOs i steppers sense els sensors I2C.
Útil per a tests de motors quan VL53L0X/ADS1115 no estan connectats.
"""
global _pi, wheel_R, wheel_L, arm_R, arm_L, syringe, _stepper_thread
_pi = pi
for pin in (EN_W, EN_A, EN_SERVO):
pi.set_mode(pin, pigpio.OUTPUT)
for pin in (END_LA, END_RA, END_SY):
pi.set_mode(pin, pigpio.INPUT)
pi.set_pull_up_down(pin, pigpio.PUD_UP) # Hall actiu-baix, pull-up intern
enable_arms(OFF)
enable_wheels(OFF)
wheel_R = Stepper(pi, STEP_R_W, DIR_R_W)
wheel_L = Stepper(pi, STEP_L_W, DIR_L_W)
arm_R = Stepper(pi, STEP_R_A, DIR_R_A)
arm_L = Stepper(pi, STEP_L_A, DIR_L_A)
syringe = Stepper(pi, STEP_SY, DIR_SY)
wheel_R.set_max_speed(WHEELS_MAX_SPEED); wheel_R.set_acceleration(WHEELS_ACCEL)
wheel_L.set_max_speed(WHEELS_MAX_SPEED); wheel_L.set_acceleration(WHEELS_ACCEL)
arm_R.set_max_speed(ARMS_MAX_SPEED); arm_R.set_acceleration(ARMS_ACCEL)
arm_L.set_max_speed(ARMS_MAX_SPEED); arm_L.set_acceleration(ARMS_ACCEL)
syringe.set_max_speed(SYRINGE_MAX_SPEED); syringe.set_acceleration(SYRINGE_ACCEL)
_stepper_stop.clear()
_stepper_thread = threading.Thread(target=_stepper_loop, daemon=True, name="steppers")
_stepper_thread.start()
def motion_setup_sensors(pi: pigpio.pi):
"""
Inicialitza únicament els sensors I2C (VL53L0X, ADS1115). Sense steppers.
Útil per a tests de sensors quan els motors no estan connectats.
"""
global _pi, _i2c, _dist_sensor, _ads, _chan_r, _chan_l
_pi = pi
_i2c = adafruit_extended_bus.ExtendedI2C(3)
_dist_sensor = adafruit_vl53l0x.VL53L0X(_i2c)
_ads = ADS.ADS1115(_i2c)
_chan_r = AnalogIn(_ads, ADS.P0)
_chan_l = AnalogIn(_ads, ADS.P1)
def motion_setup(pi: pigpio.pi):
"""
Inicialitza GPIOs, steppers, sensor de distància VL53L0X,
ADC ADS1115 per als sensors de línia, i arrenca el bucle de steppers.
Requereix /boot/config.txt: dtoverlay=i2c-gpio,bus=3,i2c_gpio_sda=2,i2c_gpio_scl=1
"""
motion_setup_steppers(pi)
motion_setup_sensors(pi)
def motion_cleanup():
"""Atura el bucle de steppers i desactiva tots els motors."""
_stepper_stop.set()
if _stepper_thread:
_stepper_thread.join(timeout=1.0)
enable_wheels(OFF)
enable_arms(OFF)
enable_syringe(OFF)
# ==================
# Enable / disable
# ==================
def enable_wheels(state: bool):
_pi.write(EN_W, state)
def enable_arms(state: bool):
_pi.write(EN_A, state)
def enable_syringe(state: bool):
_pi.write(EN_SERVO, state)
def is_endstop_detecting(pin: int) -> bool:
return not _pi.read(pin) # efecte Hall actiu en baix
# ==================
# Helpers de moviment
# ==================
def mm_to_steps(mm: int) -> int:
# Perímetre roda Ø152mm = 2·π·76 ≈ 477mm
return (mm * WHEEL_STEPS_PER_REVOLUTION * 2) // 1000
def wheels_set_position(position: int = 0):
wheel_L.set_current_position(position)
wheel_R.set_current_position(position)
def wheels_set_speed(speed: float, rotate: bool = False, direction: bool = CW):
if rotate:
wheel_L.set_speed( speed if direction == CW else -speed)
wheel_R.set_speed(-speed if direction == CW else speed)
else:
wheel_L.set_speed(speed)
wheel_R.set_speed(speed)
def move_arms_to(position: int):
arm_L.move_to(position)
arm_R.move_to(position)
while arm_L.distance_to_go() != 0 and arm_R.distance_to_go() != 0:
time.sleep(0.1)
def move_arms_up():
arm_L.move_to(ARM_L_UPPER_POSITION)
arm_R.move_to(ARM_R_UPPER_POSITION)
while arm_L.distance_to_go() != 0 and arm_R.distance_to_go() != 0:
time.sleep(0.1)
def move_wheels_to(position: int, invert: bool = False):
wheel_L.move_to(position)
wheel_R.move_to(-position if invert else position)
while wheel_L.is_running() and wheel_R.is_running():
time.sleep(0.1)
# ==================
# Homing
# ==================
def arms_home():
"""Cicle de homing dels braços (bloquejant). El bucle de steppers fa el moviment."""
enable_arms(ON)
arm_L.move(-1250)
arm_R.move(-1250)
while True:
if is_endstop_detecting(END_LA):
arm_L.stop()
if is_endstop_detecting(END_RA):
arm_R.stop()
l_done = is_endstop_detecting(END_LA) or arm_L.distance_to_go() == 0
r_done = is_endstop_detecting(END_RA) or arm_R.distance_to_go() == 0
if l_done and r_done:
break
time.sleep(0.005)
arm_L.set_current_position(0)
arm_R.set_current_position(0)
arm_L.move(ARM_L_UPPER_POSITION)
arm_R.move(ARM_R_UPPER_POSITION)
while arm_L.distance_to_go() != 0 or arm_R.distance_to_go() != 0:
time.sleep(0.01)
def syringe_home():
"""Cicle de homing de la xeringa (bloquejant). El bucle de steppers fa el moviment."""
enable_syringe(ON)
syringe.move(-11000)
while not is_endstop_detecting(END_SY):
if syringe.distance_to_go() == 0:
break
time.sleep(0.005)
syringe.stop()
syringe.set_current_position(0)
enable_syringe(OFF)
# ==================
# Sensor de distància
# ==================
def distance_to_object() -> int:
"""Retorna la distància en mm a l'objecte més proper. 65535 si fora de rang."""
try:
return _dist_sensor.range
except Exception:
return 65535
# ==================
# Seguidor de línia
# ==================
def _read_line_sensors() -> tuple:
"""Llegeix sensors de línia via ADS1115. Retorna (dreta, esquerra), 032767."""
return _chan_r.value, _chan_l.value
def compute_new_speed(speed: float) -> float:
accel = WHEELS_ACCEL / LINE_FOLLOWER_FREQ
return min(speed + accel, WHEELS_MAX_SPEED)
def follow_line_loop(speed: float, forward: bool = True) -> int:
"""
Seguidor de línia no bloquejant. Retorna CLEAR, CROSSING o OBJECT.
Si CLEAR, aplica correcció proporcional a les velocitats de les rodes.
"""
distance_threshold = 50 # mm
lf_r, lf_l = _read_line_sensors()
if lf_l > BLACK_THRESHOLD and lf_r > BLACK_THRESHOLD and forward:
return CROSSING
elif distance_to_object() < distance_threshold and forward:
return OBJECT
else:
p_factor = 5
error = lf_r - lf_l
correction_r = error // p_factor
correction_l = -correction_r
wheel_L.set_speed((speed + correction_r) if forward else (-speed + correction_r))
wheel_R.set_speed((speed + correction_l) if forward else (-speed + correction_l))
return CLEAR
def run_to_crossing_center(speed: float) -> float:
"""
Avança per centrar el robot sobre el creuament detectat.
Retorna la nova velocitat (reduïda a la meitat).
"""
wheels_set_position(0 - mm_to_steps(MM_TO_CROSSING_CENTER))
while follow_line_loop(speed) == CROSSING:
wheels_set_speed(speed)
time.sleep(0.1)
speed /= 2
while wheel_L.current_position() < 0 and wheel_R.current_position() < 0:
wheels_set_speed(speed)
time.sleep(0.1)
wheels_set_speed(0)
return speed
# ==================
# Bucle de steppers (thread permanent)
# ==================
def _stepper_loop():
"""
Thread que genera els polsos STEP per a tots els motors.
Equivalent a task_update_steppers() del FreeRTOS.
S'inicia automàticament a motion_setup().
"""
while not _stepper_stop.is_set():
if wheels_speed_mode:
wheel_R.run_speed()
wheel_L.run_speed()
else:
wheel_R.run()
wheel_L.run()
arm_R.run()
arm_L.run()
syringe.run()
# ==================
# Tasques (FreeRTOS tasks → funcions bloquejants, cridades des de threads)
# ==================
def task_move_to(expected_target: int):
"""
Segueix la línia fins arribar a expected_target (CROSSING o OBJECT).
Equivalent a task_move_to() del FreeRTOS.
"""
global wheels_speed_mode
speed = 0.0
wheels_speed_mode = True
enable_wheels(ON)
lf_response = CLEAR
while True:
speed = compute_new_speed(speed)
lf_response = follow_line_loop(speed)
if lf_response != CLEAR:
break
time.sleep(LINE_FOLLOWER_PERIOD)
if lf_response == CROSSING and expected_target == CROSSING:
run_to_crossing_center(speed)
elif lf_response == OBJECT and expected_target == OBJECT:
wheel_L.stop()
wheel_R.stop()
wheels_speed_mode = False
enable_wheels(OFF)
def task_rotate(direction: bool):
"""
Gira el robot 90° (CW o CCW).
Fase 1: acceleració fins al 90% dels passos de rotació.
Fase 2: desacceleració i ajust fi sobre la línia via sensors analògics.
"""
global wheels_speed_mode
# Arc 90° a Ø250mm → 2·π·125/4 ≈ 196mm; roda Ø152mm → 196/477 ≈ 0.41 voltes
ROTATION_STEPS = (WHEEL_STEPS_PER_REVOLUTION * 42) // 100
positive_wheel = wheel_L if direction == CW else wheel_R
speed = 0.0
wheels_speed_mode = True
enable_wheels(ON)
wheels_set_position(0)
while positive_wheel.current_position() < (ROTATION_STEPS * 90) // 100:
speed = compute_new_speed(speed)
wheels_set_speed(speed, rotate=True, direction=direction)
time.sleep(0.01)
speed /= 2
while True:
lf_r, lf_l = _read_line_sensors()
error = abs(lf_l - lf_r)
if error < ROTATION_ERROR_THRESHOLD or \
positive_wheel.current_position() > (ROTATION_STEPS * 110) // 100:
break
wheels_set_speed(speed - (speed / error), rotate=True, direction=direction)
time.sleep(0.01)
wheels_set_speed(0)
enable_wheels(OFF)
wheels_speed_mode = False
def task_take_or_leave_something(take: bool):
"""
Avança fins a l'objecte, baixa braços, opera la xeringa i torna al creuament.
take=True → xucla; take=False → buida.
"""
global wheels_speed_mode
speed = 0.0
wheels_speed_mode = True
enable_wheels(ON)
wheels_set_position(0) # guardem la posició home per tornar-hi
lf_response = CLEAR
while True:
speed = compute_new_speed(speed)
lf_response = follow_line_loop(speed)
if lf_response != CLEAR:
break
time.sleep(LINE_FOLLOWER_PERIOD)
if lf_response == OBJECT:
target_l = wheel_L.current_position() + mm_to_steps(MM_TO_OBJECT)
target_r = wheel_R.current_position() + mm_to_steps(MM_TO_OBJECT)
while wheel_L.current_position() < target_l and wheel_R.current_position() < target_r:
wheels_set_speed(speed / 2)
time.sleep(0.1)
wheels_set_speed(0)
enable_wheels(OFF)
enable_arms(ON)
move_arms_to(ARM_LOWER_POSITION)
enable_syringe(ON)
syringe.move_to(_SY_FULL_EXTENDED_STEPS if take else 0)
while syringe.distance_to_go() != 0:
if not take and is_endstop_detecting(END_SY):
syringe.stop()
syringe.set_current_position(0)
break
time.sleep(0.1)
enable_syringe(OFF)
move_arms_up()
enable_wheels(ON)
wheels_speed_mode = False
wheels_set_speed(WHEELS_MAX_SPEED)
move_wheels_to(0)
elif lf_response == CROSSING:
run_to_crossing_center(speed)
wheels_speed_mode = False
enable_wheels(OFF)
def task_idle():
"""Pausa breu fins que quibot.py assigni una nova tasca."""
time.sleep(0.5)

View File

@@ -1,84 +0,0 @@
"""
pins.py — Definició de pins GPIO (BCM) de la Raspberry Pi Zero 2W.
Equivalent a io.h del codi Arduino/ESP32.
Tots els números fan referència a la numeració BCM.
"""
# ==================
# MOTORS
# ==================
# Servo d'expulsió de blocs
EN_SERVO = 9
SERVO_PWM = 10
# Motor pas a pas xeringa
STEP_SY = 8
DIR_SY = 5
# Motor pas a pas roda dreta
STEP_R_W = 25
DIR_R_W = 23
# Motor pas a pas roda esquerra
STEP_L_W = 7
DIR_L_W = 11
# Enable motors de rodes (compartit)
EN_W = 6
# Motor pas a pas braç dret
STEP_R_A = 3
DIR_R_A = 4
# Motor pas a pas braç esquerre
STEP_L_A = 13
DIR_L_A = 0
# Enable motors de braços (compartit)
EN_A = 21
# ==================
# SENSORS
# ==================
# Bus I2C principal — VL53L0X (distància) i PAJ7620U2 (gestos), compartit
SDA_DIST = 2
SCL_DIST = 1
SDA_GEST = SDA_DIST # mateixa línia
SCL_GEST = SCL_DIST # mateixa línia
# INT_GEST no connectat a la PCB — el driver usa polling
# Bus I2C sensor de color TCS34725 (bit-bang)
SDA_COL = 22
SCL_COL = 27
# Final de carrera xeringa (efecte Hall)
END_SY = 12
# Final de carrera braç dret (efecte Hall)
END_RA = 16
# Final de carrera braç esquerre (efecte Hall)
END_LA = 17
# Sensors seguidors de línia (TCRT5000)
LINES_R = 14
LINES_L = 15
# ==================
# DISPLAY
# ==================
# Dades matriu LED 8x8 RGB WS2811 (2x ull)
LED_DATA = 26
# ==================
# ÀUDIO (afegit per company, no usat pel robot)
# ==================
# I2S — amplificador MAX98357A + micròfon SPH0645
I2C_BCLK = 18
I2C_LRCLK = 19
AMP_DIN = 24
MIC = 20

View File

@@ -1,286 +0,0 @@
"""
quibot.py — Programa principal del robot QuiBot H2O.
Inicialitza tots els mòduls, executa el homing i arrenca els threads.
Equivalent a QuiBot.ino del codi Arduino/ESP32.
Threads permanents:
- task_read_blocks → llegeix blocs i executa accions (aquest fitxer)
- task_read_gestures → llegeix gestos i executa accions (aquest fitxer)
- _stepper_loop → genera polsos STEP dels motors (motion.py)
- _task_update_leds → parpelleig dels LEDs (eyes.py)
- _poll_loop → polling del sensor de gestos (gesture.py)
"""
import time
import threading
import signal
import sys
import pigpio
from motion import (
motion_setup, motion_cleanup,
arms_home, syringe_home,
task_move_to, task_rotate, task_take_or_leave_something, task_idle,
distance_to_object,
CROSSING, TAKE, LEAVE, CW, CCW,
)
from blocks import (
blocks_setup,
read_block_color, servo_move_to,
OPEN_POSITION, EJECT_POSITION,
BK, RD, GN, BU, YE, OG, VT,
)
from eyes import (
eyes_setup, eyes_cleanup,
eyes_turn_on, eyes_turn_off,
eyes_gesture_mode_on, eyes_gesture_mode_off,
EYES_OPEN, EYES_FW, EYES_DOWN,
RED, GREEN, BLUE, YELLOW, ORANGE, CYAN,
)
from gesture import (
gesture_setup, gesture_cleanup,
read_gesture,
GS_NONE, GS_FORWARD, GS_LEFT, GS_RIGHT,
GS_UP, GS_DOWN, GS_CLOCKWISE, GS_ANTICLOCKWISE, GS_WAVE,
)
# ==================
# Colors addicionals (equivalents CRGB del C++)
# ==================
GRAY = (128, 128, 128) # CRGB::Gray → estat normal
DARK_RED = (139, 0, 0) # CRGB::DarkRed → avançar
# ==================
# Timeouts
# ==================
INSERT_BLOCK_MS = 2.0 # s — espera que l'infant insereixi el bloc
EJECT_BLOCK_MS = 2.0 # s — espera que el bloc caigui
CHECK_COLOR_MS = 1.0 # s — interval entre lectures de color
# ==================
# Estat global
# ==================
_pi: pigpio.pi = None
# Mutex que evita que tasca de blocs i tasca de gestos llancin accions simultànies.
# En el C++ original compartien TaskHandle sense mutex explícit (possible race condition).
# Aquí ho fem correctament.
_action_lock = threading.Lock()
_shutdown_event = threading.Event()
_gesture_mode_active = False # False = mode blocs, True = mode gestos
_mode_lock = threading.Lock()
# ==================
# Helper d'execució d'accions
# ==================
def _execute_action(fn, *args):
"""
Adquireix el lock d'acció, executa fn(*args) de forma bloquejant i l'allibera.
Garanteix que mai s'executen dues accions de moviment simultànies.
"""
with _action_lock:
fn(*args)
# ==================
# Tasca de blocs
# ==================
def task_read_blocks():
"""
Llegeix blocs contínuament, executa l'acció corresponent al color i expulsa el bloc.
Equivalent a task_read_blocks() del FreeRTOS.
"""
while not _shutdown_event.is_set():
eyes_state = False
eyes_turn_on(EYES_OPEN, GRAY)
# Obre el servo per permetre la inserció del bloc
servo_move_to(OPEN_POSITION)
time.sleep(INSERT_BLOCK_MS)
# Espera que hi hagi un bloc i llegeix el seu color
color_id = BK
while not _shutdown_event.is_set():
if distance_to_object() < 80:
# Objecte a menys de 80mm — esperem que es retiri
if not eyes_state:
eyes_turn_on(EYES_DOWN, GRAY)
eyes_state = True
else:
color_id = read_block_color()
if eyes_state:
eyes_turn_on(EYES_OPEN, GRAY, 1, False)
eyes_state = False
if color_id != BK:
break
time.sleep(CHECK_COLOR_MS)
# Si estem en mode gestos, ignora el bloc
with _mode_lock:
if _gesture_mode_active:
continue
# Executa l'acció corresponent al color del bloc
if color_id == RD:
eyes_turn_on(EYES_FW, DARK_RED, 2)
time.sleep(1.0)
_execute_action(task_move_to, CROSSING)
elif color_id == GN:
eyes_turn_on(EYES_OPEN, GREEN, 2)
time.sleep(1.0)
_execute_action(task_rotate, CW)
elif color_id == BU:
eyes_turn_on(EYES_OPEN, BLUE, 2)
time.sleep(1.0)
_execute_action(task_rotate, CCW)
elif color_id == YE:
eyes_turn_on(EYES_OPEN, YELLOW, 2)
time.sleep(1.0)
_execute_action(task_take_or_leave_something, TAKE)
elif color_id == OG:
eyes_turn_on(EYES_OPEN, ORANGE, 2)
time.sleep(1.0)
_execute_action(task_take_or_leave_something, LEAVE)
elif color_id == VT:
_execute_action(task_idle)
eyes_turn_on(EYES_OPEN, GRAY)
# Expulsa el bloc
servo_move_to(EJECT_POSITION)
time.sleep(EJECT_BLOCK_MS)
# ==================
# Tasca de gestos
# ==================
def task_read_gestures():
"""
Llegeix gestos contínuament.
GS_WAVE activa/desactiva el mode gestos.
Equivalent a task_read_gestures() del FreeRTOS.
"""
gesture_mode_active = False
while not _shutdown_event.is_set():
gesture_id = read_gesture()
# WAVE: toggle entre mode blocs i mode gestos
if gesture_id == GS_WAVE:
with _mode_lock:
_gesture_mode_active = not _gesture_mode_active
active = _gesture_mode_active
if active:
eyes_gesture_mode_on()
print("Gesture mode ON")
else:
eyes_gesture_mode_off()
print("Gesture mode OFF")
time.sleep(1.0)
continue
with _mode_lock:
active = _gesture_mode_active
if not active or gesture_id == GS_NONE:
time.sleep(0.1)
continue
# Executa l'acció corresponent al gest
if gesture_id == GS_FORWARD:
eyes_turn_on(EYES_FW, DARK_RED, 2)
_execute_action(task_move_to, CROSSING)
elif gesture_id == GS_RIGHT:
eyes_turn_on(EYES_OPEN, GREEN, 2)
_execute_action(task_rotate, CW)
elif gesture_id == GS_LEFT:
eyes_turn_on(EYES_OPEN, BLUE, 2)
_execute_action(task_rotate, CCW)
elif gesture_id == GS_UP:
eyes_turn_on(EYES_OPEN, YELLOW, 2)
_execute_action(task_take_or_leave_something, TAKE)
elif gesture_id == GS_DOWN:
eyes_turn_on(EYES_OPEN, ORANGE, 2)
_execute_action(task_take_or_leave_something, LEAVE)
elif gesture_id in (GS_CLOCKWISE, GS_ANTICLOCKWISE):
_execute_action(task_idle)
eyes_turn_on(EYES_OPEN, CYAN)
time.sleep(0.5)
# ==================
# Shutdown
# ==================
def _shutdown(sig, frame):
print("\nAturant QuiBot...")
_shutdown_event.set()
# ==================
# Main
# ==================
def main():
global _pi
# Connecta amb pigpiod (ha d'estar en marxa amb: sudo pigpiod -s 1)
_pi = pigpio.pi()
if not _pi.connected:
print("ERROR: No s'ha pogut connectar a pigpiod. Executa: sudo pigpiod -s 1")
sys.exit(1)
# Inicialitza tots els mòduls
blocks_setup(_pi)
motion_setup(_pi)
eyes_setup(_pi)
gesture_setup()
# Homing (bloquejant)
arms_home()
syringe_home()
# Registra els senyals de sortida
signal.signal(signal.SIGINT, _shutdown)
signal.signal(signal.SIGTERM, _shutdown)
# Arrenca els threads principals
t_blocks = threading.Thread(target=task_read_blocks, daemon=True, name="blocks")
t_gestures = threading.Thread(target=task_read_gestures, daemon=True, name="gestures")
t_blocks.start()
t_gestures.start()
print("QuiBot llest.")
# Espera senyal de sortida
_shutdown_event.wait()
# Cleanup ordenat
motion_cleanup()
eyes_cleanup()
gesture_cleanup()
_pi.stop()
print("QuiBot aturat.")
if __name__ == "__main__":
main()

View File

@@ -1,147 +0,0 @@
"""
test_blocks.py — Tests individuals del mòdul blocks.py.
Executa des del directori Rasp/: python tests/test_blocks.py
Descomenta la funció que vols provar al final del fitxer.
Assegura't que el venv està activat i pigpiod en marxa (sudo pigpiod -s 1).
"""
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
import time
import pigpio
from blocks import (
blocks_setup,
servo_move_to, OPEN_POSITION, EJECT_POSITION, MIN_SERVO_US, MAX_SERVO_US,
read_block_color, read_color_raw,
_COLORS,
BK, RD, GN, BU, YE, OG, VT,
)
_COLOR_NAMES = {
BK: "Negre (BK)",
RD: "Vermell (RD)",
GN: "Verd (GN)",
BU: "Blau (BU)",
YE: "Groc (YE)",
OG: "Taronja (OG)",
VT: "Violeta (VT)",
}
def _setup():
pi = pigpio.pi()
if not pi.connected:
print("ERROR: pigpiod no està en marxa. Executa: sudo pigpiod -s 1")
sys.exit(1)
blocks_setup(pi)
return pi
def _teardown(pi):
pi.stop()
# ==================
# TEST 1 — Servo
# ==================
def test_servo():
"""
Mou el servo a les posicions principals: oberta, expulsió i torna a oberta.
Hauries de veure/sentir el servo moure's suaument entre posicions.
"""
print("=== TEST SERVO ===")
pi = _setup()
print(f"Movent a OPEN_POSITION ({OPEN_POSITION} µs)...")
servo_move_to(OPEN_POSITION)
time.sleep(1.0)
print("Open: OK")
print(f"Movent a EJECT_POSITION ({EJECT_POSITION} µs)...")
servo_move_to(EJECT_POSITION)
time.sleep(1.0)
print("Eject: OK")
print(f"Tornant a OPEN_POSITION...")
servo_move_to(OPEN_POSITION)
time.sleep(1.0)
print("Torna a open: OK")
print(f"Provant posició mínima ({MIN_SERVO_US} µs)...")
servo_move_to(MIN_SERVO_US)
time.sleep(1.0)
print(f"Tornant a OPEN_POSITION...")
servo_move_to(OPEN_POSITION)
time.sleep(1.0)
_teardown(pi)
print("Test servo completat.\n")
# ==================
# TEST 2 — Sensor de color TCS34725
# ==================
def test_color_sensor():
"""
Llegeix el color 15 vegades cada segon.
Posa davant del sensor els blocs de colors per verificar que els reconeix.
"""
print("=== TEST SENSOR DE COLOR TCS34725 ===")
pi = _setup()
print("Llegint color durant 15 segons (posa els blocs davant del sensor)...")
for i in range(15):
color_id = read_block_color()
name = _COLOR_NAMES.get(color_id, "Desconegut")
print(f" Lectura {i+1:2d}: {name}")
time.sleep(1.0)
_teardown(pi)
print("Test sensor de color completat.\n")
# ==================
# TEST 3 — Calibració del sensor de color (valors RGB crus)
# ==================
def test_color_raw():
"""
Mostra valors RGB crus i la classificació actual durant 30 segons.
Útil per calibrar la taula _COLORS a blocks.py.
Format: R=xxx G=xxx B=xxx → classificat com 'XX' (diff=xx)
Si el diff és gran (>10), els valors de referència necessiten ajust.
"""
print("=== TEST COLOR RAW (calibració) ===")
pi = _setup()
print("Llegint valors RGB crus durant 30 segons (posa cada bloc davant del sensor)...")
print(f" {'R':>5} {'G':>5} {'B':>5} classificat diff")
for i in range(30):
r, g, b = read_color_raw()
best_name = "??"
best_diff = 9999
for ref in _COLORS:
diff = abs(r - ref["r"]) + abs(g - ref["g"]) + abs(b - ref["b"])
if diff < best_diff:
best_diff = diff
best_name = ref["name"]
print(f" R={r:3d} G={g:3d} B={b:3d}{best_name} (diff={best_diff})")
time.sleep(1.0)
_teardown(pi)
print("Test color raw completat.\n")
# ==================
# Execució
# ==================
if __name__ == "__main__":
# Descomenta el test que vols executar:
test_servo()
# test_color_sensor()
# test_color_raw() # Per calibrar la taula de colors

View File

@@ -1,156 +0,0 @@
"""
test_eyes.py — Tests individuals del mòdul eyes.py.
Executa des del directori Rasp/: python tests/test_eyes.py
Descomenta la funció que vols provar al final del fitxer.
Assegura't que el venv està activat i pigpiod en marxa (sudo pigpiod -s 1).
"""
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
import time
import pigpio
from eyes import (
eyes_setup, eyes_cleanup,
eyes_turn_on, eyes_turn_off,
eyes_gesture_mode_on, eyes_gesture_mode_off, eyes_listening,
EYES_OPEN, EYES_FW, EYES_DOWN, EYES_GESTURE,
WHITE, RED, GREEN, BLUE, YELLOW, ORANGE, PURPLE, CYAN, BLACK,
)
def _setup():
pi = pigpio.pi()
if not pi.connected:
print("ERROR: pigpiod no està en marxa. Executa: sudo pigpiod -s 1")
sys.exit(1)
eyes_setup(pi)
return pi
def _teardown(pi):
eyes_cleanup()
pi.stop()
# ==================
# TEST 1 — Formes i colors bàsics
# ==================
def test_shapes():
"""
Mostra totes les formes existents en colors diferents.
Hauries de veure les formes als ulls LED del robot.
"""
print("=== TEST FORMES I COLORS ===")
pi = _setup()
print("EYES_OPEN en blanc...")
eyes_turn_on(EYES_OPEN, WHITE)
time.sleep(2.0)
print("EYES_FW en vermell...")
eyes_turn_on(EYES_FW, RED)
time.sleep(2.0)
print("EYES_DOWN en blau...")
eyes_turn_on(EYES_DOWN, BLUE)
time.sleep(2.0)
print("EYES_OPEN en verd...")
eyes_turn_on(EYES_OPEN, GREEN)
time.sleep(2.0)
print("Apagant...")
eyes_turn_off()
time.sleep(1.0)
_teardown(pi)
print("Test formes completat.\n")
# ==================
# TEST 2 — Animació de repeat i direcció
# ==================
def test_animation():
"""
Prova l'animació amb repeat i les dues direccions.
Hauries de veure els LEDs encenent-se un per un en ordre normal i invers.
"""
print("=== TEST ANIMACIÓ ===")
pi = _setup()
print("EYES_OPEN groc, repeat=2, endavant...")
eyes_turn_on(EYES_OPEN, YELLOW, repeat=2, forward=True)
time.sleep(1.0)
print("EYES_FW taronja, repeat=2, enrere...")
eyes_turn_on(EYES_FW, ORANGE, repeat=2, forward=False)
time.sleep(1.0)
eyes_turn_off()
_teardown(pi)
print("Test animació completat.\n")
# ==================
# TEST 3 — Animacions mode gestos (TFG)
# ==================
def test_gesture_animations():
"""
Prova les animacions específiques del mode gestos.
Hauries de veure: doble parpelleig cian, tornada a blanc, marc cian.
"""
print("=== TEST ANIMACIONS MODE GESTOS ===")
pi = _setup()
print("Activant mode gestos (doble parpelleig cian)...")
eyes_gesture_mode_on()
time.sleep(2.0)
print("Escoltant gest (marc cian)...")
eyes_listening()
time.sleep(2.0)
print("Desactivant mode gestos (torna a blanc)...")
eyes_gesture_mode_off()
time.sleep(2.0)
eyes_turn_off()
_teardown(pi)
print("Test animacions gestos completat.\n")
# ==================
# TEST 4 — Parpelleig (breathing)
# ==================
def test_breathing():
"""
Verifica que el thread de parpelleig funciona correctament.
Hauries de veure la brillantor dels LEDs pujant i baixant suaument.
"""
print("=== TEST PARPELLEIG (BREATHING) ===")
pi = _setup()
print("EYES_OPEN blanc — observa el parpelleig durant 10 segons...")
eyes_turn_on(EYES_OPEN, WHITE)
time.sleep(10.0)
eyes_turn_off()
_teardown(pi)
print("Test parpelleig completat.\n")
# ==================
# Execució
# ==================
if __name__ == "__main__":
# Descomenta el test que vols executar:
test_shapes()
# test_animation()
# test_gesture_animations()
# test_breathing()

View File

@@ -1,94 +0,0 @@
"""
test_gesture.py — Tests del mòdul gesture.py (sensor PAJ7620U2).
Executa des del directori Rasp/: python tests/test_gesture.py
Descomenta la funció que vols provar al final del fitxer.
Assegura't que el venv està activat i pigpiod en marxa (sudo pigpiod -s 1).
"""
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
import time
from gesture import (
gesture_setup, gesture_cleanup,
read_gesture,
GS_NONE, GS_FORWARD, GS_LEFT, GS_RIGHT,
GS_UP, GS_DOWN, GS_CLOCKWISE, GS_ANTICLOCKWISE, GS_WAVE,
)
_GESTURE_NAMES = {
GS_NONE: "Cap (GS_NONE)",
GS_FORWARD: "Endavant (GS_FORWARD)",
GS_LEFT: "Esquerra (GS_LEFT)",
GS_RIGHT: "Dreta (GS_RIGHT)",
GS_UP: "Amunt (GS_UP)",
GS_DOWN: "Avall (GS_DOWN)",
GS_CLOCKWISE: "Horari (GS_CLOCKWISE)",
GS_ANTICLOCKWISE: "Antihorari (GS_ANTICLOCKWISE)",
GS_WAVE: "Wave (GS_WAVE)",
}
# ==================
# TEST 1 — Connexió i inicialització
# ==================
def test_connection():
"""
Comprova que el sensor PAJ7620U2 és accessible via I2C.
Ha de mostrar 'Gesture sensor init OK' sense errors.
"""
print("=== TEST CONNEXIÓ PAJ7620U2 ===")
gesture_setup()
time.sleep(0.5)
gesture_cleanup()
print("Test connexió completat.\n")
# ==================
# TEST 2 — Lectura de gestos
# ==================
def test_read_gestures():
"""
Llegeix gestos durant 30 segons i els mostra per pantalla.
Fes gestos davant del sensor per verificar que els detecta correctament:
- Mà cap endavant/enrere → GS_FORWARD
- Mà cap a l'esquerra → GS_LEFT
- Mà cap a la dreta → GS_RIGHT
- Mà cap amunt → GS_UP
- Mà cap avall → GS_DOWN
- Rotació horària → GS_CLOCKWISE
- Rotació antihorària → GS_ANTICLOCKWISE
- Sacsejada (wave) → GS_WAVE
"""
print("=== TEST LECTURA GESTOS PAJ7620U2 ===")
print("Fes gestos davant del sensor durant 30 segons...")
gesture_setup()
time.sleep(0.5)
inici = time.time()
while time.time() - inici < 30:
gest = read_gesture()
if gest != GS_NONE:
nom = _GESTURE_NAMES.get(gest, f"Desconegut ({gest})")
print(f" Gest detectat: {nom}")
time.sleep(0.05)
gesture_cleanup()
print("Test lectura gestos completat.\n")
# ==================
# Execució
# ==================
if __name__ == "__main__":
# Descomenta el test que vols executar:
test_connection()
# test_read_gestures()

View File

@@ -1,238 +0,0 @@
"""
test_motion.py — Tests individuals del mòdul motion.py.
Executa des del directori Rasp/: python tests/test_motion.py
Descomenta la funció que vols provar al final del fitxer.
Assegura't que el venv està activat i pigpiod en marxa (sudo pigpiod -s 1).
"""
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
import time
import pigpio
import motion
from motion import (
motion_setup, motion_setup_steppers, motion_setup_sensors, motion_cleanup,
enable_wheels, enable_arms, enable_syringe,
arms_home, syringe_home,
distance_to_object,
ON, OFF, CW, CCW,
)
def _pi_connect():
pi = pigpio.pi()
if not pi.connected:
print("ERROR: pigpiod no està en marxa. Executa: sudo pigpiod -s 1")
sys.exit(1)
return pi
def _setup_motors():
"""Setup mínim per a tests de motors (sense sensors I2C)."""
pi = _pi_connect()
motion_setup_steppers(pi)
return pi
def _setup_sensors():
"""Setup mínim per a tests de sensors I2C (sense steppers)."""
pi = _pi_connect()
motion_setup_sensors(pi)
return pi
def _teardown_motors(pi):
motion_cleanup()
pi.stop()
def _teardown_sensors(pi):
pi.stop()
# ==================
# TEST 1 — Motors pas a pas (rodes)
# ==================
def test_motor_unic():
"""
Prova únicament la roda DRETA: 200 passos endavant i enrere.
Primer test a fer — verifica que un sol motor funciona correctament.
Hauries de veure la roda girar ~1/5 de volta (reducció 1:5).
"""
print("=== TEST MOTOR ÚNIC (RODA DRETA) ===")
pi = _setup_motors()
PASSOS = 200
print(f"Movent roda DRETA {PASSOS} passos endavant...")
enable_wheels(ON)
motion.wheel_R.move_to(PASSOS)
while motion.wheel_R.is_running():
time.sleep(0.05)
print("Roda dreta endavant: OK")
time.sleep(0.5)
print(f"Movent roda DRETA {PASSOS} passos enrere...")
motion.wheel_R.move_to(0)
while motion.wheel_R.is_running():
time.sleep(0.05)
print("Roda dreta enrere: OK")
enable_wheels(OFF)
_teardown_motors(pi)
print("Test motor únic completat.\n")
def test_motors():
"""
Prova les dues rodes: avança 200 passos i torna enrere.
Executa test_motor_unic() primer per verificar que un motor funciona.
Hauries de veure cada roda girar ~1/5 de volta (reducció 1:5).
"""
print("=== TEST MOTORS (RODES) ===")
pi = _setup_motors()
PASSOS = 200
print(f"Movent roda DRETA {PASSOS} passos endavant...")
enable_wheels(ON)
motion.wheel_R.move_to(PASSOS)
while motion.wheel_R.is_running():
time.sleep(0.05)
print("Roda dreta: OK")
time.sleep(0.5)
print(f"Movent roda DRETA {PASSOS} passos enrere...")
motion.wheel_R.move_to(0)
while motion.wheel_R.is_running():
time.sleep(0.05)
print("Roda dreta enrere: OK")
time.sleep(0.5)
print(f"Movent roda ESQUERRA {PASSOS} passos endavant...")
motion.wheel_L.move_to(PASSOS)
while motion.wheel_L.is_running():
time.sleep(0.05)
print("Roda esquerra: OK")
time.sleep(0.5)
print(f"Movent roda ESQUERRA {PASSOS} passos enrere...")
motion.wheel_L.move_to(0)
while motion.wheel_L.is_running():
time.sleep(0.05)
print("Roda esquerra enrere: OK")
enable_wheels(OFF)
_teardown_motors(pi)
print("Test motors completat.\n")
# ==================
# TEST 2 — Homing (finals de carrera)
# ==================
def test_homing_brac():
"""
Executa el homing únicament dels BRAÇOS.
Primer test de homing — verifica que un sol conjunt de finals de carrera funciona.
ATENCIÓ: assegura't que els braços tinguin espai per moure's.
"""
print("=== TEST HOMING BRAÇOS ===")
pi = _setup_motors()
print("Iniciant homing dels BRAÇOS...")
arms_home()
print("Homing braços: OK")
_teardown_motors(pi)
print("Test homing braços completat.\n")
def test_homing():
"""
Executa el homing complet: braços i xeringa.
Executa test_homing_brac() primer per verificar els finals de carrera dels braços.
ATENCIÓ: assegura't que els braços i la xeringa tinguin espai per moure's.
"""
print("=== TEST HOMING COMPLET ===")
pi = _setup_motors()
print("Iniciant homing dels BRAÇOS...")
arms_home()
print("Homing braços: OK")
time.sleep(1.0)
print("Iniciant homing de la XERINGA...")
syringe_home()
print("Homing xeringa: OK")
_teardown_motors(pi)
print("Test homing completat.\n")
# ==================
# TEST 3 — Sensor de distància VL53L0X
# ==================
def test_distance_sensor():
"""
Llegeix la distància 10 vegades cada 500ms.
Posa la mà davant del sensor per verificar que canvia el valor.
"""
print("=== TEST SENSOR DISTÀNCIA VL53L0X ===")
pi = _setup_sensors()
print("Llegint distància durant 5 segons (posa la mà davant del sensor)...")
for i in range(10):
dist = distance_to_object()
if dist == 65535:
print(f" Lectura {i+1:2d}: fora de rang")
else:
print(f" Lectura {i+1:2d}: {dist} mm")
time.sleep(0.5)
_teardown_sensors(pi)
print("Test sensor distància completat.\n")
# ==================
# TEST 4 — Sensors de línia ADS1115
# ==================
def test_line_sensors():
"""
Llegeix els dos sensors de línia 10 vegades cada 500ms.
Posa el sensor sobre superfícies de diferent color per veure la variació.
Valor alt (~9700+) = negre. Valor baix = blanc/clar.
"""
print("=== TEST SENSORS DE LÍNIA ADS1115 ===")
pi = _setup_sensors()
print("Llegint sensors de línia durant 5 segons...")
print(" (posa els sensors sobre blanc i negre per veure la diferència)")
for i in range(10):
r = motion._chan_r.value
l = motion._chan_l.value
print(f" Lectura {i+1:2d}: DRETA={r:5d} ESQUERRA={l:5d} error={r-l:+6d}")
time.sleep(0.5)
_teardown_sensors(pi)
print("Test sensors de línia completat.\n")
# ==================
# Execució
# ==================
if __name__ == "__main__":
# Descomenta el test que vols executar:
test_motor_unic() # Primer: prova un sol motor
# test_motors() # Segon: prova les dues rodes
# test_homing_brac() # Primer homing: només els braços
# test_homing() # Homing complet: braços i xeringa
# test_distance_sensor()
# test_line_sensors()

View File

@@ -1,53 +0,0 @@
"""
test_simple_motor.py — Test mínim de la roda dreta per diagnosticar problemes.
Executa des del directori Rasp/tests/: python3 test_simple_motor.py
Diferències respecte a test_motion.py:
- Usa pi.write() directe en lloc de gpio_trigger()
- Bucle bloquejant amb time.sleep() en lloc de thread
- Sense acceleració, velocitat constant
"""
import pigpio
import time
STEP = 25 # STEP_R_W
DIR = 23 # DIR_R_W
EN = 6 # EN_W (actiu LOW)
pi = pigpio.pi()
if not pi.connected:
print("ERROR: pigpiod no està en marxa")
exit(1)
pi.set_mode(STEP, pigpio.OUTPUT)
pi.set_mode(DIR, pigpio.OUTPUT)
pi.set_mode(EN, pigpio.OUTPUT)
pi.write(EN, 0) # Activa el driver (LOW = ON)
pi.write(DIR, 1) # Endavant
# 500 passos/s → període = 1/500 = 2ms → mig període = 1ms
DELAY = 0.001
print("Movent 200 passos endavant...")
for _ in range(200):
pi.write(STEP, 1)
time.sleep(DELAY)
pi.write(STEP, 0)
time.sleep(DELAY)
print("Fet. Esperant 1s...")
time.sleep(1)
pi.write(DIR, 0) # Enrere
print("Movent 200 passos enrere...")
for _ in range(200):
pi.write(STEP, 1)
time.sleep(DELAY)
pi.write(STEP, 0)
time.sleep(DELAY)
print("Fet.")
pi.write(EN, 1) # Desactiva el driver
pi.stop()