Apache Airflow is a platform for programmatically authoring, scheduling, and monitoring workflows. It’s ideal for complex data pipelines and ETL processes. See the Python Guide for more context.
Installation and Setup
Install Airflow
# Install Airflow
pip install apache-airflow

# Initialize database
# (creates Airflow's metadata tables; run once before the first start)
airflow db init

# Create admin user
airflow users create \
    --username admin \
    --firstname Admin \
    --lastname User \
    --role Admin \
    --email [email protected]

# Start webserver
airflow webserver --port 8080

# Start scheduler (in another terminal)
# The scheduler is what actually triggers DAG runs; both processes must run.
airflow scheduler
Basic DAG Structure
Simple DAG
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

# Defaults shared by every task in this DAG.
default_args = {
    'owner': 'data-team',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'start_date': datetime(2025, 1, 1),
}

# The DAG object ties the tasks together and drives scheduling.
dag = DAG(
    'simple_pipeline',
    default_args=default_args,
    description='A simple data pipeline',
    schedule_interval='@daily',  # Run daily
    catchup=False,
)


def extract_data():
    """Produce a payload; the returned dict is pushed to XCom automatically."""
    print("Extracting data...")
    return {'records': 1000}


def transform_data(**context):
    """Read the extract step's XCom payload and double the record count."""
    ti = context['task_instance']
    data = ti.xcom_pull(task_ids='extract')
    print(f"Transforming {data['records']} records...")
    return {'transformed': data['records'] * 2}


def load_data(**context):
    """Consume the transform step's XCom payload."""
    ti = context['task_instance']
    data = ti.xcom_pull(task_ids='transform')
    print(f"Loading {data['transformed']} records...")


# One PythonOperator per pipeline stage.
stage_extract = PythonOperator(task_id='extract', python_callable=extract_data, dag=dag)
stage_transform = PythonOperator(task_id='transform', python_callable=transform_data, dag=dag)
stage_load = PythonOperator(task_id='load', python_callable=load_data, dag=dag)

# extract -> transform -> load
stage_extract >> stage_transform >> stage_load
Operators
Python Operator
from airflow.operators.python import PythonOperator
def my_function(param1, param2):
    """Log both parameters and return a fixed result string."""
    message = f"Parameters: {param1}, {param2}"
    print(message)
    return "result"
# op_kwargs is forwarded to the callable as keyword arguments at run time.
task = PythonOperator(
    task_id='python_task',
    python_callable=my_function,
    op_kwargs={'param1': 'value1', 'param2': 'value2'},
    dag=dag,
)
Bash Operator
from airflow.operators.bash import BashOperator

# Runs the given shell command when the task executes.
bash_task = BashOperator(
    task_id='bash_task',
    bash_command='echo "Hello from Bash"',
    dag=dag,
)

# With environment variables
# NOTE(review): this rebinds the Python name `bash_task`; the first operator
# object still exists but is no longer reachable through this variable.
bash_task = BashOperator(
    task_id='bash_with_env',
    bash_command='echo $MY_VAR',
    env={'MY_VAR': 'my_value'},  # injected into the command's environment
    dag=dag,
)
SQL Operator
from airflow.providers.postgres.operators.postgres import PostgresOperator

# Executes DDL against the connection registered as 'postgres_default'.
sql_task = PostgresOperator(
    task_id='sql_task',
    postgres_conn_id='postgres_default',
    sql='''
CREATE TABLE IF NOT EXISTS users (
id SERIAL PRIMARY KEY,
name VARCHAR(100),
email VARCHAR(100)
);
''',
    dag=dag,
)
Sensor Operators
from airflow.sensors.filesystem import FileSensor
from airflow.sensors.sql import SqlSensor
from airflow.sensors.time_delta import TimeDeltaSensor

# Wait for file
file_sensor = FileSensor(
    task_id='wait_for_file',
    filepath='/path/to/file.txt',
    poke_interval=60,  # Check every 60 seconds
    timeout=3600,  # Timeout after 1 hour
    dag=dag,
)

