594 lines
20 KiB
Python
594 lines
20 KiB
Python
"""
FFmpeg-based video cutting engine.

Uses stream copy for fast, lossless cuts and falls back to re-encode when needed.
"""
|
|
|
|
import logging
|
|
import subprocess
|
|
import tempfile
|
|
import os
|
|
from pathlib import Path
|
|
from typing import List
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _input_has_video_stream(ffmpeg_cmd: str, input_path: str) -> bool:
|
|
"""Return True if the input contains at least one video stream."""
|
|
ffprobe = ffmpeg_cmd.replace("ffmpeg", "ffprobe")
|
|
cmd = [
|
|
ffprobe,
|
|
"-v", "error",
|
|
"-select_streams", "v:0",
|
|
"-show_entries", "stream=index",
|
|
"-of", "csv=p=0",
|
|
str(input_path),
|
|
]
|
|
try:
|
|
result = subprocess.run(cmd, capture_output=True, text=True)
|
|
return result.returncode == 0 and bool(result.stdout.strip())
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
def _clamp_speed(speed: float) -> float:
|
|
return max(0.25, min(4.0, float(speed)))
|
|
|
|
|
|
def _build_atempo_chain(speed: float) -> str:
    """Return a comma-separated chain of atempo filters that realises *speed*.

    Each atempo node is limited here to 0.5..2.0, so a clamped speed outside
    that window is split into fixed 2.0 / 0.5 stages followed by one stage
    carrying the remaining factor (formatted with six decimals).
    """
    remaining = _clamp_speed(speed)
    stages = []

    # Peel off whole doublings while the factor is still too fast for one node.
    while remaining > 2.0:
        stages.append("atempo=2.0")
        remaining = remaining / 2.0

    # Peel off whole halvings while the factor is still too slow for one node.
    while remaining < 0.5:
        stages.append("atempo=0.5")
        remaining = remaining * 2.0

    stages.append(f"atempo={remaining:.6f}")
    return ",".join(stages)
