Files
TalkEdit/backend/routers/export.py

237 lines
8.3 KiB
Python
Raw Normal View History

"""Export endpoint for video cutting and rendering."""
import logging
import tempfile
import os
from typing import List, Optional
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from services.video_editor import export_stream_copy, export_reencode, export_reencode_with_subs
from services.audio_cleaner import clean_audio
from services.caption_generator import generate_srt, generate_ass, save_captions
logger = logging.getLogger(__name__)
router = APIRouter()
class SegmentModel(BaseModel):
    """A time range described by a start and an end position.

    Used by ExportRequest for keep segments and mute ranges, and subclassed
    by the range models that attach an extra per-range value.
    """

    # Range boundaries (presumably seconds on the source timeline — TODO confirm units).
    start: float
    end: float
2026-04-15 16:10:35 -06:00
class GainRangeModel(SegmentModel):
    """A time range (inherited start/end) that carries a gain value."""

    # Gain for this range; forwarded to the exporters as "gain_db"
    # (presumably decibels, per the name — confirm against the video editor service).
    gain_db: float
class SpeedRangeModel(SegmentModel):
    """A time range (inherited start/end) that carries a playback speed value."""

    # Speed for this range; forwarded to the exporters as "speed"
    # (presumably a multiplier where 1.0 is normal speed — TODO confirm).
    speed: float
class ExportWordModel(BaseModel):
    """A single transcript word with its timing, used for caption generation.

    Instances are converted with model_dump() and handed to the caption
    generators when captions are requested.
    """

    # The word text.
    word: str
    # Word timing boundaries (presumably seconds — TODO confirm units).
    start: float
    end: float
    # Optional confidence score; defaults to 0.0 when the client omits it.
    confidence: float = 0.0
class ExportRequest(BaseModel):
    """Request body for the POST /export endpoint.

    Groups the source/destination paths, the edit decisions (cuts, mutes,
    gain and speed ranges), the encoding options and the caption options
    that export_video reads.
    """

    # --- Files -----------------------------------------------------------
    input_path: str
    output_path: str

    # --- Edit decisions --------------------------------------------------
    # Ranges of the source that are kept in the export.
    keep_segments: List[SegmentModel]
    # Optional ranges to mute; any mute range forces a re-encode.
    mute_ranges: Optional[List[SegmentModel]] = None
    # Optional per-range gain; remapped onto the output timeline before export.
    gain_ranges: Optional[List[GainRangeModel]] = None
    # Optional per-range speed; rejected when combined with mute/gain.
    speed_ranges: Optional[List[SpeedRangeModel]] = None
    # Gain applied to the whole export; treated as "no gain" when ~0.
    global_gain_db: float = 0.0

    # --- Encoding options ------------------------------------------------
    # "fast" permits a stream copy when there is exactly one keep segment
    # and no filter (mute/gain/speed/burn-in captions) is requested.
    mode: str = "fast"
    # Passed to the re-encode exporters as `resolution`.
    resolution: str = "1080p"
    # Passed to the re-encode exporters as `format_hint`.
    format: str = "mp4"
    # When true, the exported audio is cleaned and muxed back into the file.
    enhanceAudio: bool = False
    # Loudness normalization switches, forwarded to the re-encode exporters.
    normalize_loudness: bool = False
    normalize_target_lufs: float = -14.0

    # --- Captions --------------------------------------------------------
    # "burn-in" renders captions into the video, "sidecar" writes an .srt
    # next to the output; any other value (default "none") disables captions.
    captions: str = "none"
    # Transcript words used to build captions.
    words: Optional[List[ExportWordModel]] = None
    # Indices handed to the caption generators as the set of deleted words.
    deleted_indices: Optional[List[int]] = None
