Introduction
Artificial intelligence and machine learning workloads demand robust, scalable storage infrastructure that can handle massive datasets, frequent model checkpoints, and real-time inference requirements. Modern AI applications generate and consume data at unprecedented scales: training datasets can reach petabytes, model artifacts require versioning and lineage tracking, and inference systems need millisecond access to features and embeddings.
MinIO emerges as the ideal storage foundation for AI workloads, offering S3-compatible object storage with enterprise-grade performance, security, and scalability. Unlike traditional file systems that struggle with AI’s unique access patterns, MinIO provides high-throughput parallel I/O, seamless cloud integration, and native support for modern data formats like Parquet and Delta Lake.
Key advantages of MinIO for AI include:
- High Performance: Multi-part uploads and parallel processing for large datasets
- S3 Compatibility: Works with existing ML frameworks and tools
- Kubernetes Native: Cloud-native deployment and auto-scaling
- Data Governance: Versioning, encryption, and access controls
- Cost Efficiency: Intelligent tiering and lifecycle management
This comprehensive guide explores building production AI infrastructure with MinIO, covering everything from data lake architecture to real-time inference pipelines.
ML Data Lake Architecture
Designing the Data Lake Foundation
A well-architected ML data lake on MinIO follows a layered approach that separates raw ingestion, processed data, feature engineering, and model artifacts. This separation enables different teams to work independently while maintaining data lineage and governance.
import hashlib
import json
import os
import tempfile
import time
from datetime import datetime

import boto3
from botocore.config import Config
class MLDataLake:
    """Provisions the bucket layout for an ML data lake on MinIO.

    Buckets follow a medallion-style layout (raw/bronze/silver/gold) plus
    dedicated buckets for features, models, experiments and inference cache.
    """

    def __init__(self, endpoint_url, access_key, secret_key):
        """Create a pooled S3 client and ensure all data-lake buckets exist.

        Args:
            endpoint_url: MinIO endpoint, e.g. "http://minio:9000".
            access_key: S3 access key.
            secret_key: S3 secret key.
        """
        self.s3 = boto3.client(
            's3',
            endpoint_url=endpoint_url,
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            config=Config(
                max_pool_connections=100,  # large pool for parallel transfers
                retries={'max_attempts': 3}
            )
        )
        self.setup_buckets()

    def setup_buckets(self):
        """Initialize data lake bucket structure, tagging each bucket.

        Creation and tagging errors are reported separately: the original
        lumped both into one handler, so a tagging failure was misreported
        as "bucket already exists".
        """
        buckets = {
            'raw-data': 'Raw ingested data from various sources',
            'bronze-data': 'Cleaned and validated data',
            'silver-data': 'Transformed and enriched data',
            'gold-data': 'Business-ready aggregated data',
            'feature-store': 'Computed features for ML',
            'model-registry': 'Trained models and metadata',
            'experiment-tracking': 'ML experiment artifacts',
            'inference-cache': 'Cached predictions and embeddings'
        }
        for bucket, description in buckets.items():
            try:
                self.s3.create_bucket(Bucket=bucket)
            except Exception as e:
                # Best-effort: BucketAlreadyOwnedByYou is expected on re-runs.
                print(f"Bucket {bucket} already exists or error: {e}")
                continue
            try:
                self.s3.put_bucket_tagging(
                    Bucket=bucket,
                    Tagging={
                        'TagSet': [
                            {'Key': 'Purpose', 'Value': 'ML-DataLake'},
                            {'Key': 'Description', 'Value': description},
                            {'Key': 'Environment', 'Value': 'production'}
                        ]
                    }
                )
                print(f"Created bucket: {bucket}")
            except Exception as e:
                print(f"Created bucket {bucket} but tagging failed: {e}")
# Initialize data lake (creates/tags all buckets on construction).
# NOTE(review): demo credentials — in production load these from env/secrets.
data_lake = MLDataLake(
    endpoint_url='http://minio:9000',
    access_key='minioadmin',
    secret_key='minioadmin'
)
Data Organization Strategy
def create_data_structure():
    """Return the standardized layer -> partition-path layout for ML projects.

    Keys are data-lake buckets (raw/bronze/silver/gold); values are example
    Hive-style partition prefixes for each asset type in that layer.
    """
    raw_layout = [
        'images/year=2026/month=01/day=01/',
        'text/source=web/year=2026/month=01/',
        'audio/format=wav/sample_rate=44100/',
        'video/resolution=1080p/fps=30/',
        'structured/format=parquet/schema_version=v1/',
    ]
    bronze_layout = [
        'images/processed/year=2026/month=01/',
        'text/cleaned/language=en/',
        'audio/normalized/duration_sec=30/',
        'video/frames/extracted_fps=1/',
        'structured/validated/schema_version=v1/',
    ]
    silver_layout = [
        'features/image_embeddings/model=resnet50/',
        'features/text_embeddings/model=bert/',
        'features/audio_mfcc/window_size=25ms/',
        'aggregations/daily/metric=engagement/',
        'joins/user_content/date=2026-01-01/',
    ]
    gold_layout = [
        'datasets/training/split=train/version=v1/',
        'datasets/validation/split=val/version=v1/',
        'datasets/test/split=test/version=v1/',
        'reports/model_performance/date=2026-01-01/',
        'dashboards/business_metrics/refresh=daily/',
    ]
    return {
        'raw-data': raw_layout,
        'bronze-data': bronze_layout,
        'silver-data': silver_layout,
        'gold-data': gold_layout,
    }
# Example: Partitioned data storage
def store_partitioned_data(df, bucket, base_path, partition_cols, s3_client=None):
    """Store a DataFrame as Hive-style partitioned Parquet objects.

    Args:
        df: pandas DataFrame to store.
        bucket: target bucket name.
        base_path: key prefix under which partitions are written.
        partition_cols: list of column names to partition by.
        s3_client: boto3 S3 client; defaults to the shared data-lake client
            (the original referenced an undefined global `s3`).

    Returns:
        List of object keys written (one per partition).
    """
    import pandas as pd  # noqa: F401  (kept for parity with the original)

    if s3_client is None:
        s3_client = data_lake.s3
    written = []
    for partition_values, group in df.groupby(partition_cols):
        # groupby over a single column yields a scalar, not a tuple, which
        # would break zip() below — normalize to a tuple.
        if not isinstance(partition_values, tuple):
            partition_values = (partition_values,)
        partition_path = base_path
        for col, value in zip(partition_cols, partition_values):
            partition_path += f"/{col}={value}"
        # to_parquet() without a path returns the serialized bytes.
        parquet_buffer = group.to_parquet(index=False)
        key = f"{partition_path}/data.parquet"
        s3_client.put_object(
            Bucket=bucket,
            Key=key,
            Body=parquet_buffer,
            ContentType='application/octet-stream'
        )
        written.append(key)
    return written
Data Governance and Lineage
class DataLineageTracker:
    """Records data-transformation lineage as JSON documents in MinIO."""

    def __init__(self, s3_client):
        self.s3 = s3_client
        # Assumed to already exist — TODO confirm it is provisioned elsewhere.
        self.lineage_bucket = 'data-lineage'

    def track_transformation(self, input_paths, output_path,
                             transformation_code, metadata):
        """Track data transformation lineage.

        Args:
            input_paths: list of source object prefixes.
            output_path: destination prefix the transformation produced.
            transformation_code: string representation of the transform.
            metadata: optional keys: description, parameters, framework,
                duration, memory_usage, cpu_cores.

        Returns:
            The lineage record that was written.
        """
        lineage_record = {
            'timestamp': datetime.now().isoformat(),
            'input_paths': input_paths,
            'output_path': output_path,
            'transformation': {
                # Use a stable content hash: the builtin hash() is salted per
                # interpreter process, so records would not be comparable
                # across runs.
                'code_hash': hashlib.sha256(
                    transformation_code.encode('utf-8')).hexdigest(),
                'description': metadata.get('description', ''),
                'parameters': metadata.get('parameters', {}),
                'framework': metadata.get('framework', 'python')
            },
            'execution_info': {
                'duration_seconds': metadata.get('duration', 0),
                'memory_usage_mb': metadata.get('memory_usage', 0),
                'cpu_cores': metadata.get('cpu_cores', 1)
            }
        }
        # One record per transformation run, keyed by output path + timestamp.
        lineage_key = f"lineage/{output_path.replace('/', '_')}/{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        self.s3.put_object(
            Bucket=self.lineage_bucket,
            Key=lineage_key,
            Body=json.dumps(lineage_record, indent=2),
            ContentType='application/json'
        )
        return lineage_record
# Usage example.  The original passed an undefined global `s3`; use the
# client owned by the shared data-lake wrapper instead.
tracker = DataLineageTracker(data_lake.s3)
tracker.track_transformation(
    input_paths=['raw-data/images/2026/01/01/'],
    output_path='bronze-data/images/processed/2026/01/01/',
    transformation_code="resize_and_normalize()",
    metadata={
        'description': 'Resize images to 224x224 and normalize pixel values',
        'parameters': {'target_size': (224, 224), 'normalize': True},
        'duration': 120.5,
        'memory_usage': 2048
    }
)
Training Data Pipelines with MinIO
High-Performance Data Ingestion
Modern ML training requires efficient data ingestion that can saturate GPU utilization. MinIO’s multi-part upload and parallel processing capabilities enable high-throughput data pipelines that scale with your training infrastructure.
import boto3
import os
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from botocore.config import Config
import hashlib
import json
class HighThroughputIngestion:
    """Parallel uploader for training datasets.

    Uses a thread pool plus a pooled boto3 client to push many files into
    MinIO concurrently, attaching integrity/provenance metadata to each object.
    """

    def __init__(self, endpoint_url, access_key, secret_key, max_workers=20):
        self.s3 = boto3.client(
            's3',
            endpoint_url=endpoint_url,
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            config=Config(
                max_pool_connections=max_workers,  # one connection per worker
                retries={'max_attempts': 3}
            )
        )
        self.max_workers = max_workers
        # upload_stats is mutated from worker threads; `+=` on a shared dict
        # entry is not atomic, so guard updates with a lock.
        self._stats_lock = threading.Lock()
        self.upload_stats = {'success': 0, 'failed': 0, 'bytes': 0}

    def upload_with_metadata(self, local_path, bucket, s3_key, metadata=None):
        """Upload a single file with integrity and provenance metadata.

        Returns:
            dict with 'status' ('success'/'failed'), the object 'key', and
            either the file's SHA-256 ('hash') or the 'error' string.
        """
        import time  # the original used time.time() without importing time

        try:
            # Read once: hash and size both come from the same bytes.
            with open(local_path, 'rb') as f:
                payload = f.read()
            file_hash = hashlib.sha256(payload).hexdigest()
            upload_metadata = {
                'sha256': file_hash,
                'original_name': os.path.basename(local_path),
                'upload_timestamp': str(int(time.time())),
                'file_size': str(len(payload))
            }
            if metadata:
                upload_metadata.update(metadata)
            self.s3.upload_file(
                local_path, bucket, s3_key,
                ExtraArgs={
                    'Metadata': upload_metadata,
                    'ContentType': self._get_content_type(local_path)
                }
            )
            with self._stats_lock:
                self.upload_stats['success'] += 1
                self.upload_stats['bytes'] += len(payload)
            return {'status': 'success', 'key': s3_key, 'hash': file_hash}
        except Exception as e:
            with self._stats_lock:
                self.upload_stats['failed'] += 1
            return {'status': 'failed', 'key': s3_key, 'error': str(e)}

    def batch_upload_dataset(self, dataset_path, bucket, prefix,
                             file_extensions=None, metadata_extractor=None):
        """Upload an entire dataset tree with parallel workers.

        Args:
            dataset_path: local root directory to walk.
            bucket: target bucket.
            prefix: key prefix; files keep their path relative to dataset_path.
            file_extensions: extensions to include (defaults to common ML types).
            metadata_extractor: optional callable(local_path) -> metadata dict.

        Returns:
            List of per-file result dicts from upload_with_metadata.
        """
        if file_extensions is None:
            file_extensions = ['.jpg', '.png', '.txt', '.parquet', '.csv']
        upload_tasks = []
        for root, dirs, files in os.walk(dataset_path):
            for file in files:
                if any(file.lower().endswith(ext) for ext in file_extensions):
                    local_path = os.path.join(root, file)
                    relative_path = os.path.relpath(local_path, dataset_path)
                    s3_key = f"{prefix}/{relative_path}"
                    metadata = metadata_extractor(local_path) if metadata_extractor else {}
                    upload_tasks.append((local_path, bucket, s3_key, metadata))
        print(f"Starting upload of {len(upload_tasks)} files...")
        results = []
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            future_to_task = {
                executor.submit(self.upload_with_metadata, *task): task
                for task in upload_tasks
            }
            for future in as_completed(future_to_task):
                results.append(future.result())
                if len(results) % 100 == 0:
                    print(f"Uploaded {len(results)}/{len(upload_tasks)} files")
        return results

    def _get_content_type(self, file_path):
        """Map a file extension to a MIME type (octet-stream fallback)."""
        ext = os.path.splitext(file_path)[1].lower()
        content_types = {
            '.jpg': 'image/jpeg', '.jpeg': 'image/jpeg',
            '.png': 'image/png', '.gif': 'image/gif',
            '.txt': 'text/plain', '.csv': 'text/csv',
            '.json': 'application/json',
            '.parquet': 'application/octet-stream',
            '.pt': 'application/octet-stream',
            '.pkl': 'application/octet-stream'
        }
        return content_types.get(ext, 'application/octet-stream')
