Introduction
The ETL vs ELT debate has been settled, mostly. Modern cloud data warehouses have made ELT the default, but understanding when to use each approach is critical for building efficient data systems.
Key Statistics:
- 80% of new data pipelines use ELT
- ELT reduces pipeline development time by 50%
- ETL remains relevant for sensitive data transformations
- Modern data stacks process data 10x faster than traditional ETL
ETL vs ELT Comparison
ETL Process:

  Extract --> Transform --> Load --> Store

  - Transform before load
  - Smaller target storage
  - Complex transformations in staging
  - Sensitive data stays protected

ELT Process:

  Extract --> Load --> Transform --> Store

  - Load raw data first
  - Transform in data warehouse
  - Leverage DW compute power
  - Faster initial data availability
ETL Implementation
Traditional ETL with Python
#!/usr/bin/env python3
"""Traditional ETL pipeline."""
import hashlib
from datetime import datetime

import pandas as pd
import requests
from sqlalchemy import create_engine
class ETLPipeline:
def __init__(self, config):
self.source_config = config['source']
self.target_config = config['target']
def extract(self):
"""Extract data from source."""
# Example: API to JSON
data = []
page = 1
while True:
response = requests.get(
f"{self.source_config['base_url']}/records",
params={'page': page, 'limit': 1000}
)
if response.status_code != 200:
break
page_data = response.json()
if not page_data:
break
data.extend(page_data)
page += 1
if page > self.source_config.get('max_pages', 10):
break
return pd.DataFrame(data)
def transform(self, df):
"""Transform data before loading."""
# Data cleaning
df = df.dropna(subset=['id', 'created_at'])
# Type conversions
df['created_at'] = pd.to_datetime(df['created_at'])
df['amount'] = pd.to_numeric(df['amount'], errors='coerce')
# Derived fields
df['date'] = df['created_at'].dt.date
df['hour'] = df['created_at'].dt.hour
        # Per-user aggregates (named columns avoid collisions with the row-level 'amount' on merge)
        user_totals = df.groupby('user_id').agg(
            user_total_amount=('amount', 'sum'),
            transaction_count=('id', 'count')
        ).reset_index()
        df = df.merge(user_totals, on='user_id', how='left')
        # PII handling: stable pseudonymization, then drop raw identifiers
        df['email_hash'] = df['email'].apply(
            lambda x: hashlib.sha256(x.encode()).hexdigest() if pd.notna(x) else None
        )
        df = df.drop(columns=['email', 'phone'])
return df
def load(self, df):
"""Load data to target."""
engine = create_engine(self.target_config['connection_string'])
        # Incremental load: only append rows newer than the latest date already stored
        max_date = pd.read_sql(
            "SELECT MAX(date) AS max_date FROM target_table",
            engine
        ).iloc[0, 0]
        if pd.notna(max_date):
            df = df[df['date'] > max_date]
df.to_sql(
'target_table',
engine,
if_exists='append',
index=False
)
def run(self):
"""Run complete ETL pipeline."""
print(f"Starting ETL at {datetime.now()}")
data = self.extract()
print(f"Extracted {len(data)} records")
transformed = self.transform(data)
print(f"Transformed to {len(transformed)} records")
self.load(transformed)
print("Data loaded successfully")
return True
if __name__ == '__main__':
config = {
'source': {
'base_url': 'https://api.example.com',
'max_pages': 10
},
'target': {
'connection_string': 'postgresql://user:pass@localhost:5432/dw'
}
}
pipeline = ETLPipeline(config)
pipeline.run()
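The extract step above gives up on the first non-200 response and never backs off, which makes the pipeline brittle against rate limits and transient outages. A minimal sketch of retry handling using requests' adapter mechanism; the build_session helper is illustrative and would supply the session used in place of the bare requests.get call in extract:

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def build_session(total_retries=5, backoff_factor=1.0):
    """Return a Session that retries transient HTTP failures with exponential backoff."""
    retry = Retry(
        total=total_retries,
        backoff_factor=backoff_factor,                # waits ~1s, 2s, 4s, ... between attempts
        status_forcelist=[429, 500, 502, 503, 504],   # retry rate limits and server errors
    )
    adapter = HTTPAdapter(max_retries=retry)
    session = requests.Session()
    session.mount('https://', adapter)
    session.mount('http://', adapter)
    return session

# Hypothetical usage inside ETLPipeline.extract:
#     session = build_session()
#     response = session.get(f"{self.source_config['base_url']}/records",
#                            params={'page': page, 'limit': 1000})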
ELT Implementation
ELT with dbt
# dbt_project.yml
name: my_analytics
version: '1.0.0'
config-version: 2
profile: my_analytics  # assumes a matching connection profile in profiles.yml
vars:
# Source configuration
source_database: raw
source_schema: ecommerce
# Data modeling
stripe_enabled: true
models:
my_analytics:
+materialized: table
staging:
+materialized: view
intermediate:
+materialized: ephemeral
marts:
+materialized: table
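With the project configured, each layer can be run and tested selectively from the command line. The commands below are an illustrative sketch; lookback_days (used by the staging model that follows) can be overridden per run:

# Run only the staging layer with a shorter lookback window
dbt run --select staging --vars '{"lookback_days": 30}'

# Build the marts layer and run its schema tests
dbt run --select marts
dbt test --select marts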
-- staging/stg_stripe_payments.sql
{{ config(materialized='view') }}
SELECT
id as payment_id,
created,
amount,
currency,
status,
customer_id,
    COALESCE(amount, 0) as amount_cents,  -- Stripe amounts are already in the smallest currency unit
    CASE
        WHEN status = 'succeeded' THEN TRUE
        ELSE FALSE
    END as is_successful,
    -- Snowflake-style date functions assumed, matching DATEDIFF in the mart model
    DATE_TRUNC('day', TO_TIMESTAMP(created)) as payment_date
FROM
    {{ source('stripe', 'payments') }}
WHERE
    TO_TIMESTAMP(created) >= DATEADD('day', -{{ var('lookback_days', 365) }}, CURRENT_DATE)
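The {{ source('stripe', 'payments') }} reference needs a matching source definition. A minimal sketch of the accompanying schema.yml, assuming the raw Stripe tables land in a stripe schema of the raw database, with a couple of illustrative tests on the staging model:

# models/staging/schema.yml
version: 2

sources:
  - name: stripe
    database: raw       # matches source_database in dbt_project.yml
    schema: stripe      # assumed landing schema for the Stripe sync
    tables:
      - name: payments

models:
  - name: stg_stripe_payments
    columns:
      - name: payment_id
        tests:
          - unique
          - not_null
      - name: is_successful
        tests:
          - not_null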
-- intermediate/int_payments_by_customer.sql
{{ config(materialized='table') }}
WITH payments AS (
SELECT * FROM {{ ref('stg_stripe_payments') }}
WHERE is_successful
)
SELECT
customer_id,
payment_date,
COUNT(*) as payment_count,
SUM(amount_cents) as total_amount_cents,
AVG(amount_cents) as avg_amount_cents,
MIN(amount_cents) as min_amount_cents,
MAX(amount_cents) as max_amount_cents
FROM payments
GROUP BY
customer_id,
payment_date
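This model is rebuilt in full on every run. If payment volume grows, an incremental variant is a common refinement; the sketch below assumes payment_date only moves forward and that the warehouse supports dbt's merge-based incremental strategy:

-- intermediate/int_payments_by_customer.sql (incremental variant, illustrative)
{{ config(
    materialized='incremental',
    unique_key=['customer_id', 'payment_date']
) }}

WITH payments AS (
    SELECT * FROM {{ ref('stg_stripe_payments') }}
    WHERE is_successful
    {% if is_incremental() %}
      -- only reprocess days at or after the newest day already in the target
      AND payment_date >= (SELECT MAX(payment_date) FROM {{ this }})
    {% endif %}
)
SELECT
    customer_id,
    payment_date,
    COUNT(*) as payment_count,
    SUM(amount_cents) as total_amount_cents
FROM payments
GROUP BY customer_id, payment_date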
-- marts/fct_customer_revenue.sql
{{ config(materialized='table') }}
WITH customer_payments AS (
SELECT * FROM {{ ref('int_payments_by_customer') }}
),
customer_first_payment AS (
SELECT
customer_id,
MIN(payment_date) as first_payment_date
FROM customer_payments
GROUP BY customer_id
)
SELECT
cp.*,
cf.first_payment_date,
DATEDIFF('day', cf.first_payment_date, cp.payment_date) as days_since_first_payment,
SUM(cp.total_amount_cents) OVER (
PARTITION BY cp.customer_id
ORDER BY cp.payment_date
) as running_total_amount
FROM customer_payments cp
JOIN customer_first_payment cf
ON cp.customer_id = cf.customer_id
# dbt job schedule (e.g., in dbt Cloud or the orchestrator)
cron: "0 0 * * *"  # Daily at midnight
Modern Data Stack
Tools Comparison
| Layer | ETL Tools | ELT Tools |
|---|---|---|
| Ingestion | Fivetran, Airbyte | Fivetran, Airbyte |
| Transformation | Spark, custom Python/SQL | dbt, Snowflake Scripting |
| Orchestration | Airflow, Dagster | Airflow, Dagster |
| Storage | S3, GCS | Snowflake, BigQuery, Redshift |
| Visualization | Tableau, Looker | Looker, Preset |
Airbyte + dbt + Airflow
# Airflow DAG for ELT
from airflow import DAG
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
from airflow_dbt.operators.dbt_operator import DbtRunOperator  # community airflow-dbt package
from datetime import datetime, timedelta
default_args = {
'owner': 'data-team',
'depends_on_past': False,
'start_date': datetime(2026, 1, 1),
'retries': 3,
}
with DAG('elt_pipeline',
         default_args=default_args,
         schedule_interval='@daily',
         catchup=False) as dag:
# Extract & Load
sync_stripe = AirbyteTriggerSyncOperator(
task_id='sync_stripe',
airbyte_conn_id='airbyte',
connection_id='stripe_connection_id',
timeout=3600
)
sync_shopify = AirbyteTriggerSyncOperator(
task_id='sync_shopify',
airbyte_conn_id='airbyte',
connection_id='shopify_connection_id',
timeout=3600
)
    # Transform (dbt project directory and tag names are project-specific)
    dbt_run_staging = DbtRunOperator(
        task_id='dbt_run_staging',
        dir='/dbt',
        models='tag:staging'
    )
    dbt_run_marts = DbtRunOperator(
        task_id='dbt_run_marts',
        dir='/dbt',
        models='tag:marts'
    )
# Dependencies
[sync_stripe, sync_shopify] >> dbt_run_staging >> dbt_run_marts
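The synchronous Airbyte operators above hold a worker slot for the entire sync. For long-running syncs, the same provider package supports triggering asynchronously and waiting with a job sensor instead; a sketch for the Stripe connection (task ids are illustrative):

from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor

# Kick off the sync and return immediately with the Airbyte job id
trigger_stripe = AirbyteTriggerSyncOperator(
    task_id='trigger_stripe_sync',
    airbyte_conn_id='airbyte',
    connection_id='stripe_connection_id',
    asynchronous=True
)

# Poll the Airbyte API until the job started above completes
wait_for_stripe = AirbyteJobSensor(
    task_id='wait_for_stripe_sync',
    airbyte_conn_id='airbyte',
    airbyte_job_id=trigger_stripe.output
)

trigger_stripe >> wait_for_stripe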
When to Use Each
Use ETL When
- Transforming sensitive data (PII, financial)
- Data must not exist in raw form in warehouse
- Source systems have strict API limits
- Target system has limited compute
- Legacy system compatibility required
Use ELT When
- Using modern cloud data warehouse
- Need fast time-to-insight on raw data
- Have large data volumes
- Want to leverage DW compute power
- Need flexibility in transformations