1398 lines
55 KiB
Python
1398 lines
55 KiB
Python
"""
gui_proper_noun_player.py
──────────────────────────
GUI for auditing proper noun pronunciations — supports multiple books.

Each book's data is isolated in its own subdirectory:
    output_proper_nouns/<book_slug>/manifest.json
    output_proper_nouns/<book_slug>/correct_words.json
    output_proper_nouns/<book_slug>/pronunciation_fixes.json
    proper_nouns_audio/<book_slug>/<word>.wav
    proper_nouns_audio/<book_slug>/replacements_cache/<phonetic>.wav

Three columns (all persisted as JSON per book):
    • Review   – words not yet audited
    • Correct  – words that already pronounce fine
    • Fixes    – linked list: original word → phonetic replacement
                 e.g. "Nephi" → "Kneephi"

Hotkeys (always active):
    Space   – replay current word
    s       – stop audio
    r       – regenerate the audio for the current word / replacement
    Escape  – reset fix entry to original word, refocus review list

On the Review list:
    ↑ / ↓          – navigate
    Click / Enter  – play word AND focus fix entry

On the fix entry (bottom bar, right of the word label):
    Start typing to overwrite the pre-filled word.
    Enter   → if text == original word → mark Correct, advance to next
              if text differs          → add as Fix, advance to next
    Escape  → reset text to original word, return focus to review list

On the Correct list:
    Delete / BackSpace – move selected word back to Review

On the Fixes list:
    Delete / BackSpace – move selected fix back to Review

"Apply Fixes to Text" writes a TTS-ready copy of the source file with all
substitutions applied (case-sensitive whole-word replace).

Run:
    .venv/bin/python gui_proper_noun_player.py
"""
|
||
|
||
import json
|
||
import os
|
||
import re
|
||
import sys
|
||
import threading
|
||
import time
|
||
from pathlib import Path
|
||
from typing import NamedTuple
|
||
|
||
# Model is already cached locally — skip all HuggingFace Hub network calls
|
||
os.environ.setdefault("HF_HUB_OFFLINE", "1")
|
||
|
||
import sounddevice as sd
|
||
import soundfile as sf
|
||
|
||
from PySide6.QtWidgets import *
|
||
from PySide6.QtCore import *
|
||
from PySide6.QtGui import *
|
||
|
||
# ── Project management ──────────────────────────────────────────────────────────
|
||
|
||
class Project(NamedTuple):
    """One audit project, as stored in ``projects.json``."""

    # Display name; also the key used to look a project up from the selector.
    name: str
    # Source text files belonging to the project.
    source_paths: list[Path]
    # Directory holding manifest.json, correct_words.json and pronunciation_fixes.json.
    proper_nouns_output_dir: Path
    # Directory holding the per-word WAV files and the replacements_cache/ folder.
    proper_nouns_audio_dir: Path
|
||
|
||
def _project_slug(name: str) -> str:
|
||
return re.sub(r"[^a-zA-Z0-9_-]", "_", name).strip("_")[:60].lower()
|
||
|
||
def load_projects() -> list[Project]:
    """Read ``projects.json`` from the current working directory.

    Returns one ``Project`` per entry in the file, or an empty list when the
    file does not exist.
    """
    projects_file = Path("projects.json")
    if not projects_file.exists():
        return []

    entries = json.loads(projects_file.read_text(encoding="utf-8"))
    return [
        Project(
            source_paths=[Path(raw) for raw in entry["source_paths"]],
            proper_nouns_output_dir=Path(entry["proper_nouns_output_dir"]),
            proper_nouns_audio_dir=Path(entry["proper_nouns_audio_dir"]),
            name=entry["name"],
        )
        for entry in entries
    ]
|
||
|
||
def save_projects(projects: list[Project]) -> None:
    """Serialise *projects* to ``projects.json`` in the current working directory."""
    records = []
    for project in projects:
        records.append(
            {
                "name": project.name,
                "source_paths": [str(src) for src in project.source_paths],
                "proper_nouns_output_dir": str(project.proper_nouns_output_dir),
                "proper_nouns_audio_dir": str(project.proper_nouns_audio_dir),
            }
        )
    payload = json.dumps(records, indent=2)
    Path("projects.json").write_text(payload, encoding="utf-8")
|
||
|
||
# Voice name passed to the Kokoro pipeline when synthesising cached replacement audio.
VOICE = "am_michael"
# Sample rate (Hz) used when writing synthesised audio to WAV files.
SAMPLE_RATE = 24000
|
||
|
||
# ── Book source ────────────────────────────────────────────────────────────────
|
||
|
||
class BookSource(NamedTuple):
    """Paths and labels describing one book loaded into the auditor."""

    label: str  # Display name shown in the UI
    slug: str  # Filesystem-safe identifier used for subdirectory names
    source_paths: list[Path]  # One or more source .txt files
    fixed_out: Path  # Where "Apply Fixes to Text" writes the TTS-ready copy
    proper_nouns_output_dir: Path  # Holds manifest.json, correct_words.json, pronunciation_fixes.json
    proper_nouns_audio_dir: Path  # Holds the word WAV files and replacements_cache/
|
||
|
||
|
||
def _book_slug(text: str) -> str:
|
||
"""Convert a display name to a lowercase filesystem-safe slug."""
|
||
return re.sub(r"[^a-zA-Z0-9_-]", "_", text).strip("_")[:60].lower()
|
||
|
||
|
||
def load_books_from_projects() -> list[BookSource]:
    """Build one ``BookSource`` for every project returned by ``load_projects()``.

    The slug is derived from the project name and the fixed-text output file
    is named ``"<project name> (TTS Fixed).txt"``.
    """
    return [
        BookSource(
            label=proj.name,
            slug=_project_slug(proj.name),
            source_paths=proj.source_paths,
            fixed_out=Path(f"{proj.name} (TTS Fixed).txt"),
            proper_nouns_output_dir=proj.proper_nouns_output_dir,
            proper_nouns_audio_dir=proj.proper_nouns_audio_dir,
        )
        for proj in load_projects()
    ]
|
||
|
||
# ── Colours ────────────────────────────────────────────────────────────────────
|
||
# Hex colour strings interpolated into the Qt style sheets built by the UI code.
BG = "#1e1e2e"
BG2 = "#181825"
BG3 = "#313244"
FG = "#cdd6f4"
FG_DIM = "#6c7086"
GREEN = "#a6e3a1"
BLUE = "#89b4fa"
RED = "#f38ba8"
YELLOW = "#f9e2af"
MAUVE = "#cba6f7"
|
||
|
||
# ── Audio ──────────────────────────────────────────────────────────────────────
|
||
|
||
def play_async(path: Path) -> None:
    """Stop any current playback, then play the audio file at *path*.

    The file is read and handed to ``sounddevice`` on a daemon thread;
    any error raised while doing so is printed rather than propagated.
    """
    sd.stop()

    def _worker():
        try:
            samples, rate = sf.read(str(path), dtype="float32")
            sd.play(samples, rate)
        except Exception as exc:
            print(f"[audio] playback error: {exc}")

    worker = threading.Thread(target=_worker, daemon=True)
    worker.start()
