"""Export endpoint for video cutting and rendering."""

import logging
import os
import shutil
import subprocess
import tempfile
from typing import List, Optional

from fastapi import APIRouter, HTTPException
from pydantic import BaseModel

from services.video_editor import export_stream_copy, export_reencode, export_reencode_with_subs
from services.audio_cleaner import clean_audio
from services.caption_generator import generate_srt, generate_ass, save_captions

logger = logging.getLogger(__name__)
router = APIRouter()


class SegmentModel(BaseModel):
    """A time range with ``start`` and ``end`` bounds."""

    start: float
    end: float


class GainRangeModel(SegmentModel):
    """A time range carrying the gain (``gain_db``) to apply over it."""

    gain_db: float


class ExportWordModel(BaseModel):
    """One transcript word with its timing, used for caption generation."""

    word: str
    start: float
    end: float
    confidence: float = 0.0


class ExportRequest(BaseModel):
    """Request body for ``POST /export``."""

    input_path: str
    output_path: str
    # Source ranges to keep in the exported video.
    keep_segments: List[SegmentModel]
    mute_ranges: Optional[List[SegmentModel]] = None
    gain_ranges: Optional[List[GainRangeModel]] = None
    global_gain_db: float = 0.0
    # "fast" permits a stream copy when there is exactly one keep segment and
    # no audio filter or burn-in caption is requested.
    mode: str = "fast"
    resolution: str = "1080p"
    format: str = "mp4"
    enhanceAudio: bool = False
    # The handler acts on "burn-in" and "sidecar"; any other value produces
    # no captions.
    captions: str = "none"
    words: Optional[List[ExportWordModel]] = None
    deleted_indices: Optional[List[int]] = None


def _map_ranges_to_output_timeline(
    ranges: List[dict],
    keep_segments: List[dict],
) -> List[dict]:
    """Map source-time ranges to output timeline after cuts are applied.

    The output timeline is modelled as the keep segments laid end to end in
    list order. Each source range is clipped against every keep segment it
    overlaps, and each overlap is shifted to where that segment starts in
    the output.

    Args:
        ranges: Dicts with ``start`` and ``end`` in source time, optionally
            carrying ``gain_db``.
        keep_segments: Dicts with ``start`` and ``end`` in source time.

    Returns:
        Dicts with ``start`` and ``end`` in output time. ``gain_db`` is
        copied over as a float when the source range has it. The result is
        empty when either input is empty.
    """
    if not ranges or not keep_segments:
        return []

    mapped: List[dict] = []
    output_cursor = 0.0  # Output-time position where the current keep segment begins.
    for keep in keep_segments:
        keep_start = float(keep["start"])
        keep_end = float(keep["end"])
        keep_len = max(0.0, keep_end - keep_start)
        if keep_len <= 0:
            # Empty or inverted segments contribute nothing to the output.
            continue

        for src_range in ranges:
            overlap_start = max(keep_start, float(src_range["start"]))
            overlap_end = min(keep_end, float(src_range["end"]))
            if overlap_end <= overlap_start:
                continue
            mapped_range = {
                "start": output_cursor + (overlap_start - keep_start),
                "end": output_cursor + (overlap_end - keep_start),
            }
            if "gain_db" in src_range:
                mapped_range["gain_db"] = float(src_range["gain_db"])
            mapped.append(mapped_range)

        output_cursor += keep_len
    return mapped


