# audiobook_creator/create_audiobook.py
"""
create_audiobook.py
------------------
Generic audiobook generator for text files that contain chapter headings.
Supported heading formats (single-line headings):
- Prologue
- Chapter 12
- Chapter 12 - Chapter Name
- Chapter - 12
- Chapter - 12 - Chapter Name
Features:
- Parses chapters from one or more input files/directories
- Caches parsed chapter data for faster re-runs when source files are unchanged
- Warns about missing chapter numbers (example: found 1,2,4 -> warns about 3)
- Generates one .wav per chapter with Kokoro
Examples:
python create_audiobook.py --input "Audio Text for Novel Lightbringer"
python create_audiobook.py --input novel.txt --list
python create_audiobook.py --input novel.txt 0 1 2 --voice am_michael
python create_audiobook.py --input novel.txt --preview 3000
"""
from __future__ import annotations
import argparse
import hashlib
import json
import re
import time
from pathlib import Path
import numpy as np
import soundfile as sf
import torch
from kokoro import KPipeline
# --- Synthesis configuration -------------------------------------------------
SAMPLE_RATE = 24000  # sample rate (Hz) used when writing WAV output
SPEED = 1.0          # speed multiplier passed to the Kokoro pipeline
LANG_CODE = "a"      # Kokoro language code used to build the pipeline
VOICE = "am_onyx"    # default voice; overridable via --voice

# Bump when the cached-chapter JSON layout changes so stale caches are rejected.
CACHE_VERSION = 1

# --- Heading / cleanup patterns ----------------------------------------------
# Headings are single lines: "Prologue", "Chapter 12", "Chapter 12 - Name",
# "Chapter - 12", or "Chapter - 12 - Name".
PROLOGUE_RE = re.compile(r"^\s*Prologue\s*$", re.IGNORECASE)
CHAPTER_RE_1 = re.compile(r"^\s*Chapter\s*-\s*(\d+)(?:\s*-\s*(.+))?\s*$", re.IGNORECASE)
CHAPTER_RE_2 = re.compile(r"^\s*Chapter\s+(\d+)(?:\s*-\s*(.+))?\s*$", re.IGNORECASE)
# Horizontal-rule lines ("***", "---", "___", ...). re.MULTILINE is required so
# ^/$ anchor to individual lines: _clean_text applies this pattern to a whole
# chapter's text, and without MULTILINE a rule embedded mid-chapter could never
# match (the anchors would only fit a chapter consisting solely of the rule).
RULE_RE = re.compile(r"^[_\-*\s]{3,}\s*$", re.MULTILINE)
def _slug(text: str) -> str:
text = text.lower()
text = re.sub(r"[^a-z0-9]+", "_", text)
return text.strip("_")
def _clean_text(text: str) -> str:
    """Normalize chapter text: drop rule lines, squeeze blank runs, trim ends."""
    without_rules = RULE_RE.sub("", text)
    squeezed = re.sub(r"\n{3,}", "\n\n", without_rules)
    return squeezed.strip()
def _fmt_duration(seconds: float) -> str:
h, rem = divmod(int(seconds), 3600)
m, s = divmod(rem, 60)
if h > 0:
return f"{h}h {m:02d}m {s:02d}s"
if m > 0:
return f"{m}m {s:02d}s"
return f"{s}s"
def _chapter_heading(line: str) -> tuple[int, str, str] | None:
    """Return (number, title, label) when *line* is a chapter heading, else None.

    The prologue is reported as chapter 0 with label "Prologue". The title is
    empty when the heading carries no " - Name" suffix.
    """
    candidate = line.strip()
    if PROLOGUE_RE.match(candidate):
        return (0, "Prologue", "Prologue")
    match = CHAPTER_RE_1.match(candidate) or CHAPTER_RE_2.match(candidate)
    if match is None:
        return None
    number = int(match.group(1))
    name = (match.group(2) or "").strip()
    label = f"Chapter {number} - {name}" if name else f"Chapter {number}"
    return (number, name, label)
