Introduction
Machine learning in production is vastly different from notebooks and competitions. MLOps (Machine Learning Operations) encompasses the entire ML lifecycle: data, training, deployment, and monitoring.
This guide covers building reliable production ML systems.
The ML Lifecycle
Typical ML Workflow
Data Collection → Exploration → Feature Engineering → Model Training
        ↓
Deployment → Serving → Monitoring → Retraining
Industry Statistics
Time allocation in ML projects:
Data collection: 30-40%
Data cleaning: 20-25%
Exploratory analysis: 10-15%
Feature engineering: 10-15%
Model training: 5-10%
Deployment: 3-5%
Monitoring: 2-5%
Data Engineering
Data Pipeline Architecture
Raw Data Sources (APIs, Databases, Files)
        ↓
Data Ingestion (Kafka, S3, data lakes)
        ↓
Data Validation (schema, quality checks)
        ↓
Data Transformation (cleaning, normalization)
        ↓
Feature Store (cached, versioned features)
        ↓
Training/Serving
Data Quality Checklist
✓ Missing values strategy
✓ Outlier detection and handling
✓ Data type validation
✓ Temporal consistency
✓ Distribution monitoring
✓ Feature correlation analysis
✓ Duplicate detection
✓ PII/sensitive data handling
Feature Engineering
Feature Types
# Numerical features
def standardize_features(X):
    """Scale each column of X to zero mean and unit variance.

    Fits a fresh StandardScaler on X; the scaler itself is discarded,
    so this is for one-off transformation, not train/serve reuse.
    """
    from sklearn.preprocessing import StandardScaler

    return StandardScaler().fit_transform(X)
# Categorical features
def encode_categorical(df, column):
    """One-hot encode *column* of *df* using pandas dummy variables."""
    encoded = pd.get_dummies(df, columns=[column])
    return encoded
# Temporal features
def extract_temporal(timestamp):
    """Decompose a timestamp into calendar features.

    Expects a pandas-style Timestamp (uses the `dayofweek` attribute,
    Monday == 0).
    """
    features = {}
    features['hour'] = timestamp.hour
    features['day_of_week'] = timestamp.dayofweek
    features['month'] = timestamp.month
    return features
Feature Store Pattern
class _InMemoryCache:
    """Dict-backed stand-in for an external cache client (e.g. redis).

    Implements the get/set subset of the redis API that FeatureStore uses.
    The ttl argument is accepted for interface parity but not enforced.
    """

    def __init__(self):
        self._store = {}

    def get(self, key):
        # Returns None on a miss, matching redis client semantics.
        return self._store.get(key)

    def set(self, key, value, ttl=None):
        self._store[key] = value


class FeatureStore:
    """Per-entity feature retrieval with read-through caching."""

    def __init__(self, cache_backend='redis'):
        # BUG FIX: the original stored the *string* 'redis' and later called
        # .get()/.set() on it, which raises AttributeError. Accept a real
        # client object; a string or None selects the in-memory fallback.
        if cache_backend is None or isinstance(cache_backend, str):
            self.cache = _InMemoryCache()
        else:
            self.cache = cache_backend
        self.features = {}

    def get_features(self, entity_id, feature_names):
        """Retrieve cached features or compute (and cache) them on the fly."""
        result = {}
        for feature in feature_names:
            key = f"{entity_id}:{feature}"
            cached = self.cache.get(key)
            # BUG FIX: test against None, not truthiness, so legitimately
            # falsy feature values (0, '', False) are served from the cache.
            if cached is not None:
                result[feature] = cached
            else:
                value = self._compute_feature(entity_id, feature)
                self.cache.set(key, value, ttl=3600)
                result[feature] = value
        return result

    def _compute_feature(self, entity_id, feature_name):
        """Hook for subclasses: compute one feature value for an entity."""
        # Custom computation logic
        pass
Model Training
Experiment Tracking
import mlflow

