diff --git a/.gitignore b/.gitignore
index 95a6253..153385f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -207,6 +207,7 @@ __marimo__/
 
 # Projects
 datasets/
+!mini-nav/**/datasets/
 data/
 deps/
 outputs/
diff --git a/mini-nav/configs/__init__.py b/mini-nav/configs/__init__.py
index f62aed0..68771ba 100644
--- a/mini-nav/configs/__init__.py
+++ b/mini-nav/configs/__init__.py
@@ -5,6 +5,6 @@ from .config import (
 from .loader import ConfigError, load_yaml, save_yaml
 from .models import (
     Config,
+    DatasetConfig,
     ModelConfig,
     OutputConfig,
-    PoolingType,
@@ -14,6 +14,6 @@ __all__ = [
     # Models
     "ModelConfig",
     "OutputConfig",
+    "DatasetConfig",
     "Config",
-    "PoolingType",
     # Loader
diff --git a/mini-nav/configs/config.yaml b/mini-nav/configs/config.yaml
index 8c1f7f6..9c05347 100644
--- a/mini-nav/configs/config.yaml
+++ b/mini-nav/configs/config.yaml
@@ -5,3 +5,13 @@ model:
 
 output:
   directory: "./outputs"
+
+dataset:
+  dataset_root: "datasets/InsDet-FULL"
+  output_dir: "datasets/InsDet-FULL/Synthesized"
+  num_objects_range: [3, 8]
+  num_scenes: 1000
+  object_scale_range: [0.1, 0.4]
+  rotation_range: [-30, 30]
+  overlap_threshold: 0.3
+  seed: 42
diff --git a/mini-nav/configs/models.py b/mini-nav/configs/models.py
index 16a443c..dd6c9cd 100644
--- a/mini-nav/configs/models.py
+++ b/mini-nav/configs/models.py
@@ -1,17 +1,10 @@
 """Pydantic data models for feature compressor configuration."""
 
-from enum import Enum
 from pathlib import Path
 
 from pydantic import BaseModel, ConfigDict, Field, field_validator
 
 
-class PoolingType(str, Enum):
-    """Enum for pooling types."""
-
-    ATTENTION = "attention"
-
-
 class ModelConfig(BaseModel):
     """Configuration for the vision model and compression."""
 
@@ -42,10 +35,60 @@ class OutputConfig(BaseModel):
         return Path(__file__).parent.parent.parent / v
+
+class DatasetConfig(BaseModel):
+    """Configuration for synthetic dataset generation."""
+
+    model_config = ConfigDict(extra="ignore")
+
+    dataset_root: Path = (
+        Path(__file__).parent.parent.parent / 
"datasets" / "InsDet-FULL" + ) + output_dir: Path = ( + Path(__file__).parent.parent.parent / "datasets" / "InsDet-FULL" / "Synthesized" + ) + num_objects_range: tuple[int, int] = (3, 8) + num_scenes: int = 1000 + object_scale_range: tuple[float, float] = (0.1, 0.4) + rotation_range: tuple[int, int] = (-30, 30) + overlap_threshold: float = 0.3 + seed: int = 42 + + @field_validator("dataset_root", "output_dir", mode="after") + def convert_to_absolute(cls, v: Path) -> Path: + """ + Converts the path to an absolute path relative to the project root. + This works even if the path doesn't exist on disk. + """ + if v.is_absolute(): + return v + return Path(__file__).parent.parent.parent / v + + @field_validator("num_objects_range", mode="after") + def validate_num_objects(cls, v: tuple[int, int]) -> tuple[int, int]: + if v[0] < 1 or v[1] < v[0]: + raise ValueError("num_objects_range must have min >= 1 and min <= max") + return v + + @field_validator("object_scale_range", mode="after") + def validate_scale(cls, v: tuple[float, float]) -> tuple[float, float]: + if v[0] <= 0 or v[1] <= 0 or v[1] < v[0]: + raise ValueError( + "object_scale_range must have positive values and min <= max" + ) + return v + + @field_validator("overlap_threshold", mode="after") + def validate_overlap(cls, v: float) -> float: + if not 0 <= v <= 1: + raise ValueError("overlap_threshold must be between 0 and 1") + return v + + class Config(BaseModel): """Root configuration for the feature compressor.""" model_config = ConfigDict(extra="ignore") - model: ModelConfig - output: OutputConfig + model: ModelConfig = Field(default_factory=ModelConfig) + output: OutputConfig = Field(default_factory=OutputConfig) + dataset: DatasetConfig = Field(default_factory=DatasetConfig) diff --git a/mini-nav/datasets/__init__.py b/mini-nav/datasets/__init__.py new file mode 100644 index 0000000..4eb1ddc --- /dev/null +++ b/mini-nav/datasets/__init__.py @@ -0,0 +1,8 @@ +from .loader import SynthDataset, ValDataset +from 
.synthesizer import ImageSynthesizer + +__all__ = [ + "ImageSynthesizer", + "SynthDataset", + "ValDataset", +] diff --git a/mini-nav/datasets/loader.py b/mini-nav/datasets/loader.py new file mode 100644 index 0000000..d0f79e3 --- /dev/null +++ b/mini-nav/datasets/loader.py @@ -0,0 +1,105 @@ +"""Data loaders for synthetic and validation datasets.""" + +from collections.abc import Iterator +from pathlib import Path + +from PIL import Image + + +class SynthDataset: + """Dataset loader for synthesized training images.""" + + def __init__(self, synth_dir: Path, annotations_suffix: str = ".txt"): + """ + Initialize the synthetic dataset loader. + + Args: + synth_dir: Directory containing synthesized images and annotations + annotations_suffix: Suffix for annotation files + """ + self.synth_dir = Path(synth_dir) + self.annotations_suffix = annotations_suffix + + # Find all images + self.image_files = sorted(self.synth_dir.glob("synth_*.jpg")) + + def __len__(self) -> int: + return len(self.image_files) + + def __getitem__(self, idx: int) -> tuple[Image.Image, list[tuple[str, int, int, int, int]]]: + """Get a single item. 
+ + Args: + idx: Index of the item + + Returns: + Tuple of (image, annotations) where annotations is a list of + (category, xmin, ymin, xmax, ymax) + """ + img_path = self.image_files[idx] + image = Image.open(img_path).convert("RGB") + + # Load annotations + anno_path = img_path.with_suffix(self.annotations_suffix) + annotations: list[tuple[str, int, int, int, int]] = [] + + if anno_path.exists(): + with open(anno_path, "r") as f: + for line in f: + line = line.strip() + if line: + parts = line.split() + if len(parts) == 5: + category = parts[0] + xmin, ymin, xmax, ymax = map(int, parts[1:]) + annotations.append((category, xmin, ymin, xmax, ymax)) + + return image, annotations + + def __iter__(self) -> Iterator[tuple[Image.Image, list[tuple[str, int, int, int, int]]]]: + """Iterate over the dataset.""" + for i in range(len(self)): + yield self[i] + + +class ValDataset: + """Dataset loader for validation scene images.""" + + def __init__(self, scenes_dir: Path, split: str = "easy"): + """ + Initialize the validation dataset loader. + + Args: + scenes_dir: Directory containing scene subdirectories + split: Scene split to load ('easy' or 'hard') + """ + self.scenes_dir = Path(scenes_dir) + self.split = split + + self.split_dir = self.scenes_dir / split + if not self.split_dir.exists(): + raise ValueError(f"Scene split directory not found: {self.split_dir}") + + # Find all RGB images + self.image_files = sorted(self.split_dir.glob("*/rgb_*.jpg")) + + def __len__(self) -> int: + return len(self.image_files) + + def __getitem__(self, idx: int) -> tuple[Image.Image, Path]: + """Get a single item. 
+ + Args: + idx: Index of the item + + Returns: + Tuple of (image, scene_path) + """ + img_path = self.image_files[idx] + image = Image.open(img_path).convert("RGB") + return image, img_path.parent + + def __iter__(self) -> Iterator[tuple[Image.Image, Path]]: + """Iterate over the dataset.""" + for i in range(len(self)): + yield self[i] diff --git a/mini-nav/datasets/synthesizer.py b/mini-nav/datasets/synthesizer.py new file mode 100644 index 0000000..553b466 --- /dev/null +++ b/mini-nav/datasets/synthesizer.py @@ -0,0 +1,295 @@ +"""Image synthesizer for generating synthetic object detection datasets.""" + +import random +from pathlib import Path + +import numpy as np +from PIL import Image +from PIL.Image import Resampling + + +class ImageSynthesizer: + """Synthesizes composite images from background and object images with masks.""" + + def __init__( + self, + dataset_root: Path, + output_dir: Path, + num_objects_range: tuple[int, int] = (3, 8), + num_scenes: int = 1000, + object_scale_range: tuple[float, float] = (0.1, 0.4), + rotation_range: tuple[int, int] = (-30, 30), + overlap_threshold: float = 0.3, + seed: int = 42, + ): + """ + Initialize the image synthesizer. 
+ + Args: + dataset_root: Root directory of the dataset (InsDet-FULL) + output_dir: Directory to save synthesized images + num_objects_range: Range of number of objects per scene + num_scenes: Number of scenes to generate + object_scale_range: Range of object scale relative to background + rotation_range: Range of rotation angles in degrees + overlap_threshold: Maximum allowed overlap ratio + seed: Random seed for reproducibility + """ + self.dataset_root = Path(dataset_root) + self.output_dir = Path(output_dir) + self.num_objects_range = num_objects_range + self.num_scenes = num_scenes + self.object_scale_range = object_scale_range + self.rotation_range = rotation_range + self.overlap_threshold = overlap_threshold + self.seed = seed + + self.background_dir = self.dataset_root / "Background" + self.objects_dir = self.dataset_root / "Objects" + self.scenes_dir = self.dataset_root / "Scenes" + + # Will be populated on first use + self._background_categories: list[str] | None = None + self._object_categories: list[str] | None = None + + @property + def background_images(self) -> list[Path]: + """List of background image paths.""" + if self._background_categories is None: + self._background_categories = sorted( + [p.name for p in self.background_dir.iterdir() if p.suffix in [".jpg", ".jpeg", ".png"]] + ) + # Return as list of Path for type compatibility + return [self.background_dir / name for name in self._background_categories] # type: ignore[return-value] + + @property + def object_categories(self) -> list[str]: + """List of object categories.""" + if self._object_categories is None: + self._object_categories = sorted( + [d.name for d in self.objects_dir.iterdir() if d.is_dir()] + ) + return self._object_categories + + def load_background(self, path: Path) -> Image.Image: + """Load a background image. 
+ + Args: + path: Background image path + + Returns: + PIL Image + """ + return Image.open(path).convert("RGB") + + def load_object(self, category: str, angle: int) -> tuple[Image.Image, Image.Image]: + """Load an object image and its mask. + + Args: + category: Object category name (e.g., '099_mug_blue') + angle: Angle index (1-24) + + Returns: + Tuple of (image, mask) as PIL Images + """ + img_path = self.objects_dir / category / "images" / f"{angle:03d}.jpg" + mask_path = self.objects_dir / category / "masks" / f"{angle:03d}.png" + image = Image.open(img_path).convert("RGB") + mask = Image.open(mask_path).convert("L") + return image, mask + + def get_random_background(self) -> tuple[Image.Image, Path]: + """Get a random background image. + + Returns: + Tuple of (image, path) + """ + path = random.choice(self.background_images) + return self.load_background(path), path + + def get_random_object(self) -> tuple[Image.Image, Image.Image, str]: + """Get a random object with its mask. + + Returns: + Tuple of (image, mask, category_name) + """ + category = random.choice(self.object_categories) + angle = random.randint(1, 24) + image, mask = self.load_object(category, angle) + return image, mask, category + + def _rotate_image_and_mask( + self, image: Image.Image, mask: Image.Image, angle: float + ) -> tuple[Image.Image, Image.Image]: + """Rotate image and mask together.""" + image = image.rotate(angle, resample=Resampling.BILINEAR, expand=True) + mask = mask.rotate(angle, resample=Resampling.BILINEAR, expand=True) + return image, mask + + def _compute_overlap(self, box1: tuple[int, int, int, int], box2: tuple[int, int, int, int]) -> float: + """Compute overlap ratio between two boxes. 
+ + Args: + box1: (xmin, ymin, xmax, ymax) + box2: (xmin, ymin, xmax, ymax) + + Returns: + Overlap ratio (area of intersection / area of smaller box) + """ + x1_min, y1_min, x1_max, y1_max = box1 + x2_min, y2_min, x2_max, y2_max = box2 + + # Compute intersection + inter_xmin = max(x1_min, x2_min) + inter_ymin = max(y1_min, y2_min) + inter_xmax = min(x1_max, x2_max) + inter_ymax = min(y1_max, y2_max) + + if inter_xmax <= inter_xmin or inter_ymax <= inter_ymin: + return 0.0 + + inter_area = (inter_xmax - inter_xmin) * (inter_ymax - inter_ymin) + box1_area = (x1_max - x1_min) * (y1_max - y1_min) + box2_area = (x2_max - x2_min) * (y2_max - y2_min) + min_area = min(box1_area, box2_area) + + return inter_area / min_area if min_area > 0 else 0.0 + + def _place_object( + self, + background: Image.Image, + obj_image: Image.Image, + obj_mask: Image.Image, + existing_boxes: list[tuple[int, int, int, int]], + scale: float, + ) -> tuple[Image.Image, Image.Image, tuple[int, int, int, int]] | None: + """Place an object on the background without exceeding overlap threshold. 
+
+        Args:
+            background: Background PIL Image
+            obj_image: Object PIL Image (RGB)
+            obj_mask: Object PIL Image (L)
+            existing_boxes: List of existing object boxes
+            scale: Scale factor for the object
+
+        Returns:
+            Tuple of (new_background, updated_mask, new_box) or None if placement failed
+        """
+        bg_w, bg_h = background.size
+
+        # Scale object
+        obj_w, obj_h = obj_image.size
+        new_w = int(obj_w * scale)
+        new_h = int(obj_h * scale)
+
+        if new_w <= 0 or new_h <= 0 or new_w > bg_w or new_h > bg_h:
+            return None
+
+        obj_image = obj_image.resize((new_w, new_h), Resampling.LANCZOS)
+        obj_mask = obj_mask.resize((new_w, new_h), Resampling.LANCZOS)
+
+        # Try to find a valid position
+        max_attempts = 50
+        for _ in range(max_attempts):
+            # Random position
+            x = random.randint(0, bg_w - new_w)
+            y = random.randint(0, bg_h - new_h)
+
+            new_box = (x, y, x + new_w, y + new_h)
+
+            # Check overlap with existing boxes
+            valid = True
+            for existing_box in existing_boxes:
+                overlap = self._compute_overlap(new_box, existing_box)
+                if overlap > self.overlap_threshold:
+                    valid = False
+                    break
+
+            if valid:
+                # Composite object onto background
+                background = background.copy()
+                mask_array = np.array(obj_mask) / 255.0
+                bg_array = np.array(background)
+                obj_array = np.array(obj_image)
+
+                # Apply mask
+                mask_3d = np.stack([mask_array] * 3, axis=-1)
+                bg_array[y:y+new_h, x:x+new_w] = (
+                    bg_array[y:y+new_h, x:x+new_w] * (1 - mask_3d)
+                    + obj_array * mask_3d
+                )
+
+                return Image.fromarray(bg_array), obj_mask, new_box
+
+        return None
+
+    def synthesize_scene(self) -> tuple[Image.Image, list[tuple[str, int, int, int, int]]]:
+        """Synthesize a single scene with random objects. 
+
+        Returns:
+            Tuple of (synthesized_image, list of (category, xmin, ymin, xmax, ymax))
+        """
+        # NOTE: do not reseed here -- generate() seeds the RNGs per scene
+        # (self.seed + i); reseeding with self.seed made every scene identical.
+
+        # Load background
+        background, _ = self.get_random_background()
+
+        # Determine number of objects
+        num_objects = random.randint(*self.num_objects_range)
+
+        # Place objects
+        placed_boxes: list[tuple[int, int, int, int]] = []
+        annotations: list[tuple[str, int, int, int, int]] = []
+
+        for _ in range(num_objects):
+            # Get random object
+            obj_image, obj_mask, obj_category = self.get_random_object()
+
+            # Get random scale
+            scale = random.uniform(*self.object_scale_range)
+
+            # Get random rotation
+            angle = random.uniform(*self.rotation_range)
+            obj_image, obj_mask = self._rotate_image_and_mask(obj_image, obj_mask, angle)
+
+            # Try to place object
+            result = self._place_object(background, obj_image, obj_mask, placed_boxes, scale)
+
+            if result is not None:
+                background, _, box = result
+                placed_boxes.append(box)
+                annotations.append((obj_category, box[0], box[1], box[2], box[3]))
+
+        return background, annotations
+
+    def generate(self) -> list[Path]:
+        """Generate all synthesized scenes.
+
+        Returns:
+            List of paths to generated images
+        """
+        self.output_dir.mkdir(parents=True, exist_ok=True)
+
+        generated_files: list[Path] = []
+
+        for i in range(self.num_scenes):
+            # Update seed for each scene
+            random.seed(self.seed + i)
+            np.random.seed(self.seed + i)
+
+            image, annotations = self.synthesize_scene()
+
+            # Save image
+            img_path = self.output_dir / f"synth_{i:04d}.jpg"
+            image.save(img_path, quality=95)
+
+            # Save annotation
+            anno_path = self.output_dir / f"synth_{i:04d}.txt"
+            with open(anno_path, "w") as f:
+                for category, xmin, ymin, xmax, ymax in annotations:
+                    f.write(f"{category} {xmin} {ymin} {xmax} {ymax}\n")
+
+            generated_files.append(img_path)
+
+        return generated_files