|
||
|
||
|
||
def _slug(text: str) -> str:
|
||
"""Safe filename from arbitrary text."""
|
||
return re.sub(r"[^a-zA-Z0-9_-]", "_", text).strip("_")[:80]
|
||
|
||
|
||
# Matches a heading such as "Chapter 3 - The Road" (case-insensitive);
# group 1 is the chapter number, group 2 the title.
_CHAPTER_LINE_RE = re.compile(r"^Chapter\s+(\d+)\s*-\s*(.+)\s*$", re.IGNORECASE)
# Matches a line that consists solely of the word "Prologue" (case-insensitive).
_PROLOGUE_LINE_RE = re.compile(r"^Prologue\s*$", re.IGNORECASE)
|
||
|
||
|
||
def _chapter_slug(title: str) -> str:
|
||
text = title.lower()
|
||
text = re.sub(r"[^a-z0-9]+", "_", text)
|
||
return text.strip("_")
|
||
|
||
|
||
def _clean_tts_text(text: str) -> str:
|
||
text = re.sub(r"^[_\-\*\s]{3,}\s*$", "", text, flags=re.MULTILINE)
|
||
text = re.sub(r"\n{3,}", "\n\n", text)
|
||
return text.strip()
|
||
|
||
|
||
def _parse_chapters_from_paths(source_paths: list[Path]) -> list[dict]:
|
||
"""Parse chapters from source files.
|
||
|
||
Supported heading formats:
|
||
- Prologue
|
||
- Chapter # - chapter name
|
||
"""
|
||
chapters: list[dict] = []
|
||
current: dict | None = None
|
||
|
||
for path in source_paths:
|
||
lines = path.read_text(encoding="utf-8").splitlines()
|
||
for line in lines:
|
||
m = _CHAPTER_LINE_RE.match(line.strip())
|
||
if m:
|
||
if current is not None:
|
||
current["text"] = "\n".join(current["lines"])
|
||
chapters.append(current)
|
||
num = int(m.group(1))
|
||
title = m.group(2).strip()
|
||
current = {
|
||
"num": num,
|
||
"title": title,
|
||
"label": f"Chapter {num} - {title}",
|
||
"slug": f"chapter_{num:02d}_{_chapter_slug(title)}",
|
||
"lines": [line],
|
||
}
|
||
elif _PROLOGUE_LINE_RE.match(line.strip()):
|
||
if current is not None:
|
||
current["text"] = "\n".join(current["lines"])
|
||
chapters.append(current)
|
||
current = {
|
||
"num": 0,
|
||
"title": "Prologue",
|
||
"label": "Prologue",
|
||
"slug": "chapter_00_prologue",
|
||
"lines": [line],
|
||
}
|
||
elif current is not None:
|
||
current["lines"].append(line)
|
||
|
||
if current is not None:
|
||
current["text"] = "\n".join(current["lines"])
|
||
chapters.append(current)
|
||
|
||
deduped: list[dict] = []
|
||
seen: set[int] = set()
|
||
for ch in chapters:
|
||
if ch["num"] in seen:
|
||
continue
|
||
seen.add(ch["num"])
|
||
ch.pop("lines", None)
|
||
deduped.append(ch)
|
||
return sorted(deduped, key=lambda x: x["num"])
|
||
|
||
|
||
def _parse_chapter_selection(raw: str, valid_numbers: set[int]) -> list[int]:
|
||
"""Parse chapter selection like: all | 1,2,5-8."""
|
||
text = (raw or "").strip().lower()
|
||
if not text or text == "all":
|
||
return sorted(valid_numbers)
|
||
|
||
out: set[int] = set()
|
||
for part in text.split(","):
|
||
token = part.strip()
|
||
if not token:
|
||
continue
|
||
if "-" in token:
|
||
a, b = token.split("-", 1)
|
||
start = int(a.strip())
|
||
end = int(b.strip())
|
||
if end < start:
|
||
start, end = end, start
|
||
for n in range(start, end + 1):
|
||
if n in valid_numbers:
|
||
out.add(n)
|
||
else:
|
||
n = int(token)
|
||
if n in valid_numbers:
|
||
out.add(n)
|
||
|
||
return sorted(out)
|
||
|
||
|
||
# Lazy KPipeline singleton — only imported+loaded on first synthesis request
|
||
# Shared KPipeline instance; stays None until _get_pipeline() first builds it.
_pipeline = None
# Held by _get_pipeline() while the shared pipeline is being constructed.
_pipeline_lock = threading.Lock()
|
||
|
||
def _get_pipeline():
    """Return the shared Kokoro ``KPipeline``, constructing it on the first call.

    ``kokoro`` is imported lazily here, and warnings raised during
    construction are suppressed.
    """
    global _pipeline
    if _pipeline is not None:
        return _pipeline

    with _pipeline_lock:
        # Re-check under the lock: another thread may have built it meanwhile.
        if _pipeline is not None:
            return _pipeline

        import warnings
        from kokoro import KPipeline  # type: ignore

        with warnings.catch_warnings():
            for category in (UserWarning, FutureWarning):
                warnings.filterwarnings("ignore", category=category)
            warnings.filterwarnings("ignore", message=".*unauthenticated.*")
            _pipeline = KPipeline(lang_code="a", repo_id="hexgrad/Kokoro-82M")
    return _pipeline
|
||
|
||
|
||
def synth_and_play(text: str, replacements_dir: Path, on_ready=None) -> None:
    """Synthesise *text* (cached under *replacements_dir*) and play it.

    All work happens on a daemon thread so the caller returns immediately.
    If given, *on_ready(path)* is invoked on that worker thread once the
    cached file is available, just before playback starts. Errors are
    printed, not raised.
    """

    def _job():
        try:
            wav = _synth_to_cache(text, replacements_dir)
            if not wav:
                return
            if on_ready:
                on_ready(wav)
            play_async(wav)
        except Exception as exc:
            print(f"[synth] error synthesising '{text}': {exc}")

    threading.Thread(target=_job, daemon=True).start()
|
||
|
||
|
||
def _synth_to_cache(text: str, replacements_dir: Path) -> "Path | None":
    """Return the path of the cached WAV for *text*, synthesising it if needed.

    The cache file is ``<replacements_dir>/<_slug(text)>.wav``; the directory
    is created when missing. Returns ``None`` when synthesis produced no
    audio and therefore no file.
    """
    replacements_dir.mkdir(parents=True, exist_ok=True)
    cache_path = replacements_dir / f"{_slug(text)}.wav"
    if cache_path.exists():
        return cache_path

    import warnings
    import numpy as np

    pipeline = _get_pipeline()
    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", category=UserWarning)
        pieces = [
            audio
            for _, _, audio in pipeline(text, voice=VOICE)
            if audio is not None
        ]
    if pieces:
        sf.write(str(cache_path), np.concatenate(pieces), SAMPLE_RATE)
    return cache_path if cache_path.exists() else None
|
||
|
||
|
||
# ── Persistence helpers ────────────────────────────────────────────────────────
|
||
|
||
def load_json(path: Path, default):
    """Return the parsed JSON content of *path*, or *default* when the file does not exist."""
    if not path.exists():
        return default
    raw = path.read_text(encoding="utf-8")
    return json.loads(raw)
|
||
|
||
def save_json(path: Path, obj) -> None:
    """Write *obj* to *path* as indented UTF-8 JSON (non-ASCII kept verbatim).

    Missing parent directories are created first, so saving into a
    per-book data directory that does not exist yet no longer fails.
    """
    # Serialise before touching the filesystem so a non-serialisable object
    # leaves nothing behind.
    payload = json.dumps(obj, ensure_ascii=False, indent=2)
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(payload, encoding="utf-8")
|
||
|
||
|
||
# ── Main app ───────────────────────────────────────────────────────────────────
|
||
|
||
class ProperNounAuditor(QMainWindow):
|
||
|
||
# tracks which word is currently loaded into the fix entry
|
||
_fix_entry_word: str = ""
|
||
|
||
def __init__(self, books: list[BookSource]) -> None:
    """Create the main window and load the first saved project, if any.

    *books* is stored on ``self.books``; the project list is read
    separately through ``load_projects()``.
    """
    super().__init__()
    self.setWindowTitle("Proper Noun Pronunciation Auditor")
    self.setGeometry(100, 100, 1020, 760)

    self.books: list[BookSource] = books
    # Must be populated before _build_ui(), which fills the project combo box from it.
    self.projects: list[Project] = load_projects()
    self.book: BookSource | None = None

    # Loaded per-book via _load_book()
    self.manifest: dict[str, str] = {}
    self.all_words: list[str] = []
    self.correct: list[str] = []
    self.fixes: dict[str, str] = {}

    self._build_ui()

    # Select the first project and load it explicitly.
    if self.projects:
        first_project = self.projects[0]
        self._book_var.setCurrentText(first_project.name)
        self._on_project_change()

    # Hotkeys
    self._setup_shortcuts()
|
||
|
||
def _setup_shortcuts(self):
    """Register the window-level hotkeys: Space, S, R and Escape."""
    bindings = (
        ("Space", self._replay),            # Space – replay
        ("S", lambda: sd.stop()),           # s – stop
        ("R", self._regen_current),         # r – regen
        ("Escape", self._reset_fix_entry),  # Escape – reset
    )
    for keys, handler in bindings:
        hotkey = QShortcut(QKeySequence(keys), self)
        hotkey.activated.connect(handler)
