Files
TalkEdit/backend/services/video_editor.py

513 lines
17 KiB
Python
Raw Normal View History

"""
FFmpeg-based video cutting engine.
Uses stream copy for fast, lossless cuts and falls back to re-encode when needed.
"""
import logging
import subprocess
import tempfile
import os
from pathlib import Path
from typing import List
logger = logging.getLogger(__name__)
def _clamp_speed(speed: float) -> float:
return max(0.25, min(4.0, float(speed)))
def _build_atempo_chain(speed: float) -> str:
    """Return a comma-joined FFmpeg ``atempo`` chain for ``speed``.

    A single atempo node is treated as limited to 0.5..2.0, so factors
    outside that window are peeled off as fixed 2.0 / 0.5 stages and the
    remainder is emitted as the last stage.
    """
    remaining = _clamp_speed(speed)
    stages = []

    # Peel off whole 2x stages while the remainder is too fast for one node.
    while remaining > 2.0:
        stages.append("atempo=2.0")
        remaining = remaining / 2.0

    # Peel off whole 0.5x stages while the remainder is too slow for one node.
    while remaining < 0.5:
        stages.append("atempo=0.5")
        remaining = remaining / 0.5

    stages.append(f"atempo={remaining:.6f}")
    return ",".join(stages)
def _split_keep_segments_by_speed(
    keep_segments: List[dict],
    speed_ranges: List[dict] = None,
) -> List[dict]:
    """Cut every keep segment at speed-range boundaries.

    Args:
        keep_segments: dicts with "start" and "end" (seconds).
        speed_ranges: optional dicts with "start", "end" and "speed".
            Ranges whose end is not after their start are ignored.

    Returns:
        A list of {"start", "end", "speed"} pieces in keep-segment order.
        A piece lying inside a speed range gets that range's clamped speed
        (the range with the earliest start wins); all others get 1.0.
    """
    if not keep_segments:
        return []

    # Validate the ranges once and order them by start time.
    ranges = []
    for entry in speed_ranges or []:
        range_start = float(entry.get("start", 0.0))
        range_end = float(entry.get("end", 0.0))
        if range_end > range_start:
            ranges.append({
                "start": range_start,
                "end": range_end,
                "speed": _clamp_speed(float(entry.get("speed", 1.0))),
            })
    ranges.sort(key=lambda item: item["start"])

    pieces = []
    for keep in keep_segments:
        lo = float(keep["start"])
        hi = float(keep["end"])
        if hi <= lo:
            continue

        # Collect every boundary at which the speed may change in [lo, hi].
        boundaries = {lo, hi}
        for rng in ranges:
            clipped_start = max(lo, rng["start"])
            clipped_end = min(hi, rng["end"])
            if clipped_end > clipped_start:
                boundaries.update((clipped_start, clipped_end))

        ordered = sorted(boundaries)
        for left, right in zip(ordered, ordered[1:]):
            # Drop slivers shorter than a microsecond.
            if right - left < 1e-6:
                continue
            piece_speed = next(
                (
                    rng["speed"]
                    for rng in ranges
                    if left >= rng["start"] and right <= rng["end"]
                ),
                1.0,
            )
            pieces.append({"start": left, "end": right, "speed": piece_speed})
    return pieces
def _find_ffmpeg() -> str:
    """Return the first ffmpeg command name that runs ``-version`` successfully.

    Raises:
        RuntimeError: if neither "ffmpeg" nor "ffmpeg.exe" can be executed.
    """
    candidates = ["ffmpeg", "ffmpeg.exe"]
    for candidate in candidates:
        try:
            subprocess.run([candidate, "-version"], capture_output=True, check=True)
        except (FileNotFoundError, subprocess.CalledProcessError):
            # Missing or failing binary: try the next name.
            continue
        else:
            return candidate
    raise RuntimeError("FFmpeg not found. Install it or add it to PATH.")
