# TalkEdit/backend/routers/export.py
"""Export endpoint for video cutting and rendering."""
import logging
import tempfile
import os
from typing import List, Optional
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from services.video_editor import export_stream_copy, export_reencode, export_reencode_with_subs, mix_background_music, concat_clips
from services.audio_cleaner import clean_audio
from services.caption_generator import generate_srt, generate_ass, save_captions
from services.background_removal import remove_background_on_export as remove_bg
logger = logging.getLogger(__name__)
router = APIRouter()
class SegmentModel(BaseModel):
    """A [start, end] time range, in seconds."""

    start: float  # range start time (seconds)
    end: float  # range end time (seconds)
class GainRangeModel(SegmentModel):
    """A time range with an audio gain adjustment applied over it."""

    gain_db: float  # gain in decibels (positive boosts, negative attenuates)
class SpeedRangeModel(SegmentModel):
    """A time range with a playback-speed multiplier applied over it."""

    speed: float  # playback speed factor (e.g. 2.0 = twice as fast)
class ExportWordModel(BaseModel):
    """A single transcript word with its timing, used for caption generation."""

    word: str  # the word text
    start: float  # word start time (seconds)
    end: float  # word end time (seconds)
    confidence: float = 0.0  # transcription confidence score
class ZoomConfigModel(BaseModel):
    """Zoom/pan settings forwarded to the re-encode step when enabled."""

    enabled: bool = False  # zoom is only applied when True (see export_video)
    zoomFactor: float = 1.0  # 1.0 = no zoom
    panX: float = 0.0  # horizontal pan offset
    panY: float = 0.0  # vertical pan offset
class BackgroundMusicModel(BaseModel):
    """Background music track settings for post-export mixing."""

    path: str  # path to the music file to mix in
    volumeDb: float = 0.0  # music volume adjustment in dB
    duckingEnabled: bool = False  # enable automatic ducking of the music track
    duckingDb: float = 6.0  # amount of ducking in dB
    duckingAttackMs: float = 10.0  # ducking attack time (ms)
    duckingReleaseMs: float = 200.0  # ducking release time (ms)
class ExportRequest(BaseModel):
    """Request body for POST /export: cut list plus optional filters/post-steps."""

    input_path: str  # source video file
    output_path: str  # destination path for the rendered file
    keep_segments: List[SegmentModel]  # source-time ranges to keep in the output
    mute_ranges: Optional[List[SegmentModel]] = None  # ranges to silence
    gain_ranges: Optional[List[GainRangeModel]] = None  # per-range gain adjustments
    speed_ranges: Optional[List[SpeedRangeModel]] = None  # per-range speed changes
    global_gain_db: float = 0.0  # gain applied to the entire output
    mode: str = "fast"  # "fast" allows lossless stream copy when no filters apply
    resolution: str = "1080p"  # target resolution hint for re-encode
    format: str = "mp4"  # output container format hint
    enhanceAudio: bool = False  # run audio cleanup and re-mux after export
    normalize_loudness: bool = False  # enable loudness normalization
    normalize_target_lufs: float = -14.0  # normalization target in LUFS
    captions: str = "none"  # "none" | "burn-in" | "sidecar"
    words: Optional[List[ExportWordModel]] = None  # transcript words for captions
    deleted_indices: Optional[List[int]] = None  # word indices excluded from captions
    zoom: Optional[ZoomConfigModel] = None  # optional zoom/pan configuration
    additional_clips: Optional[List[str]] = None  # clips pre-concatenated to the input
    background_music: Optional[BackgroundMusicModel] = None  # post-mix music settings
    remove_background: bool = False  # run background removal post-process
    background_replacement: str = "blur"  # replacement style when removing background
    background_replacement_value: str = ""  # extra value for the replacement style
class TranscriptExportRequest(BaseModel):
    """Request body for POST /export/transcript (no video rendering)."""

    words: List[ExportWordModel]  # full transcript word list
    deleted_indices: Optional[List[int]] = None  # indices into `words` to omit
    output_path: str  # destination file path for the transcript
    format: str = "txt" # "txt" or "srt"