# Wait for SQL condition
sql_sensor = SqlSensor(
    task_id='wait_for_data',
    conn_id='postgres_default',
    sql='SELECT COUNT(*) FROM users WHERE created_date = CURRENT_DATE',
    poke_interval=60,
    dag=dag,
)

# Wait for time delta
# NOTE(review): `timedelta` must be in scope here (from datetime import
# timedelta) — the import is not shown in this snippet.
time_sensor = TimeDeltaSensor(
    task_id='wait_time',
    delta=timedelta(minutes=5),
    dag=dag,
)
Task Dependencies and Branching
Linear Dependencies
task1 >> task2 >> task3  # Sequential: each task waits for the previous one
task1 >> [task2, task3]  # Parallel: task2 and task3 both follow task1
[task1, task2] >> task3  # Join: task3 waits for both upstream tasks
Branching
from airflow.operators.python import BranchPythonOperator
def branch_logic(**context):
    """Pick the downstream task id from the trigger conf's 'value' entry.

    Missing values default to 0, which selects the low-value branch.
    """
    value = context['dag_run'].conf.get('value', 0)
    return 'high_value_task' if value > 50 else 'low_value_task'
# BranchPythonOperator runs branch_logic and follows the task_id it returns.
branch_task = BranchPythonOperator(
    task_id='branch',
    python_callable=branch_logic,
    dag=dag,
)

high_value_task = PythonOperator(
    task_id='high_value_task',
    python_callable=lambda: print("High value"),
    dag=dag,
)

low_value_task = PythonOperator(
    task_id='low_value_task',
    python_callable=lambda: print("Low value"),
    dag=dag,
)

# Both branches are downstream; branch_logic returns one of their task_ids.
branch_task >> [high_value_task, low_value_task]
XCom: Inter-task Communication
def task_1():
    """Return a payload that Airflow pushes to XCom as this task's result."""
    payload = {'key': 'value', 'data': [1, 2, 3]}
    return payload
def task_2(**context):
    """Pull task_1's return value, then push a result under an explicit key."""
    ti = context['task_instance']
    # Pull from previous task
    data = ti.xcom_pull(task_ids='task_1')
    print(f"Received: {data}")
    # Push to XCom
    ti.xcom_push(key='result', value={'processed': True})

def task_3(**context):
    """Pull the value task_2 pushed under the 'result' key."""
    ti = context['task_instance']
    # Pull specific key
    result = ti.xcom_pull(task_ids='task_2', key='result')
    print(f"Result: {result}")
# Register the three callables and chain them so XCom data flows 1 -> 2 -> 3.
t1 = PythonOperator(task_id='task_1', python_callable=task_1, dag=dag)
t2 = PythonOperator(task_id='task_2', python_callable=task_2, dag=dag)
t3 = PythonOperator(task_id='task_3', python_callable=task_3, dag=dag)
t1 >> t2 >> t3
Scheduling
Schedule Intervals
# Illustrative values for the DAG(...) schedule_interval argument.
# Cron expressions
schedule_interval='0 0 * * *'  # Daily at midnight
schedule_interval='0 */6 * * *'  # Every 6 hours
schedule_interval='0 9 * * MON-FRI'  # Weekdays at 9 AM
# Preset intervals
schedule_interval='@hourly'
schedule_interval='@daily'
schedule_interval='@weekly'
schedule_interval='@monthly'
schedule_interval=None  # Manual trigger only
Dynamic Scheduling
from airflow.models import Variable

# Get variable
# Reads the interval from an Airflow Variable; '@daily' is the fallback.
interval = Variable.get('schedule_interval', '@daily')

dag = DAG(
    'dynamic_dag',
    schedule_interval=interval,
    default_args=default_args,
)
Error Handling and Retries
# NOTE: the original snippet imported `apply_defaults` from
# airflow.utils.decorators but never used it; that decorator is deprecated
# in Airflow 2.x, so the unused import has been dropped.

