Introduction
Data quality is the foundation of reliable analytics and machine learning. Poor data quality leads to incorrect insights, failed models, and business decisions based on flawed information. Yet data quality remains one of the most overlooked aspects of data infrastructure.
Modern data quality requires a multi-layered approach: validation at ingestion, monitoring in flight, and observability across the entire data lifecycle. This guide covers frameworks, tools, and practices for building comprehensive data quality systems.
Data Quality Dimensions
| Dimension | Description | Example Checks |
|---|---|---|
| Completeness | Are all expected values present? | NULL counts, missing records |
| Accuracy | Do values reflect reality? | Range validation, referential integrity |
| Consistency | Is data consistent across sources? | Cross-system validation |
| Timeliness | Is data up-to-date? | Freshness checks, latency monitoring |
| Uniqueness | Are there duplicates? | Duplicate detection |
| Validity | Do values conform to rules? | Format checks, type validation |
Validation Frameworks
Great Expectations
# great_expectations_suite.py
# Builds a "sales_quality" expectation suite, attaches it to a checkpoint,
# and runs validation against the sales datasource.
import great_expectations as gx
from great_expectations.dataset import SparkDataset  # NOTE(review): unused here; legacy module absent in GX >= 1.0 — confirm pinned version

# Create expectation suite
context = gx.get_context()
suite = context.suites.add(gx.core.expectation_suite.ExpectationSuite("sales_quality"))

# Define expectations
expectation_configurations = [
    {
        "expectation_type": "expect_column_values_to_not_be_null",
        "kwargs": {"column": "customer_id"}
    },
    {
        "expectation_type": "expect_column_values_to_be_between",
        "kwargs": {
            "column": "amount",
            "min_value": 0,
            "max_value": 1000000
        }
    },
    {
        "expectation_type": "expect_column_distinct_values_to_be_in_set",
        "kwargs": {
            "column": "status",
            "value_set": ["pending", "completed", "cancelled"]
        }
    },
    {
        "expectation_type": "expect_table_row_count_to_be_between",
        "kwargs": {"min_value": 1000, "max_value": 1000000}
    }
]

# Fix: the configurations above were previously built but never attached to
# the suite, so the checkpoint validated an empty suite and always passed.
# NOTE(review): ExpectationConfiguration's import path and accepted keys vary
# across GX releases — verify against the installed great_expectations version.
for config in expectation_configurations:
    suite.add_expectation(
        gx.core.expectation_configuration.ExpectationConfiguration(**config)
    )

# Create checkpoint for validation
checkpoint = context.checkpoints.add(
    name="sales_checkpoint",
    validations=[
        {
            "batch_request": {
                "datasource_name": "sales_datasource",
                "data_connector_name": "default_inferred_data_connector_name",
                "data_asset_name": "sales_data"
            },
            "expectation_suite_name": "sales_quality"
        }
    ]
)

# Run validation
results = checkpoint.run()
dbt Tests
# dbt schema.yml
# Declarative data-quality tests for the customer dimension and orders fact.
# Uses built-in generic tests plus dbt_utils (recency, expression_is_true).
models:
  - name: dim_customers
    description: Customer dimension table
    tests:
      # Table must have received at least one update within the last day.
      - dbt_utils.recency:
          datepart: day
          field: updated_at
          interval: 1
      # Sanity rule: a customer cannot be created before they were born.
      - dbt_utils.expression_is_true:
          expression: "birth_date < created_at"
    columns:
      - name: customer_id
        tests:
          - unique
          - not_null
      - name: email
        tests:
          - unique
          - not_null
      - name: status
        tests:
          # Closed set of lifecycle states.
          - accepted_values:
              values: ['active', 'inactive', 'pending']
      - name: lifetime_value
        tests:
          - not_null
          # Column-level expression_is_true: the column name is prepended to
          # the expression, i.e. "lifetime_value >= 0".
          - dbt_utils.expression_is_true:
              expression: ">= 0"
  - name: fct_orders
    tests:
      # Orders must land at least every 4 hours.
      - dbt_utils.recency:
          datepart: hour
          field: created_at
          interval: 4
      # NOTE(review): a model-level relationships test has no implicit
      # column; this likely needs a column_name argument (or can be dropped —
      # it duplicates the column-level test on customer_id below). Confirm
      # that dbt compiles this as written.
      - relationships:
          to: ref('dim_customers')
          field: customer_id
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: customer_id
        tests:
          - not_null
          # Referential integrity: every order must belong to a known customer.
          - relationships:
              to: ref('dim_customers')
              field: customer_id
      - name: amount
        tests:
          - not_null
          # Expands to "amount > 0": orders must have a positive amount.
          - dbt_utils.expression_is_true:
              expression: "> 0"
