⚡ Calmops

MinIO for AI: Machine Learning Data Lakes and Storage Pipelines

Introduction

Artificial intelligence and machine learning workloads demand robust, scalable storage infrastructure that can handle massive datasets, frequent model checkpoints, and real-time inference requirements. Modern AI applications generate and consume data at unprecedented scale: training datasets can reach petabytes, model artifacts require versioning and lineage tracking, and inference systems need millisecond access to features and embeddings.

MinIO emerges as the ideal storage foundation for AI workloads, offering S3-compatible object storage with enterprise-grade performance, security, and scalability. Unlike traditional file systems that struggle with AI’s unique access patterns, MinIO provides high-throughput parallel I/O, seamless cloud integration, and native support for modern data formats like Parquet and Delta Lake.

Key advantages of MinIO for AI include:

  • High Performance: Multi-part uploads and parallel processing for large datasets
  • S3 Compatibility: Works with existing ML frameworks and tools
  • Kubernetes Native: Cloud-native deployment and auto-scaling
  • Data Governance: Versioning, encryption, and access controls
  • Cost Efficiency: Intelligent tiering and lifecycle management

This comprehensive guide explores building production AI infrastructure with MinIO, covering everything from data lake architecture to real-time inference pipelines.

ML Data Lake Architecture

Designing the Data Lake Foundation

A well-architected ML data lake on MinIO follows a layered approach that separates raw ingestion, processed data, feature engineering, and model artifacts. This separation enables different teams to work independently while maintaining data lineage and governance.

import boto3
import json
from datetime import datetime
from botocore.config import Config

class MLDataLake:
    def __init__(self, endpoint_url, access_key, secret_key):
        self.s3 = boto3.client(
            's3',
            endpoint_url=endpoint_url,
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            config=Config(
                max_pool_connections=100,
                retries={'max_attempts': 3}
            )
        )
        self.setup_buckets()
    
    def setup_buckets(self):
        """Initialize data lake bucket structure"""
        buckets = {
            'raw-data': 'Raw ingested data from various sources',
            'bronze-data': 'Cleaned and validated data',
            'silver-data': 'Transformed and enriched data',
            'gold-data': 'Business-ready aggregated data',
            'feature-store': 'Computed features for ML',
            'model-registry': 'Trained models and metadata',
            'experiment-tracking': 'ML experiment artifacts',
            'inference-cache': 'Cached predictions and embeddings'
        }
        
        for bucket, description in buckets.items():
            try:
                self.s3.create_bucket(Bucket=bucket)
                # Add bucket metadata
                self.s3.put_bucket_tagging(
                    Bucket=bucket,
                    Tagging={
                        'TagSet': [
                            {'Key': 'Purpose', 'Value': 'ML-DataLake'},
                            {'Key': 'Description', 'Value': description},
                            {'Key': 'Environment', 'Value': 'production'}
                        ]
                    }
                )
                print(f"Created bucket: {bucket}")
            except Exception as e:
                print(f"Bucket {bucket} already exists or error: {e}")

# Initialize data lake
data_lake = MLDataLake(
    endpoint_url='http://minio:9000',
    access_key='minioadmin',
    secret_key='minioadmin'
)

Data Organization Strategy

def create_data_structure():
    """Create standardized directory structure for ML projects"""
    structure = {
        'raw-data': [
            'images/year=2026/month=01/day=01/',
            'text/source=web/year=2026/month=01/',
            'audio/format=wav/sample_rate=44100/',
            'video/resolution=1080p/fps=30/',
            'structured/format=parquet/schema_version=v1/'
        ],
        'bronze-data': [
            'images/processed/year=2026/month=01/',
            'text/cleaned/language=en/',
            'audio/normalized/duration_sec=30/',
            'video/frames/extracted_fps=1/',
            'structured/validated/schema_version=v1/'
        ],
        'silver-data': [
            'features/image_embeddings/model=resnet50/',
            'features/text_embeddings/model=bert/',
            'features/audio_mfcc/window_size=25ms/',
            'aggregations/daily/metric=engagement/',
            'joins/user_content/date=2026-01-01/'
        ],
        'gold-data': [
            'datasets/training/split=train/version=v1/',
            'datasets/validation/split=val/version=v1/',
            'datasets/test/split=test/version=v1/',
            'reports/model_performance/date=2026-01-01/',
            'dashboards/business_metrics/refresh=daily/'
        ]
    }
    
    return structure

# Example: Partitioned data storage
def store_partitioned_data(df, bucket, base_path, partition_cols):
    """Store a DataFrame with Hive-style partitioning for efficient querying"""
    for partition_values, group in df.groupby(partition_cols):
        # groupby on a single key yields a scalar, not a tuple
        if not isinstance(partition_values, tuple):
            partition_values = (partition_values,)
        
        # Build the partition path, e.g. base_path/year=2026/month=1
        partition_path = base_path
        for col, value in zip(partition_cols, partition_values):
            partition_path += f"/{col}={value}"
        
        # to_parquet with no path argument returns the serialized bytes
        parquet_buffer = group.to_parquet(index=False)
        key = f"{partition_path}/data.parquet"
        
        data_lake.s3.put_object(
            Bucket=bucket,
            Key=key,
            Body=parquet_buffer,
            ContentType='application/octet-stream'
        )
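To preview the key layout `store_partitioned_data` produces without a running server, the partition-path logic can be exercised on its own (the DataFrame contents here are purely illustrative):

```python
import pandas as pd

# Illustrative event data partitioned by year and month.
events = pd.DataFrame({
    'year': [2026, 2026, 2026],
    'month': [1, 1, 2],
    'clicks': [5, 3, 8],
})

partition_cols = ['year', 'month']
keys = []
for partition_values, group in events.groupby(partition_cols):
    # Build the same Hive-style path the pipeline writes to MinIO
    path = 'events'
    for col, value in zip(partition_cols, partition_values):
        path += f"/{col}={value}"
    keys.append(f"{path}/data.parquet")

# keys → ['events/year=2026/month=1/data.parquet',
#         'events/year=2026/month=2/data.parquet']
```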

Data Governance and Lineage

class DataLineageTracker:
    def __init__(self, s3_client):
        self.s3 = s3_client
        self.lineage_bucket = 'data-lineage'
    
    def track_transformation(self, input_paths, output_path, 
                           transformation_code, metadata):
        """Track data transformation lineage"""
        lineage_record = {
            'timestamp': datetime.now().isoformat(),
            'input_paths': input_paths,
            'output_path': output_path,
            'transformation': {
                'code_hash': hash(transformation_code),
                'description': metadata.get('description', ''),
                'parameters': metadata.get('parameters', {}),
                'framework': metadata.get('framework', 'python')
            },
            'execution_info': {
                'duration_seconds': metadata.get('duration', 0),
                'memory_usage_mb': metadata.get('memory_usage', 0),
                'cpu_cores': metadata.get('cpu_cores', 1)
            }
        }
        
        # Store lineage record
        lineage_key = f"lineage/{output_path.replace('/', '_')}/{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        
        self.s3.put_object(
            Bucket=self.lineage_bucket,
            Key=lineage_key,
            Body=json.dumps(lineage_record, indent=2),
            ContentType='application/json'
        )
        
        return lineage_record

# Usage example
tracker = DataLineageTracker(data_lake.s3)  # reuse the client created above
tracker.track_transformation(
    input_paths=['raw-data/images/2026/01/01/'],
    output_path='bronze-data/images/processed/2026/01/01/',
    transformation_code="resize_and_normalize()",
    metadata={
        'description': 'Resize images to 224x224 and normalize pixel values',
        'parameters': {'target_size': (224, 224), 'normalize': True},
        'duration': 120.5,
        'memory_usage': 2048
    }
)
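Reading lineage back is a matter of listing objects under the same key scheme the tracker writes. A small sketch of a hypothetical companion helper (`list_lineage_records` is not part of the class above):

```python
def lineage_prefix(output_path):
    # Mirrors the key scheme used by DataLineageTracker.track_transformation
    return f"lineage/{output_path.replace('/', '_')}/"

def list_lineage_records(s3_client, output_path, bucket='data-lineage'):
    """Return the keys of all lineage records stored for an output path."""
    response = s3_client.list_objects_v2(
        Bucket=bucket, Prefix=lineage_prefix(output_path)
    )
    return [obj['Key'] for obj in response.get('Contents', [])]
```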

Training Data Pipelines with MinIO

High-Performance Data Ingestion

Modern ML training requires efficient data ingestion that can saturate GPU utilization. MinIO’s multi-part upload and parallel processing capabilities enable high-throughput data pipelines that scale with your training infrastructure.

import boto3
import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from botocore.config import Config
import hashlib
import json

class HighThroughputIngestion:
    def __init__(self, endpoint_url, access_key, secret_key, max_workers=20):
        self.s3 = boto3.client(
            's3',
            endpoint_url=endpoint_url,
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            config=Config(
                max_pool_connections=max_workers,
                retries={'max_attempts': 3}
            )
        )
        self.max_workers = max_workers
        self.upload_stats = {'success': 0, 'failed': 0, 'bytes': 0}
    
    def upload_with_metadata(self, local_path, bucket, s3_key, metadata=None):
        """Upload file with comprehensive metadata"""
        try:
            # Calculate file hash for integrity
            with open(local_path, 'rb') as f:
                file_hash = hashlib.sha256(f.read()).hexdigest()
            
            # Prepare metadata
            upload_metadata = {
                'sha256': file_hash,
                'original_name': os.path.basename(local_path),
                'upload_timestamp': str(int(time.time())),
                'file_size': str(os.path.getsize(local_path))
            }
            
            if metadata:
                upload_metadata.update(metadata)
            
            # Upload with metadata
            self.s3.upload_file(
                local_path, bucket, s3_key,
                ExtraArgs={
                    'Metadata': upload_metadata,
                    'ContentType': self._get_content_type(local_path)
                }
            )
            
            self.upload_stats['success'] += 1
            self.upload_stats['bytes'] += os.path.getsize(local_path)
            
            return {'status': 'success', 'key': s3_key, 'hash': file_hash}
            
        except Exception as e:
            self.upload_stats['failed'] += 1
            return {'status': 'failed', 'key': s3_key, 'error': str(e)}
    
    def batch_upload_dataset(self, dataset_path, bucket, prefix, 
                           file_extensions=None, metadata_extractor=None):
        """Upload entire dataset with parallel processing"""
        if file_extensions is None:
            file_extensions = ['.jpg', '.png', '.txt', '.parquet', '.csv']
        
        # Collect all files to upload
        upload_tasks = []
        for root, dirs, files in os.walk(dataset_path):
            for file in files:
                if any(file.lower().endswith(ext) for ext in file_extensions):
                    local_path = os.path.join(root, file)
                    relative_path = os.path.relpath(local_path, dataset_path)
                    s3_key = f"{prefix}/{relative_path}"
                    
                    # Extract metadata if function provided
                    metadata = {}
                    if metadata_extractor:
                        metadata = metadata_extractor(local_path)
                    
                    upload_tasks.append((local_path, bucket, s3_key, metadata))
        
        print(f"Starting upload of {len(upload_tasks)} files...")
        
        # Execute parallel uploads
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            future_to_task = {
                executor.submit(self.upload_with_metadata, *task): task 
                for task in upload_tasks
            }
            
            results = []
            for future in as_completed(future_to_task):
                result = future.result()
                results.append(result)
                
                if len(results) % 100 == 0:
                    print(f"Uploaded {len(results)}/{len(upload_tasks)} files")
        
        return results
    
    def _get_content_type(self, file_path):
        """Determine content type based on file extension"""
        ext = os.path.splitext(file_path)[1].lower()
        content_types = {
            '.jpg': 'image/jpeg', '.jpeg': 'image/jpeg',
            '.png': 'image/png', '.gif': 'image/gif',
            '.txt': 'text/plain', '.csv': 'text/csv',
            '.json': 'application/json',
            '.parquet': 'application/octet-stream',
            '.pt': 'application/octet-stream',
            '.pkl': 'application/octet-stream'
        }
        return content_types.get(ext, 'application/octet-stream')

# Example: Upload ImageNet-style dataset
def extract_image_metadata(image_path):
    """Extract metadata from image files"""
    from PIL import Image
    
    try:
        with Image.open(image_path) as img:
            return {
                'width': str(img.width),
                'height': str(img.height),
                'format': img.format,
                'mode': img.mode,
                'class_label': os.path.basename(os.path.dirname(image_path))
            }
    except Exception:
        return {}

# Usage
ingestion = HighThroughputIngestion(
    'http://minio:9000', 'minioadmin', 'minioadmin'
)

results = ingestion.batch_upload_dataset(
    dataset_path='./imagenet_subset',
    bucket='training-data',
    prefix='imagenet/train',
    file_extensions=['.jpg', '.jpeg'],
    metadata_extractor=extract_image_metadata
)

print(f"Upload complete: {ingestion.upload_stats}")

Data Validation and Quality Checks

import os
import pandas as pd

class DataQualityValidator:
    def __init__(self, s3_client):
        self.s3 = s3_client
        self.validation_results = []
    
    def validate_dataset(self, bucket, prefix, validation_rules):
        """Validate dataset quality across multiple files"""
        # List all files in dataset
        response = self.s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
        
        validation_summary = {
            'total_files': 0,
            'valid_files': 0,
            'invalid_files': 0,
            'validation_errors': []
        }
        
        for obj in response.get('Contents', []):
            key = obj['Key']
            validation_summary['total_files'] += 1
            
            try:
                # Download and validate file
                local_path = f"/tmp/{os.path.basename(key)}"
                self.s3.download_file(bucket, key, local_path)
                
                is_valid, errors = self._validate_file(local_path, validation_rules)
                
                if is_valid:
                    validation_summary['valid_files'] += 1
                else:
                    validation_summary['invalid_files'] += 1
                    validation_summary['validation_errors'].extend([
                        {'file': key, 'errors': errors}
                    ])
                
                # Cleanup
                os.remove(local_path)
                
            except Exception as e:
                validation_summary['invalid_files'] += 1
                validation_summary['validation_errors'].append({
                    'file': key, 'errors': [f"Processing error: {str(e)}"]
                })
        
        return validation_summary
    
    def _validate_file(self, file_path, rules):
        """Validate individual file against rules"""
        errors = []
        
        try:
            if file_path.endswith('.parquet'):
                df = pd.read_parquet(file_path)
                errors.extend(self._validate_dataframe(df, rules))
            elif file_path.endswith(('.jpg', '.png')):
                errors.extend(self._validate_image(file_path, rules))
            elif file_path.endswith('.csv'):
                df = pd.read_csv(file_path)
                errors.extend(self._validate_dataframe(df, rules))
            
        except Exception as e:
            errors.append(f"File format error: {str(e)}")
        
        return len(errors) == 0, errors
    
    def _validate_dataframe(self, df, rules):
        """Validate DataFrame against rules"""
        errors = []
        
        # Check required columns
        if 'required_columns' in rules:
            missing_cols = set(rules['required_columns']) - set(df.columns)
            if missing_cols:
                errors.append(f"Missing columns: {missing_cols}")
        
        # Check data types
        if 'column_types' in rules:
            for col, expected_type in rules['column_types'].items():
                if col in df.columns and df[col].dtype != expected_type:
                    errors.append(f"Column {col} has type {df[col].dtype}, expected {expected_type}")
        
        # Check for null values
        if 'no_nulls' in rules:
            for col in rules['no_nulls']:
                if col in df.columns and df[col].isnull().any():
                    errors.append(f"Column {col} contains null values")
        
        # Check value ranges
        if 'value_ranges' in rules:
            for col, (min_val, max_val) in rules['value_ranges'].items():
                if col in df.columns:
                    if df[col].min() < min_val or df[col].max() > max_val:
                        errors.append(f"Column {col} values outside range [{min_val}, {max_val}]")
        
        return errors
    
    def _validate_image(self, image_path, rules):
        """Validate image file against rules"""
        errors = []
        
        try:
            from PIL import Image
            with Image.open(image_path) as img:
                # Check dimensions
                if 'min_dimensions' in rules:
                    min_w, min_h = rules['min_dimensions']
                    if img.width < min_w or img.height < min_h:
                        errors.append(f"Image too small: {img.width}x{img.height}")
                
                # Check format
                if 'allowed_formats' in rules:
                    if img.format not in rules['allowed_formats']:
                        errors.append(f"Invalid format: {img.format}")
                
                # Check file size
                if 'max_file_size_mb' in rules:
                    file_size_mb = os.path.getsize(image_path) / (1024 * 1024)
                    if file_size_mb > rules['max_file_size_mb']:
                        errors.append(f"File too large: {file_size_mb:.2f}MB")
        
        except Exception as e:
            errors.append(f"Image validation error: {str(e)}")
        
        return errors

# Example validation rules
validation_rules = {
    'required_columns': ['image_path', 'label', 'split'],
    'column_types': {'label': 'int64', 'split': 'object'},
    'no_nulls': ['image_path', 'label'],
    'value_ranges': {'label': (0, 999)},
    'min_dimensions': (224, 224),
    'allowed_formats': ['JPEG', 'PNG'],
    'max_file_size_mb': 10
}

# Run validation
validator = DataQualityValidator(ingestion.s3)  # reuse the client created above
results = validator.validate_dataset(
    bucket='training-data',
    prefix='imagenet/train',
    validation_rules=validation_rules
)

print(f"Validation Results: {results}")

Distributed Data Processing with Apache Spark

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, current_timestamp, length, lower, rand, regexp_replace
)

def create_spark_session(minio_endpoint, access_key, secret_key):
    """Create Spark session configured for MinIO"""
    return SparkSession.builder \
        .appName("ML Data Processing Pipeline") \
        .config("spark.hadoop.fs.s3a.endpoint", minio_endpoint) \
        .config("spark.hadoop.fs.s3a.access.key", access_key) \
        .config("spark.hadoop.fs.s3a.secret.key", secret_key) \
        .config("spark.hadoop.fs.s3a.path.style.access", "true") \
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .config("spark.sql.adaptive.enabled", "true") \
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
        .getOrCreate()

class SparkMLPipeline:
    def __init__(self, spark_session):
        self.spark = spark_session
    
    def process_text_data(self, input_path, output_path):
        """Process text data for NLP training"""
        # Read raw text data
        df = self.spark.read.text(input_path)
        
        # Clean and preprocess
        processed_df = df.select(
            # Remove special characters and normalize
            regexp_replace(lower(col("value")), r'[^\w\s]', '').alias("clean_text")
        ).filter(
            # Filter out empty or very short texts
            col("clean_text").isNotNull() & (length(col("clean_text")) > 10)
        )
        
        # Add metadata
        processed_df = processed_df.withColumn("processed_timestamp", current_timestamp())
        
        # Write processed data
        processed_df.write \
            .mode("overwrite") \
            .parquet(output_path)
        
        return processed_df.count()
    
    def create_training_splits(self, input_path, output_base_path, 
                             train_ratio=0.8, val_ratio=0.1, test_ratio=0.1):
        """Create train/validation/test splits"""
        df = self.spark.read.parquet(input_path)
        
        # Add random column for splitting
        df_with_rand = df.withColumn("rand", rand())
        
        # Create splits
        train_df = df_with_rand.filter(col("rand") < train_ratio)
        val_df = df_with_rand.filter(
            (col("rand") >= train_ratio) & 
            (col("rand") < train_ratio + val_ratio)
        )
        test_df = df_with_rand.filter(col("rand") >= train_ratio + val_ratio)
        
        # Remove random column and save
        for split_name, split_df in [("train", train_df), ("val", val_df), ("test", test_df)]:
            split_df.drop("rand").write \
                .mode("overwrite") \
                .parquet(f"{output_base_path}/{split_name}")
        
        return {
            "train_count": train_df.count(),
            "val_count": val_df.count(),
            "test_count": test_df.count()
        }

# Usage example
spark = create_spark_session(
    "http://minio:9000", "minioadmin", "minioadmin"
)

pipeline = SparkMLPipeline(spark)

# Process text data
text_count = pipeline.process_text_data(
    "s3a://raw-data/text/reviews/",
    "s3a://bronze-data/text/processed_reviews/"
)

# Create training splits
split_counts = pipeline.create_training_splits(
    "s3a://bronze-data/text/processed_reviews/",
    "s3a://gold-data/datasets/reviews"
)

print(f"Processed {text_count} text samples")
print(f"Split counts: {split_counts}")

Feature Store Implementation

Building a Production Feature Store

A feature store serves as the central repository for ML features, enabling feature reuse across models, maintaining feature consistency between training and inference, and providing feature lineage tracking. MinIO provides the ideal storage backend for feature stores with its high-performance object storage and S3 compatibility.

import boto3
import hashlib
import json
import os
import pandas as pd
from datetime import datetime, timedelta

class MinIOFeatureStore:
    def __init__(self, endpoint_url, access_key, secret_key):
        self.s3 = boto3.client(
            's3',
            endpoint_url=endpoint_url,
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key
        )
        self.bucket = 'feature-store'
        self.metadata_bucket = 'feature-metadata'
        self._ensure_buckets()
    
    def _ensure_buckets(self):
        """Ensure feature store buckets exist"""
        for bucket in [self.bucket, self.metadata_bucket]:
            try:
                self.s3.create_bucket(Bucket=bucket)
            except Exception:
                # Bucket already exists; safe to continue
                pass
    
    def register_feature_group(self, group_name, schema, description=""):
        """Register a new feature group with schema"""
        feature_group = {
            'name': group_name,
            'schema': schema,
            'description': description,
            'created_at': datetime.now().isoformat(),
            'version': '1.0',
            'feature_count': len(schema),
            'data_types': {col: str(dtype) for col, dtype in schema.items()}
        }
        
        key = f"feature_groups/{group_name}/metadata.json"
        self.s3.put_object(
            Bucket=self.metadata_bucket,
            Key=key,
            Body=json.dumps(feature_group, indent=2),
            ContentType='application/json'
        )
        
        return feature_group
    
    def compute_and_store_features(self, group_name, entity_id, 
                                 features_dict, timestamp=None):
        """Compute and store features for an entity"""
        if timestamp is None:
            timestamp = datetime.now()
        
        # Add metadata to features
        feature_record = {
            'entity_id': entity_id,
            'features': features_dict,
            'timestamp': timestamp.isoformat(),
            'computed_at': datetime.now().isoformat(),
            'feature_group': group_name,
            'version': '1.0'
        }
        
        # Create time-partitioned key
        date_partition = timestamp.strftime('%Y/%m/%d')
        hour_partition = timestamp.strftime('%H')
        key = f"features/{group_name}/date={date_partition}/hour={hour_partition}/{entity_id}.json"
        
        self.s3.put_object(
            Bucket=self.bucket,
            Key=key,
            Body=json.dumps(feature_record, indent=2),
            ContentType='application/json'
        )
        
        return key
    
    def batch_compute_features(self, group_name, entities_features, timestamp=None):
        """Batch compute and store features for multiple entities"""
        if timestamp is None:
            timestamp = datetime.now()
        
        # Convert to DataFrame for efficient processing
        df = pd.DataFrame(entities_features)
        
        # Create Parquet file for batch storage
        date_partition = timestamp.strftime('%Y/%m/%d')
        hour_partition = timestamp.strftime('%H')
        batch_id = hashlib.md5(str(timestamp).encode()).hexdigest()[:8]
        
        key = f"features/{group_name}/date={date_partition}/hour={hour_partition}/batch_{batch_id}.parquet"
        
        # Add metadata columns
        df['timestamp'] = timestamp.isoformat()
        df['computed_at'] = datetime.now().isoformat()
        df['feature_group'] = group_name
        df['version'] = '1.0'
        
        # Save as Parquet
        parquet_buffer = df.to_parquet(index=False)
        self.s3.put_object(
            Bucket=self.bucket,
            Key=key,
            Body=parquet_buffer,
            ContentType='application/octet-stream'
        )
        
        return key
    
    def get_latest_features(self, group_name, entity_id, max_age_hours=24):
        """Retrieve latest features for an entity"""
        # Search recent partitions
        end_time = datetime.now()
        start_time = end_time - timedelta(hours=max_age_hours)
        
        # Generate partition prefixes to search
        current_time = start_time
        prefixes = []
        while current_time <= end_time:
            date_partition = current_time.strftime('%Y/%m/%d')
            hour_partition = current_time.strftime('%H')
            prefix = f"features/{group_name}/date={date_partition}/hour={hour_partition}/"
            prefixes.append(prefix)
            current_time += timedelta(hours=1)
        
        # Search for entity features
        latest_features = None
        latest_timestamp = None
        
        for prefix in reversed(prefixes):  # Start with most recent
            try:
                response = self.s3.list_objects_v2(
                    Bucket=self.bucket,
                    Prefix=prefix
                )
                
                for obj in response.get('Contents', []):
                    key = obj['Key']
                    
                    # Check if this is our entity (for JSON files);
                    # the leading slash avoids suffix collisions between IDs
                    if key.endswith(f"/{entity_id}.json"):
                        obj_response = self.s3.get_object(Bucket=self.bucket, Key=key)
                        feature_record = json.loads(obj_response['Body'].read())
                        
                        record_timestamp = datetime.fromisoformat(feature_record['timestamp'])
                        if latest_timestamp is None or record_timestamp > latest_timestamp:
                            latest_features = feature_record['features']
                            latest_timestamp = record_timestamp
                    
                    # Check batch Parquet files
                    elif key.endswith('.parquet'):
                        # Download and search Parquet file
                        local_path = f"/tmp/{os.path.basename(key)}"
                        self.s3.download_file(self.bucket, key, local_path)
                        
                        df = pd.read_parquet(local_path)
                        entity_rows = df[df['entity_id'] == entity_id]
                        
                        if not entity_rows.empty:
                            # Get most recent row for this entity
                            latest_row = entity_rows.loc[entity_rows['timestamp'].idxmax()]
                            row_timestamp = datetime.fromisoformat(latest_row['timestamp'])
                            
                            if latest_timestamp is None or row_timestamp > latest_timestamp:
                                # Extract features (exclude metadata columns)
                                feature_cols = [col for col in df.columns 
                                              if col not in ['entity_id', 'timestamp', 'computed_at', 
                                                           'feature_group', 'version']]
                                latest_features = latest_row[feature_cols].to_dict()
                                latest_timestamp = row_timestamp
                        
                        os.remove(local_path)
                        
            except Exception as e:
                print(f"Error searching prefix {prefix}: {e}")
                continue
        
        return latest_features, latest_timestamp
    
    def get_historical_features(self, group_name, entity_ids, 
                              start_time, end_time, point_in_time=True):
        """Retrieve historical features for training"""
        # Generate time range partitions
        current_time = start_time
        partitions = []
        while current_time <= end_time:
            date_partition = current_time.strftime('%Y/%m/%d')
            partitions.append(f"features/{group_name}/date={date_partition}/")
            current_time += timedelta(days=1)
        
        all_features = []
        
        for partition in partitions:
            try:
                response = self.s3.list_objects_v2(
                    Bucket=self.bucket,
                    Prefix=partition
                )
                
                for obj in response.get('Contents', []):
                    key = obj['Key']
                    
                    if key.endswith('.parquet'):
                        # Process Parquet files
                        local_path = f"/tmp/{os.path.basename(key)}"
                        self.s3.download_file(self.bucket, key, local_path)
                        
                        df = pd.read_parquet(local_path)
                        
                        # Filter by entity IDs and time range
                        filtered_df = df[
                            (df['entity_id'].isin(entity_ids)) &
                            (pd.to_datetime(df['timestamp']) >= start_time) &
                            (pd.to_datetime(df['timestamp']) <= end_time)
                        ]
                        
                        if not filtered_df.empty:
                            all_features.append(filtered_df)
                        
                        os.remove(local_path)
                        
            except Exception as e:
                print(f"Error processing partition {partition}: {e}")
                continue
        
        if all_features:
            combined_df = pd.concat(all_features, ignore_index=True)
            
            if point_in_time:
                # Take the most recent feature row per entity within the window
                # (a simplification of a full point-in-time join)
                combined_df = combined_df.sort_values(['entity_id', 'timestamp'])
                return combined_df.groupby('entity_id').last().reset_index()
            else:
                return combined_df
        
        return pd.DataFrame()

# Example: User behavior features
def compute_user_features(user_id, user_data, transaction_data):
    """Compute user behavior features"""
    features = {}
    
    # Basic user features
    features['user_age'] = user_data.get('age', 0)
    features['user_tenure_days'] = (datetime.now() - user_data['signup_date']).days
    features['user_tier'] = user_data.get('tier', 'basic')
    
    # Transaction-based features
    if transaction_data:
        features['total_transactions'] = len(transaction_data)
        features['total_amount'] = sum(t['amount'] for t in transaction_data)
        features['avg_transaction_amount'] = features['total_amount'] / features['total_transactions']
        features['days_since_last_transaction'] = (
            datetime.now() - max(t['timestamp'] for t in transaction_data)
        ).days
        
        # Category preferences
        categories = [t['category'] for t in transaction_data]
        category_counts = pd.Series(categories).value_counts()
        features['top_category'] = category_counts.index[0] if not category_counts.empty else 'unknown'
        features['category_diversity'] = len(category_counts)
    else:
        features.update({
            'total_transactions': 0,
            'total_amount': 0.0,
            'avg_transaction_amount': 0.0,
            'days_since_last_transaction': 999,
            'top_category': 'unknown',
            'category_diversity': 0
        })
    
    return features

# Usage example
feature_store = MinIOFeatureStore(
    'http://minio:9000', 'minioadmin', 'minioadmin'
)

# Register feature group
user_schema = {
    'user_age': int,
    'user_tenure_days': int,
    'user_tier': str,
    'total_transactions': int,
    'total_amount': float,
    'avg_transaction_amount': float,
    'days_since_last_transaction': int,
    'top_category': str,
    'category_diversity': int
}

feature_store.register_feature_group(
    'user_behavior',
    user_schema,
    'User behavior and transaction features'
)

# Compute and store features
user_data = {'age': 28, 'signup_date': datetime(2024, 1, 1), 'tier': 'premium'}
transaction_data = [
    {'amount': 50.0, 'category': 'food', 'timestamp': datetime(2026, 1, 1)},
    {'amount': 200.0, 'category': 'electronics', 'timestamp': datetime(2026, 1, 15)}
]

features = compute_user_features('user_123', user_data, transaction_data)
feature_store.compute_and_store_features('user_behavior', 'user_123', features)

# Retrieve latest features
latest_features, timestamp = feature_store.get_latest_features('user_behavior', 'user_123')
print(f"Latest features for user_123: {latest_features}")
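The daily partition layout that `get_historical_features` scans (`features/<group>/date=YYYY/MM/DD/`) can be factored into a small standalone helper. This is a sketch; `daily_partition_prefixes` is not part of the class above:

```python
from datetime import datetime, timedelta

def daily_partition_prefixes(group_name, start_time, end_time):
    """Generate the features/<group>/date=YYYY/MM/DD/ prefixes for a date range."""
    prefixes = []
    current = start_time
    while current <= end_time:
        date_partition = current.strftime('%Y/%m/%d')
        prefixes.append(f"features/{group_name}/date={date_partition}/")
        current += timedelta(days=1)
    return prefixes

# Three days of partitions for the 'user_behavior' group
print(daily_partition_prefixes('user_behavior',
                               datetime(2026, 1, 1), datetime(2026, 1, 3)))
```

Keeping partition-path construction in one place makes it easy to verify that writers and readers agree on the layout.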

Feature Engineering Pipeline

class FeatureEngineeringPipeline:
    def __init__(self, feature_store):
        self.feature_store = feature_store
        self.transformers = {}
    
    def register_transformer(self, name, transform_func, dependencies=None):
        """Register a feature transformation function"""
        self.transformers[name] = {
            'func': transform_func,
            'dependencies': dependencies or []
        }
    
    def compute_derived_features(self, base_features):
        """Compute derived features from base features"""
        derived_features = base_features.copy()
        
        # Execute transformers in dependency order
        executed = set()
        while len(executed) < len(self.transformers):
            progress = False
            for name, transformer in self.transformers.items():
                if name in executed:
                    continue
                
                # Check if dependencies are satisfied
                deps_satisfied = all(dep in executed for dep in transformer['dependencies'])
                if deps_satisfied:
                    try:
                        new_features = transformer['func'](derived_features)
                        derived_features.update(new_features)
                    except Exception as e:
                        print(f"Error executing transformer {name}: {e}")
                    executed.add(name)  # Mark done even on failure
                    progress = True
            
            if not progress:
                # Remaining transformers have circular or missing dependencies;
                # bail out instead of looping forever
                print(f"Unresolvable transformer dependencies: {set(self.transformers) - executed}")
                break
        
        return derived_features

# Example transformers
def compute_ratio_features(features):
    """Compute ratio-based features"""
    ratios = {}
    
    if features.get('total_transactions', 0) > 0:
        ratios['amount_per_transaction'] = (
            features['total_amount'] / features['total_transactions']
        )
    
    if features.get('user_tenure_days', 0) > 0:
        ratios['transactions_per_day'] = (
            features['total_transactions'] / features['user_tenure_days']
        )
    
    return ratios

def compute_categorical_features(features):
    """Compute categorical encodings"""
    categorical = {}
    
    # One-hot encode user tier
    tiers = ['basic', 'premium', 'enterprise']
    for tier in tiers:
        categorical[f'tier_{tier}'] = 1 if features.get('user_tier') == tier else 0
    
    # Encode top category
    categories = ['food', 'electronics', 'clothing', 'books', 'other']
    for category in categories:
        categorical[f'top_category_{category}'] = (
            1 if features.get('top_category') == category else 0
        )
    
    return categorical

# Setup pipeline
pipeline = FeatureEngineeringPipeline(feature_store)
pipeline.register_transformer('ratios', compute_ratio_features)
pipeline.register_transformer('categorical', compute_categorical_features, ['ratios'])

# Compute enhanced features
base_features = {'user_age': 28, 'total_transactions': 10, 'total_amount': 500.0}
enhanced_features = pipeline.compute_derived_features(base_features)
print(f"Enhanced features: {enhanced_features}")
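The pipeline above resolves dependencies by repeatedly scanning the transformer registry. The same ordering can be computed up front with the standard library's `graphlib` (Python 3.9+). A minimal sketch, independent of the class above:

```python
from graphlib import TopologicalSorter

def transformer_order(transformers):
    """Return transformer names in dependency order.
    transformers: {name: {'dependencies': [names, ...]}}"""
    graph = {name: set(spec.get('dependencies', []))
             for name, spec in transformers.items()}
    # static_order() raises CycleError on circular dependencies
    return list(TopologicalSorter(graph).static_order())

# 'categorical' depends on 'ratios', so 'ratios' must run first
order = transformer_order({
    'ratios': {'dependencies': []},
    'categorical': {'dependencies': ['ratios']},
})
print(order)  # ['ratios', 'categorical']
```

Computing the order once also surfaces circular dependencies immediately (as a `CycleError`) rather than leaving them to manifest as a stalled loop.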

Model Storage and Registry

Production Model Registry

A robust model registry manages the complete lifecycle of ML models, from training artifacts to production deployments. MinIO provides the storage foundation for model artifacts, metadata, and experiment tracking.

import boto3
import json
import torch
import pickle
import joblib
from datetime import datetime
from typing import Dict, Any, Optional, List
import hashlib
import os

class MLModelRegistry:
    def __init__(self, endpoint_url, access_key, secret_key):
        self.s3 = boto3.client(
            's3',
            endpoint_url=endpoint_url,
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key
        )
        self.models_bucket = 'model-registry'
        self.experiments_bucket = 'experiment-tracking'
        self._ensure_buckets()
    
    def _ensure_buckets(self):
        """Ensure model registry buckets exist"""
        for bucket in [self.models_bucket, self.experiments_bucket]:
            try:
                self.s3.create_bucket(Bucket=bucket)
            except (self.s3.exceptions.BucketAlreadyOwnedByYou,
                    self.s3.exceptions.BucketAlreadyExists):
                pass
    
    def register_model(self, model_name: str, version: str, 
                      model_artifacts: Dict[str, Any], 
                      metadata: Dict[str, Any]) -> str:
        """Register a new model version with artifacts and metadata"""
        
        model_id = f"{model_name}_{version}_{int(datetime.now().timestamp())}"
        model_path = f"models/{model_name}/{version}"
        
        # Store model artifacts
        artifact_paths = {}
        for artifact_name, artifact_data in model_artifacts.items():
            artifact_key = f"{model_path}/artifacts/{artifact_name}"
            
            if isinstance(artifact_data, str) and os.path.exists(artifact_data):
                # Upload file
                self.s3.upload_file(artifact_data, self.models_bucket, artifact_key)
            else:
                # Serialize and upload object
                if artifact_name.endswith('.pt') or artifact_name.endswith('.pth'):
                    # PyTorch model
                    with open(f'/tmp/{artifact_name}', 'wb') as f:
                        torch.save(artifact_data, f)
                    self.s3.upload_file(f'/tmp/{artifact_name}', self.models_bucket, artifact_key)
                    os.remove(f'/tmp/{artifact_name}')
                elif artifact_name.endswith('.pkl'):
                    # Pickle object
                    with open(f'/tmp/{artifact_name}', 'wb') as f:
                        pickle.dump(artifact_data, f)
                    self.s3.upload_file(f'/tmp/{artifact_name}', self.models_bucket, artifact_key)
                    os.remove(f'/tmp/{artifact_name}')
                elif artifact_name.endswith('.joblib'):
                    # Joblib object
                    joblib.dump(artifact_data, f'/tmp/{artifact_name}')
                    self.s3.upload_file(f'/tmp/{artifact_name}', self.models_bucket, artifact_key)
                    os.remove(f'/tmp/{artifact_name}')
                else:
                    # Fallback: pickle any other in-memory artifact so it is
                    # actually uploaded before we record its path
                    with open(f'/tmp/{artifact_name}', 'wb') as f:
                        pickle.dump(artifact_data, f)
                    self.s3.upload_file(f'/tmp/{artifact_name}', self.models_bucket, artifact_key)
                    os.remove(f'/tmp/{artifact_name}')
            
            artifact_paths[artifact_name] = f"s3://{self.models_bucket}/{artifact_key}"
        
        # Create model metadata
        model_metadata = {
            'model_id': model_id,
            'name': model_name,
            'version': version,
            'created_at': datetime.now().isoformat(),
            'artifacts': artifact_paths,
            'metadata': metadata,
            'status': 'registered',
            'tags': metadata.get('tags', []),
            'framework': metadata.get('framework', 'unknown'),
            'model_type': metadata.get('model_type', 'unknown'),
            'metrics': metadata.get('metrics', {}),
            'parameters': metadata.get('parameters', {}),
            'training_data': metadata.get('training_data', {}),
            'dependencies': metadata.get('dependencies', [])
        }
        
        # Store metadata
        metadata_key = f"{model_path}/metadata.json"
        self.s3.put_object(
            Bucket=self.models_bucket,
            Key=metadata_key,
            Body=json.dumps(model_metadata, indent=2),
            ContentType='application/json'
        )
        
        return model_id
    
    def load_model(self, model_name: str, version: str = 'latest') -> Dict[str, Any]:
        """Load model artifacts and metadata"""
        if version == 'latest':
            version = self.get_latest_version(model_name)
        
        model_path = f"models/{model_name}/{version}"
        
        # Load metadata
        metadata_key = f"{model_path}/metadata.json"
        try:
            metadata_obj = self.s3.get_object(Bucket=self.models_bucket, Key=metadata_key)
            metadata = json.loads(metadata_obj['Body'].read())
        except Exception as e:
            raise ValueError(f"Model {model_name} version {version} not found: {e}")
        
        # Load artifacts
        artifacts = {}
        for artifact_name, artifact_path in metadata['artifacts'].items():
            artifact_key = artifact_path.replace(f"s3://{self.models_bucket}/", "")
            local_path = f"/tmp/{artifact_name}"
            
            self.s3.download_file(self.models_bucket, artifact_key, local_path)
            
            # Load based on file extension
            if artifact_name.endswith('.pt') or artifact_name.endswith('.pth'):
                artifacts[artifact_name] = torch.load(local_path, map_location='cpu')
            elif artifact_name.endswith('.pkl'):
                with open(local_path, 'rb') as f:
                    artifacts[artifact_name] = pickle.load(f)
            elif artifact_name.endswith('.joblib'):
                artifacts[artifact_name] = joblib.load(local_path)
            else:
                # Keep as file path for other formats
                artifacts[artifact_name] = local_path
        
        return {
            'metadata': metadata,
            'artifacts': artifacts
        }
    
    def list_models(self, name_filter: Optional[str] = None) -> List[Dict[str, Any]]:
        """List all registered models"""
        models = []
        
        try:
            response = self.s3.list_objects_v2(
                Bucket=self.models_bucket,
                Prefix='models/',
                Delimiter='/'
            )
            
            # Get model names from common prefixes
            for prefix in response.get('CommonPrefixes', []):
                model_name = prefix['Prefix'].split('/')[-2]
                
                if name_filter and name_filter not in model_name:
                    continue
                
                # Get versions for this model
                versions = self.list_model_versions(model_name)
                models.append({
                    'name': model_name,
                    'versions': versions,
                    'latest_version': self.get_latest_version(model_name) if versions else None
                })
        
        except Exception as e:
            print(f"Error listing models: {e}")
        
        return models
    
    def list_model_versions(self, model_name: str) -> List[str]:
        """List all versions of a specific model"""
        versions = []
        
        try:
            response = self.s3.list_objects_v2(
                Bucket=self.models_bucket,
                Prefix=f'models/{model_name}/',
                Delimiter='/'
            )
            
            for prefix in response.get('CommonPrefixes', []):
                version = prefix['Prefix'].split('/')[-2]
                versions.append(version)
        
        except Exception as e:
            print(f"Error listing versions for {model_name}: {e}")
        
        return sorted(versions)
    
    def get_latest_version(self, model_name: str) -> str:
        """Get the latest version of a model"""
        versions = self.list_model_versions(model_name)
        if not versions:
            raise ValueError(f"No versions found for model {model_name}")
        
        # Sort versions (assuming semantic versioning, with optional 'v' prefix)
        try:
            sorted_versions = sorted(versions, key=lambda v: [int(x) for x in v.lstrip('v').split('.')])
            return sorted_versions[-1]
        except ValueError:
            # Fallback to lexicographic sorting
            return sorted(versions)[-1]
    
    def promote_model(self, model_name: str, version: str, stage: str) -> bool:
        """Promote model to a specific stage (staging, production, etc.)"""
        model_path = f"models/{model_name}/{version}"
        metadata_key = f"{model_path}/metadata.json"
        
        try:
            # Load current metadata
            metadata_obj = self.s3.get_object(Bucket=self.models_bucket, Key=metadata_key)
            metadata = json.loads(metadata_obj['Body'].read())
            
            # Update stage
            metadata['stage'] = stage
            metadata['promoted_at'] = datetime.now().isoformat()
            
            # Save updated metadata
            self.s3.put_object(
                Bucket=self.models_bucket,
                Key=metadata_key,
                Body=json.dumps(metadata, indent=2),
                ContentType='application/json'
            )
            
            return True
        
        except Exception as e:
            print(f"Error promoting model: {e}")
            return False

# Advanced model checkpoint management
class ModelCheckpointManager:
    def __init__(self, s3_client, bucket='model-checkpoints'):
        self.s3 = s3_client
        self.bucket = bucket
    
    def save_checkpoint(self, model, optimizer, epoch, metrics, 
                       experiment_id, checkpoint_name=None):
        """Save training checkpoint with comprehensive metadata"""
        if checkpoint_name is None:
            checkpoint_name = f"checkpoint_epoch_{epoch}"
        
        # Create checkpoint data
        checkpoint_data = {
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'metrics': metrics,
            'timestamp': datetime.now().isoformat(),
            'experiment_id': experiment_id
        }
        
        # Save checkpoint
        checkpoint_path = f"/tmp/{checkpoint_name}.pt"
        torch.save(checkpoint_data, checkpoint_path)
        
        # Upload to MinIO
        key = f"experiments/{experiment_id}/checkpoints/{checkpoint_name}.pt"
        self.s3.upload_file(checkpoint_path, self.bucket, key)
        
        # Save checkpoint metadata
        metadata = {
            'checkpoint_name': checkpoint_name,
            'experiment_id': experiment_id,
            'epoch': epoch,
            'metrics': metrics,
            'timestamp': datetime.now().isoformat(),
            'model_architecture': str(model),
            'optimizer_config': str(optimizer),
            's3_path': f"s3://{self.bucket}/{key}"
        }
        
        metadata_key = f"experiments/{experiment_id}/checkpoints/{checkpoint_name}_metadata.json"
        self.s3.put_object(
            Bucket=self.bucket,
            Key=metadata_key,
            Body=json.dumps(metadata, indent=2),
            ContentType='application/json'
        )
        
        # Cleanup local file
        os.remove(checkpoint_path)
        
        return key
    
    def load_checkpoint(self, experiment_id, checkpoint_name):
        """Load checkpoint from MinIO"""
        key = f"experiments/{experiment_id}/checkpoints/{checkpoint_name}.pt"
        local_path = f"/tmp/{checkpoint_name}.pt"
        
        self.s3.download_file(self.bucket, key, local_path)
        checkpoint = torch.load(local_path, map_location='cpu')
        
        os.remove(local_path)
        return checkpoint
    
    def list_checkpoints(self, experiment_id):
        """List all checkpoints for an experiment"""
        prefix = f"experiments/{experiment_id}/checkpoints/"
        
        response = self.s3.list_objects_v2(
            Bucket=self.bucket,
            Prefix=prefix
        )
        
        checkpoints = []
        for obj in response.get('Contents', []):
            if obj['Key'].endswith('.pt'):
                checkpoint_name = os.path.basename(obj['Key']).replace('.pt', '')
                checkpoints.append({
                    'name': checkpoint_name,
                    'key': obj['Key'],
                    'size': obj['Size'],
                    'last_modified': obj['LastModified']
                })
        
        return sorted(checkpoints, key=lambda x: x['last_modified'], reverse=True)

# Usage examples
registry = MLModelRegistry('http://minio:9000', 'minioadmin', 'minioadmin')

# Register a PyTorch model
model = torch.nn.Linear(10, 1)
optimizer = torch.optim.Adam(model.parameters())

model_artifacts = {
    'model.pt': model.state_dict(),
    'optimizer.pt': optimizer.state_dict(),
    'preprocessing.pkl': {'scaler': 'StandardScaler', 'params': {}}
}

metadata = {
    'framework': 'pytorch',
    'model_type': 'linear_regression',
    'metrics': {'mse': 0.05, 'r2': 0.95},
    'parameters': {'learning_rate': 0.001, 'batch_size': 32},
    'training_data': {'dataset': 'housing_prices', 'samples': 10000},
    'tags': ['regression', 'production-ready']
}

model_id = registry.register_model('housing_price_predictor', 'v1.0', model_artifacts, metadata)
print(f"Registered model: {model_id}")

# Load model
loaded_model = registry.load_model('housing_price_predictor', 'v1.0')
print(f"Loaded model metadata: {loaded_model['metadata']['metrics']}")

# List all models
models = registry.list_models()
for model in models:
    print(f"Model: {model['name']}, Versions: {model['versions']}")
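The version-selection logic inside `get_latest_version` can be exercised in isolation. The sketch below mirrors it as a pure function (tolerating an optional `v` prefix), followed by a hypothetical promotion call against the registry above:

```python
def latest_version(versions):
    """Pick the highest version, tolerating an optional 'v' prefix."""
    try:
        # Numeric, component-wise comparison: v1.10 > v1.9
        return max(versions, key=lambda v: [int(x) for x in v.lstrip('v').split('.')])
    except ValueError:
        return sorted(versions)[-1]  # fallback: lexicographic

print(latest_version(['v1.0', 'v1.10', 'v1.9']))  # v1.10

# Promoting the selected version to production (requires the registry above):
# registry.promote_model('housing_price_predictor', 'v1.0', 'production')
```

Note that plain string sorting would rank `v1.9` above `v1.10`, which is why the numeric comparison matters.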

Integration with PyTorch and TensorFlow

PyTorch Integration with MinIO

PyTorch’s DataLoader can be seamlessly integrated with MinIO for efficient training data loading. This enables distributed training scenarios where data is stored centrally in MinIO and accessed by multiple training nodes.

import torch
from torch.utils.data import Dataset, DataLoader, IterableDataset
import boto3
from PIL import Image
import io
import json
import pandas as pd
from torchvision import transforms
import numpy as np

class MinIOImageDataset(Dataset):
    def __init__(self, bucket, prefix, s3_client, transform=None, 
                 cache_size=1000, metadata_file=None):
        self.s3 = s3_client
        self.bucket = bucket
        self.prefix = prefix
        self.transform = transform
        self.cache = {}
        self.cache_size = cache_size
        
        # Load image paths and labels
        self.samples = self._load_samples(metadata_file)
    
    def _load_samples(self, metadata_file):
        """Load sample paths and labels from metadata or S3 listing"""
        samples = []
        
        if metadata_file:
            # Load from metadata file
            obj = self.s3.get_object(Bucket=self.bucket, Key=metadata_file)
            metadata = json.loads(obj['Body'].read())
            
            for item in metadata['samples']:
                samples.append({
                    'key': item['path'],
                    'label': item['label'],
                    'metadata': item.get('metadata', {})
                })
        else:
            # List objects from S3
            paginator = self.s3.get_paginator('list_objects_v2')
            
            for page in paginator.paginate(Bucket=self.bucket, Prefix=self.prefix):
                for obj in page.get('Contents', []):
                    key = obj['Key']
                    if key.lower().endswith(('.jpg', '.jpeg', '.png')):
                        # Extract label from path (assuming path structure like class/image.jpg)
                        path_parts = key.split('/')
                        label = path_parts[-2] if len(path_parts) > 1 else 'unknown'
                        
                        samples.append({
                            'key': key,
                            'label': label,
                            'metadata': {}
                        })
        
        return samples
    
    def __len__(self):
        return len(self.samples)
    
    def __getitem__(self, idx):
        sample = self.samples[idx]
        key = sample['key']
        
        # Check cache first
        if key in self.cache:
            image_data = self.cache[key]
        else:
            # Download from MinIO
            try:
                obj = self.s3.get_object(Bucket=self.bucket, Key=key)
                image_data = obj['Body'].read()
                
                # Cache if under limit
                if len(self.cache) < self.cache_size:
                    self.cache[key] = image_data
                    
            except Exception as e:
                print(f"Error loading {key}: {e}")
                # Fall back to an encoded blank image so PIL can decode it below
                buffer = io.BytesIO()
                Image.new('RGB', (224, 224)).save(buffer, format='PNG')
                image_data = buffer.getvalue()
        
        # Convert to PIL Image
        try:
            image = Image.open(io.BytesIO(image_data)).convert('RGB')
        except Exception as e:
            print(f"Error processing image {key}: {e}")
            image = Image.new('RGB', (224, 224))
        
        # Apply transforms
        if self.transform:
            image = self.transform(image)
        
        # Convert label to tensor
        if isinstance(sample['label'], str):
            # For classification, you'd have a label mapping
            label_map = getattr(self, 'label_map', {})
            label = label_map.get(sample['label'], 0)
        else:
            label = sample['label']
        
        return image, torch.tensor(label, dtype=torch.long)

class MinIOStreamingDataset(IterableDataset):
    """Streaming dataset for very large datasets that don't fit in memory"""
    
    def __init__(self, bucket, prefix, s3_client, transform=None, 
                 batch_size=32, shuffle_buffer_size=10000):
        self.s3 = s3_client
        self.bucket = bucket
        self.prefix = prefix
        self.transform = transform
        self.batch_size = batch_size
        self.shuffle_buffer_size = shuffle_buffer_size
    
    def __iter__(self):
        # Get worker info for distributed training
        worker_info = torch.utils.data.get_worker_info()
        
        # List all objects
        paginator = self.s3.get_paginator('list_objects_v2')
        all_keys = []
        
        for page in paginator.paginate(Bucket=self.bucket, Prefix=self.prefix):
            for obj in page.get('Contents', []):
                key = obj['Key']
                if key.lower().endswith(('.jpg', '.jpeg', '.png')):
                    all_keys.append(key)
        
        # Distribute keys across workers
        if worker_info is not None:
            per_worker = len(all_keys) // worker_info.num_workers
            start_idx = worker_info.id * per_worker
            end_idx = start_idx + per_worker if worker_info.id < worker_info.num_workers - 1 else len(all_keys)
            keys = all_keys[start_idx:end_idx]
        else:
            keys = all_keys
        
        # Shuffle keys
        np.random.shuffle(keys)
        
        # Stream data
        for key in keys:
            try:
                obj = self.s3.get_object(Bucket=self.bucket, Key=key)
                image_data = obj['Body'].read()
                
                image = Image.open(io.BytesIO(image_data)).convert('RGB')
                
                if self.transform:
                    image = self.transform(image)
                
                # Extract label from path
                label = key.split('/')[-2]
                
                yield image, label
                
            except Exception as e:
                print(f"Error streaming {key}: {e}")
                continue

# PyTorch training loop with MinIO
class PyTorchMinIOTrainer:
    def __init__(self, model, s3_client, bucket):
        self.model = model
        self.s3 = s3_client
        self.bucket = bucket
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.model.to(self.device)
    
    def create_data_loaders(self, train_prefix, val_prefix, batch_size=32, num_workers=4):
        """Create training and validation data loaders"""
        
        # Define transforms
        train_transform = transforms.Compose([
            transforms.Resize((256, 256)),
            transforms.RandomCrop(224),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], 
                               std=[0.229, 0.224, 0.225])
        ])
        
        val_transform = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], 
                               std=[0.229, 0.224, 0.225])
        ])
        
        # Create datasets
        train_dataset = MinIOImageDataset(
            self.bucket, train_prefix, self.s3, transform=train_transform
        )
        
        val_dataset = MinIOImageDataset(
            self.bucket, val_prefix, self.s3, transform=val_transform
        )
        
        # Create data loaders
        train_loader = DataLoader(
            train_dataset, batch_size=batch_size, shuffle=True, 
            num_workers=num_workers, pin_memory=True
        )
        
        val_loader = DataLoader(
            val_dataset, batch_size=batch_size, shuffle=False, 
            num_workers=num_workers, pin_memory=True
        )
        
        return train_loader, val_loader
    
    def train_epoch(self, train_loader, optimizer, criterion):
        """Train for one epoch"""
        self.model.train()
        total_loss = 0
        correct = 0
        total = 0
        
        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.to(self.device), target.to(self.device)
            
            optimizer.zero_grad()
            output = self.model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()
            
            total_loss += loss.item()
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()
            total += target.size(0)
            
            if batch_idx % 100 == 0:
                print(f'Batch {batch_idx}, Loss: {loss.item():.6f}')
        
        return total_loss / len(train_loader), correct / total
    
    def validate(self, val_loader, criterion):
        """Validate model"""
        self.model.eval()
        val_loss = 0
        correct = 0
        total = 0
        
        with torch.no_grad():
            for data, target in val_loader:
                data, target = data.to(self.device), target.to(self.device)
                output = self.model(data)
                val_loss += criterion(output, target).item()
                
                pred = output.argmax(dim=1, keepdim=True)
                correct += pred.eq(target.view_as(pred)).sum().item()
                total += target.size(0)
        
        return val_loss / len(val_loader), correct / total

# Usage example
s3_client = boto3.client('s3', endpoint_url='http://minio:9000',
                        aws_access_key_id='minioadmin',
                        aws_secret_access_key='minioadmin')

model = torch.hub.load('pytorch/vision:v0.10.0', 'resnet18', pretrained=True)
model.fc = torch.nn.Linear(model.fc.in_features, 10)  # 10 classes

trainer = PyTorchMinIOTrainer(model, s3_client, 'training-data')
train_loader, val_loader = trainer.create_data_loaders('train/', 'val/')

optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = torch.nn.CrossEntropyLoss()

# Training loop
for epoch in range(10):
    train_loss, train_acc = trainer.train_epoch(train_loader, optimizer, criterion)
    val_loss, val_acc = trainer.validate(val_loader, criterion)
    
    print(f'Epoch {epoch}: Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, '
          f'Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}')
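The worker-sharding arithmetic inside `MinIOStreamingDataset.__iter__` is worth sketching on its own, since an off-by-one error there silently drops or duplicates samples. A standalone sketch (`shard_keys` is not part of the classes above):

```python
def shard_keys(all_keys, worker_id, num_workers):
    """Give each DataLoader worker a contiguous slice of the key list;
    the last worker absorbs the remainder so no key is dropped."""
    per_worker = len(all_keys) // num_workers
    start = worker_id * per_worker
    end = start + per_worker if worker_id < num_workers - 1 else len(all_keys)
    return all_keys[start:end]

keys = [f"train/img_{i}.jpg" for i in range(10)]
shards = [shard_keys(keys, w, 3) for w in range(3)]
print([len(s) for s in shards])  # [3, 3, 4] -- all 10 keys covered, none twice
```

Because the slices are disjoint and their union is the full list, each image is streamed by exactly one worker per epoch.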

TensorFlow Integration with MinIO

TensorFlow’s tf.data API works well with MinIO: objects can be read natively through TensorFlow’s S3 filesystem support, or pulled with boto3 inside generator functions and tf.py_function, as the examples below do.

import tensorflow as tf
import boto3
import json
import numpy as np
from typing import Generator, Tuple

class TensorFlowMinIODataset:
    def __init__(self, s3_client, bucket):
        self.s3 = s3_client
        self.bucket = bucket
    
    def create_image_dataset(self, prefix, batch_size=32, image_size=(224, 224)):
        """Create TensorFlow dataset for images stored in MinIO"""
        
        # Get list of image files
        def get_image_paths():
            paginator = self.s3.get_paginator('list_objects_v2')
            paths = []
            
            for page in paginator.paginate(Bucket=self.bucket, Prefix=prefix):
                for obj in page.get('Contents', []):
                    key = obj['Key']
                    if key.lower().endswith(('.jpg', '.jpeg', '.png')):
                        paths.append(key)
            
            return paths
        
        # Create dataset from paths; class names come from the parent directory
        image_paths = get_image_paths()
        class_names = sorted({p.split('/')[-2] for p in image_paths})
        class_to_idx = {name: i for i, name in enumerate(class_names)}
        
        def load_and_preprocess_image(path):
            """Load image from MinIO and preprocess"""
            # Download image bytes
            key = path.numpy().decode()
            obj = self.s3.get_object(Bucket=self.bucket, Key=key)
            image_data = obj['Body'].read()
            
            # Decode image (expand_animations=False guarantees a 3-D tensor)
            image = tf.io.decode_image(image_data, channels=3, expand_animations=False)
            image = tf.image.resize(image, image_size)
            image = tf.cast(image, tf.float32) / 255.0
            
            # Integer label from the parent directory name, as required
            # by sparse categorical losses
            label = class_to_idx[key.split('/')[-2]]
            
            return image, label
        
        # Wrap the eager loader and restore static shapes lost by py_function
        def tf_load(path):
            image, label = tf.py_function(
                load_and_preprocess_image, [path], [tf.float32, tf.int32]
            )
            image.set_shape((*image_size, 3))
            label.set_shape(())
            return image, label
        
        # Create TensorFlow dataset
        dataset = tf.data.Dataset.from_tensor_slices(image_paths)
        dataset = dataset.map(tf_load, num_parallel_calls=tf.data.AUTOTUNE)
        
        # Batch and prefetch
        dataset = dataset.batch(batch_size)
        dataset = dataset.prefetch(tf.data.AUTOTUNE)
        
        return dataset
    
    def create_text_dataset(self, prefix, batch_size=32, max_length=512):
        """Create TensorFlow dataset for text data"""
        
        def text_generator():
            """Generator function for text data"""
            paginator = self.s3.get_paginator('list_objects_v2')
            
            for page in paginator.paginate(Bucket=self.bucket, Prefix=prefix):
                for obj in page.get('Contents', []):
                    key = obj['Key']
                    if key.endswith('.txt') or key.endswith('.json'):
                        try:
                            obj_data = self.s3.get_object(Bucket=self.bucket, Key=key)
                            content = obj_data['Body'].read().decode('utf-8')
                            
                            if key.endswith('.json'):
                                data = json.loads(content)
                                text = data.get('text', '')
                                label = data.get('label', 0)
                            else:
                                text = content
                                label = 0  # Default label
                            
                            yield text, label
                            
                        except Exception as e:
                            print(f"Error processing {key}: {e}")
                            continue
        
        # Create dataset from generator
        dataset = tf.data.Dataset.from_generator(
            text_generator,
            output_signature=(
                tf.TensorSpec(shape=(), dtype=tf.string),
                tf.TensorSpec(shape=(), dtype=tf.int32)
            )
        )
        
        # Tokenize (simplified whitespace split - in practice, use a real
        # tokenizer such as tf.keras.layers.TextVectorization)
        def tokenize_text(text, label):
            tokens = tf.strings.split(text)[:max_length]
            return tokens, label
        
        dataset = dataset.map(tokenize_text)
        # Sequences vary in length, so pad within each batch
        dataset = dataset.padded_batch(
            batch_size, padded_shapes=([None], []), padding_values=('', 0)
        )
        dataset = dataset.prefetch(tf.data.AUTOTUNE)
        
        return dataset

class TensorFlowMinIOTrainer:
    def __init__(self, model, s3_client, bucket):
        self.model = model
        self.s3 = s3_client
        self.bucket = bucket
        self.dataset_creator = TensorFlowMinIODataset(s3_client, bucket)
    
    def train_model(self, train_prefix, val_prefix, epochs=10, batch_size=32):
        """Train model with data from MinIO"""
        
        # Create datasets
        train_dataset = self.dataset_creator.create_image_dataset(
            train_prefix, batch_size
        )
        val_dataset = self.dataset_creator.create_image_dataset(
            val_prefix, batch_size
        )
        
        # Compile model
        self.model.compile(
            optimizer='adam',
            loss='sparse_categorical_crossentropy',
            metrics=['accuracy']
        )
        
        # Setup callbacks
        callbacks = [
            tf.keras.callbacks.ModelCheckpoint(
                '/tmp/best_model.h5',
                save_best_only=True,
                monitor='val_accuracy'
            ),
            tf.keras.callbacks.EarlyStopping(
                patience=3,
                monitor='val_loss'
            ),
            tf.keras.callbacks.ReduceLROnPlateau(
                factor=0.5,
                patience=2,
                monitor='val_loss'
            )
        ]
        
        # Train model
        history = self.model.fit(
            train_dataset,
            validation_data=val_dataset,
            epochs=epochs,
            callbacks=callbacks
        )
        
        # Save final model to MinIO
        self.model.save('/tmp/final_model.h5')
        self.s3.upload_file(
            '/tmp/final_model.h5',
            self.bucket,
            'models/tensorflow/final_model.h5'
        )
        
        return history
    
    def save_model_artifacts(self, model_name, version):
        """Save TensorFlow model artifacts to MinIO"""
        import tempfile
        import shutil
        
        # Create temporary directory
        with tempfile.TemporaryDirectory() as temp_dir:
            # Save model in SavedModel format
            model_path = f"{temp_dir}/saved_model"
            self.model.save(model_path)
            
            # Create tar archive
            archive_path = f"{temp_dir}/{model_name}_{version}.tar.gz"
            shutil.make_archive(
                archive_path.replace('.tar.gz', ''), 
                'gztar', 
                model_path
            )
            
            # Upload to MinIO
            s3_key = f"models/tensorflow/{model_name}/{version}/model.tar.gz"
            self.s3.upload_file(archive_path, self.bucket, s3_key)
            
            # Save model metadata (keep shape dims as-is: the batch dim is None,
            # and tf.timestamp() is not JSON-serializable)
            metadata = {
                'name': model_name,
                'version': version,
                'framework': 'tensorflow',
                'format': 'savedmodel',
                'created_at': datetime.now().isoformat(),
                'model_config': self.model.get_config(),
                'input_shape': list(self.model.input_shape),
                'output_shape': list(self.model.output_shape)
            }
            
            metadata_key = f"models/tensorflow/{model_name}/{version}/metadata.json"
            self.s3.put_object(
                Bucket=self.bucket,
                Key=metadata_key,
                Body=json.dumps(metadata, indent=2),
                ContentType='application/json'
            )
            
            return s3_key

# Usage example
s3_client = boto3.client('s3', endpoint_url='http://minio:9000',
                        aws_access_key_id='minioadmin',
                        aws_secret_access_key='minioadmin')

# Create a simple CNN model
model = tf.keras.Sequential([
    tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(224, 224, 3)),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Conv2D(64, 3, activation='relu'),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Conv2D(64, 3, activation='relu'),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(64, activation='relu'),
    tf.keras.layers.Dense(10, activation='softmax')
])

trainer = TensorFlowMinIOTrainer(model, s3_client, 'training-data')

# Train model
history = trainer.train_model('train/', 'val/', epochs=5)

# Save model artifacts
model_path = trainer.save_model_artifacts('image_classifier', 'v1.0')
print(f"Model saved to: {model_path}")

Inference Patterns with MinIO

Real-Time Inference Architecture

Production inference systems require low-latency access to models, features, and cached predictions. MinIO’s high-performance object storage enables efficient inference patterns that scale with demand.
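One subtlety in such an engine is cache-key stability: hashing `str(input_data)` is sensitive to dict key ordering, whereas canonical JSON serialization yields the same key for equal inputs. A small standalone helper illustrating the idea (the `prediction_cache_key` name is ours, not a MinIO or boto3 API):

```python
import hashlib
import json

def prediction_cache_key(model_name: str, version: str, input_data) -> str:
    """Build a deterministic cache key: canonical JSON, then an MD5 digest."""
    # sort_keys makes dict ordering irrelevant; compact separators avoid
    # whitespace differences between serializers
    payload = json.dumps(input_data, sort_keys=True, separators=(",", ":"))
    digest = hashlib.md5(payload.encode("utf-8")).hexdigest()
    return f"prediction:{model_name}:{version}:{digest}"

# Equal inputs with different dict ordering map to the same key
k1 = prediction_cache_key("m", "v1", {"a": 1, "b": 2})
k2 = prediction_cache_key("m", "v1", {"b": 2, "a": 1})
assert k1 == k2
```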

import boto3
import torch
import json
import numpy as np
from datetime import datetime, timedelta
import redis
import hashlib
from typing import Dict, Any, Optional
import asyncio
import aiohttp

class MinIOInferenceEngine:
    def __init__(self, s3_client, model_bucket='model-registry', 
                 cache_bucket='inference-cache'):
        self.s3 = s3_client
        self.model_bucket = model_bucket
        self.cache_bucket = cache_bucket
        self.model_cache = {}
        self.feature_cache = {}
        
        # Redis for hot cache (optional); ping() forces a connection check,
        # since redis.Redis() itself connects lazily
        try:
            self.redis_client = redis.Redis(host='localhost', port=6379, db=0)
            self.redis_client.ping()
        except Exception:
            self.redis_client = None
    
    def load_model(self, model_name, version='latest', cache=True):
        """Load model for inference with caching"""
        cache_key = f"{model_name}_{version}"
        
        if cache and cache_key in self.model_cache:
            return self.model_cache[cache_key]
        
        # Download model from MinIO
        model_path = f"models/{model_name}/{version}"
        
        try:
            # Load model metadata
            metadata_key = f"{model_path}/metadata.json"
            metadata_obj = self.s3.get_object(Bucket=self.model_bucket, Key=metadata_key)
            metadata = json.loads(metadata_obj['Body'].read())
            
            # Load model artifacts
            artifacts = {}
            for artifact_name, artifact_path in metadata['artifacts'].items():
                artifact_key = artifact_path.replace(f"s3://{self.model_bucket}/", "")
                local_path = f"/tmp/{artifact_name}"
                
                self.s3.download_file(self.model_bucket, artifact_key, local_path)
                
                if artifact_name.endswith('.pt'):
                    artifacts[artifact_name] = torch.load(local_path, map_location='cpu')
                elif artifact_name.endswith('.json'):
                    with open(local_path, 'r') as f:
                        artifacts[artifact_name] = json.load(f)
            
            model_data = {
                'metadata': metadata,
                'artifacts': artifacts,
                'loaded_at': datetime.now()
            }
            
            if cache:
                self.model_cache[cache_key] = model_data
            
            return model_data
            
        except Exception as e:
            raise RuntimeError(f"Failed to load model {model_name} v{version}: {e}")
    
    def predict(self, model_name, input_data, version='latest', 
                use_cache=True, cache_ttl=3600):
        """Make prediction with caching"""
        
        # Generate cache key for input
        input_hash = hashlib.md5(str(input_data).encode()).hexdigest()
        cache_key = f"prediction:{model_name}:{version}:{input_hash}"
        
        # Check cache first
        if use_cache:
            cached_result = self._get_cached_prediction(cache_key)
            if cached_result:
                return cached_result
        
        # Load model
        model_data = self.load_model(model_name, version)
        
        # Make prediction
        try:
            prediction = self._run_inference(model_data, input_data)
            
            # Cache result
            if use_cache:
                self._cache_prediction(cache_key, prediction, cache_ttl)
            
            return prediction
            
        except Exception as e:
            raise RuntimeError(f"Prediction failed: {e}")
    
    def _run_inference(self, model_data, input_data):
        """Run actual inference based on model type"""
        framework = model_data['metadata'].get('framework', 'unknown')
        
        if framework == 'pytorch':
            return self._pytorch_inference(model_data, input_data)
        elif framework == 'tensorflow':
            return self._tensorflow_inference(model_data, input_data)
        elif framework == 'sklearn':
            return self._sklearn_inference(model_data, input_data)
        else:
            raise ValueError(f"Unsupported framework: {framework}")
    
    def _pytorch_inference(self, model_data, input_data):
        """PyTorch model inference"""
        model_state = model_data['artifacts']['model.pt']
        
        # Reconstruct model (simplified - in practice, you'd store architecture)
        # This assumes you have a way to reconstruct the model architecture
        model = self._reconstruct_pytorch_model(model_data['metadata'])
        model.load_state_dict(model_state)
        model.eval()
        
        # Convert input to tensor
        if isinstance(input_data, np.ndarray):
            input_tensor = torch.from_numpy(input_data).float()
        elif isinstance(input_data, list):
            input_tensor = torch.tensor(input_data).float()
        else:
            input_tensor = input_data
        
        # Add batch dimension if needed
        if len(input_tensor.shape) == 1:
            input_tensor = input_tensor.unsqueeze(0)
        
        # Run inference
        with torch.no_grad():
            output = model(input_tensor)
            
        return {
            'predictions': output.numpy().tolist(),
            'model_name': model_data['metadata']['name'],
            'version': model_data['metadata']['version'],
            'timestamp': datetime.now().isoformat()
        }
    
    def _get_cached_prediction(self, cache_key):
        """Get prediction from cache"""
        try:
            if self.redis_client:
                cached = self.redis_client.get(cache_key)
                if cached:
                    return json.loads(cached)
            
            # Fall back to the MinIO cache bucket
            try:
                obj = self.s3.get_object(Bucket=self.cache_bucket, Key=f"predictions/{cache_key}.json")
                cached_data = json.loads(obj['Body'].read())
                
                # Honor the TTL recorded when the entry was written
                cached_time = datetime.fromisoformat(cached_data['cached_at'])
                max_age = timedelta(seconds=cached_data.get('ttl', 3600))
                if datetime.now() - cached_time < max_age:
                    return cached_data['prediction']
                    
            except Exception:
                pass
                
        except Exception as e:
            print(f"Cache retrieval error: {e}")
        
        return None
    
    def _cache_prediction(self, cache_key, prediction, ttl):
        """Cache prediction result"""
        try:
            cache_data = {
                'prediction': prediction,
                'cached_at': datetime.now().isoformat(),
                'ttl': ttl
            }
            
            if self.redis_client:
                self.redis_client.setex(
                    cache_key, 
                    ttl, 
                    json.dumps(cache_data)
                )
            
            # Also cache in MinIO for persistence
            self.s3.put_object(
                Bucket=self.cache_bucket,
                Key=f"predictions/{cache_key}.json",
                Body=json.dumps(cache_data, indent=2),
                ContentType='application/json'
            )
            
        except Exception as e:
            print(f"Cache storage error: {e}")

class BatchInferenceProcessor:
    def __init__(self, s3_client, inference_engine):
        self.s3 = s3_client
        self.inference_engine = inference_engine
    
    def process_batch(self, input_bucket, input_prefix, output_bucket, 
                     output_prefix, model_name, batch_size=100):
        """Process batch inference on data stored in MinIO"""
        
        # List input files (paginate: list_objects_v2 caps at 1000 keys per call)
        paginator = self.s3.get_paginator('list_objects_v2')
        input_files = []
        for page in paginator.paginate(Bucket=input_bucket, Prefix=input_prefix):
            input_files.extend(obj['Key'] for obj in page.get('Contents', []))
        
        results = []
        batch_count = 0
        
        for i in range(0, len(input_files), batch_size):
            batch_files = input_files[i:i + batch_size]
            batch_results = []
            
            for file_key in batch_files:
                try:
                    # Download and process file
                    obj = self.s3.get_object(Bucket=input_bucket, Key=file_key)
                    
                    if file_key.endswith('.json'):
                        input_data = json.loads(obj['Body'].read())
                    elif file_key.endswith('.csv'):
                        import pandas as pd
                        df = pd.read_csv(obj['Body'])
                        input_data = df.to_dict('records')
                    else:
                        # Handle other formats
                        input_data = obj['Body'].read()
                    
                    # Make prediction
                    prediction = self.inference_engine.predict(
                        model_name, input_data, use_cache=False
                    )
                    
                    batch_results.append({
                        'input_file': file_key,
                        'prediction': prediction,
                        'processed_at': datetime.now().isoformat()
                    })
                    
                except Exception as e:
                    batch_results.append({
                        'input_file': file_key,
                        'error': str(e),
                        'processed_at': datetime.now().isoformat()
                    })
            
            # Save batch results
            batch_output_key = f"{output_prefix}/batch_{batch_count:04d}.json"
            self.s3.put_object(
                Bucket=output_bucket,
                Key=batch_output_key,
                Body=json.dumps(batch_results, indent=2),
                ContentType='application/json'
            )
            
            results.extend(batch_results)
            batch_count += 1
            
            print(f"Processed batch {batch_count}, files: {len(batch_files)}")
        
        # Save summary
        summary = {
            'total_files': len(input_files),
            'successful_predictions': len([r for r in results if 'prediction' in r]),
            'failed_predictions': len([r for r in results if 'error' in r]),
            'processing_time': datetime.now().isoformat(),
            'model_used': model_name
        }
        
        summary_key = f"{output_prefix}/summary.json"
        self.s3.put_object(
            Bucket=output_bucket,
            Key=summary_key,
            Body=json.dumps(summary, indent=2),
            ContentType='application/json'
        )
        
        return summary
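The loop above downloads and scores one file at a time; since the work is I/O-bound, a thread pool is an easy win. A hedged sketch with a generic `process_one` callable (a stand-in for the download-and-predict step, not part of the class above):

```python
from concurrent.futures import ThreadPoolExecutor

def process_files_parallel(file_keys, process_one, max_workers=8):
    """Apply process_one to each key concurrently; result order matches input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map preserves ordering even though execution is concurrent
        return list(pool.map(process_one, file_keys))

# Toy usage with a local stand-in for the download+predict step
results = process_files_parallel(["a.json", "b.json"], lambda k: {"input_file": k, "ok": True})
assert [r["input_file"] for r in results] == ["a.json", "b.json"]
```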

# A/B Testing for Model Inference
class ModelABTester:
    def __init__(self, inference_engine):
        self.inference_engine = inference_engine
        self.test_configs = {}
    
    def setup_ab_test(self, test_name, model_a, model_b, traffic_split=0.5):
        """Setup A/B test between two models"""
        self.test_configs[test_name] = {
            'model_a': model_a,
            'model_b': model_b,
            'traffic_split': traffic_split,
            'results': {'a': [], 'b': []}
        }
    
    def predict_with_ab_test(self, test_name, input_data, user_id=None):
        """Make prediction using A/B test configuration"""
        if test_name not in self.test_configs:
            raise ValueError(f"A/B test {test_name} not configured")
        
        config = self.test_configs[test_name]
        
        # Determine which model to use
        if user_id:
            # Consistent assignment based on user ID (hashlib is imported at module level)
            hash_val = int(hashlib.md5(str(user_id).encode()).hexdigest(), 16)
            use_model_a = (hash_val % 100) < (config['traffic_split'] * 100)
        else:
            # Random assignment
            import random
            use_model_a = random.random() < config['traffic_split']
        
        model_name = config['model_a'] if use_model_a else config['model_b']
        variant = 'a' if use_model_a else 'b'
        
        # Make prediction
        prediction = self.inference_engine.predict(model_name, input_data)
        
        # Track result
        config['results'][variant].append({
            'prediction': prediction,
            'timestamp': datetime.now().isoformat(),
            'user_id': user_id
        })
        
        return {
            'prediction': prediction,
            'variant': variant,
            'model_used': model_name
        }

# Usage examples
s3_client = boto3.client('s3', endpoint_url='http://minio:9000',
                        aws_access_key_id='minioadmin',
                        aws_secret_access_key='minioadmin')

# Setup inference engine
inference_engine = MinIOInferenceEngine(s3_client)

# Real-time prediction
input_data = [1.2, 3.4, 5.6, 7.8]  # Example feature vector
result = inference_engine.predict('housing_price_predictor', input_data)
print(f"Prediction: {result}")

# Batch processing
batch_processor = BatchInferenceProcessor(s3_client, inference_engine)
summary = batch_processor.process_batch(
    input_bucket='inference-input',
    input_prefix='batch_001/',
    output_bucket='inference-output',
    output_prefix='results/batch_001',
    model_name='housing_price_predictor'
)
print(f"Batch processing summary: {summary}")

# A/B testing
ab_tester = ModelABTester(inference_engine)
ab_tester.setup_ab_test('price_model_test', 'price_model_v1', 'price_model_v2', 0.3)

ab_result = ab_tester.predict_with_ab_test('price_model_test', input_data, user_id='user123')
print(f"A/B test result: {ab_result}")
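The consistent-assignment logic inside `predict_with_ab_test` is worth verifying in isolation, since it is what keeps a given user on one variant across requests. A standalone sketch of the same hashing scheme (the `assign_variant` helper is ours, for illustration):

```python
import hashlib

def assign_variant(user_id: str, traffic_split: float = 0.5) -> str:
    """Deterministically map a user id to variant 'a' or 'b'."""
    # Hash the id into one of 100 buckets; the first traffic_split*100
    # buckets go to variant 'a', the rest to 'b'
    bucket = int(hashlib.md5(str(user_id).encode()).hexdigest(), 16) % 100
    return 'a' if bucket < traffic_split * 100 else 'b'

# The same user id always lands in the same variant across calls
assert assign_variant('user123', 0.3) == assign_variant('user123', 0.3)
```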

Vector Database Integration

Storing Embeddings

import boto3
import json
import numpy as np
from datetime import datetime

def store_embeddings(embeddings, metadata, bucket='embeddings'):
    """Store vector embeddings with metadata"""
    s3 = boto3.client('s3',
        endpoint_url='http://minio:9000',
        aws_access_key_id='minioadmin',
        aws_secret_access_key='minioadmin'
    )
    
    for embedding, meta in zip(embeddings, metadata):
        key = f"vectors/{meta['id']}.json"
        
        data = {
            'id': meta['id'],
            'embedding': embedding.tolist(),
            'metadata': meta,
            'created_at': datetime.now().isoformat()
        }
        
        s3.put_object(
            Bucket=bucket,
            Key=key,
            Body=json.dumps(data),
            ContentType='application/json'
        )

# Store embeddings
embeddings = [np.random.randn(512) for _ in range(100)]
metadata = [{'id': f'item_{i}', 'category': 'test'} for i in range(100)]
store_embeddings(embeddings, metadata)
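Storing flat JSON objects keeps writes simple, but retrieval needs a similarity search. As a minimal sketch in pure NumPy (assuming the embeddings have already been fetched from MinIO into a matrix, and that no vector is all zeros), nearest neighbors by cosine similarity:

```python
import numpy as np

def top_k_similar(query: np.ndarray, embeddings: np.ndarray, ids, k=5):
    """Return (id, score) pairs for the k most cosine-similar embeddings."""
    # Normalize rows and the query; cosine similarity is then a dot product
    emb_norm = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)
    q_norm = query / np.linalg.norm(query)
    scores = emb_norm @ q_norm
    top = np.argsort(scores)[::-1][:k]
    return [(ids[i], float(scores[i])) for i in top]

# Toy usage: the query matches item_1 exactly, so it should rank first
emb = np.array([[1.0, 0.0], [0.0, 1.0], [0.7, 0.7]])
result = top_k_similar(np.array([0.0, 1.0]), emb, ['item_0', 'item_1', 'item_2'], k=2)
assert result[0][0] == 'item_1'
```

For production-scale search, an approximate-nearest-neighbor index would sit in front of this, with MinIO as the durable store for the raw vectors.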

Complete ML Pipeline

End-to-End Example

import boto3

class MLPipeline:
    def __init__(self, minio_endpoint, bucket_prefix='ml-pipeline'):
        self.s3 = boto3.client('s3',
            endpoint_url=minio_endpoint,
            aws_access_key_id='minioadmin',
            aws_secret_access_key='minioadmin'
        )
        self.bucket = bucket_prefix
        self._ensure_bucket()
    
    def _ensure_bucket(self):
        try:
            self.s3.create_bucket(Bucket=self.bucket)
        except (self.s3.exceptions.BucketAlreadyOwnedByYou,
                self.s3.exceptions.BucketAlreadyExists):
            pass
    
    def ingest(self, source_path):
        """Ingest raw data"""
        # Upload raw data
        pass
    
    def preprocess(self):
        """Clean and transform data"""
        # Process data
        pass
    
    def train(self):
        """Train model"""
        # Training loop
        pass
    
    def evaluate(self):
        """Evaluate model"""
        # Evaluation
        pass
    
    def deploy(self):
        """Deploy to inference"""
        # Save model and metadata
        pass
    
    def run(self, source_path):
        """Run complete pipeline"""
        self.ingest(source_path)
        self.preprocess()
        self.train()
        self.evaluate()
        self.deploy()

# Run pipeline
pipeline = MLPipeline('http://minio:9000')
pipeline.run('raw_data/')

Conclusion

MinIO provides ideal storage for AI/ML workloads. Key capabilities: high-throughput data ingestion for training, efficient model checkpoint storage, feature store backends, vector embedding storage, and seamless integration with ML frameworks. Using MinIO as your AI data lake simplifies infrastructure while maintaining S3 compatibility.

In the final article, we’ll explore real-world MinIO use cases.
