Files
TalkEdit/backend/services/video_editor.py
Your Name 33cca5f552 Initial CutScript release - Open-source AI-powered text-based video editor
CutScript is a local-first, Descript-like video editor where you edit video by editing text.
Delete a word from the transcript and it's cut from the video.

Features:
- Word-level transcription with WhisperX
- Text-based video editing with undo/redo
- AI filler word removal (Ollama/OpenAI/Claude)
- AI clip creation for shorts
- Waveform timeline with virtualized transcript
- FFmpeg stream-copy (fast) and re-encode (4K) export
- Caption burn-in and sidecar SRT generation
- Studio Sound audio enhancement (DeepFilterNet)
- Keyboard shortcuts (J/K/L, Space, Delete, Ctrl+Z/S/E)
- Encrypted API key storage
- Project save/load (.aive files)

Architecture:
- Electron + React + Tailwind (frontend)
- FastAPI + Python (backend)
- WhisperX for transcription
- FFmpeg for video processing
- Multi-provider AI support

Performance optimizations:
- RAF-throttled time updates
- Zustand selectors for granular subscriptions
- Dual-canvas waveform rendering
- Virtualized transcript with react-virtuoso

Built on top of DataAnts-AI/VideoTranscriber, completely rewritten as a desktop application.

License: MIT
2026-03-03 06:31:04 -05:00

272 lines
8.6 KiB
Python

