Workflow Automation with Apache Airflow: Orchestrating Data Pipelines
Apache Airflow is a platform for programmatically authoring, scheduling, and monitoring workflows. Because pipelines are defined as Python code, it is well suited to complex data pipelines and ETL processes.
Installation and Setup
Install Airflow
# Install Airflow
pip install apache-airflow
# Initialize database
airflow db init
# Create admin user
airflow users create \
--username admin \
--firstname Admin \
--lastname User \
--role Admin \
--email [email protected]
# Start webserver
airflow webserver --port 8080
# Start scheduler (in another terminal)
airflow scheduler
Basic DAG Structure
Simple DAG
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
# Define default arguments
default_args = {
'owner': 'data-team',
'retries': 1,
'retry_delay': timedelta(minutes=5),
'start_date': datetime(2025, 1, 1),
}
# Create DAG
dag = DAG(
'simple_pipeline',
default_args=default_args,
description='A simple data pipeline',
schedule_interval='@daily', # Run daily
catchup=False,
)
# Define tasks
def extract_data():
print("Extracting data...")
return {'records': 1000}
def transform_data(**context):
ti = context['task_instance']
data = ti.xcom_pull(task_ids='extract')
print(f"Transforming {data['records']} records...")
return {'transformed': data['records'] * 2}
def load_data(**context):
ti = context['task_instance']
data = ti.xcom_pull(task_ids='transform')
print(f"Loading {data['transformed']} records...")
# Create task instances
extract_task = PythonOperator(
task_id='extract',
python_callable=extract_data,
dag=dag,
)
transform_task = PythonOperator(
task_id='transform',
python_callable=transform_data,
dag=dag,
)
load_task = PythonOperator(
task_id='load',
python_callable=load_data,
dag=dag,
)
# Set dependencies
extract_task >> transform_task >> load_task
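The same pipeline can be written more compactly with the TaskFlow API available since Airflow 2.0, where return values are exchanged over XCom automatically. A minimal sketch, assuming Airflow 2.x:
from datetime import datetime
from airflow.decorators import dag, task

@dag(
    dag_id='simple_pipeline_taskflow',
    start_date=datetime(2025, 1, 1),
    schedule_interval='@daily',
    catchup=False,
)
def simple_pipeline_taskflow():
    @task
    def extract():
        return {'records': 1000}

    @task
    def transform(data):
        return {'transformed': data['records'] * 2}

    @task
    def load(data):
        print(f"Loading {data['transformed']} records...")

    # Return values flow between tasks via XCom; no explicit xcom_pull needed
    load(transform(extract()))

simple_pipeline_taskflow()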
Operators
Python Operator
from airflow.operators.python import PythonOperator
def my_function(param1, param2):
print(f"Parameters: {param1}, {param2}")
return "result"
task = PythonOperator(
task_id='python_task',
python_callable=my_function,
op_kwargs={'param1': 'value1', 'param2': 'value2'},
dag=dag,
)
Bash Operator
from airflow.operators.bash import BashOperator
bash_task = BashOperator(
task_id='bash_task',
bash_command='echo "Hello from Bash"',
dag=dag,
)
# With environment variables
bash_task = BashOperator(
task_id='bash_with_env',
bash_command='echo $MY_VAR',
env={'MY_VAR': 'my_value'},
dag=dag,
)
SQL Operator
# Requires the Postgres provider: pip install apache-airflow-providers-postgres
from airflow.providers.postgres.operators.postgres import PostgresOperator
sql_task = PostgresOperator(
task_id='sql_task',
postgres_conn_id='postgres_default',
sql='''
CREATE TABLE IF NOT EXISTS users (
id SERIAL PRIMARY KEY,
name VARCHAR(100),
email VARCHAR(100)
);
''',
dag=dag,
)
Sensor Operators
from airflow.sensors.filesystem import FileSensor
from airflow.sensors.sql import SqlSensor
from airflow.sensors.time_delta import TimeDeltaSensor
# Wait for file
file_sensor = FileSensor(
task_id='wait_for_file',
filepath='/path/to/file.txt',
poke_interval=60, # Check every 60 seconds
timeout=3600, # Timeout after 1 hour
dag=dag,
)
# Wait for SQL condition
sql_sensor = SqlSensor(
task_id='wait_for_data',
conn_id='postgres_default',
    # Succeeds once the first cell of the first returned row is truthy (non-zero)
    sql='SELECT COUNT(*) FROM users WHERE created_date = CURRENT_DATE',
poke_interval=60,
dag=dag,
)
# Wait for time delta
time_sensor = TimeDeltaSensor(
task_id='wait_time',
delta=timedelta(minutes=5),
dag=dag,
)
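By default a sensor holds a worker slot for the whole time it is waiting ('poke' mode). For long waits, `mode='reschedule'` releases the slot between checks. A brief sketch reusing the FileSensor from above:
# Release the worker slot between pokes instead of blocking it
file_sensor_reschedule = FileSensor(
    task_id='wait_for_file_reschedule',
    filepath='/path/to/file.txt',
    mode='reschedule',    # default is 'poke'
    poke_interval=300,    # check every 5 minutes
    timeout=6 * 3600,     # give up after 6 hours
    dag=dag,
)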
Task Dependencies and Branching
Linear Dependencies
task1 >> task2 >> task3 # Sequential
task1 >> [task2, task3] # Parallel
[task1, task2] >> task3 # Join
Branching
from airflow.operators.python import BranchPythonOperator
def branch_logic(**context):
value = context['dag_run'].conf.get('value', 0)
if value > 50:
return 'high_value_task'
else:
return 'low_value_task'
branch_task = BranchPythonOperator(
task_id='branch',
python_callable=branch_logic,
dag=dag,
)
high_value_task = PythonOperator(
task_id='high_value_task',
python_callable=lambda: print("High value"),
dag=dag,
)
low_value_task = PythonOperator(
task_id='low_value_task',
python_callable=lambda: print("Low value"),
dag=dag,
)
branch_task >> [high_value_task, low_value_task]
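If the two branches re-join downstream, the joining task is skipped under the default all_success trigger rule, because one branch is always skipped. A sketch of the usual fix, assuming Airflow 2.3+ (earlier releases use DummyOperator and the none_failed_or_skipped rule):
from airflow.operators.empty import EmptyOperator

# Runs as long as no upstream task failed and at least one branch succeeded
join_task = EmptyOperator(
    task_id='join',
    trigger_rule='none_failed_min_one_success',
    dag=dag,
)

[high_value_task, low_value_task] >> join_task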
XCom: Inter-task Communication
def task_1():
return {'key': 'value', 'data': [1, 2, 3]}
def task_2(**context):
ti = context['task_instance']
# Pull from previous task
data = ti.xcom_pull(task_ids='task_1')
print(f"Received: {data}")
# Push to XCom
ti.xcom_push(key='result', value={'processed': True})
def task_3(**context):
ti = context['task_instance']
# Pull specific key
result = ti.xcom_pull(task_ids='task_2', key='result')
print(f"Result: {result}")
t1 = PythonOperator(task_id='task_1', python_callable=task_1, dag=dag)
t2 = PythonOperator(task_id='task_2', python_callable=task_2, dag=dag)
t3 = PythonOperator(task_id='task_3', python_callable=task_3, dag=dag)
t1 >> t2 >> t3
Scheduling
Schedule Intervals
# Cron expressions
schedule_interval='0 0 * * *' # Daily at midnight
schedule_interval='0 */6 * * *' # Every 6 hours
schedule_interval='0 9 * * MON-FRI' # Weekdays at 9 AM
# Preset intervals
schedule_interval='@hourly'
schedule_interval='@daily'
schedule_interval='@weekly'
schedule_interval='@monthly'
schedule_interval=None # Manual trigger only
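In Airflow 2.4 and later, the single `schedule` argument supersedes `schedule_interval` and accepts cron strings, presets, a `timedelta`, or None. A brief sketch, assuming Airflow 2.4+:
from datetime import datetime, timedelta
from airflow import DAG

# 'schedule' replaces 'schedule_interval' in Airflow 2.4+
dag = DAG(
    'interval_based_dag',
    start_date=datetime(2025, 1, 1),
    schedule=timedelta(hours=6),  # run every 6 hours from the start date
    catchup=False,
)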
Dynamic Scheduling
from airflow.models import Variable
# Get variable
interval = Variable.get('schedule_interval', default_var='@daily')
dag = DAG(
'dynamic_dag',
schedule_interval=interval,
default_args=default_args,
)
Error Handling and Retries
default_args = {
'owner': 'data-team',
'retries': 3,
'retry_delay': timedelta(minutes=5),
'retry_exponential_backoff': True,
'max_retry_delay': timedelta(hours=1),
'on_failure_callback': lambda context: print(f"Task failed: {context}"),
'on_success_callback': lambda context: print(f"Task succeeded: {context}"),
'on_retry_callback': lambda context: print(f"Task retrying: {context}"),
}
# Task-level retry
task = PythonOperator(
task_id='task_with_retry',
python_callable=my_function,
retries=5,
retry_delay=timedelta(minutes=10),
dag=dag,
)
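Retries handle transient failures, but they do not catch tasks that hang or silently run too long; the standard `execution_timeout` and `sla` operator arguments cover that case. A short sketch reusing the `my_function` callable from above (the task name is illustrative):
from datetime import timedelta

slow_task = PythonOperator(
    task_id='slow_task',
    python_callable=my_function,
    execution_timeout=timedelta(minutes=30),  # fail the task if it runs longer than 30 minutes
    sla=timedelta(hours=1),                   # record an SLA miss if it has not finished in time
    retries=2,
    dag=dag,
)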
Monitoring and Alerting
Email Alerts
default_args = {
'owner': 'data-team',
'email': ['[email protected]'],
'email_on_failure': True,
'email_on_retry': False,
}
dag = DAG(
    'monitored_dag',
    default_args=default_args,
    schedule_interval='@daily',
)
Custom Callbacks
def on_failure_callback(context):
"""Called when task fails"""
task_instance = context['task_instance']
exception = context['exception']
print(f"Task {task_instance.task_id} failed: {exception}")
# Send alert, log to external system, etc.
def on_success_callback(context):
"""Called when task succeeds"""
task_instance = context['task_instance']
print(f"Task {task_instance.task_id} succeeded")
task = PythonOperator(
task_id='monitored_task',
python_callable=my_function,
on_failure_callback=on_failure_callback,
on_success_callback=on_success_callback,
dag=dag,
)
Dynamic DAG Generation
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'data-team',
'start_date': datetime(2025, 1, 1),
}
dag = DAG(
'dynamic_dag_generation',
default_args=default_args,
schedule_interval='@daily',
)
# Generate tasks dynamically
tables = ['users', 'orders', 'products']
for table in tables:
def process_table(table_name=table):
print(f"Processing {table_name}")
task = PythonOperator(
task_id=f'process_{table}',
python_callable=process_table,
dag=dag,
)
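The loop above generates tasks inside one DAG. The same idea extends to generating a separate DAG per table by registering each DAG object in the module's global namespace, which is how the scheduler discovers it. A hedged sketch of that common pattern (table and DAG names are illustrative):
# One DAG per table; globals() makes each DAG discoverable by the scheduler
for table in ['users', 'orders', 'products']:
    dag_id = f'process_{table}_dag'

    with DAG(
        dag_id,
        default_args=default_args,
        schedule_interval='@daily',
        catchup=False,
    ) as table_dag:
        PythonOperator(
            task_id=f'process_{table}',
            python_callable=lambda table_name=table: print(f"Processing {table_name}"),
        )

    globals()[dag_id] = table_dag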
Best Practices
- Idempotent tasks: A task should produce the same result when run multiple times for the same logical date (see the sketch after this list)
- Atomic operations: Each task should either complete fully or fail cleanly, leaving no partial side effects
- Proper error handling: Use retries and callbacks for monitoring
- Resource limits: Set memory and CPU limits for tasks
- Documentation: Document DAGs and tasks clearly
- Testing: Test DAGs before deploying to production
- Monitoring: Set up alerts for failures and SLAs
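As a concrete illustration of idempotency, a load task can delete the rows for the run's logical date before re-inserting them, so re-running the task for the same date never duplicates data. A minimal sketch using PostgresHook; the `daily_metrics` table and query are hypothetical:
from airflow.providers.postgres.hooks.postgres import PostgresHook

def load_partition(**context):
    """Idempotent load: re-running for the same date yields the same rows."""
    ds = context['ds']  # logical date as YYYY-MM-DD
    hook = PostgresHook(postgres_conn_id='postgres_default')
    # Remove any rows left by a previous run of this date, then reload them
    hook.run("DELETE FROM daily_metrics WHERE metric_date = %s", parameters=(ds,))
    hook.run(
        "INSERT INTO daily_metrics (metric_date, value) "
        "SELECT %s, COUNT(*) FROM users WHERE created_date = %s",
        parameters=(ds, ds),
    )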
Common Pitfalls
Bad Practice:
# Don't: Hardcode values
sql = "SELECT * FROM users WHERE date = '2025-01-01'"
# Don't: No error handling
task = PythonOperator(task_id='task', python_callable=risky_function)
# Don't: Tight coupling between tasks
def task_1():
global shared_data
shared_data = get_data()
Good Practice:
# Do: Use templating
sql = "SELECT * FROM users WHERE date = '{{ ds }}'"
# Do: Handle errors
task = PythonOperator(
task_id='task',
python_callable=risky_function,
retries=3,
on_failure_callback=alert_on_failure,
)
# Do: Use XCom for communication
def task_1(**context):
data = get_data()
context['task_instance'].xcom_push(key='data', value=data)
Conclusion
Apache Airflow provides powerful workflow orchestration for complex data pipelines. Master DAG design, operators, and scheduling to build production-grade data workflows. Use proper error handling, monitoring, and testing to ensure reliability. Airflow scales from simple scripts to enterprise data platforms.