# Example: Upload ImageNet-style dataset
def extract_image_metadata(image_path):
    """Extract width/height/format/mode from an image, plus the parent
    directory name as its class label; returns {} for unreadable files."""
    from PIL import Image

    try:
        with Image.open(image_path) as img:
            label = os.path.basename(os.path.dirname(image_path))
            return {
                'width': str(img.width),
                'height': str(img.height),
                'format': img.format,
                'mode': img.mode,
                'class_label': label,
            }
    except Exception:
        return {}
# Usage: upload a local ImageNet-style directory tree in parallel.
# NOTE(review): demo credentials — in production load these from env/secrets.
ingestion = HighThroughputIngestion(
    'http://minio:9000', 'minioadmin', 'minioadmin'
)
# Each .jpg/.jpeg file lands under imagenet/train/<path relative to root>,
# carrying per-image metadata produced by extract_image_metadata.
results = ingestion.batch_upload_dataset(
    dataset_path='./imagenet_subset',
    bucket='training-data',
    prefix='imagenet/train',
    file_extensions=['.jpg', '.jpeg'],
    metadata_extractor=extract_image_metadata
)
print(f"Upload complete: {ingestion.upload_stats}")
Data Validation and Quality Checks
import pandas as pd
import numpy as np
from typing import Dict, List, Any
class DataQualityValidator:
    """Downloads objects under a bucket prefix and validates them against a
    declarative rule set (tabular rules for CSV/Parquet, image rules for
    JPEG/PNG)."""

    def __init__(self, s3_client):
        self.s3 = s3_client
        # Reserved for callers; not populated internally.
        self.validation_results = []

    def validate_dataset(self, bucket, prefix, validation_rules):
        """Validate dataset quality across all files under `prefix`.

        Returns:
            Summary dict with total/valid/invalid counts and per-file errors.
        """
        validation_summary = {
            'total_files': 0,
            'valid_files': 0,
            'invalid_files': 0,
            'validation_errors': []
        }
        # Paginate: a single list_objects_v2 call returns at most 1000 keys,
        # so the original silently ignored larger datasets.
        paginator = self.s3.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
            for obj in page.get('Contents', []):
                key = obj['Key']
                validation_summary['total_files'] += 1
                local_path = None
                try:
                    # Unique scratch file (keeps the extension so dispatch in
                    # _validate_file still works); avoids /tmp name collisions.
                    suffix = os.path.splitext(key)[1]
                    with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp:
                        local_path = tmp.name
                    self.s3.download_file(bucket, key, local_path)
                    is_valid, errors = self._validate_file(local_path, validation_rules)
                    if is_valid:
                        validation_summary['valid_files'] += 1
                    else:
                        validation_summary['invalid_files'] += 1
                        validation_summary['validation_errors'].append(
                            {'file': key, 'errors': errors})
                except Exception as e:
                    validation_summary['invalid_files'] += 1
                    validation_summary['validation_errors'].append({
                        'file': key, 'errors': [f"Processing error: {str(e)}"]
                    })
                finally:
                    # Always clean up, even when validation raised.
                    if local_path and os.path.exists(local_path):
                        os.remove(local_path)
        return validation_summary

    def _validate_file(self, file_path, rules):
        """Validate an individual file, dispatching on its extension.

        Returns:
            (is_valid, errors) where errors is a list of readable strings.
            Unknown extensions produce no errors and count as valid.
        """
        errors = []
        try:
            if file_path.endswith('.parquet'):
                df = pd.read_parquet(file_path)
                errors.extend(self._validate_dataframe(df, rules))
            elif file_path.endswith(('.jpg', '.png')):
                errors.extend(self._validate_image(file_path, rules))
            elif file_path.endswith('.csv'):
                df = pd.read_csv(file_path)
                errors.extend(self._validate_dataframe(df, rules))
        except Exception as e:
            errors.append(f"File format error: {str(e)}")
        return len(errors) == 0, errors

    def _validate_dataframe(self, df, rules):
        """Validate a DataFrame against tabular rules.

        Supported rule keys: required_columns, column_types, no_nulls,
        value_ranges (col -> (min, max)).
        """
        errors = []
        if 'required_columns' in rules:
            missing_cols = set(rules['required_columns']) - set(df.columns)
            if missing_cols:
                errors.append(f"Missing columns: {missing_cols}")
        if 'column_types' in rules:
            for col, expected_type in rules['column_types'].items():
                if col in df.columns and df[col].dtype != expected_type:
                    errors.append(f"Column {col} has type {df[col].dtype}, expected {expected_type}")
        if 'no_nulls' in rules:
            for col in rules['no_nulls']:
                if col in df.columns and df[col].isnull().any():
                    errors.append(f"Column {col} contains null values")
        if 'value_ranges' in rules:
            for col, (min_val, max_val) in rules['value_ranges'].items():
                if col in df.columns:
                    if df[col].min() < min_val or df[col].max() > max_val:
                        errors.append(f"Column {col} values outside range [{min_val}, {max_val}]")
        return errors

    def _validate_image(self, image_path, rules):
        """Validate an image file against min_dimensions, allowed_formats and
        max_file_size_mb rules."""
        errors = []
        try:
            from PIL import Image
            with Image.open(image_path) as img:
                if 'min_dimensions' in rules:
                    min_w, min_h = rules['min_dimensions']
                    if img.width < min_w or img.height < min_h:
                        errors.append(f"Image too small: {img.width}x{img.height}")
                if 'allowed_formats' in rules:
                    if img.format not in rules['allowed_formats']:
                        errors.append(f"Invalid format: {img.format}")
                if 'max_file_size_mb' in rules:
                    file_size_mb = os.path.getsize(image_path) / (1024 * 1024)
                    if file_size_mb > rules['max_file_size_mb']:
                        errors.append(f"File too large: {file_size_mb:.2f}MB")
        except Exception as e:
            errors.append(f"Image validation error: {str(e)}")
        return errors
# Example validation rules covering both tabular and image checks.
validation_rules = {
    'required_columns': ['image_path', 'label', 'split'],
    'column_types': {'label': 'int64', 'split': 'object'},
    'no_nulls': ['image_path', 'label'],
    'value_ranges': {'label': (0, 999)},
    'min_dimensions': (224, 224),
    'allowed_formats': ['JPEG', 'PNG'],
    'max_file_size_mb': 10
}
# Run validation using the shared data-lake client (the original referenced
# an undefined global `s3`).
validator = DataQualityValidator(data_lake.s3)
results = validator.validate_dataset(
    bucket='training-data',
    prefix='imagenet/train',
    validation_rules=validation_rules
)
print(f"Validation Results: {results}")
Distributed Data Processing with Apache Spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, regexp_replace, lower
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
def create_spark_session(minio_endpoint, access_key, secret_key):
    """Create a Spark session wired to MinIO through the s3a connector."""
    s3a_conf = {
        "spark.hadoop.fs.s3a.endpoint": minio_endpoint,
        "spark.hadoop.fs.s3a.access.key": access_key,
        "spark.hadoop.fs.s3a.secret.key": secret_key,
        "spark.hadoop.fs.s3a.path.style.access": "true",
        "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        "spark.sql.adaptive.enabled": "true",
        "spark.sql.adaptive.coalescePartitions.enabled": "true",
    }
    builder = SparkSession.builder.appName("ML Data Processing Pipeline")
    for option, value in s3a_conf.items():
        builder = builder.config(option, value)
    return builder.getOrCreate()
class SparkMLPipeline:
    """Spark jobs for text preprocessing and dataset splitting on MinIO (s3a://)."""

    def __init__(self, spark_session):
        self.spark = spark_session

    def process_text_data(self, input_path, output_path):
        """Clean raw text for NLP training and write it out as Parquet.

        Returns:
            Number of rows written.
        """
        # `length` and `current_timestamp` were used without being imported at
        # module level (only col/when/regexp_replace/lower were); import them
        # here so the method is self-contained.
        from pyspark.sql.functions import length, current_timestamp

        df = self.spark.read.text(input_path)
        processed_df = df.select(
            # Remove special characters and normalize to lowercase.
            regexp_replace(lower(col("value")), r'[^\w\s]', '').alias("clean_text")
        ).filter(
            # Drop empty or very short texts.
            col("clean_text").isNotNull() & (length(col("clean_text")) > 10)
        )
        processed_df = processed_df.withColumn("processed_timestamp", current_timestamp())
        processed_df.write \
            .mode("overwrite") \
            .parquet(output_path)
        return processed_df.count()

    def create_training_splits(self, input_path, output_base_path,
                               train_ratio=0.8, val_ratio=0.1, test_ratio=0.1):
        """Create random train/validation/test splits and write each as Parquet.

        Returns:
            Dict with train_count/val_count/test_count row counts.
        """
        # `rand` was also missing from the module-level import list.
        from pyspark.sql.functions import rand

        df = self.spark.read.parquet(input_path)
        # One shared random column guarantees the three splits are disjoint.
        # NOTE(review): rand() is unseeded, so splits differ between runs.
        df_with_rand = df.withColumn("rand", rand())
        train_df = df_with_rand.filter(col("rand") < train_ratio)
        val_df = df_with_rand.filter(
            (col("rand") >= train_ratio) &
            (col("rand") < train_ratio + val_ratio)
        )
        test_df = df_with_rand.filter(col("rand") >= train_ratio + val_ratio)
        for split_name, split_df in [("train", train_df), ("val", val_df), ("test", test_df)]:
            split_df.drop("rand").write \
                .mode("overwrite") \
                .parquet(f"{output_base_path}/{split_name}")
        return {
            "train_count": train_df.count(),
            "val_count": val_df.count(),
            "test_count": test_df.count()
        }
