Skip to main content
⚡ Calmops

Workflow Automation with Apache Airflow: Orchestrating Data Pipelines

Workflow Automation with Apache Airflow: Orchestrating Data Pipelines

Apache Airflow is a platform for programmatically authoring, scheduling, and monitoring workflows. It’s ideal for complex data pipelines and ETL processes.

Installation and Setup

Install Airflow

# Install Airflow
pip install apache-airflow

# Initialize database
airflow db init

# Create admin user
# (admin@example.com is a placeholder address; substitute a real mailbox)
airflow users create \
  --username admin \
  --firstname Admin \
  --lastname User \
  --role Admin \
  --email admin@example.com

# Start webserver
airflow webserver --port 8080

# Start scheduler (in another terminal)
airflow scheduler

Basic DAG Structure

Simple DAG

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

# Define default arguments
# These values are handed to the DAG below via default_args so they apply to
# its tasks unless a task sets its own value.
default_args = {
    'owner': 'data-team',
    'retries': 1,  # one retry after the first failed attempt
    'retry_delay': timedelta(minutes=5),  # wait five minutes before retrying
    'start_date': datetime(2025, 1, 1),
}

# Create DAG
# The first positional argument is the DAG id ('simple_pipeline').
dag = DAG(
    'simple_pipeline',
    default_args=default_args,
    description='A simple data pipeline',
    schedule_interval='@daily',  # Run daily
    catchup=False,  # do not backfill runs for dates between start_date and now
)

# Define tasks
def extract_data():
    """Simulate the extract step.

    Prints a progress message and returns a dict with the number of
    extracted records under the ``'records'`` key.
    """
    record_count = 1000
    print("Extracting data...")
    return {'records': record_count}

def transform_data(**context):
    """Double the record count reported by the ``extract`` task.

    Pulls the upstream payload from XCom through ``context['task_instance']``
    and returns a dict holding the doubled count under ``'transformed'``.
    """
    upstream = context['task_instance'].xcom_pull(task_ids='extract')
    record_count = upstream['records']
    print(f"Transforming {record_count} records...")
    return {'transformed': record_count * 2}

def load_data(**context):
    """Print how many transformed records are being loaded.

    Pulls the ``transform`` task's XCom payload through
    ``context['task_instance']``; returns nothing.
    """
    payload = context['task_instance'].xcom_pull(task_ids='transform')
    transformed_count = payload['transformed']
    print(f"Loading {transformed_count} records...")

# Create task instances
# Each PythonOperator wraps one callable. The task_id strings matter: they are
# the names transform_data and load_data pass to xcom_pull(task_ids=...).
extract_task = PythonOperator(
    task_id='extract',
    python_callable=extract_data,
    dag=dag,
)

transform_task = PythonOperator(
    task_id='transform',
    python_callable=transform_data,
    dag=dag,
)

load_task = PythonOperator(
    task_id='load',
    python_callable=load_data,
    dag=dag,
)

# Set dependencies
# extract runs first, then transform, then load.
extract_task >> transform_task >> load_task

Operators

Python Operator

from airflow.operators.python import PythonOperator

def my_function(param1, param2):
    """Print both parameters and return the fixed string ``"result"``."""
    message = f"Parameters: {param1}, {param2}"
    print(message)
    return "result"

# op_kwargs supplies the keyword arguments for the callable; its keys match
# my_function's parameter names (param1, param2).
task = PythonOperator(
    task_id='python_task',
    python_callable=my_function,
    op_kwargs={'param1': 'value1', 'param2': 'value2'},
    dag=dag,
)

Bash Operator

from airflow.operators.bash import BashOperator

# Minimal example: run a single shell command.
bash_task = BashOperator(
    task_id='bash_task',
    bash_command='echo "Hello from Bash"',
    dag=dag,
)

# With environment variables
# This rebinds the Python name `bash_task`; the two operators have different
# task_ids ('bash_task' and 'bash_with_env').
# NOTE(review): per the BashOperator docs, passing `env` replaces the inherited
# process environment rather than extending it — confirm for your Airflow version.
bash_task = BashOperator(
    task_id='bash_with_env',
    bash_command='echo $MY_VAR',  # $MY_VAR is supplied through `env` below
    env={'MY_VAR': 'my_value'},
    dag=dag,
)

SQL Operator

from airflow.providers.postgres.operators.postgres import PostgresOperator

# Runs the SQL below using the connection id 'postgres_default'.
# The statement uses CREATE TABLE IF NOT EXISTS, so re-running the task does
# not fail when the table is already present.
sql_task = PostgresOperator(
    task_id='sql_task',
    postgres_conn_id='postgres_default',
    sql='''
        CREATE TABLE IF NOT EXISTS users (
            id SERIAL PRIMARY KEY,
            name VARCHAR(100),
            email VARCHAR(100)
        );
    ''',
    dag=dag,
)

