283 lines
8.9 KiB
Python
283 lines
8.9 KiB
Python
"""
|
|
Audio noise reduction using DeepFilterNet, plus FFmpeg-based silence
detection and loudness normalization helpers.

Noise reduction falls back to a basic FFmpeg noise filter if DeepFilterNet
is not installed.
|
|
"""
|
|
|
|
import logging
|
|
import re
|
|
import subprocess
|
|
import tempfile
|
|
import warnings
|
|
from pathlib import Path
|
|
|
|
logger = logging.getLogger(__name__)

# Tri-state flag: None until the first import attempt has been made, then
# True or False depending on whether DeepFilterNet could be imported.
DEEPFILTER_AVAILABLE = None

# Placeholders for the DeepFilterNet entry points. They stay None until
# _ensure_deepfilter_loaded() binds them to the real callables.
enhance = init_df = load_audio = save_audio = None

# Model/state pair cached by _init_deepfilter() after the first init_df() call.
_df_model = _df_state = None
|
|
|
|
|
|
def _ensure_deepfilter_loaded() -> bool:
    """Lazily import DeepFilterNet and report whether it is usable.

    The import is attempted at most once per process. The outcome is stored
    in the module-level ``DEEPFILTER_AVAILABLE`` flag and, on success, the
    module-level ``enhance``, ``init_df``, ``load_audio`` and ``save_audio``
    names are bound to the imported callables.

    Returns:
        True if ``df.enhance`` was imported successfully, False otherwise.
    """
    global DEEPFILTER_AVAILABLE, enhance, init_df, load_audio, save_audio

    # A previous call already settled the question; reuse its answer.
    if DEEPFILTER_AVAILABLE is not None:
        return DEEPFILTER_AVAILABLE

    try:
        # DeepFilterNet currently triggers a third-party torchaudio
        # deprecation warning on import in some environments; suppress only
        # this known warning.
        with warnings.catch_warnings():
            warnings.filterwarnings(
                "ignore",
                message=r".*torchaudio\._backend\.common\.AudioMetaData has been moved.*",
                category=UserWarning,
            )
            from df.enhance import (
                enhance as _enhance,
                init_df as _init_df,
                load_audio as _load_audio,
                save_audio as _save_audio,
            )
    except ImportError:
        DEEPFILTER_AVAILABLE = False
    else:
        enhance, init_df = _enhance, _init_df
        load_audio, save_audio = _load_audio, _save_audio
        DEEPFILTER_AVAILABLE = True

    return DEEPFILTER_AVAILABLE
|
|
|
|
|
|
def _init_deepfilter():
    """Return the cached DeepFilterNet ``(model, state)`` pair, creating it on first use.

    Raises:
        RuntimeError: If DeepFilterNet could not be imported.
    """
    global _df_model, _df_state

    if not _ensure_deepfilter_loaded():
        raise RuntimeError("DeepFilterNet is not available")

    # Already initialised by an earlier call: hand back the cached objects.
    if _df_model is not None:
        return _df_model, _df_state

    logger.info("Initializing DeepFilterNet model")
    # init_df() yields three values; only the first two are kept.
    model, state, _ = init_df()
    _df_model, _df_state = model, state
    return _df_model, _df_state
|
|
|
|
|
|
def clean_audio(
    input_path: str,
    output_path: str = "",
) -> str:
    """
    Apply noise reduction to an audio file.

    DeepFilterNet is used when it can be imported; otherwise the FFmpeg
    ``anlmdn`` fallback is used.

    Args:
        input_path: Path to the file to clean.
        output_path: Destination path. When empty, ``_clean`` is appended to
            the input file's stem and the file is written next to the input.

    Returns: path to the cleaned audio file.
    """
    source = Path(input_path)
    destination = output_path or str(source.with_stem(f"{source.stem}_clean"))

    # Choose the backend once, then dispatch with identical arguments.
    denoise = _clean_with_deepfilter if is_deepfilter_available() else _clean_with_ffmpeg
    return denoise(str(source), destination)
|
|
|
|
|
|
def _clean_with_deepfilter(input_path: str, output_path: str) -> str:
    """Denoise ``input_path`` with DeepFilterNet and write the result to ``output_path``.

    The audio is loaded and saved at the sample rate reported by the
    DeepFilterNet state object (``state.sr()``).

    Returns: ``output_path``.
    """
    df_model, df_state = _init_deepfilter()

    # load_audio returns a second value alongside the samples; it is not needed here.
    noisy, _meta = load_audio(input_path, sr=df_state.sr())
    denoised = enhance(df_model, df_state, noisy)
    save_audio(output_path, denoised, sr=df_state.sr())

    logger.info(f"DeepFilterNet cleaned audio saved to {output_path}")
    return output_path