# Usage example: build the MinIO-backed Spark session and run both jobs.
spark = create_spark_session(
    "http://minio:9000", "minioadmin", "minioadmin"
)
pipeline = SparkMLPipeline(spark)
# Process text data: clean raw reviews into the bronze layer.
text_count = pipeline.process_text_data(
    "s3a://raw-data/text/reviews/",
    "s3a://bronze-data/text/processed_reviews/"
)
# Create randomized train/val/test splits in the gold layer.
split_counts = pipeline.create_training_splits(
    "s3a://bronze-data/text/processed_reviews/",
    "s3a://gold-data/datasets/reviews"
)
print(f"Processed {text_count} text samples")
print(f"Split counts: {split_counts}")
Feature Store Implementation
Building a Production Feature Store
A feature store serves as the central repository for ML features, enabling feature reuse across models, maintaining feature consistency between training and inference, and providing feature lineage tracking. MinIO provides the ideal storage backend for feature stores with its high-performance object storage and S3 compatibility.
import boto3
import json
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
import hashlib
import pickle
class MinIOFeatureStore:
def __init__(self, endpoint_url, access_key, secret_key):
self.s3 = boto3.client(
's3',
endpoint_url=endpoint_url,
aws_access_key_id=access_key,
aws_secret_access_key=secret_key
)
self.bucket = 'feature-store'
self.metadata_bucket = 'feature-metadata'
self._ensure_buckets()
def _ensure_buckets(self):
"""Ensure feature store buckets exist"""
for bucket in [self.bucket, self.metadata_bucket]:
try:
self.s3.create_bucket(Bucket=bucket)
except:
pass
def register_feature_group(self, group_name, schema, description=""):
"""Register a new feature group with schema"""
feature_group = {
'name': group_name,
'schema': schema,
'description': description,
'created_at': datetime.now().isoformat(),
'version': '1.0',
'feature_count': len(schema),
'data_types': {col: str(dtype) for col, dtype in schema.items()}
}
key = f"feature_groups/{group_name}/metadata.json"
self.s3.put_object(
Bucket=self.metadata_bucket,
Key=key,
Body=json.dumps(feature_group, indent=2),
ContentType='application/json'
)
return feature_group
def compute_and_store_features(self, group_name, entity_id,
features_dict, timestamp=None):
"""Compute and store features for an entity"""
if timestamp is None:
timestamp = datetime.now()
# Add metadata to features
feature_record = {
'entity_id': entity_id,
'features': features_dict,
'timestamp': timestamp.isoformat(),
'computed_at': datetime.now().isoformat(),
'feature_group': group_name,
'version': '1.0'
}
# Create time-partitioned key
date_partition = timestamp.strftime('%Y/%m/%d')
hour_partition = timestamp.strftime('%H')
key = f"features/{group_name}/date={date_partition}/hour={hour_partition}/{entity_id}.json"
self.s3.put_object(
Bucket=self.bucket,
Key=key,
Body=json.dumps(feature_record, indent=2),
ContentType='application/json'
)
return key
def batch_compute_features(self, group_name, entities_features, timestamp=None):
"""Batch compute and store features for multiple entities"""
if timestamp is None:
timestamp = datetime.now()
# Convert to DataFrame for efficient processing
df = pd.DataFrame(entities_features)
# Create Parquet file for batch storage
date_partition = timestamp.strftime('%Y/%m/%d')
hour_partition = timestamp.strftime('%H')
batch_id = hashlib.md5(str(timestamp).encode()).hexdigest()[:8]
key = f"features/{group_name}/date={date_partition}/hour={hour_partition}/batch_{batch_id}.parquet"
# Add metadata columns
df['timestamp'] = timestamp.isoformat()
df['computed_at'] = datetime.now().isoformat()
df['feature_group'] = group_name
df['version'] = '1.0'
# Save as Parquet
parquet_buffer = df.to_parquet(index=False)
self.s3.put_object(
Bucket=self.bucket,
Key=key,
Body=parquet_buffer,
ContentType='application/octet-stream'
)
return key
def get_latest_features(self, group_name, entity_id, max_age_hours=24):
"""Retrieve latest features for an entity"""
# Search recent partitions
end_time = datetime.now()
start_time = end_time - timedelta(hours=max_age_hours)
# Generate partition prefixes to search
current_time = start_time
prefixes = []
while current_time <= end_time:
date_partition = current_time.strftime('%Y/%m/%d')
hour_partition = current_time.strftime('%H')
prefix = f"features/{group_name}/date={date_partition}/hour={hour_partition}/"
prefixes.append(prefix)
current_time += timedelta(hours=1)
# Search for entity features
latest_features = None
latest_timestamp = None
for prefix in reversed(prefixes): # Start with most recent
try:
response = self.s3.list_objects_v2(
Bucket=self.bucket,
Prefix=prefix
)
for obj in response.get('Contents', []):
key = obj['Key']
# Check if this is our entity (for JSON files)
if key.endswith(f"{entity_id}.json"):
obj_response = self.s3.get_object(Bucket=self.bucket, Key=key)
feature_record = json.loads(obj_response['Body'].read())
record_timestamp = datetime.fromisoformat(feature_record['timestamp'])
if latest_timestamp is None or record_timestamp > latest_timestamp:
latest_features = feature_record['features']
latest_timestamp = record_timestamp
# Check batch Parquet files
elif key.endswith('.parquet'):
# Download and search Parquet file
local_path = f"/tmp/{os.path.basename(key)}"
self.s3.download_file(self.bucket, key, local_path)
df = pd.read_parquet(local_path)
entity_rows = df[df['entity_id'] == entity_id]
if not entity_rows.empty:
# Get most recent row for this entity
latest_row = entity_rows.loc[entity_rows['timestamp'].idxmax()]
row_timestamp = datetime.fromisoformat(latest_row['timestamp'])
if latest_timestamp is None or row_timestamp > latest_timestamp:
# Extract features (exclude metadata columns)
feature_cols = [col for col in df.columns
if col not in ['entity_id', 'timestamp', 'computed_at',
'feature_group', 'version']]
latest_features = latest_row[feature_cols].to_dict()
latest_timestamp = row_timestamp
os.remove(local_path)
except Exception as e:
print(f"Error searching prefix {prefix}: {e}")
continue
return latest_features, latest_timestamp
def get_historical_features(self, group_name, entity_ids,
start_time, end_time, point_in_time=True):
"""Retrieve historical features for training"""
# Generate time range partitions
current_time = start_time
partitions = []
while current_time <= end_time:
date_partition = current_time.strftime('%Y/%m/%d')
partitions.append(f"features/{group_name}/date={date_partition}/")
current_time += timedelta(days=1)
all_features = []
for partition in partitions:
try:
response = self.s3.list_objects_v2(
Bucket=self.bucket,
Prefix=partition
)
for obj in response.get('Contents', []):
key = obj['Key']
if key.endswith('.parquet'):
# Process Parquet files
local_path = f"/tmp/{os.path.basename(key)}"
self.s3.download_file(self.bucket, key, local_path)
df = pd.read_parquet(local_path)
# Filter by entity IDs and time range
filtered_df = df[
(df['entity_id'].isin(entity_ids)) &
(pd.to_datetime(df['timestamp']) >= start_time) &
(pd.to_datetime(df['timestamp']) <= end_time)
]
if not filtered_df.empty:
all_features.append(filtered_df)
os.remove(local_path)
except Exception as e:
print(f"Error processing partition {partition}: {e}")
continue
if all_features:
combined_df = pd.concat(all_features, ignore_index=True)
if point_in_time:
# For point-in-time correctness, get latest features before each timestamp
combined_df = combined_df.sort_values(['entity_id', 'timestamp'])
return combined_df.groupby('entity_id').last().reset_index()
else:
return combined_df
return pd.DataFrame()
# Example: User behavior features
def compute_user_features(user_id, user_data, transaction_data):
    """Compute user behavior features from profile and transaction history.

    Requires user_data['signup_date']; other profile keys fall back to
    defaults. With no transactions, sentinel defaults are returned.
    """
    now = datetime.now()
    features = {
        'user_age': user_data.get('age', 0),
        'user_tenure_days': (now - user_data['signup_date']).days,
        'user_tier': user_data.get('tier', 'basic'),
    }
    if not transaction_data:
        features.update({
            'total_transactions': 0,
            'total_amount': 0.0,
            'avg_transaction_amount': 0.0,
            'days_since_last_transaction': 999,
            'top_category': 'unknown',
            'category_diversity': 0
        })
        return features
    amounts = [t['amount'] for t in transaction_data]
    n_txn = len(transaction_data)
    total = sum(amounts)
    last_ts = max(t['timestamp'] for t in transaction_data)
    counts = pd.Series([t['category'] for t in transaction_data]).value_counts()
    features.update({
        'total_transactions': n_txn,
        'total_amount': total,
        'avg_transaction_amount': total / n_txn,
        'days_since_last_transaction': (datetime.now() - last_ts).days,
        'top_category': counts.index[0] if not counts.empty else 'unknown',
        'category_diversity': len(counts),
    })
    return features