|
||
|
||
# ── Per-book path properties ─────────────────────────────────────────────────
|
||
|
||
@property
def _data_dir(self) -> Path:
    """Directory that holds the current book's JSON data files."""
    current = self.book
    return current.proper_nouns_output_dir
|
||
|
||
@property
def _audio_dir(self) -> Path:
    """Directory that holds the current book's word audio files."""
    current = self.book
    return current.proper_nouns_audio_dir
|
||
|
||
@property
def _manifest_file(self) -> Path:
    """Path of ``manifest.json`` inside the current book's data directory."""
    return self._data_dir.joinpath("manifest.json")
|
||
|
||
@property
def _replacements_dir(self) -> Path:
    """Path of the ``replacements_cache`` folder inside the current book's audio directory."""
    return self._audio_dir.joinpath("replacements_cache")
|
||
|
||
@property
def _correct_file(self) -> Path:
    """Path of ``correct_words.json`` inside the current book's data directory."""
    return self._data_dir.joinpath("correct_words.json")
|
||
|
||
@property
def _fixes_file(self) -> Path:
    """Path of ``pronunciation_fixes.json`` inside the current book's data directory."""
    return self._data_dir.joinpath("pronunciation_fixes.json")
|
||
|
||
# ── Book loading / switching ──────────────────────────────────────────────────
|
||
|
||
def _load_book(self, book: BookSource) -> None:
    """Make *book* current and reload manifest, correct list and fixes from its files.

    Stops playback, refreshes all three lists, updates the status label and
    clears the fix entry and the now-playing label.
    """
    sd.stop()
    self.book = book
    self._book_var.setCurrentText(book.label)

    manifest_path = self._manifest_file
    self.manifest = load_json(manifest_path, {}) if manifest_path.exists() else {}
    self.all_words = sorted(self.manifest.keys(), key=str.casefold)
    self.correct = load_json(self._correct_file, [])
    self.fixes = load_json(self._fixes_file, {})

    word_total = len(self.manifest)
    if not word_total:
        status = "No manifest yet — click find proper nouns to create one"
    else:
        status = f"{word_total} words loaded · {len(self.correct)} correct · {len(self.fixes)} fixes"
    self._book_status_var.setText(status)

    self._refresh_all()

    # Nothing is selected after a switch: blank the fix entry and its bookkeeping.
    self.fix_var = ""
    self._fix_entry.setText("")
    self._fix_entry_word = ""
    self.now_playing_var.setText("—")
|
||
|
||
def _on_book_change(self, event=None) -> None:
|
||
label = self._book_var.get()
|
||
book = next((b for b in self.books if b.label == label), None)
|
||
if book:
|
||
self._load_book(book)
|
||
|
||
def _on_project_change(self) -> None:
    """Load the project whose name is currently shown in the combo box.

    A ``BookSource`` is built from the matching project, becomes the sole
    entry of ``self.books`` and is loaded. Does nothing when no project
    has the selected name.
    """
    selected = self._book_var.currentText()
    for project in self.projects:
        if project.name == selected:
            break
    else:
        return

    # Create BookSource from project
    book = BookSource(
        label=project.name,
        slug=_project_slug(project.name),
        source_paths=project.source_paths,
        fixed_out=Path(f"{project.name} (TTS Fixed).txt"),
        proper_nouns_output_dir=project.proper_nouns_output_dir,
        proper_nouns_audio_dir=project.proper_nouns_audio_dir,
    )
    self.books = [book]
    self._load_book(book)
|
||
|
||
def _new_project(self) -> None:
    """Ask for a project name and source files, then create, save and select the project.

    The name is trimmed. It is rejected when it is empty, already in use, or
    would map to data directories that are empty-named or already belong to
    another project (each project must keep its data in its own
    subdirectory). Cancelling either dialog leaves everything unchanged.
    """
    name, ok = QInputDialog.getText(self, "New Project", "Enter project name:")
    name = name.strip()
    if not ok or not name:
        return

    # Check if exists
    if any(p.name == name for p in self.projects):
        QMessageBox.critical(self, "Error", "Project name already exists.")
        return

    slug = _project_slug(name)
    output_dir = Path("output_proper_nouns") / slug
    audio_dir = Path("proper_nouns_audio") / slug
    # An empty slug would put the data directly in the shared root, and a
    # slug shared with another project would mix the two projects' files.
    clashes = any(
        p.proper_nouns_output_dir == output_dir or p.proper_nouns_audio_dir == audio_dir
        for p in self.projects
    )
    if not slug or clashes:
        QMessageBox.critical(
            self,
            "Error",
            "Project name maps to a data folder that is empty or already in use.",
        )
        return

    # Select files
    files, _ = QFileDialog.getOpenFileNames(self, "Select TXT files", "", "Text files (*.txt)")
    if not files:
        return

    project = Project(
        name=name,
        source_paths=[Path(f) for f in files],
        proper_nouns_output_dir=output_dir,
        proper_nouns_audio_dir=audio_dir,
    )
    self.projects.append(project)
    save_projects(self.projects)

    # Update combobox values. Signals are blocked while repopulating so that
    # clear()/addItems()/setCurrentText() do not each trigger a project load;
    # the single explicit call below loads the new project once.
    self._book_var.blockSignals(True)
    try:
        self._book_var.clear()
        self._book_var.addItems([p.name for p in self.projects])
        self._book_var.setCurrentText(name)
    finally:
        self._book_var.blockSignals(False)
    self._on_project_change()
|
||
|
||
def _add_files(self) -> None:
    """Let the user add more TXT files to the currently selected project.

    Files already listed in the project are skipped. The project list is
    saved and the project reloaded afterwards.
    """
    if not self._book_var.currentText():
        QMessageBox.information(self, "No project selected", "Select a project first.")
        return

    chosen, _ = QFileDialog.getOpenFileNames(self, "Add TXT files", "", "Text files (*.txt)")
    if not chosen:
        return

    selected_name = self._book_var.currentText()
    project = next((p for p in self.projects if p.name == selected_name), None)
    if not project:
        return

    additions = [
        candidate
        for candidate in map(Path, chosen)
        if candidate not in project.source_paths
    ]
    project.source_paths.extend(additions)
    save_projects(self.projects)
    self._on_project_change()
|
||
|
||
def closeEvent(self, event) -> None:
    """Stop any audio playback, then accept the close event."""
    sd.stop()
    event.accept()
