"""HTTP control service: drives a stepper motor over GPIO and runs whitelisted host commands."""

import secrets
import subprocess
import threading
import time

import RPi.GPIO as GPIO
from fastapi import FastAPI, HTTPException

app = FastAPI()

# -------------------------
# GPIO SETUP
# -------------------------
STEP = 23  # BCM pin pulsed HIGH/LOW once per step
DIR = 24   # BCM pin set to the requested direction before stepping
EN = 25    # BCM pin for the driver enable input: LOW on start, HIGH on stop

GPIO.setmode(GPIO.BCM)
GPIO.setup(STEP, GPIO.OUT)
GPIO.setup(DIR, GPIO.OUT)
GPIO.setup(EN, GPIO.OUT)
# NOTE(review): EN starts at LOW, the same level /motor/start drives, so the
# driver appears to be enabled from boot even though motor_running is False.
# Confirm this is intended (holding torque) rather than an oversight.
GPIO.output(EN, GPIO.LOW)

# Shared run flag: written by the HTTP handlers, polled by the motor thread.
motor_running = False
motor_thread = None

# NOTE(security): the token is hard-coded in source and is received as a plain
# request parameter; treat it as a placeholder and rotate/move it before
# exposing this service.
_API_TOKEN = "MY_SECRET_TOKEN"

# Upper bound for a whitelisted command, so a hung process cannot block a
# request worker forever.
_COMMAND_TIMEOUT_S = 300


def _require_token(token):
    """Raise a 403 HTTPException unless *token* equals the API token.

    The comparison is done on bytes with ``secrets.compare_digest`` so it
    takes the same time wherever the first mismatch is, and so non-ASCII
    input is rejected cleanly instead of raising ``TypeError``.
    """
    supplied = token.encode("utf-8")
    expected = _API_TOKEN.encode("utf-8")
    if not secrets.compare_digest(supplied, expected):
        raise HTTPException(status_code=403, detail="Unauthorized")


def step_motor(steps, direction, delay=0.001, should_continue=None):
    """Emit up to *steps* pulses on STEP after setting DIR to *direction*.

    Args:
        steps: Number of pulses to emit.
        direction: Level written to the DIR pin (``GPIO.HIGH`` / ``GPIO.LOW``).
        delay: Seconds to hold each half of a pulse (HIGH, then LOW).
        should_continue: Optional zero-argument callable checked before every
            pulse; stepping stops early as soon as it returns a falsy value.
            When omitted, all *steps* pulses are always emitted.
    """
    GPIO.output(DIR, direction)
    for _ in range(steps):
        if should_continue is not None and not should_continue():
            break
        GPIO.output(STEP, GPIO.HIGH)
        time.sleep(delay)
        GPIO.output(STEP, GPIO.LOW)
        time.sleep(delay)


def _motor_enabled():
    """Return the current value of the shared ``motor_running`` flag."""
    return motor_running


def _sleep_while_running(duration, poll=0.05):
    """Sleep for *duration* seconds, waking early if the motor is stopped.

    Returns True when the full pause elapsed with the motor still running,
    False when ``motor_running`` went False first.
    """
    deadline = time.monotonic() + duration
    while motor_running:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return True
        time.sleep(min(poll, remaining))
    return False


def motor_loop():
    """Background worker: sweep 200 steps each way while ``motor_running`` is set.

    Runs forever. While the flag is False it polls every 0.1 s; the idle
    message is printed once per idle period rather than on every poll. While
    the flag is True it steps 200 pulses with DIR HIGH, pauses 1 s, steps 200
    pulses with DIR LOW, pauses 1 s, and repeats. A stop request is honoured
    between pulses and during the pauses instead of only at the end of a
    full cycle.
    """
    idle_announced = False
    while True:
        if not motor_running:
            if not idle_announced:
                print("Waiting for motor to start...")
                idle_announced = True
            time.sleep(0.1)
            continue

        idle_announced = False
        print("Motor running...")
        for direction in (GPIO.HIGH, GPIO.LOW):
            step_motor(200, direction, 0.001, should_continue=_motor_enabled)
            if not _sleep_while_running(1):
                # Stopped mid-cycle: go straight back to the idle branch.
                break


