Introduction
Data pipelines are the circulatory system of modern organizations. Every click, transaction, and sensor reading generates data that must be collected, transformed, and delivered to analysts, ML models, and business applications. Building robust, scalable data pipelines is a fundamental skill for any data engineer or software engineer working with data.
This guide covers the complete spectrum of data pipeline architectures, from traditional batch ETL to modern streaming systems. You’ll learn when to use each approach, how to handle common challenges like exactly-once processing, and practical implementation patterns using industry-standard tools.
Understanding Data Pipeline Fundamentals
What is a Data Pipeline?
A data pipeline is a series of data processing steps that move data from source to destination:
Source → Extract → Transform → Load → Destination
Each stage may involve validation, enrichment, aggregation, or format conversion. The key is understanding that pipelines must be reliable, scalable, and maintainable.
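The stages above can be sketched as small composable functions. This is a toy illustration of the shape of a pipeline, not a real connector; the records and stage bodies are made up:

```python
def extract():
    # Pull raw records from a source (hard-coded here for illustration)
    return [{"id": 1, "amount": "10.5"}, {"id": 2, "amount": "3.0"}]

def transform(records):
    # Validate and convert types before loading
    return [{**r, "amount": float(r["amount"])} for r in records]

def load(records, destination):
    # Append to the destination (a list standing in for a warehouse table)
    destination.extend(records)

warehouse = []
load(transform(extract()), warehouse)
```

Every real pipeline, batch or streaming, is some elaboration of this chain with reliability, scaling, and observability layered on top.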
Batch vs Streaming Processing
| Aspect | Batch Processing | Stream Processing |
|---|---|---|
| Data Window | Fixed time intervals | Continuous, event-by-event |
| Latency | Minutes to hours | Milliseconds to seconds |
| Complexity | Lower | Higher |
| Use Cases | Analytics, reporting | Real-time alerts, dashboards |
| Tools | Airflow, dbt, Spark Batch | Kafka, Flink, Spark Streaming |
ETL: Extract, Transform, Load
Traditional ETL Architecture
ETL is the classic approach: extract data from sources, transform it in a processing cluster, then load it into the destination:
# Python ETL with psycopg2 and pandas
import pandas as pd
import psycopg2

def extract_orders():
    """Extract from source database."""
    conn = psycopg2.connect(
        host='source-db.internal',
        database='ecommerce',
        user='etl_user',
        password='secret'  # in production, read credentials from a secrets store
    )
    query = """
        SELECT order_id, customer_id, total, created_at
        FROM orders
        WHERE created_at >= %(last_run)s
    """
    df = pd.read_sql(query, conn, params={'last_run': get_last_run()})
    conn.close()
    return df

def transform_orders(df):
    """Transform and enrich data."""
    # Clean data
    df = df.dropna()
    df['created_at'] = pd.to_datetime(df['created_at'])
    df['order_date'] = df['created_at'].dt.date

    # Enrich with customer data (the customer lookup supplies per-order cost)
    customers = get_customer_data(df['customer_id'].unique())
    df = df.merge(customers, on='customer_id', how='left')

    # Calculate derived metrics (use the datetime column: .dt only works on datetimes)
    df['order_month'] = df['created_at'].dt.to_period('M')
    df['profit_margin'] = (df['total'] - df['cost']) / df['total']
    return df

def load_orders(df):
    """Load into data warehouse."""
    conn = psycopg2.connect(
        host='warehouse.internal',
        database='analytics',
        user='etl_user',
        password='secret'
    )
    cursor = conn.cursor()

    # Upsert pattern
    for _, row in df.iterrows():
        cursor.execute("""
            INSERT INTO fact_orders (order_id, customer_id, total, order_date)
            VALUES (%s, %s, %s, %s)
            ON CONFLICT (order_id) DO UPDATE SET
                total = EXCLUDED.total
        """, (row['order_id'], row['customer_id'], row['total'], row['order_date']))
    conn.commit()
    conn.close()

def run_etl():
    df = extract_orders()
    df = transform_orders(df)
    load_orders(df)
    update_last_run()
Modern ETL with Apache Airflow
Airflow orchestrates complex workflows with dependencies:
# dags/etl_pipeline.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-engineering',
    'depends_on_past': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'etl_orders_pipeline',
    default_args=default_args,
    description='Daily order ETL pipeline',
    schedule_interval='0 2 * * *',  # 2 AM daily
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    extract = PythonOperator(
        task_id='extract_orders',
        python_callable=extract_orders,
    )
    transform = PythonOperator(
        task_id='transform_orders',
        python_callable=transform_orders,
    )
    load = PythonOperator(
        task_id='load_orders',
        python_callable=load_orders,
    )
    analyze = PostgresOperator(
        task_id='update_aggregations',
        sql='sql/refresh_materialized_views.sql',
        postgres_conn_id='warehouse',
    )

    # Note: tasks exchange data via XCom or intermediate storage, not return values
    extract >> transform >> load >> analyze
ETL Best Practices
# 1. Idempotency: running multiple times produces the same result
def extract_incremental(table, watermark_column, last_value=None):
    if last_value is None:
        # Full load
        return f"SELECT * FROM {table}"
    # Incremental load (table/column names must come from a trusted allowlist;
    # bind last_value as a query parameter in real code to avoid SQL injection)
    return f"SELECT * FROM {table} WHERE {watermark_column} > '{last_value}'"

# 2. Schema validation on extract
def validate_schema(df, expected_schema):
    for col, dtype in expected_schema.items():
        if col not in df.columns:
            raise ValueError(f"Missing column: {col}")
        if df[col].dtype != dtype:
            df[col] = df[col].astype(dtype)
    return df

# 3. Error handling with a dead letter queue
def process_with_dlq(df):
    valid_records = []
    errors = []
    for idx, row in df.iterrows():
        try:
            validated = validate_record(row)
            valid_records.append(validated)
        except ValidationError as e:
            errors.append({
                'index': idx,
                'error': str(e),
                'data': row.to_dict()
            })
    # Send errors to the DLQ for investigation
    if errors:
        save_to_dlq(errors)
    return pd.DataFrame(valid_records)
ELT: Extract, Load, Transform
Why ELT?
ELT flips the traditional order: load raw data first, then transform in the data warehouse. This approach leverages the data warehouse’s computational power and provides flexibility:
-- Raw data landing
CREATE TABLE raw_orders AS
SELECT * FROM external_source.orders;

-- Transform layer (can be re-run anytime)
CREATE MATERIALIZED VIEW mv_monthly_orders AS
SELECT
    DATE_TRUNC('month', order_date) AS month,
    COUNT(*) AS order_count,
    SUM(total) AS revenue,
    COUNT(DISTINCT customer_id) AS unique_customers
FROM raw_orders
GROUP BY DATE_TRUNC('month', order_date);
ELT with dbt
dbt (data build tool) transforms data in your warehouse using SQL:
# dbt_project.yml
name: analytics_pipeline
version: '1.0.0'

models:
  analytics_pipeline:   # must match the project name above
    +materialized: table
    staging:
      +schema: staging
    intermediate:
      +schema: intermediate
    marts:
      +schema: marts
-- models/staging/stg_orders.sql
{{ config(materialized='view') }}

SELECT
    order_id,
    customer_id,
    total,
    status,
    created_at::TIMESTAMP AS order_timestamp,
    ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY created_at) AS order_sequence
FROM {{ source('raw', 'orders') }}
WHERE created_at >= '2024-01-01'
-- models/intermediate/int_order_metrics.sql
{{ config(materialized='table') }}

WITH customer_orders AS (
    SELECT
        customer_id,
        COUNT(*) AS total_orders,
        SUM(total) AS lifetime_value,
        AVG(total) AS avg_order_value,
        MIN(created_at) AS first_order_date,
        MAX(created_at) AS last_order_date
    FROM {{ ref('stg_orders') }}
    GROUP BY customer_id
)

SELECT
    c.*,
    co.total_orders,
    co.lifetime_value,
    co.avg_order_value,
    co.first_order_date,
    co.last_order_date,
    DATE_DIFF('day', co.first_order_date, co.last_order_date) AS customer_tenure_days
FROM {{ ref('stg_customers') }} c
LEFT JOIN customer_orders co ON c.customer_id = co.customer_id
-- models/marts/fact_orders.sql
-- A model takes a single config call; materialization and partitioning go together
{{
    config(
        materialized='table',
        partition_by={
            "field": "order_date",
            "data_type": "date",
            "granularity": "day"
        }
    )
}}

SELECT
    o.order_id,
    o.customer_id,
    o.total,
    o.status,
    o.created_at AS order_timestamp,
    DATE(o.created_at) AS order_date,
    c.segment,
    c.region,
    p.product_category,
    ROW_NUMBER() OVER (PARTITION BY o.customer_id ORDER BY o.created_at) AS customer_order_number
FROM {{ ref('stg_orders') }} o
JOIN {{ ref('stg_customers') }} c ON o.customer_id = c.customer_id
LEFT JOIN {{ ref('stg_products') }} p ON o.product_id = p.product_id
Incremental ELT with dbt
-- models/marts/fact_orders_incremental.sql
{{ config(materialized='incremental', unique_key='order_id') }}

SELECT
    order_id,
    customer_id,
    total,
    status,
    created_at AS order_timestamp,
    DATE(created_at) AS order_date
FROM {{ source('raw', 'orders') }}

{% if is_incremental() %}
  -- On incremental runs, only pull rows newer than what is already loaded;
  -- on the first run (or --full-refresh) the filter is skipped entirely
  WHERE created_at > (SELECT MAX(order_timestamp) FROM {{ this }})
{% endif %}
Streaming Data Pipelines
Apache Kafka Fundamentals
Kafka is the backbone of modern streaming architectures:
# Producer: publishing events
from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['kafka-1:9092', 'kafka-2:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8')
)

# Publish order events, keyed by customer so one customer's events stay ordered
for order in orders_batch:
    producer.send(
        'orders',
        key=str(order['customer_id']),
        value={
            'event_type': 'order_created',
            'order_id': order['id'],
            'customer_id': order['customer_id'],
            'total': order['total'],
            'items': order['items'],
            'timestamp': order['created_at'].isoformat()
        }
    )
producer.flush()

# Consumer: processing events
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['kafka-1:9092', 'kafka-2:9092'],
    group_id='order-processor',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='earliest',
    enable_auto_commit=True
)

for message in consumer:
    event = message.value
    if event['event_type'] == 'order_created':
        process_new_order(event)
    elif event['event_type'] == 'order_cancelled':
        process_cancellation(event)
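With `enable_auto_commit=True`, offsets can be acknowledged before processing finishes, so a crash can both lose and replay messages. In practice consumers in a group will occasionally redeliver events after a rebalance, which is why handlers should be idempotent. A minimal dedup sketch in plain Python, where an in-memory set stands in for a durable store such as a database UNIQUE constraint or Redis (the event shape follows the producer example above):

```python
processed_ids = set()  # stands in for a durable store in real deployments

def process_order_once(event, sink):
    """Apply an order event at most once per order_id, tolerating redelivery."""
    order_id = event['order_id']
    if order_id in processed_ids:
        return False  # duplicate delivery; skip
    sink.append(event)          # the actual side effect (e.g., warehouse upsert)
    processed_ids.add(order_id)
    return True

sink = []
event = {'event_type': 'order_created', 'order_id': 'A1', 'total': 42.0}
process_order_once(event, sink)
process_order_once(event, sink)  # redelivered duplicate is ignored
```

At-least-once delivery plus idempotent handlers gives effectively-once results; true exactly-once semantics requires Kafka's transactional APIs or deduplication in the sink.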
Kafka Streams for Real-Time Processing
// Java: Kafka Streams word count
import java.util.Arrays;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream("text-lines");

KTable<String, Long> wordCounts = textLines
    .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
    .groupBy((key, word) -> word, Grouped.with(Serdes.String(), Serdes.String()))
    .count(Materialized.as("word-counts-store"));

wordCounts.toStream().to("word-counts-output");
Stream Processing Patterns
# Pattern 1: Windowed Aggregation
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, from_json, sum, window
from pyspark.sql.types import DoubleType, StringType, StructField, StructType

spark = SparkSession.builder.getOrCreate()

order_schema = StructType([
    StructField('order_id', StringType()),
    StructField('customer_id', StringType()),
    StructField('total', DoubleType()),
])

orders = spark.readStream.format('kafka')\
    .option('kafka.bootstrap.servers', 'kafka:9092')\
    .option('subscribe', 'orders')\
    .load()

# Parse the JSON payload and keep the Kafka message timestamp for windowing
orders_with_watermark = orders\
    .select(from_json(col('value').cast('string'), order_schema).alias('o'),
            col('timestamp'))\
    .select('o.*', 'timestamp')\
    .withWatermark('timestamp', '10 minutes')

# Tumbling window (non-overlapping)
windowed_sales = orders_with_watermark\
    .groupBy(
        window('timestamp', '1 hour'),
        'customer_id'
    )\
    .agg(
        sum('total').alias('hourly_spend'),
        count('order_id').alias('order_count')
    )
# Pattern 2: Streaming Join
orders = spark.readStream.format('kafka')\
    .option('kafka.bootstrap.servers', 'kafka:9092')\
    .option('subscribe', 'orders')\
    .load()

products = spark.readStream.format('kafka')\
    .option('kafka.bootstrap.servers', 'kafka:9092')\
    .option('subscribe', 'products')\
    .load()

# Stream-stream joins need watermarks on both sides (plus an event-time range
# in the join condition) so Spark can bound the state it keeps for matching
enriched_orders = orders.join(products, orders.product_id == products.product_id)
# Pattern 3: Keyed State (Flink)
from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor

class OrderAggregator(KeyedProcessFunction):
    """Keeps a running (count, total) per customer in Flink keyed state."""

    def open(self, runtime_context: RuntimeContext):
        descriptor = ValueStateDescriptor(
            'order-aggregate', Types.TUPLE([Types.LONG(), Types.DOUBLE()]))
        self.aggregate = runtime_context.get_state(descriptor)

    def process_element(self, order, ctx):
        current = self.aggregate.value() or (0, 0.0)
        new_count = current[0] + 1
        new_total = current[1] + order.total
        self.aggregate.update((new_count, new_total))
        yield order.customer_id, new_count, new_total

env = StreamExecutionEnvironment.get_execution_environment()
# orders_stream.key_by(lambda o: o.customer_id).process(OrderAggregator())
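To build intuition for what the windowed aggregation in Pattern 1 computes, the same tumbling-window logic fits in a few lines of plain Python. Event timestamps here are Unix seconds and the sample events are made up:

```python
from collections import defaultdict

WINDOW = 3600  # one hour, in seconds

def window_start(ts):
    # Floor the event time to its tumbling-window boundary
    return ts - (ts % WINDOW)

def aggregate(events):
    """Sum spend per (window, customer), as a streaming engine would."""
    totals = defaultdict(float)
    for e in events:
        totals[(window_start(e['ts']), e['customer_id'])] += e['total']
    return dict(totals)

events = [
    {'ts': 3600, 'customer_id': 'c1', 'total': 10.0},
    {'ts': 5400, 'customer_id': 'c1', 'total': 5.0},   # same 1-hour window
    {'ts': 7200, 'customer_id': 'c1', 'total': 2.0},   # next window
]
```

What the engines add on top of this arithmetic is incremental state, watermarks to decide when a window can be finalized despite late events, and fault-tolerant checkpointing of that state.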
Data Quality and Monitoring
Data Contracts
# Define expected schema
from dataclasses import dataclass
from pyspark.sql import DataFrame

class DataContractError(Exception):
    """Raised when incoming data violates the agreed contract."""

@dataclass
class OrderContract:
    order_id: str
    customer_id: str
    total: float
    status: str
    created_at: str

    @staticmethod
    def validate(df: DataFrame) -> bool:
        required_columns = ['order_id', 'customer_id', 'total', 'status', 'created_at']
        for col in required_columns:
            if col not in df.columns:
                raise DataContractError(f"Missing required column: {col}")

        # Null checks
        null_counts = {col: df.filter(df[col].isNull()).count() for col in required_columns}
        for col, count in null_counts.items():
            if count > 0:
                raise DataContractError(f"Found {count} null values in {col}")

        # Type validation (check the declared schema, not a single row)
        if df.schema['total'].dataType.typeName() not in ('double', 'float', 'integer', 'long', 'decimal'):
            raise DataContractError("total must be numeric")

        # Value constraints
        invalid_statuses = df.filter(~df.status.isin(['pending', 'completed', 'cancelled'])).count()
        if invalid_statuses > 0:
            raise DataContractError(f"Found {invalid_statuses} invalid status values")
        return True
Pipeline Monitoring
# Airflow metrics and alerting
import os

from slack_sdk import WebClient

def slack_failure_callback(context):
    dag_id = context['dag'].dag_id
    task_id = context['task_instance'].task_id
    error = context['exception']
    client = WebClient(token=os.environ['SLACK_TOKEN'])
    client.chat_postMessage(
        channel='#data-alerts',
        text=f":x: Pipeline Failed: {dag_id}.{task_id}\nError: {str(error)}"
    )
# Data quality checks (Great Expectations fluent API; exact names vary by version)
import great_expectations as gx

def validate_orders_data():
    context = gx.get_context()
    validator = context.sources.pandas_default.read_csv("orders.csv")

    validator.expect_column_values_to_not_be_null("order_id")
    validator.expect_column_values_to_be_between("total", min_value=0)
    validator.expect_column_values_to_be_in_set(
        "status", ["pending", "completed", "cancelled"])

    results = validator.validate()
    if not results["success"]:
        send_alert(f"Data quality failed: {results['statistics']}")
Lambda and Kappa Architectures
Lambda Architecture
Combines batch and streaming layers:
                +---------------------+
                |    Data Sources     |
                +----------+----------+
                           |
              +------------+------------+
              |                         |
              v                         v
     +----------------+       +----------------+
     |  Batch Layer   |       |  Speed Layer   |
     |   (HDFS/S3)    |       |  (Streaming)   |
     +-------+--------+       +--------+-------+
             |                         |
             +------------+------------+
                          |
                          v
               +-------------------+
               |   Serving Layer   |
               |   (Query/Merge)   |
               +-------------------+
Kappa Architecture
Simplifies to single streaming path:
         +---------------------+
         |    Data Sources     |
         +----------+----------+
                    |
                    v
         +---------------------+
         |    Kafka Stream     |
         |   (Immutable Log)   |
         +---+------+------+---+
             |      |      |
             v      v      v
     +----------+ +----------+ +----------+
     |Real-time | |  Replay  | |  Query   |
     | Service  | |(Rebuild) | |  Layer   |
     +----------+ +----------+ +----------+
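The Replay path is what makes Kappa work: because the log is immutable and processing is deterministic, rebuilding a view after a code change is just re-consuming from offset zero. A toy sketch of that idea, with a Python list standing in for the Kafka topic:

```python
log = [  # immutable, append-only event log (stands in for a Kafka topic)
    {'order_id': 'A1', 'total': 10.0},
    {'order_id': 'A2', 'total': 5.0},
    {'order_id': 'A3', 'total': 7.5},
]

def build_view(events):
    """Deterministically derive a materialized view from the log."""
    return {'order_count': len(events),
            'revenue': sum(e['total'] for e in events)}

view = build_view(log)      # initial build
rebuilt = build_view(log)   # replay from offset 0, e.g. after deploying a fix
assert view == rebuilt      # same log, same view
```

The practical constraints are log retention (you can only replay what Kafka still holds, unless you use compacted topics or tiered storage) and the time a full replay takes.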
Orchestration with Prefect
# prefect_pipeline.py
from prefect import flow, task
from prefect.transactions import transaction

@task
def extract():
    return fetch_from_source()

@task
def transform(data):
    return transform_data(data)

@task
def load(data):
    # Both effects commit or roll back together
    with transaction():
        insert_into_warehouse(data)
        mark_pipeline_complete()

@flow
def daily_pipeline(environment: str = "production"):
    data = extract()
    transformed = transform(data)
    load(transformed)

# Schedule
if __name__ == "__main__":
    daily_pipeline.serve(
        cron="0 2 * * *",
        parameters={"environment": "production"}
    )
Data Pipeline Testing
# Unit tests for transformations
import pytest
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructField, StructType

@pytest.fixture(scope="session")
def spark():
    return SparkSession.builder.master("local[*]").getOrCreate()

def test_order_total_calculation(spark):
    # Arrange
    input_df = spark.createDataFrame([
        {"order_id": "1", "items": [{"price": 100}, {"price": 50}]},
        {"order_id": "2", "items": [{"price": 75}]}
    ])
    # Act
    result = calculate_order_totals(input_df)
    # Assert
    assert result.filter(result.order_id == "1").first()["total"] == 150
    assert result.filter(result.order_id == "2").first()["total"] == 75

def test_null_handling(spark):
    # An explicit schema is required: Spark cannot infer a type from all-null data
    schema = StructType([
        StructField("order_id", StringType()),
        StructField("customer_id", StringType()),
    ])
    input_df = spark.createDataFrame([("1", None)], schema)
    with pytest.raises(ValidationError):
        validate_required_fields(input_df)
Conclusion
Building data pipelines requires understanding your use case: batch ETL for complex transformations, ELT for flexibility with modern data warehouses, and streaming for real-time requirements. Tools like Airflow for orchestration, Kafka for streaming, and dbt for transformations form the foundation of modern data infrastructure.
Start simple, instrument everything, and iterate. Data quality and monitoring aren’t optional; they’re what separate production-grade pipelines from fragile experiments.