|
||
|
||
# ── UI construction ────────────────────────────────────────────────────────
|
||
|
||
def _build_ui(self) -> None:
    """Create every widget of the main window and wire up its signals.

    Layout, top to bottom: project selector bar, title bar, three-column
    body (Review / Correct / Fixes) and the bottom action bar. Widgets
    that other methods need later are stored as attributes on ``self``.
    Reads ``self.projects`` to fill the project combo box.
    """
    central_widget = QWidget()
    self.setCentralWidget(central_widget)
    main_layout = QVBoxLayout(central_widget)

    # ── Project selector bar ──────────────────────────────────────────────────
    book_bar = QWidget()
    book_bar.setStyleSheet(f"background-color: {BG2}; padding: 7px;")
    book_layout = QHBoxLayout(book_bar)

    book_label = QLabel("Project:")
    book_label.setStyleSheet(f"color: {FG_DIM}; font-weight: bold; font-size: 10pt;")
    book_layout.addWidget(book_label)

    self._book_var = QComboBox()
    # Items are added before the signal is connected, so filling the box
    # here does not invoke _on_project_change.
    self._book_var.addItems([p.name for p in self.projects])
    self._book_var.setEditable(False)
    self._book_var.setStyleSheet(f"font-size: 10pt; min-width: 300px;")
    self._book_var.currentTextChanged.connect(self._on_project_change)
    book_layout.addWidget(self._book_var)

    new_project_btn = self._create_button("New Project", self._new_project, BLUE, BG3)
    book_layout.addWidget(new_project_btn)

    add_files_btn = self._create_button("Add Files", self._add_files, GREEN, BG3)
    book_layout.addWidget(add_files_btn)

    self._extract_btn = self._create_button("find proper nouns", self._extract_and_generate, GREEN, BG3)
    book_layout.addWidget(self._extract_btn)

    apply_fixes_btn = self._create_button("⇄ Apply Fixes to Text", self._apply_fixes, YELLOW, BG3)
    book_layout.addWidget(apply_fixes_btn)

    export_remaining_btn = self._create_button("⬇ Export Remaining", self._export_remaining, BLUE, BG3)
    book_layout.addWidget(export_remaining_btn)

    voice_label = QLabel("Voice:")
    voice_label.setStyleSheet(f"color: {FG_DIM}; font-size: 9pt;")
    book_layout.addWidget(voice_label)

    # Editable, so a voice name that is not in the list can be typed in.
    self._voice_combo = QComboBox()
    self._voice_combo.setEditable(True)
    self._voice_combo.addItems([
        "am_onyx",
        "am_michael",
        "af_heart",
        "af_bella",
        "af_nicole",
        "bm_george",
        "bm_lewis",
    ])
    self._voice_combo.setCurrentText("am_onyx")
    self._voice_combo.setStyleSheet("font-size: 9pt; min-width: 120px;")
    book_layout.addWidget(self._voice_combo)

    chapters_label = QLabel("Chapters:")
    chapters_label.setStyleSheet(f"color: {FG_DIM}; font-size: 9pt;")
    book_layout.addWidget(chapters_label)

    self._chapters_entry = QLineEdit("all")
    self._chapters_entry.setPlaceholderText("all or 0,1,2,5-8")
    self._chapters_entry.setStyleSheet("font-size: 9pt; min-width: 130px;")
    book_layout.addWidget(self._chapters_entry)

    self._gen_audio_btn = self._create_button("Generate Audio", self._generate_selected_chapters, MAUVE, BG3)
    book_layout.addWidget(self._gen_audio_btn)

    self._gen_audio_status = QLabel("")
    self._gen_audio_status.setStyleSheet(f"color: {FG_DIM}; font-size: 8pt;")
    book_layout.addWidget(self._gen_audio_status)

    self._book_status_var = QLabel("Select a book above")
    self._book_status_var.setStyleSheet(f"color: {FG_DIM}; font-size: 9pt;")
    book_layout.addWidget(self._book_status_var)
    book_layout.addStretch()

    main_layout.addWidget(book_bar)

    # ── Title bar ─────────────────────────────────────────────────────────
    title_bar = QWidget()
    title_bar.setStyleSheet(f"background-color: {BG}; padding: 6px;")
    title_layout = QHBoxLayout(title_bar)

    title_label = QLabel("Proper Noun Pronunciation Auditor")
    title_label.setStyleSheet(f"font-size: 15pt; font-weight: bold; color: {FG};")
    title_layout.addWidget(title_label)

    hint_label = QLabel("Space=replay r=regen s=stop Esc=reset fix Del=remove from list Enter=correct|fix")
    hint_label.setStyleSheet(f"font-size: 8pt; color: {FG_DIM};")
    title_layout.addWidget(hint_label)
    title_layout.addStretch()

    main_layout.addWidget(title_bar)

    # Three-column body
    body = QWidget()
    body_layout = QHBoxLayout(body)
    body_layout.setSpacing(8)

    # ── Column 0: Review list ──────────────────────────────────────────────
    col0 = QWidget()
    col0_layout = QVBoxLayout(col0)

    filter_row = QWidget()
    filter_layout = QHBoxLayout(filter_row)
    filter_label = QLabel("Filter:")
    filter_label.setStyleSheet(f"color: {FG}; font-size: 10pt;")
    filter_layout.addWidget(filter_label)

    self.search_var = ""
    self._filter_entry = QLineEdit()
    self._filter_entry.setStyleSheet(f"font-size: 11pt; background-color: {BG3}; color: {FG}; border: 1px solid {BG3}; padding: 4px;")
    # Re-filter the review list on every change of the filter text.
    self._filter_entry.textChanged.connect(self._refresh_review)
    filter_layout.addWidget(self._filter_entry)

    clear_filter_btn = self._create_button("✕", lambda: self._filter_entry.clear(), RED, BG3)
    filter_layout.addWidget(clear_filter_btn)

    col0_layout.addWidget(filter_row)

    hdr0 = QWidget()
    hdr0_layout = QHBoxLayout(hdr0)
    review_section_label = QLabel("TO REVIEW")
    review_section_label.setStyleSheet(f"font-weight: bold; color: {FG};")
    hdr0_layout.addWidget(review_section_label)

    self.review_count_var = QLabel("")
    self.review_count_var.setStyleSheet(f"color: {FG_DIM}; font-size: 9pt;")
    hdr0_layout.addStretch()
    hdr0_layout.addWidget(self.review_count_var)

    col0_layout.addWidget(hdr0)

    self.review_lb = QListWidget()
    self.review_lb.setStyleSheet(f"background-color: {BG2}; color: {FG}; border: none;")
    self.review_lb.itemSelectionChanged.connect(self._on_review_select)
    self.review_lb.itemDoubleClicked.connect(self._on_review_select)
    # Key handling is replaced on this one instance rather than by subclassing.
    self.review_lb.keyPressEvent = self._review_key_press
    col0_layout.addWidget(self.review_lb)

    body_layout.addWidget(col0, 3)

    # ── Column 1: Correct list ─────────────────────────────────────────────
    col1 = QWidget()
    col1_layout = QVBoxLayout(col1)

    hdr1 = QWidget()
    hdr1_layout = QHBoxLayout(hdr1)
    correct_section_label = QLabel("✓ CORRECT [Del=remove]")
    correct_section_label.setStyleSheet(f"font-weight: bold; color: {FG};")
    hdr1_layout.addWidget(correct_section_label)

    self.correct_count_var = QLabel("")
    self.correct_count_var.setStyleSheet(f"color: {FG_DIM}; font-size: 9pt;")
    hdr1_layout.addStretch()
    hdr1_layout.addWidget(self.correct_count_var)

    col1_layout.addWidget(hdr1)

    self.correct_lb = QListWidget()
    self.correct_lb.setStyleSheet(f"background-color: {BG2}; color: {FG}; border: none;")
    self.correct_lb.itemSelectionChanged.connect(lambda: self._on_side_select(self.correct_lb))
    self.correct_lb.keyPressEvent = lambda e: self._side_key_press(e, self.correct_lb, False)
    col1_layout.addWidget(self.correct_lb)

    back_to_review_btn = self._create_button("← Back to Review [Del]", lambda: self._move_back(self.correct_lb, is_dict=False), YELLOW)
    col1_layout.addWidget(back_to_review_btn)

    body_layout.addWidget(col1, 2)

    # ── Column 2: Fixes list ───────────────────────────────────────────────
    col2 = QWidget()
    col2_layout = QVBoxLayout(col2)

    hdr2 = QWidget()
    hdr2_layout = QHBoxLayout(hdr2)
    fixes_section_label = QLabel("⇄ FIXES (original → phonetic)")
    fixes_section_label.setStyleSheet(f"font-weight: bold; color: {FG};")
    hdr2_layout.addWidget(fixes_section_label)

    self.fixes_count_var = QLabel("")
    self.fixes_count_var.setStyleSheet(f"color: {FG_DIM}; font-size: 9pt;")
    hdr2_layout.addStretch()
    hdr2_layout.addWidget(self.fixes_count_var)

    col2_layout.addWidget(hdr2)

    self.fixes_lb = QListWidget()
    self.fixes_lb.setStyleSheet(f"background-color: {BG2}; color: {FG}; border: none;")
    self.fixes_lb.itemSelectionChanged.connect(lambda: self._on_side_select(self.fixes_lb))
    self.fixes_lb.keyPressEvent = lambda e: self._side_key_press(e, self.fixes_lb, True)
    col2_layout.addWidget(self.fixes_lb)

    back_to_review_fixes_btn = self._create_button("← Back to Review [Del]", lambda: self._move_back(self.fixes_lb, is_dict=True), YELLOW)
    col2_layout.addWidget(back_to_review_fixes_btn)

    body_layout.addWidget(col2, 2)

    main_layout.addWidget(body)

    # ── Bottom action bar ──────────────────────────────────────────────────
    action_bar = QWidget()
    action_bar.setStyleSheet(f"background-color: {BG3}; padding: 8px;")
    action_layout = QHBoxLayout(action_bar)

    playing_icon = QLabel("▶")
    playing_icon.setStyleSheet(f"color: {GREEN}; font-size: 11pt;")
    action_layout.addWidget(playing_icon)

    self.now_playing_var = QLabel("—")
    self.now_playing_var.setStyleSheet(f"color: {GREEN}; font-size: 11pt; font-weight: bold; min-width: 150px;")
    action_layout.addWidget(self.now_playing_var)

    arrow_label = QLabel("→")
    arrow_label.setStyleSheet(f"color: {MAUVE}; font-size: 13pt; font-weight: bold;")
    action_layout.addWidget(arrow_label)

    # NOTE(review): fix_var is a plain string and no signal connected in this
    # method keeps it in sync with text typed into _fix_entry — confirm that
    # readers of fix_var see what they expect.
    self.fix_var = ""
    self._fix_entry = QLineEdit()
    self._fix_entry.setStyleSheet(f"font-size: 11pt; background-color: {BG2}; color: {MAUVE}; border: 1px solid {BG2}; padding: 4px; max-width: 150px;")
    self._fix_entry.returnPressed.connect(self._enter_action)
    self._fix_entry.keyPressEvent = self._fix_entry_key_press
    action_layout.addWidget(self._fix_entry)

    hint_action = QLabel("Enter=correct (edit first for fix) Esc=reset")
    hint_action.setStyleSheet(f"color: {FG_DIM}; font-size: 8pt;")
    action_layout.addWidget(hint_action)

    separator1 = QLabel("│")
    separator1.setStyleSheet(f"color: {FG_DIM};")
    action_layout.addWidget(separator1)

    stop_btn = self._create_button("■ Stop [s]", lambda: sd.stop(), RED)
    action_layout.addWidget(stop_btn)

    replay_btn = self._create_button("↺ Replay [Space]", self._replay, BLUE)
    action_layout.addWidget(replay_btn)

    regen_btn = self._create_button("↻ Regen [r]", self._regen_current, GREEN)
    action_layout.addWidget(regen_btn)

    separator2 = QLabel("│")
    separator2.setStyleSheet(f"color: {FG_DIM};")
    action_layout.addWidget(separator2)

    self._pregen_btn = self._create_button("↻ Pre-gen Fix Audio", self._pregen_all_fix_audio, MAUVE, BG2)
    action_layout.addWidget(self._pregen_btn)

    self._pregen_status_var = QLabel("")
    self._pregen_status_var.setStyleSheet(f"color: {FG_DIM}; font-size: 8pt; min-width: 200px;")
    action_layout.addWidget(self._pregen_status_var)
    action_layout.addStretch()

    main_layout.addWidget(action_bar)