# DAG-level retry/alert defaults inherited by every task.
default_args = {
    'owner': 'data-team',
    'retries': 3,                         # attempts after the first failure
    'retry_delay': timedelta(minutes=5),  # base delay between attempts
    'retry_exponential_backoff': True,    # grow the delay after each retry
    'max_retry_delay': timedelta(hours=1),  # upper bound for the backoff
    'on_failure_callback': lambda context: print(f"Task failed: {context}"),
    'on_success_callback': lambda context: print(f"Task succeeded: {context}"),
    'on_retry_callback': lambda context: print(f"Task retrying: {context}"),
}

# Task-level retry
# Task-level settings override the DAG-level defaults above.
task = PythonOperator(
    task_id='task_with_retry',
    python_callable=my_function,
    retries=5,
    retry_delay=timedelta(minutes=10),
    dag=dag,
)
Monitoring and Alerting
Email Alerts
# DAG-level email-alert defaults inherited by every task.
default_args = {
    'owner': 'data-team',
    'email': ['[email protected]'],  # alert recipients
    'email_on_failure': True,           # mail when a task finally fails
    'email_on_retry': False,            # stay quiet on intermediate retries
}

# FIX: the original passed `dag=dag` to the DAG constructor — that is not a
# valid DAG argument and referenced `dag` before it was assigned, so the
# snippet could never run. The unused `Variable` import was removed as well.
dag = DAG(
    'monitored_dag',
    default_args=default_args,
)
Custom Callbacks
def on_failure_callback(context):
    """Called when task fails; receives the Airflow task context dict."""
    failed_ti = context['task_instance']
    error = context['exception']
    print(f"Task {failed_ti.task_id} failed: {error}")
    # Send alert, log to external system, etc.


def on_success_callback(context):
    """Called when task succeeds; receives the Airflow task context dict."""
    print(f"Task {context['task_instance'].task_id} succeeded")
# Attach the callbacks at the task level; they receive the task context dict.
task = PythonOperator(
    task_id='monitored_task',
    python_callable=my_function,
    on_failure_callback=on_failure_callback,
    on_success_callback=on_success_callback,
    dag=dag,
)
Dynamic DAG Generation
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'start_date': datetime(2025, 1, 1),
}

dag = DAG(
    'dynamic_dag_generation',
    default_args=default_args,
    schedule_interval='@daily',
)

# Generate tasks dynamically
tables = ['users', 'orders', 'products']
for table in tables:
    # The default argument binds the current value of `table`; a plain
    # closure would late-bind and every task would see the last table name.
    def process_table(table_name=table):
        print(f"Processing {table_name}")
    task = PythonOperator(
        task_id=f'process_{table}',
        python_callable=process_table,
        dag=dag,
    )
Best Practices
- Idempotent tasks: Tasks should produce same result if run multiple times
- Atomic operations: Each task should be independent and self-contained
- Proper error handling: Use retries and callbacks for monitoring
- Resource limits: Set memory and CPU limits for tasks
- Documentation: Document DAGs and tasks clearly
- Testing: Test DAGs before deploying to production
- Monitoring: Set up alerts for failures and SLAs
Common Pitfalls
Bad Practice:
# Don't: Hardcode values
sql = "SELECT * FROM users WHERE date = '2025-01-01'"

# Don't: No error handling
task = PythonOperator(task_id='task', python_callable=risky_function)

# Don't: Tight coupling between tasks
def task_1():
    # Shares state through a module-level global instead of XCom.
    global shared_data
    shared_data = get_data()
Good Practice:
# Do: Use templating
# {{ ds }} is rendered by Airflow's templating with the run's date stamp.
sql = "SELECT * FROM users WHERE date = '{{ ds }}'"

# Do: Handle errors
task = PythonOperator(
    task_id='task',
    python_callable=risky_function,
    retries=3,
    on_failure_callback=alert_on_failure,
)

# Do: Use XCom for communication
def task_1(**context):
    data = get_data()
    context['task_instance'].xcom_push(key='data', value=data)
Conclusion
Apache Airflow provides powerful workflow orchestration for complex data pipelines. Master DAG design, operators, and scheduling to build production-grade data workflows. Use proper error handling, monitoring, and testing to ensure reliability. Airflow scales from simple scripts to enterprise data platforms.
Comments