|
|
|
|
|
|
def _clean_with_ffmpeg(input_path: str, output_path: str) -> str:
    """Fallback: basic noise reduction using FFmpeg's anlmdn filter.

    Runs ``ffmpeg -y`` (overwriting ``output_path`` if it exists).

    Returns: ``output_path``.

    Raises:
        RuntimeError: If ffmpeg exits with a non-zero status; the message
            carries the last 300 characters of its stderr.
    """
    denoise_filter = "anlmdn=s=7:p=0.002:r=0.002:m=15"
    ffmpeg_args = ["ffmpeg", "-y", "-i", input_path, "-af", denoise_filter, output_path]

    proc = subprocess.run(ffmpeg_args, capture_output=True, text=True)
    if proc.returncode:
        raise RuntimeError(f"FFmpeg audio cleaning failed: {proc.stderr[-300:]}")

    logger.info(f"FFmpeg cleaned audio saved to {output_path}")
    return output_path
|
|
|
|
|
|
def is_deepfilter_available() -> bool:
    """Report whether DeepFilterNet can be used (triggers the lazy import on first call)."""
    available = _ensure_deepfilter_loaded()
    return available
|
|
|
|
|
|
def _parse_silence_ranges(output: str, min_silence_seconds: float) -> list:
    """Extract silence ranges from ffmpeg ``silencedetect`` log text.

    Events are paired in the order they appear: each ``silence_end`` closes
    the most recent unmatched ``silence_start``. A start without an end, or
    an end without a start, is ignored.

    Args:
        output: Text captured from ffmpeg's stderr.
        min_silence_seconds: Ranges with a shorter reported duration are dropped.

    Returns:
        A list of ``{"start", "end", "duration"}`` dicts (seconds, rounded to
        three decimals); negative starts are clamped to 0.0.
    """
    # ffmpeg may print a slightly negative silence_start and may use exponent
    # notation for very small values, so accept a sign and an exponent.
    number = r"[-+]?(?:\d+\.?\d*|\.\d+)(?:[eE][-+]?\d+)?"
    event_pat = re.compile(
        rf"silence_start:\s*(?P<start>{number})"
        rf"|silence_end:\s*(?P<end>{number})\s*\|\s*silence_duration:\s*(?P<duration>{number})"
    )

    ranges = []
    pending_start = None
    for event in event_pat.finditer(output):
        if event.group("start") is not None:
            pending_start = max(0.0, float(event.group("start")))
            continue
        if pending_start is None:
            # silence_end without a preceding silence_start: nothing to pair with.
            continue
        start, pending_start = pending_start, None
        end = float(event.group("end"))
        duration = float(event.group("duration"))
        if end > start and duration >= min_silence_seconds:
            ranges.append({
                "start": round(start, 3),
                "end": round(end, 3),
                "duration": round(duration, 3),
            })
    return ranges


def detect_silence_ranges(input_path: str, min_silence_ms: int, silence_db: float) -> list:
    """Detect silence ranges using ffmpeg silencedetect.

    Args:
        input_path: Path to the audio/video file to analyse.
        min_silence_ms: Minimum silence length in milliseconds; values below
            50 ms are raised to 50 ms.
        silence_db: Noise threshold in dB passed to ``silencedetect``.

    Returns a list of dicts: {start, end, duration} in seconds.

    A non-zero ffmpeg exit status is logged as a warning rather than raised;
    whatever silence events were printed before the failure are still parsed.
    """
    min_silence_seconds = max(0.05, float(min_silence_ms) / 1000.0)
    noise_threshold = float(silence_db)

    cmd = [
        "ffmpeg",
        "-hide_banner",
        "-i",
        input_path,
        "-af",
        f"silencedetect=noise={noise_threshold}dB:d={min_silence_seconds}",
        "-f",
        "null",
        "-",
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)

    # silencedetect prints to stderr even on success.
    output = result.stderr or ""

    if result.returncode != 0:
        # Without this, an unreadable input is indistinguishable from
        # "no silence found".
        logger.warning(
            "ffmpeg silencedetect exited with code %s for %s: %s",
            result.returncode,
            input_path,
            output[-300:],
        )

    ranges = _parse_silence_ranges(output, min_silence_seconds)

    logger.info(
        "Detected %s silence ranges in %s (min=%sms, threshold=%sdB)",
        len(ranges),
        input_path,
        min_silence_ms,
        silence_db,
    )
    return ranges