Sensor Operators

from airflow.sensors.filesystem import FileSensor
from airflow.sensors.sql import SqlSensor
from airflow.sensors.time_delta import TimeDeltaSensor

# Wait for file
# Polls for /path/to/file.txt (placeholder path) until it exists or the
# timeout elapses.
file_sensor = FileSensor(
    task_id='wait_for_file',
    filepath='/path/to/file.txt',
    poke_interval=60,  # Check every 60 seconds
    timeout=3600,  # Timeout after 1 hour
    dag=dag,
)

# Wait for SQL condition
# Re-runs the COUNT(*) query every 60 seconds. Unlike file_sensor above, no
# explicit timeout is set here.
sql_sensor = SqlSensor(
    task_id='wait_for_data',
    conn_id='postgres_default',
    sql='SELECT COUNT(*) FROM users WHERE created_date = CURRENT_DATE',
    poke_interval=60,
    dag=dag,
)

# Wait for time delta
# Configured with a five-minute delta.
time_sensor = TimeDeltaSensor(
    task_id='wait_time',
    delta=timedelta(minutes=5),
    dag=dag,
)

Task Dependencies and Branching

Linear Dependencies

task1 >> task2 >> task3  # Sequential
task1 >> [task2, task3]  # Parallel
[task1, task2] >> task3  # Join

Branching

from airflow.operators.python import BranchPythonOperator

def branch_logic(**context):
    """Choose the downstream task id from the run's trigger configuration.

    Reads ``value`` from ``context['dag_run'].conf`` (defaulting to 0) and
    returns ``'high_value_task'`` when it is greater than 50, otherwise
    ``'low_value_task'``.
    """
    # Treat a missing (None) conf the same as an empty one so a run started
    # without a config payload takes the default branch instead of raising
    # AttributeError on None.get.
    conf = context['dag_run'].conf or {}
    value = conf.get('value', 0)
    return 'high_value_task' if value > 50 else 'low_value_task'

# The branch operator runs branch_logic, which returns the task_id to follow.
branch_task = BranchPythonOperator(
    task_id='branch',
    python_callable=branch_logic,
    dag=dag,
)

# The task_ids below must match the strings returned by branch_logic
# ('high_value_task' / 'low_value_task').
high_value_task = PythonOperator(
    task_id='high_value_task',
    python_callable=lambda: print("High value"),
    dag=dag,
)

low_value_task = PythonOperator(
    task_id='low_value_task',
    python_callable=lambda: print("Low value"),
    dag=dag,
)

# Both candidates are declared downstream of the branch task.
branch_task >> [high_value_task, low_value_task]

XCom: Inter-task Communication

def task_1():
    """Return a sample payload (a dict with a string and a list of ints)."""
    payload = {'key': 'value', 'data': [1, 2, 3]}
    return payload

def task_2(**context):
    """Read ``task_1``'s XCom value, print it, and publish a result.

    Uses ``context['task_instance']`` for both the pull and the push; the
    pushed value is stored under the key ``'result'``.
    """
    task_instance = context['task_instance']

    # Value returned by the upstream task
    received = task_instance.xcom_pull(task_ids='task_1')
    print(f"Received: {received}")

    # Explicitly publish a value under a named key
    outcome = {'processed': True}
    task_instance.xcom_push(key='result', value=outcome)

def task_3(**context):
    """Print the value ``task_2`` pushed to XCom under the key ``'result'``."""
    # Pull by task id and explicit key rather than the default return value
    pulled = context['task_instance'].xcom_pull(task_ids='task_2', key='result')
    print(f"Result: {pulled}")

# The task_ids must match the names used in the xcom_pull calls above
# ('task_1' in task_2, 'task_2' in task_3).
t1 = PythonOperator(task_id='task_1', python_callable=task_1, dag=dag)
t2 = PythonOperator(task_id='task_2', python_callable=task_2, dag=dag)
t3 = PythonOperator(task_id='task_3', python_callable=task_3, dag=dag)

# Run in order so each task's XCom value exists before it is pulled.
t1 >> t2 >> t3

Scheduling

Schedule Intervals

# Cron expressions
schedule_interval='0 0 * * *'  # Daily at midnight
schedule_interval='0 */6 * * *'  # Every 6 hours
schedule_interval='0 9 * * MON-FRI'  # Weekdays at 9 AM

# Preset intervals
schedule_interval='@hourly'
schedule_interval='@daily'
schedule_interval='@weekly'
schedule_interval='@monthly'
schedule_interval=None  # Manual trigger only

Dynamic Scheduling

from airflow.models import Variable

# Get variable
# Falls back to '@daily' when the 'schedule_interval' Variable is not set.
# This is a module-level statement, so the lookup runs whenever this file is
# executed, not when a task runs.
interval = Variable.get('schedule_interval', '@daily')

