Skip to main content
โšก Calmops

MLOps for Data Engineers: Machine Learning Pipeline Automation

Introduction

MLOps (Machine Learning Operations) applies DevOps principles to machine learning, enabling automated, reproducible, and scalable ML workflows. Data engineers increasingly need to build and maintain these pipelines.

MLOps Maturity Levels

Level Description Automation
0 Manual, script-based None
1 ML pipeline automation Training/deployment
2 CI/CD for ML Testing, validation
3 Continuous training Trigger-based retraining
4 Full MLOps Monitoring + optimization

MLflow for Experiment Tracking

Setup and Configuration

# mlflow_tracking.py
import mlflow
from mlflow.tracking import MlflowClient

# Configure MLflow
mlflow.set_tracking_uri("http://mlflow-server:5000")
mlflow.set_experiment("customer_churn_prediction")

# Experiment tracking
with mlflow.start_run(run_name="xgboost_v1"):
    # Log parameters
    mlflow.log_params({
        "n_estimators": 100,
        "max_depth": 5,
        "learning_rate": 0.1,
        "objective": "binary:logistic"
    })
    
    # Train model
    model = train_model(X_train, y_train)
    
    # Log metrics
    mlflow.log_metrics({
        "accuracy": 0.92,
        "precision": 0.89,
        "recall": 0.85,
        " 0.87f1":,
        "auc": 0.94
    })
    
    # Log model
    mlflow.sklearn.log_model(
        sk_model=model,
        artifact_path="model",
        registered_model_name="customer_churn"
    )
    
    # Log artifacts
    mlflow.log_artifact("feature_importance.png")

Model Registry

# model_registry.py
from mlflow.tracking import MlflowClient

client = MlflowClient()

# Register model from run
model_uri = "runs:/abc123/model"
model_name = "customer_churn"

model_version = mlflow.register_model(model_uri, model_name)

# Transition model through stages
client.transition_model_version_stage(
    name=model_name,
    version=1,
    stage="Staging"
)

# Deploy to production
client.transition_model_version_stage(
    name=model_name,
    version=1,
    stage="Production"
)

# Get latest production model
production_model = mlflow.pyfunc.load_model(
    f"models:/{model_name}/production"
)

Kubeflow Pipelines

Pipeline Definition

# ml_pipeline.py
from kfp import dsl
from kfp.components import InputPath, OutputPath

def preprocess_data(
    input_path: InputPath(str),
    output_train: OutputPath(str),
    output_test: OutputPath(str),
    test_size: float = 0.2
):
    """Preprocess and split data."""
    import pandas as pd
    from sklearn.model_selection import train_test_split
    
    df = pd.read_csv(input_path)
    
    # Preprocessing
    df = df.dropna()
    df = pd.get_dummies(df, drop_first=True)
    
    X = df.drop('target', axis=1)
    y = df['target']
    
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=42
    )
    
    # Save
    pd.concat([X_train, y_train], axis=1).to_csv(output_train)
    pd.concat([X_test, y_test], axis=1).to_csv(output_test)


def train_model(
    train_data: InputPath(str),
    output_model: OutputPath(str),
    n_estimators: int = 100
):
    """Train model."""
    import pandas as pd
    import pickle
    from sklearn.ensemble import RandomForestClassifier
    
    train_df = pd.read_csv(train_data)
    X = train_df.drop('target', axis=1)
    y = train_df['target']
    
    model = RandomForestClassifier(n_estimators=n_estimators)
    model.fit(X, y)
    
    with open(output_model, 'wb') as f:
        pickle.dump(model, f)


def evaluate_model(
    model: InputPath(str),
    test_data: InputPath(str),
    metrics_path: OutputPath(str)
):
    """Evaluate model."""
    import pandas as pd
    import pickle
    import json
    from sklearn.metrics import accuracy_score, precision_score, recall_score
    
    # Load
    with open(model, 'rb') as f:
        model = pickle.load(f)
    
    test_df = pd.read_csv(test_data)
    X = test_df.drop('target', axis=1)
    y = test_df['target']
    
    # Predict
    y_pred = model.predict(X)
    
    # Metrics
    metrics = {
        "accuracy": accuracy_score(y, y_pred),
        "precision": float(precision_score(y, y_pred)),
        "recall": float(recall_score(y, y_pred))
    }
    
    with open(metrics_path, 'w') as f:
        json.dump(metrics, f)