def _map_ranges_to_output_timeline(
ranges: List[dict],
keep_segments: List[dict],
) -> List[dict]:
"""Map source-time ranges to output timeline after cuts are applied."""
if not ranges or not keep_segments:
return []
mapped: List[dict] = []
output_cursor = 0.0
for keep in keep_segments:
keep_start = float(keep["start"])
keep_end = float(keep["end"])
keep_len = max(0.0, keep_end - keep_start)
if keep_len <= 0:
continue
for src_range in ranges:
overlap_start = max(keep_start, float(src_range["start"]))
overlap_end = min(keep_end, float(src_range["end"]))
if overlap_end <= overlap_start:
continue
mapped_range = {
"start": output_cursor + (overlap_start - keep_start),
"end": output_cursor + (overlap_end - keep_start),
}
if "gain_db" in src_range:
mapped_range["gain_db"] = float(src_range["gain_db"])
if "speed" in src_range:
mapped_range["speed"] = float(src_range["speed"])
2026-04-15 16:10:35 -06:00
mapped.append(mapped_range)
output_cursor += keep_len
return mapped
def _mux_audio(video_path: str, audio_path: str, output_path: str) -> str:
    """Replace video's audio track with cleaned audio using FFmpeg.

    Copies the video stream untouched (-c:v copy) and takes the audio from
    *audio_path*. Raises RuntimeError with the tail of FFmpeg's stderr on a
    non-zero exit; returns *output_path* on success.
    """
    import subprocess

    command = ["ffmpeg", "-y", "-i", video_path, "-i", audio_path]
    command += ["-c:v", "copy", "-map", "0:v:0", "-map", "1:a:0"]
    command += ["-shortest", output_path]
    completed = subprocess.run(command, capture_output=True, text=True)
    if completed.returncode != 0:
        raise RuntimeError(f"Audio mux failed: {completed.stderr[-300:]}")
    return output_path