# Usage example
# NOTE(review): demo credentials — in production load these from env/secrets.
feature_store = MinIOFeatureStore(
    'http://minio:9000', 'minioadmin', 'minioadmin'
)
# Register feature group (schema maps feature name -> Python type).
user_schema = {
    'user_age': int,
    'user_tenure_days': int,
    'user_tier': str,
    'total_transactions': int,
    'total_amount': float,
    'avg_transaction_amount': float,
    'days_since_last_transaction': int,
    'top_category': str,
    'category_diversity': int
}
feature_store.register_feature_group(
    'user_behavior',
    user_schema,
    'User behavior and transaction features'
)
# Compute and store features for one user.
user_data = {'age': 28, 'signup_date': datetime(2024, 1, 1), 'tier': 'premium'}
transaction_data = [
    {'amount': 50.0, 'category': 'food', 'timestamp': datetime(2026, 1, 1)},
    {'amount': 200.0, 'category': 'electronics', 'timestamp': datetime(2026, 1, 15)}
]
features = compute_user_features('user_123', user_data, transaction_data)
feature_store.compute_and_store_features('user_behavior', 'user_123', features)
# Retrieve latest features (scans the last 24 hours of partitions).
latest_features, timestamp = feature_store.get_latest_features('user_behavior', 'user_123')
print(f"Latest features for user_123: {latest_features}")
Feature Engineering Pipeline
class FeatureEngineeringPipeline:
    """Applies registered feature transformers in dependency order."""

    def __init__(self, feature_store):
        self.feature_store = feature_store
        self.transformers = {}  # name -> {'func': callable, 'dependencies': [names]}

    def register_transformer(self, name, transform_func, dependencies=None):
        """Register a feature transformation function.

        Args:
            name: unique transformer name.
            transform_func: callable(features_dict) -> dict of new features.
            dependencies: names of transformers that must run first.
        """
        self.transformers[name] = {
            'func': transform_func,
            'dependencies': dependencies or []
        }

    def compute_derived_features(self, base_features):
        """Compute derived features, executing transformers in dependency order.

        Transformers whose dependencies can never be satisfied (unknown names
        or cycles) are reported and skipped — the original looped forever in
        that case.
        """
        derived_features = base_features.copy()
        executed = set()
        while len(executed) < len(self.transformers):
            progressed = False
            for name, transformer in self.transformers.items():
                if name in executed:
                    continue
                if all(dep in executed for dep in transformer['dependencies']):
                    try:
                        new_features = transformer['func'](derived_features)
                        derived_features.update(new_features)
                    except Exception as e:
                        print(f"Error executing transformer {name}: {e}")
                    executed.add(name)  # failed transformers are skipped, not retried
                    progressed = True
            if not progressed:
                # No transformer ran this pass: the remainder form a cycle or
                # reference unknown dependencies. Stop instead of spinning.
                pending = set(self.transformers) - executed
                print(f"Skipping transformers with unsatisfiable dependencies: {pending}")
                break
        return derived_features
# Example transformers
def compute_ratio_features(features):
    """Derive ratio features: amount per transaction and transactions per day.

    Each ratio is emitted only when its denominator is positive.
    """
    n_txn = features.get('total_transactions', 0)
    tenure = features.get('user_tenure_days', 0)
    derived = {}
    if n_txn > 0:
        derived['amount_per_transaction'] = features['total_amount'] / n_txn
    if tenure > 0:
        derived['transactions_per_day'] = features['total_transactions'] / tenure
    return derived
def compute_categorical_features(features):
    """One-hot encode the user tier and top spending category."""
    user_tier = features.get('user_tier')
    top_category = features.get('top_category')
    encoded = {
        f'tier_{tier}': int(tier == user_tier)
        for tier in ('basic', 'premium', 'enterprise')
    }
    for category in ('food', 'electronics', 'clothing', 'books', 'other'):
        encoded[f'top_category_{category}'] = int(category == top_category)
    return encoded
# Setup pipeline: `categorical` declares a dependency on `ratios`, so it runs second.
pipeline = FeatureEngineeringPipeline(feature_store)
pipeline.register_transformer('ratios', compute_ratio_features)
pipeline.register_transformer('categorical', compute_categorical_features, ['ratios'])
# Compute enhanced features (base features plus ratios and one-hot encodings).
base_features = {'user_age': 28, 'total_transactions': 10, 'total_amount': 500.0}
enhanced_features = pipeline.compute_derived_features(base_features)
print(f"Enhanced features: {enhanced_features}")
Model Storage and Registry
Production Model Registry
A robust model registry manages the complete lifecycle of ML models, from training artifacts to production deployments. MinIO provides the storage foundation for model artifacts, metadata, and experiment tracking.
import boto3
import json
import torch
import pickle
import joblib
from datetime import datetime
from typing import Dict, Any, Optional, List
import hashlib
import os
class MLModelRegistry:
    """Model registry backed by MinIO: stores model artifacts plus JSON
    metadata under models/<name>/<version>/ in the model-registry bucket."""

    def __init__(self, endpoint_url, access_key, secret_key):
        """Create the S3 client and ensure the registry buckets exist."""
        self.s3 = boto3.client(
            's3',
            endpoint_url=endpoint_url,
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key
        )
        self.models_bucket = 'model-registry'            # artifacts + metadata
        self.experiments_bucket = 'experiment-tracking'  # experiment artifacts
        self._ensure_buckets()
def _ensure_buckets(self):
"""Ensure model registry buckets exist"""
for bucket in [self.models_bucket, self.experiments_bucket]:
try:
self.s3.create_bucket(Bucket=bucket)
except:
pass
def register_model(self, model_name: str, version: str,
model_artifacts: Dict[str, Any],
metadata: Dict[str, Any]) -> str:
"""Register a new model version with artifacts and metadata"""
model_id = f"{model_name}_{version}_{int(datetime.now().timestamp())}"
model_path = f"models/{model_name}/{version}"
# Store model artifacts
artifact_paths = {}
for artifact_name, artifact_data in model_artifacts.items():
artifact_key = f"{model_path}/artifacts/{artifact_name}"
if isinstance(artifact_data, str) and os.path.exists(artifact_data):
# Upload file
self.s3.upload_file(artifact_data, self.models_bucket, artifact_key)
else:
# Serialize and upload object
if artifact_name.endswith('.pt') or artifact_name.endswith('.pth'):
# PyTorch model
with open(f'/tmp/{artifact_name}', 'wb') as f:
torch.save(artifact_data, f)
self.s3.upload_file(f'/tmp/{artifact_name}', self.models_bucket, artifact_key)
os.remove(f'/tmp/{artifact_name}')
elif artifact_name.endswith('.pkl'):
# Pickle object
with open(f'/tmp/{artifact_name}', 'wb') as f:
pickle.dump(artifact_data, f)
self.s3.upload_file(f'/tmp/{artifact_name}', self.models_bucket, artifact_key)
os.remove(f'/tmp/{artifact_name}')
elif artifact_name.endswith('.joblib'):
# Joblib object
joblib.dump(artifact_data, f'/tmp/{artifact_name}')
self.s3.upload_file(f'/tmp/{artifact_name}', self.models_bucket, artifact_key)
os.remove(f'/tmp/{artifact_name}')
artifact_paths[artifact_name] = f"s3://{self.models_bucket}/{artifact_key}"
# Create model metadata
model_metadata = {
'model_id': model_id,
'name': model_name,
'version': version,
'created_at': datetime.now().isoformat(),
'artifacts': artifact_paths,
'metadata': metadata,
'status': 'registered',
'tags': metadata.get('tags', []),
'framework': metadata.get('framework', 'unknown'),
'model_type': metadata.get('model_type', 'unknown'),
'metrics': metadata.get('metrics', {}),
'parameters': metadata.get('parameters', {}),
'training_data': metadata.get('training_data', {}),
'dependencies': metadata.get('dependencies', [])
}
# Store metadata
metadata_key = f"{model_path}/metadata.json"
self.s3.put_object(
Bucket=self.models_bucket,
Key=metadata_key,
Body=json.dumps(model_metadata, indent=2),
ContentType='application/json'
)
return model_id
def load_model(self, model_name: str, version: str = 'latest') -> Dict[str, Any]:
    """Load model artifacts and metadata.

    Args:
        model_name: registry name of the model.
        version: explicit version string, or 'latest' to resolve via
            get_latest_version().

    Returns:
        Dict with keys 'metadata' (the stored metadata.json contents) and
        'artifacts' (artifact name -> deserialized object, or a local file
        path for unrecognized extensions).

    Raises:
        ValueError: if the metadata object cannot be fetched.
    """
    if version == 'latest':
        version = self.get_latest_version(model_name)
    model_path = f"models/{model_name}/{version}"
    # Load metadata
    metadata_key = f"{model_path}/metadata.json"
    try:
        metadata_obj = self.s3.get_object(Bucket=self.models_bucket, Key=metadata_key)
        metadata = json.loads(metadata_obj['Body'].read())
    except Exception as e:
        raise ValueError(f"Model {model_name} version {version} not found: {e}")
    # Load artifacts
    artifacts = {}
    for artifact_name, artifact_path in metadata['artifacts'].items():
        # Stored paths are full s3:// URIs; strip the bucket prefix back off.
        artifact_key = artifact_path.replace(f"s3://{self.models_bucket}/", "")
        # NOTE(review): fixed /tmp paths can collide across concurrent loads.
        local_path = f"/tmp/{artifact_name}"
        self.s3.download_file(self.models_bucket, artifact_key, local_path)
        # Load based on file extension
        # NOTE(review): torch.load / pickle.load execute arbitrary code from
        # the stored bytes — only load artifacts from a trusted bucket.
        if artifact_name.endswith('.pt') or artifact_name.endswith('.pth'):
            artifacts[artifact_name] = torch.load(local_path, map_location='cpu')
        elif artifact_name.endswith('.pkl'):
            with open(local_path, 'rb') as f:
                artifacts[artifact_name] = pickle.load(f)
        elif artifact_name.endswith('.joblib'):
            artifacts[artifact_name] = joblib.load(local_path)
        else:
            # Keep as file path for other formats
            artifacts[artifact_name] = local_path
    return {
        'metadata': metadata,
        'artifacts': artifacts
    }
def list_models(self, name_filter: Optional[str] = None) -> List[Dict[str, Any]]:
    """Enumerate registered models under the ``models/`` prefix.

    Args:
        name_filter: if given, keep only models whose name contains it.

    Returns:
        One dict per model: name, all version strings, and the
        lexicographically greatest version (or None when empty).
        Returns whatever was collected so far if listing fails.
    """
    discovered: List[Dict[str, Any]] = []
    try:
        listing = self.s3.list_objects_v2(
            Bucket=self.models_bucket,
            Prefix='models/',
            Delimiter='/'
        )
        # With a '/' delimiter each model shows up as a common prefix
        # like 'models/<name>/'; the name is the second-to-last segment.
        for entry in listing.get('CommonPrefixes', []):
            candidate = entry['Prefix'].split('/')[-2]
            if name_filter and name_filter not in candidate:
                continue
            known_versions = self.list_model_versions(candidate)
            discovered.append({
                'name': candidate,
                'versions': known_versions,
                'latest_version': max(known_versions) if known_versions else None
            })
    except Exception as exc:
        print(f"Error listing models: {exc}")
    return discovered
def list_model_versions(self, model_name: str) -> List[str]:
    """Return all version identifiers stored under a model, sorted as strings.

    Args:
        model_name: registry name of the model.

    Returns:
        Sorted list of version strings; empty if listing fails.
    """
    found: List[str] = []
    try:
        listing = self.s3.list_objects_v2(
            Bucket=self.models_bucket,
            Prefix=f'models/{model_name}/',
            Delimiter='/'
        )
        # Each version directory appears as 'models/<name>/<version>/'.
        for entry in listing.get('CommonPrefixes', []):
            found.append(entry['Prefix'].split('/')[-2])
    except Exception as exc:
        print(f"Error listing versions for {model_name}: {exc}")
    return sorted(found)
