Skip to main content

Workflow Automation with Apache Airflow: Orchestrating Data Pipelines

Created: December 17, 2025 5 min read

Apache Airflow is a platform for programmatically authoring, scheduling, and monitoring workflows. It’s ideal for complex data pipelines and ETL processes. See Python Guide for more context.

Installation and Setup

Install Airflow

# Install Airflow
pip install apache-airflow

# Initialize the metadata database
# (Airflow 2.7+ deprecates this command in favour of `airflow db migrate`)
airflow db init

# Create admin user; with no --password flag the CLI prompts for one
airflow users create \
  --username admin \
  --firstname Admin \
  --lastname User \
  --role Admin \
  --email admin@example.com

# Start webserver
airflow webserver --port 8080

# Start scheduler (in another terminal)
airflow scheduler

Basic DAG Structure

Simple DAG

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

# Define default arguments
# This dict is handed to the DAG below through its default_args parameter.
default_args = {
    'owner': 'data-team',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'start_date': datetime(2025, 1, 1),
}

# Create DAG
# The first positional argument is the DAG's identifier; the task operators
# further down attach themselves to this object via dag=dag.
dag = DAG(
    'simple_pipeline',
    default_args=default_args,
    description='A simple data pipeline',
    schedule_interval='@daily',  # Run daily
    catchup=False,
)

# Define tasks
def extract_data():
    """Announce the extract step and return a dict with the record count."""
    record_count = 1000
    print("Extracting data...")
    return {'records': record_count}

def transform_data(**context):
    """Double the record count produced by the ``extract`` task.

    Pulls the extract task's result through the task instance found in
    ``context`` and returns ``{'transformed': <records * 2>}``.
    """
    upstream = context['task_instance'].xcom_pull(task_ids='extract')
    record_count = upstream['records']
    print(f"Transforming {record_count} records...")
    return {'transformed': record_count * 2}

def load_data(**context):
    """Print how many records the ``transform`` task handed over."""
    transformed = context['task_instance'].xcom_pull(task_ids='transform')
    total = transformed['transformed']
    print(f"Loading {total} records...")

# Create task instances
# The task_id values matter: transform_data pulls from task_ids='extract'
# and load_data pulls from task_ids='transform'.
extract_task = PythonOperator(
    task_id='extract',
    python_callable=extract_data,
    dag=dag,
)

transform_task = PythonOperator(
    task_id='transform',
    python_callable=transform_data,
    dag=dag,
)

load_task = PythonOperator(
    task_id='load',
    python_callable=load_data,
    dag=dag,
)

# Set dependencies
# extract comes first, then transform, then load.
extract_task >> transform_task >> load_task

Operators

Python Operator

from airflow.operators.python import PythonOperator

def my_function(param1, param2):
    """Print both parameters and return the fixed string ``"result"``."""
    message = f"Parameters: {param1}, {param2}"
    print(message)
    return "result"

# op_kwargs names both of my_function's parameters (param1, param2).
task = PythonOperator(
    task_id='python_task',
    python_callable=my_function,
    op_kwargs={'param1': 'value1', 'param2': 'value2'},
    dag=dag,
)

Bash Operator

from airflow.operators.bash import BashOperator

# Run a single shell command as a task.
bash_task = BashOperator(
    task_id='bash_task',
    bash_command='echo "Hello from Bash"',
    dag=dag,
)

# With environment variables
# NOTE: this rebinds the Python name bash_task that was assigned just above.
bash_task = BashOperator(
    task_id='bash_with_env',
    bash_command='echo $MY_VAR',
    env={'MY_VAR': 'my_value'},  # defines the MY_VAR referenced by bash_command
    dag=dag,
)

SQL Operator

from airflow.providers.postgres.operators.postgres import PostgresOperator

# Execute a CREATE TABLE statement using the 'postgres_default' connection id.
# IF NOT EXISTS in the SQL means re-running the statement does not fail when
# the users table is already present.
sql_task = PostgresOperator(
    task_id='sql_task',
    postgres_conn_id='postgres_default',
    sql='''
        CREATE TABLE IF NOT EXISTS users (
            id SERIAL PRIMARY KEY,
            name VARCHAR(100),
            email VARCHAR(100)
        );
    ''',
    dag=dag,
)

Sensor Operators

from airflow.sensors.filesystem import FileSensor
from airflow.sensors.sql import SqlSensor
from airflow.sensors.time_delta import TimeDeltaSensor

# Wait for file
file_sensor = FileSensor(
    task_id='wait_for_file',
    filepath='/path/to/file.txt',
    poke_interval=60,  # Check every 60 seconds
    timeout=3600,  # Timeout after 1 hour
    dag=dag,
)