# Track one training run under a named experiment so parameters, metrics,
# and the fitted model are recorded together in the MLflow backend.
mlflow.set_experiment("customer_churn")
with mlflow.start_run():
    # Log parameters
    mlflow.log_param("learning_rate", 0.01)
    mlflow.log_param("max_depth", 5)
    # Train model
    # NOTE(review): train_model()/evaluate_model() are assumed to be defined
    # elsewhere -- confirm their signatures before running this snippet.
    model = train_model()
    accuracy = evaluate_model(model)
    # Log metrics
    mlflow.log_metric("accuracy", accuracy)
    # Log model
    mlflow.sklearn.log_model(model, "model")
Cross-Validation Pattern
from sklearn.model_selection import cross_validate
def evaluate_with_cv(model, X, y):
    """Summarize 5-fold cross-validation: mean train/test accuracy and spread.

    Precision/recall/F1 are also computed by cross_validate but only the
    accuracy scores are surfaced in the returned summary.
    """
    scores = cross_validate(
        model,
        X,
        y,
        cv=5,
        scoring=['accuracy', 'precision', 'recall', 'f1'],
        return_train_score=True,
    )
    train_acc = scores['train_accuracy']
    test_acc = scores['test_accuracy']
    summary = {
        'train_acc': train_acc.mean(),
        'test_acc': test_acc.mean(),
        'std': test_acc.std(),
    }
    return summary
Hyperparameter Optimization
from optuna import create_study
from optuna.pruners import MedianPruner
def objective(trial):
    """Optuna objective: train one model with trial-suggested hyperparameters
    and return its score (the study maximizes this value)."""
    # Suggest hyperparameters
    # log=True samples the learning rate uniformly in log space, which
    # suits a range spanning several orders of magnitude.
    lr = trial.suggest_float('learning_rate', 1e-5, 1e-1, log=True)
    max_depth = trial.suggest_int('max_depth', 3, 20)
    # Train model
    # NOTE(review): train_model/evaluate_model are assumed to be defined
    # elsewhere -- confirm before running.
    model = train_model(lr=lr, max_depth=max_depth)
    score = evaluate_model(model)
    return score

# Run optimization: 100 trials, keeping the best-scoring parameter set.
study = create_study(direction='maximize')
study.optimize(objective, n_trials=100)
print(f"Best accuracy: {study.best_value}")
print(f"Best params: {study.best_params}")
Model Validation
Train/Test Split Strategy
from sklearn.model_selection import train_test_split
# Time-based split (for time series)
def time_based_split(df, test_size=0.2):
    """Split *df* chronologically: first (1 - test_size) rows train, rest test."""
    cutoff = int(len(df) * (1 - test_size))
    train, test = df[:cutoff], df[cutoff:]
    return train, test
# Stratified split (for imbalanced classes)
# stratify=y keeps the class ratio identical in train and test sets,
# which matters when the positive class is rare.
X_train, X_test, y_train, y_test = train_test_split(
    X, y,
    test_size=0.2,     # hold out 20% for evaluation
    stratify=y,
    random_state=42    # reproducible split
)
Performance Metrics
from sklearn.metrics import (
accuracy_score, precision_score, recall_score,
f1_score, roc_auc_score, confusion_matrix
)
def evaluate_model(y_true, y_pred, y_pred_proba):
    """Compute standard classification metrics plus the confusion matrix.

    y_pred_proba is the positive-class probability used for ROC AUC.
    Returns (metrics_dict, confusion_matrix).
    """
    metrics = dict(
        accuracy=accuracy_score(y_true, y_pred),
        precision=precision_score(y_true, y_pred),
        recall=recall_score(y_true, y_pred),
        f1=f1_score(y_true, y_pred),
        auc=roc_auc_score(y_true, y_pred_proba),
    )
    # The confusion matrix is the clearer view when classes are imbalanced.
    cm = confusion_matrix(y_true, y_pred)
    return metrics, cm
Model Deployment
Containerization
# Minimal serving image: slim Python base plus the model artifact and API app.
FROM python:3.9-slim
WORKDIR /app
# Install dependencies first so this layer is cached across code-only changes.
COPY requirements.txt .
RUN pip install -r requirements.txt
# Ship the serialized model alongside the serving code.
COPY model.pkl .
COPY app.py .
EXPOSE 5000
CMD ["python", "app.py"]
REST API for Serving
from flask import Flask, request, jsonify
import joblib

