"""
Audio noise reduction using DeepFilterNet.

Falls back to a basic FFmpeg noise filter if DeepFilterNet is not installed.
Also provides FFmpeg-based silence detection and loudness normalization.
"""

import json
import logging
import math
import re
import subprocess
import tempfile
import warnings
from pathlib import Path

logger = logging.getLogger(__name__)

# Tri-state availability flag: None = not probed yet, True/False = probe result.
DEEPFILTER_AVAILABLE = None

# DeepFilterNet entry points; bound lazily by _ensure_deepfilter_loaded().
enhance = None
init_df = None
load_audio = None
save_audio = None

# Cached model/state pair so the model is initialized at most once per process.
_df_model = None
_df_state = None

# Decimal number as printed by ffmpeg: optional sign, optional fraction,
# optional exponent (e.g. "-0.00133333", "12", "4.5e-05").
_FLOAT_PATTERN = r"[-+]?(?:\d+(?:\.\d*)?|\.\d+)(?:[eE][-+]?\d+)?"

# Matches either a silence_start event or a silence_end/silence_duration event,
# so both kinds can be consumed in the order they appear in the log.
_SILENCE_EVENT_RE = re.compile(
    rf"silence_start:\s*(?P<start>{_FLOAT_PATTERN})"
    rf"|silence_end:\s*(?P<end>{_FLOAT_PATTERN})\s*\|\s*"
    rf"silence_duration:\s*(?P<duration>{_FLOAT_PATTERN})"
)

# Keys of the loudnorm measurement that the second pass feeds back to ffmpeg.
_LOUDNORM_REQUIRED_KEYS = (
    "input_i",
    "input_lra",
    "input_tp",
    "input_thresh",
    "target_offset",
)


def _run_ffmpeg(cmd):
    """Run *cmd* and return the CompletedProcess with text stdout/stderr.

    Output is decoded as UTF-8 with undecodable bytes replaced, so unusual
    bytes in ffmpeg's log (e.g. file metadata) cannot raise
    UnicodeDecodeError. stdin is detached so ffmpeg never reads from the
    parent's standard input.
    """
    return subprocess.run(
        cmd,
        stdin=subprocess.DEVNULL,
        capture_output=True,
        text=True,
        encoding="utf-8",
        errors="replace",
    )


def _ensure_deepfilter_loaded() -> bool:
    """Import DeepFilterNet on first use and cache whether it is available.

    On success the module-level ``enhance``, ``init_df``, ``load_audio`` and
    ``save_audio`` names are bound to the DeepFilterNet functions.

    Returns:
        True if ``df.enhance`` could be imported, False otherwise.
    """
    global DEEPFILTER_AVAILABLE, enhance, init_df, load_audio, save_audio

    if DEEPFILTER_AVAILABLE is not None:
        return DEEPFILTER_AVAILABLE

    try:
        # DeepFilterNet currently triggers a third-party torchaudio deprecation
        # warning on import in some environments; suppress only this known
        # warning.
        with warnings.catch_warnings():
            warnings.filterwarnings(
                "ignore",
                message=r".*torchaudio\._backend\.common\.AudioMetaData has been moved.*",
                category=UserWarning,
            )
            from df.enhance import (
                enhance as _enhance,
                init_df as _init_df,
                load_audio as _load_audio,
                save_audio as _save_audio,
            )
    except ImportError:
        DEEPFILTER_AVAILABLE = False
    else:
        enhance = _enhance
        init_df = _init_df
        load_audio = _load_audio
        save_audio = _save_audio
        DEEPFILTER_AVAILABLE = True

    return DEEPFILTER_AVAILABLE


def _init_deepfilter():
    """Return the cached ``(model, state)`` pair, initializing it if needed.

    Raises:
        RuntimeError: if DeepFilterNet cannot be imported.
    """
    global _df_model, _df_state

    if not _ensure_deepfilter_loaded():
        raise RuntimeError("DeepFilterNet is not available")

    if _df_model is None:
        logger.info("Initializing DeepFilterNet model")
        _df_model, _df_state, _ = init_df()

    return _df_model, _df_state


def clean_audio(
    input_path: str,
    output_path: str = "",
) -> str:
    """
    Apply noise reduction to an audio file.

    If DeepFilterNet is available, uses it for high-quality results.
    Otherwise falls back to FFmpeg's anlmdn filter.

    Args:
        input_path: Path to the audio file to clean.
        output_path: Destination path. When empty, ``_clean`` is appended to
            the input file's stem (same directory and suffix).

    Returns:
        path to the cleaned audio file.

    Raises:
        RuntimeError: if the FFmpeg fallback exits with a non-zero status.
    """
    source = Path(input_path)
    if not output_path:
        output_path = str(source.with_stem(source.stem + "_clean"))

    if is_deepfilter_available():
        return _clean_with_deepfilter(str(source), output_path)
    return _clean_with_ffmpeg(str(source), output_path)