def get_latest_version(self, model_name: str) -> str:
    """Return the newest version string for *model_name*.

    Versions that look like dotted integers are compared numerically
    (so '1.10' > '1.2'); anything else falls back to plain string order.

    Args:
        model_name: registry name of the model.

    Returns:
        The greatest version string under the model.

    Raises:
        ValueError: if the model has no registered versions.
    """
    versions = self.list_model_versions(model_name)
    if not versions:
        raise ValueError(f"No versions found for model {model_name}")
    # Sort versions (assuming semantic versioning)
    try:
        sorted_versions = sorted(versions, key=lambda v: [int(x) for x in v.split('.')])
        return sorted_versions[-1]
    except ValueError:
        # Non-numeric components (e.g. 'v1.0', 'beta'): fall back to string
        # sorting. Was a bare `except:`, which also swallowed SystemExit and
        # KeyboardInterrupt; int() on a bad component raises ValueError only.
        return sorted(versions)[-1]
def promote_model(self, model_name: str, version: str, stage: str) -> bool:
    """Stamp a lifecycle stage onto a model version's metadata.

    Rewrites the version's metadata.json in place with a 'stage' field
    and a 'promoted_at' timestamp.

    Args:
        model_name: registry name of the model.
        version: version string to promote.
        stage: target stage label (e.g. 'staging', 'production').

    Returns:
        True on success, False if the metadata could not be read or written.
    """
    metadata_key = f"models/{model_name}/{version}/metadata.json"
    try:
        # Read-modify-write of the metadata sidecar.
        fetched = self.s3.get_object(Bucket=self.models_bucket, Key=metadata_key)
        metadata = json.loads(fetched['Body'].read())
        metadata['stage'] = stage
        metadata['promoted_at'] = datetime.now().isoformat()
        self.s3.put_object(
            Bucket=self.models_bucket,
            Key=metadata_key,
            Body=json.dumps(metadata, indent=2),
            ContentType='application/json'
        )
    except Exception as exc:
        print(f"Error promoting model: {exc}")
        return False
    return True
# Advanced model checkpoint management
class ModelCheckpointManager:
    """Saves and restores PyTorch training checkpoints in a MinIO bucket.

    Each checkpoint is stored as
    ``experiments/<experiment_id>/checkpoints/<name>.pt`` with a JSON
    metadata sidecar (``<name>_metadata.json``) next to it.
    """

    def __init__(self, s3_client, bucket='model-checkpoints'):
        # s3_client: boto3 S3 client pointed at MinIO.
        self.s3 = s3_client
        self.bucket = bucket

    def save_checkpoint(self, model, optimizer, epoch, metrics,
                        experiment_id, checkpoint_name=None):
        """Save training checkpoint with comprehensive metadata.

        Args:
            model: torch.nn.Module whose state_dict is captured.
            optimizer: torch optimizer whose state_dict is captured.
            epoch: current epoch number (used in the default name).
            metrics: dict of metric values to record alongside the weights.
            experiment_id: namespace for the object keys.
            checkpoint_name: optional explicit name; defaults to
                'checkpoint_epoch_<epoch>'.

        Returns:
            The S3 object key of the uploaded .pt checkpoint.
        """
        if checkpoint_name is None:
            checkpoint_name = f"checkpoint_epoch_{epoch}"
        # Create checkpoint data
        checkpoint_data = {
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'metrics': metrics,
            'timestamp': datetime.now().isoformat(),
            'experiment_id': experiment_id
        }
        # Save checkpoint to a local temp file first, then upload.
        # NOTE(review): fixed /tmp path can collide across concurrent runs.
        checkpoint_path = f"/tmp/{checkpoint_name}.pt"
        torch.save(checkpoint_data, checkpoint_path)
        # Upload to MinIO
        key = f"experiments/{experiment_id}/checkpoints/{checkpoint_name}.pt"
        self.s3.upload_file(checkpoint_path, self.bucket, key)
        # Save checkpoint metadata (human-readable sidecar for browsing
        # without downloading the full weights)
        metadata = {
            'checkpoint_name': checkpoint_name,
            'experiment_id': experiment_id,
            'epoch': epoch,
            'metrics': metrics,
            'timestamp': datetime.now().isoformat(),
            'model_architecture': str(model),
            'optimizer_config': str(optimizer),
            's3_path': f"s3://{self.bucket}/{key}"
        }
        metadata_key = f"experiments/{experiment_id}/checkpoints/{checkpoint_name}_metadata.json"
        self.s3.put_object(
            Bucket=self.bucket,
            Key=metadata_key,
            Body=json.dumps(metadata, indent=2),
            ContentType='application/json'
        )
        # Cleanup local file
        os.remove(checkpoint_path)
        return key

    def load_checkpoint(self, experiment_id, checkpoint_name):
        """Download a checkpoint and return the deserialized dict.

        NOTE(review): torch.load unpickles arbitrary objects — only load
        checkpoints from a trusted bucket.
        """
        key = f"experiments/{experiment_id}/checkpoints/{checkpoint_name}.pt"
        local_path = f"/tmp/{checkpoint_name}.pt"
        self.s3.download_file(self.bucket, key, local_path)
        checkpoint = torch.load(local_path, map_location='cpu')
        os.remove(local_path)
        return checkpoint

    def list_checkpoints(self, experiment_id):
        """List all checkpoints for an experiment, newest first.

        Returns:
            List of dicts with name, key, size, and last_modified.
        """
        prefix = f"experiments/{experiment_id}/checkpoints/"
        # NOTE(review): single list_objects_v2 call returns at most 1000
        # keys; use a paginator if experiments grow beyond that.
        response = self.s3.list_objects_v2(
            Bucket=self.bucket,
            Prefix=prefix
        )
        checkpoints = []
        for obj in response.get('Contents', []):
            # Only .pt files are checkpoints; skips the .json sidecars.
            if obj['Key'].endswith('.pt'):
                checkpoint_name = os.path.basename(obj['Key']).replace('.pt', '')
                checkpoints.append({
                    'name': checkpoint_name,
                    'key': obj['Key'],
                    'size': obj['Size'],
                    'last_modified': obj['LastModified']
                })
        return sorted(checkpoints, key=lambda x: x['last_modified'], reverse=True)
# Usage examples
# NOTE(review): these statements run at import time and need a reachable
# MinIO at http://minio:9000 with the demo credentials.
registry = MLModelRegistry('http://minio:9000', 'minioadmin', 'minioadmin')
# Register a PyTorch model
model = torch.nn.Linear(10, 1)
optimizer = torch.optim.Adam(model.parameters())
# Artifacts: serialized state dicts plus a pickled preprocessing description.
model_artifacts = {
    'model.pt': model.state_dict(),
    'optimizer.pt': optimizer.state_dict(),
    'preprocessing.pkl': {'scaler': 'StandardScaler', 'params': {}}
}
metadata = {
    'framework': 'pytorch',
    'model_type': 'linear_regression',
    'metrics': {'mse': 0.05, 'r2': 0.95},
    'parameters': {'learning_rate': 0.001, 'batch_size': 32},
    'training_data': {'dataset': 'housing_prices', 'samples': 10000},
    'tags': ['regression', 'production-ready']
}
model_id = registry.register_model('housing_price_predictor', 'v1.0', model_artifacts, metadata)
print(f"Registered model: {model_id}")
# Load model
loaded_model = registry.load_model('housing_price_predictor', 'v1.0')
print(f"Loaded model metadata: {loaded_model['metadata']['metrics']}")
# List all models
models = registry.list_models()
for model in models:
    print(f"Model: {model['name']}, Versions: {model['versions']}")
Integration with PyTorch and TensorFlow
PyTorch Integration with MinIO
PyTorch’s DataLoader can be seamlessly integrated with MinIO for efficient training data loading. This enables distributed training scenarios where data is stored centrally in MinIO and accessed by multiple training nodes.
import torch
from torch.utils.data import Dataset, DataLoader, IterableDataset
import boto3
from PIL import Image
import io
import json
import pandas as pd
from torchvision import transforms
import numpy as np
class MinIOImageDataset(Dataset):
    """Map-style PyTorch dataset that serves images directly from MinIO.

    Samples come either from a JSON metadata file in the bucket or from a
    prefix listing, where the parent "directory" name is used as the label.
    Raw image bytes are held in a bounded in-memory cache.
    """

    def __init__(self, bucket, prefix, s3_client, transform=None,
                 cache_size=1000, metadata_file=None):
        self.s3 = s3_client
        self.bucket = bucket
        self.prefix = prefix
        self.transform = transform
        # key -> raw image bytes; bounded by cache_size, no eviction.
        self.cache = {}
        self.cache_size = cache_size
        # Load image paths and labels
        self.samples = self._load_samples(metadata_file)

    def _load_samples(self, metadata_file):
        """Load sample paths and labels from metadata or S3 listing"""
        samples = []
        if metadata_file:
            # Load from metadata file: expects {'samples': [{'path', 'label', ...}]}
            obj = self.s3.get_object(Bucket=self.bucket, Key=metadata_file)
            metadata = json.loads(obj['Body'].read())
            for item in metadata['samples']:
                samples.append({
                    'key': item['path'],
                    'label': item['label'],
                    'metadata': item.get('metadata', {})
                })
        else:
            # List objects from S3
            paginator = self.s3.get_paginator('list_objects_v2')
            for page in paginator.paginate(Bucket=self.bucket, Prefix=self.prefix):
                for obj in page.get('Contents', []):
                    key = obj['Key']
                    if key.lower().endswith(('.jpg', '.jpeg', '.png')):
                        # Extract label from path (assuming path structure like class/image.jpg)
                        path_parts = key.split('/')
                        label = path_parts[-2] if len(path_parts) > 1 else 'unknown'
                        samples.append({
                            'key': key,
                            'label': label,
                            'metadata': {}
                        })
        return samples

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        """Return (image, label_tensor) for sample *idx*."""
        sample = self.samples[idx]
        key = sample['key']
        # Check cache first
        if key in self.cache:
            image_data = self.cache[key]
        else:
            # Download from MinIO
            try:
                obj = self.s3.get_object(Bucket=self.bucket, Key=key)
                image_data = obj['Body'].read()
                # Cache if under limit
                if len(self.cache) < self.cache_size:
                    self.cache[key] = image_data
            except Exception as e:
                print(f"Error loading {key}: {e}")
                # Return a blank image as fallback
                # NOTE(review): .tobytes() yields raw pixel data, not an
                # encoded image file, so Image.open below fails on it and
                # the second (blank-image) fallback kicks in instead.
                image_data = Image.new('RGB', (224, 224)).tobytes()
        # Convert to PIL Image
        try:
            image = Image.open(io.BytesIO(image_data)).convert('RGB')
        except Exception as e:
            print(f"Error processing image {key}: {e}")
            image = Image.new('RGB', (224, 224))
        # Apply transforms
        if self.transform:
            image = self.transform(image)
        # Convert label to tensor
        if isinstance(sample['label'], str):
            # For classification, you'd have a label mapping
            # NOTE(review): label_map is never assigned by this class, so
            # string labels all map to 0 unless a caller sets it.
            label_map = getattr(self, 'label_map', {})
            label = label_map.get(sample['label'], 0)
        else:
            label = sample['label']
        return image, torch.tensor(label, dtype=torch.long)
