Files
saw_mill_knot_detection/annotation_gui.py.broken

709 lines
26 KiB
Plaintext
Raw Normal View History

"""
Simple customizable annotation GUI with auto-labeling support.
Built with Gradio - easy to modify and extend.
Run: python annotation_gui.py --images-dir /path/to/images
To set default paths, edit config.py
"""
from __future__ import annotations
import argparse
import json
import subprocess
import threading
from pathlib import Path
from typing import Any
import gradio as gr
import numpy as np
from PIL import Image, ImageDraw
# Try to load config, use fallbacks if not available
try:
from config import (
DEFAULT_IMAGES_DIR, DEFAULT_MODEL_WEIGHTS, DEFAULT_PORT,
DEFAULT_DETECTION_THRESHOLD, DEFAULT_TRAIN_EPOCHS,
DEFAULT_BATCH_SIZE, DEFAULT_LEARNING_RATE, DEFAULT_MODEL_SIZE
)
except ImportError:
DEFAULT_IMAGES_DIR = None
DEFAULT_MODEL_WEIGHTS = None
DEFAULT_PORT = 7860
DEFAULT_DETECTION_THRESHOLD = 0.5
DEFAULT_TRAIN_EPOCHS = 20
DEFAULT_BATCH_SIZE = 4
DEFAULT_LEARNING_RATE = 1e-4
DEFAULT_MODEL_SIZE = "small"
class AnnotationApp:
def __init__(self, images_dir: Path | None = None, model_weights: Path | None = None):
self.images_dir = images_dir if images_dir else Path.cwd()
self.current_model_path = model_weights
self.image_paths = []
self.current_idx = 0
self.annotations = {} # image_name -> list of boxes
self.model = None
self.training_process = None
self.training_thread = None
self.training_status = "Not training"
# Load images if directory provided
if images_dir and images_dir.exists():
self._load_images(images_dir)
if model_weights and model_weights.exists():
self._load_model(model_weights)
def _load_images(self, images_dir: Path):
"""Load images from directory."""
self.images_dir = images_dir
self.image_paths = sorted(
list(images_dir.glob("*.jpg")) + list(images_dir.glob("*.png"))
)
self.current_idx = 0
# Load existing annotations if present
self.ann_file = images_dir / "annotations.json"
if self.ann_file.exists():
with self.ann_file.open("r") as f:
self.annotations = json.load(f)
else:
self.annotations = {}
return f"✓ Loaded {len(self.image_paths)} images from {images_dir}"
def _load_model(self, weights_path: Path):
"""Load RF-DETR model for auto-labeling."""
try:
from rfdetr import RFDETRBase
print(f"Loading model from {weights_path}...")
self.model = RFDETRBase(pretrain_weights=str(weights_path))
self.current_model_path = weights_path
print("✓ Model loaded")
return f"✓ Model loaded from {weights_path.name}"
except Exception as e:
error_msg = f"⚠ Could not load model: {e}"
print(error_msg)
self.model = None
return error_msg
def load_new_model(self, weights_path: str) -> str:
"""Load a new model from the GUI."""
path = Path(weights_path)
if not path.exists():
return f"❌ File not found: {weights_path}"
return self._load_model(path)
def load_new_images_dir(self, images_dir: str) -> tuple[Image.Image | None, str, str]:
"""Load a new images directory from the GUI."""
path = Path(images_dir)
if not path.exists():
return None, "", f"❌ Directory not found: {images_dir}"
if not path.is_dir():
return None, "", f"❌ Not a directory: {images_dir}"
result = self._load_images(path)
# Load first image
if self.image_paths:
img, filename = self.get_current_image()
boxes = self.annotations.get(filename, [])
img_with_boxes = self.draw_boxes_on_image(img, boxes) if boxes else img
boxes_text = self._format_boxes_text(boxes)
info = f"{result}\nImage 1/{len(self.image_paths)}: {filename}"
return img_with_boxes, boxes_text, info
else:
return None, "", f"{result}\n⚠ No .jpg or .png images found in directory"
def get_current_model_info(self) -> str:
"""Get info about currently loaded model."""
if self.model and self.current_model_path:
return f"📦 Loaded: {self.current_model_path}"
elif self.model:
return "📦 Model loaded (pretrained)"
else:
return "⚠️ No model loaded"
def get_current_dir_info(self) -> str:
"""Get info about current images directory."""
return f"📁 {self.images_dir} ({len(self.image_paths)} images)"
def get_current_image(self) -> tuple[Image.Image, str]:
"""Get current image and filename."""
if not self.image_paths:
return None, ""
path = self.image_paths[self.current_idx]
img = Image.open(path).convert("RGB")
return img, path.name
def draw_boxes_on_image(self, img: Image.Image, boxes: list[dict]) -> Image.Image:
"""Draw bounding boxes on image."""
img_draw = img.copy()
draw = ImageDraw.Draw(img_draw)
for box in boxes:
x1, y1, x2, y2 = box["bbox"]
label = box.get("label", "knot")
conf = box.get("confidence", 1.0)
# Draw box
draw.rectangle([x1, y1, x2, y2], outline="red", width=3)
# Draw label
text = f"{label} {conf:.2f}" if conf < 1.0 else label
draw.text((x1, y1 - 20), text, fill="red")
return img_draw
def auto_label_current(self, threshold: float = 0.5) -> tuple[Image.Image, str, str]:
"""Auto-label current image with model."""
if not self.model:
return self.get_current_image()[0], "", "⚠ No model loaded"
img, filename = self.get_current_image()
if not img:
return None, "", "No images"
# Run inference
detections = self.model.predict(img, threshold=threshold)
# Convert to our format
boxes = []
for i in range(len(detections)):
xyxy = detections.xyxy[i].tolist()
conf = float(detections.confidence[i]) if detections.confidence is not None else 1.0
boxes.append({
"bbox": xyxy,
"label": "knot",
"confidence": conf,
"source": "auto"
})
# Save
self.annotations[filename] = boxes
self._save_annotations()
# Draw
img_with_boxes = self.draw_boxes_on_image(img, boxes)
info = f"✓ Auto-labeled: {len(boxes)} boxes detected"
boxes_text = self._format_boxes_text(boxes)
return img_with_boxes, boxes_text, info
def _format_boxes_text(self, boxes: list[dict]) -> str:
"""Format boxes for display."""
if not boxes:
return "No annotations"
lines = []
for i, box in enumerate(boxes):
x1, y1, x2, y2 = box["bbox"]
conf = box.get("confidence", 1.0)
source = box.get("source", "manual")
lines.append(f"{i}: [{x1:.0f}, {y1:.0f}, {x2:.0f}, {y2:.0f}] conf={conf:.2f} ({source})")
return "\n".join(lines)
def load_image(self, direction: str = "current") -> tuple[Image.Image, str, str]:
"""Load image (current/next/prev)."""
if direction == "next":
self.current_idx = min(self.current_idx + 1, len(self.image_paths) - 1)
elif direction == "prev":
self.current_idx = max(self.current_idx - 1, 0)
img, filename = self.get_current_image()
if not img:
return None, "", "No images"
# Load existing annotations
boxes = self.annotations.get(filename, [])
img_with_boxes = self.draw_boxes_on_image(img, boxes) if boxes else img
boxes_text = self._format_boxes_text(boxes)
info = f"Image {self.current_idx + 1}/{len(self.image_paths)}: {filename}"
return img_with_boxes, boxes_text, info
def add_box_manual(self, x1: int, y1: int, x2: int, y2: int) -> tuple[Image.Image, str, str]:
"""Manually add a bounding box."""
img, filename = self.get_current_image()
if not img:
return None, "", "No images"
# Add box
box = {
"bbox": [float(x1), float(y1), float(x2), float(y2)],
"label": "knot",
"confidence": 1.0,
"source": "manual"
}
if filename not in self.annotations:
self.annotations[filename] = []
self.annotations[filename].append(box)
self._save_annotations()
# Redraw
boxes = self.annotations[filename]
img_with_boxes = self.draw_boxes_on_image(img, boxes)
boxes_text = self._format_boxes_text(boxes)
info = f"✓ Added box: {len(boxes)} total"
return img_with_boxes, boxes_text, info
def delete_last_box(self) -> tuple[Image.Image, str, str]:
"""Delete the last box from current image."""
img, filename = self.get_current_image()
if not img:
return None, "", "No images"
if filename in self.annotations and self.annotations[filename]:
self.annotations[filename].pop()
self._save_annotations()
# Redraw
boxes = self.annotations.get(filename, [])
img_with_boxes = self.draw_boxes_on_image(img, boxes) if boxes else img
boxes_text = self._format_boxes_text(boxes)
info = f"✓ Deleted last box: {len(boxes)} remaining"
return img_with_boxes, boxes_text, info
def clear_boxes(self) -> tuple[Image.Image, str, str]:
"""Clear all boxes from current image."""
img, filename = self.get_current_image()
if not img:
return None, "", "No images"
self.annotations[filename] = []
self._save_annotations()
boxes_text = "No annotations"
info = "✓ Cleared all boxes"
return img, boxes_text, info
def _save_annotations(self):
"""Save annotations to JSON file."""
with self.ann_file.open("w") as f:
json.dump(self.annotations, f, indent=2)
def export_to_coco(self, output_path: Path):
"""Export annotations to COCO format."""
coco_data = {
"images": [],
"annotations": [],
"categories": [{"id": 0, "name": "knot", "supercategory": "defect"}]
}
ann_id = 0
for img_id, img_path in enumerate(self.image_paths):
filename = img_path.name
img = Image.open(img_path)
width, height = img.size
coco_data["images"].append({
"id": img_id,
"file_name": filename,
"width": width,
"height": height
})
# Add annotations
boxes = self.annotations.get(filename, [])
for box in boxes:
x1, y1, x2, y2 = box["bbox"]
w = x2 - x1
h = y2 - y1
coco_data["annotations"].append({
"id": ann_id,
"image_id": img_id,
"category_id": 0,
"bbox": [x1, y1, w, h],
"area": w * h,
"iscrowd": 0,
"score": box.get("confidence", 1.0)
})
ann_id += 1
with output_path.open("w") as f:
def prepare_training_dataset(self, output_dir: Path, train_split: float = 0.8, valid_split: float = 0.1):
"""Prepare dataset in RF-DETR format (train/valid/test splits)."""
output_dir.mkdir(parents=True, exist_ok=True)
# Create splits
import random
annotated_images = [img for img in self.image_paths if img.name in self.annotations and self.annotations[img.name]]
if len(annotated_images) < 10:
return f"⚠️ Need at least 10 annotated images, have {len(annotated_images)}"
random.shuffle(annotated_images)
n = len(annotated_images)
train_n = int(n * train_split)
valid_n = int(n * valid_split)
splits = {
"train": annotated_images[:train_n],
"valid": annotated_images[train_n:train_n + valid_n],
"test": annotated_images[train_n + valid_n:]
}
# Create directories and copy images
import shutil
for split_name, split_images in splits.items():
split_dir = output_dir / split_name
split_dir.mkdir(exist_ok=True)
# Prepare COCO JSON for this split
coco_data = {
"images": [],
"annotations": [],
"categories": [{"id": 0, "name": "knot", "supercategory": "defect"}]
}
ann_id = 0
for img_id, img_path in enumerate(split_images):
# Copy image
dest = split_dir / img_path.name
shutil.copy2(img_path, dest)
# Add to COCO
img = Image.open(img_path)
width, height = img.size
coco_data["images"].append({
"id": img_id,
"file_name": img_path.name,
"width": width,
"height": height
})
# Add annotations
boxes = self.annotations.get(img_path.name, [])
for box in boxes:
x1, y1, x2, y2 = box["bbox"]
w = x2 - x1
h = y2 - y1
coco_data["annotations"].append({
"id": ann_id,
"image_id": img_id,
"category_id": 0,
"bbox": [x1, y1, w, h],
"area": w * h,
"iscrowd": 0
})
ann_id += 1
# Save COCO JSON
with (split_dir / "_annotations.coco.json").open("w") as f:
json.dump(coco_data, f, indent=2)
return f"✓ Dataset prepared: {len(splits['train'])} train, {len(splits['valid'])} valid, {len(splits['test'])} test"
with gr.Row():
start_train_btn = gr.Button("🚀 Start Training", variant="primary")
stop_train_btn = gr.Button("⏹️ Stop Training", variant="stop")
refresh_status_btn = gr.Button("🔄 Refresh Status")
training_status = gr.Textbox(
label="Training Status",
value="Not training",
lines=3
)
gr.Markdown("""
**Note**: Training runs in the background. You can continue annotating while training.
Check the training log file for detailed progress.
""
boxes = self.annotations.get(img_path.name, [])
for box in boxes:
x1, y1, x2, y2 = box["bbox"]
w = x2 - x1
h = y2 - y1
coco_data["annotations"].append({
"id": ann_id,
"image_id": img_id,
"category_id": 0,
"bbox": [x1, y1, w, h],
"area": w * h,
"iscrowd": 0
})
ann_id += 1
# Save COCO JSON
with (split_dir / "_annotations.coco.json").open("w") as f:
json.dump(coco_data, f, indent=2)
return f"✓ Dataset prepared: {len(splits['train'])} train, {len(splits['valid'])} valid, {len(splits['test'])} test"
def start_training(self, dataset_dir: str, output_dir: str, model_size: str,
epochs: int, batch_size: int, lr: float, progress=gr.Progress()):
"""Start training in background."""
dataset_path = Path(dataset_dir)
output_path = Path(output_dir)
if not dataset_path.exists():
return "❌ Dataset directory not found"
if self.training_process and self.training_process.poll() is None:
return "⚠️ Training already in progress"
output_path.mkdir(parents=True, exist_ok=True)
# Build training command
venv_python = Path(__file__).parent / ".venv/bin/python"
train_script = Path(__file__).parent / "train_rfdetr.py"
cmd = [
str(venv_python),
str(train_script),
"--dataset-dir", str(dataset_path),
"--output-dir", str(output_path),
"--model", model_size,
"--epochs", str(epochs),
"--batch-size", str(batch_size),
"--grad-accum-steps", str(max(1, 16 // batch_size)),
"--lr", str(lr)
]
# Start training process
log_file = output_path / "training.log"
self.training_status = f"🚀 Starting training..."
def run_training():
try:
with log_file.open("w") as f:
self.training_process = subprocess.Popen(
cmd,
stdout=f,
stderr=subprocess.STDOUT,
text=True
)
self.training_status = f"⏳ Training in progress (PID: {self.training_process.pid})"
self.training_process.wait()
if self.training_process.returncode == 0:
self.training_status = "✅ Training completed successfully!"
# Reload model with new weights
best_weights = output_path / "checkpoint_best_total.pth"
if best_weights.exists():
self._load_model(best_weights)
else:
self.training_status = f"❌ Training failed (exit code {self.training_process.returncode})"
except Exception as e:
self.training_status = f"❌ Error: {e}"
self.training_thread = threading.Thread(target=run_training, daemon=True)
self.training_thread.start()
return f"✓ Training started! Check {log_file} for progress"
def get_training_status(self):
"""Get current training status."""
return self.training_status
def stop_training(self):
"""Stop the training process."""
if self.training_process and self.training_process.poll() is None:
self.training_process.terminate()
self.training_status = "⏹️ Training stopped by user"
return "✓ Training process terminated"
return "⚠️ No training in progress"
json.dump(coco_data, f, indent=2)
return f"✓ Exported {len(coco_data['annotations'])} annotations to {output_path}"
def create_ui(app: AnnotationApp) -> gr.Blocks:
"""Create Gradio UI."""
with gr.Blocks(title="Knot Annotation Tool") as demo:
gr.Markdown("""
# 🪵 Wood Knot Annotation Tool
**Label -> Train -> Auto-Label -> Repeat**
- Manually annotate images or use **Auto-Label** with your trained model
- Export and prepare dataset for training
- Train RF-DETR directly from this GUI
- Use trained model to auto-label more images
""")
with gr.Row():
with gr.Column(scale=3):
image_display = gr.Image(label="Current Image", type="pil")
with gr.Row():
prev_btn = gr.Button("⬅️ Previous")
next_btn = gr.Button("Next ➡️")
auto_label_btn = gr.Button("🤖 Auto-Label", variant="primary")
with gr.Row():
threshold_slider = gr.Slider(0.1, 0.9, DEFAULT_DETECTION_THRESHOLD, label="Detection Threshold")
with gr.Column(scale=1):
info_text = gr.Textbox(label="Status", lines=2)
boxes_text = gr.Textbox(label="Annotations", lines=10)
gr.Markdown("### Manual Annotation")
with gr.Row():
x1_input = gr.Number(label="x1", value=100)
y1_input = gr.Number(label="y1", value=100)
with gr.Row():
Training handlers
prep_btn.click(
lambda out, train, valid: app.prepare_training_dataset(Path(out), train, valid),
inputs=[dataset_prep_dir, train_split, valid_split],
outputs=[prep_result]
)
start_train_btn.click(
app.start_training,
inputs=[train_dataset_dir, train_output_dir, model_size, epochs, batch_size, learning_rate],
outputs=[training_status]
)
stop_train_btn.click(
app.stop_training,
outputs=[training_status]
)
refresh_status_btn.click(
app.get_training_status,
outputs=[training_status]
)
# x2_input = gr.Number(label="x2", value=200)
y2_input = gr.Number(label="y2", value=200)
add_box_btn = gr.Button(" Add Box")
delete_btn = gr.Button("🗑️ Delete Last")
clear_btn = gr.Button("❌ Clear All")
gr.Markdown("### Export")
export_path = gr.Textbox(
label="Output Path",
value="annotations_coco.json"
)
export_btn = gr.Button("💾 Export COCO")
export_result = gr.Textbox(label="Export Result")
# Event handlers
def on_load():
return app.load_image("current")
# Settings handlers
load_images_btn.click(
app.load_new_images_dir,
inputs=[images_dir_input],
outputs=[image_display, boxes_text, info_text]
).then(
lambda: (app.get_current_dir_info(), app.get_current_model_info()),
outputs=[dir_info, model_info]
)
load_model_btn.click(
app.load_new_model,
inputs=[model_weights_input],
outputs=[model_info]
)
prev_btn.click(
lambda: app.load_image("prev"),
outputs=[image_display, boxes_text, info_text]
)
next_btn.click(
lambda: app.load_image("next"),
outputs=[image_display, boxes_text, info_text]
)
auto_label_btn.click(
lambda t: app.auto_label_current(t),
inputs=[threshold_slider],
outputs=[image_display, boxes_text, info_text]
)
add_box_btn.click(
app.add_box_manual,
inputs=[x1_input, y1_input, x2_input, y2_input],
outputs=[image_display, boxes_text, info_text]
)
delete_btn.click(
app.delete_last_box,
outputs=[image_display, boxes_text, info_text]
)
clear_btn.click(
app.clear_boxes,
outputs=[image_display, boxes_text, info_text]
)help="Default directory with images (can be changed in GUI)")
parser.add_argument("--model-weights", type=Path, help="Default trained model for auto-labeling (can be changed in GUI)")
parser.add_argument("--port", type=int, default=7860, help="Port for web interface")
args = parser.parse_args()
# Validate paths if provided
if args.images_dir and not args.images_dir.exists():
print(f"⚠️ Warning: Images directory not found: {args.images_dir}")
print("You can load a different directory from the GUI Settings")
args.images_dir = None
if args.model_weights and not args.model_weights.exists():
print(f"⚠️ Warning: Model weights not found: {args.model_weights}")
print("You can load different weights from the GUI Settings")
args.model_weights = None
# Load first image on start
demo.load(on_load, outputs=[image_display, boxes_text, info_text])
return demo
def main():
parser = argparse.ArgumentParser(description="Simple annotation GUI with auto-labeling")
parser.add_argument("--images-dir", type=Path, required=True, help="Directory with images")
parser.add_argument("--model-weights", type=Path, help="Optional: trained model for auto-labeling")
parser.add_argument("--port", type=int, default=7860, help="Port for web interface")
args = parser.parse_args()
if not args.images_dir.exists():
raise SystemExit(f"Images directory not found: {args.images_dir}")
if args.images_dir:
print(f"📁 Default images: {args.images_dir} ({len(app.image_paths)} images)")
else:
print(f"📁 No default images - load directory from Settings")
if app.model:
print(f"🤖 Model: Loaded from {args.model_weights}")
else:
print(f"⚠️ No model loaded - load from Settings or train one")
print(f"💡 You can change images directory and model weights from the Settings panel
print(f"\n{'='*60}")
print(f"🚀 Starting annotation tool...")
print(f"📁 Images: {args.images_dir} ({len(app.image_paths)} images)")
if app.model:
print(f"🤖 Model: Loaded from {args.model_weights}")
else:
print(f"⚠️ No model loaded (manual annotation only)")
print(f"{'='*60}\n")
demo.launch(
server_name="0.0.0.0",
server_port=args.port,
share=False
)
if __name__ == "__main__":
main()