feat(dataset): add synthetic dataset generation and configuration

This commit is contained in:
2026-02-28 21:15:45 +08:00
parent f61857feba
commit 77d715a2cf
7 changed files with 473 additions and 9 deletions

1
.gitignore vendored
View File

@@ -207,6 +207,7 @@ __marimo__/
# Projects
datasets/
!mini-nav/**/datasets/
data/
deps/
outputs/

View File

@@ -5,6 +5,7 @@ from .config import (
from .loader import ConfigError, load_yaml, save_yaml
from .models import (
    Config,
    DatasetConfig,
    ModelConfig,
    OutputConfig,
    PoolingType,
@@ -14,6 +15,7 @@ __all__ = [
    # Models
    "ModelConfig",
    "OutputConfig",
    "DatasetConfig",
    "Config",
    "PoolingType",
    # Loader

View File

@@ -5,3 +5,13 @@ model:
output:
  directory: "./outputs"
dataset:
dataset_root: "datasets/InsDet-FULL"
output_dir: "datasets/InsDet-FULL/Synthesized"
num_objects_range: [3, 8]
num_scenes: 1000
object_scale_range: [0.1, 0.4]
rotation_range: [-30, 30]
overlap_threshold: 0.3
seed: 42

View File

@@ -1,17 +1,10 @@
"""Pydantic data models for feature compressor configuration.""" """Pydantic data models for feature compressor configuration."""
from enum import Enum
from pathlib import Path from pathlib import Path
from pydantic import BaseModel, ConfigDict, Field, field_validator from pydantic import BaseModel, ConfigDict, Field, field_validator
class PoolingType(str, Enum):
"""Enum for pooling types."""
ATTENTION = "attention"
class ModelConfig(BaseModel): class ModelConfig(BaseModel):
"""Configuration for the vision model and compression.""" """Configuration for the vision model and compression."""
@@ -42,10 +35,60 @@ class OutputConfig(BaseModel):
        return Path(__file__).parent.parent.parent / v
class DatasetConfig(BaseModel):
    """Configuration for synthetic dataset generation.

    Relative paths are resolved to absolute paths against the project root
    (three directory levels above this file) by the path validator below.
    """

    model_config = ConfigDict(extra="ignore")

    # Root of the InsDet-FULL dataset.
    dataset_root: Path = (
        Path(__file__).parent.parent.parent / "datasets" / "InsDet-FULL"
    )
    # Directory where synthesized scenes are written.
    output_dir: Path = (
        Path(__file__).parent.parent.parent / "datasets" / "InsDet-FULL" / "Synthesized"
    )
    # (min, max) number of objects composited into each scene.
    num_objects_range: tuple[int, int] = (3, 8)
    # Total number of synthetic scenes to generate.
    num_scenes: int = 1000
    # (min, max) object scale relative to the background.
    object_scale_range: tuple[float, float] = (0.1, 0.4)
    # (min, max) rotation applied to pasted objects, in degrees.
    rotation_range: tuple[int, int] = (-30, 30)
    # Maximum allowed pairwise box-overlap ratio when placing objects.
    overlap_threshold: float = 0.3
    # Random seed for reproducible generation.
    seed: int = 42

    # NOTE: pydantic v2 expects field validators to be class methods; the
    # explicit @classmethod decorator below is the documented form.
    @field_validator("dataset_root", "output_dir", mode="after")
    @classmethod
    def convert_to_absolute(cls, v: Path) -> Path:
        """
        Converts the path to an absolute path relative to the project root.
        This works even if the path doesn't exist on disk.
        """
        if v.is_absolute():
            return v
        return Path(__file__).parent.parent.parent / v

    @field_validator("num_objects_range", mode="after")
    @classmethod
    def validate_num_objects(cls, v: tuple[int, int]) -> tuple[int, int]:
        """Ensure the object-count range is well-formed (min >= 1, min <= max)."""
        if v[0] < 1 or v[1] < v[0]:
            raise ValueError("num_objects_range must have min >= 1 and min <= max")
        return v

    @field_validator("object_scale_range", mode="after")
    @classmethod
    def validate_scale(cls, v: tuple[float, float]) -> tuple[float, float]:
        """Ensure scale bounds are positive and ordered (min <= max)."""
        if v[0] <= 0 or v[1] <= 0 or v[1] < v[0]:
            raise ValueError(
                "object_scale_range must have positive values and min <= max"
            )
        return v

    @field_validator("overlap_threshold", mode="after")
    @classmethod
    def validate_overlap(cls, v: float) -> float:
        """Ensure the overlap threshold is a ratio in [0, 1]."""
        if not 0 <= v <= 1:
            raise ValueError("overlap_threshold must be between 0 and 1")
        return v
class Config(BaseModel):
    """Root configuration for the feature compressor."""

    model_config = ConfigDict(extra="ignore")

    model: ModelConfig = Field(default_factory=ModelConfig)
    output: OutputConfig = Field(default_factory=OutputConfig)
    dataset: DatasetConfig = Field(default_factory=DatasetConfig)

View File

@@ -0,0 +1,8 @@
"""Public interface of the datasets package.

Re-exports the dataset loaders and the scene synthesizer so callers can
import them directly from the package root.
"""
from .loader import SynthDataset, ValDataset
from .synthesizer import ImageSynthesizer
__all__ = [
    "ImageSynthesizer",
    "SynthDataset",
    "ValDataset",
]

105
mini-nav/datasets/loader.py Normal file
View File

@@ -0,0 +1,105 @@
"""Data loaders for synthetic and validation datasets."""
from collections.abc import Iterator
from pathlib import Path
from PIL import Image
class SynthDataset:
    """Loads synthesized training images together with their box annotations."""

    def __init__(self, synth_dir: Path, annotations_suffix: str = ".txt"):
        """
        Create a loader over a directory of synthesized scenes.

        Args:
            synth_dir: Directory containing synthesized images and annotations
            annotations_suffix: Suffix for annotation files
        """
        self.synth_dir = Path(synth_dir)
        self.annotations_suffix = annotations_suffix
        # Sorted so indices are deterministic across runs.
        self.image_files = sorted(self.synth_dir.glob("synth_*.jpg"))

    def __len__(self) -> int:
        return len(self.image_files)

    def __getitem__(self, idx: int) -> tuple[Image.Image, list[tuple[str, int, int, int, int]]]:
        """Return the (image, annotations) pair at ``idx``.

        Args:
            idx: Index of the item

        Returns:
            Tuple of (image, annotations) where annotations is a list of
            (category, xmin, ymin, xmax, ymax); a missing annotation file
            yields an empty list.
        """
        img_path = self.image_files[idx]
        image = Image.open(img_path).convert("RGB")

        annotations: list[tuple[str, int, int, int, int]] = []
        anno_path = img_path.with_suffix(self.annotations_suffix)
        if anno_path.exists():
            with open(anno_path, "r") as f:
                for raw_line in f:
                    fields = raw_line.split()
                    # Only well-formed "<category> x1 y1 x2 y2" rows count.
                    if len(fields) != 5:
                        continue
                    category, *coords = fields
                    xmin, ymin, xmax, ymax = (int(c) for c in coords)
                    annotations.append((category, xmin, ymin, xmax, ymax))
        return image, annotations

    def __iter__(self) -> Iterator[tuple[Image.Image, list[tuple[str, int, int, int, int]]]]:
        """Yield every (image, annotations) pair in index order."""
        for index in range(len(self.image_files)):
            yield self[index]
class ValDataset:
    """Loads RGB validation images from one scene split."""

    def __init__(self, scenes_dir: Path, split: str = "easy"):
        """
        Create a loader over the validation scenes of one split.

        Args:
            scenes_dir: Directory containing scene subdirectories
            split: Scene split to load ('easy' or 'hard')

        Raises:
            ValueError: If the split directory does not exist.
        """
        self.scenes_dir = Path(scenes_dir)
        self.split = split
        self.split_dir = self.scenes_dir / split
        if not self.split_dir.exists():
            raise ValueError(f"Scene split directory not found: {self.split_dir}")
        # One entry per RGB frame, sorted for stable indexing.
        self.image_files = sorted(self.split_dir.glob("*/rgb_*.jpg"))

    def __len__(self) -> int:
        return len(self.image_files)

    def __getitem__(self, idx: int) -> tuple[Image.Image, Path]:
        """Return the (image, scene_path) pair at ``idx``.

        Args:
            idx: Index of the item

        Returns:
            Tuple of (image, scene_path), where scene_path is the directory
            containing the frame.
        """
        frame = self.image_files[idx]
        return Image.open(frame).convert("RGB"), frame.parent

    def __iter__(self) -> Iterator[tuple[Image.Image, Path]]:
        """Yield every (image, scene_path) pair in index order."""
        for index in range(len(self.image_files)):
            yield self[index]

View File

@@ -0,0 +1,295 @@
"""Image synthesizer for generating synthetic object detection datasets."""
import random
from pathlib import Path
import numpy as np
from PIL import Image
from PIL.Image import Resampling
class ImageSynthesizer:
    """Synthesizes composite images from background and object images with masks."""

    def __init__(
        self,
        dataset_root: Path,
        output_dir: Path,
        num_objects_range: tuple[int, int] = (3, 8),
        num_scenes: int = 1000,
        object_scale_range: tuple[float, float] = (0.1, 0.4),
        rotation_range: tuple[int, int] = (-30, 30),
        overlap_threshold: float = 0.3,
        seed: int = 42,
    ):
        """
        Initialize the image synthesizer.

        Args:
            dataset_root: Root directory of the dataset (InsDet-FULL)
            output_dir: Directory to save synthesized images
            num_objects_range: Range of number of objects per scene
            num_scenes: Number of scenes to generate
            object_scale_range: Range of object scale relative to background
            rotation_range: Range of rotation angles in degrees
            overlap_threshold: Maximum allowed overlap ratio
            seed: Random seed for reproducibility
        """
        self.dataset_root = Path(dataset_root)
        self.output_dir = Path(output_dir)
        self.num_objects_range = num_objects_range
        self.num_scenes = num_scenes
        self.object_scale_range = object_scale_range
        self.rotation_range = rotation_range
        self.overlap_threshold = overlap_threshold
        self.seed = seed
        self.background_dir = self.dataset_root / "Background"
        self.objects_dir = self.dataset_root / "Objects"
        self.scenes_dir = self.dataset_root / "Scenes"
        # Lazy caches, populated on first property access.
        self._background_categories: list[str] | None = None
        self._object_categories: list[str] | None = None

    @property
    def background_images(self) -> list[Path]:
        """List of background image paths (directory scanned once, then cached)."""
        if self._background_categories is None:
            self._background_categories = sorted(
                p.name
                for p in self.background_dir.iterdir()
                if p.suffix in [".jpg", ".jpeg", ".png"]
            )
        return [self.background_dir / name for name in self._background_categories]

    @property
    def object_categories(self) -> list[str]:
        """Sorted list of object category directory names (cached)."""
        if self._object_categories is None:
            self._object_categories = sorted(
                d.name for d in self.objects_dir.iterdir() if d.is_dir()
            )
        return self._object_categories

    def load_background(self, path: Path) -> Image.Image:
        """Load a background image as RGB.

        Args:
            path: Background image path

        Returns:
            PIL Image
        """
        return Image.open(path).convert("RGB")

    def load_object(self, category: str, angle: int) -> tuple[Image.Image, Image.Image]:
        """Load an object image and its mask.

        Args:
            category: Object category name (e.g., '099_mug_blue')
            angle: Angle index (1-24)

        Returns:
            Tuple of (image, mask) as PIL Images
        """
        img_path = self.objects_dir / category / "images" / f"{angle:03d}.jpg"
        mask_path = self.objects_dir / category / "masks" / f"{angle:03d}.png"
        image = Image.open(img_path).convert("RGB")
        mask = Image.open(mask_path).convert("L")
        return image, mask

    def get_random_background(self) -> tuple[Image.Image, Path]:
        """Pick and load a random background image.

        Returns:
            Tuple of (image, path)
        """
        path = random.choice(self.background_images)
        return self.load_background(path), path

    def get_random_object(self) -> tuple[Image.Image, Image.Image, str]:
        """Pick a random object category and viewing angle.

        Returns:
            Tuple of (image, mask, category_name)
        """
        category = random.choice(self.object_categories)
        angle = random.randint(1, 24)
        image, mask = self.load_object(category, angle)
        return image, mask, category

    def _rotate_image_and_mask(
        self, image: Image.Image, mask: Image.Image, angle: float
    ) -> tuple[Image.Image, Image.Image]:
        """Rotate image and mask by the same angle, expanding the canvas."""
        image = image.rotate(angle, resample=Resampling.BILINEAR, expand=True)
        mask = mask.rotate(angle, resample=Resampling.BILINEAR, expand=True)
        return image, mask

    def _compute_overlap(
        self, box1: tuple[int, int, int, int], box2: tuple[int, int, int, int]
    ) -> float:
        """Compute overlap ratio between two boxes.

        Args:
            box1: (xmin, ymin, xmax, ymax)
            box2: (xmin, ymin, xmax, ymax)

        Returns:
            Overlap ratio (area of intersection / area of smaller box)
        """
        x1_min, y1_min, x1_max, y1_max = box1
        x2_min, y2_min, x2_max, y2_max = box2
        # Intersection rectangle; empty when the boxes do not touch.
        inter_xmin = max(x1_min, x2_min)
        inter_ymin = max(y1_min, y2_min)
        inter_xmax = min(x1_max, x2_max)
        inter_ymax = min(y1_max, y2_max)
        if inter_xmax <= inter_xmin or inter_ymax <= inter_ymin:
            return 0.0
        inter_area = (inter_xmax - inter_xmin) * (inter_ymax - inter_ymin)
        box1_area = (x1_max - x1_min) * (y1_max - y1_min)
        box2_area = (x2_max - x2_min) * (y2_max - y2_min)
        # Normalize by the smaller box so full containment counts as 1.0.
        min_area = min(box1_area, box2_area)
        return inter_area / min_area if min_area > 0 else 0.0

    def _place_object(
        self,
        background: Image.Image,
        obj_image: Image.Image,
        obj_mask: Image.Image,
        existing_boxes: list[tuple[int, int, int, int]],
        scale: float,
    ) -> tuple[Image.Image, Image.Image, tuple[int, int, int, int]] | None:
        """Place an object on the background without exceeding overlap threshold.

        Args:
            background: Background PIL Image
            obj_image: Object PIL Image (RGB)
            obj_mask: Object PIL Image (L)
            existing_boxes: List of existing object boxes
            scale: Scale factor for the object

        Returns:
            Tuple of (new_background, updated_mask, new_box) or None if placement failed
        """
        bg_w, bg_h = background.size

        # Scale object
        obj_w, obj_h = obj_image.size
        new_w = int(obj_w * scale)
        new_h = int(obj_h * scale)
        if new_w <= 0 or new_h <= 0:
            return None
        # Fix: a scaled object larger than the background would make
        # random.randint below raise ValueError on an empty range.
        if new_w > bg_w or new_h > bg_h:
            return None

        obj_image = obj_image.resize((new_w, new_h), Resampling.LANCZOS)
        obj_mask = obj_mask.resize((new_w, new_h), Resampling.LANCZOS)

        # Try to find a valid position
        max_attempts = 50
        for _ in range(max_attempts):
            x = random.randint(0, bg_w - new_w)
            y = random.randint(0, bg_h - new_h)
            new_box = (x, y, x + new_w, y + new_h)

            # Reject positions that overlap existing objects too much.
            if any(
                self._compute_overlap(new_box, existing_box) > self.overlap_threshold
                for existing_box in existing_boxes
            ):
                continue

            # Alpha-composite the object onto the background region.
            # np.array(background) already copies, so no Image.copy is needed.
            mask_array = np.array(obj_mask) / 255.0
            bg_array = np.array(background)
            obj_array = np.array(obj_image)
            mask_3d = np.stack([mask_array] * 3, axis=-1)
            region = bg_array[y:y + new_h, x:x + new_w]
            blended = region * (1 - mask_3d) + obj_array * mask_3d
            bg_array[y:y + new_h, x:x + new_w] = blended.astype(np.uint8)
            return Image.fromarray(bg_array), obj_mask, new_box
        return None

    def synthesize_scene(
        self, seed: int | None = None
    ) -> tuple[Image.Image, list[tuple[str, int, int, int, int]]]:
        """Synthesize a single scene with random objects.

        Args:
            seed: Seed for this scene; defaults to ``self.seed``. ``generate``
                passes a distinct per-scene seed so that scenes differ.

        Returns:
            Tuple of (synthesized_image, list of (category, xmin, ymin, xmax, ymax))
        """
        # Bug fix: this method previously reseeded unconditionally with
        # ``self.seed``, clobbering the per-scene seed set by ``generate``
        # and producing num_scenes identical images.
        scene_seed = self.seed if seed is None else seed
        random.seed(scene_seed)
        np.random.seed(scene_seed)

        # Load background
        background, _ = self.get_random_background()

        # Determine number of objects
        num_objects = random.randint(*self.num_objects_range)

        # Place objects; objects that cannot fit the overlap budget are skipped.
        placed_boxes: list[tuple[int, int, int, int]] = []
        annotations: list[tuple[str, int, int, int, int]] = []
        for _ in range(num_objects):
            obj_image, obj_mask, obj_category = self.get_random_object()
            scale = random.uniform(*self.object_scale_range)
            angle = random.uniform(*self.rotation_range)
            obj_image, obj_mask = self._rotate_image_and_mask(obj_image, obj_mask, angle)
            result = self._place_object(background, obj_image, obj_mask, placed_boxes, scale)
            if result is not None:
                background, _, box = result
                placed_boxes.append(box)
                annotations.append((obj_category, box[0], box[1], box[2], box[3]))
        return background, annotations

    def generate(self) -> list[Path]:
        """Generate all synthesized scenes and their annotation files.

        Returns:
            List of paths to generated images
        """
        self.output_dir.mkdir(parents=True, exist_ok=True)
        generated_files: list[Path] = []
        for i in range(self.num_scenes):
            # Distinct, reproducible seed per scene (see synthesize_scene).
            image, annotations = self.synthesize_scene(seed=self.seed + i)

            # Save image
            img_path = self.output_dir / f"synth_{i:04d}.jpg"
            image.save(img_path, quality=95)

            # Save annotation: one "<category> xmin ymin xmax ymax" line per object.
            anno_path = self.output_dir / f"synth_{i:04d}.txt"
            with open(anno_path, "w") as f:
                for category, xmin, ymin, xmax, ymax in annotations:
                    f.write(f"{category} {xmin} {ymin} {xmax} {ymax}\n")
            generated_files.append(img_path)
        return generated_files