Skip to main content
⚡ Calmops

Building Production ML Systems: MLOps Best Practices

Introduction

Machine learning in production is vastly different from notebooks and competitions. MLOps (Machine Learning Operations) encompasses the entire ML lifecycle: data, training, deployment, and monitoring.

This guide covers building reliable production ML systems.


The ML Lifecycle

Typical ML Workflow

Data Collection → Exploration → Feature Engineering → Model Training
        ↓
    Deployment → Serving → Monitoring → Retraining

Industry Statistics

Time allocation in ML projects:
Data collection:     30-40%
Data cleaning:       20-25%
Exploratory analysis: 10-15%
Feature engineering: 10-15%
Model training:      5-10%
Deployment:          3-5%
Monitoring:          2-5%

Data Engineering

Data Pipeline Architecture

Raw Data Sources (APIs, Databases, Files)
    ↓
Data Ingestion (Kafka, S3, data lakes)
    ↓
Data Validation (schema, quality checks)
    ↓
Data Transformation (cleaning, normalization)
    ↓
Feature Store (cached, versioned features)
    ↓
Training/Serving

Data Quality Checklist

✅ Missing values strategy
✅ Outlier detection and handling
✅ Data type validation
✅ Temporal consistency
✅ Distribution monitoring
✅ Feature correlation analysis
✅ Duplicate detection
✅ PII/sensitive data handling

Feature Engineering

Feature Types

# Numerical features
def standardize_features(X):
    """Z-score each column of X (zero mean, unit variance).

    NOTE(review): the fitted scaler is discarded here, so the identical
    transform cannot be re-applied to serving data -- keep the scaler if
    that is needed.
    """
    from sklearn.preprocessing import StandardScaler
    return StandardScaler().fit_transform(X)

# Categorical features
def encode_categorical(df, column):
    """One-hot encode a single categorical column, leaving the rest intact."""
    encoded = pd.get_dummies(df, columns=[column])
    return encoded

# Temporal features
def extract_temporal(timestamp):
    """Split a timestamp into calendar components for feature engineering.

    NOTE(review): `.dayofweek` is a pandas Timestamp attribute (plain
    datetime uses `.weekday()`) -- callers are assumed to pass pandas
    timestamps.
    """
    parts = {}
    parts['hour'] = timestamp.hour
    parts['day_of_week'] = timestamp.dayofweek
    parts['month'] = timestamp.month
    return parts

Feature Store Pattern

class FeatureStore:
    """Serve per-entity features, caching computed values.

    BUG FIX: the original stored the *string* 'redis' as the cache and then
    called .get/.set on it, raising AttributeError on first use; it also
    used `if cached:` so falsy cached values (0, '', False) were recomputed
    every call. Strings now fall back to a simple in-process cache; any
    object exposing get(key) / set(key, value, ttl=...) may be passed
    instead (e.g. a Redis client wrapper).
    """

    class _InMemoryCache:
        """Minimal dict-backed stand-in for an external cache service."""

        def __init__(self):
            self._data = {}

        def get(self, key):
            return self._data.get(key)

        def set(self, key, value, ttl=None):
            # ttl accepted for interface parity; entries never expire here.
            self._data[key] = value

    def __init__(self, cache_backend='redis'):
        # A string names a backend we cannot construct here; degrade to the
        # in-memory cache so the store works out of the box.
        if isinstance(cache_backend, str):
            cache_backend = self._InMemoryCache()
        self.cache = cache_backend
        self.features = {}

    def get_features(self, entity_id, feature_names):
        """Retrieve cached features or compute on-the-fly."""
        result = {}
        for feature in feature_names:
            key = f"{entity_id}:{feature}"
            cached = self.cache.get(key)
            # 'is not None' so falsy feature values still count as cache hits.
            if cached is not None:
                result[feature] = cached
            else:
                value = self._compute_feature(entity_id, feature)
                self.cache.set(key, value, ttl=3600)
                result[feature] = value
        return result

    def _compute_feature(self, entity_id, feature_name):
        # Custom computation logic; subclasses override.
        pass

