⚡ Calmops

Data Quality: Validation, Monitoring, and Observability

Introduction

Data quality is the foundation of reliable analytics and machine learning. Poor data quality leads to incorrect insights, failed models, and business decisions based on flawed information. Yet data quality remains one of the most overlooked aspects of data infrastructure.

Modern data quality requires a multi-layered approach: validation at ingestion, monitoring in flight, and observability across the entire data lifecycle. This guide covers frameworks, tools, and practices for building comprehensive data quality systems.

Data Quality Dimensions

Dimension      Description                          Example Checks
Completeness   Are all expected values present?     NULL counts, missing records
Accuracy       Do values reflect reality?           Range validation, referential integrity
Consistency    Is data consistent across sources?   Cross-system validation
Timeliness     Is data up-to-date?                  Freshness checks, latency monitoring
Uniqueness     Are there duplicate records?         Duplicate detection
Validity       Do values conform to rules?          Format checks, type validation
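Several of these dimensions can be measured with a few lines of code. The sketch below computes completeness, uniqueness, and validity metrics for a batch of rows in plain Python; the field names (`order_id`, `status`) and the allowed status set are illustrative assumptions, not a real schema.

```python
# Minimal sketch: per-batch quality metrics for three of the dimensions above.
# Field names and the allowed status set are hypothetical.

ALLOWED_STATUSES = {"pending", "completed", "cancelled"}

def quality_metrics(rows: list) -> dict:
    total = len(rows)
    ids = [r.get("order_id") for r in rows]
    non_null_ids = [v for v in ids if v is not None]

    null_ids = sum(1 for v in ids if v is None)            # completeness
    duplicate_ids = len(non_null_ids) - len(set(non_null_ids))  # uniqueness
    invalid_status = sum(
        1 for r in rows if r.get("status") not in ALLOWED_STATUSES
    )                                                       # validity

    return {
        "row_count": total,
        "null_id_rate": null_ids / total if total else 0.0,
        "duplicate_ids": duplicate_ids,
        "invalid_status_count": invalid_status,
    }

rows = [
    {"order_id": "a1", "status": "completed"},
    {"order_id": "a1", "status": "pending"},   # duplicate id
    {"order_id": None, "status": "shipped"},   # null id, invalid status
]
print(quality_metrics(rows))
```

Emitting these as metrics per batch, rather than as hard failures, lets you trend quality over time before deciding on enforcement thresholds.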

Validation Frameworks

Great Expectations

# great_expectations_suite.py
# Sketch against the Great Expectations 1.x fluent API; adjust for your
# datasource (a pandas DataFrame is used here for illustration).
import great_expectations as gx

# Create expectation suite
context = gx.get_context()
suite = context.suites.add(gx.ExpectationSuite(name="sales_quality"))

# Define expectations and register them on the suite
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="customer_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="amount", min_value=0, max_value=1_000_000
    )
)
suite.add_expectation(
    gx.expectations.ExpectColumnDistinctValuesToBeInSet(
        column="status", value_set=["pending", "completed", "cancelled"]
    )
)
suite.add_expectation(
    gx.expectations.ExpectTableRowCountToBeBetween(
        min_value=1_000, max_value=1_000_000
    )
)

# Point the suite at a batch of data via a validation definition
batch_definition = (
    context.data_sources.add_pandas(name="sales_datasource")
    .add_dataframe_asset(name="sales_data")
    .add_batch_definition_whole_dataframe("sales_batch")
)
validation_definition = context.validation_definitions.add(
    gx.ValidationDefinition(
        name="sales_validation", data=batch_definition, suite=suite
    )
)

# Create checkpoint and run validation
checkpoint = context.checkpoints.add(
    gx.Checkpoint(
        name="sales_checkpoint",
        validation_definitions=[validation_definition],
    )
)
results = checkpoint.run(batch_parameters={"dataframe": sales_df})  # sales_df: your DataFrame

if not results.success:
    raise RuntimeError("sales_quality validation failed")

dbt Tests

# dbt schema.yml
version: 2

models:
  - name: dim_customers
    description: Customer dimension table
    tests:
      - dbt_utils.recency:
          datepart: day
          field: updated_at
          interval: 1
      - dbt_utils.expression_is_true:
          expression: "birth_date < created_at"
    
    columns:
      - name: customer_id
        tests:
          - unique
          - not_null
      - name: email
        tests:
          - unique
          - not_null
      - name: status
        tests:
          - accepted_values:
              values: ['active', 'inactive', 'pending']
      - name: lifetime_value
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: ">= 0"

  - name: fct_orders
    tests:
      - dbt_utils.recency:
          datepart: hour
          field: created_at
          interval: 4
    
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: customer_id
        tests:
          - not_null
          - relationships:
              to: ref('dim_customers')
              field: customer_id
      - name: amount
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: "> 0"
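Each `dbt test` invocation writes its outcomes to the `run_results.json` artifact in the `target/` directory, which makes it straightforward to gate a pipeline on test failures. A small sketch of parsing that artifact (the inline dict stands in for the real file; the `unique_id` values are illustrative):

```python
# Sketch: fail a pipeline step when any dbt test failed or errored,
# by parsing dbt's run_results.json artifact.
import json

def failed_dbt_tests(run_results: dict) -> list:
    """Return unique_ids of results whose status is 'fail' or 'error'."""
    return [
        r["unique_id"]
        for r in run_results.get("results", [])
        if r.get("status") in ("fail", "error")
    ]

# Inline example artifact; in practice: json.load(open("target/run_results.json"))
artifact = {
    "results": [
        {"unique_id": "test.not_null_dim_customers_customer_id", "status": "pass"},
        {"unique_id": "test.unique_dim_customers_email", "status": "fail"},
    ]
}
failures = failed_dbt_tests(artifact)
if failures:
    print(f"{len(failures)} dbt test(s) failed: {failures}")
```

Running this in CI after `dbt test` turns schema tests into a hard deployment gate rather than a report nobody reads.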

Data Monitoring

Freshness Monitoring

# data_freshness_monitor.py
from dataclasses import dataclass
from datetime import datetime, timedelta
import logging

@dataclass
class TableFreshness:
    table_name: str
    latest_timestamp: datetime
    expected_freshness: timedelta
    actual_freshness: timedelta
    
    @property
    def is_fresh(self) -> bool:
        return self.actual_freshness <= self.expected_freshness
    
    @property
    def latency_minutes(self) -> int:
        return int(self.actual_freshness.total_seconds() / 60)

class FreshnessMonitor:
    def __init__(self, alerting_service):
        self.alerting = alerting_service
        self.logger = logging.getLogger(__name__)
    
    def check_freshness(self, connection, table_config: dict) -> TableFreshness:
        """Check table freshness."""
        
        query = f"""
            SELECT MAX({table_config['timestamp_column']}) as latest
            FROM {table_config['schema']}.{table_config['table']}
        """
        
        result = connection.execute(query)
        latest = result.fetchone()[0]
        
        if latest is None:
            return TableFreshness(
                table_name=table_config['table'],
                latest_timestamp=datetime.min,
                expected_freshness=table_config.get('expected_freshness', timedelta(hours=1)),
                actual_freshness=timedelta(days=999)
            )
        
        # Assumes the warehouse returns naive timestamps comparable to local time
        now = datetime.now()
        actual_freshness = now - latest
        
        freshness = TableFreshness(
            table_name=table_config['table'],
            latest_timestamp=latest,
            expected_freshness=table_config.get('expected_freshness', timedelta(hours=1)),
            actual_freshness=actual_freshness
        )
        
        if not freshness.is_fresh:
            self.logger.warning(f"Table {table_config['table']} is stale")
            self.alerting.send_alert(
                title=f"Data Freshness Alert: {table_config['table']}",
                message=f"Late by {freshness.latency_minutes} minutes",
                severity="high" if freshness.latency_minutes > 120 else "medium"
            )
        
        return freshness
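The monitor above is warehouse-agnostic: the `MAX(timestamp)` pattern works against any SQL backend. A self-contained demonstration of the same logic using the stdlib `sqlite3` module (table name, inserted lag, and the one-hour threshold are illustrative):

```python
# Sketch: the MAX(timestamp) freshness pattern against an in-memory SQLite DB.
# A single row three hours old makes the table stale against a 1-hour SLA.
import sqlite3
from datetime import datetime, timedelta

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id INTEGER, created_at TEXT)")
stale_time = (datetime.now() - timedelta(hours=3)).isoformat(sep=" ")
conn.execute("INSERT INTO events VALUES (1, ?)", (stale_time,))

row = conn.execute("SELECT MAX(created_at) FROM events").fetchone()
latest = datetime.fromisoformat(row[0])
lag = datetime.now() - latest

threshold = timedelta(hours=1)
is_fresh = lag <= threshold
print(f"lag_minutes={int(lag.total_seconds() // 60)}, fresh={is_fresh}")
```

The same query and comparison drop into the `FreshnessMonitor.check_freshness` method unchanged; only the connection object differs per warehouse.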

Volume Anomaly Detection

# volume_anomaly_detector.py
import numpy as np

class VolumeAnomalyDetector:
    def __init__(self, sensitivity: float = 3.0):
        self.sensitivity = sensitivity
        self.historical_volumes = {}
    
    def train(self, table_name: str, historical_counts: list):
        """Train on historical volume data."""
        self.historical_volumes[table_name] = {
            'mean': np.mean(historical_counts),
            'std': np.std(historical_counts),
            'counts': historical_counts
        }
    
    def detect(self, table_name: str, current_count: int) -> dict:
        """Detect if current volume is anomalous."""
        
        if table_name not in self.historical_volumes:
            return {'anomaly': False, 'reason': 'no_baseline'}
        
        stats_data = self.historical_volumes[table_name]
        
        if stats_data['std'] == 0:
            z_score = 0 if current_count == stats_data['mean'] else float('inf')
        else:
            z_score = abs(current_count - stats_data['mean']) / stats_data['std']
        
        is_anomaly = z_score > self.sensitivity
        
        return {
            'anomaly': is_anomaly,
            'z_score': z_score,
            'expected_range': (
                stats_data['mean'] - self.sensitivity * stats_data['std'],
                stats_data['mean'] + self.sensitivity * stats_data['std']
            ),
            'current_count': current_count
        }
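Volume is only one signal: value distributions can drift while row counts stay flat. One common drift measure is the Population Stability Index (PSI). A dependency-free sketch below bins both samples on the baseline's range; the ten-bin layout and the ~0.2 alert threshold are conventional heuristics, not a standard.

```python
# Sketch: Population Stability Index (PSI) between a baseline sample and a
# current sample, using fixed bins derived from the baseline range.
# PSI above roughly 0.2 is a common heuristic threshold for meaningful drift.
import math

def psi(baseline: list, current: list, bins: int = 10) -> float:
    lo, hi = min(baseline), max(baseline)
    width = (hi - lo) / bins or 1.0  # guard against a constant baseline

    def proportions(sample: list) -> list:
        counts = [0] * bins
        for x in sample:
            idx = min(max(int((x - lo) / width), 0), bins - 1)
            counts[idx] += 1
        # Floor at a small epsilon so empty bins don't break the log term
        return [max(c / len(sample), 1e-6) for c in counts]

    expected = proportions(baseline)
    actual = proportions(current)
    return sum((a - e) * math.log(a / e) for e, a in zip(expected, actual))

baseline = [float(x % 100) for x in range(1000)]            # roughly uniform 0..99
shifted = [float(x % 100) * 0.5 + 50 for x in range(1000)]  # squeezed upward
print(f"PSI vs self: {psi(baseline, baseline):.4f}")
print(f"PSI vs shifted: {psi(baseline, shifted):.4f}")
```

A PSI check per column slots naturally alongside the z-score volume check above: train on a historical window, score each new batch, alert past the threshold.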

Data Observability

Data Contract Pattern

# data_contract.py
from pydantic import BaseModel
from typing import List, Optional

class ColumnSchema(BaseModel):
    name: str
    type: str
    nullable: bool = True
    description: Optional[str] = None

class DataContract(BaseModel):
    contract_id: str
    dataset_name: str
    description: str
    owner: str
    columns: List[ColumnSchema]
    freshness_requirement: Optional[str] = None
    volume_expectation: Optional[dict] = None
    
    def validate_schema(self, actual_schema: List[ColumnSchema]) -> dict:
        """Validate actual schema matches contract."""
        
        expected_cols = {c.name: c for c in self.columns}
        actual_cols = {c.name: c for c in actual_schema}
        
        issues = []
        
        # Check for missing columns
        for col_name in expected_cols:
            if col_name not in actual_cols:
                issues.append(f"Missing column: {col_name}")
        
        # Check for unexpected columns
        for col_name in actual_cols:
            if col_name not in expected_cols:
                issues.append(f"Unexpected column: {col_name}")
        
        # Check types
        for col_name in set(expected_cols.keys()) & set(actual_cols.keys()):
            if expected_cols[col_name].type != actual_cols[col_name].type:
                issues.append(
                    f"Type mismatch for {col_name}: "
                    f"expected {expected_cols[col_name].type}, "
                    f"got {actual_cols[col_name].type}"
                )
        
        return {
            'valid': len(issues) == 0,
            'issues': issues
        }

# Example contract
contract = DataContract(
    contract_id="contract-001",
    dataset_name="orders",
    description="E-commerce orders data",
    owner="data-team",
    columns=[
        ColumnSchema(name="order_id", type="string", nullable=False),
        ColumnSchema(name="customer_id", type="string", nullable=False),
        ColumnSchema(name="total_amount", type="decimal", nullable=False),
        ColumnSchema(name="status", type="string", nullable=False),
        ColumnSchema(name="created_at", type="timestamp", nullable=False),
    ],
    freshness_requirement="1 hour",
    volume_expectation={"min": 1000, "max": 1000000}
)

Implementation Checklist

  • Define data quality dimensions for each dataset
  • Implement validation at ingestion point
  • Set up freshness monitoring
  • Configure volume anomaly detection
  • Create data contracts
  • Build alerting workflows
  • Document data quality SLAs
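The checklist items ultimately converge in a thin orchestration layer: run the registered checks, collect failures, alert. A minimal quality-gate sketch (the check names and the alert callback are illustrative stand-ins for your own checks and notification service):

```python
# Sketch: a quality gate that runs named checks and alerts on failures.
# Each check is a zero-argument callable returning True (pass) / False (fail);
# send_alert is a stand-in for a real notification integration.
from typing import Callable, List, Tuple

class QualityGate:
    def __init__(self, send_alert: Callable[[str], None]):
        self.checks: List[Tuple[str, Callable[[], bool]]] = []
        self.send_alert = send_alert

    def add_check(self, name: str, check: Callable[[], bool]) -> None:
        self.checks.append((name, check))

    def run(self) -> bool:
        failures = [name for name, check in self.checks if not check()]
        for name in failures:
            self.send_alert(f"Data quality check failed: {name}")
        return not failures

alerts: List[str] = []
gate = QualityGate(send_alert=alerts.append)
gate.add_check("orders_not_empty", lambda: True)
gate.add_check("no_null_customer_ids", lambda: False)  # simulated failure

passed = gate.run()
print(f"passed={passed}, alerts={alerts}")
```

Wrapping freshness, volume, and contract checks behind this single interface gives one place to attach SLAs and routing rules.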

Summary

Building robust data quality systems requires:

  1. Validation at ingestion: Catch issues early with schema validation and business rules
  2. Continuous monitoring: Track freshness, volume, and distribution
  3. Observability: Understand data lineage and dependencies
  4. Alerting: Notify stakeholders when quality degrades

Tools like Great Expectations, dbt tests, and custom monitoring form the foundation of modern data quality.