# Wait for SQL condition
# The query counts rows in users whose created_date is today. No timeout is
# passed here, unlike the file sensor above.
sql_sensor = SqlSensor(
    task_id='wait_for_data',
    conn_id='postgres_default',
    sql='SELECT COUNT(*) FROM users WHERE created_date = CURRENT_DATE',
    poke_interval=60,
    dag=dag,
)

# Wait for time delta
time_sensor = TimeDeltaSensor(
    task_id='wait_time',
    delta=timedelta(minutes=5),  # five-minute delta
    dag=dag,
)

Task Dependencies and Branching

Linear Dependencies

task1 >> task2 >> task3  # Sequential: task1, then task2, then task3
task1 >> [task2, task3]  # Parallel: task2 and task3 both follow task1
[task1, task2] >> task3  # Join: task3 follows both task1 and task2

Branching

from airflow.operators.python import BranchPythonOperator

def branch_logic(**context):
    """Pick the downstream task_id from the run's ``value`` setting.

    Reads ``value`` from the triggering run's ``conf`` (defaulting to ``0``)
    and returns the task_id of the branch to follow.

    Args:
        **context: Task context; must contain ``dag_run``.

    Returns:
        ``'high_value_task'`` when ``value`` is greater than 50, otherwise
        ``'low_value_task'``.
    """
    # conf can be None when the run was started without a config payload
    # (seen on some Airflow versions); treat that like an empty dict rather
    # than failing on None.get().
    conf = context['dag_run'].conf or {}
    if conf.get('value', 0) > 50:
        return 'high_value_task'
    return 'low_value_task'

# branch_logic returns either 'high_value_task' or 'low_value_task'; those
# strings are the task_ids of the two operators defined below.
branch_task = BranchPythonOperator(
    task_id='branch',
    python_callable=branch_logic,
    dag=dag,
)

high_value_task = PythonOperator(
    task_id='high_value_task',
    python_callable=lambda: print("High value"),
    dag=dag,
)

low_value_task = PythonOperator(
    task_id='low_value_task',
    python_callable=lambda: print("Low value"),
    dag=dag,
)

# Both candidate branches are declared downstream of the branch task.
branch_task >> [high_value_task, low_value_task]

XCom: Inter-task Communication

def task_1():
    """Return the dict that task_2 later pulls with task_ids='task_1'."""
    return dict(key='value', data=[1, 2, 3])

def task_2(**context):
    """Print what task_1 produced, then push a ``result`` entry of its own."""
    task_instance = context['task_instance']

    # Pull from previous task
    upstream_payload = task_instance.xcom_pull(task_ids='task_1')
    print(f"Received: {upstream_payload}")

    # Push to XCom under an explicit key
    outcome = {'processed': True}
    task_instance.xcom_push(key='result', value=outcome)

def task_3(**context):
    """Print the value task_2 pushed under the ``result`` key."""
    # Pull specific key
    pulled = context['task_instance'].xcom_pull(task_ids='task_2', key='result')
    print(f"Result: {pulled}")

# The task_ids here are the ones the callables pass to xcom_pull
# ('task_1' in task_2, 'task_2' in task_3).
t1 = PythonOperator(task_id='task_1', python_callable=task_1, dag=dag)
t2 = PythonOperator(task_id='task_2', python_callable=task_2, dag=dag)
t3 = PythonOperator(task_id='task_3', python_callable=task_3, dag=dag)

# Producer first, then the task that pulls and pushes, then the final reader.
t1 >> t2 >> t3

Scheduling

Schedule Intervals

# Each line below is an alternative value for a DAG's schedule_interval
# argument; they are examples, not a sequence meant to run together.

# Cron expressions
schedule_interval='0 0 * * *'  # Daily at midnight
schedule_interval='0 */6 * * *'  # Every 6 hours
schedule_interval='0 9 * * MON-FRI'  # Weekdays at 9 AM

# Preset intervals
schedule_interval='@hourly'
schedule_interval='@daily'
schedule_interval='@weekly'
schedule_interval='@monthly'
schedule_interval=None  # Manual trigger only

Dynamic Scheduling

from airflow.models import Variable

# Get variable
# This lookup sits at module level, so it runs whenever the file is executed.
# '@daily' is passed as the second argument — presumably the fallback when
# the Variable is unset; confirm against Variable.get.
interval = Variable.get('schedule_interval', '@daily')

# The value read above becomes this DAG's schedule.
dag = DAG(
    'dynamic_dag',
    schedule_interval=interval,
    default_args=default_args,
)

Error Handling and Retries

from airflow.utils.decorators import apply_defaults

