403 lines
13 KiB
Python
403 lines
13 KiB
Python
"""
|
|
create_audiobook.py
|
|
------------------
|
|
Generic audiobook generator for text files that contain chapter headings.
|
|
|
|
Supported heading formats (single-line headings):
|
|
- Prologue
|
|
- Chapter 12
|
|
- Chapter 12 - Chapter Name
|
|
- Chapter - 12
|
|
- Chapter - 12 - Chapter Name
|
|
|
|
Features:
|
|
- Parses chapters from one or more input files/directories
|
|
- Caches parsed chapter data for faster re-runs when source files are unchanged
|
|
- Warns about missing chapter numbers (example: found 1,2,4 -> warns about 3)
|
|
- Generates one .wav per chapter with Kokoro
|
|
|
|
Examples:
|
|
python create_audiobook.py --input "Audio Text for Novel Lightbringer"
|
|
python create_audiobook.py --input novel.txt --list
|
|
python create_audiobook.py --input novel.txt 0 1 2 --voice am_michael
|
|
python create_audiobook.py --input novel.txt --preview 3000
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import hashlib
|
|
import json
|
|
import re
|
|
import time
|
|
from pathlib import Path
|
|
|
|
import numpy as np
|
|
import soundfile as sf
|
|
import torch
|
|
from kokoro import KPipeline
|
|
|
|
SAMPLE_RATE = 24000  # Kokoro output sample rate in Hz (used when writing .wav files)
SPEED = 1.0          # TTS speaking-speed multiplier passed to the pipeline
LANG_CODE = "a"      # Kokoro language code — presumably "a" = American English; TODO confirm
VOICE = "am_onyx"    # default voice; overridable via --voice
CACHE_VERSION = 1    # bump to invalidate previously written parse caches

# Single-line heading matchers (all case-insensitive):
PROLOGUE_RE = re.compile(r"^\s*Prologue\s*$", re.IGNORECASE)
# "Chapter - 12" or "Chapter - 12 - Title"
CHAPTER_RE_1 = re.compile(r"^\s*Chapter\s*-\s*(\d+)(?:\s*-\s*(.+))?\s*$", re.IGNORECASE)
# "Chapter 12" or "Chapter 12 - Title"
CHAPTER_RE_2 = re.compile(r"^\s*Chapter\s+(\d+)(?:\s*-\s*(.+))?\s*$", re.IGNORECASE)
# Horizontal-rule / scene-break lines made of underscores, dashes, asterisks.
# re.MULTILINE is required: _clean_text() runs RULE_RE.sub() over an entire
# multi-line chapter, so ^/$ must anchor at each line break. Without the flag
# the pattern only matched when the whole text was a single rule, leaving
# mid-text rule lines in place (and read aloud by the TTS).
RULE_RE = re.compile(r"^[_\-*\s]{3,}\s*$", re.MULTILINE)
|
|
|
|
|
|
def _slug(text: str) -> str:
|
|
text = text.lower()
|
|
text = re.sub(r"[^a-z0-9]+", "_", text)
|
|
return text.strip("_")
|
|
|
|
|
|
def _clean_text(text: str) -> str:
    """Drop horizontal-rule markers, squeeze blank-line runs, trim edges."""
    without_rules = RULE_RE.sub("", text)
    # Keep at most one blank line between paragraphs.
    collapsed = re.sub(r"\n{3,}", "\n\n", without_rules)
    return collapsed.strip()
|
|
|
|
|
|
def _fmt_duration(seconds: float) -> str:
|
|
h, rem = divmod(int(seconds), 3600)
|
|
m, s = divmod(rem, 60)
|
|
if h > 0:
|
|
return f"{h}h {m:02d}m {s:02d}s"
|
|
if m > 0:
|
|
return f"{m}m {s:02d}s"
|
|
return f"{s}s"
|
|
|
|
|
|
def _chapter_heading(line: str) -> tuple[int, str, str] | None:
    """Parse a heading line into (number, title, label); None if not a heading.

    The Prologue is treated as chapter number 0. *title* is "" when the
    heading has no name part; *label* is the human-readable form used in logs.
    """
    stripped = line.strip()
    if PROLOGUE_RE.match(stripped):
        return (0, "Prologue", "Prologue")

    # Try the dashed form ("Chapter - 12 ...") first, then the plain form.
    match = CHAPTER_RE_1.match(stripped) or CHAPTER_RE_2.match(stripped)
    if match is None:
        return None

    number = int(match.group(1))
    title = (match.group(2) or "").strip()
    suffix = f" - {title}" if title else ""
    return (number, title, f"Chapter {number}{suffix}")
|
|
|
|
|
|
def _resolve_txt_files(inputs: list[str]) -> list[Path]:
|
|
txt_files: list[Path] = []
|
|
for raw in inputs:
|
|
path = Path(raw)
|
|
if path.is_file():
|
|
if path.suffix.lower() == ".txt":
|
|
txt_files.append(path)
|
|
continue
|
|
if path.is_dir():
|
|
txt_files.extend(sorted(path.glob("*.txt")))
|
|
|
|
deduped = sorted({p.resolve() for p in txt_files})
|
|
return deduped
|
|
|
|
|
|
def _signature_for_files(files: list[Path]) -> list[dict]:
|
|
sig = []
|
|
for p in files:
|
|
st = p.stat()
|
|
sig.append({
|
|
"path": str(p),
|
|
"size": st.st_size,
|
|
"mtime_ns": st.st_mtime_ns,
|
|
})
|
|
return sig
|
|
|
|
|
|
def _cache_path(output_dir: Path, files: list[Path]) -> Path:
|
|
cache_dir = output_dir / ".cache"
|
|
digest = hashlib.sha256("\n".join(str(p) for p in files).encode("utf-8")).hexdigest()[:12]
|
|
return cache_dir / f"parse_{digest}.json"
|
|
|
|
|
|
def _load_cached_chapters(cache_file: Path, file_sig: list[dict]) -> list[dict] | None:
    """Return cached chapters when the cache is valid for *file_sig*, else None.

    A missing or unreadable file, a cache-format version mismatch, a stale
    file signature, or a malformed chapter list all count as a cache miss.
    """
    if not cache_file.exists():
        return None

    try:
        payload = json.loads(cache_file.read_text(encoding="utf-8"))
    except Exception:
        # Corrupt or unreadable cache — fall back to re-parsing.
        return None

    version_ok = payload.get("version") == CACHE_VERSION
    signature_ok = payload.get("file_signature") == file_sig
    if not (version_ok and signature_ok):
        return None

    chapters = payload.get("chapters")
    return chapters if isinstance(chapters, list) else None
|
|
|
|
|
|
def _save_cached_chapters(cache_file: Path, file_sig: list[dict], chapters: list[dict]) -> None:
    """Persist parsed chapters plus the file signature that validates them."""
    cache_file.parent.mkdir(parents=True, exist_ok=True)
    blob = json.dumps(
        {
            "version": CACHE_VERSION,
            "file_signature": file_sig,
            "chapters": chapters,
        },
        ensure_ascii=False,
    )
    cache_file.write_text(blob, encoding="utf-8")
|
|
|
|
|
|
def _parse_chapters(files: list[Path]) -> tuple[list[dict], set[int]]:
    """Scan *files* line-by-line and split them into chapter records.

    Returns (chapters sorted by number, set of duplicate chapter numbers).
    When a chapter number recurs, only the first occurrence is kept. Text
    before the first heading is ignored; the heading line itself stays as
    the first line of the chapter body.
    """
    chapters: list[dict] = []
    duplicates: set[int] = set()
    seen_numbers: set[int] = set()
    open_chapter: dict | None = None

    def close_open() -> None:
        # Finalize the chapter currently being accumulated (if any),
        # recording — and dropping — duplicate chapter numbers.
        if open_chapter is None:
            return
        open_chapter["text"] = "".join(open_chapter.pop("lines"))
        number = open_chapter["num"]
        if number in seen_numbers:
            duplicates.add(number)
        else:
            seen_numbers.add(number)
            chapters.append(open_chapter)

    for source in files:
        with source.open("r", encoding="utf-8") as handle:
            for line in handle:
                heading = _chapter_heading(line)
                if heading is None:
                    if open_chapter is not None:
                        open_chapter["lines"].append(line)
                    continue

                close_open()
                number, title, label = heading
                if number == 0:
                    slug = "chapter_00_prologue"
                elif title:
                    slug = f"chapter_{number:02d}_{_slug(title)}"
                else:
                    slug = f"chapter_{number:02d}"
                open_chapter = {
                    "num": number,
                    "title": title,
                    "label": label,
                    "slug": slug,
                    "lines": [line],
                }

    close_open()
    chapters.sort(key=lambda ch: ch["num"])
    return chapters, duplicates
|
|
|
|
|
|
def load_all_chapters_with_cache(inputs: list[str], output_dir: Path, force_reparse: bool = False) -> tuple[list[dict], bool, set[int], list[Path]]:
    """Resolve inputs and parse chapters, reusing the on-disk cache when valid.

    Returns (chapters, cache_hit, duplicate_numbers, resolved_files). On a
    cache hit the duplicate set is always empty, because duplicates are only
    detected while parsing.

    Raises FileNotFoundError when no .txt files resolve from *inputs*.
    """
    files = _resolve_txt_files(inputs)
    if not files:
        raise FileNotFoundError("No .txt files found in --input paths")

    signature = _signature_for_files(files)
    cache_file = _cache_path(output_dir, files)

    if not force_reparse:
        cached = _load_cached_chapters(cache_file, signature)
        if cached is not None:
            return cached, True, set(), files

    chapters, duplicates = _parse_chapters(files)
    _save_cached_chapters(cache_file, signature, chapters)
    return chapters, False, duplicates, files
|
|
|
|
|
|
def warn_missing_chapters(chapters: list[dict]) -> None:
    """Print a warning listing any gaps in the numbered-chapter sequence.

    The prologue (number 0) is excluded from the continuity check; the range
    examined runs from the lowest to the highest positive chapter number.
    """
    numbered = sorted(ch["num"] for ch in chapters if ch["num"] > 0)
    if not numbered:
        return
    present = set(numbered)
    gaps = [n for n in range(numbered[0], numbered[-1] + 1) if n not in present]
    if gaps:
        print(f"WARNING: missing chapter numbers detected: {gaps}")
|
|
|
|
|
|
def generate_audio(pipeline: KPipeline, text: str, voice: str, output_path: Path) -> float:
    """Synthesize *text* with Kokoro and write a WAV file to *output_path*.

    Prints an OK line with audio/wall-clock durations on success, or an
    ERROR line when the pipeline yields no audio. Returns the wall-clock
    seconds spent either way.
    """
    started = time.monotonic()
    pieces: list[np.ndarray] = []
    for _, _, piece in pipeline(text, voice=voice, speed=SPEED):
        # Torch tensors expose .numpy(); move to host memory before converting.
        if hasattr(piece, "numpy"):
            piece = piece.cpu().numpy()
        piece = np.atleast_1d(piece.squeeze())
        if piece.size > 0:
            pieces.append(piece)

    elapsed = time.monotonic() - started
    if not pieces:
        print(f"  ERROR no audio produced for voice='{voice}'")
        return elapsed

    waveform = np.concatenate(pieces, axis=0)
    sf.write(str(output_path), waveform, SAMPLE_RATE)
    duration = len(waveform) / SAMPLE_RATE
    print(
        f"  OK saved '{output_path.name}' "
        f"({_fmt_duration(duration)} audio | {_fmt_duration(elapsed)} wall-clock)"
    )
    return elapsed
|
|
|
|
|
|
def main() -> None:
    """CLI entry point: parse arguments, load chapters, synthesize audio.

    Flow: parse args -> load/parse chapters (cached) -> optional --list table
    -> select requested chapters -> per-chapter synthesis with a running
    chars/sec estimate -> timing summary.
    """
    parser = argparse.ArgumentParser(description="Generate an audiobook from chapterized text files.")
    parser.add_argument(
        "chapters",
        nargs="*",
        type=int,
        help="Chapter numbers to generate (0 = Prologue). Default: all.",
    )
    parser.add_argument(
        "--input",
        nargs="+",
        required=True,
        help="One or more .txt files and/or directories containing .txt files.",
    )
    parser.add_argument(
        "--output",
        default="output_audiobook",
        help="Output directory for generated chapter audio.",
    )
    parser.add_argument("--list", action="store_true", help="Print detected chapters and exit.")
    parser.add_argument("--voice", default=VOICE, help=f"Kokoro voice to use (default: {VOICE}).")
    parser.add_argument(
        "--preview",
        nargs="?",
        const=3000,  # bare --preview (no value) caps at 3000 chars
        type=int,
        metavar="CHARS",
        help="Generate short preview clips capped at CHARS (default: 3000).",
    )
    parser.add_argument(
        "--reparse",
        action="store_true",
        help="Ignore cache and re-parse chapters from source files.",
    )
    args = parser.parse_args()

    # Output dir must exist before loading: the parse cache lives under it.
    output_dir = Path(args.output)
    output_dir.mkdir(parents=True, exist_ok=True)

    print("Loading chapters...")
    chapters, used_cache, duplicates, files = load_all_chapters_with_cache(
        args.input, output_dir, force_reparse=args.reparse
    )

    print(f"Input files: {len(files)}")
    print(f"Parse cache: {'HIT' if used_cache else 'MISS'}")

    # NOTE: duplicates are only populated on a cache MISS — a cache hit
    # returns an empty set from load_all_chapters_with_cache.
    if duplicates:
        print(f"WARNING: duplicate chapter numbers were found and ignored: {sorted(duplicates)}")

    if not chapters:
        print("WARNING: no chapters found.")
        print("Expected headings like: 'Prologue' or 'Chapter 12 - Name' or 'Chapter - 12'")
        return

    warn_missing_chapters(chapters)

    # --list: print the detected-chapter table and exit without synthesis.
    if args.list:
        print(f"\nDetected {len(chapters)} chapters:\n")
        print(f"  {'#':>4} {'Label':<45} {'Chars':>8} {'Output filename'}")
        print(f"  {'-' * 4} {'-' * 45} {'-' * 8} {'-' * 30}")
        for ch in chapters:
            chars = len(_clean_text(ch["text"]))
            print(f"  {ch['num']:>4} {ch['label']:<45} {chars:>8,} {ch['slug']}.wav")
        return

    # Positional chapter numbers filter the run; default is every chapter.
    if args.chapters:
        requested = set(args.chapters)
        run_chapters = [ch for ch in chapters if ch["num"] in requested]
        missing_req = sorted(requested - {ch["num"] for ch in run_chapters})
        if missing_req:
            print(f"WARNING: requested chapter(s) not found: {missing_req}")
    else:
        run_chapters = chapters

    if not run_chapters:
        print("No chapters selected. Use --list to see available chapters.")
        return

    device = "cuda" if torch.cuda.is_available() else "cpu"
    print(f"Device: {device}")
    if device == "cuda":
        print(f"GPU: {torch.cuda.get_device_name(0)}")
    print(f"Voice: {args.voice}")

    # Cleaned (pre-preview-cap) character counts drive the ETA math below,
    # so in preview mode the remaining-time estimate overshoots.
    chapter_chars = {ch["num"]: len(_clean_text(ch["text"])) for ch in run_chapters}
    total_chars = sum(chapter_chars.values())

    preview_note = f"PREVIEW MODE: capped at {args.preview:,} chars/chapter" if args.preview else ""
    if preview_note:
        print(preview_note)

    print("\nPlan:")
    for ch in run_chapters:
        print(f"  {ch['num']:>3} {ch['label']} ({chapter_chars[ch['num']]:,} chars)")
    print(f"  TOTAL: {total_chars:,} chars\n")

    print("Initializing Kokoro pipeline...")
    pipeline = KPipeline(lang_code=LANG_CODE)

    # chars_per_sec stays None until the first chapter finishes (calibration),
    # then feeds the per-chapter and total ETA estimates.
    chars_per_sec: float | None = None
    timing_rows: list[tuple[str, int, float]] = []  # (label, chars, elapsed)

    for ch in run_chapters:
        text = _clean_text(ch["text"])
        if not text:
            print(f"[{ch['label']}] WARNING empty text, skipping")
            continue

        # Preview cap: cut at the last space before the limit so the clip
        # doesn't end mid-word (fall back to a hard cut if no space found).
        if args.preview and len(text) > args.preview:
            cut = text.rfind(" ", 0, args.preview)
            text = text[: cut if cut > 0 else args.preview]

        chars = len(text)
        preview_tag = "_preview" if args.preview else ""
        out_path = output_dir / f"{ch['slug']}{preview_tag}.wav"

        if chars_per_sec is not None:
            eta = _fmt_duration(chars / chars_per_sec)
            print(f"\n[{ch['label']}] -> {out_path.name} (est. {eta})")
        else:
            print(f"\n[{ch['label']}] -> {out_path.name} (calibration run)")

        elapsed = generate_audio(pipeline, text, args.voice, out_path)
        timing_rows.append((ch["label"], chars, elapsed))

        # Refresh the throughput estimate from all chapters done so far.
        done_chars = sum(c for _, c, _ in timing_rows)
        done_elapsed = sum(e for _, _, e in timing_rows)
        if done_elapsed > 0:
            chars_per_sec = done_chars / done_elapsed
            remaining = total_chars - done_chars
            eta_total = _fmt_duration(remaining / chars_per_sec) if remaining > 0 else "0s"
            print(f"  Speed: {chars_per_sec:.0f} chars/sec | Estimated remaining: {eta_total}")

    # Summary table: the "Est" column shows what would have been predicted
    # for each chapter using only the throughput measured before it ran.
    print("\nSummary:")
    print(f"  {'Chapter':<35} {'Chars':>7} {'Actual':>8} {'Est':>8}")
    print("  " + "-" * 65)
    for i, (label, chars, elapsed) in enumerate(timing_rows):
        actual_str = _fmt_duration(elapsed)
        prior_chars = sum(c for _, c, _ in timing_rows[:i])
        prior_elapsed = sum(e for _, _, e in timing_rows[:i])
        est_str = _fmt_duration(chars / (prior_chars / prior_elapsed)) if prior_elapsed > 0 else "(first)"
        print(f"  {label:<35} {chars:>7,} {actual_str:>8} {est_str:>8}")

    total_elapsed = sum(e for _, _, e in timing_rows)
    total_done_chars = sum(c for _, c, _ in timing_rows)
    print("  " + "-" * 65)
    print(f"  {'TOTAL':<35} {total_done_chars:>7,} {_fmt_duration(total_elapsed):>8}")
    print("\nDone.")
|
|
|
|
|
|
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|