|
|
|
|
|
|
def normalize_audio(
    input_path: str,
    output_path: str = "",
    target_lufs: float = -14.0,
) -> str:
    """
    Normalize audio loudness to a target LUFS level using FFmpeg's loudnorm filter.

    A first ffmpeg pass measures the input; a second pass applies the
    correction using those measurements. If the measurement is missing,
    incomplete, or contains non-finite values (for example ``-inf``), a
    single-pass loudnorm run is used instead.

    Args:
        input_path: Path to the input audio/video file.
        output_path: Path for the normalized output. Auto-generated if empty
            (``_normalized`` is appended to the input file's stem).
        target_lufs: Target integrated loudness in LUFS.
            Common targets: -14 (YouTube), -16 (Spotify), -23 (broadcast).

    Returns: path to the normalized audio file.

    Raises:
        RuntimeError: If the ffmpeg run that writes the output exits with a
            non-zero status.
    """
    import math

    inp = Path(input_path)
    if not output_path:
        output_path = str(inp.with_stem(inp.stem + "_normalized"))

    # Target settings shared by the measurement, apply and fallback passes.
    base_filter = f"loudnorm=I={target_lufs}:LRA=7:TP=-1.5"

    # Two-pass loudnorm: first pass measures loudness, second pass applies correction.
    # First pass: measure only (print_format=json)
    measure_cmd = [
        "ffmpeg", "-y",
        "-i", str(inp),
        "-af", f"{base_filter}:print_format=json",
        "-f", "null",
        "-",
    ]
    logger.info("Running loudnorm first pass (measurement): %s", " ".join(measure_cmd))
    measure_result = subprocess.run(measure_cmd, capture_output=True, text=True)

    # Parse measured parameters from stderr (loudnorm outputs JSON to stderr)
    measured = _parse_loudnorm_measurement(measure_result.stderr)

    # Every measurement the second pass needs must be present and finite.
    # Substituting invented defaults for missing values would make the
    # "measured" pass apply a wrong correction, so fall back instead.
    required = ("input_i", "input_lra", "input_tp", "input_thresh", "target_offset")
    stats = {}
    for key in required:
        try:
            numeric = float(measured[key])
        except (KeyError, TypeError, ValueError):
            numeric = math.nan
        if not math.isfinite(numeric):
            stats = None
            break
        # Keep the value exactly as ffmpeg reported it.
        stats[key] = measured[key]

    if stats is None:
        logger.warning(
            "loudnorm measurement failed or produced no output; "
            "falling back to single-pass normalization"
        )
        # Single-pass fallback
        cmd = [
            "ffmpeg", "-y",
            "-i", str(inp),
            "-af", base_filter,
            "-c:v", "copy",
            output_path,
        ]
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode != 0:
            raise RuntimeError(f"Audio normalization failed: {result.stderr[-300:]}")
        logger.info("Single-pass normalized audio saved to %s", output_path)
        return output_path

    # Second pass: apply normalization using measured values
    input_i = stats["input_i"]
    apply_cmd = [
        "ffmpeg", "-y",
        "-i", str(inp),
        "-af",
        (
            f"{base_filter}:"
            f"measured_I={input_i}:measured_LRA={stats['input_lra']}:"
            f"measured_TP={stats['input_tp']}:measured_thresh={stats['input_thresh']}:"
            f"offset={stats['target_offset']}:linear=true:print_format=summary"
        ),
        "-c:v", "copy",
        output_path,
    ]
    logger.info("Running loudnorm second pass (apply): %s", " ".join(apply_cmd))
    result = subprocess.run(apply_cmd, capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError(f"Audio normalization (apply) failed: {result.stderr[-300:]}")

    logger.info(
        "Normalized audio saved to %s (target=%s LUFS, measured_I=%s)",
        output_path,
        target_lufs,
        input_i,
    )
    return output_path
|
|
|
|
|
|
def _parse_loudnorm_measurement(stderr_output: str) -> dict:
    """Parse loudnorm JSON measurement output from FFmpeg stderr.

    The text is searched for the first ``{`` that begins a decodable JSON
    object, starting at the first ``Parsed_loudnorm`` marker when one is
    present and otherwise (or if that finds nothing) at the start of the
    text. Braces that do not start valid JSON, such as ones in file names or
    metadata, are skipped instead of aborting the search.

    Args:
        stderr_output: Text captured from ffmpeg's stderr; may be empty or None.

    Returns:
        The decoded JSON object as a dict, or an empty dict if none was found.
    """
    import json

    if not stderr_output:
        return {}

    decoder = json.JSONDecoder()
    marker = stderr_output.find("Parsed_loudnorm")
    # Prefer JSON that follows the loudnorm marker; fall back to a full scan.
    search_starts = [marker, 0] if marker > 0 else [0]

    for search_from in search_starts:
        brace = stderr_output.find("{", search_from)
        while brace != -1:
            try:
                candidate, _ = decoder.raw_decode(stderr_output, brace)
            except json.JSONDecodeError:
                candidate = None
            if isinstance(candidate, dict):
                return candidate
            brace = stderr_output.find("{", brace + 1)
    return {}
|