|
||
|
||
def _create_button(self, text: str, callback, color: str = GREEN, bg: str = BG3) -> QPushButton:
    """Return a styled ``QPushButton`` labelled *text* whose click invokes *callback*.

    *color* is used for the label and the 1px border, *bg* for the background.
    """
    style = f"background-color: {bg}; color: {color}; border: 1px solid {color}; padding: 4px 8px;"
    button = QPushButton(text)
    button.setStyleSheet(style)
    button.clicked.connect(callback)
    return button
|
||
|
||
# ── Refresh helpers ────────────────────────────────────────────────────────
|
||
|
||
def _review_words(self) -> list[str]:
|
||
excluded = set(self.correct) | set(self.fixes.keys())
|
||
q = self._filter_entry.text().strip().casefold()
|
||
words = [w for w in self.all_words if w not in excluded]
|
||
if q:
|
||
words = [w for w in words if q in w.casefold()]
|
||
return words
|
||
|
||
def _refresh_review(self) -> None:
|
||
words = self._review_words()
|
||
self.review_lb.clear()
|
||
for w in words:
|
||
self.review_lb.addItem(f" {w}")
|
||
self.review_count_var.setText(f"{len(words)}")
|
||
|
||
def _refresh_correct(self) -> None:
|
||
self.correct_lb.clear()
|
||
for w in self.correct: # already newest-first
|
||
self.correct_lb.addItem(f" {w}")
|
||
self.correct_count_var.setText(f"{len(self.correct)}")
|
||
|
||
def _refresh_fixes(self) -> None:
|
||
self.fixes_lb.clear()
|
||
for orig, rep in reversed(list(self.fixes.items())): # newest-first
|
||
self.fixes_lb.addItem(f" {orig} → {rep}")
|
||
self.fixes_count_var.setText(f"{len(self.fixes)}")
|
||
|
||
def _refresh_all(self) -> None:
|
||
self._refresh_review()
|
||
self._refresh_correct()
|
||
self._refresh_fixes()
|
||
|
||
# ── Playback ───────────────────────────────────────────────────────────────
|
||
|
||
def _play_word(self, word: str) -> None:
    """Play the manifest audio for *word* and show it as now playing.

    Does nothing when no book is loaded or the manifest has no entry for
    *word*. Shows a warning dialog when the manifest entry's file is missing.
    """
    if not self.book:
        return
    wav_name = self.manifest.get(word)
    if not wav_name:
        return

    wav_path = self._audio_dir / wav_name
    if wav_path.exists():
        self.now_playing_var.setText(word)
        play_async(wav_path)
        return

    QMessageBox.warning(
        self,
        "Missing audio",
        f"No audio file for '{word}'.\n"
        "Click 'find proper nouns' first.",
    )
|
||
|
||
# ── Selection callbacks ────────────────────────────────────────────────────
|
||
|
||
def _on_review_select(self) -> None:
|
||
item = self.review_lb.currentItem()
|
||
if not item:
|
||
return
|
||
word = item.text().strip()
|
||
self._fix_entry_word = word
|
||
self.fix_var = word # pre-fill fix entry with the word
|
||
self._fix_entry.setText(word)
|
||
self._fix_entry.selectAll()
|
||
self._fix_entry.setFocus()
|
||
self._play_word(word)
|
||
|
||
def _on_side_select(self, listbox: QListWidget) -> None:
    """Handle a selection in the Correct or Fixes list.

    For a Fixes row (``original → replacement``) the replacement is loaded
    into the fix entry and synthesised/played; otherwise the original word
    is loaded into the fix entry and its manifest audio is played.
    """
    if not self.book:
        return
    current = listbox.currentItem()
    if not current:
        return

    pieces = current.text().strip().split(" → ")
    original = pieces[0].strip()
    self._fix_entry_word = original

    is_fix_row = listbox is self.fixes_lb and len(pieces) == 2
    if not is_fix_row:
        self.fix_var = original
        self._fix_entry.setText(original)
        self._play_word(original)
        return

    replacement = pieces[1].strip()
    self.fix_var = replacement
    self._fix_entry.setText(replacement)
    # Placeholder label until the synthesised audio is ready.
    self.now_playing_var.setText(f"… {replacement}")
    synth_and_play(
        replacement,
        self._replacements_dir,
        on_ready=lambda _path: self.now_playing_var.setText(replacement),
    )
|
||
|
||
def _review_key_press(self, event):
    """Key handler for the Review list: Return/Enter selects, other keys get default handling."""
    if event.key() not in (Qt.Key_Return, Qt.Key_Enter):
        QListWidget.keyPressEvent(self.review_lb, event)
        return
    self._on_review_select()
|
||
|
||
def _fix_entry_key_press(self, event):
    """Key handler for the fix entry.

    Up/Down move through the Review list, Escape resets the entry; every
    other key is passed to the default ``QLineEdit`` handling.
    """
    key = event.key()
    if key == Qt.Key_Escape:
        self._reset_fix_entry()
    elif key == Qt.Key_Up or key == Qt.Key_Down:
        self._navigate_review(-1 if key == Qt.Key_Up else 1)
    else:
        QLineEdit.keyPressEvent(self._fix_entry, event)
        return
    event.accept()