@router.post("/export")
async def export_video(req: ExportRequest):
try:
segments = [{"start": s.start, "end": s.end} for s in req.keep_segments]
2026-04-03 11:14:31 -06:00
mute_segments = [{"start": s.start, "end": s.end} for s in req.mute_ranges] if req.mute_ranges else None
2026-04-15 16:10:35 -06:00
gain_segments = [{"start": s.start, "end": s.end, "gain_db": s.gain_db} for s in req.gain_ranges] if req.gain_ranges else None
speed_segments = [{"start": s.start, "end": s.end, "speed": s.speed} for s in req.speed_ranges] if req.speed_ranges else None
2026-04-03 11:14:31 -06:00
if not segments and not mute_segments:
raise HTTPException(status_code=400, detail="No segments to export")
# Convert zoom config to dict
zoom_dict = None
if req.zoom and req.zoom.enabled:
zoom_dict = {
"enabled": True,
"zoomFactor": req.zoom.zoomFactor,
"panX": req.zoom.panX,
"panY": req.zoom.panY,
}
# Handle additional clips: pre-concat before main editing
working_input = req.input_path
has_additional = bool(req.additional_clips)
if has_additional:
try:
concat_output = req.output_path + ".concat.mp4"
concat_clips(req.input_path, req.additional_clips, concat_output)
working_input = concat_output
logger.info("Pre-concatenated %d additional clips into %s", len(req.additional_clips), concat_output)
except Exception as e:
logger.warning(f"Clip concatenation failed (non-fatal): {e}")
# Fall back to main input only
2026-04-15 16:10:35 -06:00
mapped_gain_segments = _map_ranges_to_output_timeline(gain_segments or [], segments)
has_gain = abs(float(req.global_gain_db)) > 1e-6 or bool(gain_segments)
has_speed = bool(speed_segments)
if has_speed and (mute_segments or has_gain):
raise HTTPException(
status_code=400,
detail="Speed zones currently cannot be combined with mute/gain filters in one export",
)
use_stream_copy = req.mode == "fast" and len(segments) == 1 and not mute_segments and not has_gain and not has_speed and not zoom_dict and not has_additional
needs_reencode_for_subs = req.captions == "burn-in"
2026-04-15 16:10:35 -06:00
# Burn-in captions or audio filters require re-encode
if needs_reencode_for_subs or mute_segments or has_gain or has_speed:
use_stream_copy = False
words_dicts = [w.model_dump() for w in req.words] if req.words else []
deleted_set = set(req.deleted_indices or [])
# Generate ASS file for burn-in
ass_path = None
if req.captions == "burn-in" and words_dicts:
ass_content = generate_ass(words_dicts, deleted_set)
tmp = tempfile.NamedTemporaryFile(suffix=".ass", delete=False, mode="w", encoding="utf-8")
tmp.write(ass_content)
tmp.close()
ass_path = tmp.name
try:
if use_stream_copy:
output = export_stream_copy(working_input, req.output_path, segments)
elif ass_path:
output = export_reencode_with_subs(
working_input,
req.output_path,
segments,
ass_path,
resolution=req.resolution,
format_hint=req.format,
2026-04-03 11:14:31 -06:00
mute_ranges=mute_segments,
2026-04-15 16:10:35 -06:00
gain_ranges=mapped_gain_segments,
speed_ranges=speed_segments,
2026-04-15 16:10:35 -06:00
global_gain_db=req.global_gain_db,
2026-05-04 17:43:00 -06:00
normalize_loudness=req.normalize_loudness,
normalize_target_lufs=req.normalize_target_lufs,
zoom_config=zoom_dict,
)
else:
output = export_reencode(
working_input,
req.output_path,
segments,
resolution=req.resolution,
format_hint=req.format,
2026-04-03 11:14:31 -06:00
mute_ranges=mute_segments,
2026-04-15 16:10:35 -06:00
gain_ranges=mapped_gain_segments,
speed_ranges=speed_segments,
2026-04-15 16:10:35 -06:00
global_gain_db=req.global_gain_db,
2026-05-04 17:43:00 -06:00
normalize_loudness=req.normalize_loudness,
normalize_target_lufs=req.normalize_target_lufs,
zoom_config=zoom_dict,
)
finally:
if ass_path and os.path.exists(ass_path):
os.unlink(ass_path)
# Audio enhancement: clean, then mux back into the exported video
if req.enhanceAudio:
try:
tmp_dir = tempfile.mkdtemp(prefix="cutscript_audio_")
cleaned_audio = os.path.join(tmp_dir, "cleaned.wav")
clean_audio(output, cleaned_audio)
muxed_path = output + ".muxed.mp4"
_mux_audio(output, cleaned_audio, muxed_path)
os.replace(muxed_path, output)
logger.info(f"Audio enhanced and muxed into {output}")
try:
os.remove(cleaned_audio)
os.rmdir(tmp_dir)
except OSError:
pass
except Exception as e:
logger.warning(f"Audio enhancement failed (non-fatal): {e}")
# Background removal (post-process)
if req.remove_background:
try:
bg_output = output + ".nobg.mp4"
remove_bg(output, bg_output, req.background_replacement, req.background_replacement_value)
os.replace(bg_output, output)
logger.info("Background removed from %s", output)
except Exception as e:
logger.warning(f"Background removal failed (non-fatal): {e}")
# Background music mixing (post-process)
if req.background_music:
try:
music_output = output + ".music.mp4"
mix_background_music(
output,
req.background_music.path,
music_output,
volume_db=req.background_music.volumeDb,
ducking_enabled=req.background_music.duckingEnabled,
ducking_db=req.background_music.duckingDb,
ducking_attack_ms=req.background_music.duckingAttackMs,
ducking_release_ms=req.background_music.duckingReleaseMs,
)
os.replace(music_output, output)
logger.info("Background music mixed into %s", output)
except Exception as e:
logger.warning(f"Background music mixing failed (non-fatal): {e}")
# Sidecar SRT: generate and save alongside video
srt_path = None
if req.captions == "sidecar" and words_dicts:
srt_content = generate_srt(words_dicts, deleted_set)
srt_path = req.output_path.rsplit(".", 1)[0] + ".srt"
save_captions(srt_content, srt_path)
logger.info(f"Sidecar SRT saved to {srt_path}")
# Cleanup pre-concat temp file
if has_additional and working_input != req.input_path and os.path.exists(working_input):
try:
os.remove(working_input)
except OSError:
pass
result = {"status": "ok", "output_path": output}
if srt_path:
result["srt_path"] = srt_path
return result
2026-04-15 17:13:56 -06:00
except HTTPException:
raise
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except RuntimeError as e:
logger.error(f"Export failed: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
except Exception as e:
logger.error(f"Export error: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
@router.post("/export/transcript")
async def export_transcript(req: TranscriptExportRequest):
    """Export transcript as plain text or SRT without rendering video.

    Writes the transcript to ``req.output_path`` (creating parent
    directories as needed) and returns {"status": "ok", "output_path": ...}.
    Words whose index appears in ``req.deleted_indices`` are omitted.
    Raises HTTPException 500 on any failure.
    """
    try:
        # Fix: use the module-level generate_srt import (line at top of file)
        # instead of a redundant shadowing function-local import.
        deleted_set = set(req.deleted_indices or [])
        word_dicts = [w.model_dump() for w in req.words]
        if req.format == "srt":
            content = generate_srt(word_dicts, deleted_set)
        else:
            # Plain text: join non-deleted words.
            active_words = [
                w["word"] for i, w in enumerate(word_dicts) if i not in deleted_set
            ]
            content = " ".join(active_words)
        # Ensure the destination directory exists ("." for bare filenames).
        os.makedirs(os.path.dirname(req.output_path) or ".", exist_ok=True)
        with open(req.output_path, "w", encoding="utf-8") as f:
            f.write(content)
        logger.info("Transcript exported to %s (format=%s)", req.output_path, req.format)
        return {"status": "ok", "output_path": req.output_path}
    except Exception as e:
        logger.error(f"Transcript export failed: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))