Model Training

Experiment Tracking

import mlflow

# Group runs under one named experiment so the tracking UI can compare them.
mlflow.set_experiment("customer_churn")

# The context manager scopes everything logged below to one run and closes
# the run cleanly even if training raises.
# NOTE(review): train_model / evaluate_model are assumed to be defined
# elsewhere -- this snippet does not run standalone.
with mlflow.start_run():
    # Log parameters
    mlflow.log_param("learning_rate", 0.01)
    mlflow.log_param("max_depth", 5)
    
    # Train model
    model = train_model()
    accuracy = evaluate_model(model)
    
    # Log metrics
    mlflow.log_metric("accuracy", accuracy)
    
    # Log model
    mlflow.sklearn.log_model(model, "model")

Cross-Validation Pattern

from sklearn.model_selection import cross_validate

def evaluate_with_cv(model, X, y):
    """5-fold CV summary: mean train/test accuracy plus test-accuracy spread.

    With a list passed to `scoring`, cross_validate keys its results as
    'train_<metric>' / 'test_<metric>'.
    """
    scores = cross_validate(
        model,
        X,
        y,
        cv=5,
        scoring=['accuracy', 'precision', 'recall', 'f1'],
        return_train_score=True,
    )
    train_acc = scores['train_accuracy'].mean()
    test_acc = scores['test_accuracy'].mean()
    spread = scores['test_accuracy'].std()
    return {'train_acc': train_acc, 'test_acc': test_acc, 'std': spread}

Hyperparameter Optimization

from optuna import create_study
from optuna.pruners import MedianPruner

def objective(trial):
    """Optuna objective: sample hyperparameters, train, return the score."""
    # Log-uniform sampling suits learning rates spanning orders of magnitude.
    params = {
        'lr': trial.suggest_float('learning_rate', 1e-5, 1e-1, log=True),
        'max_depth': trial.suggest_int('max_depth', 3, 20),
    }
    return evaluate_model(train_model(**params))

# Run optimization
# 100 trials of the objective above; Optuna maximizes the returned score.
study = create_study(direction='maximize')
study.optimize(objective, n_trials=100)

print(f"Best accuracy: {study.best_value}")
print(f"Best params: {study.best_params}")

Model Validation

Train/Test Split Strategy

from sklearn.model_selection import train_test_split

# Time-based split (for time series)
def time_based_split(df, test_size=0.2):
    """Chronological holdout: first (1 - test_size) rows train, rest test.

    No shuffling -- row order is assumed to be time order, which avoids
    leaking future observations into the training set.
    """
    cutoff = int(len(df) * (1 - test_size))
    train, test = df[:cutoff], df[cutoff:]
    return train, test

# Stratified split (for imbalanced classes)
# stratify=y keeps the class ratio identical in train and test;
# random_state pins the shuffle for reproducibility.
X_train, X_test, y_train, y_test = train_test_split(
    X, y,
    test_size=0.2,
    stratify=y,
    random_state=42
)

Performance Metrics

from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, 
    f1_score, roc_auc_score, confusion_matrix
)

def evaluate_model(y_true, y_pred, y_pred_proba):
    """Compute standard binary-classification metrics plus the confusion matrix.

    Returns (metrics_dict, confusion_matrix). AUC is scored from the
    probability estimates; every other metric uses the hard predictions.
    """
    metrics = {}
    metrics['accuracy'] = accuracy_score(y_true, y_pred)
    metrics['precision'] = precision_score(y_true, y_pred)
    metrics['recall'] = recall_score(y_true, y_pred)
    metrics['f1'] = f1_score(y_true, y_pred)
    metrics['auc'] = roc_auc_score(y_true, y_pred_proba)

    # The confusion matrix exposes the per-class breakdown that aggregate
    # scores hide on imbalanced data.
    cm = confusion_matrix(y_true, y_pred)
    return metrics, cm

Model Deployment

Containerization

# Slim base keeps the serving image small.
FROM python:3.9-slim

WORKDIR /app