def _clean_with_deepfilter(input_path: str, output_path: str) -> str:
    """Denoise *input_path* with DeepFilterNet and write it to *output_path*."""
    model, state = _init_deepfilter()
    # Load and save at the sample rate reported by the DeepFilterNet state.
    audio, _info = load_audio(input_path, sr=state.sr())
    enhanced = enhance(model, state, audio)
    save_audio(output_path, enhanced, sr=state.sr())
    logger.info("DeepFilterNet cleaned audio saved to %s", output_path)
    return output_path


def _clean_with_ffmpeg(input_path: str, output_path: str) -> str:
    """Fallback: basic noise reduction using FFmpeg's anlmdn filter.

    Raises:
        RuntimeError: if ffmpeg exits with a non-zero status; the message
            carries the last 300 characters of ffmpeg's stderr.
    """
    cmd = [
        "ffmpeg", "-y",
        "-i", input_path,
        "-af", "anlmdn=s=7:p=0.002:r=0.002:m=15",
        output_path,
    ]
    result = _run_ffmpeg(cmd)
    if result.returncode != 0:
        raise RuntimeError(f"FFmpeg audio cleaning failed: {result.stderr[-300:]}")
    logger.info("FFmpeg cleaned audio saved to %s", output_path)
    return output_path


def is_deepfilter_available() -> bool:
    """Return True if DeepFilterNet can be imported (probing it on first call)."""
    return _ensure_deepfilter_loaded()


def _parse_silence_ranges(output: str, min_silence_seconds: float) -> list:
    """Extract silence ranges from ffmpeg ``silencedetect`` log text.

    Events are consumed in log order: each ``silence_end`` is paired with the
    most recent unmatched ``silence_start``, so one missing or unparsable
    event cannot shift every later pair. A ``silence_end`` with no pending
    start gets its start reconstructed as ``end - duration``. A trailing
    ``silence_start`` with no matching end is dropped.

    Returns:
        A list of ``{"start", "end", "duration"}`` dicts (seconds, rounded to
        3 decimals), keeping only ranges with ``end > start`` and
        ``duration >= min_silence_seconds``.
    """
    ranges = []
    pending_start = None

    for event in _SILENCE_EVENT_RE.finditer(output or ""):
        if event.group("start") is not None:
            pending_start = float(event.group("start"))
            continue

        end = float(event.group("end"))
        duration = float(event.group("duration"))
        start = pending_start if pending_start is not None else end - duration
        pending_start = None

        # ffmpeg can report a slightly negative start at the very beginning
        # of a file; clamp it to zero.
        start = max(0.0, start)
        if end > start and duration >= min_silence_seconds:
            ranges.append({
                "start": round(start, 3),
                "end": round(end, 3),
                "duration": round(duration, 3),
            })

    return ranges


def detect_silence_ranges(input_path: str, min_silence_ms: int, silence_db: float):
    """Detect silence ranges using ffmpeg silencedetect.

    Args:
        input_path: Path to the media file to analyze.
        min_silence_ms: Minimum silence length in milliseconds; values below
            50 ms are raised to 50 ms.
        silence_db: Noise threshold in dB passed to ``silencedetect``.

    Returns a list of dicts: {start, end, duration} in seconds.
    If ffmpeg fails, a warning is logged and whatever could be parsed
    (usually nothing) is returned.
    """
    min_silence_seconds = max(0.05, float(min_silence_ms) / 1000.0)
    noise_threshold = float(silence_db)

    cmd = [
        "ffmpeg", "-hide_banner",
        "-i", input_path,
        "-af", f"silencedetect=noise={noise_threshold}dB:d={min_silence_seconds}",
        "-f", "null", "-",
    ]
    result = _run_ffmpeg(cmd)

    # silencedetect prints to stderr even on success.
    output = result.stderr or ""
    if result.returncode != 0:
        logger.warning(
            "ffmpeg silencedetect exited with code %s for %s: %s",
            result.returncode,
            input_path,
            output[-300:],
        )

    ranges = _parse_silence_ranges(output, min_silence_seconds)

    logger.info(
        "Detected %s silence ranges in %s (min=%sms, threshold=%sdB)",
        len(ranges),
        input_path,
        min_silence_ms,
        silence_db,
    )
    return ranges


def _is_usable_loudnorm_measurement(measured) -> bool:
    """Return True if *measured* has a finite number for every required key.

    Values such as ``"-inf"``, or a missing key, make the measurement
    unusable for the second loudnorm pass.
    """
    if not isinstance(measured, dict):
        return False
    try:
        return all(
            math.isfinite(float(measured[key])) for key in _LOUDNORM_REQUIRED_KEYS
        )
    except (KeyError, TypeError, ValueError):
        return False