# Model and scaler are loaded once at startup so each request pays only
# for inference, not deserialization.
app = Flask(__name__)
model = joblib.load('model.pkl')
scaler = joblib.load('scaler.pkl')

@app.route('/predict', methods=['POST'])
def predict():
    """Score one request.

    Expects JSON of the form {"features": [...]} in training feature order.
    Returns the predicted class, positive-class probability, and confidence.
    """
    try:
        data = request.json
        # Apply the training-time scaling before inference.
        features = scaler.transform([data['features']])
        prediction = model.predict(features)[0]
        probability = model.predict_proba(features)[0]
        return jsonify({
            'prediction': int(prediction),
            # NOTE(review): probability[1] assumes a binary classifier --
            # confirm against the trained model.
            'probability': float(probability[1]),
            'confidence': float(max(probability))
        })
    except Exception as e:
        # Any malformed payload or model error surfaces as HTTP 400.
        return jsonify({'error': str(e)}), 400

@app.route('/health', methods=['GET'])
def health():
    """Liveness probe for the orchestrator / load balancer."""
    return jsonify({'status': 'healthy'}), 200

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
Model Monitoring
Prediction Monitoring
import pandas as pd
from datetime import datetime
class ModelMonitor:
    """Logs live predictions and checks them against training-time baselines.

    baseline_metrics must provide 'feature_mean' and 'feature_std' captured
    on the training distribution. Drift detection assumes a single scalar
    numeric feature per prediction -- extend per-column for feature vectors.
    """

    def __init__(self, baseline_metrics):
        self.baseline = baseline_metrics
        self.predictions = []  # append-only event log

    def log_prediction(self, features, prediction, actual=None):
        """Record one prediction event; the true label may arrive later."""
        self.predictions.append({
            'timestamp': datetime.now(),
            'features': features,
            'prediction': prediction,
            'actual': actual,
        })

    def detect_drift(self, window=1000):
        """Detect input drift over the most recent *window* predictions.

        Returns 'DRIFT_DETECTED' when the recent feature mean deviates from
        the baseline mean by more than two baseline standard deviations,
        otherwise 'OK'. An empty log is treated as no drift.
        """
        # BUG FIX: the original built a DataFrame from an empty list and
        # then indexed the missing 'features' column (KeyError) when no
        # predictions had been logged yet.
        if not self.predictions:
            return 'OK'
        recent = pd.DataFrame(self.predictions[-window:])
        current_mean = recent['features'].mean()
        baseline_mean = self.baseline['feature_mean']
        if abs(current_mean - baseline_mean) > 2 * self.baseline['feature_std']:
            return 'DRIFT_DETECTED'
        return 'OK'

    def calculate_metrics(self, actual_labels):
        """Accuracy of the latest predictions against ground-truth labels.

        actual_labels is aligned from the end: its last k entries are
        matched with the last k logged predictions.
        """
        # BUG FIX: guard the empty log to avoid a zero-length mean.
        if not self.predictions:
            return {'accuracy': None}
        recent = [rec['prediction'] for rec in self.predictions[-1000:]]
        actuals = list(actual_labels[-len(recent):])
        hits = sum(1 for p, a in zip(recent, actuals) if p == a)
        return {'accuracy': hits / len(recent)}
Monitoring Dashboard
from prometheus_client import Counter, Histogram, Gauge
import time

# Prometheus metrics exported by the serving process.
prediction_counter = Counter(
    'model_predictions_total',
    'Total predictions',
    ['model', 'class']
)
prediction_latency = Histogram(
    'prediction_latency_seconds',
    'Prediction latency'
)
model_accuracy = Gauge(
    'model_accuracy',
    'Current model accuracy'
)