# Copy requirements alone first so Docker layer caching skips the install
# when only application code changes.
COPY requirements.txt .
# FIX: --no-cache-dir stops pip from baking its download cache into the layer.
RUN pip install --no-cache-dir -r requirements.txt

# Model artifact and serving app.
COPY model.pkl .
COPY app.py .

EXPOSE 5000

CMD ["python", "app.py"]

REST API for Serving

from flask import Flask, request, jsonify
import joblib

# Model and scaler are deserialized once at import time; every request
# reuses the same objects.
app = Flask(__name__)
model = joblib.load('model.pkl')
scaler = joblib.load('scaler.pkl')

@app.route('/predict', methods=['POST'])
def predict():
    """Score one example: JSON {'features': [...]} -> prediction + scores."""
    try:
        data = request.json
        # Apply the training-time scaling; [data['features']] wraps the
        # single example into a 2-D batch of one row.
        features = scaler.transform([data['features']])
        prediction = model.predict(features)[0]
        probability = model.predict_proba(features)[0]
        
        return jsonify({
            'prediction': int(prediction),
            # probability[1] assumes binary classification with the positive
            # class at index 1 -- TODO confirm against the trained model.
            'probability': float(probability[1]),
            'confidence': float(max(probability))
        })
    except Exception as e:
        # Broad catch keeps the endpoint alive; malformed payloads become 400s.
        return jsonify({'error': str(e)}), 400

@app.route('/health', methods=['GET'])
def health():
    """Liveness probe for load balancers / orchestrators."""
    return jsonify({'status': 'healthy'}), 200

if __name__ == '__main__':
    # 0.0.0.0 exposes the dev server beyond localhost (container networking).
    app.run(host='0.0.0.0', port=5000)

Model Monitoring

Prediction Monitoring

import pandas as pd
from datetime import datetime

class ModelMonitor:
    """Track served predictions and flag drift against baseline statistics."""

    def __init__(self, baseline_metrics):
        # Expected keys: 'feature_mean', 'feature_std' -- presumably computed
        # at training time; TODO confirm against the training pipeline.
        self.baseline = baseline_metrics
        self.predictions = []

    def log_prediction(self, features, prediction, actual=None):
        """Record one served prediction; ground truth may arrive later."""
        record = {
            'timestamp': datetime.now(),
            'features': features,
            'prediction': prediction,
            'actual': actual,
        }
        self.predictions.append(record)

    def detect_drift(self, window=1000):
        """Detect input or output drift"""
        recent = pd.DataFrame(self.predictions[-window:])

        # Compare the recent feature average against the training baseline;
        # a two-sigma excursion is treated as drift.
        current_mean = recent['features'].mean()
        baseline_mean = self.baseline['feature_mean']
        deviation = abs(current_mean - baseline_mean)
        if deviation > 2 * self.baseline['feature_std']:
            return 'DRIFT_DETECTED'
        return 'OK'

    def calculate_metrics(self, actual_labels):
        """Calculate current performance metrics"""
        recent_df = pd.DataFrame(self.predictions[-1000:])
        predictions = recent_df['prediction']
        # Align ground truth with the most recent predictions by position.
        actuals = actual_labels[-len(predictions):]
        accuracy = (predictions == actuals).mean()
        return {'accuracy': accuracy}

Monitoring Dashboard

from prometheus_client import Counter, Histogram, Gauge
import time

# Prometheus metrics
prediction_counter = Counter(
    'model_predictions_total',
    'Total predictions',
    ['model', 'class']
)

prediction_latency = Histogram(
    'prediction_latency_seconds',
    'Prediction latency'
)

model_accuracy = Gauge(
    'model_accuracy',
    'Current model accuracy'
)

@app.route('/predict', methods=['POST'])
def predict():
    """Serve a prediction while recording count and latency metrics.

    NOTE(review): `app`, `model` and `features` come from the surrounding
    application; this snippet does not run standalone.
    """
    start = time.time()

    result = model.predict(features)

    # BUG FIX: the label is named 'class', a Python keyword, so it cannot be
    # supplied as labels(class_=...) -- prometheus_client rejects the unknown
    # 'class_' label with a ValueError at request time. Positional label
    # values (in declaration order: model, class) sidestep the keyword clash.
    prediction_counter.labels('churn', result).inc()
    prediction_latency.observe(time.time() - start)

    return result