2026-04-15 16:10:35 -06:00
def _map_ranges_to_output_timeline(
ranges: List[dict],
keep_segments: List[dict],
) -> List[dict]:
"""Map source-time ranges to output timeline after cuts are applied."""
if not ranges or not keep_segments:
return []
mapped: List[dict] = []
output_cursor = 0.0
for keep in keep_segments:
keep_start = float(keep["start"])
keep_end = float(keep["end"])
keep_len = max(0.0, keep_end - keep_start)
if keep_len <= 0:
continue
for src_range in ranges:
overlap_start = max(keep_start, float(src_range["start"]))
overlap_end = min(keep_end, float(src_range["end"]))
if overlap_end <= overlap_start:
continue
mapped_range = {
"start": output_cursor + (overlap_start - keep_start),
"end": output_cursor + (overlap_end - keep_start),
}
if "gain_db" in src_range:
mapped_range["gain_db"] = float(src_range["gain_db"])
if "speed" in src_range:
mapped_range["speed"] = float(src_range["speed"])
2026-04-15 16:10:35 -06:00
mapped.append(mapped_range)
output_cursor += keep_len
return mapped
def _mux_audio(video_path: str, audio_path: str, output_path: str) -> str:
    """Combine one file's video with another file's audio via the ffmpeg CLI.

    The video stream is copied as-is from ``video_path`` and the audio is
    taken from ``audio_path``; the result is written to ``output_path``,
    overwriting any existing file there.

    Args:
        video_path: File providing the video stream (first ffmpeg input).
        audio_path: File providing the audio stream (second ffmpeg input).
        output_path: Destination file.

    Returns:
        ``output_path``, once ffmpeg has exited successfully.

    Raises:
        RuntimeError: If ffmpeg exits with a non-zero status; the message
            carries the last 300 characters of ffmpeg's stderr.
    """
    import subprocess

    inputs = ["-i", video_path, "-i", audio_path]
    stream_options = [
        "-c:v", "copy",    # keep the video stream without re-encoding
        "-map", "0:v:0",   # first video stream of the first input
        "-map", "1:a:0",   # first audio stream of the second input
        "-shortest",       # stop when the shorter stream ends
    ]
    proc = subprocess.run(
        ["ffmpeg", "-y", *inputs, *stream_options, output_path],
        capture_output=True,
        text=True,
    )
    if proc.returncode == 0:
        return output_path
    raise RuntimeError(f"Audio mux failed: {proc.stderr[-300:]}")
def _write_ass_tempfile(ass_content: str) -> str:
    """Write ASS subtitle text to a fresh temp file and return its path.

    The caller owns the file and must delete it. If writing fails, the
    partially written file is removed before the error propagates.
    """
    tmp = tempfile.NamedTemporaryFile(suffix=".ass", delete=False, mode="w", encoding="utf-8")
    try:
        with tmp:
            tmp.write(ass_content)
    except Exception:
        if os.path.exists(tmp.name):
            os.unlink(tmp.name)
        raise
    return tmp.name


def _enhance_audio_in_place(output: str) -> None:
    """Clean the audio of ``output`` and mux it back into the same file.

    All intermediate files (the temp directory holding the cleaned WAV and
    the intermediate muxed file) are removed whether or not the steps
    succeed. Any exception from cleaning, muxing or replacing propagates
    to the caller.
    """
    import shutil

    tmp_dir = tempfile.mkdtemp(prefix="cutscript_audio_")
    try:
        cleaned_audio = os.path.join(tmp_dir, "cleaned.wav")
        clean_audio(output, cleaned_audio)

        # ffmpeg chooses the container from the file extension, so the
        # intermediate file must keep the export's own extension; otherwise
        # a non-mp4 export would be silently replaced by an MP4 container.
        _, ext = os.path.splitext(output)
        muxed_path = f"{output}.muxed{ext or '.mp4'}"
        try:
            _mux_audio(output, cleaned_audio, muxed_path)
            os.replace(muxed_path, output)
        finally:
            # After a successful replace the file no longer exists; this
            # only removes a leftover from a failed mux/replace.
            if os.path.exists(muxed_path):
                try:
                    os.remove(muxed_path)
                except OSError:
                    pass
    finally:
        shutil.rmtree(tmp_dir, ignore_errors=True)