@app.route('/predict', methods=['POST'])
def predict():
    """Serve a prediction while recording count and latency metrics."""
    start = time.time()
    result = model.predict(features)
    # BUG FIX: the counter declares a label named 'class', which is a Python
    # keyword and cannot be passed as a keyword argument; the original
    # .labels(model=..., class_=...) raises ValueError because 'class_' is
    # not a declared label. Pass label values positionally in the declared
    # order (model, class) instead.
    prediction_counter.labels('churn', str(result)).inc()
    prediction_latency.observe(time.time() - start)
    return result
Retraining Pipeline
Automated Retraining
import airflow
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
default_args = {
    'owner': 'ml-team',
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    # BUG FIX: Airflow refuses to schedule a DAG without a start_date.
    'start_date': datetime(2024, 1, 1),
}

dag = DAG(
    'model_retraining',
    default_args=default_args,
    schedule_interval='0 2 * * 0',  # Weekly at 2 AM on Sunday
)

def fetch_data():
    """Pull the latest labeled training data from the warehouse."""
    pass

def train_model():
    """Fit a fresh model on the newly fetched data."""
    pass

def validate_model():
    """Compare the candidate model against the current production baseline."""
    pass

def deploy_model():
    """Promote the candidate to production when validation passes."""
    pass

fetch_task = PythonOperator(
    task_id='fetch_data',
    python_callable=fetch_data,
    dag=dag,
)

train_task = PythonOperator(
    task_id='train_model',
    python_callable=train_model,
    dag=dag,
)

# BUG FIX: the original chained the raw callables validate_model and
# deploy_model with >>; only operators (tasks) support dependency chaining,
# so each stage needs its own PythonOperator.
validate_task = PythonOperator(
    task_id='validate_model',
    python_callable=validate_model,
    dag=dag,
)

deploy_task = PythonOperator(
    task_id='deploy_model',
    python_callable=deploy_model,
    dag=dag,
)

fetch_task >> train_task >> validate_task >> deploy_task
Real-World Example: Churn Prediction
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
import mlflow
# Data loading
def load_data():
    """Read the customer CSV and drop rows containing any missing values."""
    raw = pd.read_csv('customer_data.csv')
    cleaned = raw.dropna()
    return cleaned
# Feature engineering
def create_features(df):
    """Select the numeric customer columns and z-score them.

    Returns (scaled_matrix, fitted_scaler); the scaler is returned so the
    identical transform can be applied at serving time.
    """
    numeric_cols = ['age', 'tenure', 'monthly_charges', 'total_charges']
    selected = df[numeric_cols].copy()
    # Normalize to zero mean / unit variance.
    scaler = StandardScaler()
    scaled = scaler.fit_transform(selected)
    return scaled, scaler
# Training
def train_churn_model():
    """Train a churn classifier and log the run to MLflow.

    Returns the fitted model and the scaler needed to transform
    serving-time inputs identically to training.
    """
    mlflow.set_experiment("churn_prediction")
    with mlflow.start_run():
        # Load and prepare data
        df = load_data()
        X, scaler = create_features(df)
        y = df['churn'].values
        # Train
        model = RandomForestClassifier(
            n_estimators=100,
            max_depth=10,
            random_state=42
        )
        model.fit(X, y)
        # Evaluate
        # NOTE(review): this scores the model on its own training data, so
        # the logged accuracy is optimistic -- hold out a test split for
        # honest numbers.
        accuracy = model.score(X, y)
        mlflow.log_metric("accuracy", accuracy)
        # Log
        mlflow.sklearn.log_model(model, "churn_model")
    return model, scaler

if __name__ == '__main__':
    model, scaler = train_churn_model()
Best Practices
✓ Version control: code, data, models
✓ Automated testing for data quality
✓ CI/CD pipelines for model deployment
✓ Monitor data drift and concept drift
✓ Maintain explainability
✓ Document model decisions
✓ Handle edge cases gracefully
✓ Plan for model failures
Glossary
- MLOps: Operations focused on ML systems
- Feature Store: Centralized feature management
- Concept Drift: Target distribution change over time
- Data Drift: Input distribution change over time
- Model Registry: Version control for models
Comments