Introduction
Data pipeline orchestration is the backbone of modern data infrastructure. This comprehensive guide covers workflow orchestration tools, patterns, and best practices for building reliable data pipelines.
Why orchestration matters:
- Airflow remains the most widely deployed orchestrator in enterprise data stacks
- Well-orchestrated pipelines sharply reduce data downtime through automated retries, dependency tracking, and alerting
- Asset-based tools such as Dagster can shorten pipeline development cycles by making data dependencies explicit
- Prefect's lightweight execution model enables fast local iteration on workflows
Orchestration Concepts
+------------------------------------------------------------------+
|                   Data Pipeline Orchestration                    |
+------------------------------------------------------------------+
|                                                                  |
|   +---------+       +-----------+       +--------+               |
|   | Extract | ----> | Transform | ----> |  Load  |               |
|   +---------+       +-----------+       +--------+               |
|        |                  |                  |                   |
|        +------------------+------------------+                   |
|                           v                                      |
|                   +--------------+                               |
|                   | Orchestrator |                               |
|                   +--------------+                               |
|                           |                                      |
|           +---------------+---------------+                      |
|           v               v               v                      |
|     +----------+     +---------+     +-------+                   |
|     | Schedule |     | Monitor |     | Alert |                   |
|     +----------+     +---------+     +-------+                   |
|                                                                  |
|   Key Capabilities:                                              |
|   - Dependency management     - Scheduling                       |
|   - Parallel execution        - Error handling                   |
|   - Monitoring                - Retries                          |
+------------------------------------------------------------------+
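At its core, an orchestrator resolves the dependency graph into a valid execution order before running anything. A minimal sketch of that idea in plain Python (task names are illustrative), using the standard library's graphlib:

```python
from graphlib import TopologicalSorter

# Dependency graph: each task maps to the set of tasks it depends on
graph = {
    "transform": {"extract"},
    "load": {"transform"},
    "report": {"load"},
}

def run_in_order(graph):
    """Return tasks in a dependency-respecting order."""
    return list(TopologicalSorter(graph).static_order())

order = run_in_order(graph)
# 'extract' always comes before 'transform', which precedes 'load'
```

Real orchestrators layer scheduling, retries, and parallelism on top of exactly this kind of topological ordering.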
Apache Airflow
DAG Definition
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator

default_args = {
    'owner': 'data_team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'etl_pipeline',
    default_args=default_args,
    description='ETL pipeline for analytics',
    schedule='@daily',
    start_date=datetime(2026, 1, 1),
    catchup=False,
    max_active_runs=1,
) as dag:

    def extract_data(**context):
        # Extract from source
        data = extract_from_api()
        # Push to XCom for cross-task communication
        context['task_instance'].xcom_push(key='raw_data', value=data)
        return data

    def transform_data(**context):
        # Pull from XCom
        raw_data = context['task_instance'].xcom_pull(
            key='raw_data', task_ids='extract'
        )
        return transform(raw_data)

    def load_to_warehouse(data):
        load_to_bigquery(data)

    # Operator variables are suffixed with _task so they do not
    # shadow the transform() helper called inside transform_data()
    extract_task = PythonOperator(
        task_id='extract',
        python_callable=extract_data,
    )

    transform_task = PythonOperator(
        task_id='transform',
        python_callable=transform_data,
    )

    load_task = PythonOperator(
        task_id='load',
        python_callable=load_to_warehouse,
        op_args=[transform_task.output],
    )

    # Set dependencies
    extract_task >> transform_task >> load_task
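The `retries`/`retry_delay` settings in `default_args` are what Airflow wraps around each task attempt. The core mechanic can be sketched in plain Python (`flaky_call` is a made-up stand-in for a task that fails transiently):

```python
import time

def run_with_retries(fn, retries=3, retry_delay=0.0):
    """Call fn, retrying up to `retries` times after a failure."""
    attempts = retries + 1  # first try plus `retries` retries
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except Exception:
            if attempt == attempts:
                raise  # out of retries: surface the failure
            time.sleep(retry_delay)

# A task that fails twice, then succeeds on the third attempt
calls = {"n": 0}
def flaky_call():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient error")
    return "ok"

result = run_with_retries(flaky_call, retries=3)
```

With `retries=3`, a task that fails twice still completes; only a fourth consecutive failure would mark the task as failed.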
Sensors and Hooks
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.google.cloud.hooks.gcs import GCSHook

# Wait for an upstream DAG to complete
wait_for_dag = ExternalTaskSensor(
    task_id='wait_for_upstream',
    external_dag_id='upstream_dag',
    external_task_id='finish',
    timeout=3600,
)

# PostgreSQL to GCS
def transfer_to_gcs(**context):
    pg_hook = PostgresHook(postgres_conn_id='postgres_default')
    connection = pg_hook.get_conn()
    cursor = connection.cursor()
    try:
        # Export data
        cursor.execute("SELECT * FROM orders")
        rows = cursor.fetchall()
        # Upload to GCS, partitioned by the run's logical date
        gcs_hook = GCSHook(gcp_conn_id='google_cloud_default')
        gcs_hook.upload(
            bucket_name='data-bucket',
            object_name=f"orders/{context['ds']}.json",
            data=str(rows),
        )
    finally:
        cursor.close()
        connection.close()
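Under the hood, a sensor in poke mode is just a poll loop with a timeout. A stripped-down sketch of the pattern (the condition, interval, and timeout values are illustrative):

```python
import time

def poke_until(condition, poke_interval=0.01, timeout=1.0):
    """Re-check `condition` until it returns True or `timeout` elapses."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if condition():
            return True
        time.sleep(poke_interval)
    raise TimeoutError("sensor timed out")

# Example: condition becomes true on the third poke
state = {"pokes": 0}
def upstream_done():
    state["pokes"] += 1
    return state["pokes"] >= 3

ok = poke_until(upstream_done)
```

Because a poking sensor occupies a worker slot for its whole wait, Airflow also offers a reschedule mode that frees the slot between checks; prefer it for long waits.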
Dagster
Asset-Based Pipeline
from dagster import asset, define_asset_job, AssetSelection, ScheduleDefinition
import pandas as pd

# Define data assets. Upstream dependencies are inferred from the
# function parameter names; raw_orders is assumed to be defined
# as a source/upstream asset elsewhere.
@asset(group_name="analytics")
def cleaned_orders(raw_orders: pd.DataFrame) -> pd.DataFrame:
    """Clean and validate orders data."""
    return raw_orders.dropna(subset=['order_id'])

@asset(group_name="analytics")
def orders_by_day(cleaned_orders: pd.DataFrame) -> pd.DataFrame:
    """Aggregate orders by day."""
    return cleaned_orders.groupby('order_date').agg({
        'order_id': 'count',
        'total': 'sum'
    }).reset_index()

@asset(group_name="analytics")
def daily_revenue(orders_by_day: pd.DataFrame) -> pd.DataFrame:
    """Calculate daily revenue metrics."""
    return orders_by_day.assign(
        revenue_growth=orders_by_day['total'].pct_change()
    )

# Define job
analytics_job = define_asset_job(
    name="analytics_pipeline",
    selection=AssetSelection.all(),
)

# Schedule: run daily at 06:00
daily_analytics_schedule = ScheduleDefinition(
    job=analytics_job,
    cron_schedule="0 6 * * *",
)
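Because each asset body is an ordinary function of its upstream outputs, the chain can be exercised directly with pandas, outside any Dagster run (the sample data below is made up):

```python
import pandas as pd

raw_orders = pd.DataFrame({
    "order_id": [1, 2, None, 3],
    "order_date": ["2026-01-01", "2026-01-01", "2026-01-02", "2026-01-02"],
    "total": [10.0, 20.0, 5.0, 40.0],
})

# Mirror the asset bodies step by step
cleaned = raw_orders.dropna(subset=["order_id"])           # drops the None row
by_day = cleaned.groupby("order_date").agg(
    {"order_id": "count", "total": "sum"}
).reset_index()
revenue = by_day.assign(revenue_growth=by_day["total"].pct_change())
```

Running transformations this way is a cheap smoke test before wiring them into asset definitions.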
Op-Based Pipeline
from dagster import op, job, In, Out

@op
def extract_orders() -> list[dict]:
    """Extract orders from API."""
    return fetch_orders()

@op(
    ins={"orders": In(list[dict])},
    out={"cleaned": Out(list[dict])},
)
def clean_orders(orders: list[dict]) -> list[dict]:
    """Clean order data."""
    return [clean_order(o) for o in orders]

@op(
    ins={"cleaned": In(list[dict])},
)
def load_to_warehouse(cleaned: list[dict]) -> None:
    """Load to data warehouse."""
    insert_orders(cleaned)

@job
def etl_job():
    orders = extract_orders()
    cleaned = clean_orders(orders)
    load_to_warehouse(cleaned)
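One payoff of the op model is that the underlying logic stays unit-testable as plain functions, no Dagster run required. A sketch with a toy (hypothetical) cleaning rule:

```python
def clean_order(order: dict) -> dict:
    """Toy cleaning rule: strip whitespace, coerce total to float."""
    return {
        "order_id": str(order["order_id"]).strip(),
        "total": float(order["total"]),
    }

def clean_orders(orders: list[dict]) -> list[dict]:
    """Apply the cleaning rule to every order."""
    return [clean_order(o) for o in orders]

# Exercised directly, as a unit test would
cleaned = clean_orders([
    {"order_id": " A-1 ", "total": "19.99"},
    {"order_id": "A-2", "total": 5},
])
```

For integration-level checks, Dagster jobs can also be executed in-process with `etl_job.execute_in_process()`.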
Prefect
Flow Definition
from datetime import timedelta

import requests
from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash

@task(
    name="Extract from API",
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1),
    retries=3,
    retry_delay_seconds=30,
)
def extract_data(source: str) -> dict:
    logger = get_run_logger()
    logger.info(f"Extracting data from {source}")
    response = requests.get(f"{source}/api/data")
    response.raise_for_status()
    return response.json()

@task(name="Transform Data")
def transform_data(raw_data: dict) -> dict:
    """Transform raw data."""
    return transform(raw_data)

@task(name="Load to Warehouse")
def load_data(transformed: dict, destination: str) -> None:
    """Load to destination."""
    # Load implementation
    pass

@flow(
    name="ETL Pipeline",
    log_prints=True,
    retries=2,
)
def etl_pipeline(source: str, destination: str):
    """Main ETL pipeline."""
    raw = extract_data(source)
    transformed = transform_data(raw)
    load_data(transformed, destination)
    return {"status": "success"}

# Run with parameters
if __name__ == "__main__":
    result = etl_pipeline(
        source="https://api.example.com",
        destination="bigquery://project.dataset.table"
    )
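Prefect can also fan tasks out concurrently with `.submit()` and `.map()`. The underlying pattern resembles a thread-pool map, sketched here in plain Python (this is the idea, not the Prefect API):

```python
from concurrent.futures import ThreadPoolExecutor

def transform_one(record: dict) -> dict:
    # Per-record transformation (illustrative)
    return {**record, "total_cents": int(record["total"] * 100)}

records = [{"id": 1, "total": 1.5}, {"id": 2, "total": 2.25}]

# Fan out across workers; pool.map preserves input order in the results
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(transform_one, records))
```

In Prefect, the same shape is `transform_one.map(records)`, with the orchestrator handling retries and observability for each mapped task run.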
Deployment
# prefect.yaml (entrypoint paths are illustrative: path/to/file.py:flow_function)
name: etl-pipeline
prefect-version: 3.0.0
deployments:
  - name: daily-run
    entrypoint: etl.py:etl_pipeline
    schedules:
      - cron: "0 6 * * *"
    parameters:
      source: "https://api.example.com"
      destination: "bigquery://prod/analytics/orders"
    work_pool:
      name: default
  - name: hourly-run
    entrypoint: realtime.py:realtime_pipeline
    schedules:
      - cron: "0 * * * *"
    work_pool:
      name: streaming
Best Practices
- Idempotency: Re-running a pipeline should produce the same end state as running it once
- Small tasks: Break into smaller, testable units
- Error handling: Implement proper retries and fallbacks
- Monitoring: Add logging, metrics, and alerting
- Documentation: Document data contracts and expectations
- Testing: Unit test individual operators/tasks
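Idempotency in practice often means upsert-or-replace writes keyed on a natural key, so that a retried or re-run load does not duplicate rows. A sketch with SQLite:

```python
import sqlite3

def load_orders(conn, orders):
    """Idempotent load: INSERT OR REPLACE keyed on order_id."""
    conn.executemany(
        "INSERT OR REPLACE INTO orders (order_id, total) VALUES (?, ?)",
        [(o["order_id"], o["total"]) for o in orders],
    )
    conn.commit()

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id TEXT PRIMARY KEY, total REAL)")

batch = [{"order_id": "A-1", "total": 10.0}, {"order_id": "A-2", "total": 20.0}]
load_orders(conn, batch)
load_orders(conn, batch)  # second run: same end state, no duplicates

count = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
```

The same principle applies at warehouse scale via MERGE statements or partition overwrites.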
Tool Comparison
| Feature | Airflow | Dagster | Prefect |
|---|---|---|---|
| Model | Task-based | Asset-based | Hybrid |
| Metadata storage | Metadata DB | Metadata DB | Cloud or self-hosted server |
| UI | Built-in | Built-in | Cloud/Server |
| Learning Curve | Steep | Medium | Low |
| Scalability | High | High | High |
Conclusion
Pipeline orchestration is essential for reliable data infrastructure. Choose the tool that fits your team’s skills and infrastructure needs. Airflow offers the largest ecosystem, Dagster provides modern asset-based workflows, and Prefect delivers excellent developer experience.