|
|
|
|
|
|
def _split_keep_segments_by_speed(
|
|
keep_segments: List[dict],
|
|
speed_ranges: List[dict] = None,
|
|
) -> List[dict]:
|
|
"""Split keep segments by speed ranges, attaching speed multiplier per piece."""
|
|
if not keep_segments:
|
|
return []
|
|
|
|
normalized_ranges = []
|
|
for r in speed_ranges or []:
|
|
start = float(r.get("start", 0.0))
|
|
end = float(r.get("end", 0.0))
|
|
if end <= start:
|
|
continue
|
|
normalized_ranges.append({
|
|
"start": start,
|
|
"end": end,
|
|
"speed": _clamp_speed(float(r.get("speed", 1.0))),
|
|
})
|
|
normalized_ranges.sort(key=lambda x: x["start"])
|
|
|
|
result = []
|
|
for keep in keep_segments:
|
|
k_start = float(keep["start"])
|
|
k_end = float(keep["end"])
|
|
if k_end <= k_start:
|
|
continue
|
|
|
|
cuts = {k_start, k_end}
|
|
for sr in normalized_ranges:
|
|
overlap_start = max(k_start, sr["start"])
|
|
overlap_end = min(k_end, sr["end"])
|
|
if overlap_end > overlap_start:
|
|
cuts.add(overlap_start)
|
|
cuts.add(overlap_end)
|
|
|
|
points = sorted(cuts)
|
|
for i in range(len(points) - 1):
|
|
seg_start = points[i]
|
|
seg_end = points[i + 1]
|
|
if seg_end - seg_start < 1e-6:
|
|
continue
|
|
|
|
speed = 1.0
|
|
for sr in normalized_ranges:
|
|
if seg_start >= sr["start"] and seg_end <= sr["end"]:
|
|
speed = sr["speed"]
|
|
break
|
|
|
|
result.append({"start": seg_start, "end": seg_end, "speed": speed})
|
|
|
|
return result
|
|
|
|
|
|
def _find_ffmpeg() -> str:
|
|
"""Locate ffmpeg binary."""
|
|
for cmd in ["ffmpeg", "ffmpeg.exe"]:
|
|
try:
|
|
subprocess.run([cmd, "-version"], capture_output=True, check=True)
|
|
return cmd
|
|
except (FileNotFoundError, subprocess.CalledProcessError):
|
|
continue
|
|
raise RuntimeError("FFmpeg not found. Install it or add it to PATH.")
|
|
|
|
|
|
def export_stream_copy(
    input_path: str,
    output_path: str,
    keep_segments: List[dict],
    mute_ranges: List[dict] = None,
) -> str:
    """
    Export video by stream-copying each keep segment and joining the pieces.

    Every segment is copied into a temporary MPEG-TS file, then the files are
    joined with FFmpeg's ``concat:`` protocol, again with stream copy, so no
    re-encoding takes place on the happy path.

    Falls back to ``export_reencode`` (at its "1080p" resolution) when:
      * mute_ranges are provided (muting needs audio filtering),
      * the input has no video stream,
      * extracting any segment fails, or
      * the final concatenation fails.

    Args:
        input_path: source video file
        output_path: destination file
        keep_segments: list of {"start": float, "end": float} to keep
        mute_ranges: list of {"start": float, "end": float} to mute (optional)

    Returns:
        output_path on success

    Raises:
        ValueError: if keep_segments is empty on the stream-copy path.
    """
    # The re-encode fallbacks pick codecs from a format hint. Derive it from
    # the output extension so a .webm target gets VP9/Opus instead of
    # H.264/AAC, which the WebM muxer refuses to write.
    fallback_format = "webm" if Path(output_path).suffix.lower() == ".webm" else "mp4"

    if mute_ranges:
        # Mute ranges require audio filtering, so fall back to re-encoding
        return export_reencode(
            input_path, output_path, keep_segments, "1080p", fallback_format, mute_ranges
        )

    ffmpeg = _find_ffmpeg()
    if not _input_has_video_stream(ffmpeg, input_path):
        # Audio-only inputs cannot use TS segment stream-copy concat reliably.
        return export_reencode(
            input_path, output_path, keep_segments, format_hint=fallback_format
        )

    input_path = str(Path(input_path).resolve())
    output_path = str(Path(output_path).resolve())

    if not keep_segments:
        raise ValueError("No segments to export")

    temp_dir = tempfile.mkdtemp(prefix="aive_export_")

    try:
        segment_files = []
        for i, seg in enumerate(keep_segments):
            seg_file = os.path.join(temp_dir, f"seg_{i:04d}.ts")
            cmd = [
                ffmpeg, "-y",
                "-ss", str(seg["start"]),
                "-to", str(seg["end"]),
                "-i", input_path,
                "-c", "copy",
                "-avoid_negative_ts", "make_zero",
                "-f", "mpegts",
                seg_file,
            ]
            logger.info(
                "Extracting segment %d: %.2fs - %.2fs", i, seg["start"], seg["end"]
            )
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode != 0:
                logger.warning(
                    "Stream copy segment %d failed, will try re-encode: %s",
                    i,
                    result.stderr[-200:],
                )
                return export_reencode(
                    input_path, output_path, keep_segments, format_hint=fallback_format
                )
            segment_files.append(seg_file)

        # The concat protocol takes the segment files as one "|"-joined input.
        concat_str = "|".join(segment_files)
        cmd = [
            ffmpeg, "-y",
            "-i", f"concat:{concat_str}",
            "-c", "copy",
            "-movflags", "+faststart",
            output_path,
        ]
        logger.info("Concatenating %d segments -> %s", len(segment_files), output_path)
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode != 0:
            logger.warning(
                "Concat failed, falling back to re-encode: %s", result.stderr[-200:]
            )
            return export_reencode(
                input_path, output_path, keep_segments, format_hint=fallback_format
            )

        return output_path

    finally:
        # Best-effort cleanup: a leftover temp file must never mask the
        # result (or the exception) of the export itself.
        for f in os.listdir(temp_dir):
            try:
                os.remove(os.path.join(temp_dir, f))
            except OSError:
                pass
        try:
            os.rmdir(temp_dir)
        except OSError:
            pass
