audio gen in gui
This commit is contained in:
402
create_audiobook.py
Normal file
402
create_audiobook.py
Normal file
@ -0,0 +1,402 @@
|
||||
"""
|
||||
create_audiobook.py
|
||||
------------------
|
||||
Generic audiobook generator for text files that contain chapter headings.
|
||||
|
||||
Supported heading formats (single-line headings):
|
||||
- Prologue
|
||||
- Chapter 12
|
||||
- Chapter 12 - Chapter Name
|
||||
- Chapter - 12
|
||||
- Chapter - 12 - Chapter Name
|
||||
|
||||
Features:
|
||||
- Parses chapters from one or more input files/directories
|
||||
- Caches parsed chapter data for faster re-runs when source files are unchanged
|
||||
- Warns about missing chapter numbers (example: found 1,2,4 -> warns about 3)
|
||||
- Generates one .wav per chapter with Kokoro
|
||||
|
||||
Examples:
|
||||
python create_audiobook.py --input "Audio Text for Novel Lightbringer"
|
||||
python create_audiobook.py --input novel.txt --list
|
||||
python create_audiobook.py --input novel.txt 0 1 2 --voice am_michael
|
||||
python create_audiobook.py --input novel.txt --preview 3000
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import hashlib
|
||||
import json
|
||||
import re
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
import numpy as np
|
||||
import soundfile as sf
|
||||
import torch
|
||||
from kokoro import KPipeline
|
||||
|
||||
SAMPLE_RATE = 24000  # sample rate (Hz) used when writing .wav output
SPEED = 1.0  # speech-speed multiplier passed to the Kokoro pipeline
LANG_CODE = "a"  # Kokoro language code (presumably American English — confirm against Kokoro docs)
VOICE = "am_onyx"  # default voice; overridable per run with --voice
CACHE_VERSION = 1  # bump to invalidate all previously written parse caches


# Heading patterns; each is matched against a single stripped line, case-insensitively.
PROLOGUE_RE = re.compile(r"^\s*Prologue\s*$", re.IGNORECASE)
# "Chapter - 12" or "Chapter - 12 - Chapter Name"
CHAPTER_RE_1 = re.compile(r"^\s*Chapter\s*-\s*(\d+)(?:\s*-\s*(.+))?\s*$", re.IGNORECASE)
# "Chapter 12" or "Chapter 12 - Chapter Name"
CHAPTER_RE_2 = re.compile(r"^\s*Chapter\s+(\d+)(?:\s*-\s*(.+))?\s*$", re.IGNORECASE)
# Horizontal-rule separator lines such as "***" or "---".  NOTE(review):
# compiled without re.MULTILINE, so when applied to a whole multi-line text
# the ^...$ anchors only match if the entire text is one rule.
RULE_RE = re.compile(r"^[_\-*\s]{3,}\s*$")
|
||||
|
||||
|
||||
def _slug(text: str) -> str:
|
||||
text = text.lower()
|
||||
text = re.sub(r"[^a-z0-9]+", "_", text)
|
||||
return text.strip("_")
|
||||
|
||||
|
||||
def _clean_text(text: str) -> str:
    """Normalize chapter text before synthesis.

    Blanks out horizontal-rule separator lines (e.g. "***", "---", "___"),
    collapses runs of three or more newlines down to one blank line, and
    strips surrounding whitespace.
    """
    # BUG FIX: RULE_RE is anchored with ^...$ but compiled without
    # re.MULTILINE, so the original RULE_RE.sub("", text) on the whole text
    # only matched when the ENTIRE text was one rule — mid-chapter separator
    # lines survived into the audio.  Match each line individually instead.
    lines = ["" if RULE_RE.match(line) else line for line in text.splitlines()]
    text = "\n".join(lines)
    text = re.sub(r"\n{3,}", "\n\n", text)
    return text.strip()
|
||||
|
||||
|
||||
def _fmt_duration(seconds: float) -> str:
|
||||
h, rem = divmod(int(seconds), 3600)
|
||||
m, s = divmod(rem, 60)
|
||||
if h > 0:
|
||||
return f"{h}h {m:02d}m {s:02d}s"
|
||||
if m > 0:
|
||||
return f"{m}m {s:02d}s"
|
||||
return f"{s}s"
|
||||
|
||||
|
||||
def _chapter_heading(line: str) -> tuple[int, str, str] | None:
    """Return (number, title, label) if *line* is a chapter heading, else None.

    "Prologue" maps to chapter 0.  Both "Chapter - 12[ - Name]" and
    "Chapter 12[ - Name]" forms are recognized; the title part is optional.
    """
    candidate = line.strip()
    if PROLOGUE_RE.match(candidate):
        return (0, "Prologue", "Prologue")

    match = CHAPTER_RE_1.match(candidate) or CHAPTER_RE_2.match(candidate)
    if match is None:
        return None

    number = int(match.group(1))
    name = (match.group(2) or "").strip()
    if name:
        return (number, name, f"Chapter {number} - {name}")
    return (number, "", f"Chapter {number}")
|
||||
|
||||
|
||||
def _resolve_txt_files(inputs: list[str]) -> list[Path]:
|
||||
txt_files: list[Path] = []
|
||||
for raw in inputs:
|
||||
path = Path(raw)
|
||||
if path.is_file():
|
||||
if path.suffix.lower() == ".txt":
|
||||
txt_files.append(path)
|
||||
continue
|
||||
if path.is_dir():
|
||||
txt_files.extend(sorted(path.glob("*.txt")))
|
||||
|
||||
deduped = sorted({p.resolve() for p in txt_files})
|
||||
return deduped
|
||||
|
||||
|
||||
def _signature_for_files(files: list[Path]) -> list[dict]:
|
||||
sig = []
|
||||
for p in files:
|
||||
st = p.stat()
|
||||
sig.append({
|
||||
"path": str(p),
|
||||
"size": st.st_size,
|
||||
"mtime_ns": st.st_mtime_ns,
|
||||
})
|
||||
return sig
|
||||
|
||||
|
||||
def _cache_path(output_dir: Path, files: list[Path]) -> Path:
|
||||
cache_dir = output_dir / ".cache"
|
||||
digest = hashlib.sha256("\n".join(str(p) for p in files).encode("utf-8")).hexdigest()[:12]
|
||||
return cache_dir / f"parse_{digest}.json"
|
||||
|
||||
|
||||
def _load_cached_chapters(cache_file: Path, file_sig: list[dict]) -> list[dict] | None:
    """Return cached chapters when *cache_file* is valid and matches *file_sig*.

    Any problem — missing file, unreadable/corrupt JSON, cache-version bump,
    stale file signature, malformed payload — is treated as a cache miss and
    yields None rather than an error.
    """
    if not cache_file.exists():
        return None

    try:
        payload = json.loads(cache_file.read_text(encoding="utf-8"))
    except Exception:
        # A corrupt cache must never abort the run; re-parse instead.
        return None

    valid = (
        payload.get("version") == CACHE_VERSION
        and payload.get("file_signature") == file_sig
        and isinstance(payload.get("chapters"), list)
    )
    return payload["chapters"] if valid else None
|
||||
|
||||
|
||||
def _save_cached_chapters(cache_file: Path, file_sig: list[dict], chapters: list[dict]) -> None:
    """Persist parsed *chapters* together with the signature that produced them."""
    cache_file.parent.mkdir(parents=True, exist_ok=True)
    body = json.dumps(
        {
            "version": CACHE_VERSION,
            "file_signature": file_sig,
            "chapters": chapters,
        },
        ensure_ascii=False,
    )
    cache_file.write_text(body, encoding="utf-8")
|
||||
|
||||
|
||||
def _parse_chapters(files: list[Path]) -> tuple[list[dict], set[int]]:
    """Scan *files* line by line and split them into chapter records.

    Returns ``(chapters, duplicates)``: *chapters* is a list of dicts with
    keys ``num``/``title``/``label``/``slug``/``text``, sorted by chapter
    number; *duplicates* is the set of chapter numbers that appeared more
    than once (later occurrences are discarded).
    """
    chapters: list[dict] = []
    duplicates: set[int] = set()
    seen: set[int] = set()
    current: dict | None = None  # chapter currently being accumulated, if any

    def flush_current() -> None:
        # Finalize the in-progress chapter: join its buffered lines into
        # "text", then append it — unless its number was already seen, in
        # which case it is recorded as a duplicate and dropped.
        if current is not None:
            current["text"] = "".join(current.pop("lines"))
            num = current["num"]
            if num in seen:
                duplicates.add(num)
                return
            seen.add(num)
            chapters.append(current)

    for fpath in files:
        with fpath.open("r", encoding="utf-8") as fh:
            for line in fh:
                info = _chapter_heading(line)
                if info is not None:
                    # New heading closes the previous chapter.
                    flush_current()
                    num, title, label = info
                    num_str = f"{num:02d}"
                    if num == 0:
                        slug = "chapter_00_prologue"
                    elif title:
                        slug = f"chapter_{num_str}_{_slug(title)}"
                    else:
                        slug = f"chapter_{num_str}"
                    current = {
                        "num": num,
                        "title": title,
                        "label": label,
                        "slug": slug,
                        # The heading line itself is kept in the chapter text.
                        "lines": [line],
                    }
                elif current is not None:
                    # Body line; text before the first heading is discarded.
                    current["lines"].append(line)

    flush_current()  # don't lose the final chapter at EOF
    chapters.sort(key=lambda c: c["num"])
    return chapters, duplicates
|
||||
|
||||
|
||||
def load_all_chapters_with_cache(inputs: list[str], output_dir: Path, force_reparse: bool = False) -> tuple[list[dict], bool, set[int], list[Path]]:
    """Load chapters from *inputs*, using the on-disk parse cache when fresh.

    Returns ``(chapters, used_cache, duplicate_numbers, resolved_files)``.
    Raises FileNotFoundError when no .txt files are found.
    """
    files = _resolve_txt_files(inputs)
    if not files:
        raise FileNotFoundError("No .txt files found in --input paths")

    signature = _signature_for_files(files)
    cache_file = _cache_path(output_dir, files)

    cached = None if force_reparse else _load_cached_chapters(cache_file, signature)
    if cached is not None:
        # Cache hit: duplicate info is not persisted, so report an empty set.
        return cached, True, set(), files

    chapters, duplicates = _parse_chapters(files)
    _save_cached_chapters(cache_file, signature, chapters)
    return chapters, False, duplicates, files
|
||||
|
||||
|
||||
def warn_missing_chapters(chapters: list[dict]) -> None:
    """Print a warning listing chapter numbers absent from the detected range.

    The prologue (num == 0) is ignored; only gaps between the smallest and
    largest positive chapter numbers are reported.  Prints nothing when
    there are no positive chapter numbers or no gaps.
    """
    nums = sorted(ch["num"] for ch in chapters if ch["num"] > 0)
    if not nums:
        return
    # Hoisted: the original rebuilt set(nums) inside the comprehension
    # condition for every candidate number — accidental O(n^2).
    present = set(nums)
    missing = [n for n in range(nums[0], nums[-1] + 1) if n not in present]
    if missing:
        print(f"WARNING: missing chapter numbers detected: {missing}")
|
||||
|
||||
|
||||
def generate_audio(pipeline: KPipeline, text: str, voice: str, output_path: Path) -> float:
    """Synthesize *text* with Kokoro and write one .wav file to *output_path*.

    Returns the wall-clock seconds spent generating, whether or not any
    audio was produced.  Progress and errors are reported via print().
    """
    t0 = time.monotonic()
    chunks = []
    for _, _, chunk_audio in pipeline(text, voice=voice, speed=SPEED):
        # The pipeline may yield torch tensors; move to CPU numpy first.
        if hasattr(chunk_audio, "numpy"):
            chunk_audio = chunk_audio.cpu().numpy()
        # squeeze() can produce a 0-d array for a single sample; atleast_1d
        # keeps np.concatenate below happy.
        chunk_audio = np.atleast_1d(chunk_audio.squeeze())
        if chunk_audio.size > 0:
            chunks.append(chunk_audio)

    elapsed = time.monotonic() - t0
    if chunks:
        audio = np.concatenate(chunks, axis=0)
        sf.write(str(output_path), audio, SAMPLE_RATE)
        duration = len(audio) / SAMPLE_RATE  # seconds of generated speech
        print(
            f" OK saved '{output_path.name}' "
            f"({_fmt_duration(duration)} audio | {_fmt_duration(elapsed)} wall-clock)"
        )
    else:
        # No chunks at all usually indicates a bad voice name or empty input.
        print(f" ERROR no audio produced for voice='{voice}'")
    return elapsed
|
||||
|
||||
|
||||
def main() -> None:
    """CLI entry point: parse arguments, load chapters, synthesize audio.

    Flow: argparse -> chapter loading (with parse cache) -> optional --list
    table -> chapter selection -> Kokoro pipeline init -> per-chapter
    generation with running throughput/ETA estimates -> summary table.
    """
    parser = argparse.ArgumentParser(description="Generate an audiobook from chapterized text files.")
    parser.add_argument(
        "chapters",
        nargs="*",
        type=int,
        help="Chapter numbers to generate (0 = Prologue). Default: all.",
    )
    parser.add_argument(
        "--input",
        nargs="+",
        required=True,
        help="One or more .txt files and/or directories containing .txt files.",
    )
    parser.add_argument(
        "--output",
        default="output_audiobook",
        help="Output directory for generated chapter audio.",
    )
    parser.add_argument("--list", action="store_true", help="Print detected chapters and exit.")
    parser.add_argument("--voice", default=VOICE, help=f"Kokoro voice to use (default: {VOICE}).")
    parser.add_argument(
        "--preview",
        nargs="?",
        # Bare --preview (no value) caps chapters at 3000 chars.
        const=3000,
        type=int,
        metavar="CHARS",
        help="Generate short preview clips capped at CHARS (default: 3000).",
    )
    parser.add_argument(
        "--reparse",
        action="store_true",
        help="Ignore cache and re-parse chapters from source files.",
    )
    args = parser.parse_args()

    output_dir = Path(args.output)
    output_dir.mkdir(parents=True, exist_ok=True)

    print("Loading chapters...")
    chapters, used_cache, duplicates, files = load_all_chapters_with_cache(
        args.input, output_dir, force_reparse=args.reparse
    )

    print(f"Input files: {len(files)}")
    print(f"Parse cache: {'HIT' if used_cache else 'MISS'}")

    if duplicates:
        print(f"WARNING: duplicate chapter numbers were found and ignored: {sorted(duplicates)}")

    if not chapters:
        print("WARNING: no chapters found.")
        print("Expected headings like: 'Prologue' or 'Chapter 12 - Name' or 'Chapter - 12'")
        return

    warn_missing_chapters(chapters)

    if args.list:
        # --list: print a chapter table and exit without generating audio.
        print(f"\nDetected {len(chapters)} chapters:\n")
        print(f" {'#':>4} {'Label':<45} {'Chars':>8} {'Output filename'}")
        print(f" {'-' * 4} {'-' * 45} {'-' * 8} {'-' * 30}")
        for ch in chapters:
            chars = len(_clean_text(ch["text"]))
            print(f" {ch['num']:>4} {ch['label']:<45} {chars:>8,} {ch['slug']}.wav")
        return

    if args.chapters:
        # Positional args select a subset of chapters by number.
        requested = set(args.chapters)
        run_chapters = [ch for ch in chapters if ch["num"] in requested]
        missing_req = sorted(requested - {ch["num"] for ch in run_chapters})
        if missing_req:
            print(f"WARNING: requested chapter(s) not found: {missing_req}")
    else:
        run_chapters = chapters

    if not run_chapters:
        print("No chapters selected. Use --list to see available chapters.")
        return

    device = "cuda" if torch.cuda.is_available() else "cpu"
    print(f"Device: {device}")
    if device == "cuda":
        print(f"GPU: {torch.cuda.get_device_name(0)}")
    print(f"Voice: {args.voice}")

    # Character counts (after cleaning) drive the time estimates below.
    chapter_chars = {ch["num"]: len(_clean_text(ch["text"])) for ch in run_chapters}
    total_chars = sum(chapter_chars.values())

    preview_note = f"PREVIEW MODE: capped at {args.preview:,} chars/chapter" if args.preview else ""
    if preview_note:
        print(preview_note)

    print("\nPlan:")
    for ch in run_chapters:
        print(f" {ch['num']:>3} {ch['label']} ({chapter_chars[ch['num']]:,} chars)")
    print(f" TOTAL: {total_chars:,} chars\n")

    print("Initializing Kokoro pipeline...")
    pipeline = KPipeline(lang_code=LANG_CODE)

    # chars_per_sec is measured from completed chapters and drives the ETA;
    # it stays None until the first chapter finishes ("calibration run").
    chars_per_sec: float | None = None
    # (label, chars synthesized, wall-clock seconds) per generated chapter.
    timing_rows: list[tuple[str, int, float]] = []

    for ch in run_chapters:
        text = _clean_text(ch["text"])
        if not text:
            print(f"[{ch['label']}] WARNING empty text, skipping")
            continue

        if args.preview and len(text) > args.preview:
            # Cut at the last space before the cap so previews end on a word.
            cut = text.rfind(" ", 0, args.preview)
            text = text[: cut if cut > 0 else args.preview]

        chars = len(text)
        preview_tag = "_preview" if args.preview else ""
        out_path = output_dir / f"{ch['slug']}{preview_tag}.wav"

        if chars_per_sec is not None:
            eta = _fmt_duration(chars / chars_per_sec)
            print(f"\n[{ch['label']}] -> {out_path.name} (est. {eta})")
        else:
            print(f"\n[{ch['label']}] -> {out_path.name} (calibration run)")

        elapsed = generate_audio(pipeline, text, args.voice, out_path)
        timing_rows.append((ch["label"], chars, elapsed))

        # Refresh the throughput estimate from everything generated so far.
        done_chars = sum(c for _, c, _ in timing_rows)
        done_elapsed = sum(e for _, _, e in timing_rows)
        if done_elapsed > 0:
            chars_per_sec = done_chars / done_elapsed
            remaining = total_chars - done_chars
            eta_total = _fmt_duration(remaining / chars_per_sec) if remaining > 0 else "0s"
            print(f" Speed: {chars_per_sec:.0f} chars/sec | Estimated remaining: {eta_total}")

    print("\nSummary:")
    print(f" {'Chapter':<35} {'Chars':>7} {'Actual':>8} {'Est':>8}")
    print(" " + "-" * 65)
    for i, (label, chars, elapsed) in enumerate(timing_rows):
        actual_str = _fmt_duration(elapsed)
        # "Est" reconstructs what the ETA would have been using only the
        # throughput measured from the chapters generated BEFORE this one.
        prior_chars = sum(c for _, c, _ in timing_rows[:i])
        prior_elapsed = sum(e for _, _, e in timing_rows[:i])
        est_str = _fmt_duration(chars / (prior_chars / prior_elapsed)) if prior_elapsed > 0 else "(first)"
        print(f" {label:<35} {chars:>7,} {actual_str:>8} {est_str:>8}")

    total_elapsed = sum(e for _, _, e in timing_rows)
    total_done_chars = sum(c for _, c, _ in timing_rows)
    print(" " + "-" * 65)
    print(f" {'TOTAL':<35} {total_done_chars:>7,} {_fmt_duration(total_elapsed):>8}")
    print("\nDone.")
|
||||
|
||||
|
||||
# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()
|
||||
Reference in New Issue
Block a user