@router.post("/export")
async def export_video(req: ExportRequest):
    """Cut and render a video according to the request, with optional extras.

    Steps:
      1. Build plain dict lists from the request's segment/range models and
         map gain ranges onto the output (post-cut) timeline.
      2. Choose the export path: stream copy (only for mode "fast", exactly
         one keep segment and no mute/gain/speed/burn-in), re-encode with
         burned-in subtitles, or plain re-encode.
      3. Optionally clean the audio and mux it back (failures are logged
         and ignored).
      4. Optionally write a sidecar .srt next to the output.

    Returns:
        ``{"status": "ok", "output_path": ...}`` plus ``"srt_path"`` when a
        sidecar caption file was written.

    Raises:
        HTTPException: 400 when there is nothing to export, when speed
            ranges are combined with mute/gain, or when a ValueError is
            raised during export; 500 for any other failure.

    NOTE(review): the export helpers are called synchronously inside this
    coroutine, which looks like it would block the event loop for the
    duration of the export — confirm whether that is intended.
    """
    try:
        segments = [{"start": s.start, "end": s.end} for s in req.keep_segments]
        mute_segments = (
            [{"start": s.start, "end": s.end} for s in req.mute_ranges]
            if req.mute_ranges
            else None
        )
        gain_segments = (
            [{"start": s.start, "end": s.end, "gain_db": s.gain_db} for s in req.gain_ranges]
            if req.gain_ranges
            else None
        )
        speed_segments = (
            [{"start": s.start, "end": s.end, "speed": s.speed} for s in req.speed_ranges]
            if req.speed_ranges
            else None
        )

        if not segments and not mute_segments:
            raise HTTPException(status_code=400, detail="No segments to export")

        mapped_gain_segments = _map_ranges_to_output_timeline(gain_segments or [], segments)
        has_gain = abs(float(req.global_gain_db)) > 1e-6 or bool(gain_segments)
        has_speed = bool(speed_segments)
        if has_speed and (mute_segments or has_gain):
            raise HTTPException(
                status_code=400,
                detail="Speed zones currently cannot be combined with mute/gain filters in one export",
            )

        # Burn-in captions or audio/speed filters require a re-encode.
        needs_reencode = bool(
            req.captions == "burn-in" or mute_segments or has_gain or has_speed
        )
        use_stream_copy = req.mode == "fast" and len(segments) == 1 and not needs_reencode

        words_dicts = [w.model_dump() for w in req.words] if req.words else []
        deleted_set = set(req.deleted_indices or [])

        # Generate the ASS file for burn-in captions.
        ass_path = None
        if req.captions == "burn-in" and words_dicts:
            ass_path = _write_ass_tempfile(generate_ass(words_dicts, deleted_set))

        try:
            if use_stream_copy:
                output = export_stream_copy(req.input_path, req.output_path, segments)
            else:
                # Options shared by both re-encode paths.
                reencode_options = dict(
                    resolution=req.resolution,
                    format_hint=req.format,
                    mute_ranges=mute_segments,
                    gain_ranges=mapped_gain_segments,
                    speed_ranges=speed_segments,
                    global_gain_db=req.global_gain_db,
                    normalize_loudness=req.normalize_loudness,
                    normalize_target_lufs=req.normalize_target_lufs,
                )
                if ass_path:
                    output = export_reencode_with_subs(
                        req.input_path,
                        req.output_path,
                        segments,
                        ass_path,
                        **reencode_options,
                    )
                else:
                    output = export_reencode(
                        req.input_path,
                        req.output_path,
                        segments,
                        **reencode_options,
                    )
        finally:
            if ass_path and os.path.exists(ass_path):
                os.unlink(ass_path)

        # Audio enhancement is best-effort: a failure keeps the un-enhanced export.
        if req.enhanceAudio:
            try:
                _enhance_audio_in_place(output)
                logger.info("Audio enhanced and muxed into %s", output)
            except Exception as e:
                logger.warning("Audio enhancement failed (non-fatal): %s", e)

        # Sidecar SRT: generate and save alongside the video.
        srt_path = None
        if req.captions == "sidecar" and words_dicts:
            srt_content = generate_srt(words_dicts, deleted_set)
            # splitext only strips an extension of the final path component,
            # so dots in directory names cannot truncate the path.
            srt_path = os.path.splitext(req.output_path)[0] + ".srt"
            save_captions(srt_content, srt_path)
            logger.info("Sidecar SRT saved to %s", srt_path)

        result = {"status": "ok", "output_path": output}
        if srt_path:
            result["srt_path"] = srt_path
        return result
    except HTTPException:
        raise
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    except RuntimeError as e:
        logger.error("Export failed: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail=str(e)) from e
    except Exception as e:
        logger.error("Export error: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail=str(e)) from e