⚡ Calmops

Data Pipeline Architecture: ETL, ELT, and Streaming Patterns

Introduction

Data pipelines are the circulatory system of modern organizations. Every click, transaction, and sensor reading generates data that must be collected, transformed, and delivered to analysts, ML models, and business applications. Building robust, scalable data pipelines is a fundamental skill for any data engineer or software engineer working with data.

This guide covers the complete spectrum of data pipeline architectures, from traditional batch ETL to modern streaming systems. You'll learn when to use each approach, how to handle common challenges like exactly-once processing, and practical implementation patterns using industry-standard tools.

Understanding Data Pipeline Fundamentals

What is a Data Pipeline?

A data pipeline is a series of data processing steps that move data from source to destination:

Source → Extract → Transform → Load → Destination

Each stage may involve validation, enrichment, aggregation, or format conversion. The key is understanding that pipelines must be reliable, scalable, and maintainable.
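Before reaching for heavyweight tooling, the stage pattern itself can be sketched with plain functions. This is a toy illustration only; the record shape, field names, and in-memory "warehouse" are made up for the example:

```python
def extract(source):
    """Pull raw records from the source system."""
    return list(source)

def transform(records):
    """Validate and reshape each record; drop ones that fail validation."""
    return [
        {"id": r["id"], "total_cents": int(round(r["total"] * 100))}
        for r in records
        if r.get("total") is not None  # validation: reject missing totals
    ]

def load(records, destination):
    """Write transformed records to the destination (here, a plain list)."""
    destination.extend(records)
    return len(records)

source = [{"id": 1, "total": 19.99}, {"id": 2, "total": None}]
warehouse = []
loaded = load(transform(extract(source)), warehouse)
# loaded == 1; the record with a missing total was dropped during transform
```

Real pipelines swap each function for a connector, a processing engine, and a warehouse writer, but the contract between stages stays the same.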

Batch vs Streaming Processing

| Aspect      | Batch Processing          | Stream Processing            |
|-------------|---------------------------|------------------------------|
| Data Window | Fixed time intervals      | Continuous, event-by-event   |
| Latency     | Minutes to hours          | Milliseconds to seconds      |
| Complexity  | Lower                     | Higher                       |
| Use Cases   | Analytics, reporting      | Real-time alerts, dashboards |
| Tools       | Airflow, dbt, Spark Batch | Kafka, Flink, Spark Streaming |
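The "Data Window" row is the crux of the distinction: a batch job processes one fixed interval at a time, while a stream processor assigns each event to a window as it arrives. A toy tumbling-window count in plain Python makes the mechanics concrete (the `(epoch_seconds, value)` event tuples are illustrative):

```python
from collections import defaultdict

def tumbling_window_counts(events, window_seconds):
    """Count events per non-overlapping (tumbling) window."""
    counts = defaultdict(int)
    for ts, _value in events:
        window_start = ts - (ts % window_seconds)  # floor to window boundary
        counts[window_start] += 1
    return dict(counts)

events = [(0, "a"), (5, "b"), (61, "c")]
tumbling_window_counts(events, 60)  # {0: 2, 60: 1}
```

Engines like Flink and Spark Streaming add the hard parts on top of this idea: out-of-order events, watermarks, and fault-tolerant state.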

ETL: Extract, Transform, Load

Traditional ETL Architecture

ETL is the classic approach: extract data from sources, transform it in a processing cluster, then load it into the destination:

# Python ETL with psycopg2 and pandas
import pandas as pd
import psycopg2

def extract_orders():
    """Extract from source database."""
    conn = psycopg2.connect(
        host='source-db.internal',
        database='ecommerce',
        user='etl_user',
        password='secret'
    )
    
    query = """
        SELECT order_id, customer_id, total, created_at
        FROM orders
        WHERE created_at >= %(last_run)s
    """
    
    df = pd.read_sql(query, conn, params={'last_run': get_last_run()})
    conn.close()
    return df

def transform_orders(df):
    """Transform and enrich data."""
    # Clean data
    df = df.dropna()
    df['order_date'] = pd.to_datetime(df['created_at']).dt.date
    
    # Enrich with customer data
    customers = get_customer_data(df['customer_id'].unique())
    df = df.merge(customers, on='customer_id', how='left')
    
    # Calculate derived metrics (derive the month from the raw timestamp;
    # order_date holds plain date objects, which have no .dt accessor)
    df['order_month'] = pd.to_datetime(df['created_at']).dt.to_period('M')
    df['profit_margin'] = (df['total'] - df['cost']) / df['total']
    
    return df

def load_orders(df):
    """Load into data warehouse."""
    conn = psycopg2.connect(
        host='warehouse.internal',
        database='analytics',
        user='etl_user',
        password='secret'
    )
    cursor = conn.cursor()
    
    # Upsert pattern: insert new orders, update totals for existing ones
    for _, row in df.iterrows():
        cursor.execute("""
            INSERT INTO fact_orders (order_id, customer_id, total, order_date)
            VALUES (%s, %s, %s, %s)
            ON CONFLICT (order_id) DO UPDATE SET
                total = EXCLUDED.total
        """, (row['order_id'], row['customer_id'], row['total'], row['order_date']))
    
    conn.commit()
    cursor.close()
    conn.close()

def run_etl():
    df = extract_orders()
    df = transform_orders(df)
    load_orders(df)
    update_last_run()

Modern ETL with Apache Airflow

Airflow orchestrates complex workflows with dependencies:

# dags/etl_pipeline.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-engineering',
    'depends_on_past': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'etl_orders_pipeline',
    default_args=default_args,
    description='Daily order ETL pipeline',
    schedule_interval='0 2 * * *',  # 2 AM daily
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:

    # Note: in practice these tasks exchange data via XCom or intermediate
    # storage (e.g., staging tables), not in-memory DataFrames.
    extract = PythonOperator(
        task_id='extract_orders',
        python_callable=extract_orders,
    )

    transform = PythonOperator(
        task_id='transform_orders',
        python_callable=transform_orders,
    )

    load = PythonOperator(
        task_id='load_orders',
        python_callable=load_orders,
    )

    analyze = PostgresOperator(
        task_id='update_aggregations',
        sql='sql/refresh_materialized_views.sql',
        postgres_conn_id='warehouse',
    )

    extract >> transform >> load >> analyze

ETL Best Practices

# 1. Idempotency: running the pipeline multiple times produces the same result
def extract_incremental(table, watermark_column, last_value=None):
    # Note: table and column names must come from trusted config, and in
    # production the watermark value should be a bound query parameter
    # rather than interpolated into the SQL string.
    if last_value is None:
        # Full load
        return f"SELECT * FROM {table}"
    # Incremental load
    return f"SELECT * FROM {table} WHERE {watermark_column} > '{last_value}'"

# 2. Schema validation on extract
def validate_schema(df, expected_schema):
    for col, dtype in expected_schema.items():
        if col not in df.columns:
            raise ValueError(f"Missing column: {col}")
        if df[col].dtype != dtype:
            df[col] = df[col].astype(dtype)
    return df

# 3. Error handling with dead letter queue
def process_with_dlq(df):
    valid_records = []
    errors = []
    
    for idx, row in df.iterrows():
        try:
            validated = validate_record(row)
            valid_records.append(validated)
        except ValidationError as e:
            errors.append({
                'index': idx,
                'error': str(e),
                'data': row.to_dict()
            })
    
    # Send errors to DLQ for investigation
    if errors:
        save_to_dlq(errors)
    
    return pd.DataFrame(valid_records)

ELT: Extract, Load, Transform

Why ELT?

ELT flips the traditional order: load raw data first, then transform in the data warehouse. This approach leverages the data warehouse's computational power and provides flexibility:

-- Raw data landing
CREATE TABLE raw_orders AS
SELECT * FROM external_source.orders;

-- Transform layer (can be re-run anytime)
CREATE MATERIALIZED VIEW mv_monthly_orders AS
SELECT 
    DATE_TRUNC('month', order_date) AS month,
    COUNT(*) AS order_count,
    SUM(total) AS revenue,
    COUNT(DISTINCT customer_id) AS unique_customers
FROM raw_orders
GROUP BY DATE_TRUNC('month', order_date);

ELT with dbt

dbt (data build tool) transforms data in your warehouse using SQL:

# dbt_project.yml
name: analytics_pipeline
version: '1.0.0'

models:
  analytics:
    +materialized: table
    
    staging:
      +schema: staging
      
    intermediate:
      +schema: intermediate
      
    marts:
      +schema: marts
-- models/staging/stg_orders.sql
{{ config(materialized='view') }}

SELECT
    order_id,
    customer_id,
    total,
    status,
    created_at::TIMESTAMP AS order_timestamp,
    ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY created_at) AS order_sequence
FROM {{ source('raw', 'orders') }}
WHERE created_at >= '2024-01-01'
-- models/intermediate/int_order_metrics.sql
{{ config(materialized='table') }}

WITH customer_orders AS (
    SELECT 
        customer_id,
        COUNT(*) AS total_orders,
        SUM(total) AS lifetime_value,
        AVG(total) AS avg_order_value,
        MIN(created_at) AS first_order_date,
        MAX(created_at) AS last_order_date
    FROM {{ ref('stg_orders') }}
    GROUP BY customer_id
)

SELECT 
    c.*,
    co.total_orders,
    co.lifetime_value,
    co.avg_order_value,
    co.first_order_date,
    co.last_order_date,
    DATE_DIFF('day', co.first_order_date, co.last_order_date) AS customer_tenure_days
FROM {{ ref('stg_customers') }} c
LEFT JOIN customer_orders co ON c.customer_id = co.customer_id
-- models/marts/fact_orders.sql
{{
    config(
        materialized='table',
        partition_by={
            "field": "order_date",
            "data_type": "date",
            "granularity": "day"
        }
    )
}}

SELECT
    o.order_id,
    o.customer_id,
    o.total,
    o.status,
    o.created_at AS order_timestamp,
    DATE(o.created_at) AS order_date,
    c.segment,
    c.region,
    p.product_category,
    ROW_NUMBER() OVER (PARTITION BY o.customer_id ORDER BY o.created_at) AS customer_order_number
FROM {{ ref('stg_orders') }} o
JOIN {{ ref('stg_customers') }} c ON o.customer_id = c.customer_id
LEFT JOIN {{ ref('stg_products') }} p ON o.product_id = p.product_id

Incremental ELT with dbt

-- models/marts/fact_orders_incremental.sql
{{ config(materialized='incremental', unique_key='order_id') }}

SELECT
    order_id,
    customer_id,
    total,
    status,
    created_at AS order_timestamp,
    DATE(created_at) AS order_date
FROM {{ source('raw', 'orders') }}
{% if is_incremental() %}
-- On incremental runs, only pull rows newer than what is already loaded;
-- unique_key ensures late updates to existing orders are merged, not duplicated
WHERE created_at > (SELECT MAX(order_timestamp) FROM {{ this }})
{% endif %}

Streaming Data Pipelines

Apache Kafka Fundamentals

Kafka is the backbone of modern streaming architectures:

# Producer: Publishing events
from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['kafka-1:9092', 'kafka-2:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8')
)

# Publish order events
for order in orders_batch:
    producer.send(
        'orders',
        key=str(order['customer_id']),
        value={
            'event_type': 'order_created',
            'order_id': order['id'],
            'customer_id': order['customer_id'],
            'total': order['total'],
            'items': order['items'],
            'timestamp': order['created_at'].isoformat()
        }
    )

producer.flush()
# Consumer: Processing events
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['kafka-1:9092', 'kafka-2:9092'],
    group_id='order-processor',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='earliest',
    enable_auto_commit=True  # convenient, but offsets can be committed
                             # before processing completes
)

for message in consumer:
    event = message.value
    
    if event['event_type'] == 'order_created':
        process_new_order(event)
        
    elif event['event_type'] == 'order_cancelled':
        process_cancellation(event)
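The consumer above auto-commits offsets, which can acknowledge a message before it has actually been processed. The usual stronger guarantee is at-least-once delivery paired with an idempotent handler: commit the offset only after processing succeeds, and detect redeliveries in the sink. The sketch below uses a stand-in message batch so the control flow is visible without a broker; with kafka-python the same shape uses `enable_auto_commit=False` and `consumer.commit()` at the marked point. The handler and the in-memory dedup set are illustrative — in production the dedup is a unique constraint in the sink database.

```python
seen_order_ids = set()  # illustrative; in production, a unique key in the sink

def handle(event):
    """Idempotent handler: a redelivered event is detected and skipped."""
    if event["order_id"] in seen_order_ids:
        return False  # duplicate delivery; effects already applied
    seen_order_ids.add(event["order_id"])
    return True  # first delivery; apply side effects

committed_offsets = []

def process_batch(messages):
    """Commit each offset only after its message is durably handled."""
    applied = 0
    for offset, event in messages:
        if handle(event):
            applied += 1  # write to warehouse, emit metrics, etc.
        committed_offsets.append(offset)  # kafka-python: consumer.commit()
    return applied

# A crash after handle() but before commit causes redelivery of offset 1,
# which the idempotent handler absorbs:
batch = [(0, {"order_id": "A"}), (1, {"order_id": "B"}), (1, {"order_id": "B"})]
applied = process_batch(batch)  # 2 — the replayed offset-1 message is skipped
```

True end-to-end exactly-once additionally needs transactional producers or an atomic offset-plus-result write, but idempotent sinks get most pipelines the behavior they actually need.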

Kafka Streams for Real-Time Processing

// Java: Kafka Streams word count
import java.util.Arrays;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Produced;

StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> textLines = builder.stream("text-lines");

KTable<String, Long> wordCounts = textLines
    .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
    .groupBy((key, word) -> word, Grouped.with(Serdes.String(), Serdes.String()))
    .count(Materialized.as("word-counts-store"));

wordCounts.toStream().to("word-counts-output",
    Produced.with(Serdes.String(), Serdes.Long()));

Stream Processing Patterns

# Pattern 1: Windowed Aggregation
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, sum, count, from_json, col
from pyspark.sql.types import StructType, StringType, DoubleType

spark = SparkSession.builder.getOrCreate()

orders = spark.readStream.format('kafka')\
    .option('kafka.bootstrap.servers', 'kafka:9092')\
    .option('subscribe', 'orders')\
    .load()

# Parse the JSON payload and keep Kafka's event timestamp for watermarking
order_schema = StructType()\
    .add('order_id', StringType())\
    .add('customer_id', StringType())\
    .add('total', DoubleType())

orders_with_watermark = orders\
    .select(from_json(col('value').cast('string'), order_schema).alias('o'),
            col('timestamp'))\
    .select('o.*', 'timestamp')\
    .withWatermark('timestamp', '10 minutes')

# Tumbling window (non-overlapping)
windowed_sales = orders_with_watermark\
    .groupBy(
        window('timestamp', '1 hour'),
        'customer_id'
    )\
    .agg(
        sum('total').alias('hourly_spend'),
        count('order_id').alias('order_count')
    )

# Pattern 2: Streaming Join
# Note: stream-stream joins need watermarks on both sides (and usually a
# time-range condition) so Spark can bound its join state; for a slowly
# changing dimension like products, joining the stream against a static
# batch DataFrame is often simpler.
orders = spark.readStream.format('kafka')\
    .option('kafka.bootstrap.servers', 'kafka:9092')\
    .option('subscribe', 'orders')\
    .load()

products = spark.readStream.format('kafka')\
    .option('kafka.bootstrap.servers', 'kafka:9092')\
    .option('subscribe', 'products')\
    .load()

enriched_orders = orders.join(products, orders.product_id == products.product_id)

# Pattern 3: Keyed State (Flink)
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor

class OrderAggregator(KeyedProcessFunction):
    """Keeps a running (order_count, order_total) per customer key."""

    def open(self, runtime_context: RuntimeContext):
        descriptor = ValueStateDescriptor(
            "order-aggregate", Types.TUPLE([Types.LONG(), Types.DOUBLE()]))
        self.aggregate = runtime_context.get_state(descriptor)

    def process_element(self, order, ctx):
        count, total = self.aggregate.value() or (0, 0.0)
        updated = (count + 1, total + order['total'])
        self.aggregate.update(updated)  # fault-tolerant, checkpointed state
        yield ctx.get_current_key(), updated

env = StreamExecutionEnvironment.get_execution_environment()

Data Quality and Monitoring

Data Contracts

# Define expected schema
from dataclasses import dataclass
from pyspark.sql import DataFrame

class DataContractError(Exception):
    """Raised when incoming data violates the agreed contract."""

@dataclass
class OrderContract:
    order_id: str
    customer_id: str
    total: float
    status: str
    created_at: str
    
    @staticmethod
    def validate(df: DataFrame) -> bool:
        required_columns = ['order_id', 'customer_id', 'total', 'status', 'created_at']
        
        for col in required_columns:
            if col not in df.columns:
                raise DataContractError(f"Missing required column: {col}")
        
        # Null checks
        null_counts = {col: df.filter(df[col].isNull()).count() for col in required_columns}
        for col, count in null_counts.items():
            if count > 0:
                raise DataContractError(f"Found {count} null values in {col}")
        
        # Type validation
        if not isinstance(df.select('total').first()[0], (int, float)):
            raise DataContractError("total must be numeric")
        
        # Value constraints
        invalid_statuses = df.filter(~df.status.isin(['pending', 'completed', 'cancelled'])).count()
        if invalid_statuses > 0:
            raise DataContractError(f"Found {invalid_statuses} invalid status values")
        
        return True

Pipeline Monitoring

# Airflow metrics and alerting
import os

from slack_sdk import WebClient

def slack_failure_callback(context):
    dag_id = context['dag'].dag_id
    task_id = context['task_instance'].task_id
    error = context['exception']
    
    client = WebClient(token=os.environ['SLACK_TOKEN'])
    client.chat_postMessage(
        channel='#data-alerts',
        text=f":x: Pipeline Failed: {dag_id}.{task_id}\nError: {str(error)}"
    )

# Data quality checks
import great_expectations as gx

def validate_orders_data():
    context = gx.get_context()
    validator = context.sources.pandas_default.read_csv("orders.csv")
    
    validator.expect_column_values_to_not_be_null("order_id")
    validator.expect_column_values_to_be_between("total", min_value=0)
    validator.expect_column_values_to_be_in_set(
        "status", ["pending", "completed", "cancelled"])
    
    results = validator.validate()
    
    if not results["success"]:
        send_alert(f"Data quality failed: {results['statistics']}")

Lambda and Kappa Architectures

Lambda Architecture

Combines batch and streaming layers:

┌─────────────────────────────────────────┐
│              Data Sources               │
└───────────────────┬─────────────────────┘
                    │
          ┌─────────┴─────────┐
          │                   │
          ▼                   ▼
  ┌───────────────┐   ┌───────────────┐
  │  Batch Layer  │   │  Speed Layer  │
  │   (HDFS/S3)   │   │  (Streaming)  │
  └───────┬───────┘   └───────┬───────┘
          │                   │
          └─────────┬─────────┘
                    │
                    ▼
          ┌───────────────────┐
          │   Serving Layer   │
          │   (Query/Merge)   │
          └───────────────────┘
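The serving layer's job is to answer queries by merging the batch view (complete but stale) with the speed view (fresh but covering only events since the last batch run). A toy merge keyed by customer makes the division of labor concrete; the view contents and `serve_total` helper are illustrative:

```python
def serve_total(batch_view, speed_view, customer_id):
    """Merge precomputed batch results with real-time increments."""
    batch = batch_view.get(customer_id, 0.0)     # recomputed nightly
    realtime = speed_view.get(customer_id, 0.0)  # since the last batch run
    return batch + realtime

batch_view = {"c1": 120.0}            # from the batch layer's full recompute
speed_view = {"c1": 15.5, "c2": 9.0}  # incremented per event by the stream job

serve_total(batch_view, speed_view, "c1")  # 135.5
serve_total(batch_view, speed_view, "c2")  # 9.0
```

Each batch run resets the speed view for the interval it covered; the operational cost of keeping these two code paths consistent is exactly what Kappa architecture eliminates.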

Kappa Architecture

Simplifies to single streaming path:

┌─────────────────────────────────────────┐
│              Data Sources               │
└───────────────────┬─────────────────────┘
                    │
                    ▼
          ┌───────────────────┐
          │    Kafka Stream   │
          │  (Immutable Log)  │
          └─────────┬─────────┘
                    │
      ┌─────────────┼─────────────┐
      ▼             ▼             ▼
┌───────────┐ ┌───────────┐ ┌───────────┐
│ Real-time │ │  Replay   │ │   Query   │
│  Service  │ │ (Rebuild) │ │   Layer   │
└───────────┘ └───────────┘ └───────────┘

Orchestration with Prefect

# prefect_pipeline.py
from prefect import flow, task
from prefect.transactions import transaction

@task
def extract():
    return fetch_from_source()

@task
def transform(data):
    return transform_data(data)

@task
def load(data):
    # Run the warehouse insert and the completion marker atomically:
    # if either fails, neither takes effect
    with transaction():
        insert_into_warehouse(data)
        mark_pipeline_complete()

@flow
def daily_pipeline(environment: str = "production"):
    data = extract()
    transformed = transform(data)
    load(transformed)

# Schedule
if __name__ == "__main__":
    daily_pipeline.serve(
        cron="0 2 * * *",
        parameters={"environment": "production"}
    )

Data Pipeline Testing

# Unit tests for transformations
import pytest
from pyspark.sql import SparkSession

@pytest.fixture
def spark():
    return SparkSession.builder.master("local[*]").getOrCreate()

def test_order_total_calculation(spark):
    # Arrange
    input_df = spark.createDataFrame([
        {"order_id": "1", "items": [{"price": 100}, {"price": 50}]},
        {"order_id": "2", "items": [{"price": 75}]}
    ])
    
    # Act
    result = calculate_order_totals(input_df)
    
    # Assert
    assert result.filter(result.order_id == "1").first()["total"] == 150
    assert result.filter(result.order_id == "2").first()["total"] == 75

def test_null_handling(spark):
    input_df = spark.createDataFrame([
        {"order_id": "1", "customer_id": None}
    ])
    
    with pytest.raises(ValidationError):
        validate_required_fields(input_df)

Conclusion

Building data pipelines requires understanding your use case: batch ETL for complex transformations, ELT for flexibility with modern data warehouses, and streaming for real-time requirements. Tools like Airflow for orchestration, Kafka for streaming, and dbt for transformations form the foundation of modern data infrastructure.

Start simple, instrument everything, and iterate. Data quality and monitoring aren't optional; they're what separate production-grade pipelines from fragile experiments.
