Introduction
MLOps (Machine Learning Operations) applies DevOps principles to machine learning, enabling automated, reproducible, and scalable ML workflows. Data engineers increasingly need to build and maintain these pipelines.
MLOps Maturity Levels
| Level | Description | Automation |
|---|---|---|
| 0 | Manual, script-based | None |
| 1 | ML pipeline automation | Training/deployment |
| 2 | CI/CD for ML | Testing, validation |
| 3 | Continuous training | Trigger-based retraining |
| 4 | Full MLOps | Monitoring + optimization |
MLflow for Experiment Tracking
Setup and Configuration
# mlflow_tracking.py
"""Track one training run with MLflow: parameters, metrics, model, artifacts."""
import mlflow
from mlflow.tracking import MlflowClient

# Point the client at the shared tracking server and group runs
# under a named experiment.
mlflow.set_tracking_uri("http://mlflow-server:5000")
mlflow.set_experiment("customer_churn_prediction")

# start_run as a context manager guarantees the run is closed
# (marked FINISHED/FAILED) even if training raises.
with mlflow.start_run(run_name="xgboost_v1"):
    # Log hyperparameters once, up front.
    mlflow.log_params({
        "n_estimators": 100,
        "max_depth": 5,
        "learning_rate": 0.1,
        "objective": "binary:logistic",
    })

    # train_model / X_train / y_train are assumed to be defined
    # elsewhere in the project.
    model = train_model(X_train, y_train)

    # Log evaluation metrics (the "f1" entry was garbled in the
    # original source and is restored here).
    mlflow.log_metrics({
        "accuracy": 0.92,
        "precision": 0.89,
        "recall": 0.85,
        "f1": 0.87,
        "auc": 0.94,
    })

    # Log the fitted model and register it in the Model Registry
    # in a single call.
    mlflow.sklearn.log_model(
        sk_model=model,
        artifact_path="model",
        registered_model_name="customer_churn",
    )

    # Attach supporting artifacts (plots, reports) to the run.
    mlflow.log_artifact("feature_importance.png")
Model Registry
# model_registry.py
"""Register a trained model and promote it through Staging to Production."""
import mlflow  # was missing: mlflow.register_model / mlflow.pyfunc need it
from mlflow.tracking import MlflowClient

client = MlflowClient()

# Register the model artifact logged under run abc123.
model_uri = "runs:/abc123/model"
model_name = "customer_churn"
model_version = mlflow.register_model(model_uri, model_name)

# Move version 1 into Staging for validation...
client.transition_model_version_stage(
    name=model_name,
    version=1,
    stage="Staging"
)

# ...then promote the same version to Production.
client.transition_model_version_stage(
    name=model_name,
    version=1,
    stage="Production"
)

# Load whichever version currently holds the Production stage
# (stage names in model URIs are case-insensitive).
production_model = mlflow.pyfunc.load_model(
    f"models:/{model_name}/production"
)
Kubeflow Pipelines
Pipeline Definition
# ml_pipeline.py
from kfp import dsl
from kfp.components import InputPath, OutputPath
def preprocess_data(
    input_path: InputPath(str),
    output_train: OutputPath(str),
    output_test: OutputPath(str),
    test_size: float = 0.2
):
    """Clean the raw CSV, one-hot encode it, and write train/test splits.

    Rows with missing values are dropped and categorical columns are
    one-hot encoded (first level dropped) before splitting.
    """
    import pandas as pd
    from sklearn.model_selection import train_test_split

    # Load, drop incomplete rows, and encode categoricals in one chain.
    frame = pd.get_dummies(pd.read_csv(input_path).dropna(), drop_first=True)

    features = frame.drop('target', axis=1)
    labels = frame['target']

    # Fixed seed keeps the split reproducible across pipeline runs.
    X_train, X_test, y_train, y_test = train_test_split(
        features, labels, test_size=test_size, random_state=42
    )

    # Persist each split with features and label recombined column-wise.
    pd.concat([X_train, y_train], axis=1).to_csv(output_train)
    pd.concat([X_test, y_test], axis=1).to_csv(output_test)