|
|
|
|
|
|
def export_reencode(
    input_path: str,
    output_path: str,
    keep_segments: List[dict],
    resolution: str = "1080p",
    format_hint: str = "mp4",
    mute_ranges: List[dict] = None,
    gain_ranges: List[dict] = None,
    speed_ranges: List[dict] = None,
    global_gain_db: float = 0.0,
) -> str:
    """
    Export media with a full re-encode.

    Slower than stream copy but supports resolution changes, format
    conversion and per-range speed changes, and avoids stream-copy edge cases.

    One of three paths is taken:

    * Audio-only input: keep segments are trimmed (and speed-adjusted),
      concatenated, and the audio filters are applied to the joined audio.
    * Video input with audio filters and no effective speed change: the
      filters are applied to the whole input; keep_segments are NOT used to
      cut (muting/gain is applied instead of cutting).
    * Otherwise: keep segments are trimmed (and speed-adjusted) and
      concatenated. Audio filters are not applied on this path.

    Args:
        input_path: source media file.
        output_path: destination file.
        keep_segments: list of {"start": float, "end": float} to keep.
        resolution: "720p", "1080p" or "4k" to scale the video to that
            height; any other value leaves the video size unchanged.
        format_hint: "webm" selects VP9/Opus; anything else H.264/AAC.
        mute_ranges: list of {"start", "end"} ranges to silence.
        gain_ranges: list of {"start", "end", "gain_db"} ranges to amplify.
        speed_ranges: list of {"start", "end", "speed"} ranges.
        global_gain_db: gain in dB applied to the whole audio track.

    Returns:
        output_path on success.

    Raises:
        ValueError: if a cutting path is taken without usable segments.
        RuntimeError: if FFmpeg exits with a non-zero status.
    """
    ffmpeg = _find_ffmpeg()
    input_path = str(Path(input_path).resolve())
    output_path = str(Path(output_path).resolve())

    scale_map = {
        "720p": "scale=-2:720",
        "1080p": "scale=-2:1080",
        "4k": "scale=-2:2160",
    }

    def build_audio_filter() -> str:
        """Join global gain, ranged gain and mute filters; "anull" if none."""
        filters = []
        if abs(float(global_gain_db)) > 1e-6:
            filters.append(f"volume={float(global_gain_db)}dB")

        for gain_range in gain_ranges or []:
            start = gain_range['start']
            end = gain_range['end']
            gain_db = gain_range.get('gain_db', 0.0)
            filters.append(f"volume={float(gain_db)}dB:enable='between(t,{start},{end})'")

        for mute_range in mute_ranges or []:
            start = mute_range['start']
            end = mute_range['end']
            filters.append(f"volume=0:enable='between(t,{start},{end})'")

        return ",".join(filters) if filters else "anull"

    has_audio_filters = bool(mute_ranges) or bool(gain_ranges) or abs(float(global_gain_db)) > 1e-6
    has_video = _input_has_video_stream(ffmpeg, input_path)

    speed_segments = _split_keep_segments_by_speed(keep_segments, speed_ranges)
    has_speed = any(abs(seg.get("speed", 1.0) - 1.0) > 1e-6 for seg in speed_segments)

    if not has_video:
        if not keep_segments:
            raise ValueError("No segments to export")

        segments_for_concat = speed_segments if speed_segments else _split_keep_segments_by_speed(keep_segments, None)
        if not segments_for_concat:
            raise ValueError("No segments to export")

        filter_parts = []
        for i, seg in enumerate(segments_for_concat):
            speed = _clamp_speed(seg.get("speed", 1.0))
            a_chain = f"atrim=start={seg['start']}:end={seg['end']},asetpts=PTS-STARTPTS"
            if abs(speed - 1.0) > 1e-6:
                a_chain += f",{_build_atempo_chain(speed)}"
            filter_parts.append(f"[0:a]{a_chain}[a{i}];")

        n = len(segments_for_concat)
        concat_inputs = "".join(f"[a{i}]" for i in range(n))
        filter_parts.append(f"{concat_inputs}concat=n={n}:v=0:a=1[outa_raw]")

        # Audio filters run on the concatenated result.
        audio_filter = build_audio_filter()
        if audio_filter != "anull":
            filter_parts.append(f";[outa_raw]{audio_filter}[outa]")
            audio_map = "[outa]"
        else:
            audio_map = "[outa_raw]"

        filter_complex = "".join(filter_parts)

        audio_codec_args = ["-c:a", "aac", "-b:a", "192k"]
        if format_hint == "webm":
            audio_codec_args = ["-c:a", "libopus", "-b:a", "160k"]

        cmd = [
            ffmpeg, "-y",
            "-i", input_path,
            "-filter_complex", filter_complex,
            "-map", audio_map,
            *audio_codec_args,
            output_path,
        ]

        logger.info(
            "Re-encoding audio-only input (%s segments, speed-adjusted=%s) -> %s",
            n,
            has_speed,
            output_path,
        )

        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode != 0:
            raise RuntimeError(f"FFmpeg audio-only export failed: {result.stderr[-500:]}")

        return output_path

    codec_args = ["-c:v", "libx264", "-preset", "medium", "-crf", "18", "-c:a", "aac", "-b:a", "192k"]
    if format_hint == "webm":
        codec_args = ["-c:v", "libvpx-vp9", "-crf", "30", "-b:v", "0", "-c:a", "libopus"]

    # Handle filtered full-timeline audio case (mute/gain/global gain) when no speed warping is needed
    if has_audio_filters and not has_speed:
        audio_filter = build_audio_filter()

        # Always send the video through a labelled chain. Without scaling the
        # pass-through "null" filter is used, so the graph stays well-formed
        # ("[0:v]null[v]") and the output label exists for -map.
        video_filter = scale_map.get(resolution, "") or "null"
        video_map = "[v]"

        filter_complex = f"[0:a]{audio_filter}[a];[0:v]{video_filter}{video_map}"

        cmd = [
            ffmpeg, "-y",
            "-i", input_path,
            "-filter_complex", filter_complex,
            "-map", video_map,
            "-map", "[a]",
            *codec_args,
            "-movflags", "+faststart",
            output_path,
        ]

        logger.info(
            "Re-encoding with audio filters (mute=%s gain=%s global=%s) -> %s (%s)",
            len(mute_ranges or []),
            len(gain_ranges or []),
            global_gain_db,
            output_path,
            resolution,
        )
    else:
        # Cutting logic with optional per-segment speed changes
        if not keep_segments:
            raise ValueError("No segments to export")

        segments_for_concat = speed_segments if speed_segments else _split_keep_segments_by_speed(keep_segments, None)
        if not segments_for_concat:
            raise ValueError("No segments to export")

        if has_audio_filters:
            # NOTE(review): mute/gain filters are not part of this graph, so
            # they are dropped whenever speed ranges are active. Whether the
            # ranges refer to the source or the cut timeline is not clear
            # from this function - confirm before wiring them in here.
            logger.warning(
                "Audio filters (mute=%s gain=%s global=%s) are ignored when speed ranges are active",
                len(mute_ranges or []),
                len(gain_ranges or []),
                global_gain_db,
            )

        filter_parts = []
        for i, seg in enumerate(segments_for_concat):
            speed = _clamp_speed(seg.get("speed", 1.0))
            v_chain = f"trim=start={seg['start']}:end={seg['end']},setpts=PTS-STARTPTS"
            a_chain = f"atrim=start={seg['start']}:end={seg['end']},asetpts=PTS-STARTPTS"
            if abs(speed - 1.0) > 1e-6:
                # Video is retimed via its timestamps, audio via atempo.
                v_chain += f",setpts=PTS/{speed:.6f}"
                a_chain += f",{_build_atempo_chain(speed)}"
            filter_parts.append(f"[0:v]{v_chain}[v{i}];[0:a]{a_chain}[a{i}];")

        n = len(segments_for_concat)
        concat_inputs = "".join(f"[v{i}][a{i}]" for i in range(n))
        filter_parts.append(f"{concat_inputs}concat=n={n}:v=1:a=1[outv][outa]")

        filter_complex = "".join(filter_parts)

        scale = scale_map.get(resolution, "")
        if scale:
            filter_complex += f";[outv]{scale}[outv_scaled]"
            video_map = "[outv_scaled]"
        else:
            video_map = "[outv]"

        cmd = [
            ffmpeg, "-y",
            "-i", input_path,
            "-filter_complex", filter_complex,
            "-map", video_map,
            "-map", "[outa]",
            *codec_args,
            "-movflags", "+faststart",
            output_path,
        ]

        logger.info(
            "Re-encoding %s segments (speed-adjusted=%s) -> %s (%s)",
            n,
            has_speed,
            output_path,
            resolution,
        )

    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError(f"FFmpeg re-encode failed: {result.stderr[-500:]}")

    return output_path
