Files
TalkEdit/backend/services/audio_cleaner.py

283 lines
8.9 KiB
Python
Raw Permalink Normal View History

"""
Audio noise reduction using DeepFilterNet.
Falls back to a basic FFmpeg noise filter if DeepFilterNet is not installed.
"""
import logging
2026-04-03 12:05:44 -06:00
import re
import subprocess
import tempfile
2026-04-15 17:13:56 -06:00
import warnings
from pathlib import Path
logger = logging.getLogger(__name__)
2026-04-15 17:13:56 -06:00
# Tri-state import cache: None means the DeepFilterNet import has not been
# attempted yet; True/False records the outcome of the first attempt.
DEEPFILTER_AVAILABLE = None
# Placeholders rebound to the df.enhance callables of the same name by
# _ensure_deepfilter_loaded() once the import succeeds.
enhance = None
init_df = None
load_audio = None
save_audio = None
# Model and state returned by init_df(), cached by _init_deepfilter() so the
# model is only initialized once.
_df_model = None
_df_state = None
2026-04-15 17:13:56 -06:00
def _ensure_deepfilter_loaded() -> bool:
    """
    Import DeepFilterNet on first use and remember whether that worked.

    The outcome is cached in the module-level DEEPFILTER_AVAILABLE flag, so the
    import is attempted at most once. On success the module-level names
    enhance, init_df, load_audio and save_audio are rebound to the callables
    imported from df.enhance.

    Returns: True if DeepFilterNet could be imported, False otherwise.
    """
    global DEEPFILTER_AVAILABLE, enhance, init_df, load_audio, save_audio

    # A previous call already settled the answer; reuse it.
    if DEEPFILTER_AVAILABLE is not None:
        return DEEPFILTER_AVAILABLE

    try:
        # DeepFilterNet currently triggers a third-party torchaudio deprecation
        # warning on import in some environments; suppress only this known warning.
        with warnings.catch_warnings():
            warnings.filterwarnings(
                "ignore",
                message=r".*torchaudio\._backend\.common\.AudioMetaData has been moved.*",
                category=UserWarning,
            )
            from df.enhance import (
                enhance as _enhance_fn,
                init_df as _init_df_fn,
                load_audio as _load_audio_fn,
                save_audio as _save_audio_fn,
            )
    except ImportError:
        DEEPFILTER_AVAILABLE = False
    else:
        # Publish the imported callables through the module-level placeholders.
        enhance, init_df = _enhance_fn, _init_df_fn
        load_audio, save_audio = _load_audio_fn, _save_audio_fn
        DEEPFILTER_AVAILABLE = True

    return DEEPFILTER_AVAILABLE
def _init_deepfilter():
    """
    Return the cached DeepFilterNet (model, state) pair, creating it if needed.

    Raises: RuntimeError if DeepFilterNet could not be imported.
    """
    global _df_model, _df_state

    if not _ensure_deepfilter_loaded():
        raise RuntimeError("DeepFilterNet is not available")

    # Fast path: the model was already initialized by an earlier call.
    if _df_model is not None:
        return _df_model, _df_state

    logger.info("Initializing DeepFilterNet model")
    # init_df() yields three values here; only the first two are kept.
    _df_model, _df_state, _unused = init_df()
    return _df_model, _df_state
def clean_audio(
    input_path: str,
    output_path: str = "",
) -> str:
    """
    Run noise reduction on an audio file and return the cleaned file's path.

    DeepFilterNet is used when it can be imported; otherwise the FFmpeg
    anlmdn fallback is used. When output_path is empty, the result is written
    next to the input with "_clean" appended to the file stem.
    """
    source = Path(input_path)
    destination = output_path or str(source.with_stem(source.stem + "_clean"))

    # Pick the backend once, then dispatch with identical arguments.
    cleaner = _clean_with_deepfilter if is_deepfilter_available() else _clean_with_ffmpeg
    return cleaner(str(source), destination)
def _clean_with_deepfilter(input_path: str, output_path: str) -> str:
    """
    Denoise input_path with DeepFilterNet and write the result to output_path.

    The audio is loaded and saved at the sample rate reported by the
    DeepFilterNet state object. Returns output_path.
    """
    df_model, df_state = _init_deepfilter()
    # load_audio returns a second value alongside the samples; it is not needed here.
    noisy, _meta = load_audio(input_path, sr=df_state.sr())
    save_audio(output_path, enhance(df_model, df_state, noisy), sr=df_state.sr())
    logger.info(f"DeepFilterNet cleaned audio saved to {output_path}")
    return output_path
