""" Simple customizable annotation GUI with auto-labeling support. Built with Gradio - easy to modify and extend. Run: python annotation_gui.py --images-dir /path/to/images To set default paths, edit config.py """ from __future__ import annotations import argparse import json import subprocess import threading from pathlib import Path from typing import Any import gradio as gr import numpy as np from PIL import Image, ImageDraw # Try to load config, use fallbacks if not available try: from config import ( DEFAULT_IMAGES_DIR, DEFAULT_MODEL_WEIGHTS, DEFAULT_PORT, DEFAULT_DETECTION_THRESHOLD, DEFAULT_TRAIN_EPOCHS, DEFAULT_BATCH_SIZE, DEFAULT_LEARNING_RATE, DEFAULT_MODEL_SIZE ) except ImportError: DEFAULT_IMAGES_DIR = None DEFAULT_MODEL_WEIGHTS = None DEFAULT_PORT = 7860 DEFAULT_DETECTION_THRESHOLD = 0.5 DEFAULT_TRAIN_EPOCHS = 20 DEFAULT_BATCH_SIZE = 4 DEFAULT_LEARNING_RATE = 1e-4 DEFAULT_MODEL_SIZE = "small" class AnnotationApp: def __init__(self, images_dir: Path | None = None, model_weights: Path | None = None): self.images_dir = images_dir if images_dir else Path.cwd() self.current_model_path = model_weights self.image_paths = [] self.current_idx = 0 self.annotations = {} # image_name -> list of boxes self.model = None self.training_process = None self.training_thread = None self.training_status = "Not training" # Load images if directory provided if images_dir and images_dir.exists(): self._load_images(images_dir) if model_weights and model_weights.exists(): self._load_model(model_weights) def _load_images(self, images_dir: Path): """Load images from directory.""" self.images_dir = images_dir self.image_paths = sorted( list(images_dir.glob("*.jpg")) + list(images_dir.glob("*.png")) ) self.current_idx = 0 # Load existing annotations if present self.ann_file = images_dir / "annotations.json" if self.ann_file.exists(): with self.ann_file.open("r") as f: self.annotations = json.load(f) else: self.annotations = {} return f"✓ Loaded {len(self.image_paths)} images from {images_dir}" def _load_model(self, weights_path: Path): """Load RF-DETR model for auto-labeling.""" try: from rfdetr import RFDETRBase print(f"Loading model from {weights_path}...") self.model = RFDETRBase(pretrain_weights=str(weights_path)) self.current_model_path = weights_path print("✓ Model loaded") return f"✓ Model loaded from {weights_path.name}" except Exception as e: error_msg = f"⚠ Could not load model: {e}" print(error_msg) self.model = None return error_msg def load_new_model(self, weights_path: str) -> str: """Load a new model from the GUI.""" path = Path(weights_path) if not path.exists(): return f"❌ File not found: {weights_path}" return self._load_model(path) def load_new_images_dir(self, images_dir: str) -> tuple[Image.Image | None, str, str]: """Load a new images directory from the GUI.""" path = Path(images_dir) if not path.exists(): return None, "", f"❌ Directory not found: {images_dir}" if not path.is_dir(): return None, "", f"❌ Not a directory: {images_dir}" result = self._load_images(path) # Load first image if self.image_paths: img, filename = self.get_current_image() boxes = self.annotations.get(filename, []) img_with_boxes = self.draw_boxes_on_image(img, boxes) if boxes else img boxes_text = self._format_boxes_text(boxes) info = f"{result}\nImage 1/{len(self.image_paths)}: {filename}" return img_with_boxes, boxes_text, info else: return None, "", f"{result}\n⚠️ No .jpg or .png images found in directory" def get_current_model_info(self) -> str: """Get info about currently loaded model.""" if self.model and self.current_model_path: return f"📦 Loaded: {self.current_model_path}" elif self.model: return "📦 Model loaded (pretrained)" else: return "⚠️ No model loaded" def get_current_dir_info(self) -> str: """Get info about current images directory.""" return f"📁 {self.images_dir} ({len(self.image_paths)} images)" def get_current_image(self) -> tuple[Image.Image, str]: """Get current image and filename.""" if not self.image_paths: return None, "" path = self.image_paths[self.current_idx] img = Image.open(path).convert("RGB") return img, path.name def draw_boxes_on_image(self, img: Image.Image, boxes: list[dict]) -> Image.Image: """Draw bounding boxes on image.""" img_draw = img.copy() draw = ImageDraw.Draw(img_draw) for box in boxes: x1, y1, x2, y2 = box["bbox"] label = box.get("label", "knot") conf = box.get("confidence", 1.0) # Draw box draw.rectangle([x1, y1, x2, y2], outline="red", width=3) # Draw label text = f"{label} {conf:.2f}" if conf < 1.0 else label draw.text((x1, y1 - 20), text, fill="red") return img_draw def auto_label_current(self, threshold: float = 0.5) -> tuple[Image.Image, str, str]: """Auto-label current image with model.""" if not self.model: return self.get_current_image()[0], "", "⚠ No model loaded" img, filename = self.get_current_image() if not img: return None, "", "No images" # Run inference detections = self.model.predict(img, threshold=threshold) # Convert to our format boxes = [] for i in range(len(detections)): xyxy = detections.xyxy[i].tolist() conf = float(detections.confidence[i]) if detections.confidence is not None else 1.0 boxes.append({ "bbox": xyxy, "label": "knot", "confidence": conf, "source": "auto" }) # Save self.annotations[filename] = boxes self._save_annotations() # Draw img_with_boxes = self.draw_boxes_on_image(img, boxes) info = f"✓ Auto-labeled: {len(boxes)} boxes detected" boxes_text = self._format_boxes_text(boxes) return img_with_boxes, boxes_text, info def _format_boxes_text(self, boxes: list[dict]) -> str: """Format boxes for display.""" if not boxes: return "No annotations" lines = [] for i, box in enumerate(boxes): x1, y1, x2, y2 = box["bbox"] conf = box.get("confidence", 1.0) source = box.get("source", "manual") lines.append(f"{i}: [{x1:.0f}, {y1:.0f}, {x2:.0f}, {y2:.0f}] conf={conf:.2f} ({source})") return "\n".join(lines) def load_image(self, direction: str = "current") -> tuple[Image.Image, str, str]: """Load image (current/next/prev).""" if direction == "next": self.current_idx = min(self.current_idx + 1, len(self.image_paths) - 1) elif direction == "prev": self.current_idx = max(self.current_idx - 1, 0) img, filename = self.get_current_image() if not img: return None, "", "No images" # Load existing annotations boxes = self.annotations.get(filename, []) img_with_boxes = self.draw_boxes_on_image(img, boxes) if boxes else img boxes_text = self._format_boxes_text(boxes) info = f"Image {self.current_idx + 1}/{len(self.image_paths)}: {filename}" return img_with_boxes, boxes_text, info def add_box_manual(self, x1: int, y1: int, x2: int, y2: int) -> tuple[Image.Image, str, str]: """Manually add a bounding box.""" img, filename = self.get_current_image() if not img: return None, "", "No images" # Add box box = { "bbox": [float(x1), float(y1), float(x2), float(y2)], "label": "knot", "confidence": 1.0, "source": "manual" } if filename not in self.annotations: self.annotations[filename] = [] self.annotations[filename].append(box) self._save_annotations() # Redraw boxes = self.annotations[filename] img_with_boxes = self.draw_boxes_on_image(img, boxes) boxes_text = self._format_boxes_text(boxes) info = f"✓ Added box: {len(boxes)} total" return img_with_boxes, boxes_text, info def delete_last_box(self) -> tuple[Image.Image, str, str]: """Delete the last box from current image.""" img, filename = self.get_current_image() if not img: return None, "", "No images" if filename in self.annotations and self.annotations[filename]: self.annotations[filename].pop() self._save_annotations() # Redraw boxes = self.annotations.get(filename, []) img_with_boxes = self.draw_boxes_on_image(img, boxes) if boxes else img boxes_text = self._format_boxes_text(boxes) info = f"✓ Deleted last box: {len(boxes)} remaining" return img_with_boxes, boxes_text, info def clear_boxes(self) -> tuple[Image.Image, str, str]: """Clear all boxes from current image.""" img, filename = self.get_current_image() if not img: return None, "", "No images" self.annotations[filename] = [] self._save_annotations() boxes_text = "No annotations" info = "✓ Cleared all boxes" return img, boxes_text, info def _save_annotations(self): """Save annotations to JSON file.""" with self.ann_file.open("w") as f: json.dump(self.annotations, f, indent=2) def export_to_coco(self, output_path: Path): """Export annotations to COCO format.""" coco_data = { "images": [], "annotations": [], "categories": [{"id": 0, "name": "knot", "supercategory": "defect"}] } ann_id = 0 for img_id, img_path in enumerate(self.image_paths): filename = img_path.name img = Image.open(img_path) width, height = img.size coco_data["images"].append({ "id": img_id, "file_name": filename, "width": width, "height": height }) # Add annotations boxes = self.annotations.get(filename, []) for box in boxes: x1, y1, x2, y2 = box["bbox"] w = x2 - x1 h = y2 - y1 coco_data["annotations"].append({ "id": ann_id, "image_id": img_id, "category_id": 0, "bbox": [x1, y1, w, h], "area": w * h, "iscrowd": 0, "score": box.get("confidence", 1.0) }) ann_id += 1 with output_path.open("w") as f: def prepare_training_dataset(self, output_dir: Path, train_split: float = 0.8, valid_split: float = 0.1): """Prepare dataset in RF-DETR format (train/valid/test splits).""" output_dir.mkdir(parents=True, exist_ok=True) # Create splits import random annotated_images = [img for img in self.image_paths if img.name in self.annotations and self.annotations[img.name]] if len(annotated_images) < 10: return f"⚠️ Need at least 10 annotated images, have {len(annotated_images)}" random.shuffle(annotated_images) n = len(annotated_images) train_n = int(n * train_split) valid_n = int(n * valid_split) splits = { "train": annotated_images[:train_n], "valid": annotated_images[train_n:train_n + valid_n], "test": annotated_images[train_n + valid_n:] } # Create directories and copy images import shutil for split_name, split_images in splits.items(): split_dir = output_dir / split_name split_dir.mkdir(exist_ok=True) # Prepare COCO JSON for this split coco_data = { "images": [], "annotations": [], "categories": [{"id": 0, "name": "knot", "supercategory": "defect"}] } ann_id = 0 for img_id, img_path in enumerate(split_images): # Copy image dest = split_dir / img_path.name shutil.copy2(img_path, dest) # Add to COCO img = Image.open(img_path) width, height = img.size coco_data["images"].append({ "id": img_id, "file_name": img_path.name, "width": width, "height": height }) # Add annotations boxes = self.annotations.get(img_path.name, []) for box in boxes: x1, y1, x2, y2 = box["bbox"] w = x2 - x1 h = y2 - y1 coco_data["annotations"].append({ "id": ann_id, "image_id": img_id, "category_id": 0, "bbox": [x1, y1, w, h], "area": w * h, "iscrowd": 0 }) ann_id += 1 # Save COCO JSON with (split_dir / "_annotations.coco.json").open("w") as f: json.dump(coco_data, f, indent=2) return f"✓ Dataset prepared: {len(splits['train'])} train, {len(splits['valid'])} valid, {len(splits['test'])} test" with gr.Row(): start_train_btn = gr.Button("🚀 Start Training", variant="primary") stop_train_btn = gr.Button("⏹️ Stop Training", variant="stop") refresh_status_btn = gr.Button("🔄 Refresh Status") training_status = gr.Textbox( label="Training Status", value="Not training", lines=3 ) gr.Markdown(""" **Note**: Training runs in the background. You can continue annotating while training. Check the training log file for detailed progress. "" boxes = self.annotations.get(img_path.name, []) for box in boxes: x1, y1, x2, y2 = box["bbox"] w = x2 - x1 h = y2 - y1 coco_data["annotations"].append({ "id": ann_id, "image_id": img_id, "category_id": 0, "bbox": [x1, y1, w, h], "area": w * h, "iscrowd": 0 }) ann_id += 1 # Save COCO JSON with (split_dir / "_annotations.coco.json").open("w") as f: json.dump(coco_data, f, indent=2) return f"✓ Dataset prepared: {len(splits['train'])} train, {len(splits['valid'])} valid, {len(splits['test'])} test" def start_training(self, dataset_dir: str, output_dir: str, model_size: str, epochs: int, batch_size: int, lr: float, progress=gr.Progress()): """Start training in background.""" dataset_path = Path(dataset_dir) output_path = Path(output_dir) if not dataset_path.exists(): return "❌ Dataset directory not found" if self.training_process and self.training_process.poll() is None: return "⚠️ Training already in progress" output_path.mkdir(parents=True, exist_ok=True) # Build training command venv_python = Path(__file__).parent / ".venv/bin/python" train_script = Path(__file__).parent / "train_rfdetr.py" cmd = [ str(venv_python), str(train_script), "--dataset-dir", str(dataset_path), "--output-dir", str(output_path), "--model", model_size, "--epochs", str(epochs), "--batch-size", str(batch_size), "--grad-accum-steps", str(max(1, 16 // batch_size)), "--lr", str(lr) ] # Start training process log_file = output_path / "training.log" self.training_status = f"🚀 Starting training..." def run_training(): try: with log_file.open("w") as f: self.training_process = subprocess.Popen( cmd, stdout=f, stderr=subprocess.STDOUT, text=True ) self.training_status = f"⏳ Training in progress (PID: {self.training_process.pid})" self.training_process.wait() if self.training_process.returncode == 0: self.training_status = "✅ Training completed successfully!" # Reload model with new weights best_weights = output_path / "checkpoint_best_total.pth" if best_weights.exists(): self._load_model(best_weights) else: self.training_status = f"❌ Training failed (exit code {self.training_process.returncode})" except Exception as e: self.training_status = f"❌ Error: {e}" self.training_thread = threading.Thread(target=run_training, daemon=True) self.training_thread.start() return f"✓ Training started! Check {log_file} for progress" def get_training_status(self): """Get current training status.""" return self.training_status def stop_training(self): """Stop the training process.""" if self.training_process and self.training_process.poll() is None: self.training_process.terminate() self.training_status = "⏹️ Training stopped by user" return "✓ Training process terminated" return "⚠️ No training in progress" json.dump(coco_data, f, indent=2) return f"✓ Exported {len(coco_data['annotations'])} annotations to {output_path}" def create_ui(app: AnnotationApp) -> gr.Blocks: """Create Gradio UI.""" with gr.Blocks(title="Knot Annotation Tool") as demo: gr.Markdown(""" # 🪵 Wood Knot Annotation Tool **Label -> Train -> Auto-Label -> Repeat** - Manually annotate images or use **Auto-Label** with your trained model - Export and prepare dataset for training - Train RF-DETR directly from this GUI - Use trained model to auto-label more images """) with gr.Row(): with gr.Column(scale=3): image_display = gr.Image(label="Current Image", type="pil") with gr.Row(): prev_btn = gr.Button("⬅️ Previous") next_btn = gr.Button("Next ➡️") auto_label_btn = gr.Button("🤖 Auto-Label", variant="primary") with gr.Row(): threshold_slider = gr.Slider(0.1, 0.9, DEFAULT_DETECTION_THRESHOLD, label="Detection Threshold") with gr.Column(scale=1): info_text = gr.Textbox(label="Status", lines=2) boxes_text = gr.Textbox(label="Annotations", lines=10) gr.Markdown("### Manual Annotation") with gr.Row(): x1_input = gr.Number(label="x1", value=100) y1_input = gr.Number(label="y1", value=100) with gr.Row(): Training handlers prep_btn.click( lambda out, train, valid: app.prepare_training_dataset(Path(out), train, valid), inputs=[dataset_prep_dir, train_split, valid_split], outputs=[prep_result] ) start_train_btn.click( app.start_training, inputs=[train_dataset_dir, train_output_dir, model_size, epochs, batch_size, learning_rate], outputs=[training_status] ) stop_train_btn.click( app.stop_training, outputs=[training_status] ) refresh_status_btn.click( app.get_training_status, outputs=[training_status] ) # x2_input = gr.Number(label="x2", value=200) y2_input = gr.Number(label="y2", value=200) add_box_btn = gr.Button("➕ Add Box") delete_btn = gr.Button("🗑️ Delete Last") clear_btn = gr.Button("❌ Clear All") gr.Markdown("### Export") export_path = gr.Textbox( label="Output Path", value="annotations_coco.json" ) export_btn = gr.Button("💾 Export COCO") export_result = gr.Textbox(label="Export Result") # Event handlers def on_load(): return app.load_image("current") # Settings handlers load_images_btn.click( app.load_new_images_dir, inputs=[images_dir_input], outputs=[image_display, boxes_text, info_text] ).then( lambda: (app.get_current_dir_info(), app.get_current_model_info()), outputs=[dir_info, model_info] ) load_model_btn.click( app.load_new_model, inputs=[model_weights_input], outputs=[model_info] ) prev_btn.click( lambda: app.load_image("prev"), outputs=[image_display, boxes_text, info_text] ) next_btn.click( lambda: app.load_image("next"), outputs=[image_display, boxes_text, info_text] ) auto_label_btn.click( lambda t: app.auto_label_current(t), inputs=[threshold_slider], outputs=[image_display, boxes_text, info_text] ) add_box_btn.click( app.add_box_manual, inputs=[x1_input, y1_input, x2_input, y2_input], outputs=[image_display, boxes_text, info_text] ) delete_btn.click( app.delete_last_box, outputs=[image_display, boxes_text, info_text] ) clear_btn.click( app.clear_boxes, outputs=[image_display, boxes_text, info_text] )help="Default directory with images (can be changed in GUI)") parser.add_argument("--model-weights", type=Path, help="Default trained model for auto-labeling (can be changed in GUI)") parser.add_argument("--port", type=int, default=7860, help="Port for web interface") args = parser.parse_args() # Validate paths if provided if args.images_dir and not args.images_dir.exists(): print(f"⚠️ Warning: Images directory not found: {args.images_dir}") print("You can load a different directory from the GUI Settings") args.images_dir = None if args.model_weights and not args.model_weights.exists(): print(f"⚠️ Warning: Model weights not found: {args.model_weights}") print("You can load different weights from the GUI Settings") args.model_weights = None # Load first image on start demo.load(on_load, outputs=[image_display, boxes_text, info_text]) return demo def main(): parser = argparse.ArgumentParser(description="Simple annotation GUI with auto-labeling") parser.add_argument("--images-dir", type=Path, required=True, help="Directory with images") parser.add_argument("--model-weights", type=Path, help="Optional: trained model for auto-labeling") parser.add_argument("--port", type=int, default=7860, help="Port for web interface") args = parser.parse_args() if not args.images_dir.exists(): raise SystemExit(f"Images directory not found: {args.images_dir}") if args.images_dir: print(f"📁 Default images: {args.images_dir} ({len(app.image_paths)} images)") else: print(f"📁 No default images - load directory from Settings") if app.model: print(f"🤖 Model: Loaded from {args.model_weights}") else: print(f"⚠️ No model loaded - load from Settings or train one") print(f"💡 You can change images directory and model weights from the Settings panel print(f"\n{'='*60}") print(f"🚀 Starting annotation tool...") print(f"📁 Images: {args.images_dir} ({len(app.image_paths)} images)") if app.model: print(f"🤖 Model: Loaded from {args.model_weights}") else: print(f"⚠️ No model loaded (manual annotation only)") print(f"{'='*60}\n") demo.launch( server_name="0.0.0.0", server_port=args.port, share=False ) if __name__ == "__main__": main()