|
|
|
|
|
|
def export_reencode_with_subs(
    input_path: str,
    output_path: str,
    keep_segments: List[dict],
    subtitle_path: str,
    resolution: str = "1080p",
    format_hint: str = "mp4",
    mute_ranges: List[dict] = None,
    gain_ranges: List[dict] = None,
    speed_ranges: List[dict] = None,
    global_gain_db: float = 0.0,
) -> str:
    """
    Re-encode the input and burn an ASS subtitle file into the picture.

    With audio filters (mute/gain/global gain) and no effective speed change,
    the filters and subtitles are applied to the whole input and
    keep_segments are not used to cut. Otherwise the keep segments are
    trimmed (and speed-adjusted), concatenated, and the subtitles are
    rendered onto the joined video.

    Returns output_path on success. Raises ValueError when the input has no
    video stream or there is nothing to cut, and RuntimeError when FFmpeg
    exits with a non-zero status.
    """
    ffmpeg = _find_ffmpeg()
    if not _input_has_video_stream(ffmpeg, input_path):
        raise ValueError("Burn-in captions require a video track")

    input_path = str(Path(input_path).resolve())
    output_path = str(Path(output_path).resolve())
    subtitle_path = str(Path(subtitle_path).resolve())

    # Escape path for FFmpeg subtitle filter (Windows backslashes need escaping)
    escaped_sub = subtitle_path.replace("\\", "/").replace(":", "\\:")

    scale_map = {
        "720p": "scale=-2:720",
        "1080p": "scale=-2:1080",
        "4k": "scale=-2:2160",
    }

    def build_audio_filter() -> str:
        """Chain global gain, ranged gain and mute filters; "anull" if none."""
        chain = []
        if abs(float(global_gain_db)) > 1e-6:
            chain.append(f"volume={float(global_gain_db)}dB")

        for entry in gain_ranges or []:
            begin, finish = entry['start'], entry['end']
            level = float(entry.get('gain_db', 0.0))
            chain.append(f"volume={level}dB:enable='between(t,{begin},{finish})'")

        for entry in mute_ranges or []:
            begin, finish = entry['start'], entry['end']
            chain.append(f"volume=0:enable='between(t,{begin},{finish})'")

        return ",".join(chain) or "anull"

    def burn_in_filter() -> str:
        """Optional scale step followed by the ASS subtitle overlay."""
        overlay = f"ass='{escaped_sub}'"
        scale = scale_map.get(resolution, "")
        return f"{scale},{overlay}" if scale else overlay

    has_audio_filters = (
        bool(mute_ranges) or bool(gain_ranges) or abs(float(global_gain_db)) > 1e-6
    )

    speed_segments = _split_keep_segments_by_speed(keep_segments, speed_ranges)
    has_speed = any(
        abs(piece.get("speed", 1.0) - 1.0) > 1e-6 for piece in speed_segments
    )

    if has_audio_filters and not has_speed:
        # Full-timeline case: filter the audio, draw subtitles, no cutting.
        audio_filter = build_audio_filter()
        filter_complex = f"[0:a]{audio_filter}[a];[0:v]{burn_in_filter()}[v]"
        video_label = "[v]"
        audio_label = "[a]"

        logger.info(
            "Re-encoding with subtitles and audio filters (mute=%s gain=%s global=%s) -> %s (%s)",
            len(mute_ranges or []),
            len(gain_ranges or []),
            global_gain_db,
            output_path,
            resolution,
        )
    else:
        # Cutting case: trim every piece, retime it if needed, then concat.
        if not keep_segments:
            raise ValueError("No segments to export")

        pieces = speed_segments or _split_keep_segments_by_speed(keep_segments, None)
        if not pieces:
            raise ValueError("No segments to export")

        graph = []
        for idx, piece in enumerate(pieces):
            speed = _clamp_speed(piece.get("speed", 1.0))
            video_chain = f"trim=start={piece['start']}:end={piece['end']},setpts=PTS-STARTPTS"
            audio_chain = f"atrim=start={piece['start']}:end={piece['end']},asetpts=PTS-STARTPTS"
            if abs(speed - 1.0) > 1e-6:
                video_chain = f"{video_chain},setpts=PTS/{speed:.6f}"
                audio_chain = f"{audio_chain},{_build_atempo_chain(speed)}"
            graph.append(f"[0:v]{video_chain}[v{idx}];[0:a]{audio_chain}[a{idx}];")

        n = len(pieces)
        pads = "".join(f"[v{idx}][a{idx}]" for idx in range(n))
        graph.append(f"{pads}concat=n={n}:v=1:a=1[outv][outa]")

        # Subtitles are rendered on the already-joined video.
        graph.append(f";[outv]{burn_in_filter()}[outv_final]")
        filter_complex = "".join(graph)
        video_label = "[outv_final]"
        audio_label = "[outa]"

        logger.info(
            "Re-encoding %s segments with subtitles (speed-adjusted=%s) -> %s (%s)",
            n,
            has_speed,
            output_path,
            resolution,
        )

    if format_hint == "webm":
        codec_args = ["-c:v", "libvpx-vp9", "-crf", "30", "-b:v", "0", "-c:a", "libopus"]
    else:
        codec_args = ["-c:v", "libx264", "-preset", "medium", "-crf", "18", "-c:a", "aac", "-b:a", "192k"]

    cmd = [
        ffmpeg, "-y",
        "-i", input_path,
        "-filter_complex", filter_complex,
        "-map", video_label,
        "-map", audio_label,
        *codec_args,
        "-movflags", "+faststart",
        output_path,
    ]

    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError(f"FFmpeg re-encode with subs failed: {result.stderr[-500:]}")

    return output_path