|
||
|
||
# ── Actions ────────────────────────────────────────────────────────────────
|
||
|
||
def _selected_review_word(self) -> str | None:
|
||
item = self.review_lb.currentItem()
|
||
if not item:
|
||
return None
|
||
return item.text().strip()
|
||
|
||
def _enter_action(self) -> None:
|
||
"""Smart Enter handler for the fix entry.
|
||
|
||
If the entry text matches the original word → mark Correct.
|
||
If the entry text differs from the original → add as Fix.
|
||
"""
|
||
word = self._fix_entry_word or self._selected_review_word()
|
||
if not word:
|
||
return
|
||
text = self.fix_var.strip()
|
||
if not text or text == word:
|
||
self._mark_correct_word(word)
|
||
else:
|
||
self._add_fix_for_word(word, text)
|
||
|
||
def _reset_fix_entry(self) -> None:
|
||
"""Escape: reset fix entry to the original word, refocus the review list."""
|
||
self.fix_var = self._fix_entry_word
|
||
self._fix_entry.setText(self._fix_entry_word)
|
||
self.review_lb.setFocus()
|
||
|
||
def _replay(self) -> None:
|
||
if self._fix_entry_word:
|
||
self._play_word(self._fix_entry_word)
|
||
|
||
def _regen_current(self) -> None:
|
||
"""Delete the cached WAV for the current word/replacement and re-synthesise."""
|
||
word = self._fix_entry_word
|
||
if not word:
|
||
return
|
||
|
||
# Determine which file to delete based on context
|
||
fix_text = self.fix_var.strip()
|
||
# If the fix box contains something different from the word, regen that text
|
||
is_fix_replacement = bool(fix_text and fix_text != word)
|
||
|
||
if not self.book:
|
||
return
|
||
if is_fix_replacement:
|
||
target = self._replacements_dir / f"{_slug(fix_text)}.wav"
|
||
if target.exists():
|
||
target.unlink()
|
||
self.now_playing_var.setText(f"… regen {fix_text}")
|
||
rdir = self._replacements_dir
|
||
def _on_ready(_p):
|
||
self.now_playing_var.setText(fix_text)
|
||
synth_and_play(fix_text, rdir, on_ready=_on_ready)
|
||
else:
|
||
wav_name = self.manifest.get(word)
|
||
if not wav_name:
|
||
return
|
||
wav_path = self._audio_dir / wav_name
|
||
if wav_path.exists():
|
||
wav_path.unlink()
|
||
self.now_playing_var.setText(f"… regen {word}")
|
||
|
||
def _regen():
|
||
try:
|
||
import warnings, numpy as np
|
||
pipeline = _get_pipeline()
|
||
chunks = []
|
||
with warnings.catch_warnings():
|
||
warnings.filterwarnings("ignore", category=UserWarning)
|
||
for _, _, audio in pipeline(word, voice=VOICE):
|
||
if audio is not None:
|
||
chunks.append(audio)
|
||
if chunks:
|
||
sf.write(str(wav_path), np.concatenate(chunks), SAMPLE_RATE)
|
||
self.now_playing_var.setText(word)
|
||
play_async(wav_path)
|
||
except Exception as exc:
|
||
print(f"[regen] error for '{word}': {exc}")
|
||
|
||
threading.Thread(target=_regen, daemon=True).start()
|
||
|
||
def _navigate_review(self, delta: int) -> None:
|
||
"""Move the review list selection up (delta=-1) or down (delta=+1)."""
|
||
count = self.review_lb.count()
|
||
if count == 0:
|
||
return
|
||
current_row = self.review_lb.currentRow()
|
||
if current_row == -1:
|
||
current_row = 0
|
||
new_row = max(0, min(count - 1, current_row + delta))
|
||
if new_row == current_row:
|
||
return
|
||
self.review_lb.setCurrentRow(new_row)
|
||
self._on_review_select()
|
||
|
||
def _advance_review(self, from_idx: int = 0) -> None:
|
||
"""Select the item at from_idx (clamped), positioned in the upper portion
|
||
of the viewport so the word doesn't end up in the bottom half unless
|
||
the list can't scroll any further down."""
|
||
count = self.review_lb.count()
|
||
if count == 0:
|
||
return
|
||
target = min(from_idx, count - 1)
|
||
self.review_lb.setCurrentRow(target)
|
||
self.review_lb.scrollToItem(self.review_lb.item(target))
|
||
|
||
self.review_lb.yview_moveto(ideal_top / size)
|
||
self.review_lb.event_generate("<<ListboxSelect>>")
|
||
|
||
def _mark_correct_word(self, word: str) -> None:
|
||
idx = self.review_lb.curselection()
|
||
from_idx = idx[0] if idx else 0
|
||
if word not in self.correct:
|
||
self.correct.insert(0, word)
|
||
save_json(self._correct_file, self.correct)
|
||
self._fix_entry_word = ""
|
||
self.fix_var = ""
|
||
self._fix_entry.setText("")
|
||
self.now_playing_var.setText("—")
|
||
self._refresh_all()
|
||
self._advance_review(from_idx)
|
||
|
||
def _add_fix_for_word(self, word: str, replacement: str) -> None:
    """Record *replacement* as the fix for *word*, persist it, and advance.

    Any existing entry for *word* is removed before the new mapping is
    stored. The fix entry and now-playing label are then cleared, all lists
    refreshed, and the review selection advanced from the row that was
    current on entry (row 0 if none was).
    """
    row = self.review_lb.currentRow()
    resume_at = 0 if row == -1 else row

    # Pop-then-assign re-inserts the key rather than updating it in place
    # (presumably so the newest fix sorts last; confirm against _refresh_all).
    self.fixes.pop(word, None)
    self.fixes[word] = replacement
    save_json(self._fixes_file, self.fixes)

    self._fix_entry_word = self.fix_var = ""
    self._fix_entry.setText("")
    self.now_playing_var.setText("—")

    self._refresh_all()
    self._advance_review(resume_at)
|
||
|
||
def _move_back(self, listbox: QListWidget, is_dict: bool) -> None:
    """Take the selected row of *listbox* out of the Correct/Fixes data.

    The word is the row text before any `` → `` separator. With *is_dict*
    true the word's entry is dropped from ``self.fixes`` and the fixes file
    saved. In both modes the word is removed from ``self.correct`` if
    present, saving the correct file when that happens. All lists are
    refreshed afterwards. Does nothing when *listbox* has no current item.
    """
    selected = listbox.currentItem()
    if not selected:
        return
    word = selected.text().strip().split(" → ")[0].strip()

    if is_dict:
        self.fixes.pop(word, None)
        save_json(self._fixes_file, self.fixes)

    # Same clean-up for both lists: the word must not linger in Correct.
    if word in self.correct:
        self.correct.remove(word)
        save_json(self._correct_file, self.correct)

    self._refresh_all()