# -------------------------
# SAFE COMMAND WHITELIST
# -------------------------
# Only these fixed argument vectors can be executed through /run; the request
# selects one by key and never contributes to the command line itself.
COMMANDS = {
    "restart_nginx": ["sudo", "systemctl", "restart", "nginx"],
    "uptime": ["uptime"],
    "update": ["sudo", "apt", "update"],
}

# -------------------------
# API ENDPOINTS
# -------------------------
# The worker is started at import time and idles until /motor/start sets the flag.
motor_thread = threading.Thread(target=motor_loop, daemon=True)
motor_thread.start()
print("Motor thread initialized")


@app.post("/run")
def run_task(task: str, token: str):
    """Run one whitelisted command and return its output.

    Args:
        task: Key into ``COMMANDS``.
        token: API token; must match the configured token.

    Returns:
        ``{"output": <stdout>}`` when the command exits with status 0,
        otherwise ``{"error": <stderr, or stdout if stderr is empty>}``.

    Raises:
        HTTPException: 403 for a bad token, 400 for an unknown task, 504 when
            the command exceeds ``_COMMAND_TIMEOUT_S`` seconds.
    """
    _require_token(token)
    if task not in COMMANDS:
        raise HTTPException(status_code=400, detail="Invalid task")

    try:
        completed = subprocess.run(
            COMMANDS[task],
            capture_output=True,
            text=True,
            timeout=_COMMAND_TIMEOUT_S,
        )
    except subprocess.TimeoutExpired:
        raise HTTPException(status_code=504, detail="Task timed out") from None

    if completed.returncode != 0:
        # Failing commands usually explain themselves on stderr; fall back to
        # stdout so the caller still gets whatever was printed.
        return {"error": completed.stderr or completed.stdout}
    return {"output": completed.stdout}


@app.post("/motor/start")
def start_motor(token: str):
    """Enable the driver and let the motor thread start stepping.

    Returns ``{"status": "already running"}`` if the flag was already set,
    otherwise ``{"status": "motor started"}``.

    Raises:
        HTTPException: 403 for a bad token.
    """
    global motor_running
    _require_token(token)
    if motor_running:
        return {"status": "already running"}
    # Drive EN to its run level before raising the flag so the worker never
    # pulses STEP while the driver is still disabled.
    GPIO.output(EN, GPIO.LOW)  # enable driver
    motor_running = True
    return {"status": "motor started"}


@app.post("/motor/stop")
def stop_motor(token: str):
    """Clear the run flag and disable the motor driver.

    Args:
        token: API token; must match the expected token.

    Returns:
        ``{"status": "motor stopped"}``.

    Raises:
        HTTPException: 403 when the token does not match.
    """
    global motor_running
    # Function-scope import: only this check needs it, and it keeps the
    # handler independent of the file's top-level import list.
    import secrets

    # Compare as bytes in constant time: avoids leaking how much of the token
    # matched, and avoids TypeError on non-ASCII input.
    expected = "MY_SECRET_TOKEN"
    if not secrets.compare_digest(token.encode("utf-8"), expected.encode("utf-8")):
        raise HTTPException(status_code=403, detail="Unauthorized")

    motor_running = False
    GPIO.output(EN, GPIO.HIGH)  # disable driver
    return {"status": "motor stopped"}


@app.on_event("shutdown")
def shutdown():
    """Stop the motor and release the GPIO pins when the application shuts down.

    NOTE(review): GPIO.cleanup() stops driving the pins, so EN is not held
    HIGH afterwards — confirm the driver's enable input idles at the level
    you expect once it is released.
    """
    global motor_running
    motor_running = False
    try:
        GPIO.output(EN, GPIO.HIGH)
    finally:
        # Always release the pins, even if the final write above fails.
        GPIO.cleanup()