Files
TalkEdit/backend/main.py

159 lines
4.8 KiB
Python

import logging
import os
import stat
import sys
from contextlib import asynccontextmanager
from pathlib import Path
from fastapi import FastAPI, Query, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from routers import transcribe, export, ai, captions, audio
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Dev log file — frontend forwards console.error/warn here so the agent can read it
DEV_LOG_PATH = Path(__file__).parent.parent / "webview.log"
@asynccontextmanager
async def lifespan(app: FastAPI):
logger.info("AI Video Editor backend starting up")
yield
logger.info("AI Video Editor backend shutting down")
app = FastAPI(
title="AI Video Editor Backend",
version="0.1.0",
lifespan=lifespan,
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
expose_headers=["Content-Range", "Accept-Ranges", "Content-Length"],
)
app.include_router(transcribe.router)
app.include_router(export.router)
app.include_router(ai.router)
app.include_router(captions.router)
app.include_router(audio.router)
MIME_MAP = {
".mp4": "video/mp4",
".mkv": "video/x-matroska",
".mov": "video/quicktime",
".avi": "video/x-msvideo",
".webm": "video/webm",
".m4a": "audio/mp4",
".wav": "audio/wav",
".mp3": "audio/mpeg",
".flac": "audio/flac",
}
@app.get("/file")
async def serve_local_file(request: Request, path: str = Query(...)):
"""Stream a local file with HTTP Range support (required for video seeking)."""
file_path = Path(path)
if not file_path.is_file():
logger.warning(f"[serve_file] File not found: {path}")
raise HTTPException(status_code=404, detail=f"File not found: {path}")
file_size = file_path.stat().st_size
content_type = MIME_MAP.get(file_path.suffix.lower(), "application/octet-stream")
range_header = request.headers.get("range")
logger.info(
f"[serve_file] {file_path.name} | size={file_size} | "
f"type={content_type} | range={range_header or 'none'}"
)
if content_type == "application/octet-stream":
logger.warning(
f"[serve_file] Unknown MIME type for extension '{file_path.suffix}'"
f"browser may fail to decode audio/video for '{file_path.name}'"
)
if file_size == 0:
logger.error(f"[serve_file] File is empty: {path}")
raise HTTPException(status_code=422, detail=f"File is empty: {path}")
if range_header:
try:
range_spec = range_header.replace("bytes=", "")
range_start_str, range_end_str = range_spec.split("-")
range_start = int(range_start_str) if range_start_str else 0
range_end = int(range_end_str) if range_end_str else file_size - 1
range_end = min(range_end, file_size - 1)
except (ValueError, TypeError) as e:
logger.error(f"[serve_file] Malformed Range header '{range_header}': {e}")
raise HTTPException(status_code=416, detail=f"Invalid Range header: {range_header}")
content_length = range_end - range_start + 1
def iter_range():
with open(file_path, "rb") as f:
f.seek(range_start)
remaining = content_length
while remaining > 0:
chunk = f.read(min(65536, remaining))
if not chunk:
break
remaining -= len(chunk)
yield chunk
return StreamingResponse(
iter_range(),
status_code=206,
media_type=content_type,
headers={
"Content-Range": f"bytes {range_start}-{range_end}/{file_size}",
"Accept-Ranges": "bytes",
"Content-Length": str(content_length),
},
)
def iter_file():
with open(file_path, "rb") as f:
while chunk := f.read(65536):
yield chunk
return StreamingResponse(
iter_file(),
media_type=content_type,
headers={
"Accept-Ranges": "bytes",
"Content-Length": str(file_size),
},
)
@app.get("/health")
async def health():
return {"status": "ok"}
import datetime
@app.post("/dev/log")
async def dev_log(request: Request):
data = await request.json()
level = data.get("level", "log")
msg = str(data.get("message", ""))
args = [str(a) for a in data.get("args", [])]
ts = datetime.datetime.now().strftime("%H:%M:%S.%f")[:-3]
line = f"[{ts}] [{level.upper():5}] {msg}"
if args:
line += " " + " ".join(args)
line += "\n"
with open(DEV_LOG_PATH, "a") as f:
f.write(line)
return {"ok": True}