Retraining Pipeline

Automated Retraining

import airflow
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'ml-team',
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'model_retraining',
    default_args=default_args,
    schedule_interval='0 2 * * 0',  # Weekly at 2 AM (Sundays)
)

def fetch_data():
    # Get new training data
    pass

def train_model():
    # Train with new data
    pass

def validate_model():
    # Compare with baseline
    pass

def deploy_model():
    # Promote to production if valid
    pass

fetch_task = PythonOperator(
    task_id='fetch_data',
    python_callable=fetch_data,
    dag=dag,
)

train_task = PythonOperator(
    task_id='train_model',
    python_callable=train_model,
    dag=dag,
)

# BUG FIX: the original chained the raw functions (validate_model,
# deploy_model) into the dependency graph; Airflow's >> operator only links
# operators, so the DAG failed to build. Wrap them like the other steps.
validate_task = PythonOperator(
    task_id='validate_model',
    python_callable=validate_model,
    dag=dag,
)

deploy_task = PythonOperator(
    task_id='deploy_model',
    python_callable=deploy_model,
    dag=dag,
)

fetch_task >> train_task >> validate_task >> deploy_task

Real-World Example: Churn Prediction

import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
import mlflow

# Data loading
def load_data():
    """Load customer_data.csv from the working directory and drop any rows
    containing missing values."""
    raw = pd.read_csv('customer_data.csv')
    return raw.dropna()

# Feature engineering
def create_features(df):
    """Select the numeric customer columns and z-score them.

    Returns (scaled_matrix, fitted_scaler) so serving code can re-apply the
    exact training-time transform.
    """
    cols = ['age', 'tenure', 'monthly_charges', 'total_charges']
    features = df[cols].copy()
    scaler = StandardScaler()
    scaled = scaler.fit_transform(features)
    return scaled, scaler

# Training
def train_churn_model():
    """Train a RandomForest churn classifier and log the run to MLflow.

    Returns (model, scaler); the scaler must be applied to features at
    serving time.
    """
    mlflow.set_experiment("churn_prediction")
    
    with mlflow.start_run():
        # Load and prepare data
        df = load_data()
        X, scaler = create_features(df)
        y = df['churn'].values
        
        # BUG FIX: the original logged model.score(X, y) on the *training*
        # data -- an optimistic, near-meaningless estimate for a random
        # forest. Hold out a stratified test split for evaluation instead.
        from sklearn.model_selection import train_test_split
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, stratify=y, random_state=42
        )
        
        model = RandomForestClassifier(
            n_estimators=100,
            max_depth=10,
            random_state=42
        )
        model.fit(X_train, y_train)
        
        # Log both so overfitting (train >> test) is visible in the UI;
        # 'accuracy' keeps its original name but now reflects held-out data.
        mlflow.log_metric("train_accuracy", model.score(X_train, y_train))
        mlflow.log_metric("accuracy", model.score(X_test, y_test))
        
        # Persist the model artifact with the run.
        mlflow.sklearn.log_model(model, "churn_model")
        
        return model, scaler

if __name__ == '__main__':
    # Script entry point: run one training pass; the returned model/scaler
    # are unused here (the run is tracked via MLflow inside the function).
    model, scaler = train_churn_model()

Best Practices

✅ Version control: code, data, models
✅ Automated testing for data quality
✅ CI/CD pipelines for model deployment
✅ Monitor data drift and concept drift
✅ Maintain explainability
✅ Document model decisions
✅ Handle edge cases gracefully
✅ Plan for model failures

Glossary

  • MLOps: Operations focused on ML systems
  • Feature Store: Centralized feature management
  • Concept Drift: Target distribution change over time
  • Data Drift: Input distribution change over time
  • Model Registry: Version control for models

Resources


Comments