Test Motor
All checks were successful
Build / build-backend (push) Successful in 4s
Build / build-web (push) Successful in 13s
Build / release (push) Successful in 6s

This commit is contained in:
2026-04-22 12:19:49 +02:00
parent e5a65f2dcf
commit 965d6dde63

View File

@@ -1,18 +1,71 @@
from fastapi import FastAPI, HTTPException from fastapi import FastAPI, HTTPException
import subprocess import subprocess
import threading
import time
import RPi.GPIO as GPIO
app = FastAPI() app = FastAPI()
# Whitelist allowed actions (VERY important) # -------------------------
# GPIO SETUP
# -------------------------
STEP = 23
DIR = 24
EN = 25
GPIO.setmode(GPIO.BCM)
GPIO.setup(STEP, GPIO.OUT)
GPIO.setup(DIR, GPIO.OUT)
GPIO.setup(EN, GPIO.OUT)
GPIO.output(EN, GPIO.LOW)
motor_running = False
motor_thread = None
def step_motor(steps, direction, delay=0.001):
GPIO.output(DIR, direction)
for _ in range(steps):
if not motor_running:
break
GPIO.output(STEP, GPIO.HIGH)
time.sleep(delay)
GPIO.output(STEP, GPIO.LOW)
time.sleep(delay)
def motor_loop():
global motor_running
while motor_running:
step_motor(200, GPIO.HIGH, 0.001)
time.sleep(1)
if not motor_running:
break
step_motor(200, GPIO.LOW, 0.001)
time.sleep(1)
# -------------------------
# SAFE COMMAND WHITELIST
# -------------------------
COMMANDS = { COMMANDS = {
"restart_nginx": ["sudo", "systemctl", "restart", "nginx"], "restart_nginx": ["sudo", "systemctl", "restart", "nginx"],
"uptime": ["uptime"], "uptime": ["uptime"],
"update": ["sudo", "apt", "update"] "update": ["sudo", "apt", "update"]
} }
# -------------------------
# API ENDPOINTS
# -------------------------
@app.post("/run") @app.post("/run")
def run_task(task: str, token: str): def run_task(task: str, token: str):
# simple auth check (replace with real auth later)
if token != "MY_SECRET_TOKEN": if token != "MY_SECRET_TOKEN":
raise HTTPException(status_code=403, detail="Unauthorized") raise HTTPException(status_code=403, detail="Unauthorized")
@@ -24,3 +77,41 @@ def run_task(task: str, token: str):
return {"output": result} return {"output": result}
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
return {"error": e.output} return {"error": e.output}
@app.post("/motor/start")
def start_motor(token: str):
global motor_running, motor_thread
if token != "MY_SECRET_TOKEN":
raise HTTPException(status_code=403, detail="Unauthorized")
if motor_running:
return {"status": "already running"}
motor_running = True
motor_thread = threading.Thread(target=motor_loop, daemon=True)
motor_thread.start()
return {"status": "motor started"}
@app.post("/motor/stop")
def stop_motor(token: str):
global motor_running
if token != "MY_SECRET_TOKEN":
raise HTTPException(status_code=403, detail="Unauthorized")
motor_running = False
GPIO.output(EN, GPIO.HIGH) # disable driver
return {"status": "motor stopped"}
@app.on_event("shutdown")
def shutdown():
global motor_running
motor_running = False
GPIO.output(EN, GPIO.HIGH)
GPIO.cleanup()