class MinIOStreamingDataset(IterableDataset):
    """Streaming dataset for very large datasets that don't fit in memory"""

    def __init__(self, bucket, prefix, s3_client, transform=None,
                 batch_size=32, shuffle_buffer_size=10000):
        self.s3 = s3_client
        self.bucket = bucket
        self.prefix = prefix
        self.transform = transform
        # NOTE(review): batch_size and shuffle_buffer_size are stored but
        # never used below; batching is left to the DataLoader.
        self.batch_size = batch_size
        self.shuffle_buffer_size = shuffle_buffer_size

    def __iter__(self):
        """Yield (image, label) pairs, sharding keys across DataLoader workers."""
        # Get worker info for distributed training
        worker_info = torch.utils.data.get_worker_info()
        # List all objects under the prefix (every worker does its own listing)
        paginator = self.s3.get_paginator('list_objects_v2')
        all_keys = []
        for page in paginator.paginate(Bucket=self.bucket, Prefix=self.prefix):
            for obj in page.get('Contents', []):
                key = obj['Key']
                if key.lower().endswith(('.jpg', '.jpeg', '.png')):
                    all_keys.append(key)
        # Distribute keys across workers
        if worker_info is not None:
            per_worker = len(all_keys) // worker_info.num_workers
            start_idx = worker_info.id * per_worker
            # Last worker absorbs the division remainder.
            end_idx = start_idx + per_worker if worker_info.id < worker_info.num_workers - 1 else len(all_keys)
            keys = all_keys[start_idx:end_idx]
        else:
            keys = all_keys
        # Shuffle keys (in place, per worker, per epoch)
        np.random.shuffle(keys)
        # Stream data one object at a time; failed keys are skipped.
        for key in keys:
            try:
                obj = self.s3.get_object(Bucket=self.bucket, Key=key)
                image_data = obj['Body'].read()
                image = Image.open(io.BytesIO(image_data)).convert('RGB')
                if self.transform:
                    image = self.transform(image)
                # Extract label from path
                # NOTE(review): label is yielded as a raw string here, unlike
                # MinIOImageDataset which converts it to a LongTensor.
                label = key.split('/')[-2]
                yield image, label
            except Exception as e:
                print(f"Error streaming {key}: {e}")
                continue
# PyTorch training loop with MinIO
class PyTorchMinIOTrainer:
    """Trains a PyTorch classifier on image data stored in a MinIO bucket."""

    def __init__(self, model, s3_client, bucket):
        self.model = model
        self.s3 = s3_client
        self.bucket = bucket
        # Use GPU when available; model is moved once at construction.
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.model.to(self.device)

    def create_data_loaders(self, train_prefix, val_prefix, batch_size=32, num_workers=4):
        """Create training and validation data loaders.

        Args:
            train_prefix / val_prefix: bucket prefixes holding the splits.
            batch_size: per-loader batch size.
            num_workers: DataLoader worker processes.
                NOTE(review): the boto3 client is shared into forked
                workers here — confirm this is safe for your boto3 version.

        Returns:
            (train_loader, val_loader) tuple.
        """
        # Define transforms: standard ImageNet augmentation + normalization
        train_transform = transforms.Compose([
            transforms.Resize((256, 256)),
            transforms.RandomCrop(224),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225])
        ])
        # Validation: deterministic resize only, same normalization.
        val_transform = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225])
        ])
        # Create datasets
        train_dataset = MinIOImageDataset(
            self.bucket, train_prefix, self.s3, transform=train_transform
        )
        val_dataset = MinIOImageDataset(
            self.bucket, val_prefix, self.s3, transform=val_transform
        )
        # Create data loaders
        train_loader = DataLoader(
            train_dataset, batch_size=batch_size, shuffle=True,
            num_workers=num_workers, pin_memory=True
        )
        val_loader = DataLoader(
            val_dataset, batch_size=batch_size, shuffle=False,
            num_workers=num_workers, pin_memory=True
        )
        return train_loader, val_loader

    def train_epoch(self, train_loader, optimizer, criterion):
        """Train for one epoch.

        Returns:
            (mean batch loss, accuracy) over the epoch.
        """
        self.model.train()
        total_loss = 0
        correct = 0
        total = 0
        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.to(self.device), target.to(self.device)
            optimizer.zero_grad()
            output = self.model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
            # Accuracy bookkeeping: argmax over class logits.
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()
            total += target.size(0)
            if batch_idx % 100 == 0:
                print(f'Batch {batch_idx}, Loss: {loss.item():.6f}')
        return total_loss / len(train_loader), correct / total

    def validate(self, val_loader, criterion):
        """Validate model.

        Returns:
            (mean batch loss, accuracy) over the validation set.
        """
        self.model.eval()
        val_loss = 0
        correct = 0
        total = 0
        # No gradients needed for evaluation.
        with torch.no_grad():
            for data, target in val_loader:
                data, target = data.to(self.device), target.to(self.device)
                output = self.model(data)
                val_loss += criterion(output, target).item()
                pred = output.argmax(dim=1, keepdim=True)
                correct += pred.eq(target.view_as(pred)).sum().item()
                total += target.size(0)
        return val_loss / len(val_loader), correct / total
# Usage example
# NOTE(review): runs at import time; needs MinIO reachable and network
# access for the torch.hub download.
s3_client = boto3.client('s3', endpoint_url='http://minio:9000',
                         aws_access_key_id='minioadmin',
                         aws_secret_access_key='minioadmin')
# Fine-tune a pretrained ResNet-18 by replacing its final layer.
model = torch.hub.load('pytorch/vision:v0.10.0', 'resnet18', pretrained=True)
model.fc = torch.nn.Linear(model.fc.in_features, 10)  # 10 classes
trainer = PyTorchMinIOTrainer(model, s3_client, 'training-data')
train_loader, val_loader = trainer.create_data_loaders('train/', 'val/')
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = torch.nn.CrossEntropyLoss()
# Training loop
for epoch in range(10):
    train_loss, train_acc = trainer.train_epoch(train_loader, optimizer, criterion)
    val_loss, val_acc = trainer.validate(val_loader, criterion)
    print(f'Epoch {epoch}: Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, '
          f'Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}')
TensorFlow Integration with MinIO
TensorFlow’s tf.data API provides excellent integration with cloud storage systems like MinIO through its S3-compatible interface.
import tensorflow as tf
import boto3
import json
import numpy as np
from typing import Generator, Tuple
class TensorFlowMinIODataset:
    """Builds tf.data pipelines over image or text objects stored in MinIO."""

    def __init__(self, s3_client, bucket):
        self.s3 = s3_client
        self.bucket = bucket

    def create_image_dataset(self, prefix, batch_size=32, image_size=(224, 224)):
        """Create TensorFlow dataset for images stored in MinIO.

        Args:
            prefix: bucket prefix containing the images.
            batch_size: batch size applied at the end of the pipeline.
            image_size: (height, width) to resize every image to.

        Returns:
            Batched, prefetched tf.data.Dataset of (image, label) pairs.
            NOTE(review): labels come out as tf.string here — most losses
            (e.g. sparse_categorical_crossentropy) expect integer labels.
        """
        # Get list of image files
        def get_image_paths():
            paginator = self.s3.get_paginator('list_objects_v2')
            paths = []
            for page in paginator.paginate(Bucket=self.bucket, Prefix=prefix):
                for obj in page.get('Contents', []):
                    key = obj['Key']
                    if key.lower().endswith(('.jpg', '.jpeg', '.png')):
                        paths.append(key)
            return paths

        # Create dataset from paths
        image_paths = get_image_paths()

        def load_and_preprocess_image(path):
            """Load image from MinIO and preprocess (runs eagerly via py_function)."""
            # Download image; path is an EagerTensor inside py_function,
            # hence .numpy().decode().
            obj = self.s3.get_object(Bucket=self.bucket, Key=path.numpy().decode())
            image_data = obj['Body'].read()
            # Decode image, resize, scale to [0, 1]
            image = tf.image.decode_image(image_data, channels=3)
            image = tf.image.resize(image, image_size)
            image = tf.cast(image, tf.float32) / 255.0
            # Extract label from path (parent "directory" name)
            label = path.numpy().decode().split('/')[-2]
            return image, label

        # Create TensorFlow dataset
        dataset = tf.data.Dataset.from_tensor_slices(image_paths)
        dataset = dataset.map(
            lambda path: tf.py_function(
                load_and_preprocess_image,
                [path],
                [tf.float32, tf.string]
            ),
            num_parallel_calls=tf.data.AUTOTUNE
        )
        # Batch and prefetch
        dataset = dataset.batch(batch_size)
        dataset = dataset.prefetch(tf.data.AUTOTUNE)
        return dataset

    def create_text_dataset(self, prefix, batch_size=32, max_length=512):
        """Create TensorFlow dataset for text data.

        Reads .txt files (label defaults to 0) and .json files with
        'text'/'label' fields.
        NOTE(review): max_length is accepted but never applied below.
        """
        def text_generator():
            """Generator function for text data"""
            paginator = self.s3.get_paginator('list_objects_v2')
            for page in paginator.paginate(Bucket=self.bucket, Prefix=prefix):
                for obj in page.get('Contents', []):
                    key = obj['Key']
                    if key.endswith('.txt') or key.endswith('.json'):
                        try:
                            obj_data = self.s3.get_object(Bucket=self.bucket, Key=key)
                            content = obj_data['Body'].read().decode('utf-8')
                            if key.endswith('.json'):
                                data = json.loads(content)
                                text = data.get('text', '')
                                label = data.get('label', 0)
                            else:
                                text = content
                                label = 0  # Default label
                            yield text, label
                        except Exception as e:
                            print(f"Error processing {key}: {e}")
                            continue

        # Create dataset from generator
        dataset = tf.data.Dataset.from_generator(
            text_generator,
            output_signature=(
                tf.TensorSpec(shape=(), dtype=tf.string),
                tf.TensorSpec(shape=(), dtype=tf.int32)
            )
        )
        # Tokenize and pad sequences
        # NOTE(review): this tokenizer is created but never fitted or used.
        tokenizer = tf.keras.preprocessing.text.Tokenizer(num_words=10000)

        def tokenize_text(text, label):
            # This is a simplified tokenization - in practice, use a proper tokenizer
            tokens = tf.strings.split(text)
            # Convert to indices (simplified)
            return tokens, label

        dataset = dataset.map(tokenize_text)
        dataset = dataset.batch(batch_size)
        dataset = dataset.prefetch(tf.data.AUTOTUNE)
        return dataset