def _resolve_txt_files(inputs: list[str]) -> list[Path]:
txt_files: list[Path] = []
for raw in inputs:
path = Path(raw)
if path.is_file():
if path.suffix.lower() == ".txt":
txt_files.append(path)
continue
if path.is_dir():
txt_files.extend(sorted(path.glob("*.txt")))
deduped = sorted({p.resolve() for p in txt_files})
return deduped
def _signature_for_files(files: list[Path]) -> list[dict]:
sig = []
for p in files:
st = p.stat()
sig.append({
"path": str(p),
"size": st.st_size,
"mtime_ns": st.st_mtime_ns,
})
return sig
def _cache_path(output_dir: Path, files: list[Path]) -> Path:
cache_dir = output_dir / ".cache"
digest = hashlib.sha256("\n".join(str(p) for p in files).encode("utf-8")).hexdigest()[:12]
return cache_dir / f"parse_{digest}.json"
def _load_cached_chapters(cache_file: Path, file_sig: list[dict]) -> list[dict] | None:
if not cache_file.exists():
return None
try:
data = json.loads(cache_file.read_text(encoding="utf-8"))
except Exception:
return None
if data.get("version") != CACHE_VERSION:
return None
if data.get("file_signature") != file_sig:
return None
chapters = data.get("chapters")
if not isinstance(chapters, list):
return None
return chapters
def _save_cached_chapters(cache_file: Path, file_sig: list[dict], chapters: list[dict]) -> None:
    """Persist parsed chapters plus the file signature that validates them."""
    cache_file.parent.mkdir(parents=True, exist_ok=True)
    document = json.dumps(
        {
            "version": CACHE_VERSION,
            "file_signature": file_sig,
            "chapters": chapters,
        },
        ensure_ascii=False,
    )
    cache_file.write_text(document, encoding="utf-8")
def _parse_chapters(files: list[Path]) -> tuple[list[dict], set[int]]:
    """Split *files* into chapter records.

    Returns (chapters sorted by number, set of duplicate chapter numbers).
    Each record holds num/title/label/slug/text; the heading line itself is
    kept as the first line of the chapter text. When a chapter number repeats,
    the later occurrence is dropped and its number reported as a duplicate.
    Text before the first heading is ignored.
    """
    chapters: list[dict] = []
    duplicates: set[int] = set()
    numbers_seen: set[int] = set()

    def _finish(chapter: dict) -> None:
        # Join buffered lines into final text; keep the chapter only if its
        # number has not been used by an earlier chapter.
        chapter["text"] = "".join(chapter.pop("lines"))
        number = chapter["num"]
        if number in numbers_seen:
            duplicates.add(number)
        else:
            numbers_seen.add(number)
            chapters.append(chapter)

    open_chapter: dict | None = None
    for fpath in files:
        with fpath.open("r", encoding="utf-8") as fh:
            for line in fh:
                heading = _chapter_heading(line)
                if heading is None:
                    if open_chapter is not None:
                        open_chapter["lines"].append(line)
                    continue
                if open_chapter is not None:
                    _finish(open_chapter)
                num, title, label = heading
                if num == 0:
                    slug = "chapter_00_prologue"
                elif title:
                    slug = f"chapter_{num:02d}_{_slug(title)}"
                else:
                    slug = f"chapter_{num:02d}"
                open_chapter = {
                    "num": num,
                    "title": title,
                    "label": label,
                    "slug": slug,
                    "lines": [line],
                }
    if open_chapter is not None:
        _finish(open_chapter)
    chapters.sort(key=lambda c: c["num"])
    return chapters, duplicates
def load_all_chapters_with_cache(inputs: list[str], output_dir: Path, force_reparse: bool = False) -> tuple[list[dict], bool, set[int], list[Path]]:
    """Resolve inputs and parse chapters, reusing the on-disk parse cache.

    Returns (chapters, cache_hit, duplicate_numbers, resolved_files). On a
    cache hit the duplicate set is empty, since duplicates are only detected
    while parsing. Raises FileNotFoundError when no .txt files are found.
    """
    files = _resolve_txt_files(inputs)
    if not files:
        raise FileNotFoundError("No .txt files found in --input paths")
    signature = _signature_for_files(files)
    cache_file = _cache_path(output_dir, files)
    if not force_reparse:
        cached = _load_cached_chapters(cache_file, signature)
        if cached is not None:
            return cached, True, set(), files
    chapters, duplicates = _parse_chapters(files)
    _save_cached_chapters(cache_file, signature, chapters)
    return chapters, False, duplicates, files