|
|
|
|
|
|
def _parse_frame_rate(rate: str) -> float:
    """Convert an ffprobe "num/den" frame-rate string to fps; 0 if unusable."""
    numerator, slash, denominator = str(rate).partition("/")
    if not slash:
        return 0
    try:
        num = float(numerator)
        den = float(denominator)
    except ValueError:
        return 0
    # ffprobe reports "0/0" for streams without a frame rate.
    if den == 0:
        return 0
    return num / den


def get_video_info(input_path: str) -> dict:
    """Get basic video metadata using ffprobe.

    Args:
        input_path: media file to inspect.

    Returns:
        A dict with "duration", "size", "format", "width", "height", "codec"
        and "fps", taken from the container and its first video stream
        (zero/empty values when there is no video stream). An empty dict is
        returned, and the error logged, when ffprobe fails or its output
        cannot be parsed.

    Raises:
        RuntimeError: propagated from ``_find_ffmpeg`` when FFmpeg is missing.
    """
    import json  # only needed by this function

    ffmpeg = _find_ffmpeg()
    # Rename only the executable so a directory named "ffmpeg" is preserved.
    directory, executable = os.path.split(ffmpeg)
    ffprobe = os.path.join(directory, executable.replace("ffmpeg", "ffprobe"))

    cmd = [
        ffprobe, "-v", "quiet",
        "-print_format", "json",
        "-show_format", "-show_streams",
        str(input_path),
    ]

    try:
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        data = json.loads(result.stdout)
        fmt = data.get("format", {})
        video_stream = next(
            (s for s in data.get("streams", []) if s.get("codec_type") == "video"),
            {},
        )

        return {
            "duration": float(fmt.get("duration", 0)),
            "size": int(fmt.get("size", 0)),
            "format": fmt.get("format_name", ""),
            "width": int(video_stream.get("width", 0)),
            "height": int(video_stream.get("height", 0)),
            "codec": video_stream.get("codec_name", ""),
            # Parsed explicitly instead of eval(): no code execution on
            # external data, and "0/0" no longer discards all metadata.
            "fps": _parse_frame_rate(video_stream.get("r_frame_rate", "")),
        }
    except Exception as e:
        # Metadata is best effort: report the failure and return nothing.
        logger.error("Failed to get video info: %s", e)
        return {}
|