Introduction
MLOps applies DevOps principles to machine learning. It addresses challenges unique to ML: model training, versioning, and monitoring in production. This guide covers building reliable ML systems.
ML Pipeline
Stages
Data → Training → Validation → Deployment → Monitoring
 ↓        ↓           ↓            ↓             ↓
ETL    Compute     Metrics     A/B Test   Drift Detection
Components
- Data pipeline: ETL, feature engineering
- Training pipeline: Model training
- Inference: Predictions in production
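The three components can be sketched as plain functions wired into a linear flow. All names below are illustrative, not from any framework; a real pipeline would hand these stages to an orchestrator:

```python
def etl(raw_rows):
    """Data pipeline: drop incomplete rows, engineer a toy feature."""
    cleaned = [r for r in raw_rows if r.get("amount") is not None]
    for r in cleaned:
        r["amount_sq"] = r["amount"] ** 2  # toy feature engineering
    return cleaned

def train(rows):
    """Training pipeline: fit a trivial 'model' (the mean amount)."""
    return sum(r["amount"] for r in rows) / len(rows)

def infer(model, row):
    """Inference: predict whether a row is above the learned mean."""
    return row["amount"] > model

raw = [{"amount": 10}, {"amount": 30}, {"amount": None}]
model = train(etl(raw))              # mean of the two valid rows: 20.0
print(infer(model, {"amount": 25}))  # True
```

Each stage takes the previous stage's output as input, which is exactly the contract an orchestrator like Airflow or Kubeflow enforces between tasks.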
Model Development
Experiment Tracking
import mlflow
from sklearn.metrics import accuracy_score, f1_score

mlflow.set_experiment("customer_churn")
with mlflow.start_run():
    mlflow.log_param("model_type", "random_forest")
    mlflow.log_param("n_estimators", 100)
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    mlflow.log_metric("accuracy", accuracy_score(y_test, y_pred))
    mlflow.log_metric("f1", f1_score(y_test, y_pred))
    mlflow.sklearn.log_model(model, "model")
Model Versioning
- Track experiments
- Version datasets
- Version code
- Version models
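Dataset versioning in particular needs no heavy tooling to start. A minimal sketch using content hashing (the manifest format here is made up for illustration):

```python
import hashlib
import json

def dataset_version(path, chunk_size=8192):
    """Version a dataset file by the SHA-256 of its contents."""
    h = hashlib.sha256()
    with open(path, "rb") as f:
        while chunk := f.read(chunk_size):
            h.update(chunk)
    return h.hexdigest()[:12]  # short, git-style version id

def write_manifest(path, manifest_path="dataset.manifest.json"):
    """Record the dataset's content version so it can be committed with the code."""
    manifest = {"path": path, "version": dataset_version(path)}
    with open(manifest_path, "w") as f:
        json.dump(manifest, f)
    return manifest
```

Tools like DVC apply the same idea at scale: hashes live in git while the data itself is pushed to object storage.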
Tools
- MLflow: Open source
- Weights & Biases: Visualization
- Neptune: Experiment tracking
- SageMaker: Managed tracking on AWS
Model Serving
Options
| Method | Use Case | Pros | Cons |
|---|---|---|---|
| REST API | General | Flexible | Latency |
| Batch | Offline | Simple | Delayed |
| Streaming | Real-time | Fast | Complex |
| Edge | Mobile | Offline | Limited |
Simple REST API
from flask import Flask, request, jsonify
import joblib

app = Flask(__name__)
model = joblib.load('model.pkl')

@app.route('/predict', methods=['POST'])
def predict():
    data = request.json
    features = [data['feat1'], data['feat2']]
    prediction = model.predict([features])[0]
    probability = model.predict_proba([features])[0]
    return jsonify({
        'prediction': int(prediction),
        'confidence': float(probability[prediction])
    })

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
Model Serialization
# Save model -- three common options
import joblib
import pickle
import mlflow.sklearn

joblib.dump(model, 'model.pkl')        # joblib: efficient for numpy-heavy models

with open('model.pkl', 'wb') as f:     # stdlib pickle
    pickle.dump(model, f)

mlflow.sklearn.log_model(model, 'model')  # MLflow: logs model with run metadata
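Whichever serializer you choose, a round-trip check before shipping the artifact catches surprises early. A minimal sketch with stdlib pickle (the `ThresholdModel` class is an illustrative stand-in, not sklearn):

```python
import pickle

class ThresholdModel:
    """Stand-in for a trained model (illustrative only)."""
    def __init__(self, threshold):
        self.threshold = threshold

    def predict(self, x):
        return int(x > self.threshold)

model = ThresholdModel(0.5)
blob = pickle.dumps(model)      # serialize to bytes
restored = pickle.loads(blob)   # deserialize

# The restored model must behave identically to the original
assert restored.predict(0.9) == model.predict(0.9) == 1
assert restored.predict(0.1) == model.predict(0.1) == 0
```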
Feature Store
Purpose
- Centralize features
- Ensure consistency
- Enable reuse
- Handle offline/online
Implementation
# Exact imports vary by Feast version; Feature/features shown here
# were replaced by Field/schema in newer releases
from datetime import timedelta
from feast import Feature, FeatureView, FileSource
from feast.types import Float32

customer_features = FeatureView(
    name="customer_features",
    entities=["customer_id"],
    ttl=timedelta(days=1),
    features=[
        Feature(name="total_purchases", dtype=Float32),
        Feature(name="avg_order_value", dtype=Float32),
    ],
    online=True,
    batch_source=FileSource(
        path="s3://bucket/features.parquet",
        timestamp_field="timestamp",
    ),
)
Model Monitoring
What to Monitor
- Model accuracy
- Prediction distribution
- Feature drift
- Data quality
Drift Detection
import numpy as np
from scipy import stats

def detect_drift(baseline, current):
    # Two-sample Kolmogorov-Smirnov test for distribution shift
    ks_statistic, p_value = stats.ks_2samp(baseline, current)
    if p_value < 0.05:
        return {"drift": True, "p_value": p_value}
    return {"drift": False, "p_value": p_value}
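On synthetic data the KS test behaves as you would hope: a clearly shifted distribution is flagged, while an identical one usually passes (with a 5% false-positive rate at α = 0.05). A self-contained check, seeded for reproducibility:

```python
import numpy as np
from scipy import stats

def detect_drift(baseline, current):
    # Two-sample KS test, as in the section above
    _, p_value = stats.ks_2samp(baseline, current)
    return {"drift": bool(p_value < 0.05), "p_value": float(p_value)}

rng = np.random.default_rng(42)
baseline = rng.normal(loc=0.0, scale=1.0, size=1000)
same = rng.normal(loc=0.0, scale=1.0, size=1000)
shifted = rng.normal(loc=1.0, scale=1.0, size=1000)

print(detect_drift(baseline, shifted)["drift"])  # True: mean shifted by one sigma
print(detect_drift(baseline, same)["drift"])     # usually False (5% false-positive rate)
```

In production the baseline would be the training-set feature distribution and `current` a sliding window of recent inference inputs.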
Tools
- Prometheus + Grafana: Metrics
- Evidently AI: Drift detection
- Fiddler: Model monitoring
- Seldon: ML deployment
CI/CD for ML
Pipeline
# .github/workflows/ml-pipeline.yml
name: ML Pipeline
on: [push]
jobs:
  train:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - name: Train model
        run: python train.py
      - name: Evaluate
        run: python evaluate.py
      - name: Register model
        # register_model.py calls mlflow.register_model(...) in Python;
        # a bare mlflow call cannot run directly as a shell step
        run: python register_model.py
  deploy:
    needs: train
    runs-on: ubuntu-latest
    steps:
      - name: Deploy to staging
        run: kubectl apply -f staging/
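The Evaluate step is the natural place for a quality gate: compare the candidate's metric against the current production baseline and fail the job with a nonzero exit if it regresses. A minimal sketch of what an `evaluate.py` might contain (metric names, values, and the tolerance are illustrative):

```python
import sys

def evaluation_gate(candidate_metrics, baseline_metrics, tolerance=0.01):
    """Return True if the candidate is no worse than baseline minus tolerance."""
    return candidate_metrics["f1"] >= baseline_metrics["f1"] - tolerance

if __name__ == "__main__":
    # In CI these would be loaded from the tracking server or build artifacts
    candidate = {"f1": 0.84}
    baseline = {"f1": 0.82}
    if not evaluation_gate(candidate, baseline):
        sys.exit(1)  # nonzero exit fails the GitHub Actions job
    print("gate passed")
```

Because the deploy job declares `needs: train`, a failed gate blocks deployment automatically.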
Infrastructure
Containerization
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY model.pkl .
COPY inference.py .
EXPOSE 8000
CMD ["python", "inference.py"]
Orchestration
- Kubernetes: Most common
- Airflow: Data pipelines
- Kubeflow: ML on Kubernetes
- SageMaker Pipelines: Managed ML workflows on AWS
Best Practices
Data Quality
- Validate incoming data
- Handle missing values
- Monitor data drift
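Validating incoming data can start as a plain schema check before any library is involved. A minimal sketch (field names and ranges are illustrative):

```python
def validate_row(row, schema):
    """Return a list of violations for one record; an empty list means valid."""
    errors = []
    for field, (ftype, lo, hi) in schema.items():
        value = row.get(field)
        if value is None:
            errors.append(f"{field}: missing")
        elif not isinstance(value, ftype):
            errors.append(f"{field}: expected {ftype.__name__}")
        elif not (lo <= value <= hi):
            errors.append(f"{field}: {value} outside [{lo}, {hi}]")
    return errors

SCHEMA = {
    "age": (int, 0, 120),
    "income": (float, 0.0, 1e7),
}

print(validate_row({"age": 35, "income": 52000.0}, SCHEMA))  # []
print(validate_row({"age": 200}, SCHEMA))  # out-of-range age, missing income
```

Libraries like Great Expectations and pandera formalize the same idea at dataset scale.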
Model Quality
- Track metrics
- A/B testing
- Rollback capability
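A/B testing and rollback both reduce to controlling which model version serves a given request. Deterministic hash bucketing keeps each user pinned to one variant across requests; a sketch (split ratio and variant names are illustrative):

```python
import hashlib

def assign_variant(user_id, treatment_share=0.1):
    """Deterministically bucket a user: the same id gets the same variant every call."""
    digest = hashlib.md5(user_id.encode()).hexdigest()
    bucket = int(digest, 16) % 100  # stable bucket in [0, 100)
    return "candidate" if bucket < treatment_share * 100 else "production"

# Stable assignment: the same user always sees the same model
assert assign_variant("user-42") == assign_variant("user-42")

# Rollback is just dialing the treatment share to zero
assert assign_variant("user-42", treatment_share=0.0) == "production"
```

Because assignment is a pure function of the user id, no session state is needed and results remain comparable across the experiment.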
Operations
- Logging
- Alerting
- Documentation
Conclusion
MLOps brings reliability to ML systems. Start with experiment tracking, add model serving, then monitoring. Automate everything as you scale.