def _mux_audio(video_path: str, audio_path: str, output_path: str) -> str:
    """Replace video's audio track with cleaned audio using FFmpeg.

    The first video stream of ``video_path`` is copied without re-encoding
    and paired with the first audio stream of ``audio_path``. ``-shortest``
    ends the output with the shorter stream, and ``-y`` overwrites an
    existing ``output_path``.

    Returns:
        ``output_path``.

    Raises:
        RuntimeError: If ffmpeg exits non-zero. The message carries the last
            300 characters of its stderr.
    """
    cmd = [
        "ffmpeg", "-y",
        "-i", video_path,
        "-i", audio_path,
        "-c:v", "copy",
        "-map", "0:v:0",
        "-map", "1:a:0",
        "-shortest",
        output_path,
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError(f"Audio mux failed: {result.stderr[-300:]}")
    return output_path


def _remove_quietly(path: str) -> None:
    """Delete ``path`` if present; log instead of raising on failure."""
    try:
        os.unlink(path)
    except FileNotFoundError:
        pass
    except OSError as exc:
        logger.warning("Could not remove temporary file %s: %s", path, exc)


def _write_temp_ass(ass_content: str) -> str:
    """Write ``ass_content`` to a new temporary ``.ass`` file and return its path.

    The caller owns the file and must delete it. If the write fails, the
    partially written file is removed before the error propagates.
    """
    tmp = tempfile.NamedTemporaryFile(
        suffix=".ass", delete=False, mode="w", encoding="utf-8"
    )
    try:
        with tmp:
            tmp.write(ass_content)
    except Exception:
        _remove_quietly(tmp.name)
        raise
    return tmp.name


def _enhance_audio_in_place(video_path: str) -> None:
    """Clean the audio of ``video_path`` and mux it back into the same file.

    The cleaned audio is written to a private temporary directory, muxed
    into a sibling file, and that file then replaces ``video_path``. All
    intermediates are removed whether or not the steps succeed.
    """
    # Reuse the output's own extension so ffmpeg writes the matching
    # container. The path is identical to the historical
    # "<output>.muxed.mp4" for .mp4 outputs and when there is no extension.
    ext = os.path.splitext(video_path)[1] or ".mp4"
    muxed_path = f"{video_path}.muxed{ext}"

    tmp_dir = tempfile.mkdtemp(prefix="cutscript_audio_")
    try:
        cleaned_audio = os.path.join(tmp_dir, "cleaned.wav")
        clean_audio(video_path, cleaned_audio)
        try:
            _mux_audio(video_path, cleaned_audio, muxed_path)
            os.replace(muxed_path, video_path)
        except Exception:
            # Do not leave a partial mux next to the user's export.
            _remove_quietly(muxed_path)
            raise
    finally:
        shutil.rmtree(tmp_dir, ignore_errors=True)


@router.post("/export")
async def export_video(req: ExportRequest):
    """Cut, filter and render a video according to ``req``.

    Uses a stream copy when ``mode`` is "fast", there is exactly one keep
    segment, and neither audio filters (mute or gain) nor burn-in captions
    are requested. Otherwise the video is re-encoded. Audio enhancement and
    a sidecar SRT are optional steps applied after the export.

    Returns:
        ``{"status": "ok", "output_path": ...}``, plus ``"srt_path"`` when a
        sidecar SRT was written.

    Raises:
        HTTPException: 400 when there is nothing to export or a step raises
            ``ValueError``; 500 for any other failure.
    """
    # NOTE(review): the export helpers and ffmpeg calls below are synchronous
    # and run directly inside this coroutine, so they occupy the event loop
    # for the whole export. Consider moving them to a worker thread.
    try:
        segments = [{"start": s.start, "end": s.end} for s in req.keep_segments]
        mute_segments = (
            [{"start": s.start, "end": s.end} for s in req.mute_ranges]
            if req.mute_ranges
            else None
        )
        gain_segments = (
            [{"start": s.start, "end": s.end, "gain_db": s.gain_db} for s in req.gain_ranges]
            if req.gain_ranges
            else None
        )

        # NOTE(review): an empty keep list is accepted when mute ranges are
        # present -- confirm the export helpers handle that case.
        if not segments and not mute_segments:
            raise HTTPException(status_code=400, detail="No segments to export")

        # NOTE(review): gain ranges are mapped to the output timeline but mute
        # ranges are passed through in source time -- confirm this matches
        # what export_reencode / export_reencode_with_subs expect.
        mapped_gain_segments = _map_ranges_to_output_timeline(gain_segments or [], segments)

        has_gain = abs(float(req.global_gain_db)) > 1e-6 or bool(gain_segments)
        burn_in = req.captions == "burn-in"

        # Burn-in captions or audio filters require re-encode
        use_stream_copy = (
            req.mode == "fast"
            and len(segments) == 1
            and not mute_segments
            and not has_gain
            and not burn_in
        )

        words_dicts = [w.model_dump() for w in req.words] if req.words else []
        deleted_set = set(req.deleted_indices or [])

        ass_path = None
        try:
            # Generate ASS file for burn-in
            if burn_in and words_dicts:
                ass_path = _write_temp_ass(generate_ass(words_dicts, deleted_set))

            if use_stream_copy:
                output = export_stream_copy(req.input_path, req.output_path, segments)
            elif ass_path:
                output = export_reencode_with_subs(
                    req.input_path,
                    req.output_path,
                    segments,
                    ass_path,
                    resolution=req.resolution,
                    format_hint=req.format,
                    mute_ranges=mute_segments,
                    gain_ranges=mapped_gain_segments,
                    global_gain_db=req.global_gain_db,
                )
            else:
                output = export_reencode(
                    req.input_path,
                    req.output_path,
                    segments,
                    resolution=req.resolution,
                    format_hint=req.format,
                    mute_ranges=mute_segments,
                    gain_ranges=mapped_gain_segments,
                    global_gain_db=req.global_gain_db,
                )
        finally:
            # Best-effort: a leftover temp file must not fail a finished export.
            if ass_path:
                _remove_quietly(ass_path)

        # Audio enhancement: clean, then mux back into the exported video
        if req.enhanceAudio:
            try:
                _enhance_audio_in_place(output)
                logger.info("Audio enhanced and muxed into %s", output)
            except Exception as e:
                # Deliberately non-fatal: the un-enhanced export is still valid.
                logger.warning("Audio enhancement failed (non-fatal): %s", e)

        # Sidecar SRT: generate and save alongside video
        srt_path = None
        if req.captions == "sidecar" and words_dicts:
            srt_content = generate_srt(words_dicts, deleted_set)
            # splitext only strips an extension from the final path component,
            # so dots in directory names are left alone.
            srt_path = os.path.splitext(req.output_path)[0] + ".srt"
            save_captions(srt_content, srt_path)
            logger.info("Sidecar SRT saved to %s", srt_path)

        result = {"status": "ok", "output_path": output}
        if srt_path:
            result["srt_path"] = srt_path
        return result

    except HTTPException:
        raise
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    except RuntimeError as e:
        logger.exception("Export failed: %s", e)
        raise HTTPException(status_code=500, detail=str(e)) from e
    except Exception as e:
        logger.exception("Export error: %s", e)
        raise HTTPException(status_code=500, detail=str(e)) from e