def warn_missing_chapters(chapters: list[dict]) -> None:
    """Print a warning listing chapter numbers absent from the detected range.

    The prologue (chapter 0) is excluded; gaps are computed between the lowest
    and highest real chapter numbers found. Prints nothing when no real
    chapters exist or no numbers are missing.
    """
    nums = sorted(ch["num"] for ch in chapters if ch["num"] > 0)
    if not nums:
        return
    # Build the membership set once instead of once per range element.
    present = set(nums)
    missing = [n for n in range(nums[0], nums[-1] + 1) if n not in present]
    if missing:
        print(f"WARNING: missing chapter numbers detected: {missing}")
def generate_audio(pipeline: KPipeline, text: str, voice: str, output_path: Path) -> float:
    """Synthesize *text* with *voice* and write a WAV to *output_path*.

    Returns wall-clock seconds spent in synthesis. When the pipeline yields
    no audio, nothing is written and an error line is printed instead.
    """
    start = time.monotonic()
    pieces = []
    for _, _, segment in pipeline(text, voice=voice, speed=SPEED):
        # Objects exposing .numpy() (e.g. torch tensors) are moved to the CPU
        # and converted before concatenation.
        if hasattr(segment, "numpy"):
            segment = segment.cpu().numpy()
        segment = np.atleast_1d(segment.squeeze())
        if segment.size > 0:
            pieces.append(segment)
    elapsed = time.monotonic() - start
    if not pieces:
        print(f" ERROR no audio produced for voice='{voice}'")
        return elapsed
    audio = np.concatenate(pieces, axis=0)
    sf.write(str(output_path), audio, SAMPLE_RATE)
    duration = len(audio) / SAMPLE_RATE
    print(
        f" OK saved '{output_path.name}' "
        f"({_fmt_duration(duration)} audio | {_fmt_duration(elapsed)} wall-clock)"
    )
    return elapsed
