feat(feature-compressor): add DINOv2 feature extraction and compression pipeline

This commit is contained in:
2026-01-31 10:33:37 +08:00
parent f9a359fc28
commit 1454647aa6
22 changed files with 1486 additions and 16 deletions

1
.gitignore vendored
View File

@@ -208,6 +208,7 @@ __marimo__/
# Projects
data/
deps/
outputs/
# Devenv
.devenv*

View File

@@ -92,7 +92,9 @@
"nixgl": {
"inputs": {
"flake-utils": "flake-utils",
"nixpkgs": "nixpkgs"
"nixpkgs": [
"nixpkgs"
]
},
"locked": {
"lastModified": 1762090880,
@@ -108,20 +110,6 @@
}
},
"nixpkgs": {
"locked": {
"lastModified": 1769664171,
"owner": "nixos",
"repo": "nixpkgs",
"rev": "effc419e5e4bddeda827eb688347bef40733bc1c",
"type": "github"
},
"original": {
"owner": "nixos",
"repo": "nixpkgs",
"type": "github"
}
},
"nixpkgs_2": {
"locked": {
"lastModified": 1767052823,
"owner": "cachix",
@@ -141,7 +129,7 @@
"devenv": "devenv",
"git-hooks": "git-hooks",
"nixgl": "nixgl",
"nixpkgs": "nixpkgs_2",
"nixpkgs": "nixpkgs",
"pre-commit-hooks": [
"git-hooks"
]

View File

@@ -4,6 +4,9 @@ inputs:
url: github:cachix/devenv-nixpkgs/rolling
nixgl:
url: "github:nix-community/nixGL"
inputs:
nixpkgs:
follows: nixpkgs
# If you're using non-OSS software, you can set allowUnfree to true.
# allowUnfree: true

View File

View File

@@ -0,0 +1,20 @@
from enum import Enum
from pathlib import Path
from typing import Any, Dict

import yaml
class Config(Enum):
    """Enumerates the bundled config files, keyed by filename in this directory."""

    FEATURE_COMPRESSOR = "feature_compressor.yaml"
def get_config_dir() -> Path:
    """Return the directory that contains this module (and its YAML configs)."""
    here = Path(__file__)
    return here.parent
def get_default_config(config_type: Config) -> Dict[str, Any]:
    """Load and parse the default YAML config for ``config_type``.

    Args:
        config_type: Which bundled config file to read.

    Returns:
        The parsed YAML document as a dictionary.
    """
    # Fixed: the annotation previously referenced an undefined name `Unknown`,
    # which raised NameError when the module was imported.
    config_path = get_config_dir() / config_type.value
    with open(config_path) as f:
        return yaml.safe_load(f)

View File

@@ -0,0 +1,21 @@
model:
name: "facebook/dinov2-large"
compression_dim: 256
pooling_type: "attention" # attention-based Top-K
top_k_ratio: 0.5 # Keep 50% of tokens
hidden_ratio: 2.0 # MLP hidden = compression_dim * 2
dropout_rate: 0.1
use_residual: true
device: "auto" # auto-detect GPU
visualization:
plot_theme: "plotly_white"
color_scale: "viridis"
point_size: 8
fig_width: 900
fig_height: 600
output:
directory: "./outputs"
html_self_contained: true
png_scale: 2 # 2x resolution for PNG

View File

@@ -0,0 +1,14 @@
"""DINOv2 Feature Compressor - Extract and compress visual features."""
__version__ = "0.1.0"
from .core.compressor import PoolNetCompressor
from .core.extractor import DINOv2FeatureExtractor
from .core.visualizer import FeatureVisualizer
__all__ = [
"PoolNetCompressor",
"DINOv2FeatureExtractor",
"FeatureVisualizer",
"__version__",
]

View File

@@ -0,0 +1,7 @@
"""Core compression, extraction, and visualization modules."""
from .compressor import PoolNetCompressor
from .extractor import DINOv2FeatureExtractor
from .visualizer import FeatureVisualizer
__all__ = ["PoolNetCompressor", "DINOv2FeatureExtractor", "FeatureVisualizer"]

View File

@@ -0,0 +1,135 @@
"""Feature compression module with attention-based pooling and MLP."""
import torch
import torch.nn as nn
import torch.nn.functional as F
class PoolNetCompressor(nn.Module):
    """Pool + Network feature compressor for DINOv2 embeddings.

    Combines attention-based Top-K token pooling with a 2-layer MLP to compress
    DINOv2's last_hidden_state from [batch, seq_len, hidden_dim] to
    [batch, compression_dim].

    Args:
        input_dim: Input feature dimension (e.g., 1024 for DINOv2-large)
        compression_dim: Output feature dimension (default: 256)
        top_k_ratio: Ratio of tokens to keep via attention pooling (default: 0.5)
        hidden_ratio: Hidden layer dimension as multiple of compression_dim (default: 2.0)
        dropout_rate: Dropout probability (default: 0.1)
        use_residual: Whether to use residual connection (default: True)
        device: Device to place model on ('auto', 'cpu', or 'cuda')
    """

    def __init__(
        self,
        input_dim: int,
        compression_dim: int = 256,
        top_k_ratio: float = 0.5,
        hidden_ratio: float = 2.0,
        dropout_rate: float = 0.1,
        use_residual: bool = True,
        device: str = "auto",
    ):
        super().__init__()
        self.input_dim = input_dim
        self.compression_dim = compression_dim
        self.top_k_ratio = top_k_ratio
        self.use_residual = use_residual
        # Attention mechanism for token selection: one scalar score per token.
        self.attention = nn.Sequential(
            nn.Linear(input_dim, input_dim // 4),
            nn.Tanh(),
            nn.Linear(input_dim // 4, 1),
        )
        # Compression network: 2-layer MLP
        hidden_dim = int(compression_dim * hidden_ratio)
        self.net = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.LayerNorm(hidden_dim),
            nn.GELU(),
            nn.Dropout(dropout_rate),
            nn.Linear(hidden_dim, compression_dim),
        )
        # Residual projection if dimensions don't match; when they are equal
        # the pooled features are added directly (see forward()).
        if use_residual and input_dim != compression_dim:
            self.residual_proj = nn.Linear(input_dim, compression_dim)
        else:
            self.residual_proj = None
        # Set device
        if device == "auto":
            device = "cuda" if torch.cuda.is_available() else "cpu"
        self.device = torch.device(device)
        self.to(self.device)

    def _compute_attention_scores(self, x: torch.Tensor) -> torch.Tensor:
        """Compute attention scores for each token.

        Args:
            x: Input tensor [batch, seq_len, input_dim]

        Returns:
            Attention scores [batch, seq_len]
        """
        scores = self.attention(x)  # [batch, seq_len, 1]
        return scores.squeeze(-1)  # [batch, seq_len]

    def _apply_pooling(self, x: torch.Tensor, scores: torch.Tensor) -> torch.Tensor:
        """Apply Top-K attention pooling to select important tokens.

        Args:
            x: Input tensor [batch, seq_len, input_dim]
            scores: Attention scores [batch, seq_len]

        Returns:
            Pooled features [batch, k, input_dim] where
            k = max(1, floor(seq_len * top_k_ratio)).
            (Fixed: the docstring previously claimed ceil; int() truncates.)
        """
        batch_size, seq_len, _ = x.shape
        # int() truncates toward zero, i.e. floor for positive values;
        # always keep at least one token.
        k = max(1, int(seq_len * self.top_k_ratio))
        # Get top-k indices; the top-k score values themselves are not needed.
        _, top_k_indices = torch.topk(scores, k=k, dim=-1)  # [batch, k]
        # Select top-k tokens via advanced indexing.
        batch_indices = (
            torch.arange(batch_size, device=x.device).unsqueeze(1).expand(-1, k)
        )
        pooled = x[batch_indices, top_k_indices, :]  # [batch, k, input_dim]
        return pooled

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Forward pass through compressor.

        Args:
            x: Input features [batch, seq_len, input_dim]

        Returns:
            Compressed features [batch, compression_dim]
        """
        # Compute attention scores
        scores = self._compute_attention_scores(x)
        # Apply Top-K pooling
        pooled = self._apply_pooling(x, scores)
        # Average pool over selected tokens to get [batch, input_dim]
        pooled = pooled.mean(dim=1)
        # Apply compression network
        compressed = self.net(pooled)  # [batch, compression_dim]
        # Add residual connection if enabled
        if self.use_residual:
            if self.residual_proj is not None:
                residual = self.residual_proj(pooled)
            else:
                # Only reachable when input_dim == compression_dim (see
                # __init__), so this slice is the full vector.
                residual = pooled[:, : self.compression_dim]
            compressed = compressed + residual
        return compressed

View File

@@ -0,0 +1,236 @@
"""DINOv2 feature extraction and compression pipeline."""
import time
from pathlib import Path
from typing import Dict, List, Optional
import torch
import yaml
from transformers import AutoImageProcessor, AutoModel
from ...configs.config import Config, get_default_config
from ..utils.image_utils import load_image, preprocess_image
from .compressor import PoolNetCompressor
class DINOv2FeatureExtractor:
    """End-to-end DINOv2 feature extraction with compression.

    Loads DINOv2 model, extracts last_hidden_state features,
    and applies PoolNetCompressor for dimensionality reduction.

    Args:
        config_path: Path to YAML configuration file
        device: Device to use ('auto', 'cpu', or 'cuda')
    """

    def __init__(self, config_path: Optional[str] = None, device: str = "auto"):
        self.config = self._load_config(config_path)
        # Set device: an explicit argument wins; otherwise fall back to the
        # config value, and finally to CUDA-if-available auto-detection.
        if device == "auto":
            device = self.config.get("model", {}).get("device", "auto")
        if device == "auto":
            device = "cuda" if torch.cuda.is_available() else "cpu"
        self.device = torch.device(device)
        # Load DINOv2 model and processor (from_pretrained downloads weights
        # on first use; eval() disables dropout for inference).
        model_name = self.config.get("model", {}).get("name", "facebook/dinov2-large")
        self.processor = AutoImageProcessor.from_pretrained(model_name)
        self.model = AutoModel.from_pretrained(model_name).to(self.device)
        self.model.eval()
        # Initialize compressor, sized to the backbone's hidden dimension.
        model_config = self.config.get("model", {})
        self.compressor = PoolNetCompressor(
            input_dim=self.model.config.hidden_size,
            compression_dim=model_config.get("compression_dim", 256),
            top_k_ratio=model_config.get("top_k_ratio", 0.5),
            hidden_ratio=model_config.get("hidden_ratio", 2.0),
            dropout_rate=model_config.get("dropout_rate", 0.1),
            use_residual=model_config.get("use_residual", True),
            device=str(self.device),
        )

    def _load_config(self, config_path: Optional[str] = None) -> Dict:
        """Load configuration from YAML file.

        Args:
            config_path: Path to config file, or None for default

        Returns:
            Configuration dictionary
        """
        if config_path is None:
            # Default comes from the packaged configs module.
            return get_default_config(Config.FEATURE_COMPRESSOR)
        with open(config_path) as f:
            return yaml.safe_load(f)

    def _extract_dinov2_features(self, images: List) -> torch.Tensor:
        """Extract DINOv2 last_hidden_state features.

        Args:
            images: List of PIL Images

        Returns:
            last_hidden_state [batch, seq_len, hidden_dim]
        """
        # no_grad: inference only, no autograd bookkeeping.
        with torch.no_grad():
            inputs = self.processor(images=images, return_tensors="pt").to(self.device)
            outputs = self.model(**inputs)
            features = outputs.last_hidden_state
        return features

    def _compress_features(self, features: torch.Tensor) -> torch.Tensor:
        """Compress features using PoolNetCompressor.

        Args:
            features: [batch, seq_len, hidden_dim]

        Returns:
            compressed [batch, compression_dim]
        """
        with torch.no_grad():
            compressed = self.compressor(features)
        return compressed

    def process_image(
        self, image_path: str, visualize: bool = False
    ) -> Dict[str, object]:
        """Process a single image and extract compressed features.

        Args:
            image_path: Path to image file
            visualize: Whether to generate visualizations.
                NOTE(review): not used anywhere in this method — confirm intent.

        Returns:
            Dictionary with original_features, compressed_features, metadata
        """
        start_time = time.time()
        # Load and preprocess image
        image = load_image(image_path)
        image = preprocess_image(image, size=224)
        # Extract DINOv2 features
        original_features = self._extract_dinov2_features([image])
        # Compute feature stats for compression ratio
        original_dim = original_features.shape[-1]
        compressed_dim = self.compressor.compression_dim
        compression_ratio = original_dim / compressed_dim
        # Compress features
        compressed_features = self._compress_features(original_features)
        # Get pooled features (before compression) for analysis.
        # NOTE(review): this re-runs attention scoring outside no_grad and
        # reaches into the compressor's private helpers.
        pooled_features = self.compressor._apply_pooling(
            original_features,
            self.compressor._compute_attention_scores(original_features),
        )
        pooled_features = pooled_features.mean(dim=1)
        # Compute mean L2 norm of the compressed vectors
        feature_norm = torch.norm(compressed_features, p=2, dim=-1).mean().item()
        processing_time = time.time() - start_time
        # Build result dictionary; tensors are moved to CPU so callers can
        # serialize them regardless of the extraction device.
        result = {
            "original_features": original_features.cpu(),
            "compressed_features": compressed_features.cpu(),
            "pooled_features": pooled_features.cpu(),
            "metadata": {
                "image_path": str(image_path),
                "compression_ratio": compression_ratio,
                "processing_time": processing_time,
                "feature_norm": feature_norm,
                "device": str(self.device),
                "model_name": self.config.get("model", {}).get("name"),
            },
        }
        return result

    def process_batch(
        self, image_dir: str, batch_size: int = 8, save_features: bool = True
    ) -> List[Dict[str, object]]:
        """Process multiple images in batches.

        Args:
            image_dir: Directory containing images
            batch_size: Number of images per batch
            save_features: Whether to save features to disk

        Returns:
            List of result dictionaries, one per image
        """
        image_dir = Path(image_dir)
        # NOTE(review): "*.*" matches any file with an extension, not just
        # images — a stray non-image file here will break load_image.
        image_files = sorted(image_dir.glob("*.*"))
        results = []
        # Process in batches
        for i in range(0, len(image_files), batch_size):
            batch_files = image_files[i : i + batch_size]
            # Load and preprocess batch
            images = [preprocess_image(load_image(f), size=224) for f in batch_files]
            # Extract features for batch
            original_features = self._extract_dinov2_features(images)
            compressed_features = self._compress_features(original_features)
            # Create individual results
            for j, file_path in enumerate(batch_files):
                # Re-derive per-image pooled features from the batch output.
                pooled_features = self.compressor._apply_pooling(
                    original_features[j : j + 1],
                    self.compressor._compute_attention_scores(
                        original_features[j : j + 1]
                    ),
                ).mean(dim=1)
                result = {
                    "original_features": original_features[j : j + 1].cpu(),
                    "compressed_features": compressed_features[j : j + 1].cpu(),
                    "pooled_features": pooled_features.cpu(),
                    "metadata": {
                        "image_path": str(file_path),
                        "compression_ratio": original_features.shape[-1]
                        / self.compressor.compression_dim,
                        # NOTE(review): per-image timing is not measured in
                        # batch mode; 0.0 is a placeholder.
                        "processing_time": 0.0,
                        "feature_norm": torch.norm(
                            compressed_features[j : j + 1], p=2, dim=-1
                        )
                        .mean()
                        .item(),
                        "device": str(self.device),
                        "model_name": self.config.get("model", {}).get("name"),
                    },
                }
                results.append(result)
                # Save features if requested
                if save_features:
                    output_dir = Path(
                        self.config.get("output", {}).get("directory", "./outputs")
                    )
                    # Resolve relative to project root
                    if not output_dir.is_absolute():
                        output_dir = Path(__file__).parent.parent.parent / output_dir
                    output_dir.mkdir(parents=True, exist_ok=True)
                    output_path = output_dir / f"{file_path.stem}_features.json"
                    # Local import; presumably kept function-scoped to defer
                    # the utils dependency — confirm no circular-import reason.
                    from ..utils.feature_utils import save_features_to_json

                    save_features_to_json(
                        result["compressed_features"],
                        output_path,
                        result["metadata"],
                    )
        return results

View File

@@ -0,0 +1,178 @@
"""Feature visualization using Plotly."""
import os
from pathlib import Path
from typing import List, Optional
import numpy as np
import torch
import yaml
from plotly.graph_objs import Figure
from ..utils.plot_utils import (
apply_theme,
create_comparison_plot,
create_histogram,
create_pca_scatter_2d,
save_figure,
)
class FeatureVisualizer:
    """Visualize DINOv2 features with interactive Plotly charts.

    Supports histograms, PCA projections, and feature comparisons
    with multiple export formats.

    Args:
        config_path: Path to YAML configuration file
    """

    def __init__(self, config_path: Optional[str] = None):
        self.config = self._load_config(config_path)

    def _load_config(self, config_path: Optional[str] = None) -> dict:
        """Load configuration from YAML file.

        Args:
            config_path: Path to config file, or None for default

        Returns:
            Configuration dictionary
        """
        if config_path is None:
            # Fall back to the packaged default config file.
            config_path = (
                Path(__file__).parent.parent.parent
                / "configs"
                / "feature_compressor.yaml"
            )
        with open(config_path) as f:
            return yaml.safe_load(f)

    def plot_histogram(self, features: torch.Tensor, title: str = None) -> Figure:
        """Plot histogram of feature values.

        Args:
            features: Feature tensor [batch, dim]
            title: Plot title

        Returns:
            Plotly Figure object
        """
        features_np = features.cpu().numpy()
        fig = create_histogram(features_np, title=title)
        viz_config = self.config.get("visualization", {})
        fig = apply_theme(fig, viz_config.get("plot_theme", "plotly_white"))
        fig.update_layout(
            width=viz_config.get("fig_width", 900),
            height=viz_config.get("fig_height", 600),
        )
        return fig

    def plot_pca_2d(self, features: torch.Tensor, labels: List = None) -> Figure:
        """Plot 2D PCA projection of features.

        Args:
            features: Feature tensor [n_samples, dim]
            labels: Optional labels for coloring

        Returns:
            Plotly Figure object
        """
        features_np = features.cpu().numpy()
        viz_config = self.config.get("visualization", {})
        fig = create_pca_scatter_2d(features_np, labels=labels)
        fig = apply_theme(fig, viz_config.get("plot_theme", "plotly_white"))
        fig.update_traces(
            marker=dict(
                size=viz_config.get("point_size", 8),
                colorscale=viz_config.get("color_scale", "viridis"),
            )
        )
        fig.update_layout(
            width=viz_config.get("fig_width", 900),
            height=viz_config.get("fig_height", 600),
        )
        return fig

    def plot_comparison(
        self, features_list: List[torch.Tensor], names: List[str]
    ) -> Figure:
        """Plot comparison of multiple feature sets.

        Args:
            features_list: List of feature tensors
            names: Names for each feature set

        Returns:
            Plotly Figure object
        """
        features_np_list = [f.cpu().numpy() for f in features_list]
        fig = create_comparison_plot(features_np_list, names)
        viz_config = self.config.get("visualization", {})
        fig = apply_theme(fig, viz_config.get("plot_theme", "plotly_white"))
        # Widen proportionally to the number of side-by-side subplots.
        fig.update_layout(
            width=viz_config.get("fig_width", 900) * len(features_list),
            height=viz_config.get("fig_height", 600),
        )
        return fig

    def generate_report(self, results: List[dict], output_dir: str) -> List[str]:
        """Generate full feature analysis report.

        Args:
            results: List of extractor results
            output_dir: Directory to save visualizations

        Returns:
            List of generated file paths
        """
        output_dir = Path(output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)
        generated_files = []
        # Pool all compressed features across results for aggregate plots.
        all_features = torch.cat([r["compressed_features"] for r in results], dim=0)
        # Create histogram
        hist_fig = self.plot_histogram(all_features, "Compressed Feature Distribution")
        hist_path = output_dir / "feature_histogram"
        self.save(hist_fig, str(hist_path), formats=["html"])
        generated_files.append(str(hist_path) + ".html")
        # Create PCA
        pca_fig = self.plot_pca_2d(all_features)
        pca_path = output_dir / "feature_pca_2d"
        self.save(pca_fig, str(pca_path), formats=["html", "png"])
        generated_files.append(str(pca_path) + ".html")
        generated_files.append(str(pca_path) + ".png")
        return generated_files

    def save(self, fig: Figure, path: str, formats: List[str] = None) -> None:
        """Save figure in multiple formats.

        Args:
            fig: Plotly Figure object
            path: Output file path (without extension)
            formats: List of formats to export (defaults to ["html"])
        """
        if formats is None:
            formats = ["html"]
        # save_figure already dispatches on the format string; the previous
        # per-format branches here were identical (dead code) and the config
        # lookup was unused.
        for fmt in formats:
            save_figure(fig, path, format=fmt)

View File

@@ -0,0 +1,63 @@
"""Basic usage example for DINOv2 Feature Compressor."""
import sys
from pathlib import Path
# Add parent to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
import requests
from PIL import Image
import io
from dino_feature_compressor import DINOv2FeatureExtractor, FeatureVisualizer
def main():
    """Download a sample image, extract compressed features, and plot them."""
    # Initialize extractor
    print("Initializing DINOv2FeatureExtractor...")
    extractor = DINOv2FeatureExtractor()
    # Download and save test image
    print("Downloading test image...")
    url = "http://images.cocodataset.org/val2017/000000039769.jpg"
    # A timeout keeps the demo from hanging on an unreachable host, and
    # raise_for_status surfaces HTTP errors instead of feeding PIL bad bytes.
    response = requests.get(url, timeout=30)
    response.raise_for_status()
    img = Image.open(io.BytesIO(response.content))
    test_image_path = "/tmp/test_image.jpg"
    img.save(test_image_path)
    print(f"Image saved to {test_image_path}")
    # Extract features
    print("Extracting features...")
    result = extractor.process_image(test_image_path)
    # Fixed: this header was an f-string with no placeholders.
    print("\n=== Feature Extraction Results ===")
    print(f"Original features shape: {result['original_features'].shape}")
    print(f"Compressed features shape: {result['compressed_features'].shape}")
    print(f"Processing time: {result['metadata']['processing_time']:.3f}s")
    print(f"Compression ratio: {result['metadata']['compression_ratio']:.2f}x")
    print(f"Feature norm: {result['metadata']['feature_norm']:.4f}")
    print(f"Device: {result['metadata']['device']}")
    # Visualize
    print("\nGenerating visualization...")
    viz = FeatureVisualizer()
    fig = viz.plot_histogram(
        result["compressed_features"], title="Compressed Features Distribution"
    )
    output_path = (
        Path(__file__).parent.parent.parent / "outputs" / "basic_usage_histogram"
    )
    output_path.parent.mkdir(parents=True, exist_ok=True)
    viz.save(fig, str(output_path), formats=["html"])
    print(f"Visualization saved to {output_path}.html")
    print("\nDone!")


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,49 @@
"""Batch processing example for DINOv2 Feature Compressor."""
import sys
from pathlib import Path
# Add parent to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from dino_feature_compressor import DINOv2FeatureExtractor
def main():
    """Generate a few synthetic images and run them through batch extraction."""
    # Initialize extractor
    print("Initializing DINOv2FeatureExtractor...")
    extractor = DINOv2FeatureExtractor()
    # Create a test directory with sample images
    # In practice, use your own directory
    image_dir = "/tmp/test_images"
    Path(image_dir).mkdir(parents=True, exist_ok=True)
    # Create 3 test images
    print("Creating test images...")
    import numpy as np
    from PIL import Image

    for i in range(3):
        img_array = np.random.randint(0, 255, (224, 224, 3), dtype=np.uint8)
        img = Image.fromarray(img_array)
        img.save(f"{image_dir}/test_{i}.jpg")
    print(f"Created 3 test images in {image_dir}")
    # Process batch
    print("\nProcessing images in batch...")
    results = extractor.process_batch(image_dir, batch_size=2, save_features=True)
    # Fixed: this header was an f-string with no placeholders.
    print("\n=== Batch Processing Results ===")
    print(f"Processed {len(results)} images")
    for i, result in enumerate(results):
        print(f"\nImage {i + 1}: {result['metadata']['image_path']}")
        print(f"  Compressed shape: {result['compressed_features'].shape}")
        print(f"  Feature norm: {result['metadata']['feature_norm']:.4f}")
    print("\nDone! Features saved to outputs/ directory.")


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,61 @@
"""Visualization example for DINOv2 Feature Compressor."""
import sys
from pathlib import Path
# Add parent to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
import numpy as np
import torch
from dino_feature_compressor import FeatureVisualizer
def main():
    """Create synthetic clustered features and render example visualizations."""
    # Generate synthetic features for demonstration
    print("Generating synthetic features...")
    n_samples = 100
    n_features = 256
    # Fixed: n_samples was defined but unused; cluster sizes were hard-coded.
    half = n_samples // 2
    # Create two clusters
    cluster1 = np.random.randn(half, n_features) + 2
    cluster2 = np.random.randn(half, n_features) - 2
    features = np.vstack([cluster1, cluster2])
    labels = ["Cluster A"] * half + ["Cluster B"] * half
    features_tensor = torch.tensor(features, dtype=torch.float32)
    # Initialize visualizer
    print("Initializing FeatureVisualizer...")
    viz = FeatureVisualizer()
    output_dir = Path(__file__).parent.parent.parent / "outputs"
    output_dir.mkdir(parents=True, exist_ok=True)
    # Create histogram
    print("Creating histogram...")
    fig_hist = viz.plot_histogram(features_tensor, title="Feature Distribution")
    viz.save(fig_hist, str(output_dir / "feature_histogram"), formats=["html", "json"])
    print(f"Saved histogram to {output_dir / 'feature_histogram.html'}")
    # Create PCA 2D projection
    print("Creating PCA 2D projection...")
    fig_pca = viz.plot_pca_2d(features_tensor, labels=labels)
    viz.save(fig_pca, str(output_dir / "feature_pca_2d"), formats=["html", "json"])
    print(f"Saved PCA to {output_dir / 'feature_pca_2d.html'}")
    # Create comparison plot
    print("Creating comparison plot...")
    features_list = [torch.tensor(cluster1), torch.tensor(cluster2)]
    names = ["Cluster A", "Cluster B"]
    fig_comp = viz.plot_comparison(features_list, names)
    viz.save(fig_comp, str(output_dir / "feature_comparison"), formats=["html", "json"])
    print(f"Saved comparison to {output_dir / 'feature_comparison.html'}")
    print("\nDone! All visualizations saved to outputs/ directory.")


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,19 @@
"""Utility modules for image, feature, and plot operations."""
from .feature_utils import (
compute_feature_stats,
normalize_features,
save_features_to_csv,
save_features_to_json,
)
from .image_utils import load_image, load_images_from_directory, preprocess_image
__all__ = [
"load_image",
"load_images_from_directory",
"preprocess_image",
"normalize_features",
"compute_feature_stats",
"save_features_to_json",
"save_features_to_csv",
]

View File

@@ -0,0 +1,83 @@
"""Feature processing utilities."""
from pathlib import Path
from typing import Dict
import numpy as np
import torch
import yaml
def normalize_features(features: torch.Tensor) -> torch.Tensor:
    """Scale feature vectors to unit L2 length along the last dimension.

    Args:
        features: Tensor of shape [batch, dim] or [batch, seq, dim]

    Returns:
        L2-normalized features
    """
    lengths = features.norm(p=2, dim=-1, keepdim=True)
    # Small epsilon keeps all-zero vectors from dividing by zero.
    denominator = lengths + 1e-8
    return features / denominator
def compute_feature_stats(features: torch.Tensor) -> Dict[str, float]:
    """Summarize a feature tensor with simple scalar statistics.

    Args:
        features: Tensor of shape [batch, dim] or [batch, seq, dim]

    Returns:
        Dictionary with mean, std, min, max
    """
    # no_grad: these reductions never need gradients.
    with torch.no_grad():
        reductions = {
            "mean": features.mean(),
            "std": features.std(),
            "min": features.min(),
            "max": features.max(),
        }
        return {name: float(value.item()) for name, value in reductions.items()}
def save_features_to_json(
    features: torch.Tensor, path: Path, metadata: Dict = None
) -> None:
    """Save features to JSON file.

    Args:
        features: Tensor to save
        path: Output file path
        metadata: Optional metadata dictionary
    """
    # Fixed: `import json` was previously buried inside the `with open(...)`
    # block; hoisted to the top of the function.
    import json

    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    features_np = features.cpu().numpy()
    data = {
        "features": features_np.tolist(),
        "shape": list(features.shape),
    }
    if metadata:
        data["metadata"] = metadata
    with open(path, "w") as f:
        json.dump(data, f, indent=2)
def save_features_to_csv(features: torch.Tensor, path: Path) -> None:
    """Write a feature tensor to disk as comma-separated values.

    Args:
        features: Tensor to save
        path: Output file path
    """
    target = Path(path)
    # Make sure the destination directory exists before writing.
    target.parent.mkdir(parents=True, exist_ok=True)
    array = features.cpu().numpy()
    np.savetxt(target, array, delimiter=",", fmt="%.6f")

View File

@@ -0,0 +1,76 @@
"""Image loading and preprocessing utilities."""
from pathlib import Path
from typing import List, Union
import requests
from PIL import Image
def load_image(path: Union[str, Path]) -> Image.Image:
    """Load an image from file path or URL.

    Args:
        path: File path or URL to image

    Returns:
        PIL Image object

    Raises:
        FileNotFoundError: If file doesn't exist
        ValueError: If image cannot be loaded
    """
    import io  # stdlib; local to keep the module's import block unchanged

    path_str = str(path)
    if path_str.startswith(("http://", "https://")):
        # Fixed: add a timeout so an unresponsive host cannot hang the caller,
        # and buffer the decoded body — Image.open(response.raw) could hand
        # PIL a still-compressed (e.g. gzip) stream.
        response = requests.get(path_str, timeout=30)
        response.raise_for_status()
        img = Image.open(io.BytesIO(response.content))
    else:
        img = Image.open(path)
    return img
def preprocess_image(image: Image.Image, size: int = 224) -> Image.Image:
    """Preprocess image to a fixed square size.

    Args:
        image: PIL Image
        size: Target edge length; the image is resized to (size, size) (default: 224)

    Returns:
        Resized RGB PIL Image
    """
    if image.mode != "RGB":
        image = image.convert("RGB")
    # Direct resize to a square: aspect ratio is NOT preserved and no center
    # crop is performed (the previous comment claimed otherwise).
    image = image.resize((size, size), Image.Resampling.LANCZOS)
    return image
def load_images_from_directory(
    dir_path: Union[str, Path], extensions: List[str] = None
) -> List[Image.Image]:
    """Load all images from a directory.

    Args:
        dir_path: Path to directory
        extensions: List of file extensions to include (e.g., ['.jpg', '.png'])

    Returns:
        List of PIL Images, in sorted path order
    """
    if extensions is None:
        extensions = [".jpg", ".jpeg", ".png", ".bmp", ".tiff", ".webp"]
    dir_path = Path(dir_path)
    # Fixed: collect paths into a set first — on case-insensitive filesystems
    # the lower- and upper-case globs would otherwise match (and load) the
    # same file twice. Sorting makes the result order deterministic.
    paths = set()
    for ext in extensions:
        paths.update(dir_path.glob(f"*{ext}"))
        paths.update(dir_path.glob(f"*{ext.upper()}"))
    return [load_image(p) for p in sorted(paths)]

View File

@@ -0,0 +1,167 @@
"""Plotting utility functions for feature visualization."""
from pathlib import Path
from typing import List, Optional
import numpy as np
import plotly.graph_objects as go
from plotly.subplots import make_subplots
def create_histogram(data: np.ndarray, title: str = None, **kwargs) -> go.Figure:
    """Build a histogram figure from raw feature values.

    Args:
        data: 1D array of values
        title: Plot title
        **kwargs: Additional histogram arguments

    Returns:
        Plotly Figure object
    """
    trace = go.Histogram(
        x=data.flatten(),
        name="Feature Values",
        **kwargs,
    )
    fig = go.Figure(data=[trace])
    layout_kwargs = dict(
        xaxis_title="Value",
        yaxis_title="Count",
        hovermode="x unified",
    )
    if title:
        layout_kwargs["title"] = title
    fig.update_layout(**layout_kwargs)
    return fig
def create_pca_scatter_2d(
    features: np.ndarray, labels: List = None, **kwargs
) -> go.Figure:
    """Project features onto their first two principal components and plot them.

    Args:
        features: 2D array [n_samples, n_features]
        labels: Optional list of labels for coloring
        **kwargs: Additional scatter arguments

    Returns:
        Plotly Figure object
    """
    from sklearn.decomposition import PCA

    # Fit a 2-component PCA and record how much variance each axis explains.
    projector = PCA(n_components=2)
    coords = projector.fit_transform(features)
    explained_var = projector.explained_variance_ratio_ * 100

    fig = go.Figure()
    if labels is None:
        fig.add_trace(
            go.Scatter(
                x=coords[:, 0],
                y=coords[:, 1],
                mode="markers",
                marker=dict(size=8, opacity=0.7),
                **kwargs,
            )
        )
    else:
        # One trace per distinct label so each group gets its own legend entry.
        label_array = np.array(labels)
        for label in set(labels):
            selected = label_array == label
            fig.add_trace(
                go.Scatter(
                    x=coords[selected, 0],
                    y=coords[selected, 1],
                    mode="markers",
                    name=str(label),
                    marker=dict(size=8, opacity=0.7),
                )
            )
    fig.update_layout(
        title=f"PCA 2D Projection (Total Variance: {explained_var.sum():.1f}%)",
        xaxis_title=f"PC 1 ({explained_var[0]:.1f}%)",
        yaxis_title=f"PC 2 ({explained_var[1]:.1f}%)",
        hovermode="closest",
    )
    return fig
def create_comparison_plot(
    features_list: List[np.ndarray], names: List[str], **kwargs
) -> go.Figure:
    """Place one histogram per feature set side by side for comparison.

    Args:
        features_list: List of feature arrays
        names: List of names for each feature set
        **kwargs: Additional histogram arguments

    Returns:
        Plotly Figure object
    """
    fig = make_subplots(rows=1, cols=len(features_list), subplot_titles=names)
    # Subplot columns are 1-indexed; pair each feature set with its name.
    for col, (label, values) in enumerate(zip(names, features_list), start=1):
        trace = go.Histogram(
            x=values.flatten(),
            name=label,
            showlegend=False,
            **kwargs,
        )
        fig.add_trace(trace, row=1, col=col)
    fig.update_layout(
        title="Feature Distribution Comparison",
        hovermode="x unified",
    )
    return fig
def save_figure(fig: go.Figure, path: str, format: str = "html", scale: int = 2) -> None:
    """Save figure to file.

    Args:
        fig: Plotly Figure object
        path: Output file path (without extension)
        format: Output format ('html', 'png', 'json')
        scale: Resolution multiplier for PNG export (default: 2).
            Generalized from a hard-coded constant so callers can pass the
            configured png_scale value.

    Raises:
        ValueError: If *format* is not one of the supported formats.
    """
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    if format == "html":
        # CDN-hosted plotly.js keeps the exported HTML small.
        fig.write_html(str(path) + ".html", include_plotlyjs="cdn")
    elif format == "png":
        fig.write_image(str(path) + ".png", scale=scale)
    elif format == "json":
        fig.write_json(str(path) + ".json")
    else:
        raise ValueError(f"Unsupported format: {format}")
def apply_theme(fig: go.Figure, theme: str = "plotly_white") -> go.Figure:
    """Apply a layout template to the figure.

    Args:
        fig: Plotly Figure object
        theme: Theme name

    Returns:
        Updated Plotly Figure object (same instance, updated in place)
    """
    return fig.update_layout(template=theme)

View File

@@ -0,0 +1 @@
"""Test suite for DINOv2 Feature Compressor."""

View File

@@ -0,0 +1,99 @@
"""Tests for PoolNetCompressor module."""
import pytest
import torch
from feature_compressor.core.compressor import PoolNetCompressor
class TestPoolNetCompressor:
    """Test suite for PoolNetCompressor class."""

    def test_compressor_init(self):
        """Test PoolNetCompressor initializes with correct parameters."""
        # Removed stale "will fail until we implement the module" comment.
        compressor = PoolNetCompressor(
            input_dim=1024,
            compression_dim=256,
            top_k_ratio=0.5,
            hidden_ratio=2.0,
            dropout_rate=0.1,
            use_residual=True,
        )
        assert compressor.input_dim == 1024
        assert compressor.compression_dim == 256
        assert compressor.top_k_ratio == 0.5

    def test_compressor_forward_shape(self):
        """Test output shape is [batch, compression_dim]."""
        compressor = PoolNetCompressor(
            input_dim=1024,
            compression_dim=256,
            top_k_ratio=0.5,
        )
        # Simulate DINOv2 output: batch=2, seq_len=257 (CLS+256 patches), dim=1024
        x = torch.randn(2, 257, 1024)
        out = compressor(x)
        assert out.shape == (2, 256), f"Expected (2, 256), got {out.shape}"

    def test_attention_scores_shape(self):
        """Test attention scores have shape [batch, seq_len]."""
        compressor = PoolNetCompressor(input_dim=1024, compression_dim=256)
        x = torch.randn(2, 257, 1024)
        scores = compressor._compute_attention_scores(x)
        assert scores.shape == (2, 257), f"Expected (2, 257), got {scores.shape}"

    def test_top_k_selection(self):
        """Test that only top_k_ratio tokens are selected."""
        compressor = PoolNetCompressor(
            input_dim=1024, compression_dim=256, top_k_ratio=0.5
        )
        x = torch.randn(2, 257, 1024)
        pooled = compressor._apply_pooling(x, compressor._compute_attention_scores(x))
        # With top_k_ratio=0.5, should select 50% of tokens (int rounds down)
        expected_k = 128  # int(257 * 0.5) = 128
        assert pooled.shape[1] == expected_k, (
            f"Expected seq_len={expected_k}, got {pooled.shape[1]}"
        )

    def test_residual_connection(self):
        """Test residual wiring: projection exists iff dims differ and flag is set."""
        compressor = PoolNetCompressor(
            input_dim=1024,
            compression_dim=256,
            use_residual=True,
        )
        # Strengthened: 1024 != 256, so a learned projection must back the
        # residual path (the old test only checked the output was not None).
        assert compressor.residual_proj is not None
        x = torch.randn(2, 257, 1024)
        out1 = compressor(x)
        assert out1 is not None
        assert out1.shape == (2, 256)
        # With the flag off, no projection should be allocated.
        no_residual = PoolNetCompressor(
            input_dim=1024, compression_dim=256, use_residual=False
        )
        assert no_residual.residual_proj is None

    def test_gpu_device(self):
        """Test model moves to GPU correctly if available."""
        device = "cuda" if torch.cuda.is_available() else "cpu"
        compressor = PoolNetCompressor(
            input_dim=1024,
            compression_dim=256,
            device=device,
        )
        x = torch.randn(2, 257, 1024).to(device)
        out = compressor(x)
        assert out.device.type == device

View File

@@ -0,0 +1,152 @@
"""Tests for DINOv2FeatureExtractor module."""
import json
import tempfile
from pathlib import Path
import numpy as np
import pytest
import torch
from feature_compressor.core.extractor import DINOv2FeatureExtractor
from PIL import Image
class TestDINOv2FeatureExtractor:
    """Test suite for DINOv2FeatureExtractor class.

    Fixture images are written into per-test ``TemporaryDirectory``s via
    ``_write_random_image`` so every file is cleaned up automatically.  The
    previous ``NamedTemporaryFile(delete=False)`` pattern leaked a temp file
    per test run and passed the path to ``process_image`` while the file was
    still open (which fails on Windows).
    """

    @staticmethod
    def _write_random_image(directory, name: str = "test.jpg") -> str:
        """Save a random 224x224 RGB JPEG into *directory*; return its path.

        Centralizes the image-fixture setup previously duplicated in each test.
        """
        img_array = np.random.randint(0, 255, (224, 224, 3), dtype=np.uint8)
        path = Path(directory) / name
        Image.fromarray(img_array).save(path)
        return str(path)

    def test_extractor_init(self):
        """Test DINOv2FeatureExtractor initializes correctly."""
        extractor = DINOv2FeatureExtractor()
        assert extractor.model is not None
        assert extractor.processor is not None
        assert extractor.compressor is not None

    def test_single_image_processing(self):
        """Test processing a single image."""
        extractor = DINOv2FeatureExtractor()
        with tempfile.TemporaryDirectory() as tmpdir:
            result = extractor.process_image(self._write_random_image(tmpdir))
        assert "original_features" in result
        assert "compressed_features" in result
        assert "metadata" in result
        # Check shapes
        assert result["original_features"].shape[0] == 1  # batch=1
        assert result["compressed_features"].shape == (1, 256)
        assert "compression_ratio" in result["metadata"]

    def test_output_structure(self):
        """Test output structure contains expected keys."""
        extractor = DINOv2FeatureExtractor()
        with tempfile.TemporaryDirectory() as tmpdir:
            result = extractor.process_image(self._write_random_image(tmpdir))
        required_keys = [
            "original_features",
            "compressed_features",
            "pooled_features",
            "metadata",
        ]
        for key in required_keys:
            assert key in result, f"Missing key: {key}"
        metadata_keys = [
            "compression_ratio",
            "processing_time",
            "feature_norm",
            "device",
        ]
        for key in metadata_keys:
            assert key in result["metadata"], f"Missing metadata key: {key}"

    def test_feature_saving(self):
        """Test saving features to disk."""
        from feature_compressor.utils.feature_utils import save_features_to_json

        extractor = DINOv2FeatureExtractor()
        with tempfile.TemporaryDirectory() as tmpdir:
            tmpdir = Path(tmpdir)
            result = extractor.process_image(self._write_random_image(tmpdir))
            # Save features next to the fixture image, inside the temp dir.
            json_path = tmpdir / "features.json"
            save_features_to_json(
                result["compressed_features"], json_path, result["metadata"]
            )
            assert json_path.exists()
            # Verify file can be loaded
            with open(json_path) as f:
                data = json.load(f)
            assert "features" in data
            assert "metadata" in data

    def test_batch_processing(self):
        """Test batch processing of multiple images."""
        extractor = DINOv2FeatureExtractor()
        with tempfile.TemporaryDirectory() as tmpdir:
            for i in range(3):
                self._write_random_image(tmpdir, name=f"test_{i}.jpg")
            results = extractor.process_batch(str(tmpdir), batch_size=2)
        assert len(results) == 3
        for result in results:
            assert result["compressed_features"].shape == (1, 256)

    def test_gpu_handling(self):
        """Test GPU device handling."""
        device = "cuda" if torch.cuda.is_available() else "cpu"
        extractor = DINOv2FeatureExtractor(device=device)
        assert extractor.device.type == device
        with tempfile.TemporaryDirectory() as tmpdir:
            result = extractor.process_image(self._write_random_image(tmpdir))
        assert result["metadata"]["device"] == device

View File

@@ -0,0 +1,97 @@
"""Tests for FeatureVisualizer module."""
import os
import tempfile
import numpy as np
import pytest
import torch
from feature_compressor.core.visualizer import FeatureVisualizer
class TestFeatureVisualizer:
    """Test suite for FeatureVisualizer class."""

    def test_histogram_generation(self):
        """Test histogram generation from features."""
        viz = FeatureVisualizer()
        features = torch.randn(20, 256)
        fig = viz.plot_histogram(features, title="Test Histogram")
        assert fig is not None
        assert "Test Histogram" in fig.layout.title.text

    def test_pca_2d_generation(self):
        """Test PCA 2D scatter plot generation."""
        viz = FeatureVisualizer()
        features = torch.randn(20, 256)
        labels = ["cat"] * 10 + ["dog"] * 10
        fig = viz.plot_pca_2d(features, labels=labels)
        assert fig is not None
        assert "PCA 2D" in fig.layout.title.text

    def test_comparison_plot_generation(self):
        """Test comparison plot generation."""
        viz = FeatureVisualizer()
        features_list = [torch.randn(20, 256), torch.randn(20, 256)]
        names = ["Set A", "Set B"]
        fig = viz.plot_comparison(features_list, names)
        assert fig is not None
        assert "Comparison" in fig.layout.title.text

    def test_html_export(self):
        """Test HTML export format."""
        viz = FeatureVisualizer()
        features = torch.randn(10, 256)
        fig = viz.plot_histogram(features)
        with tempfile.TemporaryDirectory() as tmpdir:
            output_path = os.path.join(tmpdir, "test_plot")
            viz.save(fig, output_path, formats=["html"])
            assert os.path.exists(output_path + ".html")

    def test_png_export(self):
        """Test PNG export format.

        Static-image export requires a Chrome/kaleido backend; when it is
        absent we skip visibly via pytest.skip instead of silently passing
        (the old ``pass`` also skipped the existence assert, hiding failures).
        """
        viz = FeatureVisualizer()
        features = torch.randn(10, 256)
        fig = viz.plot_histogram(features)
        with tempfile.TemporaryDirectory() as tmpdir:
            output_path = os.path.join(tmpdir, "test_plot")
            try:
                viz.save(fig, output_path, formats=["png"])
            except RuntimeError as e:
                if "Chrome" in str(e):
                    pytest.skip(f"PNG export backend unavailable: {e}")
                raise
            assert os.path.exists(output_path + ".png")

    def test_json_export(self):
        """Test JSON export format."""
        viz = FeatureVisualizer()
        features = torch.randn(10, 256)
        fig = viz.plot_histogram(features)
        with tempfile.TemporaryDirectory() as tmpdir:
            output_path = os.path.join(tmpdir, "test_plot")
            viz.save(fig, output_path, formats=["json"])
            assert os.path.exists(output_path + ".json")