def _loudnorm_single_pass(inp: Path, output_path: str, target_lufs: float) -> str:
    """Normalize *inp* with a single loudnorm pass (no prior measurement).

    Raises:
        RuntimeError: if ffmpeg exits with a non-zero status.
    """
    cmd = [
        "ffmpeg", "-y",
        "-i", str(inp),
        "-af", f"loudnorm=I={target_lufs}:LRA=7:TP=-1.5",
        "-c:v", "copy",
        output_path,
    ]
    result = _run_ffmpeg(cmd)
    if result.returncode != 0:
        raise RuntimeError(f"Audio normalization failed: {result.stderr[-300:]}")
    logger.info("Single-pass normalized audio saved to %s", output_path)
    return output_path


def normalize_audio(
    input_path: str,
    output_path: str = "",
    target_lufs: float = -14.0,
) -> str:
    """
    Normalize audio loudness to a target LUFS level using FFmpeg's loudnorm filter.

    Runs a measurement pass first and feeds the measured values into a second
    pass. If the measurement cannot be parsed, or contains missing or
    non-finite values, a single-pass normalization is used instead.

    Args:
        input_path: Path to the input audio/video file.
        output_path: Path for the normalized output. Auto-generated if empty
            (``_normalized`` is appended to the input file's stem).
        target_lufs: Target integrated loudness in LUFS.
            Common targets: -14 (YouTube), -16 (Spotify), -23 (broadcast).

    Returns:
        path to the normalized audio file.

    Raises:
        RuntimeError: if the ffmpeg run that writes the output fails.
    """
    inp = Path(input_path)
    if not output_path:
        output_path = str(inp.with_stem(inp.stem + "_normalized"))

    # Two-pass loudnorm: first pass measures loudness, second pass applies
    # correction.
    # First pass: measure only (print_format=json)
    measure_cmd = [
        "ffmpeg", "-y",
        "-i", str(inp),
        "-af", f"loudnorm=I={target_lufs}:LRA=7:TP=-1.5:print_format=json",
        "-f", "null", "-",
    ]
    logger.info("Running loudnorm first pass (measurement): %s", " ".join(measure_cmd))
    measure_result = _run_ffmpeg(measure_cmd)

    # Parse measured parameters from stderr (loudnorm outputs JSON to stderr)
    measured = _parse_loudnorm_measurement(measure_result.stderr)

    if not _is_usable_loudnorm_measurement(measured):
        logger.warning(
            "loudnorm measurement failed or produced no output; "
            "falling back to single-pass normalization"
        )
        return _loudnorm_single_pass(inp, output_path, target_lufs)

    # Second pass: apply normalization using measured values
    input_i = measured["input_i"]
    input_lra = measured["input_lra"]
    input_tp = measured["input_tp"]
    input_thresh = measured["input_thresh"]
    offset = measured["target_offset"]

    apply_cmd = [
        "ffmpeg", "-y",
        "-i", str(inp),
        "-af", (
            f"loudnorm=I={target_lufs}:LRA=7:TP=-1.5:"
            f"measured_I={input_i}:measured_LRA={input_lra}:"
            f"measured_TP={input_tp}:measured_thresh={input_thresh}:"
            f"offset={offset}:linear=true:print_format=summary"
        ),
        "-c:v", "copy",
        output_path,
    ]
    logger.info("Running loudnorm second pass (apply): %s", " ".join(apply_cmd))
    result = _run_ffmpeg(apply_cmd)
    if result.returncode != 0:
        raise RuntimeError(f"Audio normalization (apply) failed: {result.stderr[-300:]}")

    logger.info(
        "Normalized audio saved to %s (target=%s LUFS, measured_I=%s)",
        output_path,
        target_lufs,
        input_i,
    )
    return output_path


def _first_json_object(text: str) -> dict:
    """Return the first non-empty JSON object embedded in *text*, or ``{}``.

    Every ``{`` is tried as a possible start, so stray braces in surrounding
    log text are skipped rather than aborting the search.
    """
    decoder = json.JSONDecoder()
    pos = text.find("{")
    while pos != -1:
        try:
            candidate, _end = decoder.raw_decode(text, pos)
        except json.JSONDecodeError:
            candidate = None
        if isinstance(candidate, dict) and candidate:
            return candidate
        pos = text.find("{", pos + 1)
    return {}


def _parse_loudnorm_measurement(stderr_output: str) -> dict:
    """Parse loudnorm JSON measurement output from FFmpeg stderr.

    The text following the first ``Parsed_loudnorm`` marker is searched
    first, so braces that appear earlier in the log (for example in file
    metadata) cannot hide the measurement. If that finds nothing, the whole
    output is searched.

    Returns:
        The decoded measurement dict, or ``{}`` if none could be found.
    """
    if not stderr_output:
        return {}

    marker = stderr_output.find("Parsed_loudnorm")
    if marker != -1:
        parsed = _first_json_object(stderr_output[marker:])
        if parsed:
            return parsed

    return _first_json_object(stderr_output)