class TensorFlowMinIOTrainer:
    """Trains a Keras model on MinIO-hosted data and archives artifacts back."""

    def __init__(self, model, s3_client, bucket):
        self.model = model
        self.s3 = s3_client
        self.bucket = bucket
        self.dataset_creator = TensorFlowMinIODataset(s3_client, bucket)

    def train_model(self, train_prefix, val_prefix, epochs=10, batch_size=32):
        """Train model with data from MinIO.

        Args:
            train_prefix / val_prefix: bucket prefixes for the two splits.
            epochs: maximum epochs (EarlyStopping may end sooner).
            batch_size: batch size for both datasets.

        Returns:
            The Keras History object from model.fit().
        """
        # Create datasets
        train_dataset = self.dataset_creator.create_image_dataset(
            train_prefix, batch_size
        )
        val_dataset = self.dataset_creator.create_image_dataset(
            val_prefix, batch_size
        )
        # Compile model
        # NOTE(review): sparse_categorical_crossentropy expects integer
        # labels, but create_image_dataset yields string labels — confirm
        # a label-encoding step exists before relying on this.
        self.model.compile(
            optimizer='adam',
            loss='sparse_categorical_crossentropy',
            metrics=['accuracy']
        )
        # Setup callbacks: keep best weights, stop early, decay LR on plateau
        callbacks = [
            tf.keras.callbacks.ModelCheckpoint(
                '/tmp/best_model.h5',
                save_best_only=True,
                monitor='val_accuracy'
            ),
            tf.keras.callbacks.EarlyStopping(
                patience=3,
                monitor='val_loss'
            ),
            tf.keras.callbacks.ReduceLROnPlateau(
                factor=0.5,
                patience=2,
                monitor='val_loss'
            )
        ]
        # Train model
        history = self.model.fit(
            train_dataset,
            validation_data=val_dataset,
            epochs=epochs,
            callbacks=callbacks
        )
        # Save final model to MinIO (HDF5 format via local temp file)
        self.model.save('/tmp/final_model.h5')
        self.s3.upload_file(
            '/tmp/final_model.h5',
            self.bucket,
            'models/tensorflow/final_model.h5'
        )
        return history

    def save_model_artifacts(self, model_name, version):
        """Save TensorFlow model artifacts to MinIO.

        Exports the model in SavedModel format, tars it, uploads the
        archive plus a metadata.json, and returns the archive's S3 key.
        """
        import tempfile
        import shutil
        # Create temporary directory
        with tempfile.TemporaryDirectory() as temp_dir:
            # Save model in SavedModel format
            model_path = f"{temp_dir}/saved_model"
            self.model.save(model_path)
            # Create tar archive
            archive_path = f"{temp_dir}/{model_name}_{version}.tar.gz"
            # make_archive appends the extension itself, hence the strip.
            shutil.make_archive(
                archive_path.replace('.tar.gz', ''),
                'gztar',
                model_path
            )
            # Upload to MinIO
            s3_key = f"models/tensorflow/{model_name}/{version}/model.tar.gz"
            self.s3.upload_file(archive_path, self.bucket, s3_key)
            # Save model metadata
            # NOTE(review): int(dim) raises TypeError when the batch
            # dimension is None, and get_config()/tf.timestamp() values may
            # not be JSON-serializable — verify before production use.
            metadata = {
                'name': model_name,
                'version': version,
                'framework': 'tensorflow',
                'format': 'savedmodel',
                'created_at': tf.timestamp().numpy(),
                'model_config': self.model.get_config(),
                'input_shape': [int(dim) for dim in self.model.input_shape],
                'output_shape': [int(dim) for dim in self.model.output_shape]
            }
            metadata_key = f"models/tensorflow/{model_name}/{version}/metadata.json"
            self.s3.put_object(
                Bucket=self.bucket,
                Key=metadata_key,
                Body=json.dumps(metadata, indent=2),
                ContentType='application/json'
            )
            return s3_key
# Usage example
# NOTE(review): runs at import time; needs a reachable MinIO instance.
s3_client = boto3.client('s3', endpoint_url='http://minio:9000',
                         aws_access_key_id='minioadmin',
                         aws_secret_access_key='minioadmin')
# Create a simple CNN model (3 conv blocks + dense head, 10 classes)
model = tf.keras.Sequential([
    tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(224, 224, 3)),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Conv2D(64, 3, activation='relu'),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Conv2D(64, 3, activation='relu'),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(64, activation='relu'),
    tf.keras.layers.Dense(10, activation='softmax')
])
trainer = TensorFlowMinIOTrainer(model, s3_client, 'training-data')
# Train model
history = trainer.train_model('train/', 'val/', epochs=5)
# Save model artifacts
model_path = trainer.save_model_artifacts('image_classifier', 'v1.0')
print(f"Model saved to: {model_path}")
Inference Patterns with MinIO
Real-Time Inference Architecture
Production inference systems require low-latency access to models, features, and cached predictions. MinIO’s high-performance object storage enables efficient inference patterns that scale with demand.
import boto3
import torch
import json
import numpy as np
from datetime import datetime, timedelta
import redis
import hashlib
from typing import Dict, Any, Optional
import asyncio
import aiohttp
class MinIOInferenceEngine:
    """Loads registry models from MinIO and serves predictions with caching.

    Predictions are cached in Redis when reachable (hot cache) and always
    mirrored into ``cache_bucket`` on MinIO (persistent cache).
    """

    def __init__(self, s3_client, model_bucket='model-registry',
                 cache_bucket='inference-cache'):
        self.s3 = s3_client
        self.model_bucket = model_bucket
        self.cache_bucket = cache_bucket
        # In-process caches; feature_cache is reserved but unused below.
        self.model_cache = {}
        self.feature_cache = {}
        # Redis for hot cache (optional)
        try:
            self.redis_client = redis.Redis(host='localhost', port=6379, db=0)
        except:
            # NOTE(review): redis.Redis() usually fails lazily on first
            # command, not at construction, so this guard rarely fires.
            self.redis_client = None

    def load_model(self, model_name, version='latest', cache=True):
        """Load model for inference with caching.

        Args:
            model_name: registry name of the model.
            version: version string; 'latest' is used verbatim as a key
                here (no resolution against the registry).
            cache: keep the loaded blob in the in-process model_cache.

        Returns:
            Dict with 'metadata', 'artifacts', and 'loaded_at'.

        Raises:
            RuntimeError: if metadata or artifacts cannot be fetched.
        """
        cache_key = f"{model_name}_{version}"
        if cache and cache_key in self.model_cache:
            return self.model_cache[cache_key]
        # Download model from MinIO
        model_path = f"models/{model_name}/{version}"
        try:
            # Load model metadata
            metadata_key = f"{model_path}/metadata.json"
            metadata_obj = self.s3.get_object(Bucket=self.model_bucket, Key=metadata_key)
            metadata = json.loads(metadata_obj['Body'].read())
            # Load model artifacts (only .pt and .json are deserialized)
            artifacts = {}
            for artifact_name, artifact_path in metadata['artifacts'].items():
                artifact_key = artifact_path.replace(f"s3://{self.model_bucket}/", "")
                local_path = f"/tmp/{artifact_name}"
                self.s3.download_file(self.model_bucket, artifact_key, local_path)
                if artifact_name.endswith('.pt'):
                    # NOTE(review): torch.load unpickles arbitrary objects;
                    # only load artifacts from a trusted bucket.
                    artifacts[artifact_name] = torch.load(local_path, map_location='cpu')
                elif artifact_name.endswith('.json'):
                    with open(local_path, 'r') as f:
                        artifacts[artifact_name] = json.load(f)
            model_data = {
                'metadata': metadata,
                'artifacts': artifacts,
                'loaded_at': datetime.now()
            }
            if cache:
                self.model_cache[cache_key] = model_data
            return model_data
        except Exception as e:
            raise RuntimeError(f"Failed to load model {model_name} v{version}: {e}")

    def predict(self, model_name, input_data, version='latest',
                use_cache=True, cache_ttl=3600):
        """Make prediction with caching.

        Args:
            model_name: registry name of the model.
            input_data: model input; its str() form is hashed for the key.
            version: model version to serve.
            use_cache: consult and populate the prediction caches.
            cache_ttl: Redis TTL in seconds for the cached result.

        Raises:
            RuntimeError: if inference fails.
        """
        # Generate cache key for input (md5 used for keying only, not security)
        input_hash = hashlib.md5(str(input_data).encode()).hexdigest()
        cache_key = f"prediction:{model_name}:{version}:{input_hash}"
        # Check cache first
        if use_cache:
            cached_result = self._get_cached_prediction(cache_key)
            if cached_result:
                return cached_result
        # Load model
        model_data = self.load_model(model_name, version)
        # Make prediction
        try:
            prediction = self._run_inference(model_data, input_data)
            # Cache result
            if use_cache:
                self._cache_prediction(cache_key, prediction, cache_ttl)
            return prediction
        except Exception as e:
            raise RuntimeError(f"Prediction failed: {e}")

    def _run_inference(self, model_data, input_data):
        """Run actual inference based on model type (dispatch on 'framework')."""
        framework = model_data['metadata'].get('framework', 'unknown')
        if framework == 'pytorch':
            return self._pytorch_inference(model_data, input_data)
        elif framework == 'tensorflow':
            # NOTE(review): _tensorflow_inference is not defined in the
            # code shown here — calling this branch raises AttributeError.
            return self._tensorflow_inference(model_data, input_data)
        elif framework == 'sklearn':
            # NOTE(review): _sklearn_inference is likewise not defined here.
            return self._sklearn_inference(model_data, input_data)
        else:
            raise ValueError(f"Unsupported framework: {framework}")

    def _pytorch_inference(self, model_data, input_data):
        """PyTorch model inference"""
        model_state = model_data['artifacts']['model.pt']
        # Reconstruct model (simplified - in practice, you'd store architecture)
        # This assumes you have a way to reconstruct the model architecture
        # NOTE(review): _reconstruct_pytorch_model is not defined in the
        # code shown here; it must be provided elsewhere.
        model = self._reconstruct_pytorch_model(model_data['metadata'])
        model.load_state_dict(model_state)
        model.eval()
        # Convert input to tensor
        if isinstance(input_data, np.ndarray):
            input_tensor = torch.from_numpy(input_data).float()
        elif isinstance(input_data, list):
            input_tensor = torch.tensor(input_data).float()
        else:
            input_tensor = input_data
        # Add batch dimension if needed
        if len(input_tensor.shape) == 1:
            input_tensor = input_tensor.unsqueeze(0)
        # Run inference
        with torch.no_grad():
            output = model(input_tensor)
        return {
            'predictions': output.numpy().tolist(),
            'model_name': model_data['metadata']['name'],
            'version': model_data['metadata']['version'],
            'timestamp': datetime.now().isoformat()
        }

    def _get_cached_prediction(self, cache_key):
        """Get prediction from cache (Redis first, then MinIO fallback)."""
        try:
            if self.redis_client:
                cached = self.redis_client.get(cache_key)
                if cached:
                    return json.loads(cached)
            # Fallback to MinIO cache
            try:
                obj = self.s3.get_object(Bucket=self.cache_bucket, Key=f"predictions/{cache_key}.json")
                cached_data = json.loads(obj['Body'].read())
                # Check if cache is still valid
                # NOTE(review): the MinIO path uses a fixed 1-hour validity
                # window, ignoring the 'ttl' stored with the entry.
                cached_time = datetime.fromisoformat(cached_data['cached_at'])
                if datetime.now() - cached_time < timedelta(hours=1):
                    return cached_data['prediction']
            except:
                # Cache miss or unreadable entry: fall through to recompute.
                pass
        except Exception as e:
            print(f"Cache retrieval error: {e}")
        return None

    def _cache_prediction(self, cache_key, prediction, ttl):
        """Cache prediction result in Redis (with TTL) and MinIO (persistent)."""
        try:
            cache_data = {
                'prediction': prediction,
                'cached_at': datetime.now().isoformat(),
                'ttl': ttl
            }
            if self.redis_client:
                self.redis_client.setex(
                    cache_key,
                    ttl,
                    json.dumps(cache_data)
                )
            # Also cache in MinIO for persistence
            self.s3.put_object(
                Bucket=self.cache_bucket,
                Key=f"predictions/{cache_key}.json",
                Body=json.dumps(cache_data, indent=2),
                ContentType='application/json'
            )
        except Exception as e:
            print(f"Cache storage error: {e}")
