Building Production ML Systems: MLOps Best Practices

Created: December 22, 2025 · Larry Qu · 5 min read

Introduction

Machine learning in production is vastly different from machine learning in notebooks and competitions. MLOps (Machine Learning Operations) encompasses the entire ML lifecycle: data collection and preparation, training, deployment, and monitoring.

This guide covers building reliable production ML systems.


The ML Lifecycle

Typical ML Workflow

Data Collection → Exploration → Feature Engineering → Model Training
                                                            ↓
                  Deployment → Serving → Monitoring → Retraining

Industry Statistics

Typical time allocation in ML projects (rough estimates; figures vary by team):
Data collection:     30-40%
Data cleaning:       20-25%
Exploratory analysis: 10-15%
Feature engineering: 10-15%
Model training:      5-10%
Deployment:          3-5%
Monitoring:          2-5%

Data Engineering

Data Pipeline Architecture

Raw Data Sources (APIs, Databases, Files)
    ↓
Data Ingestion (Kafka, S3, data lakes)
    ↓
Data Validation (schema, quality checks)
    ↓
Data Transformation (cleaning, normalization)
    ↓
Feature Store (cached, versioned features)
    ↓
Training/Serving

Data Quality Checklist

✅ Missing values strategy
✅ Outlier detection and handling
✅ Data type validation
✅ Temporal consistency
✅ Distribution monitoring
✅ Feature correlation analysis
✅ Duplicate detection
✅ PII/sensitive data handling
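
As a minimal sketch of how a few of these checks might be automated with pandas (thresholds and schema handling are assumptions, not a complete validation suite):

import pandas as pd

def run_quality_checks(df: pd.DataFrame) -> dict:
    """Run a handful of the checklist items against a DataFrame."""
    report = {
        # Missing values per column
        'missing_ratio': df.isna().mean().to_dict(),
        # Duplicate detection
        'duplicate_rows': int(df.duplicated().sum()),
        # Data type validation (compare against an expected schema upstream)
        'dtypes': df.dtypes.astype(str).to_dict(),
    }
    # Simple outlier detection: values beyond 3 standard deviations
    numeric = df.select_dtypes(include='number')
    z_scores = (numeric - numeric.mean()) / numeric.std()
    report['outlier_counts'] = (z_scores.abs() > 3).sum().to_dict()
    return report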

Feature Engineering

Feature Types

import pandas as pd
from sklearn.preprocessing import StandardScaler

# Numerical features
def standardize_features(X):
    """Scale numeric features to zero mean and unit variance."""
    scaler = StandardScaler()
    return scaler.fit_transform(X)

# Categorical features
def encode_categorical(df, column):
    """One-hot encode a single categorical column."""
    return pd.get_dummies(df, columns=[column])

# Temporal features (expects a pandas Timestamp)
def extract_temporal(timestamp):
    return {
        'hour': timestamp.hour,
        'day_of_week': timestamp.dayofweek,
        'month': timestamp.month,
    }

Feature Store Pattern

class FeatureStore:
    def __init__(self, cache_client):
        # cache_client: any object exposing get(key) and set(key, value, ttl),
        # e.g. a thin wrapper around a Redis client
        self.cache = cache_client

    def get_features(self, entity_id, feature_names):
        """Retrieve cached features or compute on-the-fly"""
        result = {}
        for feature in feature_names:
            # Check cache first
            cached = self.cache.get(f"{entity_id}:{feature}")
            if cached is not None:
                result[feature] = cached
            else:
                # Compute, then cache for an hour
                value = self._compute_feature(entity_id, feature)
                self.cache.set(f"{entity_id}:{feature}", value, ttl=3600)
                result[feature] = value
        return result

    def _compute_feature(self, entity_id, feature_name):
        # Custom computation logic (e.g. query the warehouse or feature pipeline)
        raise NotImplementedError
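
A usage sketch, assuming the cache client is anything with get/set; the DictCache helper below is a hypothetical in-memory stand-in for a Redis client, and _compute_feature still needs a real implementation before this runs end-to-end:

class DictCache:
    """In-memory stand-in for a Redis-like cache (for local testing)."""
    def __init__(self):
        self._data = {}

    def get(self, key):
        return self._data.get(key)

    def set(self, key, value, ttl=None):
        self._data[key] = value  # ttl ignored in this toy cache

store = FeatureStore(cache_client=DictCache())
features = store.get_features('user_42', ['avg_order_value', 'days_since_last_order'])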

Model Training

Experiment Tracking

import mlflow

mlflow.set_experiment("customer_churn")

with mlflow.start_run():
    # Log parameters
    mlflow.log_param("learning_rate", 0.01)
    mlflow.log_param("max_depth", 5)
    
    # Train model
    model = train_model()
    accuracy = evaluate_model(model)
    
    # Log metrics
    mlflow.log_metric("accuracy", accuracy)
    
    # Log model
    mlflow.sklearn.log_model(model, "model")
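
To pull the logged model back out later (for batch scoring or promotion), MLflow can load it by run ID; the run_id and X_new below are placeholders:

import mlflow

run_id = "<run_id>"  # copy from the MLflow UI or from run.info.run_id
loaded_model = mlflow.sklearn.load_model(f"runs:/{run_id}/model")
predictions = loaded_model.predict(X_new)  # X_new: new feature matrix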

Cross-Validation Pattern

from sklearn.model_selection import cross_validate

def evaluate_with_cv(model, X, y):
    cv_results = cross_validate(
        model, X, y,
        cv=5,
        scoring=['accuracy', 'precision', 'recall', 'f1'],
        return_train_score=True
    )
    
    return {
        'train_acc': cv_results['train_accuracy'].mean(),
        'test_acc': cv_results['test_accuracy'].mean(),
        'std': cv_results['test_accuracy'].std(),
    }
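
For example, with a scikit-learn estimator (RandomForestClassifier is used here only for illustration; X and y are the prepared features and labels):

from sklearn.ensemble import RandomForestClassifier

cv_summary = evaluate_with_cv(RandomForestClassifier(random_state=42), X, y)
print(cv_summary)  # {'train_acc': ..., 'test_acc': ..., 'std': ...}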

Hyperparameter Optimization

from optuna import create_study

def objective(trial):
    # Suggest hyperparameters
    lr = trial.suggest_float('learning_rate', 1e-5, 1e-1, log=True)
    max_depth = trial.suggest_int('max_depth', 3, 20)
    
    # Train model
    model = train_model(lr=lr, max_depth=max_depth)
    score = evaluate_model(model)
    
    return score

# Run optimization
study = create_study(direction='maximize')
study.optimize(objective, n_trials=100)

print(f"Best accuracy: {study.best_value}")
print(f"Best params: {study.best_params}")

Model Validation

Train/Test Split Strategy

from sklearn.model_selection import train_test_split

# Time-based split (for time series)
def time_based_split(df, test_size=0.2):
    split_idx = int(len(df) * (1 - test_size))
    return df[:split_idx], df[split_idx:]

# Stratified split (for imbalanced classes)
X_train, X_test, y_train, y_test = train_test_split(
    X, y,
    test_size=0.2,
    stratify=y,
    random_state=42
)

Performance Metrics

from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, 
    f1_score, roc_auc_score, confusion_matrix
)

def evaluate_model(y_true, y_pred, y_pred_proba):
    metrics = {
        'accuracy': accuracy_score(y_true, y_pred),
        'precision': precision_score(y_true, y_pred),
        'recall': recall_score(y_true, y_pred),
        'f1': f1_score(y_true, y_pred),
        'auc': roc_auc_score(y_true, y_pred_proba),
    }
    
    # For imbalanced data
    cm = confusion_matrix(y_true, y_pred)
    
    return metrics, cm
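
Given a fitted binary classifier clf (hypothetical) and a held-out test set, the helper is called with hard predictions plus positive-class probabilities, since roc_auc_score expects scores rather than labels:

y_pred = clf.predict(X_test)
y_pred_proba = clf.predict_proba(X_test)[:, 1]  # probability of the positive class

metrics, cm = evaluate_model(y_test, y_pred, y_pred_proba)
print(metrics)
print(cm)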

Model Deployment

Containerization

FROM python:3.9-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install -r requirements.txt

COPY model.pkl .
COPY app.py .

EXPOSE 5000

CMD ["python", "app.py"]

REST API for Serving

from flask import Flask, request, jsonify
import joblib

app = Flask(__name__)
model = joblib.load('model.pkl')
scaler = joblib.load('scaler.pkl')

@app.route('/predict', methods=['POST'])
def predict():
    try:
        data = request.json
        features = scaler.transform([data['features']])
        prediction = model.predict(features)[0]
        probability = model.predict_proba(features)[0]
        
        return jsonify({
            'prediction': int(prediction),
            'probability': float(probability[1]),
            'confidence': float(max(probability))
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 400

@app.route('/health', methods=['GET'])
def health():
    return jsonify({'status': 'healthy'}), 200

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
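
A quick way to exercise the endpoint once the service is running (assumes it is reachable at localhost:5000 and that the model expects a flat feature vector; the values are made up):

import requests

response = requests.post(
    "http://localhost:5000/predict",
    json={"features": [34, 12, 79.5, 954.0]},
    timeout=5,
)
print(response.json())  # {'prediction': ..., 'probability': ..., 'confidence': ...}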

Model Monitoring

Prediction Monitoring

import pandas as pd
from datetime import datetime

class ModelMonitor:
    def __init__(self, baseline_metrics):
        self.baseline = baseline_metrics
        self.predictions = []
    
    def log_prediction(self, features, prediction, actual=None):
        self.predictions.append({
            'timestamp': datetime.now(),
            'features': features,
            'prediction': prediction,
            'actual': actual,
        })
    
    def detect_drift(self, window=1000):
        """Detect input drift (assumes `features` is a single numeric value per prediction)"""
        recent = pd.DataFrame(self.predictions[-window:])
        
        # Compare the recent feature mean against the training-time baseline
        current_mean = recent['features'].mean()
        baseline_mean = self.baseline['feature_mean']
        
        # Alert if the mean has shifted by more than two baseline standard deviations
        if abs(current_mean - baseline_mean) > 2 * self.baseline['feature_std']:
            return 'DRIFT_DETECTED'
        
        return 'OK'
    
    def calculate_metrics(self, actual_labels):
        """Calculate current performance metrics"""
        recent_df = pd.DataFrame(self.predictions[-1000:])
        predictions = recent_df['prediction']
        actuals = actual_labels[-len(predictions):]
        
        accuracy = (predictions == actuals).mean()
        return {'accuracy': accuracy}
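
The mean-based check above is deliberately simple. For a single numeric feature, a two-sample Kolmogorov-Smirnov test is a common, more robust drift signal; a sketch using scipy, where baseline_values and recent_values are hypothetical arrays of that feature:

from scipy.stats import ks_2samp

def ks_drift_check(baseline_values, recent_values, alpha=0.05):
    """Flag drift when the samples are unlikely to come from the same distribution."""
    statistic, p_value = ks_2samp(baseline_values, recent_values)
    return {'statistic': statistic, 'p_value': p_value, 'drift': p_value < alpha}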

Monitoring Dashboard

from prometheus_client import Counter, Histogram, Gauge
import time

# Prometheus metrics
prediction_counter = Counter(
    'model_predictions_total',
    'Total predictions',
    ['model', 'class']
)

prediction_latency = Histogram(
    'prediction_latency_seconds',
    'Prediction latency'
)

model_accuracy = Gauge(
    'model_accuracy',
    'Current model accuracy'
)

# Instrumented version of the /predict endpoint from the serving section
@app.route('/predict', methods=['POST'])
def predict():
    start = time.time()
    
    features = request.json['features']
    prediction = model.predict([features])[0]
    
    # Log metrics (labels are positional: model name, predicted class)
    prediction_counter.labels('churn', str(prediction)).inc()
    prediction_latency.observe(time.time() - start)
    
    return jsonify({'prediction': int(prediction)})
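
For Prometheus to scrape these metrics, the app also has to expose them; prometheus_client provides a helper that serializes the default registry:

from prometheus_client import generate_latest, CONTENT_TYPE_LATEST

@app.route('/metrics', methods=['GET'])
def metrics():
    # Serialize all registered metrics in the Prometheus text format
    return generate_latest(), 200, {'Content-Type': CONTENT_TYPE_LATEST}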

Retraining Pipeline

Automated Retraining

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'ml-team',
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'model_retraining',
    default_args=default_args,
    start_date=datetime(2025, 1, 1),  # required by Airflow; adjust as needed
    schedule_interval='0 2 * * 0',  # Sundays at 2 AM
    catchup=False,
)

def fetch_data():
    # Get new training data
    pass

def train_model():
    # Train with new data
    pass

def validate_model():
    # Compare with baseline
    pass

def deploy_model():
    # Promote to production if valid
    pass

fetch_task = PythonOperator(
    task_id='fetch_data',
    python_callable=fetch_data,
    dag=dag,
)

train_task = PythonOperator(
    task_id='train_model',
    python_callable=train_model,
    dag=dag,
)

validate_task = PythonOperator(
    task_id='validate_model',
    python_callable=validate_model,
    dag=dag,
)

deploy_task = PythonOperator(
    task_id='deploy_model',
    python_callable=deploy_model,
    dag=dag,
)

fetch_task >> train_task >> validate_task >> deploy_task
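
The validate_model step is where a promotion gate usually lives: deploy the candidate only if it beats the current production model on a held-out set. A minimal sketch (the metric name and the 1% margin are assumptions):

def should_promote(candidate_metrics, production_metrics, margin=0.01):
    """Promote only if the candidate improves accuracy by at least `margin`."""
    return candidate_metrics['accuracy'] >= production_metrics['accuracy'] + margin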

Real-World Example: Churn Prediction

import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import mlflow

# Data loading
def load_data():
    df = pd.read_csv('customer_data.csv')
    df = df.dropna()
    return df

# Feature engineering
def create_features(df):
    features = df[['age', 'tenure', 'monthly_charges', 'total_charges']].copy()
    
    # Normalize
    scaler = StandardScaler()
    features_scaled = scaler.fit_transform(features)
    
    return features_scaled, scaler

# Training
def train_churn_model():
    mlflow.set_experiment("churn_prediction")
    
    with mlflow.start_run():
        # Load and prepare data
        df = load_data()
        X, scaler = create_features(df)
        y = df['churn'].values
        
        # Hold out a test set so the logged accuracy is not a training-set score
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, stratify=y, random_state=42
        )
        
        # Train
        model = RandomForestClassifier(
            n_estimators=100,
            max_depth=10,
            random_state=42
        )
        model.fit(X_train, y_train)
        
        # Evaluate on the held-out test set
        accuracy = model.score(X_test, y_test)
        mlflow.log_metric("accuracy", accuracy)
        
        # Log
        mlflow.sklearn.log_model(model, "churn_model")
        
        return model, scaler

if __name__ == '__main__':
    model, scaler = train_churn_model()
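
Scoring a new customer with the trained artifacts is then a matter of applying the same scaler before predicting (column order must match create_features; the values below are made up):

import numpy as np

model, scaler = train_churn_model()

# age, tenure, monthly_charges, total_charges for one hypothetical customer
new_customer = np.array([[42, 24, 70.35, 1688.4]])
churn_probability = model.predict_proba(scaler.transform(new_customer))[0, 1]
print(f"Churn probability: {churn_probability:.2f}")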

Best Practices

✅ Version control: code, data, models
✅ Automated testing for data quality
✅ CI/CD pipelines for model deployment
✅ Monitor data drift and concept drift
✅ Maintain explainability
✅ Document model decisions
✅ Handle edge cases gracefully
✅ Plan for model failures

Glossary

  • MLOps: Operations focused on ML systems
  • Feature Store: Centralized feature management
  • Concept Drift: Change in the relationship between inputs and the target over time
  • Data Drift: Change in the input feature distribution over time
  • Model Registry: Version control for models
