clean up of features
This commit is contained in:
@ -175,30 +175,40 @@ def _remove_with_mediapipe(
|
||||
raise RuntimeError(f"MediaPipe background removal failed: {e}")
|
||||
|
||||
|
||||
|
||||
def _remove_with_ffmpeg_portrait(
|
||||
input_path: str,
|
||||
output_path: str,
|
||||
replacement: str = "blur",
|
||||
replacement_value: str = "",
|
||||
) -> str:
|
||||
"""Fallback: use FFmpeg's colorkey + chromakey for basic background removal.
|
||||
"""Fallback: basic FFmpeg-only background blur.
|
||||
|
||||
This is a crude approximation. For best results, install mediapipe + opencv-python.
|
||||
Uses a strong gaussian blur as a crude background replacement.
|
||||
For proper person segmentation (color/image replacement), install:
|
||||
pip install mediapipe opencv-python
|
||||
"""
|
||||
ffmpeg = "ffmpeg"
|
||||
|
||||
# Use a simple chromakey-based approach with a neutral background
|
||||
# This won't work well for most real videos but provides a fallback
|
||||
if replacement == "color":
|
||||
if replacement == "blur":
|
||||
filter_complex = "gblur=sigma=30"
|
||||
elif replacement == "color":
|
||||
color = replacement_value or "00FF00"
|
||||
filter_complex = f"colorkey=0x{color}:0.3:0.1,chromakey=0x{color}:0.3:0.1"
|
||||
elif replacement == "blur":
|
||||
filter_complex = "gblur=sigma=20:enable='gt(scene,0.01)'"
|
||||
filter_complex = (
|
||||
f"split[fg][bg];"
|
||||
f"[bg]colorkey=0x{color}:0.3:0.1[bg_key];"
|
||||
f"[fg][bg_key]overlay"
|
||||
)
|
||||
elif replacement == "image" and replacement_value:
|
||||
escaped = replacement_value.replace("\\", "/").replace(":", "\\:")
|
||||
filter_complex = (
|
||||
f"movie='{escaped}':loop=0,scale=iw:ih[bg];"
|
||||
f"[0:v][bg]overlay=0:0:shortest=1"
|
||||
)
|
||||
else:
|
||||
filter_complex = "null"
|
||||
|
||||
if filter_complex == "null":
|
||||
# No-op, copy input to output
|
||||
cmd = [ffmpeg, "-y", "-i", input_path, "-c", "copy", output_path]
|
||||
else:
|
||||
cmd = [
|
||||
@ -215,5 +225,8 @@ def _remove_with_ffmpeg_portrait(
|
||||
if result.returncode != 0:
|
||||
raise RuntimeError(f"FFmpeg background removal failed: {result.stderr[-500:]}")
|
||||
|
||||
logger.info("FFmpeg portait background removal completed -> %s", output_path)
|
||||
logger.warning(
|
||||
"FFmpeg fallback background removal used (no MediaPipe). "
|
||||
"Install 'mediapipe' and 'opencv-python' for proper person segmentation."
|
||||
)
|
||||
return output_path
|
||||
|
||||
@ -45,6 +45,24 @@ def _input_has_video_stream(ffmpeg_cmd: str, input_path: str) -> bool:
|
||||
return False
|
||||
|
||||
|
||||
def _input_has_audio_stream(ffmpeg_cmd: str, input_path: str) -> bool:
|
||||
"""Return True if the input contains at least one audio stream."""
|
||||
ffprobe = ffmpeg_cmd.replace("ffmpeg", "ffprobe")
|
||||
cmd = [
|
||||
ffprobe,
|
||||
"-v", "error",
|
||||
"-select_streams", "a:0",
|
||||
"-show_entries", "stream=index",
|
||||
"-of", "csv=p=0",
|
||||
str(input_path),
|
||||
]
|
||||
try:
|
||||
result = subprocess.run(cmd, capture_output=True, text=True)
|
||||
return result.returncode == 0 and bool(result.stdout.strip())
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def _clamp_speed(speed: float) -> float:
|
||||
return max(0.25, min(4.0, float(speed)))
|
||||
|
||||
@ -144,39 +162,65 @@ def mix_background_music(
|
||||
ducking_release_ms: float = 200.0,
|
||||
) -> str:
|
||||
"""Mix background music into a video with optional ducking.
|
||||
|
||||
Uses FFmpeg amix + sidechaincompress. Output is written to output_path.
|
||||
|
||||
Uses FFmpeg amix + sidechaincompress. If the input has no audio,
|
||||
the music track becomes the sole audio track. Output is written to output_path.
|
||||
"""
|
||||
ffmpeg = _find_ffmpeg()
|
||||
escaped_music = music_path.replace("\\", "/").replace(":", "\\:")
|
||||
|
||||
# Build the filter graph
|
||||
if ducking_enabled:
|
||||
has_audio_result = _input_has_audio_stream(ffmpeg, video_path)
|
||||
|
||||
if not has_audio_result:
|
||||
cmd = [
|
||||
ffmpeg, "-y",
|
||||
"-i", video_path,
|
||||
"-i", music_path,
|
||||
"-map", "0:v",
|
||||
"-map", "1:a",
|
||||
"-c:v", "copy",
|
||||
"-c:a", "aac", "-b:a", "192k",
|
||||
"-shortest",
|
||||
"-movflags", "+faststart",
|
||||
output_path,
|
||||
]
|
||||
elif ducking_enabled:
|
||||
music_source = f"amovie='{escaped_music}',volume={volume_db}dB[music]"
|
||||
filter_complex = (
|
||||
f"[0:a]asplit[main][sidechain];"
|
||||
f"movie='{escaped_music}':loop=0,volume={volume_db}dB[music];"
|
||||
f"{music_source};"
|
||||
f"[main][music]amix=inputs=2:duration=first:dropout_transition=2[mixed];"
|
||||
f"[mixed][sidechain]sidechaincompress="
|
||||
f"threshold=-30dB:ratio=100:attack={ducking_attack_ms}ms:"
|
||||
f"release={ducking_release_ms}ms:makeup=1:level_sc={ducking_db}[outa]"
|
||||
f"threshold=-30dB:ratio=20:attack={ducking_attack_ms / 1000}:"
|
||||
f"release={ducking_release_ms / 1000}:makeup=1:level_sc={ducking_db}[outa]"
|
||||
)
|
||||
cmd = [
|
||||
ffmpeg, "-y",
|
||||
"-i", video_path,
|
||||
"-filter_complex", filter_complex,
|
||||
"-map", "0:v",
|
||||
"-map", "[outa]",
|
||||
"-c:v", "copy",
|
||||
"-c:a", "aac", "-b:a", "192k",
|
||||
"-shortest",
|
||||
output_path,
|
||||
]
|
||||
else:
|
||||
music_source = f"amovie='{escaped_music}',volume={volume_db}dB[music]"
|
||||
filter_complex = (
|
||||
f"movie='{escaped_music}':loop=0,volume={volume_db}dB[music];"
|
||||
f"{music_source};"
|
||||
f"[0:a][music]amix=inputs=2:duration=first:dropout_transition=2[outa]"
|
||||
)
|
||||
|
||||
cmd = [
|
||||
ffmpeg, "-y",
|
||||
"-i", video_path,
|
||||
"-filter_complex", filter_complex,
|
||||
"-map", "0:v",
|
||||
"-map", "[outa]",
|
||||
"-c:v", "copy",
|
||||
"-c:a", "aac", "-b:a", "192k",
|
||||
"-shortest",
|
||||
output_path,
|
||||
]
|
||||
cmd = [
|
||||
ffmpeg, "-y",
|
||||
"-i", video_path,
|
||||
"-filter_complex", filter_complex,
|
||||
"-map", "0:v",
|
||||
"-map", "[outa]",
|
||||
"-c:v", "copy",
|
||||
"-c:a", "aac", "-b:a", "192k",
|
||||
"-shortest",
|
||||
output_path,
|
||||
]
|
||||
|
||||
result = subprocess.run(cmd, capture_output=True, text=True)
|
||||
if result.returncode != 0:
|
||||
@ -191,28 +235,29 @@ def concat_clips(
|
||||
output_path: str,
|
||||
) -> str:
|
||||
"""Concatenate multiple video clips using FFmpeg concat demuxer.
|
||||
|
||||
|
||||
The main_path is kept as-is. append_paths are appended after it.
|
||||
"""
|
||||
if not append_paths:
|
||||
raise ValueError("No clips to concatenate")
|
||||
|
||||
|
||||
ffmpeg = _find_ffmpeg()
|
||||
import tempfile
|
||||
import os
|
||||
|
||||
resolved_main = str(Path(main_path).resolve())
|
||||
|
||||
# If output_path collides with an input, write to temp first
|
||||
all_inputs = [resolved_main] + [str(Path(p).resolve()) for p in append_paths]
|
||||
needs_rename = str(Path(output_path).resolve()) in all_inputs
|
||||
final_output = output_path
|
||||
if needs_rename:
|
||||
final_output = output_path + ".concat_tmp.mp4"
|
||||
|
||||
temp_dir = tempfile.mkdtemp(prefix="aive_concat_")
|
||||
try:
|
||||
segment_files = [main_path]
|
||||
segment_files.extend(append_paths)
|
||||
|
||||
# Create concat file list
|
||||
concat_file = os.path.join(temp_dir, "concat.txt")
|
||||
with open(concat_file, "w") as f:
|
||||
for path in segment_files:
|
||||
resolved = os.path.abspath(path)
|
||||
f.write(f"file '{resolved}'\n")
|
||||
|
||||
for path in all_inputs:
|
||||
f.write(f"file '{path}'\n")
|
||||
|
||||
cmd = [
|
||||
ffmpeg, "-y",
|
||||
"-f", "concat",
|
||||
@ -220,13 +265,16 @@ def concat_clips(
|
||||
"-i", concat_file,
|
||||
"-c", "copy",
|
||||
"-movflags", "+faststart",
|
||||
output_path,
|
||||
final_output,
|
||||
]
|
||||
|
||||
|
||||
result = subprocess.run(cmd, capture_output=True, text=True)
|
||||
if result.returncode != 0:
|
||||
raise RuntimeError(f"Clip concat failed: {result.stderr[-500:]}")
|
||||
|
||||
|
||||
if needs_rename:
|
||||
os.replace(final_output, output_path)
|
||||
|
||||
return output_path
|
||||
finally:
|
||||
for f in os.listdir(temp_dir):
|
||||
@ -570,11 +618,9 @@ def export_reencode(
|
||||
|
||||
# Apply zoom post-processing if configured
|
||||
if zoom_config and zoom_config.get("enabled") and has_video:
|
||||
import tempfile as _tf
|
||||
import os as _os
|
||||
zoomed_path = output_path + ".zoomed.mp4"
|
||||
_apply_zoom_post(output_path, zoomed_path, zoom_config)
|
||||
_os.replace(zoomed_path, output_path)
|
||||
os.replace(zoomed_path, output_path)
|
||||
logger.info("Zoom/punch-in applied to %s (factor=%s)", output_path, zoom_config.get("zoomFactor", 1.0))
|
||||
|
||||
return output_path
|
||||
@ -737,11 +783,9 @@ def export_reencode_with_subs(
|
||||
|
||||
# Apply zoom post-processing if configured
|
||||
if zoom_config and zoom_config.get("enabled"):
|
||||
import tempfile as _tf
|
||||
import os as _os
|
||||
zoomed_path = output_path + ".zoomed.mp4"
|
||||
_apply_zoom_post(output_path, zoomed_path, zoom_config)
|
||||
_os.replace(zoomed_path, output_path)
|
||||
os.replace(zoomed_path, output_path)
|
||||
logger.info("Zoom/punch-in applied to %s (factor=%s)", output_path, zoom_config.get("zoomFactor", 1.0))
|
||||
|
||||
return output_path
|
||||
|
||||
Reference in New Issue
Block a user