Introduction
Data lakehouses combine the best of data lakes and data warehouses. They provide ACID transactions, schema enforcement, and SQL performance while maintaining the flexibility and cost-effectiveness of data lakes. Organizations that adopt lakehouses often report substantial reductions in data infrastructure cost alongside improved query performance, since a single copy of the data on object storage serves both analytics and ML workloads.
This comprehensive guide covers data lakehouse architecture, Delta Lake, Apache Iceberg, and real-world implementation patterns.
Core Concepts
Data Lake
Centralized repository storing raw data in any format.
Data Warehouse
Structured repository optimized for analytics queries.
Data Lakehouse
Hybrid combining lake flexibility with warehouse performance.
ACID Transactions
Atomicity, Consistency, Isolation, Durability guarantees.
Schema Enforcement
Validating data structure before ingestion.
Time Travel
Querying historical versions of data.
Partitioning
Organizing data by columns for faster queries.
Compaction
Merging small files into larger ones.
Data Governance
Managing data quality, lineage, and access.
Metadata Layer
Tracking data structure and lineage.
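The bronze/silver/gold (medallion) layers referenced throughout this guide usually map to a simple storage-path convention. A minimal sketch of such a helper (the bucket name and layout are illustrative, matching the paths used in the examples below):

```python
# Hypothetical helper mapping medallion layers to object-store paths.
LAYERS = ("bronze", "silver", "gold")

def layer_path(layer: str, table: str, bucket: str = "data-lake") -> str:
    """Return the storage path for a table in a given medallion layer."""
    if layer not in LAYERS:
        raise ValueError(f"unknown layer: {layer!r}")
    return f"s3://{bucket}/{layer}/{table}"

print(layer_path("bronze", "customers"))  # s3://data-lake/bronze/customers
```

Centralizing path construction like this keeps ingestion, transformation, and governance code agreeing on where each table lives.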
Data Lakehouse Architecture
┌───────────────────────────────────────────────────────────┐
│                       Data Sources                        │
│          (APIs, Databases, Logs, Streams, Files)          │
└─────────────────────────┬─────────────────────────────────┘
                          │
┌─────────────────────────▼─────────────────────────────────┐
│                   Data Ingestion Layer                    │
│         (Apache Kafka, AWS Kinesis, Airflow, dbt)         │
└─────────────────────────┬─────────────────────────────────┘
                          │
┌─────────────────────────▼─────────────────────────────────┐
│                   Data Lakehouse Layer                    │
│          (Delta Lake / Apache Iceberg on S3/ADLS)         │
│   ┌────────────────────────────────────────────────────┐  │
│   │ Bronze (Raw) │ Silver (Cleaned) │ Gold (Ready)     │  │
│   └────────────────────────────────────────────────────┘  │
└─────────────────────────┬─────────────────────────────────┘
                          │
         ┌────────────────┼────────────────┐
         │                │                │
 ┌───────▼───────┐ ┌──────▼────────┐ ┌─────▼───────┐
 │  SQL Engine   │ │   BI Tools    │ │  ML Models  │
 │  (Spark SQL)  │ │  (Tableau)    │ │  (MLflow)   │
 └───────────────┘ └───────────────┘ └─────────────┘
Delta Lake Implementation
Setup and Configuration
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

# Create a Spark session with the Delta extensions and catalog
builder = SparkSession.builder \
    .appName("DeltaLakehouse") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()
Writing Data to Delta
def write_to_delta_bronze(df, table_name):
    """Write raw data to the bronze layer."""
    df.write \
        .format("delta") \
        .mode("append") \
        .option("mergeSchema", "true") \
        .save(f"s3://data-lake/bronze/{table_name}")

def write_to_delta_silver(df, table_name):
    """Write cleaned data to the silver layer."""
    df.write \
        .format("delta") \
        .mode("overwrite") \
        .partitionBy("date") \
        .option("path", f"s3://data-lake/silver/{table_name}") \
        .saveAsTable(f"silver_{table_name}")
# Example: ingest customer data
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("customer_id", IntegerType()),
    StructField("name", StringType()),
    StructField("email", StringType()),
    StructField("created_at", StringType())
])

# Read from source
df = spark.read.schema(schema).json("s3://raw-data/customers/*.json")

# Write to bronze
write_to_delta_bronze(df, "customers")
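Between bronze and silver there is normally a cleaning step. One common pattern is deduplication, keeping only the most recent record per key; a sketch in Spark SQL (table names follow the conventions above and are illustrative):

```sql
-- Deduplicate bronze records, keeping the latest row per customer
CREATE OR REPLACE TABLE silver_customers
USING delta AS
SELECT customer_id, name, email, created_at
FROM (
    SELECT *,
           ROW_NUMBER() OVER (
               PARTITION BY customer_id
               ORDER BY created_at DESC
           ) AS rn
    FROM bronze_customers
)
WHERE rn = 1
```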
ACID Transactions
def merge_customer_updates(updates_df):
    """Merge updates with ACID guarantees."""
    from delta.tables import DeltaTable

    delta_table = DeltaTable.forPath(spark, "s3://data-lake/silver/customers")

    # Merge operation (atomic: readers see either the old or the new state)
    delta_table.alias("target") \
        .merge(
            updates_df.alias("source"),
            "target.customer_id = source.customer_id"
        ) \
        .whenMatchedUpdate(set={
            "name": "source.name",
            "email": "source.email",
            "updated_at": "source.updated_at"
        }) \
        .whenNotMatchedInsert(values={
            "customer_id": "source.customer_id",
            "name": "source.name",
            "email": "source.email",
            "created_at": "source.created_at"
        }) \
        .execute()

# Example usage
updates = spark.read.json("s3://updates/customers/*.json")
merge_customer_updates(updates)
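The same upsert can be expressed in Spark SQL with Delta's MERGE INTO, which is often easier to review than the Python builder API (table and view names are illustrative):

```sql
MERGE INTO silver_customers AS target
USING customer_updates AS source
    ON target.customer_id = source.customer_id
WHEN MATCHED THEN UPDATE SET
    target.name = source.name,
    target.email = source.email
WHEN NOT MATCHED THEN INSERT
    (customer_id, name, email, created_at)
    VALUES (source.customer_id, source.name, source.email, source.created_at)
```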
Time Travel
def query_historical_data(table_name, version):
    """Query a specific version of the table."""
    return spark.read \
        .format("delta") \
        .option("versionAsOf", version) \
        .load(f"s3://data-lake/silver/{table_name}")

def query_data_at_timestamp(table_name, timestamp):
    """Query the table as of a specific timestamp."""
    return spark.read \
        .format("delta") \
        .option("timestampAsOf", timestamp) \
        .load(f"s3://data-lake/silver/{table_name}")

# Example: get customer data from one week ago
from datetime import datetime, timedelta

one_week_ago = (datetime.now() - timedelta(days=7)).isoformat()
historical_df = query_data_at_timestamp("customers", one_week_ago)
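Time travel also enables recovery: Delta can roll a table back to an earlier state after a bad write. A sketch (the version number is illustrative; the history output shows which versions exist):

```sql
-- Inspect the commit history to pick a version or timestamp
DESCRIBE HISTORY silver_customers;

-- Roll the table back to an earlier version
RESTORE TABLE silver_customers TO VERSION AS OF 5;
```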
Schema Evolution
def evolve_schema(table_name, new_column_name, new_column_type):
    """Add a new column to an existing table."""
    spark.sql(f"""
        ALTER TABLE silver_{table_name}
        ADD COLUMNS ({new_column_name} {new_column_type})
    """)

# Example: add a phone column to customers
evolve_schema("customers", "phone", "STRING")

# Or let Delta merge the schema automatically on write
df_with_new_column = spark.read.json("s3://data/customers_with_phone.json")
df_with_new_column.write \
    .format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .save("s3://data-lake/silver/customers")
Apache Iceberg Implementation
Setup and Configuration
from pyspark.sql import SparkSession

# Create a Spark session with Iceberg extensions; SparkSessionCatalog
# wraps the built-in catalog and needs a backing catalog type
spark = SparkSession.builder \
    .appName("IcebergLakehouse") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \
    .config("spark.sql.catalog.spark_catalog.type", "hive") \
    .config("spark.sql.catalog.spark_catalog.warehouse", "s3://data-lake/iceberg") \
    .getOrCreate()
Writing Data to Iceberg
def write_to_iceberg(df, table_name, mode="append"):
    """Write data to an Iceberg table."""
    df.write \
        .format("iceberg") \
        .mode(mode) \
        .partitionBy("date") \
        .saveAsTable(f"iceberg_{table_name}")

# Example: create an Iceberg table
spark.sql("""
    CREATE TABLE IF NOT EXISTS iceberg_customers (
        customer_id INT,
        name STRING,
        email STRING,
        created_at TIMESTAMP,
        date DATE
    )
    USING iceberg
    PARTITIONED BY (date)
""")

# Append data with the DataFrameWriterV2 API
df.writeTo("iceberg_customers").append()
Iceberg Features
# 1. Snapshots (versioning)
def list_snapshots(table_name):
    """List all snapshots via the table's snapshots metadata table."""
    return spark.sql(f"""
        SELECT snapshot_id, committed_at, operation
        FROM iceberg_{table_name}.snapshots
        ORDER BY committed_at DESC
    """)

# 2. Time travel
def query_iceberg_at_snapshot(table_name, snapshot_id):
    """Query a specific snapshot."""
    return spark.read \
        .option("snapshot-id", snapshot_id) \
        .table(f"iceberg_{table_name}")

# 3. Schema evolution
spark.sql("""
    ALTER TABLE iceberg_customers
    ADD COLUMN phone STRING
""")

# 4. Partition evolution (changes the spec without rewriting old data)
spark.sql("""
    ALTER TABLE iceberg_customers
    ADD PARTITION FIELD country
""")
Data Quality & Governance
Data Quality Checks
# Legacy (V2) Great Expectations API; newer releases use a different interface
from great_expectations.dataset import SparkDFDataset

def validate_data_quality(df, table_name):
    """Run data quality expectations against a DataFrame."""
    ge_df = SparkDFDataset(df)

    # Check for nulls
    ge_df.expect_column_values_to_not_be_null("customer_id")
    ge_df.expect_column_values_to_not_be_null("email")

    # Check data types (Spark type names for the Spark backend)
    ge_df.expect_column_values_to_be_of_type("customer_id", "IntegerType")
    ge_df.expect_column_values_to_be_of_type("email", "StringType")

    # Check value ranges (illustrative; assumes an age column)
    ge_df.expect_column_values_to_be_between("age", 0, 150)

    # Check uniqueness
    ge_df.expect_column_values_to_be_unique("email")

    # Get validation results
    return ge_df.validate()

# Example usage
df = spark.read.format("delta").load("s3://data-lake/silver/customers")
validation_results = validate_data_quality(df, "customers")

if not validation_results["success"]:
    print("Data quality issues found:")
    for result in validation_results["results"]:
        if not result["success"]:
            print(f" - {result['expectation_config']['expectation_type']}")
Data Lineage
def track_data_lineage(source_table, target_table, transformation):
    """Record a lineage entry for a transformation step."""
    lineage = {
        "source": source_table,
        "target": target_table,
        "transformation": transformation,
        "timestamp": datetime.now().isoformat()
    }

    # Store lineage as an append-only Delta table
    spark.createDataFrame([lineage]).write \
        .format("delta") \
        .mode("append") \
        .save("s3://data-lake/metadata/lineage")

    return lineage

# Example: track the customer transformation
track_data_lineage(
    source_table="bronze_customers",
    target_table="silver_customers",
    transformation="Cleaned and deduplicated"
)
Performance Optimization
Partitioning Strategy
def optimize_partitioning(table_name):
    """Collect statistics and tune data skipping."""
    # Collect table statistics for the query optimizer
    spark.sql(f"ANALYZE TABLE silver_{table_name} COMPUTE STATISTICS")

    # Index more columns for file-level data skipping
    spark.sql(f"""
        ALTER TABLE silver_{table_name}
        SET TBLPROPERTIES (
            'delta.dataSkippingNumIndexedCols' = '32'
        )
    """)

# Example: partition by date and region
spark.sql("""
    CREATE TABLE silver_orders (
        order_id INT,
        customer_id INT,
        amount DECIMAL(10,2),
        date DATE,
        region STRING
    )
    USING delta
    PARTITIONED BY (date, region)
""")
Compaction
def compact_delta_table(table_name):
    """Compact small files into larger ones."""
    from delta.tables import DeltaTable

    delta_table = DeltaTable.forPath(spark, f"s3://data-lake/silver/{table_name}")

    # Rewrite many small files into fewer, larger ones
    delta_table.optimize().executeCompaction()

# Example: compact the customers table
compact_delta_table("customers")

# Check file counts and sizes after optimization
spark.sql("DESCRIBE DETAIL silver_customers").show()
Caching Strategy
def cache_hot_data(table_name):
    """Cache frequently accessed data in executor memory."""
    df = spark.read.format("delta").load(f"s3://data-lake/silver/{table_name}")

    df.cache()
    df.count()  # trigger materialization

    return df

# Example: cache customer data
customers_df = cache_hot_data("customers")

# Subsequent queries read from the cache
customers_df.filter("country = 'US'").show()
Comparison: Delta Lake vs Iceberg
| Feature | Delta Lake | Apache Iceberg |
|---|---|---|
| ACID transactions | Yes | Yes |
| Time travel | Yes | Yes |
| Schema evolution | Yes | Yes |
| Partition evolution | Limited | Yes |
| Metadata design | Transaction log (JSON + checkpoints) | Metadata, manifest list, and manifest files |
| Performance | Strong (workload-dependent) | Strong (workload-dependent) |
| Ecosystem | Strongest with Spark / Databricks | Broad engine support (Spark, Trino, Flink) |
| Governance | Linux Foundation project | Apache Software Foundation project |
Real-World Architecture
┌───────────────────────────────────────────────────────────┐
│                       Data Sources                        │
│             (Databases, APIs, Logs, Streams)              │
└─────────────────────────┬─────────────────────────────────┘
                          │
┌─────────────────────────▼─────────────────────────────────┐
│                Ingestion (Kafka, Airflow)                 │
└─────────────────────────┬─────────────────────────────────┘
                          │
┌─────────────────────────▼─────────────────────────────────┐
│                 Bronze Layer (Raw Data)                   │
│           (Delta Lake on S3, 1-day retention)             │
└─────────────────────────┬─────────────────────────────────┘
                          │
┌─────────────────────────▼─────────────────────────────────┐
│               Silver Layer (Cleaned Data)                 │
│           (Delta Lake on S3, 1-year retention)            │
│          (dbt transformations, quality checks)            │
└─────────────────────────┬─────────────────────────────────┘
                          │
┌─────────────────────────▼─────────────────────────────────┐
│              Gold Layer (Analytics Ready)                 │
│           (Delta Lake on S3, 3-year retention)            │
│              (Aggregated, business metrics)               │
└─────────────────────────┬─────────────────────────────────┘
                          │
         ┌────────────────┼────────────────┐
         │                │                │
 ┌───────▼───────┐ ┌──────▼────────┐ ┌─────▼───────┐
 │  SQL Queries  │ │   BI Tools    │ │  ML Models  │
 │  (Spark SQL)  │ │  (Tableau)    │ │  (MLflow)   │
 └───────────────┘ └───────────────┘ └─────────────┘
Conclusion
Data lakehouses combine the flexibility of data lakes with the performance of data warehouses. Implement Delta Lake or Iceberg for ACID transactions and schema enforcement, use the medallion architecture (bronze/silver/gold) for data quality, and monitor performance continuously.
Start with Delta Lake for simplicity; consider Iceberg when you need partition evolution or broad multi-engine support. Build data governance and quality checks in from day one.
The future of data infrastructure is lakehouses.