class BatchInferenceProcessor:
    """Runs a model over every object under a prefix and writes JSON results."""

    def __init__(self, s3_client, inference_engine):
        self.s3 = s3_client
        self.inference_engine = inference_engine

    def process_batch(self, input_bucket, input_prefix, output_bucket,
                      output_prefix, model_name, batch_size=100):
        """Process batch inference on data stored in MinIO.

        Args:
            input_bucket / input_prefix: where the input objects live.
            output_bucket / output_prefix: where result files are written
                (one batch_NNNN.json per batch, plus a summary.json).
            model_name: model served by the inference engine.
            batch_size: number of input files per result file.

        Returns:
            The summary dict (counts of successes/failures).
        """
        # List input files
        # NOTE(review): a single list_objects_v2 call returns at most 1000
        # keys; use a paginator for larger inputs.
        response = self.s3.list_objects_v2(
            Bucket=input_bucket,
            Prefix=input_prefix
        )
        input_files = [obj['Key'] for obj in response.get('Contents', [])]
        results = []
        batch_count = 0
        for i in range(0, len(input_files), batch_size):
            batch_files = input_files[i:i + batch_size]
            batch_results = []
            for file_key in batch_files:
                try:
                    # Download and parse the file by extension
                    obj = self.s3.get_object(Bucket=input_bucket, Key=file_key)
                    if file_key.endswith('.json'):
                        input_data = json.loads(obj['Body'].read())
                    elif file_key.endswith('.csv'):
                        import pandas as pd
                        df = pd.read_csv(obj['Body'])
                        input_data = df.to_dict('records')
                    else:
                        # Handle other formats: pass raw bytes through
                        input_data = obj['Body'].read()
                    # Make prediction (cache disabled: batch inputs are one-shot)
                    prediction = self.inference_engine.predict(
                        model_name, input_data, use_cache=False
                    )
                    batch_results.append({
                        'input_file': file_key,
                        'prediction': prediction,
                        'processed_at': datetime.now().isoformat()
                    })
                except Exception as e:
                    # Per-file failures are recorded, not fatal.
                    batch_results.append({
                        'input_file': file_key,
                        'error': str(e),
                        'processed_at': datetime.now().isoformat()
                    })
            # Save batch results
            batch_output_key = f"{output_prefix}/batch_{batch_count:04d}.json"
            self.s3.put_object(
                Bucket=output_bucket,
                Key=batch_output_key,
                Body=json.dumps(batch_results, indent=2),
                ContentType='application/json'
            )
            results.extend(batch_results)
            batch_count += 1
            print(f"Processed batch {batch_count}, files: {len(batch_files)}")
        # Save summary
        summary = {
            'total_files': len(input_files),
            'successful_predictions': len([r for r in results if 'prediction' in r]),
            'failed_predictions': len([r for r in results if 'error' in r]),
            'processing_time': datetime.now().isoformat(),
            'model_used': model_name
        }
        summary_key = f"{output_prefix}/summary.json"
        self.s3.put_object(
            Bucket=output_bucket,
            Key=summary_key,
            Body=json.dumps(summary, indent=2),
            ContentType='application/json'
        )
        return summary
# A/B Testing for Model Inference
class ModelABTester:
    """Routes prediction traffic between two models and records outcomes.

    NOTE(review): per-variant results accumulate in memory without bound;
    long-running tests should persist and trim them.
    """

    def __init__(self, inference_engine):
        self.inference_engine = inference_engine
        # test_name -> {'model_a', 'model_b', 'traffic_split', 'results'}
        self.test_configs = {}

    def setup_ab_test(self, test_name, model_a, model_b, traffic_split=0.5):
        """Setup A/B test between two models.

        Args:
            test_name: identifier used by predict_with_ab_test.
            model_a / model_b: registry model names for the two variants.
            traffic_split: fraction of traffic sent to model_a (0..1).
        """
        self.test_configs[test_name] = {
            'model_a': model_a,
            'model_b': model_b,
            'traffic_split': traffic_split,
            'results': {'a': [], 'b': []}
        }

    def predict_with_ab_test(self, test_name, input_data, user_id=None):
        """Make prediction using A/B test configuration.

        With a user_id the variant assignment is deterministic (hash-based,
        so a user always sees the same model); otherwise it is random.

        Returns:
            Dict with 'prediction', 'variant' ('a' or 'b'), and 'model_used'.

        Raises:
            ValueError: if the test was never set up.
        """
        if test_name not in self.test_configs:
            raise ValueError(f"A/B test {test_name} not configured")
        config = self.test_configs[test_name]
        # Determine which model to use
        if user_id:
            # Consistent assignment based on user ID
            import hashlib
            hash_val = int(hashlib.md5(str(user_id).encode()).hexdigest(), 16)
            use_model_a = (hash_val % 100) < (config['traffic_split'] * 100)
        else:
            # Random assignment
            import random
            use_model_a = random.random() < config['traffic_split']
        model_name = config['model_a'] if use_model_a else config['model_b']
        variant = 'a' if use_model_a else 'b'
        # Make prediction
        prediction = self.inference_engine.predict(model_name, input_data)
        # Track result (kept in memory on this instance)
        config['results'][variant].append({
            'prediction': prediction,
            'timestamp': datetime.now().isoformat(),
            'user_id': user_id
        })
        return {
            'prediction': prediction,
            'variant': variant,
            'model_used': model_name
        }
# Usage examples
# NOTE(review): runs at import time; needs MinIO reachable and registered models.
s3_client = boto3.client('s3', endpoint_url='http://minio:9000',
                         aws_access_key_id='minioadmin',
                         aws_secret_access_key='minioadmin')
# Setup inference engine
inference_engine = MinIOInferenceEngine(s3_client)
# Real-time prediction
input_data = [1.2, 3.4, 5.6, 7.8]  # Example feature vector
result = inference_engine.predict('housing_price_predictor', input_data)
print(f"Prediction: {result}")
# Batch processing over every object under the input prefix
batch_processor = BatchInferenceProcessor(s3_client, inference_engine)
summary = batch_processor.process_batch(
    input_bucket='inference-input',
    input_prefix='batch_001/',
    output_bucket='inference-output',
    output_prefix='results/batch_001',
    model_name='housing_price_predictor'
)
print(f"Batch processing summary: {summary}")
# A/B testing: 30% of traffic to v1, consistent routing per user_id
ab_tester = ModelABTester(inference_engine)
ab_tester.setup_ab_test('price_model_test', 'price_model_v1', 'price_model_v2', 0.3)
ab_result = ab_tester.predict_with_ab_test('price_model_test', input_data, user_id='user123')
print(f"A/B test result: {ab_result}")
Vector Database Integration
Storing Embeddings
import boto3
import json
import numpy as np
def store_embeddings(embeddings, metadata, bucket='embeddings'):
    """Store vector embeddings with metadata as one JSON object per vector.

    Args:
        embeddings: sequence of 1-D numpy arrays (one vector per item).
        metadata: sequence of dicts, each with at least an 'id' key,
            paired positionally with *embeddings*.
        bucket: destination bucket; objects are written under 'vectors/'.
    """
    # Local import: this snippet's own import block does not bring in
    # datetime, so the original raised NameError when run standalone.
    from datetime import datetime

    s3 = boto3.client('s3',
        endpoint_url='http://minio:9000',
        aws_access_key_id='minioadmin',
        aws_secret_access_key='minioadmin'
    )
    # zip pairs each vector with its metadata; the enumerate index the
    # original carried was never used.
    for embedding, meta in zip(embeddings, metadata):
        key = f"vectors/{meta['id']}.json"
        data = {
            'id': meta['id'],
            'embedding': embedding.tolist(),
            'metadata': meta,
            'created_at': datetime.now().isoformat()
        }
        s3.put_object(
            Bucket=bucket,
            Key=key,
            Body=json.dumps(data),
            ContentType='application/json'
        )
# Store embeddings
# 100 random 512-dimensional vectors with minimal per-item metadata.
embeddings = [np.random.randn(512) for _ in range(100)]
metadata = [{'id': f'item_{i}', 'category': 'test'} for i in range(100)]
store_embeddings(embeddings, metadata)
Complete ML Pipeline
End-to-End Example
class MLPipeline:
    """Skeleton of an end-to-end ML pipeline backed by a MinIO bucket.

    Stages (ingest -> preprocess -> train -> evaluate -> deploy) are
    placeholders to be filled in; run() executes them in order.
    """

    def __init__(self, minio_endpoint, bucket_prefix='ml-pipeline'):
        # NOTE(review): credentials are hard-coded demo values.
        self.s3 = boto3.client('s3',
            endpoint_url=minio_endpoint,
            aws_access_key_id='minioadmin',
            aws_secret_access_key='minioadmin'
        )
        self.bucket = bucket_prefix
        self._ensure_bucket()

    def _ensure_bucket(self):
        """Create the working bucket, tolerating failures (e.g. it exists)."""
        try:
            self.s3.create_bucket(Bucket=self.bucket)
        except Exception:
            # Was a bare `except:` (also caught SystemExit/KeyboardInterrupt).
            # Bucket likely exists already; other errors surface on first use.
            pass

    def ingest(self, source_path=None):
        """Ingest raw data (placeholder).

        source_path now defaults to None so run() can invoke the stage
        without arguments — the original required it, making run() raise
        TypeError unconditionally.
        """
        # Upload raw data
        pass

    def preprocess(self):
        """Clean and transform data (placeholder)."""
        # Process data
        pass

    def train(self):
        """Train model (placeholder)."""
        # Training loop
        pass

    def evaluate(self):
        """Evaluate model (placeholder)."""
        # Evaluation
        pass

    def deploy(self):
        """Deploy to inference (placeholder)."""
        # Save model and metadata
        pass

    def run(self, source_path=None):
        """Run the complete pipeline stage by stage.

        Args:
            source_path: optional raw-data location forwarded to ingest().
        """
        self.ingest(source_path)
        self.preprocess()
        self.train()
        self.evaluate()
        self.deploy()
# Run pipeline
# NOTE(review): runs at import time and needs a reachable MinIO endpoint.
pipeline = MLPipeline('http://minio:9000')
pipeline.run()
Conclusion
MinIO provides ideal storage for AI/ML workloads. Key capabilities: high-throughput data ingestion for training, efficient model checkpoint storage, feature store backends, vector embedding storage, and seamless integration with ML frameworks. Using MinIO as your AI data lake simplifies infrastructure while maintaining S3 compatibility.
In the final article, we’ll explore real-world MinIO use cases.
Comments