"""
FFmpeg-based video cutting engine.
Uses stream copy for fast, lossless cuts and falls back to re-encode when needed.
"""
import logging
import subprocess
import tempfile
import os
from pathlib import Path
from typing import List
logger = logging.getLogger(__name__)
def _find_ffmpeg() -> str:
"""Locate ffmpeg binary."""
for cmd in ["ffmpeg", "ffmpeg.exe"]:
try:
subprocess.run([cmd, "-version"], capture_output=True, check=True)
return cmd
except (FileNotFoundError, subprocess.CalledProcessError):
continue
raise RuntimeError("FFmpeg not found. Install it or add it to PATH.")
def export_stream_copy(
input_path: str,
output_path: str,
keep_segments: List[dict],
) -> str:
"""
Export video using FFmpeg concat demuxer with stream copy.
~100x faster than re-encoding. No quality loss.
Args:
input_path: source video file
output_path: destination file
keep_segments: list of {"start": float, "end": float} to keep
Returns:
output_path on success
"""
ffmpeg = _find_ffmpeg()
input_path = str(Path(input_path).resolve())
output_path = str(Path(output_path).resolve())
if not keep_segments:
raise ValueError("No segments to export")
temp_dir = tempfile.mkdtemp(prefix="aive_export_")
try:
segment_files = []
for i, seg in enumerate(keep_segments):
seg_file = os.path.join(temp_dir, f"seg_{i:04d}.ts")
cmd = [
ffmpeg, "-y",
"-ss", str(seg["start"]),
"-to", str(seg["end"]),
"-i", input_path,
"-c", "copy",
"-avoid_negative_ts", "make_zero",
"-f", "mpegts",
seg_file,
]
logger.info(f"Extracting segment {i}: {seg['start']:.2f}s - {seg['end']:.2f}s")
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
logger.warning(f"Stream copy segment {i} failed, will try re-encode: {result.stderr[-200:]}")
return export_reencode(input_path, output_path, keep_segments)
segment_files.append(seg_file)
concat_str = "|".join(segment_files)
cmd = [
ffmpeg, "-y",
"-i", f"concat:{concat_str}",
"-c", "copy",
"-movflags", "+faststart",
output_path,
]
logger.info(f"Concatenating {len(segment_files)} segments -> {output_path}")
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
logger.warning(f"Concat failed, falling back to re-encode: {result.stderr[-200:]}")
return export_reencode(input_path, output_path, keep_segments)
return output_path
finally:
for f in os.listdir(temp_dir):
try:
os.remove(os.path.join(temp_dir, f))
except OSError:
pass
try:
os.rmdir(temp_dir)
except OSError:
pass
def export_reencode(
input_path: str,
output_path: str,
keep_segments: List[dict],
resolution: str = "1080p",
format_hint: str = "mp4",
) -> str:
"""
Export video with full re-encode. Slower but supports resolution changes,
format conversion, and avoids stream-copy edge cases.
"""
ffmpeg = _find_ffmpeg()
input_path = str(Path(input_path).resolve())
output_path = str(Path(output_path).resolve())
if not keep_segments:
raise ValueError("No segments to export")
scale_map = {
"720p": "scale=-2:720",
"1080p": "scale=-2:1080",
"4k": "scale=-2:2160",
}
filter_parts = []
for i, seg in enumerate(keep_segments):
filter_parts.append(
f"[0:v]trim=start={seg['start']}:end={seg['end']},setpts=PTS-STARTPTS[v{i}];"
f"[0:a]atrim=start={seg['start']}:end={seg['end']},asetpts=PTS-STARTPTS[a{i}];"
)
n = len(keep_segments)
concat_inputs = "".join(f"[v{i}][a{i}]" for i in range(n))
filter_parts.append(f"{concat_inputs}concat=n={n}:v=1:a=1[outv][outa]")
filter_complex = "".join(filter_parts)
scale = scale_map.get(resolution, "")
if scale:
filter_complex += f";[outv]{scale}[outv_scaled]"
video_map = "[outv_scaled]"
else:
video_map = "[outv]"
codec_args = ["-c:v", "libx264", "-preset", "medium", "-crf", "18", "-c:a", "aac", "-b:a", "192k"]
if format_hint == "webm":
codec_args = ["-c:v", "libvpx-vp9", "-crf", "30", "-b:v", "0", "-c:a", "libopus"]
cmd = [
ffmpeg, "-y",
"-i", input_path,
"-filter_complex", filter_complex,
"-map", video_map,
"-map", "[outa]",
*codec_args,
"-movflags", "+faststart",
output_path,
]
logger.info(f"Re-encoding {n} segments -> {output_path} ({resolution})")
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
raise RuntimeError(f"FFmpeg re-encode failed: {result.stderr[-500:]}")
return output_path
def export_reencode_with_subs(
input_path: str,
output_path: str,
keep_segments: List[dict],
subtitle_path: str,
resolution: str = "1080p",
format_hint: str = "mp4",
) -> str:
"""
Export video with re-encode and burn-in subtitles (ASS format).
Applies trim+concat first, then overlays the subtitle file.
"""
ffmpeg = _find_ffmpeg()
input_path = str(Path(input_path).resolve())
output_path = str(Path(output_path).resolve())
subtitle_path = str(Path(subtitle_path).resolve())
if not keep_segments:
raise ValueError("No segments to export")
scale_map = {
"720p": "scale=-2:720",
"1080p": "scale=-2:1080",
"4k": "scale=-2:2160",
}
filter_parts = []
for i, seg in enumerate(keep_segments):
filter_parts.append(
f"[0:v]trim=start={seg['start']}:end={seg['end']},setpts=PTS-STARTPTS[v{i}];"
f"[0:a]atrim=start={seg['start']}:end={seg['end']},asetpts=PTS-STARTPTS[a{i}];"
)
n = len(keep_segments)
concat_inputs = "".join(f"[v{i}][a{i}]" for i in range(n))
filter_parts.append(f"{concat_inputs}concat=n={n}:v=1:a=1[outv][outa]")
filter_complex = "".join(filter_parts)
# Escape path for FFmpeg subtitle filter (Windows backslashes need escaping)
escaped_sub = subtitle_path.replace("\\", "/").replace(":", "\\:")
scale = scale_map.get(resolution, "")
if scale:
filter_complex += f";[outv]{scale},ass='{escaped_sub}'[outv_final]"
else:
filter_complex += f";[outv]ass='{escaped_sub}'[outv_final]"
video_map = "[outv_final]"
codec_args = ["-c:v", "libx264", "-preset", "medium", "-crf", "18", "-c:a", "aac", "-b:a", "192k"]
if format_hint == "webm":
codec_args = ["-c:v", "libvpx-vp9", "-crf", "30", "-b:v", "0", "-c:a", "libopus"]
cmd = [
ffmpeg, "-y",
"-i", input_path,
"-filter_complex", filter_complex,
"-map", video_map,
"-map", "[outa]",
*codec_args,
"-movflags", "+faststart",
output_path,
]
logger.info(f"Re-encoding {n} segments with subtitles -> {output_path} ({resolution})")
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
raise RuntimeError(f"FFmpeg re-encode with subs failed: {result.stderr[-500:]}")
return output_path
def get_video_info(input_path: str) -> dict:
"""Get basic video metadata using ffprobe."""
ffmpeg = _find_ffmpeg()
ffprobe = ffmpeg.replace("ffmpeg", "ffprobe")
cmd = [
ffprobe, "-v", "quiet",
"-print_format", "json",
"-show_format", "-show_streams",
str(input_path),
]
try:
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
import json
data = json.loads(result.stdout)
fmt = data.get("format", {})
video_stream = next((s for s in data.get("streams", []) if s.get("codec_type") == "video"), {})
return {
"duration": float(fmt.get("duration", 0)),
"size": int(fmt.get("size", 0)),
"format": fmt.get("format_name", ""),
"width": int(video_stream.get("width", 0)),
"height": int(video_stream.get("height", 0)),
"codec": video_stream.get("codec_name", ""),
"fps": eval(video_stream.get("r_frame_rate", "0/1")) if "/" in video_stream.get("r_frame_rate", "") else 0,
}
except Exception as e:
logger.error(f"Failed to get video info: {e}")
return {}