|
||
|
||
# ── Extract & Generate ─────────────────────────────────────────────────────────────
|
||
|
||
def _extract_and_generate(self) -> None:
    """Extract proper nouns from the selected book's source text and generate
    a TTS audio clip for each one that has no clip yet.

    Shows a dialog and returns when no book is selected or a source file is
    missing. Otherwise the extract button is disabled and the work runs on a
    daemon thread: nouns are extracted, clips are written under
    ``proper_nouns_audio/<slug>``, the manifest under
    ``output_proper_nouns/<slug>`` is rewritten, and ``_finish_extract`` is
    called. On failure the status label shows the error and the button is
    re-enabled.
    """
    book = self.book  # the worker keeps using this book even if the selection changes
    if not book:
        QMessageBox.information(self, "No book selected", "Please select a book first.")
        return

    absent = [src for src in book.source_paths if not src.exists()]
    if absent:
        listing = "\n".join(str(src) for src in absent)
        QMessageBox.critical(
            self, "Source file(s) not found", "Could not find:\n" + listing)
        return

    self._extract_btn.setEnabled(False)
    self._book_status_var.setText("Loading spaCy NLP model…")

    def _worker():
        # NOTE(review): this function runs on a plain Python thread yet calls
        # widget methods and QMessageBox directly; confirm that is safe here.
        try:
            self._book_status_var.setText(
                "Running NLP extraction (may take a minute)…")
            nouns = _extract_nouns_from_paths(book.source_paths)
            total = len(nouns)
            self._book_status_var.setText(
                f"Extracted {total} nouns — generating audio…")

            data_dir = Path("output_proper_nouns") / book.slug
            audio_dir = Path("proper_nouns_audio") / book.slug
            for folder in (data_dir, audio_dir):
                folder.mkdir(parents=True, exist_ok=True)

            manifest_path = data_dir / "manifest.json"
            manifest: dict = load_json(manifest_path, {})

            tts = _get_pipeline()
            done = 0
            failed = 0

            for position, noun in enumerate(sorted(nouns, key=str.casefold)):
                stem = re.sub(r"[^a-z0-9]+", "_", noun.lower()).strip("_")
                wav_name = f"{stem}.wav"
                wav_path = audio_dir / wav_name

                # Skip words whose clip is both recorded and still on disk.
                if noun in manifest and wav_path.exists():
                    continue

                try:
                    # Imported here so an import failure counts against this
                    # word instead of aborting the whole run.
                    import warnings
                    import numpy as np

                    with warnings.catch_warnings():
                        warnings.filterwarnings("ignore", category=UserWarning)
                        pieces = [
                            clip
                            for _, _, clip in tts(noun, voice=VOICE)
                            if clip is not None
                        ]
                    if not pieces:
                        failed += 1
                    else:
                        sf.write(str(wav_path), np.concatenate(pieces), SAMPLE_RATE)
                        manifest[noun] = wav_name
                        done += 1
                except Exception as exc:
                    print(f"[gen] failed for '{noun}': {exc}")
                    failed += 1

                if position % 10 == 0:
                    left = total - position
                    self._book_status_var.setText(f"Generating audio… {left} remaining")

            manifest_path.write_text(
                json.dumps(manifest, ensure_ascii=False, indent=2))
            self._finish_extract(book, manifest, done, failed)

        except ImportError as exc:
            msg = (f"Missing dependency: {exc}\n\n"
                   "Install with: pip install spacy wordfreq\n"
                   "Then: python -m spacy download en_core_web_sm")
            QMessageBox.critical(self, "Missing package", msg)
            self._book_status_var.setText("Error — see popup")
            self._extract_btn.setEnabled(True)
        except Exception as exc:
            self._book_status_var.setText(f"Error: {str(exc)}")
            self._extract_btn.setEnabled(True)

    threading.Thread(target=_worker, daemon=True).start()
|
||
|
||
def _finish_extract(self, book: BookSource, manifest: dict,
                    done: int, failed: int) -> None:
    """Re-enable the extract button, report the totals, and reload *book* if
    it is still the selected book (compared by slug)."""
    self._extract_btn.setEnabled(True)
    summary = f"Done — {len(manifest)} words total ({done} new, {failed} failed)"
    self._book_status_var.setText(summary)
    selected = self.book
    if selected and selected.slug == book.slug:
        self._load_book(book)
|
||
|
||
def _pregen_all_fix_audio(self) -> None:
    """Synthesise every fix replacement whose clip is not in the cache yet.

    Returns silently without a book, and with an information dialog when
    there are no fixes or every clip already exists. Otherwise the pregen
    button is disabled and a daemon thread calls ``_synth_to_cache`` for each
    missing clip, updating the status label as it goes; the button is
    re-enabled when the thread ends, whether or not it failed.
    """
    if not self.book:
        return
    if not self.fixes:
        QMessageBox.information(self, "No fixes", "The Fixes list is empty.")
        return

    replacements = list(self.fixes.values())
    total = len(replacements)
    rdir = self._replacements_dir

    def _clip_path(text):
        return rdir / f"{_slug(text)}.wav"

    already = len([rep for rep in replacements if _clip_path(rep).exists()])
    new_count = total - already
    if new_count == 0:
        QMessageBox.information(self, "Already done",
                                f"All {total} replacement clips already exist.")
        return

    self._pregen_btn.setEnabled(False)
    self._pregen_status_var.setText(f"0 / {new_count} new ({already} cached)")

    def _worker():
        try:
            done = 0
            for rep in replacements:
                # Re-check at synthesis time; the clip may have appeared since the count.
                if _clip_path(rep).exists():
                    continue
                _synth_to_cache(rep, rdir)
                done += 1
                self._pregen_status_var.setText(f"{done} / {new_count} synthesised…")
            self._pregen_status_var.setText(f"Done — {total} clips ready")
        except Exception as exc:
            print(f"[pregen] error: {exc}")
        finally:
            self._pregen_btn.setEnabled(True)

    threading.Thread(target=_worker, daemon=True).start()
|
||
|
||
def _export_remaining(self) -> None:
    """Write the words returned by ``_review_words`` to ``remaining_review.txt``
    in the book's data directory, one per line, and confirm with a dialog.

    Returns silently without a book; shows an information dialog instead of
    writing when there are no words left.
    """
    if not self.book:
        return
    pending = self._review_words()
    if not pending:
        QMessageBox.information(self, "Nothing to export", "No words left to review.")
        return
    destination = self._data_dir / "remaining_review.txt"
    destination.write_text("\n".join(pending), encoding="utf-8")
    QMessageBox.information(
        self, "Exported", f"{len(pending)} words written to:\n{destination}")
|
||
|
||
def _apply_fixes(self) -> None:
    """Write a copy of the book's source text with every fix applied.

    The source files are read as UTF-8 and joined with a blank line. Each
    fix is then applied as a case-insensitive whole-word replacement, after
    which runs of two or more capital letters (optionally hyphen-joined) are
    converted to Title Case. The result is written to ``self.book.fixed_out``
    and a summary dialog is shown.

    Returns silently without a book, and with a dialog when there are no
    fixes or a source file is missing.
    """
    if not self.book:
        return
    if not self.fixes:
        QMessageBox.information(self, "No fixes", "The Fixes list is empty.")
        return

    parts = []
    for p in self.book.source_paths:
        if not p.exists():
            QMessageBox.critical(self, "Source not found", f"Cannot find:\n{p}")
            return
        parts.append(p.read_text(encoding="utf-8"))
    text = "\n\n".join(parts)

    count_total = 0
    for original, replacement in self.fixes.items():
        pattern = r'\b' + re.escape(original) + r'\b'
        # A callable replacement is inserted verbatim. A plain string would be
        # parsed as a regex template, so a backslash or "\g<...>" in a
        # user-typed fix would raise re.error or be rewritten.
        text, n = re.subn(
            pattern,
            lambda _m, rep=replacement: rep,
            text,
            flags=re.IGNORECASE,
        )
        count_total += n

    # Runs after the fixes, so it also title-cases any ALL-CAPS replacement text.
    text, n_caps = re.subn(
        r'\b[A-Z]{2,}(?:-[A-Z]{2,})*\b',
        lambda m: m.group(0).title(),
        text,
    )

    self.book.fixed_out.write_text(text, encoding="utf-8")
    QMessageBox.information(
        self, "Done",
        f"Applied {len(self.fixes)} fix rules ({count_total} replacements).\n"
        f"Converted {n_caps} ALL-CAPS words to Title Case.\n\n"
        f"Saved to:\n{self.book.fixed_out}"
    )
|
||
|
||
def _set_gen_audio_status(self, text: str) -> None:
    """Schedule *text* to be shown in the chapter-audio status label via a
    zero-delay single-shot timer."""
    def _apply() -> None:
        self._gen_audio_status.setText(text)

    QTimer.singleShot(0, _apply)
|
||
|
||
def _set_gen_audio_enabled(self, enabled: bool) -> None:
    """Schedule enabling or disabling of the chapter-audio button via a
    zero-delay single-shot timer."""
    def _apply() -> None:
        self._gen_audio_btn.setEnabled(enabled)

    QTimer.singleShot(0, _apply)
