132 lines
3.8 KiB
Python
132 lines
3.8 KiB
Python
"""
|
|
Audio noise reduction using DeepFilterNet.
|
|
Falls back to a basic FFmpeg noise filter if DeepFilterNet is not installed.
|
|
"""
|
|
|
|
import logging
|
|
import re
|
|
import subprocess
|
|
import tempfile
|
|
from pathlib import Path
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
try:
|
|
from df.enhance import enhance, init_df, load_audio, save_audio
|
|
DEEPFILTER_AVAILABLE = True
|
|
except ImportError:
|
|
DEEPFILTER_AVAILABLE = False
|
|
|
|
|
|
_df_model = None
|
|
_df_state = None
|
|
|
|
|
|
def _init_deepfilter():
|
|
global _df_model, _df_state
|
|
if _df_model is None:
|
|
logger.info("Initializing DeepFilterNet model")
|
|
_df_model, _df_state, _ = init_df()
|
|
return _df_model, _df_state
|
|
|
|
|
|
def clean_audio(
|
|
input_path: str,
|
|
output_path: str = "",
|
|
) -> str:
|
|
"""
|
|
Apply noise reduction to an audio file.
|
|
|
|
If DeepFilterNet is available, uses it for high-quality results.
|
|
Otherwise falls back to FFmpeg's anlmdn filter.
|
|
|
|
Returns: path to the cleaned audio file.
|
|
"""
|
|
input_path = Path(input_path)
|
|
if not output_path:
|
|
output_path = str(input_path.with_stem(input_path.stem + "_clean"))
|
|
|
|
if DEEPFILTER_AVAILABLE:
|
|
return _clean_with_deepfilter(str(input_path), output_path)
|
|
else:
|
|
return _clean_with_ffmpeg(str(input_path), output_path)
|
|
|
|
|
|
def _clean_with_deepfilter(input_path: str, output_path: str) -> str:
|
|
model, state = _init_deepfilter()
|
|
audio, info = load_audio(input_path, sr=state.sr())
|
|
enhanced = enhance(model, state, audio)
|
|
save_audio(output_path, enhanced, sr=state.sr())
|
|
logger.info(f"DeepFilterNet cleaned audio saved to {output_path}")
|
|
return output_path
|
|
|
|
|
|
def _clean_with_ffmpeg(input_path: str, output_path: str) -> str:
|
|
"""Fallback: basic noise reduction using FFmpeg's anlmdn filter."""
|
|
cmd = [
|
|
"ffmpeg", "-y",
|
|
"-i", input_path,
|
|
"-af", "anlmdn=s=7:p=0.002:r=0.002:m=15",
|
|
output_path,
|
|
]
|
|
result = subprocess.run(cmd, capture_output=True, text=True)
|
|
if result.returncode != 0:
|
|
raise RuntimeError(f"FFmpeg audio cleaning failed: {result.stderr[-300:]}")
|
|
logger.info(f"FFmpeg cleaned audio saved to {output_path}")
|
|
return output_path
|
|
|
|
|
|
def is_deepfilter_available() -> bool:
|
|
return DEEPFILTER_AVAILABLE
|
|
|
|
|
|
def detect_silence_ranges(input_path: str, min_silence_ms: int, silence_db: float):
|
|
"""Detect silence ranges using ffmpeg silencedetect.
|
|
|
|
Returns a list of dicts: {start, end, duration} in seconds.
|
|
"""
|
|
min_silence_seconds = max(0.05, float(min_silence_ms) / 1000.0)
|
|
noise_threshold = float(silence_db)
|
|
|
|
cmd = [
|
|
"ffmpeg",
|
|
"-hide_banner",
|
|
"-i",
|
|
input_path,
|
|
"-af",
|
|
f"silencedetect=noise={noise_threshold}dB:d={min_silence_seconds}",
|
|
"-f",
|
|
"null",
|
|
"-",
|
|
]
|
|
result = subprocess.run(cmd, capture_output=True, text=True)
|
|
|
|
# silencedetect prints to stderr even on success.
|
|
output = result.stderr or ""
|
|
start_pat = re.compile(r"silence_start:\s*([0-9.]+)")
|
|
end_pat = re.compile(r"silence_end:\s*([0-9.]+)\s*\|\s*silence_duration:\s*([0-9.]+)")
|
|
|
|
starts = [float(m.group(1)) for m in start_pat.finditer(output)]
|
|
ends = [(float(m.group(1)), float(m.group(2))) for m in end_pat.finditer(output)]
|
|
|
|
ranges = []
|
|
pair_count = min(len(starts), len(ends))
|
|
for i in range(pair_count):
|
|
start = max(0.0, starts[i])
|
|
end, duration = ends[i]
|
|
if end > start and duration >= min_silence_seconds:
|
|
ranges.append({
|
|
"start": round(start, 3),
|
|
"end": round(end, 3),
|
|
"duration": round(duration, 3),
|
|
})
|
|
|
|
logger.info(
|
|
"Detected %s silence ranges in %s (min=%sms, threshold=%sdB)",
|
|
len(ranges),
|
|
input_path,
|
|
min_silence_ms,
|
|
silence_db,
|
|
)
|
|
return ranges
|