# Define pipeline
@dsl.pipeline(name="ML Training Pipeline")
def ml_pipeline(
    input_data: str = "gs://bucket/data/train.csv",
    n_estimators: int = 100,
    test_size: float = 0.2
):
    # Preprocess
    preprocess_op = dsl.ContainerOp(
        name="preprocess",
        image="preprocess:latest",
        arguments=[
            "--input_path", input_data,
            "--output_train", "/train.csv",
            "--output_test", "/test.csv",
            "--test_size", test_size
        ]
    )
    
    # Train
    train_op = dsl.ContainerOp(
        name="train",
        image="train:latest",
        arguments=[
            "--train_data", preprocess_op.outputs["output_train"],
            "--output_model", "/model.pkl",
            "--n_estimators", n_estimators
        ]
    ).after(preprocess_op)
    
    # Evaluate
    eval_op = dsl.ContainerOp(
        name="evaluate",
        image="evaluate:latest",
        arguments=[
            "--model", train_op.outputs["output_model"],
            "--test_data", preprocess_op.outputs["output_test"],
            "--metrics_path", "/metrics.json"
        ]
    ).after(train_op)

Feature Store Integration

# feature_store.py
from feast import Feature, FeatureView, FileSource
from datetime import timedelta

# Define feature definitions
customer_features = FeatureView(
    name="customer_features",
    entities=["customer_id"],
    ttl=timedelta(days=1),
    features=[
        Feature(name="total_purchases", dtype=Int64),
        Feature(name="avg_order_value", dtype=Float32),
        Feature(name="days_since_last_order", dtype=Int32),
        Feature(name="num_returns", dtype=Int32),
    ],
    online=True,
    batch_source=FileSource(
        path="gs://bucket/features/customer_features.parquet",
        timestamp_field="event_timestamp"
    )
)

# Retrieve features for training
from feast import FeatureStore

fs = FeatureStore(repo_path=".")

training_df = fs.get_feature_view(
    name="customer_features"
).get_batch_data(
    entity_df=training_labels
)

# Get online features for inference
online_features = fs.get_online_features(
    features=[
        "customer_features:total_purchases",
        "customer_features:avg_order_value"
    ],
    entity_rows=[{"customer_id": "12345"}]
)

Model Monitoring

# model_monitoring.py
import numpy as np
from dataclasses import dataclass
from datetime import datetime

@dataclass
class ModelPerformance:
    accuracy: float
    precision: float
    recall: float
    predictions: list
    actuals: list

class ModelMonitor:
    def __init__(self, alert_threshold: float = 0.05):
        self.alert_threshold = alert_threshold
        self.baseline_metrics = {}
    
    def set_baseline(self, model_name: str, metrics: dict):
        """Set baseline metrics for comparison."""
        self.baseline_metrics[model_name] = {
            **metrics,
            'timestamp': datetime.now()
        }
    
    def detect_drift(self, model_name: str, current_metrics: dict) -> dict:
        """Detect model drift."""
        
        if model_name not in self.baseline_metrics:
            return {'drift_detected': False, 'reason': 'no_baseline'}
        
        baseline = self.baseline_metrics[model_name]
        
        drift_detected = False
        drift_details = []
        
        for metric in ['accuracy', 'precision', 'recall']:
            if metric in current_metrics and metric in baseline:
                change = abs(current_metrics[metric] - baseline[metric])
                
                if change > self.alert_threshold:
                    drift_detected = True
                    drift_details.append({
                        'metric': metric,
                        'baseline': baseline[metric],
                        'current': current_metrics[metric],
                        'change': change
                    })
        
        return {
            'drift_detected': drift_detected,
            'details': drift_details,
            'timestamp': datetime.now()
        }
    
    def detect_data_drift(self, baseline_data: np.ndarray, current_data: np.ndarray) -> float:
        """Detect distribution drift using KS test."""
        from scipy.stats import ks_2samp
        
        statistic, p_value = ks_2samp(baseline_data, current_data)
        
        return {
            'ks_statistic': float(statistic),
            'p_value': float(p_value),
            'significant_drift': p_value < 0.05
        }

Implementation Checklist

  • Set up experiment tracking (MLflow)
  • Create feature store
  • Build training pipeline (Kubeflow/Airflow)
  • Implement model registry
  • Configure CI/CD for ML
  • Set up monitoring and alerting
  • Create retraining triggers

Summary

MLOps bridges data engineering and machine learning:

  1. Experiment Tracking: MLflow provides comprehensive tracking
  2. Pipeline Automation: Kubeflow enables scalable pipelines
  3. Feature Store: Centralizes feature management
  4. Monitoring: Detects model and data drift

Data engineers should build these systems to enable self-service ML while maintaining reliability and governance.


External Resources

Comments