Data Monitoring
Freshness Monitoring
# data_freshness_monitor.py
from dataclasses import dataclass
from datetime import datetime, timedelta
import logging
@dataclass
class TableFreshness:
    """Freshness measurement for one table: how recent its newest row is
    versus how recent it is expected to be."""

    table_name: str                  # fully-qualified-enough table identifier
    latest_timestamp: datetime       # MAX of the table's timestamp column
    expected_freshness: timedelta    # SLA: maximum acceptable lag
    actual_freshness: timedelta      # observed lag (now - latest_timestamp)

    @property
    def is_fresh(self) -> bool:
        """True when the observed lag is within the expected bound."""
        return not (self.actual_freshness > self.expected_freshness)

    @property
    def latency_minutes(self) -> int:
        """Observed lag in whole minutes (truncated toward zero)."""
        lag_seconds = self.actual_freshness.total_seconds()
        return int(lag_seconds / 60)
class FreshnessMonitor:
    """Measures per-table data recency and alerts when a table violates its
    freshness expectation."""

    def __init__(self, alerting_service):
        # Backend used to deliver staleness notifications.
        self.alerting = alerting_service
        self.logger = logging.getLogger(__name__)

    def check_freshness(self, connection, table_config: dict) -> TableFreshness:
        """Check table freshness.

        ``table_config`` must provide ``table``, ``schema`` and
        ``timestamp_column``; an optional ``expected_freshness`` timedelta
        defaults to one hour. Sends an alert (and logs a warning) when the
        table is stale; returns the measured :class:`TableFreshness` either way.
        """
        # NOTE(review): identifiers are interpolated straight into SQL —
        # table_config must come from trusted configuration, never user input.
        query = f"""
        SELECT MAX({table_config['timestamp_column']}) as latest
        FROM {table_config['schema']}.{table_config['table']}
        """
        latest = connection.execute(query).fetchone()[0]

        expected = table_config.get('expected_freshness', timedelta(hours=1))
        if latest is None:
            # Empty table: report an effectively infinite lag.
            return TableFreshness(
                table_name=table_config['table'],
                latest_timestamp=datetime.min,
                expected_freshness=expected,
                actual_freshness=timedelta(days=999)
            )

        # NOTE(review): datetime.now() is naive — if the warehouse returns
        # tz-aware timestamps this subtraction raises; confirm conventions.
        freshness = TableFreshness(
            table_name=table_config['table'],
            latest_timestamp=latest,
            expected_freshness=expected,
            actual_freshness=datetime.now() - latest
        )

        if freshness.is_fresh:
            return freshness

        self.logger.warning(f"Table {table_config['table']} is stale")
        severity = "medium" if freshness.latency_minutes <= 120 else "high"
        self.alerting.send_alert(
            title=f"Data Freshness Alert: {table_config['table']}",
            message=f"Late by {freshness.latency_minutes} minutes",
            severity=severity
        )
        return freshness
Volume Anomaly Detection
# volume_anomaly_detector.py
import numpy as np
from scipy import stats
class VolumeAnomalyDetector:
    """Flags anomalous row counts using a z-score against a per-table
    historical baseline."""

    def __init__(self, sensitivity: float = 3.0):
        # Number of standard deviations beyond which a count is anomalous.
        self.sensitivity = sensitivity
        # table_name -> {'mean', 'std', 'counts'} baseline statistics.
        self.historical_volumes = {}

    def train(self, table_name: str, historical_counts: list):
        """Fit the baseline for *table_name* from past row counts.

        Raises:
            ValueError: if *historical_counts* is empty. (Previously the
                NaN mean/std produced by numpy on empty input silently
                disabled detection for the table.)
        """
        if not historical_counts:
            raise ValueError(
                f"historical_counts for {table_name!r} must be non-empty"
            )
        self.historical_volumes[table_name] = {
            'mean': float(np.mean(historical_counts)),
            'std': float(np.std(historical_counts)),
            # Copy so later caller-side mutation cannot skew the baseline.
            'counts': list(historical_counts)
        }

    def detect(self, table_name: str, current_count: int) -> dict:
        """Detect if current volume is anomalous.

        Returns a dict with 'anomaly' (bool) plus either 'reason' (when no
        baseline exists) or 'z_score', 'expected_range' and 'current_count'.
        """
        if table_name not in self.historical_volumes:
            return {'anomaly': False, 'reason': 'no_baseline'}

        baseline = self.historical_volumes[table_name]
        mean, std = baseline['mean'], baseline['std']
        if std == 0:
            # Constant history: any deviation at all is maximally anomalous.
            z_score = 0.0 if current_count == mean else float('inf')
        else:
            z_score = abs(current_count - mean) / std

        band = self.sensitivity * std
        return {
            'anomaly': z_score > self.sensitivity,
            'z_score': z_score,
            'expected_range': (mean - band, mean + band),
            'current_count': current_count
        }