# The looked-up value becomes the DAG's schedule.
dag = DAG(
    'dynamic_dag',
    schedule_interval=interval,
    default_args=default_args,
)

Error Handling and Retries

from airflow.utils.decorators import apply_defaults

# Retry and callback settings intended to be shared through default_args.
default_args = {
    'owner': 'data-team',
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,  # grow the wait between successive retries
    'max_retry_delay': timedelta(hours=1),  # upper bound on the wait between retries
    # Each callback receives the task context and simply prints it.
    'on_failure_callback': lambda context: print(f"Task failed: {context}"),
    'on_success_callback': lambda context: print(f"Task succeeded: {context}"),
    'on_retry_callback': lambda context: print(f"Task retrying: {context}"),
}

# Task-level retry
# Values given on the operator (5 retries, 10-minute delay) are set for this
# task specifically, alongside the shared defaults above.
# NOTE(review): if my_function is the two-parameter example (param1, param2),
# this task also needs op_kwargs — confirm which callable is intended.
task = PythonOperator(
    task_id='task_with_retry',
    python_callable=my_function,
    retries=5,
    retry_delay=timedelta(minutes=10),
    dag=dag,
)

Monitoring and Alerting

Email Alerts

from airflow.models import Variable

# Alerting defaults: mail the listed recipients when a task fails, but not on
# each retry.
default_args = {
    'owner': 'data-team',
    'email': ['alerts@example.com'],  # placeholder; use your team's alert address
    'email_on_failure': True,
    'email_on_retry': False,
}

# `dag=` is an operator keyword, not a DAG constructor argument, so only the
# DAG id and the shared default_args are passed here.
dag = DAG(
    'monitored_dag',
    default_args=default_args,
)

Custom Callbacks

def on_failure_callback(context):
    """Report a failed task.

    Reads ``'task_instance'`` and ``'exception'`` from ``context`` and prints
    the task id together with the exception.
    """
    failed_ti, error = context['task_instance'], context['exception']
    print(f"Task {failed_ti.task_id} failed: {error}")
    # Extension point: send alert, log to external system, etc.

def on_success_callback(context):
    """Report a successful task by printing its task id."""
    finished_task_id = context['task_instance'].task_id
    print(f"Task {finished_task_id} succeeded")

# Attach the callbacks to a single task instead of sharing them via default_args.
# NOTE(review): if my_function is the two-parameter example (param1, param2),
# this task also needs op_kwargs — confirm which callable is intended.
task = PythonOperator(
    task_id='monitored_task',
    python_callable=my_function,
    on_failure_callback=on_failure_callback,
    on_success_callback=on_success_callback,
    dag=dag,
)

Dynamic DAG Generation

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'start_date': datetime(2025, 1, 1),
}

dag = DAG(
    'dynamic_dag_generation',
    default_args=default_args,
    schedule_interval='@daily',
)

# Generate tasks dynamically
# One PythonOperator is created per table name, with task_id 'process_<table>'.
tables = ['users', 'orders', 'products']

for table in tables:
    # Binding `table` as a default argument freezes the current value for this
    # function; a plain closure would see only the last table once the loop ends.
    def process_table(table_name=table):
        print(f"Processing {table_name}")

    # `task` is rebound on every iteration; each operator is created with
    # dag=dag, which is the only reference to it this loop keeps.
    task = PythonOperator(
        task_id=f'process_{table}',
        python_callable=process_table,
        dag=dag,
    )

Best Practices

  1. Idempotent tasks: Tasks should produce the same result if run multiple times
  2. Atomic operations: Each task should be independent and self-contained
  3. Proper error handling: Use retries and callbacks for monitoring
  4. Resource limits: Set memory and CPU limits for tasks
  5. Documentation: Document DAGs and tasks clearly
  6. Testing: Test DAGs before deploying to production
  7. Monitoring: Set up alerts for failures and SLAs

Common Pitfalls

Bad Practice:

# Don't: Hardcode values
sql = "SELECT * FROM users WHERE date = '2025-01-01'"

# Don't: No error handling
task = PythonOperator(task_id='task', python_callable=risky_function)

# Don't: Tight coupling between tasks
def task_1():
    global shared_data
    shared_data = get_data()

Good Practice:

# Do: Use templating
sql = "SELECT * FROM users WHERE date = '{{ ds }}'"

# Do: Handle errors
task = PythonOperator(
    task_id='task',
    python_callable=risky_function,
    retries=3,
    on_failure_callback=alert_on_failure,
)

# Do: Use XCom for communication
def task_1(**context):
    data = get_data()
    context['task_instance'].xcom_push(key='data', value=data)

Conclusion

Apache Airflow provides powerful workflow orchestration for complex data pipelines. Master DAG design, operators, and scheduling to build production-grade data workflows. Use proper error handling, monitoring, and testing to ensure reliability. Airflow scales from simple scripts to enterprise data platforms.

Comments