
Data Governance: Lineage, Cataloging, Access Control

Published: February 18, 2026 Updated: May 24, 2026 Larry Qu 7 min read

Introduction

Data governance is the foundation of a data-driven organization. Without it, you risk regulatory penalties, data quality issues, and lost trust. This guide covers the main building blocks of a governance program: data cataloging, lineage tracking, access control, and data quality checks.

Key Statistics:

  • 68% of organizations cite data governance as a top priority
  • Proper governance reduces data retrieval time by 40%
  • GDPR violations average $4.2 million
  • Data catalogs improve analyst productivity by 30%

Data Governance Framework

┌─────────────────────────────────────────────────────────────────┐
│                    Data Governance Framework                       │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐        │
│  │   Policy     │  │  Standards   │  │   Processes  │        │
│  │  Definition  │  │   & Models   │  │  & Workflows │        │
│  └──────────────┘  └──────────────┘  └──────────────┘        │
│         │                 │                  │                  │
│         └─────────────────┼──────────────────┘                 │
│                           ▼                                     │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │              Technology & Tools                          │   │
│  │  ┌────────┐  ┌────────┐  ┌────────┐  ┌────────┐       │   │
│  │  │Catalog │  │Lineage │  │Quality │  │Access  │       │   │
│  │  │        │  │        │  │        │  │Control │       │   │
│  │  └────────┘  └────────┘  └────────┘  └────────┘       │   │
│  └─────────────────────────────────────────────────────────┘   │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

Data Catalog Implementation

Amundsen Data Catalog

# docker-compose.yml for Amundsen
version: '3.8'

services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.17.0
    environment:
      - discovery.type=single-node
      - ES_JAVA_OPTS=-Xms2g -Xmx2g
    ports:
      - "9200:9200"
    volumes:
      - es_data:/usr/share/elasticsearch/data

  amundsenmetadata:
    image: amundsen/metadata:3.5.0
    environment:
      - LOGS_FORMAT=json
      - SPRING_ES_HOSTS=elasticsearch:9200
    ports:
      - "5002:5002"

  amundsenfrontend:
    image: amundsen/frontend:3.5.0
    environment:
      - FRONTEND_SVC_CONFIG=localDevelopment
    ports:
      - "5000:5000"
    depends_on:
      - amundsenmetadata

  amundsendatabuilder:
    image: amundsen/databuilder:3.5.0
    environment:
      - LOGS_FORMAT=json
      - ES_HOSTS=elasticsearch:9200
      - METADATA_HOST=amundsenmetadata

volumes:
  es_data:

#!/usr/bin/env python3
"""Amundsen data producer example."""

from databuilder.models.table_metadata import ColumnMetadata, TableMetadata

# Example: build the table metadata record that a databuilder job publishes
def publish_table_metadata():
    """Build table metadata for Amundsen."""
    
    # (name, description, type); the column types are assumed for illustration
    columns = [
        ('customer_id', 'Unique customer identifier', 'varchar'),
        ('order_id', 'Unique order identifier', 'varchar'),
        ('order_date', 'Date of order', 'date'),
        ('amount', 'Total order amount', 'number'),
        ('status', 'Order status (pending, shipped, delivered)', 'varchar'),
    ]
    
    table_metadata = TableMetadata(
        database='snowflake',
        cluster='production',
        schema='analytics',
        name='customer_orders',
        description='Customer order data with transaction details',
        columns=[
            ColumnMetadata(name, description, col_type, sort_order)
            for sort_order, (name, description, col_type) in enumerate(columns)
        ],
        tags=['customer', 'orders', 'analytics']
    )
    # Owners (here '[email protected]') are not a TableMetadata argument;
    # databuilder models them separately (see databuilder.models.table_owner).
    
    return table_metadata

Amundsen Metadata Loader

# metadata_loader.py
from amundsen_common.models.api import request as models_request

class MetadataLoader:
    def __init__(self, amundsen_client):
        self.client = amundsen_client
    
    def load_table_metadata(self, table_data: dict):
        """Load table metadata to catalog."""
        
        table_request = models_request.TableMetadataRequest(
            table_name=table_data['name'],
            table_description=table_data.get('description', ''),
            column_names=[c['name'] for c in table_data['columns']],
            column_descriptions=[c.get('description', '') for c in table_data['columns']],
            database=table_data.get('database', 'postgres'),
            cluster=table_data.get('cluster', 'production'),
            schema_name=table_data.get('schema', 'public'),
            tags=table_data.get('tags', []),
            owners=table_data.get('owners', [])
        )
        
        response = self.client.create_table_metadata(table_request)
        return response
    
    def search(self, query: str, filters: dict | None = None) -> list:
        """Search the data catalog."""
        return self.client.search_tables(query, filters)

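# Usage
# amundsen_client is not defined in this snippet. It is assumed to be an
# already-configured client object that exposes create_table_metadata()
# and search_tables().
catalog = MetadataLoader(amundsen_client)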

# Register a new table
catalog.load_table_metadata({
    'name': 'customer_orders',
    'description': 'Customer order transactions',
    'database': 'analytics',
    'schema': 'warehouse',
    'columns': [
        {'name': 'order_id', 'description': 'Unique order identifier'},
        {'name': 'customer_id', 'description': 'Foreign key to customers'},
        {'name': 'total_amount', 'description': 'Order total in USD'},
        {'name': 'status', 'description': 'Order status'},
        {'name': 'created_at', 'description': 'Order creation timestamp'}
    ],
    'tags': ['fact_table', 'customer', 'orders'],
    'owners': ['[email protected]']
})

DataHub Integration

# DataHub ingestion configuration
source:
  type: snowflake
  config:
    account_id: "xy12345.us-east-1"
    username: "${SNOWFLAKE_USER}"
    password: "${SNOWFLAKE_PASSWORD}"
    warehouse: "ANALYTICS_WH"
    # The Snowflake source selects databases with an allow/deny regex pattern
    database_pattern:
      allow:
        - "^ANALYTICS$"
    
sink:
  type: datahub-rest
  config:
    server: "http://datahub-gms:8080"
    token: "${DATAHUB_TOKEN}"

transformers:
  # Built-in transformer that attaches the listed owners to every dataset
  - type: "simple_add_dataset_ownership"
    config:
      owner_urns:
        - "urn:li:corpuser:[email protected]"
      ownership_type: "BUSINESS_OWNER"

Data Lineage

OpenLineage Implementation

#!/usr/bin/env python3
"""OpenLineage integration for data lineage."""

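from datetime import datetime, timezone
from uuid import uuid4

from openlineage.client import OpenLineageClient
from openlineage.client.client import OpenLineageClientOptions
from openlineage.client.facet import SchemaDatasetFacet, SchemaField
from openlineage.client.run import (
    InputDataset,
    Job,
    OutputDataset,
    Run,
    RunEvent,
    RunState,
)

# The backend URL and producer URI are placeholders; adjust for your deployment
client = OpenLineageClient(
    url="http://localhost:5000",
    options=OpenLineageClientOptions(api_key="your-api-key"),
)
PRODUCER = "https://example.com/pipelines/daily_sales_aggregation"

def track_dagster_job():
    """Track Dagster job lineage."""
    
    run_id = str(uuid4())  # OpenLineage run IDs must be UUIDs
    job_name = "daily_sales_aggregation"
    namespace = "analytics"
    
    # Emit start event
    start_event = RunEvent(
        eventType=RunState.START,
        eventTime=datetime.now(timezone.utc).isoformat(),
        run=Run(runId=run_id),
        job=Job(namespace=namespace, name=job_name),
        producer=PRODUCER,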
        inputs=[
            InputDataset(
                namespace="snowflake://analytics",
                name="raw.sales.transactions",
                facets={
                    "schema": SchemaDatasetFacet(
                        fields=[
                            SchemaField(name="transaction_id", type="STRING"),
                            SchemaField(name="amount", type="DECIMAL"),
                            SchemaField(name="created_at", type="TIMESTAMP")
                        ]
                    )
                }
            )
        ],
        outputs=[
            OutputDataset(
                namespace="snowflake://analytics",
                name="analytics.daily_sales",
                facets={
                    "schema": SchemaDatasetFacet(
                        fields=[
                            SchemaField(name="date", type="DATE"),
                            SchemaField(name="total_sales", type="DECIMAL"),
                            SchemaField(name="transaction_count", type="INTEGER")
                        ]
                    )
                }
            )
        ]
    )
    
    client.emit(start_event)

-- Query lineage in Snowflake
SELECT 
    upstreams.table_name as source_table,
    upstreams.column_name as source_column,
    downstreams.table_name as target_table,
    downstreams.column_name as target_column,
    t.transform_type,
    t.transform_logic
FROM analytics.column_lineage t
JOIN analytics.table_metadata upstreams 
    ON t.upstream_table_id = upstreams.id
JOIN analytics.table_metadata downstreams 
    ON t.downstream_table_id = downstreams.id
WHERE downstreams.table_name = 'daily_sales';
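
The OpenLineage example emits only a START event. A run is normally closed with a COMPLETE (or FAIL) event that carries the same run ID, so the backend can pair the two. The sketch below shows that pairing with plain dictionaries instead of the client library; the producer URI and the spec version in `SCHEMA_URL` are assumptions, and `build_run_event` is a hypothetical helper, not part of OpenLineage.

```python
import json
import uuid
from datetime import datetime, timezone

# Hypothetical producer URI identifying the code that emits the events
PRODUCER = "https://example.com/pipelines/daily_sales_aggregation"
# Spec version is an assumption; use the one your backend expects
SCHEMA_URL = "https://openlineage.io/spec/1-0-5/OpenLineage.json#/definitions/RunEvent"


def build_run_event(event_type, run_id, namespace, job_name, inputs=(), outputs=()):
    """Build one OpenLineage run event as a plain dict."""
    return {
        "eventType": event_type,
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "producer": PRODUCER,
        "schemaURL": SCHEMA_URL,
        "run": {"runId": run_id},
        "job": {"namespace": namespace, "name": job_name},
        "inputs": [{"namespace": ns, "name": name} for ns, name in inputs],
        "outputs": [{"namespace": ns, "name": name} for ns, name in outputs],
    }


run_id = str(uuid.uuid4())  # the same run ID ties START and COMPLETE together
datasets = {
    "inputs": [("snowflake://analytics", "raw.sales.transactions")],
    "outputs": [("snowflake://analytics", "analytics.daily_sales")],
}

start = build_run_event("START", run_id, "analytics", "daily_sales_aggregation", **datasets)
# ... the job's actual work would run here ...
complete = build_run_event("COMPLETE", run_id, "analytics", "daily_sales_aggregation", **datasets)

print(json.dumps(complete, indent=2))
```

Both dictionaries can be serialized to JSON and posted to a lineage backend; in a real pipeline the COMPLETE event would be built only after the job succeeds.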

Apache Atlas Lineage

# atlas_lineage.py
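# Assumes the official Apache Atlas Python client (pip install apache-atlas)
from apache_atlas.client.base_client import AtlasClient

class DataLineageTracker:
    def __init__(self, atlas_config):
        # Credentials are passed as a (username, password) tuple
        self.client = AtlasClient(
            f"http://{atlas_config['host']}:{atlas_config['port']}",
            (atlas_config['username'], atlas_config['password'])
        )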
    
    def register_process(self, process_data: dict):
        """Register a data process (ETL job, etc.)."""
        
        process_def = {
            'typeName': 'Process',
            'attributes': {
                'name': process_data['name'],
                'qualifiedName': f"{process_data['name']}@{process_data['cluster']}",
                'description': process_data.get('description', ''),
                'inputs': process_data.get('input_tables', []),
                'outputs': process_data.get('output_tables', []),
                'runId': process_data.get('run_id', ''),
                'startTime': process_data.get('start_time', 0),
                'endTime': process_data.get('end_time', 0)
            }
        }
        
        entity = self.client.entity.create_entity(process_def)
        return entity
    
    def register_table(self, table_data: dict):
        """Register a table with lineage context."""
        
        table_def = {
            'typeName': 'table',
            'attributes': {
                'name': table_data['name'],
                # Qualified names run from the outermost container to the table
                'qualifiedName': f"{table_data['database']}.{table_data['schema']}.{table_data['name']}",
                'description': table_data.get('description', ''),
                'owner': table_data.get('owner', ''),
                'tableType': table_data.get('type', 'EXTERNAL'),
                'columns': [
                    {
                        'name': col['name'],
                        'type': col['type'],
                        'description': col.get('description', '')
                    }
                    for col in table_data.get('columns', [])
                ]
            }
        }
        
        return self.client.entity.create_entity(table_def)
    
    def get_downstream_impact(self, table_qualified_name: str) -> list:
        """Get all downstream dependencies."""
        
        lineage = self.client.lineage.get_lineage(table_qualified_name)
        
        downstream = []
        for guid in lineage.get('guidEntityMap', {}).values():
            if guid.get('lineageDirection') == 'OUTPUT':
                downstream.append(guid)
        
        return downstream
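
Downstream impact analysis is, at its core, a graph traversal over lineage edges. The sketch below illustrates that idea without calling Atlas at all; the edge list, the table names other than the two used earlier, and the `downstream_tables` helper are hypothetical.

```python
from collections import deque


def downstream_tables(edges, start):
    """Return every table reachable downstream of `start`, breadth-first."""
    # Index the edge list as upstream -> [downstream, ...]
    children = {}
    for upstream, downstream in edges:
        children.setdefault(upstream, []).append(downstream)

    seen = {start}
    order = []
    queue = deque([start])
    while queue:
        table = queue.popleft()
        for child in children.get(table, []):
            if child not in seen:  # guard against cycles and repeated paths
                seen.add(child)
                order.append(child)
                queue.append(child)
    return order


# Hypothetical lineage edges: (upstream, downstream)
EDGES = [
    ("raw.sales.transactions", "analytics.daily_sales"),
    ("analytics.daily_sales", "reporting.weekly_revenue"),
    ("analytics.daily_sales", "ml.sales_forecast_features"),
    ("raw.crm.customers", "analytics.customer_orders"),
]

impacted = downstream_tables(EDGES, "raw.sales.transactions")
print(impacted)
```

A change to `raw.sales.transactions` would therefore need to be checked against three downstream tables, while `analytics.customer_orders` is unaffected.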

Access Control

Row-Level Security

-- Snowflake Row Access Policies
-- Create row access policy
CREATE ROW ACCESS POLICY analytics.user_region_policy
AS (user_region VARCHAR) RETURNS BOOLEAN ->
  CASE
    WHEN CURRENT_ROLE() IN ('ANALYST_ROLE', 'ADMIN_ROLE') THEN TRUE
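    WHEN CURRENT_ROLE() = 'SALES_ROLE' THEN 
      -- CURRENT_REGION() returns the region of the Snowflake account,
      -- not a per-user attribute; a role-to-region mapping table is a
      -- common alternative when users in one account cover several regions
      user_region = CURRENT_REGION()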
    WHEN CURRENT_ROLE() = 'MARKETING_ROLE' THEN 
      user_region IN ('US', 'EU', 'APAC')
    ELSE FALSE
  END;

-- Apply to table
ALTER TABLE analytics.customer_data
ADD ROW ACCESS POLICY analytics.user_region_policy
ON (region);
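
Before deploying a row access policy, it can help to mirror its CASE logic in a small function and unit-test the expected visibility per role. The Python sketch below is such a mirror, not Snowflake code; `session_region` stands in for whatever value the policy compares the row's region against, and the sample rows are hypothetical.

```python
def row_visible(current_role: str, row_region: str, session_region: str) -> bool:
    """Mirror of the policy's CASE expression, evaluated for a single row."""
    if current_role in {"ANALYST_ROLE", "ADMIN_ROLE"}:
        return True
    if current_role == "SALES_ROLE":
        return row_region == session_region
    if current_role == "MARKETING_ROLE":
        return row_region in {"US", "EU", "APAC"}
    return False  # unknown roles see nothing


# Hypothetical rows from analytics.customer_data
ROWS = [
    {"id": 1, "region": "US"},
    {"id": 2, "region": "EU"},
    {"id": 3, "region": "LATAM"},
]


def visible_ids(current_role: str, session_region: str) -> list:
    """Return the IDs of the rows the given role would see."""
    return [
        row["id"]
        for row in ROWS
        if row_visible(current_role, row["region"], session_region)
    ]


for role in ("ADMIN_ROLE", "SALES_ROLE", "MARKETING_ROLE", "INTERN_ROLE"):
    print(role, visible_ids(role, session_region="EU"))
```

The default-deny branch at the end is the important design choice: a role that is not listed sees no rows at all.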

Column-Level Security

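-- BigQuery Column-Level Security
-- BigQuery has no column-scoped GRANT or DENY statement. Column-level
-- access is enforced with policy tags attached to columns, or by
-- exposing a restricted view as shown here.
CREATE TABLE analytics.customer_pii (
    customer_id STRING,
    email STRING,
    phone STRING OPTIONS (description = 'PII: phone number'),
    address STRING OPTIONS (description = 'PII: postal address'),
    created_at TIMESTAMP
);

-- Expose only the permitted columns through a view
CREATE VIEW analytics.customer_pii_restricted AS
SELECT customer_id, email, created_at
FROM analytics.customer_pii;

-- Grant analysts read access to the view, not the base table.
-- The group address is a placeholder. For analysts without base-table
-- access, the view must also be authorized on the source dataset.
GRANT `roles/bigquery.dataViewer`
ON VIEW analytics.customer_pii_restricted
TO "group:analysts@example.com";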

Dynamic Masking

-- Dynamic Data Masking in Snowflake
-- Create masking policy
CREATE MASKING POLICY analytics.email_mask
AS (val VARCHAR) RETURNS VARCHAR ->
  CASE
    WHEN CURRENT_ROLE() IN ('ADMIN_ROLE', 'HR_ROLE') THEN val
    ELSE CONCAT(LEFT(val, 2), '***', RIGHT(val, 4))
  END;

-- Apply to column
ALTER TABLE analytics.users
ALTER COLUMN email
SET MASKING POLICY analytics.email_mask;

-- Apply a separate policy to another column
-- (analytics.phone_mask must be created first, in the same way as email_mask)
ALTER TABLE analytics.users
ALTER COLUMN phone
SET MASKING POLICY analytics.phone_mask;
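
To see what the email masking expression produces, its logic can be mirrored in a few lines of Python. This is only an illustration of the CASE expression in `email_mask`, not Snowflake code; the sample addresses are made up.

```python
def mask_email(val, current_role):
    """Mirror of email_mask: privileged roles see the value, others a masked form."""
    if val is None:
        return None  # CONCAT with a NULL argument yields NULL
    if current_role in {"ADMIN_ROLE", "HR_ROLE"}:
        return val
    # LEFT(val, 2) + '***' + RIGHT(val, 4)
    return val[:2] + "***" + val[-4:]


for role in ("HR_ROLE", "ANALYST_ROLE"):
    print(role, mask_email("jane.doe@example.com", role))

# Short values are barely hidden: the prefix and suffix overlap
print(mask_email("a@b.c", "ANALYST_ROLE"))
```

Note the last case: for values of six characters or fewer, the two-character prefix and four-character suffix together reveal the whole string, so a production policy may want a minimum-length check.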

Role-Based Access Control (RBAC)

# access_control.py
from enum import Enum
from dataclasses import dataclass
from typing import Set

class Permission(Enum):
    READ = "read"
    WRITE = "write"
    DELETE = "delete"
    ADMIN = "admin"

class Role(Enum):
    DATA_ANALYST = "data_analyst"
    DATA_ENGINEER = "data_engineer"
    DATA_SCIENTIST = "data_scientist"
    ANALYTICS_VIEWER = "analytics_viewer"
    PII_ACCESS = "pii_access"

@dataclass
class DataAsset:
    name: str
    sensitivity: str  # public, internal, confidential, restricted
    owner: str

class AccessControl:
    def __init__(self):
        self.role_permissions = {
            Role.DATA_ANALYST: {Permission.READ},
            Role.DATA_SCIENTIST: {Permission.READ},
            Role.DATA_ENGINEER: {Permission.READ, Permission.WRITE},
            Role.ANALYTICS_VIEWER: {Permission.READ},
            Role.PII_ACCESS: {Permission.READ}
        }
        
        self.sensitivity_restrictions = {
            'restricted': {Role.PII_ACCESS, Role.DATA_ENGINEER},
            'confidential': {Role.DATA_ENGINEER, Role.DATA_SCIENTIST, Role.DATA_ANALYST},
            'internal': {Role.DATA_ENGINEER, Role.DATA_SCIENTIST, Role.DATA_ANALYST, Role.ANALYTICS_VIEWER},
            'public': {Role.DATA_ENGINEER, Role.DATA_SCIENTIST, Role.DATA_ANALYST, Role.ANALYTICS_VIEWER, Role.PII_ACCESS}
        }
    
    def check_access(self, user_role: Role, asset: DataAsset, permission: Permission) -> bool:
        """Check if user has access to data asset."""
        
        # Check basic permission
        if permission not in self.role_permissions.get(user_role, set()):
            return False
        
        # Check sensitivity level
        allowed_roles = self.sensitivity_restrictions.get(asset.sensitivity, set())
        
        return user_role in allowed_roles
    
    def get_accessible_assets(self, user_role: Role, all_assets: list) -> list:
        """Get all assets a role can access."""
        
        accessible = []
        
        for asset in all_assets:
            if self.check_access(user_role, asset, Permission.READ):
                accessible.append(asset)
        
        return accessible
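
A quick way to review an RBAC configuration like the one above is to print the full access matrix and read it for surprises. The sketch below restates the two-stage check (role permission first, sensitivity second) in compact form so that it runs on its own; it uses a subset of the roles and sensitivity levels, and `access_matrix` is a hypothetical helper.

```python
from enum import Enum


class Permission(Enum):
    READ = "read"
    WRITE = "write"


class Role(Enum):
    DATA_ANALYST = "data_analyst"
    DATA_ENGINEER = "data_engineer"
    PII_ACCESS = "pii_access"


ROLE_PERMISSIONS = {
    Role.DATA_ANALYST: {Permission.READ},
    Role.DATA_ENGINEER: {Permission.READ, Permission.WRITE},
    Role.PII_ACCESS: {Permission.READ},
}

SENSITIVITY_ROLES = {
    "restricted": {Role.PII_ACCESS, Role.DATA_ENGINEER},
    "confidential": {Role.DATA_ENGINEER, Role.DATA_ANALYST},
    "public": {Role.DATA_ENGINEER, Role.DATA_ANALYST, Role.PII_ACCESS},
}


def check_access(role, sensitivity, permission):
    """Stage 1: the role holds the permission. Stage 2: the role may see this level."""
    if permission not in ROLE_PERMISSIONS.get(role, set()):
        return False
    return role in SENSITIVITY_ROLES.get(sensitivity, set())


def access_matrix(permission):
    """Map each role to the sensitivity levels it can use with `permission`."""
    return {
        role.value: sorted(
            level for level in SENSITIVITY_ROLES
            if check_access(role, level, permission)
        )
        for role in Role
    }


read_matrix = access_matrix(Permission.READ)
write_matrix = access_matrix(Permission.WRITE)
print(read_matrix)
print(write_matrix)
```

Reading the matrix shows one consequence of the configuration that is easy to miss: the PII role can read restricted and public assets but not confidential ones, because sensitivity levels here are independent allow-lists rather than a hierarchy.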

Column-Level Security with Views

-- Snowflake column-level security with a view

-- Create masked columns view
CREATE OR REPLACE VIEW analytics.customer_orders_v AS
SELECT
    order_id,
    customer_id,
    -- Mask PII columns for non-authorized users
    CASE 
        WHEN CURRENT_ROLE() IN ('PII_ACCESS_ROLE', 'ADMIN_ROLE') THEN email
        ELSE '***MASKED***'
    END AS email,
    CASE 
        WHEN CURRENT_ROLE() IN ('PII_ACCESS_ROLE', 'ADMIN_ROLE') THEN phone
        ELSE NULL
    END AS phone,
    total_amount,
    status,
    created_at
FROM raw.customer_orders;

Implementation Checklist

  • Document all data assets
  • Implement data catalog
  • Set up lineage tracking
  • Define roles and permissions
  • Configure column-level security
  • Establish data retention policies
  • Create compliance workflows

Data Quality

Great Expectations Integration

# great_expectations.yml
config_version: 3.0

datasources:
  snowflake:
    class_name: Datasource
    data_connectors:
      default_runtime_data_connector_name:
        class_name: RuntimeDataConnector
        batch_identifiers:
          - default_identifier_name
    
    execution_engine:
      class_name: SqlAlchemyExecutionEngine
      connection_string: ${SNOWFLAKE_CONN}

stores:
  expectations_store:
    class_name: ExpectationsStore
    store_backend:
      class_name: TupleFilesystemStoreBackend
      base_directory: expectations/

  validations_store:
    class_name: ValidationsStore
    store_backend:
      class_name: TupleFilesystemStoreBackend
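      base_directory: uncommitted/validations/

#!/usr/bin/env python3
"""Great Expectations data quality checks.

Written against the pre-1.0 Great Expectations API (0.16 to 0.18), which
matches the config_version 3.0 project file above.
"""

import great_expectations as gx
from great_expectations.core.batch import RuntimeBatchRequest

context = gx.get_context()

# Create expectation suite
context.add_or_update_expectation_suite(
    expectation_suite_name="customer_data_quality"
)

# Describe the data to validate; the names match great_expectations.yml
batch_request = RuntimeBatchRequest(
    datasource_name="snowflake",
    data_connector_name="default_runtime_data_connector_name",
    data_asset_name="customers",
    runtime_parameters={"query": "SELECT * FROM analytics.customers"},
    batch_identifiers={"default_identifier_name": "manual_run"},
)

validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name="customer_data_quality",
)

# Add expectations
validator.expect_column_values_to_not_be_null(column="customer_id")

validator.expect_column_value_lengths_to_be_between(
    column="email",
    min_value=5,
    max_value=100
)

validator.expect_column_values_to_be_unique(column="customer_id")

validator.expect_column_values_to_match_regex(
    column="email",
    regex=r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$'
)

# Validate data
results = validator.validate()

# results.results holds one entry per expectation, so filter for failures
failed = [r for r in results.results if not r.success]

print(f"Success: {results.success}")
print(f"Failed expectations: {len(failed)}")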
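
To make the four expectations concrete, the sketch below applies the same rules to a handful of in-memory rows using only the standard library. It is an illustration of what the checks mean, not a replacement for Great Expectations; the sample rows and the `check_customers` helper are hypothetical, and this sketch skips missing emails in the length and regex checks.

```python
import re

# Same pattern as the email expectation
EMAIL_RE = re.compile(r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$')


def check_customers(rows):
    """Return a pass/fail flag for each of the four rules."""
    ids = [row.get("customer_id") for row in rows]
    non_null_ids = [i for i in ids if i is not None]
    emails = [row["email"] for row in rows if row.get("email") is not None]
    return {
        "customer_id_not_null": len(non_null_ids) == len(ids),
        "customer_id_unique": len(non_null_ids) == len(set(non_null_ids)),
        "email_length_5_to_100": all(5 <= len(e) <= 100 for e in emails),
        "email_matches_regex": all(EMAIL_RE.match(e) for e in emails),
    }


# Hypothetical sample with three deliberate problems
ROWS = [
    {"customer_id": 1, "email": "ann@example.com"},
    {"customer_id": 2, "email": "bob@example"},      # no dot after the @
    {"customer_id": 2, "email": "cy@example.org"},   # duplicate ID
    {"customer_id": None, "email": "d@e.f"},         # missing ID
]

report = check_customers(ROWS)
failed = [name for name, passed in report.items() if not passed]
print(report)
print("Failed checks:", failed)
```

As in the validation script, the number that matters is the count of failed checks, not the total number of checks that ran.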
