Skip to main content
โšก Calmops

Data Governance: Lineage, Cataloging, Access Control

Introduction

Data governance is the foundation of a data-driven organization. Without proper governance, you risk regulatory penalties, data quality issues, and lost trust. This guide covers building a comprehensive data governance program.

Key Statistics:

  • 68% of organizations cite data governance as a top priority
  • Proper governance reduces data retrieval time by 40%
  • GDPR violations average $4.2 million
  • Data catalogs improve analyst productivity by 30%

Data Governance Framework

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚                    Data Governance Framework                       โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚                                                                  โ”‚
โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”        โ”‚
โ”‚  โ”‚   Policy     โ”‚  โ”‚  Standards   โ”‚  โ”‚   Processes  โ”‚        โ”‚
โ”‚  โ”‚  Definition  โ”‚  โ”‚   & Models   โ”‚  โ”‚  & Workflows โ”‚        โ”‚
โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜        โ”‚
โ”‚         โ”‚                 โ”‚                  โ”‚                  โ”‚
โ”‚         โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ผโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜                 โ”‚
โ”‚                           โ–ผ                                     โ”‚
โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”   โ”‚
โ”‚  โ”‚              Technology & Tools                          โ”‚   โ”‚
โ”‚  โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”       โ”‚   โ”‚
โ”‚  โ”‚  โ”‚Catalog โ”‚  โ”‚Lineage โ”‚  โ”‚Quality โ”‚  โ”‚Access  โ”‚       โ”‚   โ”‚
โ”‚  โ”‚  โ”‚        โ”‚  โ”‚        โ”‚  โ”‚        โ”‚  โ”‚Control โ”‚       โ”‚   โ”‚
โ”‚  โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜       โ”‚   โ”‚
โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜   โ”‚
โ”‚                                                                  โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

Data Catalog Implementation

Amundsen Data Catalog

# docker-compose.yml for Amundsen
version: '3.8'

services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.17.0
    environment:
      - discovery.type=single-node
      - ES_JAVA_OPTS=-Xms2g -Xmx2g
    ports:
      - "9200:9200"
    volumes:
      - es_data:/usr/share/elasticsearch/data

  amundsenmetadata:
    image: amundsen/metadata:3.5.0
    environment:
      - LOGS_FORMAT=json
      - SPRING_ES_HOSTS=elasticsearch:9200
    ports:
      - "5002:5002"

  amundsenfrontend:
    image: amundsen/frontend:3.5.0
    environment:
      - FRONTEND_SVC_CONFIG=localDevelopment
    ports:
      - "5000:5000"
    depends_on:
      - amundsenmetadata

  amundsendatabuilder:
    image: amundsen/databuilder:3.5.0
    environment:
      - LOGS_FORMAT=json
      - ES_HOSTS=elasticsearch:9200
      - METADATA_HOST=amundsenmetadata

volumes:
  es_data:
#!/usr/bin/env python3
"""Amundsen data producer example."""

from databuilder.publisher.elasticsearch_publisher import ElasticsearchPublisher
from databuilder.task.task import DefaultTask
from databuilder.publisher.sql_alchemy_table_publisher import SQLAlchemyTablePublisher

# Example: Publish table metadata to Amundsen
def publish_table_metadata():
    """Publish table metadata to Amundsen."""
    
    table_metadata = TableMetadata(
        database='snowflake',
        cluster='production',
        schema='analytics',
        name='customer_orders',
        description='Customer order data with transaction details',
        column_names=[
            'customer_id',
            'order_id',
            'order_date',
            'amount',
            'status'
        ],
        column_descriptions=[
            'Unique customer identifier',
            'Unique order identifier',
            'Date of order',
            'Total order amount',
            'Order status (pending, shipped, delivered)'
        ],
        tags=['customer', 'orders', 'analytics'],
        owners=['[email protected]']
    )
    
    return table_metadata

DataHub Integration

# DataHub ingestion configuration
source:
  type: snowflake
  config:
    account_id: "xy12345.us-east-1"
    username: "${SNOWFLAKE_USER}"
    password: "${SNOWFLAKE_PASSWORD}"
    warehouse: "ANALYTICS_WH"
    database: "ANALYTICS"
    
sink:
  type: datahub-rest
  config:
    server: "http://datahub-gms:8080"
    token: "${DATAHUB_TOKEN}"

transformers:
  - type: "datahub_transformer.add_dataset_business_owner"
    config:
      business_owner: "[email protected]"
      business_owner_email: "[email protected]"

Data Lineage

OpenLineage Implementation

#!/usr/bin/env python3
"""OpenLineage integration for data lineage."""

from openlineage.client import OpenLineageClient
from openlineage.client.run import RunEvent, RunState
from datetime import datetime

client = OpenLineageClient.from_api_key("your-api-key")

