mirror of
https://github.com/SikongJueluo/Mini-Nav.git
synced 2026-03-12 12:25:32 +08:00
refactor(project): remove feature compressor module and update docs
This commit is contained in:
@@ -1,14 +0,0 @@
|
||||
"""DINOv2 Feature Compressor - Extract and compress visual features."""
|
||||
|
||||
__version__ = "0.1.0"
|
||||
|
||||
from .core.compressor import PoolNetCompressor
|
||||
from .core.extractor import DINOv2FeatureExtractor
|
||||
from .core.visualizer import FeatureVisualizer
|
||||
|
||||
__all__ = [
|
||||
"PoolNetCompressor",
|
||||
"DINOv2FeatureExtractor",
|
||||
"FeatureVisualizer",
|
||||
"__version__",
|
||||
]
|
||||
@@ -1,7 +0,0 @@
|
||||
"""Core compression, extraction, and visualization modules."""
|
||||
|
||||
from .compressor import PoolNetCompressor
|
||||
from .extractor import DINOv2FeatureExtractor
|
||||
from .visualizer import FeatureVisualizer
|
||||
|
||||
__all__ = ["PoolNetCompressor", "DINOv2FeatureExtractor", "FeatureVisualizer"]
|
||||
@@ -1,135 +0,0 @@
|
||||
"""Feature compression module with attention-based pooling and MLP."""
|
||||
|
||||
import torch
|
||||
import torch.nn as nn
|
||||
import torch.nn.functional as F
|
||||
|
||||
|
||||
class PoolNetCompressor(nn.Module):
|
||||
"""Pool + Network feature compressor for DINOv2 embeddings.
|
||||
|
||||
Combines attention-based Top-K token pooling with a 2-layer MLP to compress
|
||||
DINOv2's last_hidden_state from [batch, seq_len, hidden_dim] to [batch, compression_dim].
|
||||
|
||||
Args:
|
||||
input_dim: Input feature dimension (e.g., 1024 for DINOv2-large)
|
||||
compression_dim: Output feature dimension (default: 256)
|
||||
top_k_ratio: Ratio of tokens to keep via attention pooling (default: 0.5)
|
||||
hidden_ratio: Hidden layer dimension as multiple of compression_dim (default: 2.0)
|
||||
dropout_rate: Dropout probability (default: 0.1)
|
||||
use_residual: Whether to use residual connection (default: True)
|
||||
device: Device to place model on ('auto', 'cpu', or 'cuda')
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
input_dim: int,
|
||||
compression_dim: int = 256,
|
||||
top_k_ratio: float = 0.5,
|
||||
hidden_ratio: float = 2.0,
|
||||
dropout_rate: float = 0.1,
|
||||
use_residual: bool = True,
|
||||
device: str = "auto",
|
||||
):
|
||||
super().__init__()
|
||||
|
||||
self.input_dim = input_dim
|
||||
self.compression_dim = compression_dim
|
||||
self.top_k_ratio = top_k_ratio
|
||||
self.use_residual = use_residual
|
||||
|
||||
# Attention mechanism for token selection
|
||||
self.attention = nn.Sequential(
|
||||
nn.Linear(input_dim, input_dim // 4),
|
||||
nn.Tanh(),
|
||||
nn.Linear(input_dim // 4, 1),
|
||||
)
|
||||
|
||||
# Compression network: 2-layer MLP
|
||||
hidden_dim = int(compression_dim * hidden_ratio)
|
||||
self.net = nn.Sequential(
|
||||
nn.Linear(input_dim, hidden_dim),
|
||||
nn.LayerNorm(hidden_dim),
|
||||
nn.GELU(),
|
||||
nn.Dropout(dropout_rate),
|
||||
nn.Linear(hidden_dim, compression_dim),
|
||||
)
|
||||
|
||||
# Residual projection if dimensions don't match
|
||||
if use_residual and input_dim != compression_dim:
|
||||
self.residual_proj = nn.Linear(input_dim, compression_dim)
|
||||
else:
|
||||
self.residual_proj = None
|
||||
|
||||
# Set device
|
||||
if device == "auto":
|
||||
device = "cuda" if torch.cuda.is_available() else "cpu"
|
||||
self.device = torch.device(device)
|
||||
self.to(self.device)
|
||||
|
||||
def _compute_attention_scores(self, x: torch.Tensor) -> torch.Tensor:
|
||||
"""Compute attention scores for each token.
|
||||
|
||||
Args:
|
||||
x: Input tensor [batch, seq_len, input_dim]
|
||||
|
||||
Returns:
|
||||
Attention scores [batch, seq_len, 1]
|
||||
"""
|
||||
scores = self.attention(x) # [batch, seq_len, 1]
|
||||
return scores.squeeze(-1) # [batch, seq_len]
|
||||
|
||||
def _apply_pooling(self, x: torch.Tensor, scores: torch.Tensor) -> torch.Tensor:
|
||||
"""Apply Top-K attention pooling to select important tokens.
|
||||
|
||||
Args:
|
||||
x: Input tensor [batch, seq_len, input_dim]
|
||||
scores: Attention scores [batch, seq_len]
|
||||
|
||||
Returns:
|
||||
Pooled features [batch, k, input_dim] where k = ceil(seq_len * top_k_ratio)
|
||||
"""
|
||||
batch_size, seq_len, _ = x.shape
|
||||
k = max(1, int(seq_len * self.top_k_ratio))
|
||||
|
||||
# Get top-k indices
|
||||
top_k_values, top_k_indices = torch.topk(scores, k=k, dim=-1) # [batch, k]
|
||||
|
||||
# Select top-k tokens
|
||||
batch_indices = (
|
||||
torch.arange(batch_size, device=x.device).unsqueeze(1).expand(-1, k)
|
||||
)
|
||||
pooled = x[batch_indices, top_k_indices, :] # [batch, k, input_dim]
|
||||
|
||||
return pooled
|
||||
|
||||
def forward(self, x: torch.Tensor) -> torch.Tensor:
|
||||
"""Forward pass through compressor.
|
||||
|
||||
Args:
|
||||
x: Input features [batch, seq_len, input_dim]
|
||||
|
||||
Returns:
|
||||
Compressed features [batch, compression_dim]
|
||||
"""
|
||||
# Compute attention scores
|
||||
scores = self._compute_attention_scores(x)
|
||||
|
||||
# Apply Top-K pooling
|
||||
pooled = self._apply_pooling(x, scores)
|
||||
|
||||
# Average pool over selected tokens to get [batch, input_dim]
|
||||
pooled = pooled.mean(dim=1) # [batch, input_dim]
|
||||
|
||||
# Apply compression network
|
||||
compressed = self.net(pooled) # [batch, compression_dim]
|
||||
|
||||
# Add residual connection if enabled
|
||||
if self.use_residual:
|
||||
if self.residual_proj is not None:
|
||||
residual = self.residual_proj(pooled)
|
||||
else:
|
||||
residual = pooled[:, : self.compression_dim]
|
||||
compressed = compressed + residual
|
||||
|
||||
return compressed
|
||||
@@ -1,233 +0,0 @@
|
||||
"""DINOv2 feature extraction and compression pipeline."""
|
||||
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional, Union
|
||||
|
||||
import torch
|
||||
from configs import FeatureCompressorConfig, cfg_manager, load_yaml
|
||||
from transformers import AutoImageProcessor, AutoModel
|
||||
|
||||
from ..utils.image_utils import load_image, preprocess_image
|
||||
from .compressor import PoolNetCompressor
|
||||
|
||||
|
||||
class DINOv2FeatureExtractor:
|
||||
"""End-to-end DINOv2 feature extraction with compression.
|
||||
|
||||
Loads DINOv2 model, extracts last_hidden_state features,
|
||||
and applies PoolNetCompressor for dimensionality reduction.
|
||||
|
||||
Args:
|
||||
config_path: Path to YAML configuration file
|
||||
device: Device to use ('auto', 'cpu', or 'cuda')
|
||||
"""
|
||||
|
||||
def __init__(self, config_path: Optional[str] = None, device: str = "auto"):
|
||||
self.config: FeatureCompressorConfig = self._load_config(config_path)
|
||||
|
||||
# Set device
|
||||
if device == "auto":
|
||||
device = self.config.model.device
|
||||
if device == "auto":
|
||||
device = "cuda" if torch.cuda.is_available() else "cpu"
|
||||
self.device = torch.device(device)
|
||||
|
||||
# Load DINOv2 model and processor
|
||||
model_name = self.config.model.name
|
||||
self.processor = AutoImageProcessor.from_pretrained(model_name)
|
||||
self.model = AutoModel.from_pretrained(model_name).to(self.device)
|
||||
self.model.eval()
|
||||
|
||||
# Initialize compressor
|
||||
self.compressor = PoolNetCompressor(
|
||||
input_dim=self.model.config.hidden_size,
|
||||
compression_dim=self.config.model.compression_dim,
|
||||
top_k_ratio=self.config.model.top_k_ratio,
|
||||
hidden_ratio=self.config.model.hidden_ratio,
|
||||
dropout_rate=self.config.model.dropout_rate,
|
||||
use_residual=self.config.model.use_residual,
|
||||
device=str(self.device),
|
||||
)
|
||||
|
||||
def _load_config(
|
||||
self, config_path: Optional[str] = None
|
||||
) -> FeatureCompressorConfig:
|
||||
"""Load configuration from YAML file.
|
||||
|
||||
Args:
|
||||
config_path: Path to config file, or None for default
|
||||
|
||||
Returns:
|
||||
FeatureCompressorConfig instance
|
||||
"""
|
||||
if config_path is None:
|
||||
return cfg_manager.get()
|
||||
else:
|
||||
return load_yaml(Path(config_path), FeatureCompressorConfig)
|
||||
|
||||
def _extract_dinov2_features(self, images: List) -> torch.Tensor:
|
||||
"""Extract DINOv2 last_hidden_state features.
|
||||
|
||||
Args:
|
||||
images: List of PIL Images
|
||||
|
||||
Returns:
|
||||
last_hidden_state [batch, seq_len, hidden_dim]
|
||||
"""
|
||||
with torch.no_grad():
|
||||
inputs = self.processor(images=images, return_tensors="pt").to(self.device)
|
||||
outputs = self.model(**inputs)
|
||||
features = outputs.last_hidden_state
|
||||
|
||||
return features
|
||||
|
||||
def _compress_features(self, features: torch.Tensor) -> torch.Tensor:
|
||||
"""Compress features using PoolNetCompressor.
|
||||
|
||||
Args:
|
||||
features: [batch, seq_len, hidden_dim]
|
||||
|
||||
Returns:
|
||||
compressed [batch, compression_dim]
|
||||
"""
|
||||
with torch.no_grad():
|
||||
compressed = self.compressor(features)
|
||||
|
||||
return compressed
|
||||
|
||||
def process_image(
|
||||
self, image_path: str, visualize: bool = False
|
||||
) -> Dict[str, object]:
|
||||
"""Process a single image and extract compressed features.
|
||||
|
||||
Args:
|
||||
image_path: Path to image file
|
||||
visualize: Whether to generate visualizations
|
||||
|
||||
Returns:
|
||||
Dictionary with original_features, compressed_features, metadata
|
||||
"""
|
||||
start_time = time.time()
|
||||
|
||||
# Load and preprocess image
|
||||
image = load_image(image_path)
|
||||
image = preprocess_image(image, size=224)
|
||||
|
||||
# Extract DINOv2 features
|
||||
original_features = self._extract_dinov2_features([image])
|
||||
|
||||
# Compute feature stats for compression ratio
|
||||
original_dim = original_features.shape[-1]
|
||||
compressed_dim = self.compressor.compression_dim
|
||||
compression_ratio = original_dim / compressed_dim
|
||||
|
||||
# Compress features
|
||||
compressed_features = self._compress_features(original_features)
|
||||
|
||||
# Get pooled features (before compression) for analysis
|
||||
pooled_features = self.compressor._apply_pooling(
|
||||
original_features,
|
||||
self.compressor._compute_attention_scores(original_features),
|
||||
)
|
||||
pooled_features = pooled_features.mean(dim=1)
|
||||
|
||||
# Compute feature norm
|
||||
feature_norm = torch.norm(compressed_features, p=2, dim=-1).mean().item()
|
||||
|
||||
processing_time = time.time() - start_time
|
||||
|
||||
# Build result dictionary
|
||||
result = {
|
||||
"original_features": original_features.cpu(),
|
||||
"compressed_features": compressed_features.cpu(),
|
||||
"pooled_features": pooled_features.cpu(),
|
||||
"metadata": {
|
||||
"image_path": str(image_path),
|
||||
"compression_ratio": compression_ratio,
|
||||
"processing_time": processing_time,
|
||||
"feature_norm": feature_norm,
|
||||
"device": str(self.device),
|
||||
"model_name": self.config.model.name,
|
||||
},
|
||||
}
|
||||
|
||||
return result
|
||||
|
||||
def process_batch(
|
||||
self,
|
||||
image_dir: Union[str, Path],
|
||||
batch_size: int = 8,
|
||||
save_features: bool = True,
|
||||
) -> List[Dict[str, object]]:
|
||||
"""Process multiple images in batches.
|
||||
|
||||
Args:
|
||||
image_dir: Directory containing images
|
||||
batch_size: Number of images per batch
|
||||
save_features: Whether to save features to disk
|
||||
|
||||
Returns:
|
||||
List of result dictionaries, one per image
|
||||
"""
|
||||
image_dir = Path(image_dir)
|
||||
image_files = sorted(image_dir.glob("*.*"))
|
||||
|
||||
results = []
|
||||
|
||||
# Process in batches
|
||||
for i in range(0, len(image_files), batch_size):
|
||||
batch_files = image_files[i : i + batch_size]
|
||||
|
||||
# Load and preprocess batch
|
||||
images = [preprocess_image(load_image(f), size=224) for f in batch_files]
|
||||
|
||||
# Extract features for batch
|
||||
original_features = self._extract_dinov2_features(images)
|
||||
compressed_features = self._compress_features(original_features)
|
||||
|
||||
# Create individual results
|
||||
for j, file_path in enumerate(batch_files):
|
||||
pooled_features = self.compressor._apply_pooling(
|
||||
original_features[j : j + 1],
|
||||
self.compressor._compute_attention_scores(
|
||||
original_features[j : j + 1]
|
||||
),
|
||||
).mean(dim=1)
|
||||
|
||||
result = {
|
||||
"original_features": original_features[j : j + 1].cpu(),
|
||||
"compressed_features": compressed_features[j : j + 1].cpu(),
|
||||
"pooled_features": pooled_features.cpu(),
|
||||
"metadata": {
|
||||
"image_path": str(file_path),
|
||||
"compression_ratio": original_features.shape[-1]
|
||||
/ self.compressor.compression_dim,
|
||||
"processing_time": 0.0,
|
||||
"feature_norm": torch.norm(
|
||||
compressed_features[j : j + 1], p=2, dim=-1
|
||||
)
|
||||
.mean()
|
||||
.item(),
|
||||
"device": str(self.device),
|
||||
"model_name": self.config.model.name,
|
||||
},
|
||||
}
|
||||
|
||||
results.append(result)
|
||||
|
||||
# Save features if requested
|
||||
if save_features:
|
||||
output_dir = Path(self.config.output.directory)
|
||||
output_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
output_path = output_dir / f"{file_path.stem}_features.json"
|
||||
from ..utils.feature_utils import save_features_to_json
|
||||
|
||||
save_features_to_json(
|
||||
result["compressed_features"],
|
||||
output_path,
|
||||
result["metadata"],
|
||||
)
|
||||
|
||||
return results
|
||||
@@ -1,63 +0,0 @@
|
||||
"""Basic usage example for DINOv2 Feature Compressor."""
|
||||
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
# Add parent to path for imports
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
|
||||
|
||||
import requests
|
||||
from PIL import Image
|
||||
import io
|
||||
|
||||
from dino_feature_compressor import DINOv2FeatureExtractor, FeatureVisualizer
|
||||
|
||||
|
||||
def main():
|
||||
# Initialize extractor
|
||||
print("Initializing DINOv2FeatureExtractor...")
|
||||
extractor = DINOv2FeatureExtractor()
|
||||
|
||||
# Download and save test image
|
||||
print("Downloading test image...")
|
||||
url = "http://images.cocodataset.org/val2017/000000039769.jpg"
|
||||
response = requests.get(url)
|
||||
img = Image.open(io.BytesIO(response.content))
|
||||
|
||||
test_image_path = "/tmp/test_image.jpg"
|
||||
img.save(test_image_path)
|
||||
print(f"Image saved to {test_image_path}")
|
||||
|
||||
# Extract features
|
||||
print("Extracting features...")
|
||||
result = extractor.process_image(test_image_path)
|
||||
|
||||
print(f"\n=== Feature Extraction Results ===")
|
||||
print(f"Original features shape: {result['original_features'].shape}")
|
||||
print(f"Compressed features shape: {result['compressed_features'].shape}")
|
||||
print(f"Processing time: {result['metadata']['processing_time']:.3f}s")
|
||||
print(f"Compression ratio: {result['metadata']['compression_ratio']:.2f}x")
|
||||
print(f"Feature norm: {result['metadata']['feature_norm']:.4f}")
|
||||
print(f"Device: {result['metadata']['device']}")
|
||||
|
||||
# Visualize
|
||||
print("\nGenerating visualization...")
|
||||
viz = FeatureVisualizer()
|
||||
|
||||
fig = viz.plot_histogram(
|
||||
result["compressed_features"], title="Compressed Features Distribution"
|
||||
)
|
||||
|
||||
output_path = (
|
||||
Path(__file__).parent.parent.parent / "outputs" / "basic_usage_histogram"
|
||||
)
|
||||
output_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
viz.save(fig, str(output_path), formats=["html"])
|
||||
print(f"Visualization saved to {output_path}.html")
|
||||
|
||||
print("\nDone!")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -1,49 +0,0 @@
|
||||
"""Batch processing example for DINOv2 Feature Compressor."""
|
||||
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
# Add parent to path for imports
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
|
||||
|
||||
from dino_feature_compressor import DINOv2FeatureExtractor
|
||||
|
||||
|
||||
def main():
|
||||
# Initialize extractor
|
||||
print("Initializing DINOv2FeatureExtractor...")
|
||||
extractor = DINOv2FeatureExtractor()
|
||||
|
||||
# Create a test directory with sample images
|
||||
# In practice, use your own directory
|
||||
image_dir = "/tmp/test_images"
|
||||
Path(image_dir).mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Create 3 test images
|
||||
print("Creating test images...")
|
||||
import numpy as np
|
||||
from PIL import Image
|
||||
|
||||
for i in range(3):
|
||||
img_array = np.random.randint(0, 255, (224, 224, 3), dtype=np.uint8)
|
||||
img = Image.fromarray(img_array)
|
||||
img.save(f"{image_dir}/test_{i}.jpg")
|
||||
print(f"Created 3 test images in {image_dir}")
|
||||
|
||||
# Process batch
|
||||
print("\nProcessing images in batch...")
|
||||
results = extractor.process_batch(image_dir, batch_size=2, save_features=True)
|
||||
|
||||
print(f"\n=== Batch Processing Results ===")
|
||||
print(f"Processed {len(results)} images")
|
||||
|
||||
for i, result in enumerate(results):
|
||||
print(f"\nImage {i + 1}: {result['metadata']['image_path']}")
|
||||
print(f" Compressed shape: {result['compressed_features'].shape}")
|
||||
print(f" Feature norm: {result['metadata']['feature_norm']:.4f}")
|
||||
|
||||
print("\nDone! Features saved to outputs/ directory.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -1,61 +0,0 @@
|
||||
"""Visualization example for DINOv2 Feature Compressor."""
|
||||
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
# Add parent to path for imports
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
|
||||
|
||||
import numpy as np
|
||||
import torch
|
||||
|
||||
from dino_feature_compressor import FeatureVisualizer
|
||||
|
||||
|
||||
def main():
|
||||
# Generate synthetic features for demonstration
|
||||
print("Generating synthetic features...")
|
||||
n_samples = 100
|
||||
n_features = 256
|
||||
|
||||
# Create two clusters
|
||||
cluster1 = np.random.randn(50, n_features) + 2
|
||||
cluster2 = np.random.randn(50, n_features) - 2
|
||||
features = np.vstack([cluster1, cluster2])
|
||||
|
||||
labels = ["Cluster A"] * 50 + ["Cluster B"] * 50
|
||||
|
||||
features_tensor = torch.tensor(features, dtype=torch.float32)
|
||||
|
||||
# Initialize visualizer
|
||||
print("Initializing FeatureVisualizer...")
|
||||
viz = FeatureVisualizer()
|
||||
|
||||
output_dir = Path(__file__).parent.parent.parent / "outputs"
|
||||
output_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Create histogram
|
||||
print("Creating histogram...")
|
||||
fig_hist = viz.plot_histogram(features_tensor, title="Feature Distribution")
|
||||
viz.save(fig_hist, str(output_dir / "feature_histogram"), formats=["html", "json"])
|
||||
print(f"Saved histogram to {output_dir / 'feature_histogram.html'}")
|
||||
|
||||
# Create PCA 2D projection
|
||||
print("Creating PCA 2D projection...")
|
||||
fig_pca = viz.plot_pca_2d(features_tensor, labels=labels)
|
||||
viz.save(fig_pca, str(output_dir / "feature_pca_2d"), formats=["html", "json"])
|
||||
print(f"Saved PCA to {output_dir / 'feature_pca_2d.html'}")
|
||||
|
||||
# Create comparison plot
|
||||
print("Creating comparison plot...")
|
||||
features_list = [torch.tensor(cluster1), torch.tensor(cluster2)]
|
||||
names = ["Cluster A", "Cluster B"]
|
||||
fig_comp = viz.plot_comparison(features_list, names)
|
||||
viz.save(fig_comp, str(output_dir / "feature_comparison"), formats=["html", "json"])
|
||||
print(f"Saved comparison to {output_dir / 'feature_comparison.html'}")
|
||||
|
||||
print("\nDone! All visualizations saved to outputs/ directory.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -1,19 +0,0 @@
|
||||
"""Utility modules for image, feature, and plot operations."""
|
||||
|
||||
from .feature_utils import (
|
||||
compute_feature_stats,
|
||||
normalize_features,
|
||||
save_features_to_csv,
|
||||
save_features_to_json,
|
||||
)
|
||||
from .image_utils import load_image, load_images_from_directory, preprocess_image
|
||||
|
||||
__all__ = [
|
||||
"load_image",
|
||||
"load_images_from_directory",
|
||||
"preprocess_image",
|
||||
"normalize_features",
|
||||
"compute_feature_stats",
|
||||
"save_features_to_json",
|
||||
"save_features_to_csv",
|
||||
]
|
||||
@@ -1,83 +0,0 @@
|
||||
"""Feature processing utilities."""
|
||||
|
||||
from pathlib import Path
|
||||
from typing import Dict
|
||||
|
||||
import numpy as np
|
||||
import torch
|
||||
import yaml
|
||||
|
||||
|
||||
def normalize_features(features: torch.Tensor) -> torch.Tensor:
|
||||
"""L2-normalize features.
|
||||
|
||||
Args:
|
||||
features: Tensor of shape [batch, dim] or [batch, seq, dim]
|
||||
|
||||
Returns:
|
||||
L2-normalized features
|
||||
"""
|
||||
norm = torch.norm(features, p=2, dim=-1, keepdim=True)
|
||||
return features / (norm + 1e-8)
|
||||
|
||||
|
||||
def compute_feature_stats(features: torch.Tensor) -> Dict[str, float]:
|
||||
"""Compute basic statistics for features.
|
||||
|
||||
Args:
|
||||
features: Tensor of shape [batch, dim] or [batch, seq, dim]
|
||||
|
||||
Returns:
|
||||
Dictionary with mean, std, min, max
|
||||
"""
|
||||
with torch.no_grad():
|
||||
return {
|
||||
"mean": float(features.mean().item()),
|
||||
"std": float(features.std().item()),
|
||||
"min": float(features.min().item()),
|
||||
"max": float(features.max().item()),
|
||||
}
|
||||
|
||||
|
||||
def save_features_to_json(
|
||||
features: torch.Tensor, path: Path, metadata: Dict = None
|
||||
) -> None:
|
||||
"""Save features to JSON file.
|
||||
|
||||
Args:
|
||||
features: Tensor to save
|
||||
path: Output file path
|
||||
metadata: Optional metadata dictionary
|
||||
"""
|
||||
path = Path(path)
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
features_np = features.cpu().numpy()
|
||||
|
||||
data = {
|
||||
"features": features_np.tolist(),
|
||||
"shape": list(features.shape),
|
||||
}
|
||||
|
||||
if metadata:
|
||||
data["metadata"] = metadata
|
||||
|
||||
with open(path, "w") as f:
|
||||
import json
|
||||
|
||||
json.dump(data, f, indent=2)
|
||||
|
||||
|
||||
def save_features_to_csv(features: torch.Tensor, path: Path) -> None:
|
||||
"""Save features to CSV file.
|
||||
|
||||
Args:
|
||||
features: Tensor to save
|
||||
path: Output file path
|
||||
"""
|
||||
path = Path(path)
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
features_np = features.cpu().numpy()
|
||||
|
||||
np.savetxt(path, features_np, delimiter=",", fmt="%.6f")
|
||||
@@ -1,76 +0,0 @@
|
||||
"""Image loading and preprocessing utilities."""
|
||||
|
||||
from pathlib import Path
|
||||
from typing import List, Optional, Union
|
||||
|
||||
import requests
|
||||
from PIL import Image
|
||||
|
||||
|
||||
def load_image(path: Union[str, Path]) -> Image.Image:
|
||||
"""Load an image from file path or URL.
|
||||
|
||||
Args:
|
||||
path: File path or URL to image
|
||||
|
||||
Returns:
|
||||
PIL Image object
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If file doesn't exist
|
||||
ValueError: If image cannot be loaded
|
||||
"""
|
||||
path_str = str(path)
|
||||
|
||||
if path_str.startswith(("http://", "https://")):
|
||||
response = requests.get(path_str, stream=True)
|
||||
response.raise_for_status()
|
||||
img = Image.open(response.raw)
|
||||
else:
|
||||
img = Image.open(path)
|
||||
|
||||
return img
|
||||
|
||||
|
||||
def preprocess_image(image: Image.Image, size: int = 224) -> Image.Image:
|
||||
"""Preprocess image to square format with resizing.
|
||||
|
||||
Args:
|
||||
image: PIL Image
|
||||
size: Target size for shortest dimension (default: 224)
|
||||
|
||||
Returns:
|
||||
Resized PIL Image
|
||||
"""
|
||||
if image.mode != "RGB":
|
||||
image = image.convert("RGB")
|
||||
|
||||
# Resize while maintaining aspect ratio, then center crop
|
||||
image = image.resize((size, size), Image.Resampling.LANCZOS)
|
||||
|
||||
return image
|
||||
|
||||
|
||||
def load_images_from_directory(
|
||||
dir_path: Union[str, Path], extensions: Optional[List[str]] = None
|
||||
) -> List[Image.Image]:
|
||||
"""Load all images from a directory.
|
||||
|
||||
Args:
|
||||
dir_path: Path to directory
|
||||
extensions: List of file extensions to include (e.g., ['.jpg', '.png'])
|
||||
|
||||
Returns:
|
||||
List of PIL Images
|
||||
"""
|
||||
if extensions is None:
|
||||
extensions = [".jpg", ".jpeg", ".png", ".bmp", ".tiff", ".webp"]
|
||||
|
||||
dir_path = Path(dir_path)
|
||||
images = []
|
||||
|
||||
for ext in extensions:
|
||||
images.extend([load_image(p) for p in dir_path.glob(f"*{ext}")])
|
||||
images.extend([load_image(p) for p in dir_path.glob(f"*{ext.upper()}")])
|
||||
|
||||
return images
|
||||
@@ -1,167 +0,0 @@
|
||||
"""Plotting utility functions for feature visualization."""
|
||||
|
||||
from pathlib import Path
|
||||
from typing import List, Optional
|
||||
|
||||
import numpy as np
|
||||
import plotly.graph_objects as go
|
||||
from plotly.subplots import make_subplots
|
||||
|
||||
|
||||
def create_histogram(data: np.ndarray, title: str = None, **kwargs) -> go.Figure:
|
||||
"""Create a histogram plot.
|
||||
|
||||
Args:
|
||||
data: 1D array of values
|
||||
title: Plot title
|
||||
**kwargs: Additional histogram arguments
|
||||
|
||||
Returns:
|
||||
Plotly Figure object
|
||||
"""
|
||||
fig = go.Figure()
|
||||
|
||||
fig.add_trace(
|
||||
go.Histogram(
|
||||
x=data.flatten(),
|
||||
name="Feature Values",
|
||||
**kwargs,
|
||||
)
|
||||
)
|
||||
|
||||
if title:
|
||||
fig.update_layout(title=title)
|
||||
|
||||
fig.update_layout(
|
||||
xaxis_title="Value",
|
||||
yaxis_title="Count",
|
||||
hovermode="x unified",
|
||||
)
|
||||
|
||||
return fig
|
||||
|
||||
|
||||
def create_pca_scatter_2d(
|
||||
features: np.ndarray, labels: List = None, **kwargs
|
||||
) -> go.Figure:
|
||||
"""Create a 2D PCA scatter plot.
|
||||
|
||||
Args:
|
||||
features: 2D array [n_samples, n_features]
|
||||
labels: Optional list of labels for coloring
|
||||
**kwargs: Additional scatter arguments
|
||||
|
||||
Returns:
|
||||
Plotly Figure object
|
||||
"""
|
||||
from sklearn.decomposition import PCA
|
||||
|
||||
# Apply PCA
|
||||
pca = PCA(n_components=2)
|
||||
components = pca.fit_transform(features)
|
||||
|
||||
explained_var = pca.explained_variance_ratio_ * 100
|
||||
|
||||
fig = go.Figure()
|
||||
|
||||
if labels is None:
|
||||
fig.add_trace(
|
||||
go.Scatter(
|
||||
x=components[:, 0],
|
||||
y=components[:, 1],
|
||||
mode="markers",
|
||||
marker=dict(size=8, opacity=0.7),
|
||||
**kwargs,
|
||||
)
|
||||
)
|
||||
else:
|
||||
for label in set(labels):
|
||||
mask = np.array(labels) == label
|
||||
fig.add_trace(
|
||||
go.Scatter(
|
||||
x=components[mask, 0],
|
||||
y=components[mask, 1],
|
||||
mode="markers",
|
||||
name=str(label),
|
||||
marker=dict(size=8, opacity=0.7),
|
||||
)
|
||||
)
|
||||
|
||||
fig.update_layout(
|
||||
title=f"PCA 2D Projection (Total Variance: {explained_var.sum():.1f}%)",
|
||||
xaxis_title=f"PC 1 ({explained_var[0]:.1f}%)",
|
||||
yaxis_title=f"PC 2 ({explained_var[1]:.1f}%)",
|
||||
hovermode="closest",
|
||||
)
|
||||
|
||||
return fig
|
||||
|
||||
|
||||
def create_comparison_plot(
|
||||
features_list: List[np.ndarray], names: List[str], **kwargs
|
||||
) -> go.Figure:
|
||||
"""Create a comparison plot of multiple feature sets.
|
||||
|
||||
Args:
|
||||
features_list: List of feature arrays
|
||||
names: List of names for each feature set
|
||||
**kwargs: Additional histogram arguments
|
||||
|
||||
Returns:
|
||||
Plotly Figure object
|
||||
"""
|
||||
fig = make_subplots(rows=1, cols=len(features_list), subplot_titles=names)
|
||||
|
||||
for i, features in enumerate(features_list, 1):
|
||||
fig.add_trace(
|
||||
go.Histogram(
|
||||
x=features.flatten(),
|
||||
name=names[i - 1],
|
||||
showlegend=False,
|
||||
**kwargs,
|
||||
),
|
||||
row=1,
|
||||
col=i,
|
||||
)
|
||||
|
||||
fig.update_layout(
|
||||
title="Feature Distribution Comparison",
|
||||
hovermode="x unified",
|
||||
)
|
||||
|
||||
return fig
|
||||
|
||||
|
||||
def save_figure(fig: go.Figure, path: str, format: str = "html") -> None:
|
||||
"""Save figure to file.
|
||||
|
||||
Args:
|
||||
fig: Plotly Figure object
|
||||
path: Output file path (without extension)
|
||||
format: Output format ('html', 'png', 'json')
|
||||
"""
|
||||
path = Path(path)
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
if format == "html":
|
||||
fig.write_html(str(path) + ".html", include_plotlyjs="cdn")
|
||||
elif format == "png":
|
||||
fig.write_image(str(path) + ".png", scale=2)
|
||||
elif format == "json":
|
||||
fig.write_json(str(path) + ".json")
|
||||
else:
|
||||
raise ValueError(f"Unsupported format: {format}")
|
||||
|
||||
|
||||
def apply_theme(fig: go.Figure, theme: str = "plotly_white") -> go.Figure:
|
||||
"""Apply a theme to the figure.
|
||||
|
||||
Args:
|
||||
fig: Plotly Figure object
|
||||
theme: Theme name
|
||||
|
||||
Returns:
|
||||
Updated Plotly Figure object
|
||||
"""
|
||||
fig.update_layout(template=theme)
|
||||
return fig
|
||||
Reference in New Issue
Block a user