Data Observability
Data Contract Pattern
# data_contract.py
from pydantic import BaseModel, Field
from typing import List, Optional
from datetime import datetime
class ColumnSchema(BaseModel):
    """Declarative schema for a single column in a data contract."""
    name: str
    type: str  # logical type name as a string, e.g. "string", "decimal", "timestamp"
    nullable: bool = True  # whether NULLs are permitted; defaults to permissive
    description: Optional[str] = None  # human-readable purpose of the column
class DataContract(BaseModel):
    """Machine-readable agreement describing a dataset's schema, ownership,
    and quality expectations (freshness, volume)."""

    contract_id: str
    dataset_name: str
    description: str
    owner: str
    columns: List[ColumnSchema]
    freshness_requirement: Optional[str] = None  # e.g. "1 hour"
    volume_expectation: Optional[dict] = None    # e.g. {"min": 1000, "max": 1000000}

    def validate_schema(self, actual_schema: List[ColumnSchema]) -> dict:
        """Validate actual schema matches contract.

        Checks for missing columns, unexpected columns, type mismatches, and
        nullability violations (a column the contract requires to be
        non-nullable must not be nullable in the actual schema).

        Returns:
            dict with 'valid' (bool) and 'issues' (list of human-readable
            problem descriptions; empty when the schema conforms).
        """
        expected_cols = {c.name: c for c in self.columns}
        actual_cols = {c.name: c for c in actual_schema}
        issues = []

        # Check for missing columns
        for col_name in expected_cols:
            if col_name not in actual_cols:
                issues.append(f"Missing column: {col_name}")

        # Check for unexpected columns
        for col_name in actual_cols:
            if col_name not in expected_cols:
                issues.append(f"Unexpected column: {col_name}")

        # Check types and nullability on columns present in both
        for col_name in set(expected_cols.keys()) & set(actual_cols.keys()):
            expected = expected_cols[col_name]
            actual = actual_cols[col_name]
            if expected.type != actual.type:
                issues.append(
                    f"Type mismatch for {col_name}: "
                    f"expected {expected.type}, "
                    f"got {actual.type}"
                )
            # Fix: the contract declares per-column nullability but it was
            # never enforced during validation.
            if not expected.nullable and actual.nullable:
                issues.append(
                    f"Nullability mismatch for {col_name}: "
                    f"contract requires non-nullable"
                )

        return {
            'valid': len(issues) == 0,
            'issues': issues
        }
# Example contract
# Minimal contract for the e-commerce "orders" dataset: five required
# (non-nullable) columns, a one-hour freshness SLA, and an expected daily
# volume between 1,000 and 1,000,000 rows.
contract = DataContract(
    contract_id="contract-001",
    dataset_name="orders",
    description="E-commerce orders data",
    owner="data-team",
    columns=[
        ColumnSchema(name="order_id", type="string", nullable=False),
        ColumnSchema(name="customer_id", type="string", nullable=False),
        ColumnSchema(name="total_amount", type="decimal", nullable=False),
        ColumnSchema(name="status", type="string", nullable=False),
        ColumnSchema(name="created_at", type="timestamp", nullable=False),
    ],
    freshness_requirement="1 hour",
    volume_expectation={"min": 1000, "max": 1000000}
)
Implementation Checklist
- Define data quality dimensions for each dataset
- Implement validation at ingestion point
- Set up freshness monitoring
- Configure volume anomaly detection
- Create data contracts
- Build alerting workflows
- Document data quality SLAs
Summary
Building robust data quality systems requires:
- Validation at ingestion: Catch issues early with schema validation and business rules
- Continuous monitoring: Track freshness, volume, and distribution
- Observability: Understand data lineage and dependencies
- Alerting: Notify stakeholders when quality degrades
Tools like Great Expectations, dbt tests, and custom monitoring form the foundation of modern data quality.
Comments