def train_model(
    train_data: InputPath(str),
    output_model: OutputPath(str),
    n_estimators: int = 100
):
    """Fit a random-forest classifier on the training split and pickle it."""
    import pandas as pd
    import pickle
    from sklearn.ensemble import RandomForestClassifier

    frame = pd.read_csv(train_data)
    labels = frame['target']
    features = frame.drop('target', axis=1)

    clf = RandomForestClassifier(n_estimators=n_estimators)
    clf.fit(features, labels)

    # Serialize the fitted estimator for the downstream evaluate step.
    with open(output_model, 'wb') as handle:
        pickle.dump(clf, handle)
def evaluate_model(
    model: InputPath(str),
    test_data: InputPath(str),
    metrics_path: OutputPath(str)
):
    """Score the pickled model on the test split and write metrics as JSON."""
    import pandas as pd
    import pickle
    import json
    from sklearn.metrics import accuracy_score, precision_score, recall_score

    # Deserialize the estimator produced by the train step
    # (bound to a fresh name rather than shadowing the path parameter).
    with open(model, 'rb') as handle:
        estimator = pickle.load(handle)

    frame = pd.read_csv(test_data)
    labels = frame['target']
    predictions = estimator.predict(frame.drop('target', axis=1))

    # float() keeps numpy scalars out of the JSON payload.
    scores = {
        "accuracy": accuracy_score(labels, predictions),
        "precision": float(precision_score(labels, predictions)),
        "recall": float(recall_score(labels, predictions))
    }
    with open(metrics_path, 'w') as handle:
        json.dump(scores, handle)
# Define pipeline
@dsl.pipeline(name="ML Training Pipeline")
def ml_pipeline(
    input_data: str = "gs://bucket/data/train.csv",
    n_estimators: int = 100,
    test_size: float = 0.2
):
    """Three-step training pipeline: preprocess -> train -> evaluate.

    Args:
        input_data: GCS path of the raw training CSV.
        n_estimators: Forest size forwarded to the train step.
        test_size: Fraction of rows held out for the test split.
    """
    # Preprocess: split the raw CSV into train/test files.
    # file_outputs is required so downstream steps can reference these
    # paths via .outputs[...]; the original omitted it, which makes
    # .outputs["output_train"] raise KeyError at pipeline-compile time.
    preprocess_op = dsl.ContainerOp(
        name="preprocess",
        image="preprocess:latest",
        arguments=[
            "--input_path", input_data,
            "--output_train", "/train.csv",
            "--output_test", "/test.csv",
            "--test_size", test_size
        ],
        file_outputs={
            "output_train": "/train.csv",
            "output_test": "/test.csv"
        }
    )
    # Train: fit the model on the preprocessed training split.
    train_op = dsl.ContainerOp(
        name="train",
        image="train:latest",
        arguments=[
            "--train_data", preprocess_op.outputs["output_train"],
            "--output_model", "/model.pkl",
            "--n_estimators", n_estimators
        ],
        file_outputs={"output_model": "/model.pkl"}
    ).after(preprocess_op)
    # Evaluate: score the trained model on the held-out split.
    eval_op = dsl.ContainerOp(
        name="evaluate",
        image="evaluate:latest",
        arguments=[
            "--model", train_op.outputs["output_model"],
            "--test_data", preprocess_op.outputs["output_test"],
            "--metrics_path", "/metrics.json"
        ]
    ).after(train_op)
Feature Store Integration
# feature_store.py
"""Register a Feast feature view and fetch features for training and serving."""
from datetime import timedelta

from feast import Feature, FeatureStore, FeatureView, FileSource
# Int64/Float32/Int32 were referenced but never imported in the original.
from feast.types import Float32, Int32, Int64

# Daily-refreshed customer aggregates, keyed by customer_id.
customer_features = FeatureView(
    name="customer_features",
    entities=["customer_id"],
    ttl=timedelta(days=1),  # rows older than a day are considered stale
    features=[
        Feature(name="total_purchases", dtype=Int64),
        Feature(name="avg_order_value", dtype=Float32),
        Feature(name="days_since_last_order", dtype=Int32),
        Feature(name="num_returns", dtype=Int32),
    ],
    online=True,  # materialize into the online store for serving
    batch_source=FileSource(
        path="gs://bucket/features/customer_features.parquet",
        timestamp_field="event_timestamp"
    )
)

fs = FeatureStore(repo_path=".")

