Files
quibot/backend/main.py
BinarySandia04 0e7fbbfdca
All checks were successful
Build / build-web (push) Successful in 16s
Build / build-backend (push) Successful in 3s
Build / release (push) Successful in 3s
Build APK / build (push) Successful in 8m10s
Things
2026-06-11 15:09:56 +02:00

210 lines
5.4 KiB
Python

import hashlib
import json
import os
import subprocess
import threading
import time
import uuid
from datetime import datetime
from pathlib import Path

import RPi.GPIO as GPIO
from fastapi import FastAPI, File, Form, UploadFile, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel

# Application instance. CORS is left fully open: every origin, method and
# header is accepted.
_CORS_OPTIONS = {
    "allow_origins": ["*"],
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app = FastAPI()
app.add_middleware(CORSMiddleware, **_CORS_OPTIONS)
# Audio files live in one of three directories. The /audio/* handlers move a
# file from incoming to locked, and from locked to processed (or back).
INCOMING_DIR = Path("/tmp/quibot-audio/incoming")
LOCKED_DIR = Path("/tmp/quibot-audio/locked")
PROCESSED_DIR = Path("/tmp/quibot-audio/processed")

# Create all three at import time so the handlers can assume they exist.
for _audio_dir in (INCOMING_DIR, LOCKED_DIR, PROCESSED_DIR):
    _audio_dir.mkdir(parents=True, exist_ok=True)
# -------------------------
# GPIO SETUP
# -------------------------
# Pin numbers use the BCM scheme selected below.
STEP = 23  # pulsed once per motor step
DIR = 24   # level selects the direction
EN = 25    # driven HIGH by stop_motor() to disable the driver

GPIO.setmode(GPIO.BCM)
for _pin in (STEP, DIR, EN):
    GPIO.setup(_pin, GPIO.OUT)

# Start with EN low, the opposite of the "disable driver" level.
GPIO.output(EN, GPIO.LOW)

# Most recently started stepping thread; assigned by the /motor/step/* handlers.
motor_thread = None
def step_motor(steps, direction, delay=0.001):
    """Set the DIR pin, then emit ``steps`` pulses on the STEP pin.

    Args:
        steps: Number of pulses to generate.
        direction: Level written to the DIR pin before pulsing.
        delay: Seconds to hold each half of a pulse (high, then low).

    Blocks the calling thread for roughly ``2 * delay * steps`` seconds.
    """
    GPIO.output(DIR, direction)
    for _ in range(steps):
        # One pulse: STEP high for `delay`, then low for `delay`.
        for level in (GPIO.HIGH, GPIO.LOW):
            GPIO.output(STEP, level)
            time.sleep(delay)
def motor_step(dir):
    """Run one fixed burst of 200 steps in the requested direction.

    Args:
        dir: ``"forward"`` drives the DIR pin high; any other value drives
            it low.

    Blocks until the burst has finished, so callers run it in a thread.
    """
    dir_pin = GPIO.HIGH if dir == "forward" else GPIO.LOW
    # stop_motor() leaves EN high (driver disabled) and nothing else drives it
    # low again, so re-enable here; otherwise every step request after a stop
    # would pulse a disabled driver while the API reports "motor started".
    GPIO.output(EN, GPIO.LOW)
    time.sleep(0.02)  # small delay before starting
    print("Motor running...")
    step_motor(200, dir_pin, 0.001)
# -------------------------
# SAFE COMMAND WHITELIST
# -------------------------
# Task name -> argv list. /run executes only these exact argument vectors;
# nothing from the request is interpolated into them.
COMMANDS = {
    "restart_nginx": ["sudo", "systemctl", "restart", "nginx"],
    "uptime": ["uptime"],
    "update": ["sudo", "apt", "update"],
}
# -------------------------
# API ENDPOINTS
# -------------------------
@app.post("/run")
def run_task(task: str, token: str):
    """Run one whitelisted command and return what it printed.

    Args:
        task: Key into ``COMMANDS`` selecting the argv to execute.
        token: Shared secret; must equal the hard-coded token.

    Returns:
        ``{"output": <stdout>}`` when the command exits with status 0,
        otherwise ``{"error": <text>}`` where the text is the command's
        stdout followed by its stderr, or the OS error message when the
        command could not be started at all.

    Raises:
        HTTPException: 403 for a wrong token, 400 for an unknown task.
    """
    if token != "MY_SECRET_TOKEN":
        raise HTTPException(status_code=403, detail="Unauthorized")
    if task not in COMMANDS:
        raise HTTPException(status_code=400, detail="Invalid task")
    try:
        # Capture stderr as well: failing commands usually explain themselves
        # there, and without it the "error" field is often empty.
        # No timeout is applied; a command that blocks holds the request open.
        completed = subprocess.run(
            COMMANDS[task],
            capture_output=True,
            text=True,
            check=True,
        )
    except subprocess.CalledProcessError as e:
        return {"error": (e.output or "") + (e.stderr or "")}
    except OSError as e:
        # Executable missing or not runnable: report it in the same shape as
        # a failed command instead of letting it surface as a 500.
        return {"error": str(e)}
    return {"output": completed.stdout}
@app.post("/motor/step/forward")
def start_motor(token: str):
    """Start a background thread that runs one forward step burst.

    Args:
        token: Shared secret; must equal the hard-coded token.

    Returns:
        ``{"status": "motor started"}`` as soon as the thread is started;
        the request does not wait for the burst to finish.

    Raises:
        HTTPException: 403 when the token does not match.

    NOTE(review): another handler later in this file is also named
    ``start_motor``, so the module-level name ends up bound to that one.
    NOTE(review): a new thread is started on every request without checking
    whether the previous one is still running.
    """
    global motor_thread
    if token != "MY_SECRET_TOKEN":
        raise HTTPException(status_code=403, detail="Unauthorized")
    worker = threading.Thread(
        target=motor_step,
        args=("forward",),
        daemon=True,
    )
    motor_thread = worker
    motor_thread.start()
    response = {"status": "motor started"}
    return response
@app.post("/motor/step/backwards")
def start_motor(token: str):
    """Start a background thread that runs one backwards step burst.

    Args:
        token: Shared secret; must equal the hard-coded token.

    Returns:
        ``{"status": "motor started"}`` as soon as the thread is started;
        the request does not wait for the burst to finish.

    Raises:
        HTTPException: 403 when the token does not match.

    NOTE(review): this reuses the name of the forward handler defined
    earlier in the file, so the module-level ``start_motor`` refers to this
    function.
    NOTE(review): a new thread is started on every request without checking
    whether the previous one is still running.
    """
    global motor_thread
    if token != "MY_SECRET_TOKEN":
        raise HTTPException(status_code=403, detail="Unauthorized")
    worker = threading.Thread(
        target=motor_step,
        args=("backwards",),
        daemon=True,
    )
    motor_thread = worker
    motor_thread.start()
    response = {"status": "motor started"}
    return response
@app.post("/motor/stop")
def stop_motor(token: str):
    """Disable the motor driver by driving the EN pin high.

    Only the EN pin is touched here; a stepping thread that is already
    running is not signalled and keeps pulsing until its burst ends.

    Args:
        token: Shared secret; must equal the hard-coded token.

    Returns:
        ``{"status": "motor stopped"}``.

    Raises:
        HTTPException: 403 when the token does not match.
    """
    authorized = token == "MY_SECRET_TOKEN"
    if not authorized:
        raise HTTPException(status_code=403, detail="Unauthorized")
    GPIO.output(EN, GPIO.HIGH)  # disable driver
    response = {"status": "motor stopped"}
    return response
@app.post("/audio/upload")
async def upload_audio(file: UploadFile = File(...), format: str = "wav"):
    """Store an uploaded file in the incoming directory.

    The stored name is the first 10 hex digits of the content's SHA-256,
    a dash, 8 random hex digits, and a fixed ``.wav`` suffix.

    Args:
        file: The uploaded file; its whole content is read into memory.
        format: Accepted but not used by this handler.
            NOTE(review): the suffix is always ``.wav`` regardless of this
            value - confirm whether that is intended.

    Returns:
        A dict with ``status`` ("received"), ``filename`` (the full path of
        the stored file) and ``lock_url`` (the lock route for that file).
    """
    payload = await file.read()
    digest_prefix = hashlib.sha256(payload).hexdigest()[:10]
    random_suffix = uuid.uuid4().hex[:8]
    target = INCOMING_DIR / f"{digest_prefix}-{random_suffix}.wav"
    target.write_bytes(payload)
    return {
        "status": "received",
        "filename": str(target),
        "lock_url": f"/audio/lock/{target.name}",
    }
@app.get("/audio/incoming")
def list_incoming():
    """List the entries currently in the incoming directory, sorted by path.

    Returns:
        ``{"count": <n>, "files": [...]}`` where each entry has
        ``filename``, ``size_bytes`` and ``modified_iso``. The timestamp is
        the modification time as an ISO 8601 string in local time with its
        UTC offset, at one-second resolution.
    """
    files = []
    for entry in sorted(INCOMING_DIR.iterdir()):
        try:
            meta = entry.stat()
        except FileNotFoundError:
            # Moved away (e.g. locked by another request) between the
            # directory listing and the stat call; leave it out.
            continue
        # The key promises ISO 8601, so emit that rather than the ctime()
        # text form ("Thu Jun 11 15:09:56 2026").
        modified = datetime.fromtimestamp(meta.st_mtime).astimezone()
        files.append({
            "filename": entry.name,
            "size_bytes": meta.st_size,
            "modified_iso": modified.isoformat(timespec="seconds"),
        })
    return {"count": len(files), "files": files}
@app.post("/audio/lock/{filename}")
def lock_audio(filename: str):
    """Move a file from the incoming directory to the locked directory.

    Args:
        filename: Bare name of a file in the incoming directory.

    Returns:
        ``{"status": "locked", ...}`` after the move, or
        ``{"status": "already_locked", ...}`` when a file of that name is
        already present in the locked directory.

    Raises:
        HTTPException: 400 when ``filename`` is not a plain file name,
            404 when the file is not in the incoming directory.
    """
    # Accept only a bare name: "." and ".." would otherwise resolve to the
    # directories themselves, and separators would escape the directory.
    if filename in {"", ".", ".."} or Path(filename).name != filename:
        raise HTTPException(status_code=400, detail="Invalid filename")
    src = INCOMING_DIR / filename
    dst = LOCKED_DIR / filename
    if not src.exists():
        raise HTTPException(status_code=404, detail=f"File {filename} not found")
    if dst.exists():
        return {"status": "already_locked", "filename": filename}
    try:
        os.rename(src, dst)
    except FileNotFoundError:
        # Another request moved the file after the existence check above.
        raise HTTPException(
            status_code=404, detail=f"File {filename} not found"
        ) from None
    return {"status": "locked", "filename": filename}
@app.post("/audio/unlock/{filename}")
def unlock_audio(filename: str):
    """Move a file from the locked directory back to the incoming directory.

    Args:
        filename: Bare name of a file in the locked directory.

    Returns:
        ``{"status": "unlocked", "filename": <filename>}``.

    Raises:
        HTTPException: 400 when ``filename`` is not a plain file name,
            404 when the file is not in the locked directory.
    """
    # Accept only a bare name: "." and ".." would otherwise resolve to the
    # directories themselves and make the rename fail with a server error.
    if filename in {"", ".", ".."} or Path(filename).name != filename:
        raise HTTPException(status_code=400, detail="Invalid filename")
    src = LOCKED_DIR / filename
    dst = INCOMING_DIR / filename
    if not src.exists():
        raise HTTPException(status_code=404, detail=f"File {filename} not found")
    try:
        os.rename(src, dst)
    except FileNotFoundError:
        # Another request moved the file after the existence check above.
        raise HTTPException(
            status_code=404, detail=f"File {filename} not found"
        ) from None
    return {"status": "unlocked", "filename": filename}
@app.post("/audio/cancel/{filename}")
def cancel_audio(filename: str):
    """Move a file from the locked directory back to the incoming directory.

    Performs the same move as the unlock route; only the reported status
    differs.

    Args:
        filename: Bare name of a file in the locked directory.

    Returns:
        ``{"status": "cancelled", "filename": <filename>}``.

    Raises:
        HTTPException: 400 when ``filename`` is not a plain file name,
            404 when the file is not in the locked directory.
    """
    # Accept only a bare name: "." and ".." would otherwise resolve to the
    # directories themselves and make the rename fail with a server error.
    if filename in {"", ".", ".."} or Path(filename).name != filename:
        raise HTTPException(status_code=400, detail="Invalid filename")
    src = LOCKED_DIR / filename
    dst = INCOMING_DIR / filename
    if not src.exists():
        raise HTTPException(status_code=404, detail=f"File {filename} not found")
    try:
        os.rename(src, dst)
    except FileNotFoundError:
        # Another request moved the file after the existence check above.
        raise HTTPException(
            status_code=404, detail=f"File {filename} not found"
        ) from None
    return {"status": "cancelled", "filename": filename}
@app.post("/audio/process/{filename}")
def process_audio(filename: str):
    """Move a file from the locked directory to the processed directory.

    The handler only relocates the file; it does not read or transform the
    audio.

    Args:
        filename: Bare name of a file in the locked directory.

    Returns:
        ``{"status": "processed", "filename": <filename>}``.

    Raises:
        HTTPException: 400 when ``filename`` is not a plain file name,
            404 when the file is not in the locked directory.
    """
    # Accept only a bare name: "." and ".." would otherwise resolve to the
    # directories themselves and make the rename fail with a server error.
    if filename in {"", ".", ".."} or Path(filename).name != filename:
        raise HTTPException(status_code=400, detail="Invalid filename")
    locked = LOCKED_DIR / filename
    processed = PROCESSED_DIR / filename
    if not locked.exists():
        raise HTTPException(status_code=404, detail=f"File {filename} not found")
    try:
        os.rename(locked, processed)
    except FileNotFoundError:
        # Another request moved the file after the existence check above.
        raise HTTPException(
            status_code=404, detail=f"File {filename} not found"
        ) from None
    return {"status": "processed", "filename": filename}
@app.on_event("shutdown")
def shutdown():
    """Disable the motor driver and release the GPIO pins on app shutdown."""
    # Kept for compatibility; nothing visible in this file reads this flag.
    global motor_running
    motor_running = False
    try:
        GPIO.output(EN, GPIO.HIGH)  # disable driver
        # Give an in-flight step burst (well under a second) time to finish
        # so the thread is not still toggling pins once they are released.
        worker = motor_thread
        if worker is not None and worker.is_alive():
            worker.join(timeout=2.0)
    finally:
        # Release the pins even if the write or the wait above failed.
        GPIO.cleanup()