def _clean_with_ffmpeg(input_path: str, output_path: str) -> str:
    """
    Fallback noise reduction through FFmpeg's anlmdn filter.

    Overwrites output_path if it already exists (ffmpeg -y). Returns
    output_path.

    Raises: RuntimeError, carrying the last 300 characters of ffmpeg's stderr,
    when ffmpeg exits with a non-zero status.
    """
    denoise_filter = "anlmdn=s=7:p=0.002:r=0.002:m=15"
    completed = subprocess.run(
        ["ffmpeg", "-y", "-i", input_path, "-af", denoise_filter, output_path],
        capture_output=True,
        text=True,
    )
    if completed.returncode == 0:
        logger.info(f"FFmpeg cleaned audio saved to {output_path}")
        return output_path
    raise RuntimeError(f"FFmpeg audio cleaning failed: {completed.stderr[-300:]}")
def is_deepfilter_available() -> bool:
    """Report whether DeepFilterNet can be imported (importing it on first call)."""
    available = _ensure_deepfilter_loaded()
    return available
2026-04-03 12:05:44 -06:00
def detect_silence_ranges(input_path: str, min_silence_ms: int, silence_db: float):
    """Detect silence ranges using ffmpeg silencedetect.

    Args:
        input_path: Media file handed to ffmpeg via ``-i``.
        min_silence_ms: Minimum silence length in milliseconds. Values below
            50 ms are raised to 50 ms.
        silence_db: Noise threshold in dB passed to ``silencedetect``.

    Returns a list of dicts: {start, end, duration} in seconds, rounded to
    three decimals, in the order ffmpeg reported them. A failed ffmpeg run is
    logged as a warning and yields whatever ranges could still be parsed
    (usually none).
    """
    min_silence_seconds = max(0.05, float(min_silence_ms) / 1000.0)
    noise_threshold = float(silence_db)

    cmd = [
        "ffmpeg",
        "-hide_banner",
        "-i",
        input_path,
        "-af",
        f"silencedetect=noise={noise_threshold}dB:d={min_silence_seconds}",
        "-f",
        "null",
        "-",
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)

    # silencedetect prints to stderr even on success.
    output = result.stderr or ""
    if result.returncode != 0:
        # Keep the best-effort contract (no exception), but do not hide the failure.
        logger.warning(
            "ffmpeg silencedetect exited with code %s for %s: %s",
            result.returncode,
            input_path,
            output[-300:],
        )

    # Accept signed and exponent-form numbers. The max(0.0, ...) clamp below is
    # only meaningful if a negative start can actually be parsed; a digits-only
    # pattern would skip such a start and shift every later start/end pairing.
    number = r"[-+]?(?:[0-9]+\.?[0-9]*|\.[0-9]+)(?:[eE][-+]?[0-9]+)?"
    event_pat = re.compile(
        rf"silence_start:\s*(?P<start>{number})"
        rf"|silence_end:\s*(?P<end>{number})\s*\|\s*silence_duration:\s*(?P<duration>{number})"
    )

    ranges = []
    pending_start = None
    # Walk the events in stream order so each end is paired with the start
    # that immediately precedes it, rather than by list position.
    for event in event_pat.finditer(output):
        if event.group("start") is not None:
            pending_start = float(event.group("start"))
            continue
        if pending_start is None:
            # An end without a preceding start cannot form a range.
            continue

        start = max(0.0, pending_start)
        pending_start = None
        end = float(event.group("end"))
        duration = float(event.group("duration"))
        if end > start and duration >= min_silence_seconds:
            ranges.append({
                "start": round(start, 3),
                "end": round(end, 3),
                "duration": round(duration, 3),
            })

    logger.info(
        "Detected %s silence ranges in %s (min=%sms, threshold=%sdB)",
        len(ranges),
        input_path,
        min_silence_ms,
        silence_db,
    )
    return ranges
