""" FFmpeg-based video cutting engine. Uses stream copy for fast, lossless cuts and falls back to re-encode when needed. """ import logging import subprocess import tempfile import os from pathlib import Path from typing import List logger = logging.getLogger(__name__) def _get_codec_args(format_hint: str, has_video: bool = True) -> list: """Return FFmpeg codec arguments for the given format.""" if format_hint == "wav": return ["-c:a", "pcm_s16le"] if format_hint == "webm": if has_video: return ["-c:v", "libvpx-vp9", "-crf", "30", "-b:v", "0", "-c:a", "libopus"] return ["-c:a", "libopus", "-b:a", "160k"] # Default: MP4 if has_video: return ["-c:v", "libx264", "-preset", "medium", "-crf", "18", "-c:a", "aac", "-b:a", "192k"] return ["-c:a", "aac", "-b:a", "192k"] def _input_has_video_stream(ffmpeg_cmd: str, input_path: str) -> bool: """Return True if the input contains at least one video stream.""" ffprobe = ffmpeg_cmd.replace("ffmpeg", "ffprobe") cmd = [ ffprobe, "-v", "error", "-select_streams", "v:0", "-show_entries", "stream=index", "-of", "csv=p=0", str(input_path), ] try: result = subprocess.run(cmd, capture_output=True, text=True) return result.returncode == 0 and bool(result.stdout.strip()) except Exception: return False def _clamp_speed(speed: float) -> float: return max(0.25, min(4.0, float(speed))) def _build_atempo_chain(speed: float) -> str: """Build an FFmpeg atempo chain since each atempo node only supports 0.5..2.0.""" s = _clamp_speed(speed) filters = [] while s > 2.0: filters.append("atempo=2.0") s /= 2.0 while s < 0.5: filters.append("atempo=0.5") s /= 0.5 filters.append(f"atempo={s:.6f}") return ",".join(filters) def _split_keep_segments_by_speed( keep_segments: List[dict], speed_ranges: List[dict] = None, ) -> List[dict]: """Split keep segments by speed ranges, attaching speed multiplier per piece.""" if not keep_segments: return [] normalized_ranges = [] for r in speed_ranges or []: start = float(r.get("start", 0.0)) end = float(r.get("end", 0.0)) if end <= start: continue normalized_ranges.append({ "start": start, "end": end, "speed": _clamp_speed(float(r.get("speed", 1.0))), }) normalized_ranges.sort(key=lambda x: x["start"]) result = [] for keep in keep_segments: k_start = float(keep["start"]) k_end = float(keep["end"]) if k_end <= k_start: continue cuts = {k_start, k_end} for sr in normalized_ranges: overlap_start = max(k_start, sr["start"]) overlap_end = min(k_end, sr["end"]) if overlap_end > overlap_start: cuts.add(overlap_start) cuts.add(overlap_end) points = sorted(cuts) for i in range(len(points) - 1): seg_start = points[i] seg_end = points[i + 1] if seg_end - seg_start < 1e-6: continue speed = 1.0 for sr in normalized_ranges: if seg_start >= sr["start"] and seg_end <= sr["end"]: speed = sr["speed"] break result.append({"start": seg_start, "end": seg_end, "speed": speed}) return result def _build_zoom_filter(zoom_config: dict = None) -> str: """Build FFmpeg video filter snippet for zoom/punch-in effect. zoom_config: {enabled, zoomFactor, panX, panY} Returns empty string if disabled. Should be prepended to the video filter chain. """ if not zoom_config or not zoom_config.get("enabled"): return "" factor = float(zoom_config.get("zoomFactor", 1.0)) if abs(factor - 1.0) < 0.01: return "" pan_x = float(zoom_config.get("panX", 0.0)) pan_y = float(zoom_config.get("panY", 0.0)) return f"crop=iw/{factor}:ih/{factor}:((iw-iw/{factor})/2)+({pan_x}*(iw-iw/{factor})/2):((ih-ih/{factor})/2)+({pan_y}*(ih-ih/{factor})/2),scale=iw:ih" def mix_background_music( video_path: str, music_path: str, output_path: str, volume_db: float = 0.0, ducking_enabled: bool = False, ducking_db: float = 6.0, ducking_attack_ms: float = 10.0, ducking_release_ms: float = 200.0, ) -> str: """Mix background music into a video with optional ducking. Uses FFmpeg amix + sidechaincompress. Output is written to output_path. """ ffmpeg = _find_ffmpeg() escaped_music = music_path.replace("\\", "/").replace(":", "\\:") # Build the filter graph if ducking_enabled: filter_complex = ( f"[0:a]asplit[main][sidechain];" f"movie='{escaped_music}':loop=0,volume={volume_db}dB[music];" f"[main][music]amix=inputs=2:duration=first:dropout_transition=2[mixed];" f"[mixed][sidechain]sidechaincompress=" f"threshold=-30dB:ratio=100:attack={ducking_attack_ms}ms:" f"release={ducking_release_ms}ms:makeup=1:level_sc={ducking_db}[outa]" ) else: filter_complex = ( f"movie='{escaped_music}':loop=0,volume={volume_db}dB[music];" f"[0:a][music]amix=inputs=2:duration=first:dropout_transition=2[outa]" ) cmd = [ ffmpeg, "-y", "-i", video_path, "-filter_complex", filter_complex, "-map", "0:v", "-map", "[outa]", "-c:v", "copy", "-c:a", "aac", "-b:a", "192k", "-shortest", output_path, ] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: raise RuntimeError(f"Background music mix failed: {result.stderr[-500:]}") return output_path def concat_clips( main_path: str, append_paths: list, output_path: str, ) -> str: """Concatenate multiple video clips using FFmpeg concat demuxer. The main_path is kept as-is. append_paths are appended after it. """ if not append_paths: raise ValueError("No clips to concatenate") ffmpeg = _find_ffmpeg() import tempfile import os temp_dir = tempfile.mkdtemp(prefix="aive_concat_") try: segment_files = [main_path] segment_files.extend(append_paths) # Create concat file list concat_file = os.path.join(temp_dir, "concat.txt") with open(concat_file, "w") as f: for path in segment_files: resolved = os.path.abspath(path) f.write(f"file '{resolved}'\n") cmd = [ ffmpeg, "-y", "-f", "concat", "-safe", "0", "-i", concat_file, "-c", "copy", "-movflags", "+faststart", output_path, ] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: raise RuntimeError(f"Clip concat failed: {result.stderr[-500:]}") return output_path finally: for f in os.listdir(temp_dir): try: os.remove(os.path.join(temp_dir, f)) except OSError: pass try: os.rmdir(temp_dir) except OSError: pass def _find_ffmpeg() -> str: """Locate ffmpeg binary.""" for cmd in ["ffmpeg", "ffmpeg.exe"]: try: subprocess.run([cmd, "-version"], capture_output=True, check=True) return cmd except (FileNotFoundError, subprocess.CalledProcessError): continue raise RuntimeError("FFmpeg not found. Install it or add it to PATH.") def export_stream_copy( input_path: str, output_path: str, keep_segments: List[dict], mute_ranges: List[dict] = None, ) -> str: """ Export video using FFmpeg concat demuxer with stream copy. ~100x faster than re-encoding. No quality loss. Falls back to re-encoding if mute_ranges are provided. Args: input_path: source video file output_path: destination file keep_segments: list of {"start": float, "end": float} to keep mute_ranges: list of {"start": float, "end": float} to mute (optional) Returns: output_path on success """ if mute_ranges: # Mute ranges require audio filtering, so fall back to re-encode return export_reencode(input_path, output_path, keep_segments, "1080p", "mp4", mute_ranges) ffmpeg = _find_ffmpeg() if not _input_has_video_stream(ffmpeg, input_path): # Audio-only inputs cannot use TS segment stream-copy concat reliably. return export_reencode(input_path, output_path, keep_segments) input_path = str(Path(input_path).resolve()) output_path = str(Path(output_path).resolve()) if not keep_segments: raise ValueError("No segments to export") temp_dir = tempfile.mkdtemp(prefix="aive_export_") try: segment_files = [] for i, seg in enumerate(keep_segments): seg_file = os.path.join(temp_dir, f"seg_{i:04d}.ts") cmd = [ ffmpeg, "-y", "-ss", str(seg["start"]), "-to", str(seg["end"]), "-i", input_path, "-c", "copy", "-avoid_negative_ts", "make_zero", "-f", "mpegts", seg_file, ] logger.info(f"Extracting segment {i}: {seg['start']:.2f}s - {seg['end']:.2f}s") result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: logger.warning(f"Stream copy segment {i} failed, will try re-encode: {result.stderr[-200:]}") return export_reencode(input_path, output_path, keep_segments) segment_files.append(seg_file) concat_str = "|".join(segment_files) cmd = [ ffmpeg, "-y", "-i", f"concat:{concat_str}", "-c", "copy", "-movflags", "+faststart", output_path, ] logger.info(f"Concatenating {len(segment_files)} segments -> {output_path}") result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: logger.warning(f"Concat failed, falling back to re-encode: {result.stderr[-200:]}") return export_reencode(input_path, output_path, keep_segments) return output_path finally: for f in os.listdir(temp_dir): try: os.remove(os.path.join(temp_dir, f)) except OSError: pass try: os.rmdir(temp_dir) except OSError: pass def _apply_zoom_post(input_path: str, output_path: str, zoom_config: dict) -> str: """Re-encode video applying zoom/punch-in crop+scale as a post-process step.""" ffmpeg = _find_ffmpeg() zoom_filter = _build_zoom_filter(zoom_config) if not zoom_filter: return input_path cmd = [ ffmpeg, "-y", "-i", input_path, "-filter_complex", f"[0:v]{zoom_filter}[v]", "-map", "[v]", "-map", "0:a?", "-c:a", "copy", "-movflags", "+faststart", output_path, ] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: raise RuntimeError(f"Zoom post-process failed: {result.stderr[-500:]}") return output_path def export_reencode( input_path: str, output_path: str, keep_segments: List[dict], resolution: str = "1080p", format_hint: str = "mp4", mute_ranges: List[dict] = None, gain_ranges: List[dict] = None, speed_ranges: List[dict] = None, global_gain_db: float = 0.0, normalize_loudness: bool = False, normalize_target_lufs: float = -14.0, zoom_config: dict = None, ) -> str: """ Export video with full re-encode. Slower but supports resolution changes, format conversion, and avoids stream-copy edge cases. If mute_ranges are provided, applies audio muting instead of cutting. """ ffmpeg = _find_ffmpeg() input_path = str(Path(input_path).resolve()) output_path = str(Path(output_path).resolve()) scale_map = { "720p": "scale=-2:720", "1080p": "scale=-2:1080", "4k": "scale=-2:2160", } def build_audio_filter() -> str: filters = [] if abs(float(global_gain_db)) > 1e-6: filters.append(f"volume={float(global_gain_db)}dB") for gain_range in gain_ranges or []: start = gain_range['start'] end = gain_range['end'] gain_db = gain_range.get('gain_db', 0.0) filters.append(f"volume={float(gain_db)}dB:enable='between(t,{start},{end})'") for mute_range in mute_ranges or []: start = mute_range['start'] end = mute_range['end'] filters.append(f"volume=0:enable='between(t,{start},{end})'") if normalize_loudness: filters.append(f"loudnorm=I={normalize_target_lufs}:LRA=7:TP=-1.5") return ",".join(filters) if filters else "anull" has_audio_filters = bool(mute_ranges) or bool(gain_ranges) or abs(float(global_gain_db)) > 1e-6 has_video = _input_has_video_stream(ffmpeg, input_path) speed_segments = _split_keep_segments_by_speed(keep_segments, speed_ranges) has_speed = any(abs(seg.get("speed", 1.0) - 1.0) > 1e-6 for seg in speed_segments) if not has_video: if not keep_segments: raise ValueError("No segments to export") segments_for_concat = speed_segments if speed_segments else _split_keep_segments_by_speed(keep_segments, None) if not segments_for_concat: raise ValueError("No segments to export") filter_parts = [] for i, seg in enumerate(segments_for_concat): speed = _clamp_speed(seg.get("speed", 1.0)) a_chain = f"atrim=start={seg['start']}:end={seg['end']},asetpts=PTS-STARTPTS" if abs(speed - 1.0) > 1e-6: a_chain += f",{_build_atempo_chain(speed)}" filter_parts.append(f"[0:a]{a_chain}[a{i}];") n = len(segments_for_concat) concat_inputs = "".join(f"[a{i}]" for i in range(n)) filter_parts.append(f"{concat_inputs}concat=n={n}:v=0:a=1[outa_raw]") audio_filter = build_audio_filter() if audio_filter != "anull": filter_parts.append(f";[outa_raw]{audio_filter}[outa]") audio_map = "[outa]" else: audio_map = "[outa_raw]" filter_complex = "".join(filter_parts) codec_args = _get_codec_args(format_hint, has_video=False) cmd = [ ffmpeg, "-y", "-i", input_path, "-filter_complex", filter_complex, "-map", audio_map, *codec_args, output_path, ] logger.info( "Re-encoding audio-only input (%s segments, speed-adjusted=%s) -> %s", n, has_speed, output_path, ) result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: raise RuntimeError(f"FFmpeg audio-only export failed: {result.stderr[-500:]}") return output_path # Handle filtered full-timeline audio case (mute/gain/global gain) when no speed warping is needed if has_audio_filters and not has_speed: audio_filter = build_audio_filter() # Video filter - just scaling if needed scale = scale_map.get(resolution, "") if scale: video_filter = scale video_map = "[v]" else: video_filter = "null" video_map = "0:v" filter_complex = f"[0:a]{audio_filter}[a];[0:v]{video_filter}{video_map}" codec_args = _get_codec_args(format_hint, has_video) cmd = [ ffmpeg, "-y", "-i", input_path, "-filter_complex", filter_complex, "-map", video_map, "-map", "[a]", *codec_args, "-movflags", "+faststart", output_path, ] logger.info( "Re-encoding with audio filters (mute=%s gain=%s global=%s) -> %s (%s)", len(mute_ranges or []), len(gain_ranges or []), global_gain_db, output_path, resolution, ) else: # Cutting logic with optional per-segment speed changes if not keep_segments: raise ValueError("No segments to export") segments_for_concat = speed_segments if speed_segments else _split_keep_segments_by_speed(keep_segments, None) if not segments_for_concat: raise ValueError("No segments to export") filter_parts = [] for i, seg in enumerate(segments_for_concat): speed = _clamp_speed(seg.get("speed", 1.0)) v_chain = f"trim=start={seg['start']}:end={seg['end']},setpts=PTS-STARTPTS" a_chain = f"atrim=start={seg['start']}:end={seg['end']},asetpts=PTS-STARTPTS" if abs(speed - 1.0) > 1e-6: v_chain += f",setpts=PTS/{speed:.6f}" a_chain += f",{_build_atempo_chain(speed)}" filter_parts.append(f"[0:v]{v_chain}[v{i}];[0:a]{a_chain}[a{i}];") n = len(segments_for_concat) concat_inputs = "".join(f"[v{i}][a{i}]" for i in range(n)) filter_parts.append(f"{concat_inputs}concat=n={n}:v=1:a=1[outv][outa]") filter_complex = "".join(filter_parts) # Add loudnorm to the cutting path audio chain if enabled audio_map_label = "[outa]" if normalize_loudness: filter_complex += f";{audio_map_label}loudnorm=I={normalize_target_lufs}:LRA=7:TP=-1.5[outa_norm]" audio_map_label = "[outa_norm]" scale = scale_map.get(resolution, "") if scale: filter_complex += f";[outv]{scale}[outv_scaled]" video_map = "[outv_scaled]" else: video_map = "[outv]" codec_args = _get_codec_args(format_hint, has_video) cmd = [ ffmpeg, "-y", "-i", input_path, "-filter_complex", filter_complex, "-map", video_map, "-map", audio_map_label, *codec_args, "-movflags", "+faststart", output_path, ] logger.info( "Re-encoding %s segments (speed-adjusted=%s, normalize=%s) -> %s (%s)", n, has_speed, normalize_loudness, output_path, resolution, ) result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: raise RuntimeError(f"FFmpeg re-encode failed: {result.stderr[-500:]}") # Apply zoom post-processing if configured if zoom_config and zoom_config.get("enabled") and has_video: import tempfile as _tf import os as _os zoomed_path = output_path + ".zoomed.mp4" _apply_zoom_post(output_path, zoomed_path, zoom_config) _os.replace(zoomed_path, output_path) logger.info("Zoom/punch-in applied to %s (factor=%s)", output_path, zoom_config.get("zoomFactor", 1.0)) return output_path def export_reencode_with_subs( input_path: str, output_path: str, keep_segments: List[dict], subtitle_path: str, resolution: str = "1080p", format_hint: str = "mp4", mute_ranges: List[dict] = None, gain_ranges: List[dict] = None, speed_ranges: List[dict] = None, global_gain_db: float = 0.0, normalize_loudness: bool = False, normalize_target_lufs: float = -14.0, zoom_config: dict = None, ) -> str: """ Export video with re-encode and burn-in subtitles (ASS format). Applies trim+concat first, then overlays the subtitle file. If mute_ranges are provided, applies audio muting instead of cutting. """ ffmpeg = _find_ffmpeg() if not _input_has_video_stream(ffmpeg, input_path): raise ValueError("Burn-in captions require a video track") input_path = str(Path(input_path).resolve()) output_path = str(Path(output_path).resolve()) subtitle_path = str(Path(subtitle_path).resolve()) scale_map = { "720p": "scale=-2:720", "1080p": "scale=-2:1080", "4k": "scale=-2:2160", } def build_audio_filter() -> str: filters = [] if abs(float(global_gain_db)) > 1e-6: filters.append(f"volume={float(global_gain_db)}dB") for gain_range in gain_ranges or []: start = gain_range['start'] end = gain_range['end'] gain_db = gain_range.get('gain_db', 0.0) filters.append(f"volume={float(gain_db)}dB:enable='between(t,{start},{end})'") for mute_range in mute_ranges or []: start = mute_range['start'] end = mute_range['end'] filters.append(f"volume=0:enable='between(t,{start},{end})'") if normalize_loudness: filters.append(f"loudnorm=I={normalize_target_lufs}:LRA=7:TP=-1.5") return ",".join(filters) if filters else "anull" has_audio_filters = bool(mute_ranges) or bool(gain_ranges) or abs(float(global_gain_db)) > 1e-6 speed_segments = _split_keep_segments_by_speed(keep_segments, speed_ranges) has_speed = any(abs(seg.get("speed", 1.0) - 1.0) > 1e-6 for seg in speed_segments) # Handle filtered full-timeline audio case (mute/gain/global gain) when no speed warping is needed if has_audio_filters and not has_speed: audio_filter = build_audio_filter() # Video filter with subtitles escaped_sub = subtitle_path.replace("\\", "/").replace(":", "\\:") scale = scale_map.get(resolution, "") if scale: video_filter = f"{scale},ass='{escaped_sub}'" else: video_filter = f"ass='{escaped_sub}'" filter_complex = f"[0:a]{audio_filter}[a];[0:v]{video_filter}[v]" codec_args = _get_codec_args(format_hint, has_video=True) cmd = [ ffmpeg, "-y", "-i", input_path, "-filter_complex", filter_complex, "-map", "[v]", "-map", "[a]", *codec_args, "-movflags", "+faststart", output_path, ] logger.info( "Re-encoding with subtitles and audio filters (mute=%s gain=%s global=%s) -> %s (%s)", len(mute_ranges or []), len(gain_ranges or []), global_gain_db, output_path, resolution, ) else: # Cutting logic with subtitles and optional speed changes if not keep_segments: raise ValueError("No segments to export") segments_for_concat = speed_segments if speed_segments else _split_keep_segments_by_speed(keep_segments, None) if not segments_for_concat: raise ValueError("No segments to export") filter_parts = [] for i, seg in enumerate(segments_for_concat): speed = _clamp_speed(seg.get("speed", 1.0)) v_chain = f"trim=start={seg['start']}:end={seg['end']},setpts=PTS-STARTPTS" a_chain = f"atrim=start={seg['start']}:end={seg['end']},asetpts=PTS-STARTPTS" if abs(speed - 1.0) > 1e-6: v_chain += f",setpts=PTS/{speed:.6f}" a_chain += f",{_build_atempo_chain(speed)}" filter_parts.append(f"[0:v]{v_chain}[v{i}];[0:a]{a_chain}[a{i}];") n = len(segments_for_concat) concat_inputs = "".join(f"[v{i}][a{i}]" for i in range(n)) filter_parts.append(f"{concat_inputs}concat=n={n}:v=1:a=1[outv][outa]") filter_complex = "".join(filter_parts) # Escape path for FFmpeg subtitle filter (Windows backslashes need escaping) escaped_sub = subtitle_path.replace("\\", "/").replace(":", "\\:") scale = scale_map.get(resolution, "") if scale: filter_complex += f";[outv]{scale},ass='{escaped_sub}'[outv_final]" else: filter_complex += f";[outv]ass='{escaped_sub}'[outv_final]" video_map = "[outv_final]" codec_args = _get_codec_args(format_hint, has_video=True) cmd = [ ffmpeg, "-y", "-i", input_path, "-filter_complex", filter_complex, "-map", video_map, "-map", "[outa]", *codec_args, "-movflags", "+faststart", output_path, ] logger.info( "Re-encoding %s segments with subtitles (speed-adjusted=%s) -> %s (%s)", n, has_speed, output_path, resolution, ) result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: raise RuntimeError(f"FFmpeg re-encode with subs failed: {result.stderr[-500:]}") # Apply zoom post-processing if configured if zoom_config and zoom_config.get("enabled"): import tempfile as _tf import os as _os zoomed_path = output_path + ".zoomed.mp4" _apply_zoom_post(output_path, zoomed_path, zoom_config) _os.replace(zoomed_path, output_path) logger.info("Zoom/punch-in applied to %s (factor=%s)", output_path, zoom_config.get("zoomFactor", 1.0)) return output_path def get_video_info(input_path: str) -> dict: """Get basic video metadata using ffprobe.""" ffmpeg = _find_ffmpeg() ffprobe = ffmpeg.replace("ffmpeg", "ffprobe") cmd = [ ffprobe, "-v", "quiet", "-print_format", "json", "-show_format", "-show_streams", str(input_path), ] try: result = subprocess.run(cmd, capture_output=True, text=True, check=True) import json data = json.loads(result.stdout) fmt = data.get("format", {}) video_stream = next((s for s in data.get("streams", []) if s.get("codec_type") == "video"), {}) return { "duration": float(fmt.get("duration", 0)), "size": int(fmt.get("size", 0)), "format": fmt.get("format_name", ""), "width": int(video_stream.get("width", 0)), "height": int(video_stream.get("height", 0)), "codec": video_stream.get("codec_name", ""), "fps": eval(video_stream.get("r_frame_rate", "0/1")) if "/" in video_stream.get("r_frame_rate", "") else 0, } except Exception as e: logger.error(f"Failed to get video info: {e}") return {}