def track_dagster_job():
    """Track Dagster job lineage."""
    
    run_id = "job-12345"
    job_name = "daily_sales_aggregation"
    namespace = "analytics"
    
    # Emit start event
    start_event = RunEvent(
        eventType=RunState.START,
        eventTime=datetime.now().isoformat(),
        run=RunRunState(runId=run_id),
        job=JobRunState(name=job_name, namespace=namespace),
        inputs=[
            InputDataset(
                namespace="snowflake://analytics",
                name="raw.sales.transactions",
                facets={
                    "schema": SchemaDatasetFacet(
                        fields=[
                            SchemaField(name="transaction_id", type="STRING"),
                            SchemaField(name="amount", type="DECIMAL"),
                            SchemaField(name="created_at", type="TIMESTAMP")
                        ]
                    )
                }
            )
        ],
        outputs=[
            OutputDataset(
                namespace="snowflake://analytics",
                name="analytics.daily_sales",
                facets={
                    "schema": SchemaDatasetFacet(
                        fields=[
                            SchemaField(name="date", type="DATE"),
                            SchemaField(name="total_sales", type="DECIMAL"),
                            SchemaField(name="transaction_count", type="INTEGER")
                        ]
                    )
                }
            )
        ]
    )
    
    client.emit(start_event)
-- Query lineage in Snowflake
SELECT 
    upstreams.table_name as source_table,
    upstreams.column_name as source_column,
    downstreams.table_name as target_table,
    downstreams.column_name as target_column,
    t.transform_type,
    t.transform_logic
FROM analytics.column_lineage t
JOIN analytics.table_metadata upstreams 
    ON t.upstream_table_id = upstreams.id
JOIN analytics.table_metadata downstreams 
    ON t.downstream_table_id = downstreams.id
WHERE downstreams.table_name = 'daily_sales';

Access Control

Row-Level Security

-- Snowflake Row Access Policies
-- Create row access policy
CREATE ROW ACCESS POLICY analytics.user_region_policy
AS (user_region VARCHAR) RETURNS BOOLEAN -
  CASE
    WHEN CURRENT_ROLE() IN ('ANALYST_ROLE', 'ADMIN_ROLE') THEN TRUE
    WHEN CURRENT_ROLE() = 'SALES_ROLE' THEN 
      user_region = CURRENT_REGION()
    WHEN CURRENT_ROLE() = 'MARKETING_ROLE' THEN 
      user_region IN ('US', 'EU', 'APAC')
    ELSE FALSE
  END;

-- Apply to table
ALTER TABLE analytics.customer_data
ADD ROW ACCESS POLICY analytics.user_region_policy
ON (region);

Column-Level Security

-- BigQuery Column-Level Security
CREATE TABLE analytics.customer_pii (
    customer_id STRING,
    email STRING OPTIONS (require_partition_filter=true),
    phone STRING OPTIONS (labels=[('pii', 'true')]),
    address STRING OPTIONS (labels=[('pii', 'true')]),
    created_at TIMESTAMP
);

-- Column-level IAM
-- Grant access at column level
GRANT SELECT(customer_id, email, created_at)
ON analytics.customer_pii
TO ROLE analyst_role;

-- Deny PII columns
DENY SELECT(phone, address)
ON analytics.customer_pii
TO ROLE analyst_role;

Dynamic Masking

-- Dynamic Data Masking in Snowflake
-- Create masking policy
CREATE MASKING POLICY analytics.email_mask
AS (val VARCHAR) RETURNS VARCHAR ->
  CASE
    WHEN CURRENT_ROLE() IN ('ADMIN_ROLE', 'HR_ROLE') THEN val
    ELSE CONCAT(LEFT(val, 2), '***', RIGHT(val, 4))
  END;

-- Apply to column
ALTER TABLE analytics.users
ALTER COLUMN email
SET MASKING POLICY analytics.email_mask;

-- Apply to multiple columns
ALTER TABLE analytics.users
ALTER COLUMN phone
SET MASKING POLICY analytics.phone_mask;

Data Quality

Great Expectations Integration

# great_expectations.yml
config_version: 3.0

datasources:
  snowflake:
    data_connectors:
      default_runtime_data_connector_name:
        class_name: RuntimeDataConnector
        batch_identifiers:
          - default_identifier_name
    
    execution_engine:
      class_name: SqlAlchemyExecutionEngine
      connection_string: ${SNOWFLAKE_CONN}

stores:
  expectations_store:
    class_name: ExpectationsStore
    store_backend:
      class_name: TupleFilesystemStoreBackend
      base_directory: expectations/

  validations_store:
    class_name: ValidationsStore
    store_backend:
      class_name: TupleFilesystemStoreBackend
      base_directory: uncommitted/validations/
#!/usr/bin/env python3
"""Great Expectations data quality checks."""

import great_expectations as ge

# Create expectation suite
context = ge.get_context()

suite = context.add_expectation_suite("customer_data_quality")

# Add expectations
suite.add_expectation(
    ExpectColumnValuesToNotBeNull(column="customer_id")
)

suite.add_expectation(
    ExpectColumnValueLengthsToBeBetween(
        column="email",
        min_value=5,
        max_value=100
    )
)

suite.add_expectation(
    ExpectColumnValuesToBeUnique(column="customer_id")
)

suite.add_expectation(
    ExpectColumnValuesToMatchRegex(
        column="email",
        regex=r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$'
    )
)

# Validate data
df = context.read_sql("SELECT * FROM analytics.customers")

results = df.validate(
    expectation_suite="customer_data_quality",
    only_return_failures=False
)

print(f"Success: {results.success}")
print(f"Failed expectations: {len(results.results)}")

External Resources


Comments