2026-05-04 16:37:25 -06:00
def normalize_audio(
    input_path: str,
    output_path: str = "",
    target_lufs: float = -14.0,
) -> str:
    """
    Normalize audio loudness to a target LUFS level using FFmpeg's loudnorm filter.

    A first ffmpeg pass measures the input; a second pass applies the
    correction using the measured values. If the measurement is missing or
    contains values that are not finite numbers, a single-pass loudnorm run is
    used instead.

    Args:
        input_path: Path to the input audio/video file.
        output_path: Path for the normalized output. Auto-generated if empty
            (input stem with "_normalized" appended).
        target_lufs: Target integrated loudness in LUFS.
            Common targets: -14 (YouTube), -16 (Spotify), -23 (broadcast).

    Returns: path to the normalized audio file.

    Raises: RuntimeError if the ffmpeg run that writes the output exits with a
    non-zero status.
    """
    import math

    inp = Path(input_path)
    if not output_path:
        output_path = str(inp.with_stem(inp.stem + "_normalized"))

    # Shared by the measurement, apply and fallback invocations.
    base_filter = f"loudnorm=I={target_lufs}:LRA=7:TP=-1.5"

    # Two-pass loudnorm: first pass measures loudness, second pass applies correction.
    # First pass: measure only (print_format=json)
    measure_cmd = [
        "ffmpeg", "-y",
        "-i", str(inp),
        "-af", f"{base_filter}:print_format=json",
        "-f", "null",
        "-",
    ]
    logger.info("Running loudnorm first pass (measurement): %s", " ".join(measure_cmd))
    measure_result = subprocess.run(measure_cmd, capture_output=True, text=True)

    # Parse measured parameters from stderr (loudnorm outputs JSON to stderr)
    measured = _parse_loudnorm_measurement(measure_result.stderr)

    fallback_reason = ""
    if not measured:
        fallback_reason = "loudnorm measurement failed or produced no output"
    else:
        input_i = measured.get("input_i", target_lufs)
        input_lra = measured.get("input_lra", 7.0)
        input_tp = measured.get("input_tp", -1.5)
        input_thresh = measured.get("input_thresh", -30.0)
        offset = measured.get("target_offset", 0.0)
        # The measured_* options only accept bounded numeric values; a value
        # such as "-inf" would make the second pass fail, so use the
        # single-pass path instead of raising.
        try:
            usable = all(
                math.isfinite(float(value))
                for value in (input_i, input_lra, input_tp, input_thresh, offset)
            )
        except (TypeError, ValueError):
            usable = False
        if not usable:
            fallback_reason = "loudnorm measurement contained non-numeric or non-finite values"

    if fallback_reason:
        logger.warning("%s; falling back to single-pass normalization", fallback_reason)
        # Single-pass fallback
        cmd = [
            "ffmpeg", "-y",
            "-i", str(inp),
            "-af", base_filter,
            "-c:v", "copy",
            output_path,
        ]
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode != 0:
            raise RuntimeError(f"Audio normalization failed: {result.stderr[-300:]}")
        logger.info("Single-pass normalized audio saved to %s", output_path)
        return output_path

    # Second pass: apply normalization using measured values
    apply_cmd = [
        "ffmpeg", "-y",
        "-i", str(inp),
        "-af",
        (
            f"{base_filter}:"
            f"measured_I={input_i}:measured_LRA={input_lra}:"
            f"measured_TP={input_tp}:measured_thresh={input_thresh}:"
            f"offset={offset}:linear=true:print_format=summary"
        ),
        "-c:v", "copy",
        output_path,
    ]
    logger.info("Running loudnorm second pass (apply): %s", " ".join(apply_cmd))
    result = subprocess.run(apply_cmd, capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError(f"Audio normalization (apply) failed: {result.stderr[-300:]}")

    logger.info(
        "Normalized audio saved to %s (target=%s LUFS, measured_I=%s)",
        output_path,
        target_lufs,
        input_i,
    )
    return output_path
def _parse_loudnorm_measurement(stderr_output: str) -> dict:
    """
    Parse loudnorm JSON measurement output from FFmpeg stderr.

    The text following the last "Parsed_loudnorm" marker is searched first, so
    braces that appear earlier in the log (for example inside metadata tags)
    cannot derail the parse. If that yields nothing, the whole output is
    scanned. Every "{" is tried as the start of a JSON object until one
    decodes to a non-empty dict.

    Args:
        stderr_output: Captured stderr of the ffmpeg measurement pass. Empty
            or None input is accepted.

    Returns: the decoded measurement dict (values as ffmpeg printed them), or
    an empty dict when no JSON object could be decoded.
    """
    import json

    if not stderr_output:
        return {}

    candidates = []
    marker_at = stderr_output.rfind("Parsed_loudnorm")
    if marker_at != -1:
        candidates.append(stderr_output[marker_at:])
    candidates.append(stderr_output)

    decoder = json.JSONDecoder()
    for text in candidates:
        brace_at = text.find("{")
        while brace_at != -1:
            try:
                # raw_decode tolerates trailing log text after the object.
                parsed, _consumed = decoder.raw_decode(text[brace_at:])
            except json.JSONDecodeError:
                parsed = None
            if isinstance(parsed, dict) and parsed:
                return parsed
            brace_at = text.find("{", brace_at + 1)

    return {}