def export_stream_copy(
    input_path: str,
    output_path: str,
    keep_segments: List[dict],
    mute_ranges: List[dict] = None,
) -> str:
    """
    Export the kept segments without re-encoding.

    Each segment is copied into a temporary MPEG-TS file, then the parts are
    joined through FFmpeg's ``concat:`` protocol with stream copy. If
    mute_ranges are given, or any FFmpeg step fails, the work is delegated
    to export_reencode instead.

    Args:
        input_path: source video file
        output_path: destination file
        keep_segments: list of {"start": float, "end": float} to keep
        mute_ranges: list of {"start": float, "end": float} to mute (optional)

    Returns:
        output_path on success

    Raises:
        ValueError: if keep_segments is empty (and no mute_ranges were given)
    """
    # Muting needs an audio filter, which stream copy cannot apply.
    if mute_ranges:
        return export_reencode(input_path, output_path, keep_segments, "1080p", "mp4", mute_ranges)

    ffmpeg = _find_ffmpeg()
    input_path = str(Path(input_path).resolve())
    output_path = str(Path(output_path).resolve())

    if not keep_segments:
        raise ValueError("No segments to export")

    def _quietly(action, target):
        # Cleanup is best-effort: a failed delete must not mask the result.
        try:
            action(target)
        except OSError:
            pass

    work_dir = tempfile.mkdtemp(prefix="aive_export_")
    try:
        part_paths = []
        for idx, seg in enumerate(keep_segments):
            part_path = os.path.join(work_dir, f"seg_{idx:04d}.ts")
            extract_cmd = [
                ffmpeg, "-y",
                "-ss", str(seg["start"]),
                "-to", str(seg["end"]),
                "-i", input_path,
                "-c", "copy",
                "-avoid_negative_ts", "make_zero",
                "-f", "mpegts",
                part_path,
            ]
            logger.info(f"Extracting segment {idx}: {seg['start']:.2f}s - {seg['end']:.2f}s")
            extract = subprocess.run(extract_cmd, capture_output=True, text=True)
            if extract.returncode != 0:
                logger.warning(f"Stream copy segment {idx} failed, will try re-encode: {extract.stderr[-200:]}")
                return export_reencode(input_path, output_path, keep_segments)
            part_paths.append(part_path)

        # The concat protocol takes the parts as a single '|'-separated input.
        joined_parts = "|".join(part_paths)
        concat_cmd = [
            ffmpeg, "-y",
            "-i", f"concat:{joined_parts}",
            "-c", "copy",
            "-movflags", "+faststart",
            output_path,
        ]
        logger.info(f"Concatenating {len(part_paths)} segments -> {output_path}")
        joined = subprocess.run(concat_cmd, capture_output=True, text=True)
        if joined.returncode != 0:
            logger.warning(f"Concat failed, falling back to re-encode: {joined.stderr[-200:]}")
            return export_reencode(input_path, output_path, keep_segments)
        return output_path
    finally:
        for leftover in os.listdir(work_dir):
            _quietly(os.remove, os.path.join(work_dir, leftover))
        _quietly(os.rmdir, work_dir)