# Offline (training) retrieval: point-in-time join against the label set.
# The original called get_feature_view(...).get_batch_data(), which is not
# a Feast API; historical retrieval goes through get_historical_features.
# `training_labels` must contain customer_id and event_timestamp columns.
training_df = fs.get_historical_features(
    entity_df=training_labels,
    features=[
        "customer_features:total_purchases",
        "customer_features:avg_order_value",
        "customer_features:days_since_last_order",
        "customer_features:num_returns",
    ],
).to_df()

# Online (inference) retrieval: low-latency lookup by entity key.
online_features = fs.get_online_features(
    features=[
        "customer_features:total_purchases",
        "customer_features:avg_order_value"
    ],
    entity_rows=[{"customer_id": "12345"}]
)
Model Monitoring
# model_monitoring.py
import numpy as np
from dataclasses import dataclass
from datetime import datetime
@dataclass
class ModelPerformance:
    """Snapshot of a model's evaluation results.

    Not consumed by the visible monitoring code — presumably a container
    for reporting/persistence elsewhere; confirm against callers.
    """
    # Aggregate classification metrics computed on a labeled set.
    accuracy: float
    precision: float
    recall: float
    # Raw model outputs and ground-truth labels the metrics came from.
    predictions: list
    actuals: list
class ModelMonitor:
    """Tracks per-model baseline metrics and flags metric or data drift."""

    def __init__(self, alert_threshold: float = 0.05):
        """
        Args:
            alert_threshold: Maximum absolute change in a metric before it
                counts as drift.
        """
        self.alert_threshold = alert_threshold
        # model_name -> {metric: value, ..., 'timestamp': datetime}
        self.baseline_metrics = {}

    def set_baseline(self, model_name: str, metrics: dict):
        """Record reference metrics (plus a timestamp) for later comparison."""
        self.baseline_metrics[model_name] = {
            **metrics,
            'timestamp': datetime.now()
        }

    def detect_drift(self, model_name: str, current_metrics: dict) -> dict:
        """Compare current metrics against the stored baseline.

        Returns:
            Dict with 'drift_detected', per-metric 'details', and a
            'timestamp'. If no baseline was set for model_name, returns
            {'drift_detected': False, 'reason': 'no_baseline'}.
        """
        if model_name not in self.baseline_metrics:
            return {'drift_detected': False, 'reason': 'no_baseline'}
        baseline = self.baseline_metrics[model_name]
        drift_detected = False
        drift_details = []
        # Only these well-known metrics are compared; extra keys (including
        # the baseline's 'timestamp') are ignored.
        for metric in ['accuracy', 'precision', 'recall']:
            if metric in current_metrics and metric in baseline:
                change = abs(current_metrics[metric] - baseline[metric])
                if change > self.alert_threshold:
                    drift_detected = True
                    drift_details.append({
                        'metric': metric,
                        'baseline': baseline[metric],
                        'current': current_metrics[metric],
                        'change': change
                    })
        return {
            'drift_detected': drift_detected,
            'details': drift_details,
            'timestamp': datetime.now()
        }

    def detect_data_drift(self, baseline_data: np.ndarray, current_data: np.ndarray) -> dict:
        """Two-sample Kolmogorov-Smirnov test for input-distribution drift.

        Returns:
            Dict with the KS statistic, its p-value, and a boolean flag at
            the conventional 0.05 significance level. (The original
            annotated the return type as float although a dict is returned.)
        """
        from scipy.stats import ks_2samp
        statistic, p_value = ks_2samp(baseline_data, current_data)
        return {
            'ks_statistic': float(statistic),
            'p_value': float(p_value),
            # bool() avoids leaking a numpy bool into (JSON-bound) output
            'significant_drift': bool(p_value < 0.05)
        }
Implementation Checklist
- Set up experiment tracking (MLflow)
- Create feature store
- Build training pipeline (Kubeflow/Airflow)
- Implement model registry
- Configure CI/CD for ML
- Set up monitoring and alerting
- Create retraining triggers
Summary
MLOps bridges data engineering and machine learning:
- Experiment Tracking: MLflow provides comprehensive tracking
- Pipeline Automation: Kubeflow enables scalable pipelines
- Feature Store: Centralizes feature management
- Monitoring: Detects model and data drift
Data engineers should build these systems to enable self-service ML while maintaining reliability and governance.
Comments