def main() -> None:
    """CLI entry point: parse arguments, plan the run, and synthesize chapters."""
    parser = argparse.ArgumentParser(description="Generate an audiobook from chapterized text files.")
    parser.add_argument(
        "chapters",
        nargs="*",
        type=int,
        help="Chapter numbers to generate (0 = Prologue). Default: all.",
    )
    parser.add_argument(
        "--input",
        nargs="+",
        required=True,
        help="One or more .txt files and/or directories containing .txt files.",
    )
    parser.add_argument(
        "--output",
        default="output_audiobook",
        help="Output directory for generated chapter audio.",
    )
    parser.add_argument("--list", action="store_true", help="Print detected chapters and exit.")
    parser.add_argument("--voice", default=VOICE, help=f"Kokoro voice to use (default: {VOICE}).")
    parser.add_argument(
        "--preview",
        nargs="?",
        const=3000,  # bare `--preview` means 3000 chars
        type=int,
        metavar="CHARS",
        help="Generate short preview clips capped at CHARS (default: 3000).",
    )
    parser.add_argument(
        "--reparse",
        action="store_true",
        help="Ignore cache and re-parse chapters from source files.",
    )
    args = parser.parse_args()
    output_dir = Path(args.output)
    output_dir.mkdir(parents=True, exist_ok=True)
    print("Loading chapters...")
    chapters, used_cache, duplicates, files = load_all_chapters_with_cache(
        args.input, output_dir, force_reparse=args.reparse
    )
    print(f"Input files: {len(files)}")
    print(f"Parse cache: {'HIT' if used_cache else 'MISS'}")
    if duplicates:
        print(f"WARNING: duplicate chapter numbers were found and ignored: {sorted(duplicates)}")
    if not chapters:
        print("WARNING: no chapters found.")
        print("Expected headings like: 'Prologue' or 'Chapter 12 - Name' or 'Chapter - 12'")
        return
    warn_missing_chapters(chapters)
    if args.list:
        # --list: print a table of detected chapters and exit without synthesis.
        print(f"\nDetected {len(chapters)} chapters:\n")
        print(f" {'#':>4} {'Label':<45} {'Chars':>8} {'Output filename'}")
        print(f" {'-' * 4} {'-' * 45} {'-' * 8} {'-' * 30}")
        for ch in chapters:
            chars = len(_clean_text(ch["text"]))
            print(f" {ch['num']:>4} {ch['label']:<45} {chars:>8,} {ch['slug']}.wav")
        return
    if args.chapters:
        # Positional chapter numbers restrict the run; warn about unknown ones.
        requested = set(args.chapters)
        run_chapters = [ch for ch in chapters if ch["num"] in requested]
        missing_req = sorted(requested - {ch["num"] for ch in run_chapters})
        if missing_req:
            print(f"WARNING: requested chapter(s) not found: {missing_req}")
    else:
        run_chapters = chapters
    if not run_chapters:
        print("No chapters selected. Use --list to see available chapters.")
        return
    device = "cuda" if torch.cuda.is_available() else "cpu"
    print(f"Device: {device}")
    if device == "cuda":
        print(f"GPU: {torch.cuda.get_device_name(0)}")
    print(f"Voice: {args.voice}")
    # Plan overview: character counts of the cleaned text drive ETA estimates.
    chapter_chars = {ch["num"]: len(_clean_text(ch["text"])) for ch in run_chapters}
    total_chars = sum(chapter_chars.values())
    preview_note = f"PREVIEW MODE: capped at {args.preview:,} chars/chapter" if args.preview else ""
    if preview_note:
        print(preview_note)
    print("\nPlan:")
    for ch in run_chapters:
        print(f" {ch['num']:>3} {ch['label']} ({chapter_chars[ch['num']]:,} chars)")
    print(f" TOTAL: {total_chars:,} chars\n")
    print("Initializing Kokoro pipeline...")
    pipeline = KPipeline(lang_code=LANG_CODE)
    # chars_per_sec is calibrated from completed chapters (None until the
    # first chapter finishes) and drives the per-chapter ETA prints.
    chars_per_sec: float | None = None
    timing_rows: list[tuple[str, int, float]] = []
    for ch in run_chapters:
        text = _clean_text(ch["text"])
        if not text:
            print(f"[{ch['label']}] WARNING empty text, skipping")
            continue
        if args.preview and len(text) > args.preview:
            # Cut at the last space before the cap so previews end on a word.
            cut = text.rfind(" ", 0, args.preview)
            text = text[: cut if cut > 0 else args.preview]
        chars = len(text)
        preview_tag = "_preview" if args.preview else ""
        out_path = output_dir / f"{ch['slug']}{preview_tag}.wav"
        if chars_per_sec is not None:
            eta = _fmt_duration(chars / chars_per_sec)
            print(f"\n[{ch['label']}] -> {out_path.name} (est. {eta})")
        else:
            print(f"\n[{ch['label']}] -> {out_path.name} (calibration run)")
        elapsed = generate_audio(pipeline, text, args.voice, out_path)
        timing_rows.append((ch["label"], chars, elapsed))
        # Recalibrate synthesis speed from everything completed so far.
        done_chars = sum(c for _, c, _ in timing_rows)
        done_elapsed = sum(e for _, _, e in timing_rows)
        if done_elapsed > 0:
            chars_per_sec = done_chars / done_elapsed
            remaining = total_chars - done_chars
            eta_total = _fmt_duration(remaining / chars_per_sec) if remaining > 0 else "0s"
            print(f" Speed: {chars_per_sec:.0f} chars/sec | Estimated remaining: {eta_total}")
    # Summary table: each chapter's actual time vs the estimate that the
    # running speed (from chapters completed before it) would have given.
    print("\nSummary:")
    print(f" {'Chapter':<35} {'Chars':>7} {'Actual':>8} {'Est':>8}")
    print(" " + "-" * 65)
    for i, (label, chars, elapsed) in enumerate(timing_rows):
        actual_str = _fmt_duration(elapsed)
        prior_chars = sum(c for _, c, _ in timing_rows[:i])
        prior_elapsed = sum(e for _, _, e in timing_rows[:i])
        est_str = _fmt_duration(chars / (prior_chars / prior_elapsed)) if prior_elapsed > 0 else "(first)"
        print(f" {label:<35} {chars:>7,} {actual_str:>8} {est_str:>8}")
    total_elapsed = sum(e for _, _, e in timing_rows)
    total_done_chars = sum(c for _, c, _ in timing_rows)
    print(" " + "-" * 65)
    print(f" {'TOTAL':<35} {total_done_chars:>7,} {_fmt_duration(total_elapsed):>8}")
    print("\nDone.")
# Script entry point: run the CLI only when executed directly, not on import.
if __name__ == "__main__":
    main()