# Retry settings plus three callbacks; each callback here is a lambda that
# just prints the context it is given.
default_args = {
    'owner': 'data-team',
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(hours=1),
    'on_failure_callback': lambda context: print(f"Task failed: {context}"),
    'on_success_callback': lambda context: print(f"Task succeeded: {context}"),
    'on_retry_callback': lambda context: print(f"Task retrying: {context}"),
}

# Task-level retry
# retries and retry_delay are passed straight to the operator here
# (5 retries / 10 minutes, versus 3 / 5 minutes in default_args above).
task = PythonOperator(
    task_id='task_with_retry',
    python_callable=my_function,
    retries=5,
    retry_delay=timedelta(minutes=10),
    dag=dag,
)

Monitoring and Alerting

Email Alerts

from airflow.models import Variable

# Recipients and triggers for e-mail notifications: mail on failure,
# stay quiet on retry.
default_args = {
    'owner': 'data-team',
    'email': ['alerts@example.com'],
    'email_on_failure': True,
    'email_on_retry': False,
}

# DAG() takes no `dag` keyword (that argument belongs to operators), so only
# the DAG id and default_args are passed.
dag = DAG(
    'monitored_dag',
    default_args=default_args,
)

Custom Callbacks

def on_failure_callback(context):
    """Called when task fails; prints the task id and the exception."""
    ti = context['task_instance']
    error = context['exception']
    # Hook point: send alert, log to external system, etc.
    print(f"Task {ti.task_id} failed: {error}")
    # Send alert, log to external system, etc.

def on_success_callback(context):
    """Called when task succeeds; prints the id of that task."""
    finished_id = context['task_instance'].task_id
    print(f"Task {finished_id} succeeded")

# Attach the two callbacks defined above to a single task instead of
# putting them in default_args.
task = PythonOperator(
    task_id='monitored_task',
    python_callable=my_function,
    on_failure_callback=on_failure_callback,
    on_success_callback=on_success_callback,
    dag=dag,
)

Dynamic DAG Generation

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'start_date': datetime(2025, 1, 1),
}

# NOTE(review): no catchup argument is passed here, unlike the
# 'simple_pipeline' DAG earlier in this article — confirm that is intended.
dag = DAG(
    'dynamic_dag_generation',
    default_args=default_args,
    schedule_interval='@daily',
)

# Generate tasks dynamically
# One PythonOperator is created per table name, with task_id process_<table>.
tables = ['users', 'orders', 'products']

for table in tables:
    # table is bound as a default argument so each function keeps the value
    # from its own iteration instead of reading the loop variable later.
    def process_table(table_name=table):
        print(f"Processing {table_name}")

    # The name task is rebound on every iteration.
    task = PythonOperator(
        task_id=f'process_{table}',
        python_callable=process_table,
        dag=dag,
    )

Best Practices

  1. Idempotent tasks: Tasks should produce the same result when run multiple times
  2. Atomic operations: Each task should be independent and self-contained
  3. Proper error handling: Use retries and callbacks for monitoring
  4. Resource limits: Set memory and CPU limits for tasks
  5. Documentation: Document DAGs and tasks clearly
  6. Testing: Test DAGs before deploying to production
  7. Monitoring: Set up alerts for failures and SLAs

Common Pitfalls

Bad Practice:

# Don't: Hardcode values
# The date is a fixed literal inside the query string.
sql = "SELECT * FROM users WHERE date = '2025-01-01'"

# Don't: No error handling
# No retries and no callbacks are passed to the operator.
task = PythonOperator(task_id='task', python_callable=risky_function)

# Don't: Tight coupling between tasks
# The result is written to a module-level global instead of being returned.
def task_1():
    global shared_data
    shared_data = get_data()

Good Practice:

# Do: Use templating
# The date is a {{ ds }} placeholder rather than a fixed literal.
sql = "SELECT * FROM users WHERE date = '{{ ds }}'"

# Do: Handle errors
# retries and a failure callback are passed explicitly.
task = PythonOperator(
    task_id='task',
    python_callable=risky_function,
    retries=3,
    on_failure_callback=alert_on_failure,
)

# Do: Use XCom for communication
# The data is pushed through the task instance under the key 'data'
# instead of being stored in a global.
def task_1(**context):
    data = get_data()
    context['task_instance'].xcom_push(key='data', value=data)

Conclusion

Apache Airflow provides powerful workflow orchestration for complex data pipelines. Master DAG design, operators, and scheduling to build production-grade data workflows. Use proper error handling, monitoring, and testing to ensure reliability. Airflow scales from simple scripts to enterprise data platforms.

Resources

Comments

Share this article

Scan to read on mobile