|
||
|
||
def _generate_selected_chapters(self) -> None:
    """Generate chapter audio from the source files for the chosen voice and chapters.

    Returns silently without a book and shows an error dialog when a source
    file is missing. Otherwise the voice (default ``am_onyx``) and chapter
    expression (default ``all``) are read from the widgets, the generate
    button is disabled, and a daemon thread parses the chapters, synthesises
    each selected one, and writes ``<chapter slug>.wav`` under
    ``output_audiobook/<book slug>``. Progress and errors are reported through
    ``_set_gen_audio_status``; the button is re-enabled when the thread ends.
    """
    # Capture the book once so the worker keeps reading the same sources that
    # out_dir was derived from, even if the selection changes or is cleared
    # while it runs.
    book = self.book
    if not book:
        return

    missing = [p for p in book.source_paths if not p.exists()]
    if missing:
        QMessageBox.critical(self, "Source file(s) not found", "Could not find:\n" + "\n".join(str(p) for p in missing))
        return

    voice = self._voice_combo.currentText().strip() or "am_onyx"
    chapter_expr = self._chapters_entry.text().strip() or "all"
    out_dir = Path("output_audiobook") / book.slug
    out_dir.mkdir(parents=True, exist_ok=True)

    self._gen_audio_btn.setEnabled(False)
    self._set_gen_audio_status("Parsing chapters…")

    def _run() -> None:
        try:
            chapters = _parse_chapters_from_paths(book.source_paths)
            if not chapters:
                self._set_gen_audio_status("No chapters found (expected 'Prologue' or 'Chapter # - chapter name').")
                return

            valid = {ch["num"] for ch in chapters}
            # Only a ValueError from parsing the selection means a bad
            # selection; one raised later by synthesis is reported as a
            # generic error below.
            try:
                selected_nums = _parse_chapter_selection(chapter_expr, valid)
            except ValueError:
                self._set_gen_audio_status("Invalid chapter selection. Use: all or 0,1,2,5-8")
                return
            if not selected_nums:
                self._set_gen_audio_status("No matching chapters selected.")
                return

            selected = [ch for ch in chapters if ch["num"] in selected_nums]
            pipeline = _get_pipeline()
            total = len(selected)
            done = 0

            import numpy as np

            for i, ch in enumerate(selected, start=1):
                text = _clean_tts_text(ch["text"])
                if not text:
                    continue

                self._set_gen_audio_status(f"Generating {i}/{total}: {ch['label']}")
                out_path = out_dir / f"{ch['slug']}.wav"

                t0 = time.monotonic()
                chunks = []
                for _, _, chunk_audio in pipeline(text, voice=voice):
                    if chunk_audio is None:
                        continue
                    # Objects exposing .numpy() are moved to the CPU and
                    # converted to arrays first.
                    if hasattr(chunk_audio, "numpy"):
                        chunk_audio = chunk_audio.cpu().numpy()
                    chunk_audio = np.atleast_1d(chunk_audio.squeeze())
                    if chunk_audio.size > 0:
                        chunks.append(chunk_audio)

                if chunks:
                    audio = np.concatenate(chunks, axis=0)
                    sf.write(str(out_path), audio, SAMPLE_RATE)
                    elapsed = int(time.monotonic() - t0)
                    done += 1
                    self._set_gen_audio_status(
                        f"Saved {done}/{total}: {out_path.name} ({elapsed}s)"
                    )

            self._set_gen_audio_status(f"Done. Generated {done}/{total} chapters to {out_dir}")
        except Exception as exc:
            self._set_gen_audio_status(f"Error: {exc}")
        finally:
            self._set_gen_audio_enabled(True)

    threading.Thread(target=_run, daemon=True).start()
|
||
|
||
|
||
# ── Standalone NLP extraction (lazy-imports spaCy) ─────────────────────────────────
|
||
|
||
def _extract_nouns_from_paths(source_paths: list) -> set[str]:
    """Collect noise-filtered proper nouns from every file in *source_paths*.

    Each file is read as UTF-8 and run through spaCy's ``en_core_web_sm``
    model. Words come from two passes: named entities with a person, place,
    organisation or similar label, and PROPN tokens that are capitalised,
    not all-caps, and not the first token of their sentence. Words are
    whitespace-normalised and title-cased before filtering.

    Raises ImportError if spaCy or wordfreq are not installed.
    """
    import spacy  # lazy: only loaded when this function is actually called
    from wordfreq import top_n_list

    common_words: frozenset[str] = frozenset(top_n_list("en", 10_000))
    # Names exempt from the common-word filter below.
    whitelist: frozenset[str] = frozenset({
        "aaron", "abel", "abraham", "adam", "cain", "eden", "egypt",
        "elijah", "ephraim", "eve", "gad", "ham", "isaac", "israel",
        "jacob", "james", "jehovah", "john", "joseph", "judah",
        "laban", "lehi", "levi", "micah", "michael", "moses", "noah",
        "peter", "pharaoh", "samuel", "sarah", "sarai", "seth", "simeon",
        "timothy", "zion",
        "alma", "ether", "gideon", "limhi", "mormon", "moroni", "mulek",
        "mosiah", "nephi", "satan", "sidon",
    })
    stop_words: set[str] = {
        "A", "AN", "AND", "AS", "AT", "BE", "BUT", "BY", "DO", "DID", "DOTH", "EVEN",
        "FOR", "FROM", "HAD", "HAS", "HAVE", "HATH", "HE", "HER", "HIS", "HOW", "I",
        "IN", "IS", "IT", "ITS", "MAY", "ME", "MORE", "MY", "NAY", "NO", "NOT", "NOW",
        "OF", "OR", "OUR", "SHALL", "SHE", "SO", "SOME", "THAT", "THE", "THEE",
        "THEIR", "THEN", "THERE", "THESE", "THEY", "THIS", "THOSE", "THOU", "THUS",
        "THY", "TO", "UP", "UPON", "US", "WAS", "WE", "WHEN", "WHERE", "WHICH", "WHO",
        "WILL", "WITH", "YE", "YEA", "YET", "YOU", "YOUR",
        "BEHOLD", "CHAPTER", "CHRIST", "GOD", "GHOST", "HOLY", "LORD", "VERSE",
        "CITY", "DAYS", "DAY", "GREAT", "LAND", "MAN", "MEN", "NEW", "PEOPLE", "SON", "TIME",
    }

    def _is_noise(t: str) -> bool:
        """True for words that should not be offered for review."""
        t = t.strip()
        lowered = t.lower()
        return (
            len(t) <= 1
            or (t.isupper() and len(t) > 4)
            or t.upper() in stop_words
            or re.search(r"[^a-zA-Z\-']", t) is not None
            or (
                "-" not in t
                and lowered in common_words
                and lowered not in whitelist
            )
        )

    def _canonical(text: str) -> str:
        """Collapse whitespace and title-case *text*."""
        return " ".join(text.split()).title()

    nlp = spacy.load("en_core_web_sm")
    nlp.max_length = 4_000_000

    person_labels = {"PERSON"}
    place_labels = {"GPE", "LOC", "FAC"}
    org_labels = {"ORG", "NORP"}
    other_labels = {"EVENT", "WORK_OF_ART", "LAW", "PRODUCT", "LANGUAGE"}
    wanted_labels = person_labels | place_labels | org_labels | other_labels

    found: set[str] = set()

    for path in source_paths:
        doc = nlp(path.read_text(encoding="utf-8"))

        # Pass 1: every word of each entity with a wanted label.
        for ent in doc.ents:
            if ent.label_ in wanted_labels:
                found.update(
                    piece
                    for piece in _canonical(ent.text).split()
                    if not _is_noise(piece)
                )

        # Pass 2: stand-alone proper-noun tokens.
        for token in doc:
            if token.pos_ != "PROPN":
                continue
            surface = token.text.strip()
            if (
                not surface[0].isupper()
                or surface.isupper()
                or token.i == token.sent.start
            ):
                continue
            candidate = _canonical(surface)
            if not _is_noise(candidate):
                found.add(candidate)

    return found
|
||
|
||
|
||
# ── Entry point ──────────────────────────────────────────────────────────────────
|
||
|
||
def main() -> None:
    """Load the projects, list them on stdout, and run the auditor window
    until the Qt application exits."""
    books = load_books_from_projects()
    print(f"Loaded {len(books)} project(s):")
    for book in books:
        print(f" [{book.slug}] {book.label} ({len(book.source_paths)} file(s))")

    app = QApplication(sys.argv)
    window = ProperNounAuditor(books)
    window.show()
    exit_code = app.exec()
    sys.exit(exit_code)
|
||
|
||
|
||
# Start the GUI only when this file is run as a script, not when it is imported.
if __name__ == "__main__":
    main()
|