def export_reencode(
    input_path: str,
    output_path: str,
    keep_segments: List[dict],
    resolution: str = "1080p",
    format_hint: str = "mp4",
    mute_ranges: List[dict] = None,
    gain_ranges: List[dict] = None,
    speed_ranges: List[dict] = None,
    global_gain_db: float = 0.0,
) -> str:
    """
    Export video with a full re-encode.

    Two modes are used:

    * Audio-filter mode: chosen when mute_ranges, gain_ranges or a non-zero
      global_gain_db is given and no speed range changes the speed. The whole
      timeline is re-encoded with the volume filters applied; keep_segments
      is NOT used for cutting in this mode.
    * Cut mode: otherwise. keep_segments (split by speed_ranges) are trimmed,
      optionally speed-adjusted, and concatenated. Audio filters are not
      applied in this mode.

    Args:
        input_path: source video file
        output_path: destination file
        keep_segments: list of {"start": float, "end": float} to keep
        resolution: "720p", "1080p" or "4k" to scale the height; any other
            value leaves the video size unchanged
        format_hint: "webm" selects VP9/Opus; anything else H.264/AAC
        mute_ranges: list of {"start", "end"} to silence (optional)
        gain_ranges: list of {"start", "end", "gain_db"} (optional)
        speed_ranges: list of {"start", "end", "speed"} (optional)
        global_gain_db: gain in dB applied to the whole audio track

    Returns:
        output_path on success

    Raises:
        ValueError: in cut mode when there is nothing to export
        RuntimeError: if FFmpeg cannot be found or exits with an error
    """
    ffmpeg = _find_ffmpeg()
    input_path = str(Path(input_path).resolve())
    output_path = str(Path(output_path).resolve())

    scale_map = {
        "720p": "scale=-2:720",
        "1080p": "scale=-2:1080",
        "4k": "scale=-2:2160",
    }
    # Empty string means "no scaling" for resolution labels we do not know.
    scale = scale_map.get(resolution, "")

    if format_hint == "webm":
        codec_args = ["-c:v", "libvpx-vp9", "-crf", "30", "-b:v", "0", "-c:a", "libopus"]
    else:
        codec_args = ["-c:v", "libx264", "-preset", "medium", "-crf", "18", "-c:a", "aac", "-b:a", "192k"]

    def build_audio_filter() -> str:
        """Chain the global gain, per-range gains and mutes as volume filters."""
        filters = []
        if abs(float(global_gain_db)) > 1e-6:
            filters.append(f"volume={float(global_gain_db)}dB")
        for gain_range in gain_ranges or []:
            start = gain_range['start']
            end = gain_range['end']
            gain_db = gain_range.get('gain_db', 0.0)
            filters.append(f"volume={float(gain_db)}dB:enable='between(t,{start},{end})'")
        for mute_range in mute_ranges or []:
            start = mute_range['start']
            end = mute_range['end']
            filters.append(f"volume=0:enable='between(t,{start},{end})'")
        return ",".join(filters) if filters else "anull"

    has_audio_filters = bool(mute_ranges) or bool(gain_ranges) or abs(float(global_gain_db)) > 1e-6
    speed_segments = _split_keep_segments_by_speed(keep_segments, speed_ranges)
    has_speed = any(abs(seg.get("speed", 1.0) - 1.0) > 1e-6 for seg in speed_segments)

    if has_audio_filters and not has_speed:
        # Audio-filter mode: filter the full timeline, no cutting.
        filter_complex = f"[0:a]{build_audio_filter()}[a]"
        if scale:
            filter_complex += f";[0:v]{scale}[v]"
            video_map = "[v]"
        else:
            # No video filtering needed: map the input stream directly.
            # (Routing it through an unlabeled "null" filter produced the
            # invalid graph "[0:v]null0:v".)
            video_map = "0:v"
        audio_map = "[a]"
        logger.info(
            "Re-encoding with audio filters (mute=%s gain=%s global=%s) -> %s (%s)",
            len(mute_ranges or []),
            len(gain_ranges or []),
            global_gain_db,
            output_path,
            resolution,
        )
    else:
        # Cut mode: trim + concat with optional per-segment speed changes.
        if not keep_segments:
            raise ValueError("No segments to export")
        segments_for_concat = speed_segments or _split_keep_segments_by_speed(keep_segments, None)
        if not segments_for_concat:
            raise ValueError("No segments to export")

        if has_audio_filters:
            # NOTE(review): mute/gain filters are not wired into the speed
            # path; make that visible instead of dropping them silently.
            logger.warning(
                "Audio filters (mute/gain) are ignored when speed ranges are active"
            )

        filter_parts = []
        for i, seg in enumerate(segments_for_concat):
            speed = _clamp_speed(seg.get("speed", 1.0))
            v_chain = f"trim=start={seg['start']}:end={seg['end']},setpts=PTS-STARTPTS"
            a_chain = f"atrim=start={seg['start']}:end={seg['end']},asetpts=PTS-STARTPTS"
            if abs(speed - 1.0) > 1e-6:
                # Rescale video timestamps and retime audio by the same factor.
                v_chain += f",setpts=PTS/{speed:.6f}"
                a_chain += f",{_build_atempo_chain(speed)}"
            filter_parts.append(f"[0:v]{v_chain}[v{i}];[0:a]{a_chain}[a{i}];")

        n = len(segments_for_concat)
        concat_inputs = "".join(f"[v{i}][a{i}]" for i in range(n))
        filter_parts.append(f"{concat_inputs}concat=n={n}:v=1:a=1[outv][outa]")
        filter_complex = "".join(filter_parts)

        if scale:
            filter_complex += f";[outv]{scale}[outv_scaled]"
            video_map = "[outv_scaled]"
        else:
            video_map = "[outv]"
        audio_map = "[outa]"
        logger.info(
            "Re-encoding %s segments (speed-adjusted=%s) -> %s (%s)",
            n,
            has_speed,
            output_path,
            resolution,
        )

    cmd = [
        ffmpeg, "-y",
        "-i", input_path,
        "-filter_complex", filter_complex,
        "-map", video_map,
        "-map", audio_map,
        *codec_args,
        "-movflags", "+faststart",
        output_path,
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError(f"FFmpeg re-encode failed: {result.stderr[-500:]}")
    return output_path
def export_reencode_with_subs(
    input_path: str,
    output_path: str,
    keep_segments: List[dict],
    subtitle_path: str,
    resolution: str = "1080p",
    format_hint: str = "mp4",
    mute_ranges: List[dict] = None,
    gain_ranges: List[dict] = None,
    speed_ranges: List[dict] = None,
    global_gain_db: float = 0.0,
) -> str:
    """
    Re-encode the video and burn in an ASS subtitle file.

    When mute/gain filters are requested and no speed range changes the
    speed, the full timeline is filtered and subtitled without cutting.
    Otherwise the keep segments are trimmed (with optional speed changes),
    concatenated, and the subtitles are rendered on the joined video.

    Returns:
        output_path on success

    Raises:
        ValueError: in the cutting path when there are no segments to export
        RuntimeError: if FFmpeg cannot be found or exits with an error
    """
    ffmpeg = _find_ffmpeg()
    input_path = str(Path(input_path).resolve())
    output_path = str(Path(output_path).resolve())
    subtitle_path = str(Path(subtitle_path).resolve())

    scale_map = {
        "720p": "scale=-2:720",
        "1080p": "scale=-2:1080",
        "4k": "scale=-2:2160",
    }
    scale = scale_map.get(resolution, "")

    # Escape path for the FFmpeg subtitle filter: forward slashes, and
    # colons escaped so a drive letter is not read as an option separator.
    escaped_sub = subtitle_path.replace("\\", "/").replace(":", "\\:")
    if scale:
        subtitle_chain = f"{scale},ass='{escaped_sub}'"
    else:
        subtitle_chain = f"ass='{escaped_sub}'"

    if format_hint == "webm":
        codec_args = ["-c:v", "libvpx-vp9", "-crf", "30", "-b:v", "0", "-c:a", "libopus"]
    else:
        codec_args = ["-c:v", "libx264", "-preset", "medium", "-crf", "18", "-c:a", "aac", "-b:a", "192k"]

    def build_audio_filter() -> str:
        """Chain the global gain, per-range gains and mutes as volume filters."""
        chain = []
        if abs(float(global_gain_db)) > 1e-6:
            chain.append(f"volume={float(global_gain_db)}dB")
        for gain_range in gain_ranges or []:
            g_start = gain_range['start']
            g_end = gain_range['end']
            g_db = gain_range.get('gain_db', 0.0)
            chain.append(f"volume={float(g_db)}dB:enable='between(t,{g_start},{g_end})'")
        for mute_range in mute_ranges or []:
            m_start = mute_range['start']
            m_end = mute_range['end']
            chain.append(f"volume=0:enable='between(t,{m_start},{m_end})'")
        if not chain:
            return "anull"
        return ",".join(chain)

    has_audio_filters = bool(mute_ranges) or bool(gain_ranges) or abs(float(global_gain_db)) > 1e-6
    speed_segments = _split_keep_segments_by_speed(keep_segments, speed_ranges)
    has_speed = any(abs(piece.get("speed", 1.0) - 1.0) > 1e-6 for piece in speed_segments)

    if has_audio_filters and not has_speed:
        # Full-timeline path: audio filters plus subtitles, no cutting.
        audio_filter = build_audio_filter()
        filter_complex = f"[0:a]{audio_filter}[a];[0:v]{subtitle_chain}[v]"
        video_map = "[v]"
        audio_map = "[a]"
        logger.info(
            "Re-encoding with subtitles and audio filters (mute=%s gain=%s global=%s) -> %s (%s)",
            len(mute_ranges or []),
            len(gain_ranges or []),
            global_gain_db,
            output_path,
            resolution,
        )
    else:
        # Cutting path: trim + concat first, then draw subtitles on the result.
        if not keep_segments:
            raise ValueError("No segments to export")
        segments_for_concat = speed_segments or _split_keep_segments_by_speed(keep_segments, None)
        if not segments_for_concat:
            raise ValueError("No segments to export")

        graph = []
        for idx, piece in enumerate(segments_for_concat):
            factor = _clamp_speed(piece.get("speed", 1.0))
            video_chain = f"trim=start={piece['start']}:end={piece['end']},setpts=PTS-STARTPTS"
            audio_chain = f"atrim=start={piece['start']}:end={piece['end']},asetpts=PTS-STARTPTS"
            if abs(factor - 1.0) > 1e-6:
                video_chain += f",setpts=PTS/{factor:.6f}"
                audio_chain += f",{_build_atempo_chain(factor)}"
            graph.append(f"[0:v]{video_chain}[v{idx}];[0:a]{audio_chain}[a{idx}];")

        n = len(segments_for_concat)
        concat_inputs = "".join(f"[v{idx}][a{idx}]" for idx in range(n))
        graph.append(f"{concat_inputs}concat=n={n}:v=1:a=1[outv][outa]")
        graph.append(f";[outv]{subtitle_chain}[outv_final]")
        filter_complex = "".join(graph)
        video_map = "[outv_final]"
        audio_map = "[outa]"
        logger.info(
            "Re-encoding %s segments with subtitles (speed-adjusted=%s) -> %s (%s)",
            n,
            has_speed,
            output_path,
            resolution,
        )

    cmd = [
        ffmpeg, "-y",
        "-i", input_path,
        "-filter_complex", filter_complex,
        "-map", video_map,
        "-map", audio_map,
        *codec_args,
        "-movflags", "+faststart",
        output_path,
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError(f"FFmpeg re-encode with subs failed: {result.stderr[-500:]}")
    return output_path
def get_video_info(input_path: str) -> dict:
    """Get basic video metadata using ffprobe.

    Args:
        input_path: media file to inspect

    Returns:
        A dict with "duration", "size", "format", "width", "height",
        "codec" and "fps" (taken from the first video stream), or an empty
        dict if ffprobe fails or its output cannot be parsed.

    Raises:
        RuntimeError: if FFmpeg cannot be located.
    """
    import json

    def _parse_frame_rate(raw):
        # ffprobe reports rates as "num/den". Parse the fraction by hand
        # instead of eval()-ing tool output, and treat "0/0" or malformed
        # values as unknown rather than failing the whole lookup.
        numerator, slash, denominator = str(raw).partition("/")
        if not slash:
            return 0
        try:
            return float(numerator) / float(denominator)
        except (ValueError, ZeroDivisionError):
            return 0

    ffmpeg = _find_ffmpeg()
    # The probe binary is assumed to sit next to ffmpeg under the same naming.
    ffprobe = ffmpeg.replace("ffmpeg", "ffprobe")
    cmd = [
        ffprobe, "-v", "quiet",
        "-print_format", "json",
        "-show_format", "-show_streams",
        str(input_path),
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        data = json.loads(result.stdout)
        fmt = data.get("format", {})
        video_stream = next(
            (s for s in data.get("streams", []) if s.get("codec_type") == "video"),
            {},
        )
        return {
            "duration": float(fmt.get("duration", 0)),
            "size": int(fmt.get("size", 0)),
            "format": fmt.get("format_name", ""),
            "width": int(video_stream.get("width", 0)),
            "height": int(video_stream.get("height", 0)),
            "codec": video_stream.get("codec_name", ""),
            "fps": _parse_frame_rate(video_stream.get("r_frame_rate", "")),
        }
    except Exception as e:
        # Best-effort lookup: any probe or parse failure yields an empty dict.
        logger.error(f"Failed to get video info: {e}")
        return {}