Skip to main content
⚡ Calmops

Data Governance: Catalog, Lineage, and Access Control

Introduction

Data governance provides the framework for managing data assets across an organization. It encompasses cataloging what’s available, understanding how data flows through systems, and controlling who can access what. This guide covers practical implementations.

Data Catalog

Purpose and Benefits

  • Discovery: Find relevant datasets
  • Understanding: Document meaning and usage
  • Trust: Establish data quality signals
  • Compliance: Track sensitive data

Implementation with Amundsen

# metadata_loader.py
from amundsen_common.models.api import request as models_request

class MetadataLoader:
    """Publishes table metadata to, and searches, an Amundsen data catalog."""

    def __init__(self, amundsen_client):
        # Amundsen metadata-service client used for all catalog operations.
        self.client = amundsen_client

    def load_table_metadata(self, table_data: dict):
        """Register one table (and its columns) in the catalog.

        ``table_data`` must provide 'name' and 'columns'; every other key
        falls back to a default ('' / 'postgres' / 'production' / 'public' /
        empty lists). Returns the raw client response.
        """
        columns = table_data['columns']
        table_request = models_request.TableMetadataRequest(
            table_name=table_data['name'],
            table_description=table_data.get('description', ''),
            column_names=[col['name'] for col in columns],
            column_descriptions=[col.get('description', '') for col in columns],
            database=table_data.get('database', 'postgres'),
            cluster=table_data.get('cluster', 'production'),
            schema_name=table_data.get('schema', 'public'),
            tags=table_data.get('tags', []),
            owners=table_data.get('owners', []),
        )
        return self.client.create_table_metadata(table_request)

    def search(self, query: str, filters: dict = None) -> list:
        """Search the data catalog, optionally narrowed by ``filters``."""
        return self.client.search_tables(query, filters)

# Usage
catalog = MetadataLoader(amundsen_client)

# Register a new table
customer_orders_metadata = {
    'name': 'customer_orders',
    'description': 'Customer order transactions',
    'database': 'analytics',
    'schema': 'warehouse',
    'columns': [
        {'name': 'order_id', 'description': 'Unique order identifier'},
        {'name': 'customer_id', 'description': 'Foreign key to customers'},
        {'name': 'total_amount', 'description': 'Order total in USD'},
        {'name': 'status', 'description': 'Order status'},
        {'name': 'created_at', 'description': 'Order creation timestamp'},
    ],
    'tags': ['fact_table', 'customer', 'orders'],
    'owners': ['[email protected]'],
}
catalog.load_table_metadata(customer_orders_metadata)

Data Lineage

Purpose and Benefits

  • Impact Analysis: Understand change implications
  • Root Cause: Trace data quality issues
  • Compliance: Meet regulatory requirements
  • Optimization: Identify data flow bottlenecks

Implementation with Apache Atlas

# atlas_lineage.py
from atlas_client import Atlas

class DataLineageTracker:
    """Records data-flow lineage (tables and the processes connecting them)
    in Apache Atlas and answers downstream-impact queries.
    """

    def __init__(self, atlas_config):
        """Connect to Atlas and authenticate.

        ``atlas_config`` must contain 'host', 'port', 'username', 'password'.
        """
        self.client = Atlas(atlas_config['host'], atlas_config['port'])
        self.client.login(atlas_config['username'], atlas_config['password'])

    def register_process(self, process_data: dict):
        """Register a data process (ETL job, etc.).

        Required keys: 'name', 'cluster'. Optional keys default to empty
        values so partial run metadata can still be recorded. Returns the
        created Atlas entity.
        """
        process_def = {
            'typeName': 'Process',
            'attributes': {
                'name': process_data['name'],
                # Qualified name is made unique per cluster.
                'qualifiedName': f"{process_data['name']}@{process_data['cluster']}",
                'description': process_data.get('description', ''),
                'inputs': process_data.get('input_tables', []),
                'outputs': process_data.get('output_tables', []),
                'runId': process_data.get('run_id', ''),
                'startTime': process_data.get('start_time', 0),
                'endTime': process_data.get('end_time', 0)
            }
        }

        entity = self.client.entity.create_entity(process_def)
        return entity

    def register_table(self, table_data: dict):
        """Register a table with lineage context.

        Required keys: 'name', 'schema', 'database'; columns (if given)
        must each provide 'name' and 'type'.
        """
        table_def = {
            'typeName': 'table',
            'attributes': {
                'name': table_data['name'],
                'qualifiedName': f"{table_data['name']}.{table_data['schema']}.{table_data['database']}",
                'description': table_data.get('description', ''),
                'owner': table_data.get('owner', ''),
                'tableType': table_data.get('type', 'EXTERNAL'),
                'columns': [
                    {
                        'name': col['name'],
                        'type': col['type'],
                        'description': col.get('description', '')
                    }
                    for col in table_data.get('columns', [])
                ]
            }
        }

        return self.client.entity.create_entity(table_def)

    def get_downstream_impact(self, table_qualified_name: str) -> list:
        """Get all downstream dependencies of a table.

        Returns the lineage-graph entities whose 'lineageDirection' is
        'OUTPUT'; an empty list when the response has no 'guidEntityMap'.
        """
        lineage = self.client.lineage.get_lineage(table_qualified_name)

        # Fixed: the original condition had a stray ')' (syntax error).
        # Note: guidEntityMap values are entity dicts, not GUID strings.
        return [
            entity
            for entity in lineage.get('guidEntityMap', {}).values()
            if entity.get('lineageDirection') == 'OUTPUT'
        ]

Access Control

Role-Based Access Control (RBAC)

# access_control.py
from enum import Enum
from dataclasses import dataclass
from typing import Set

class Permission(Enum):
    """Operation a role may be granted on a data asset."""
    READ = "read"
    WRITE = "write"
    DELETE = "delete"
    ADMIN = "admin"

class Role(Enum):
    """Organizational roles recognized by the access-control layer."""
    DATA_ANALYST = "data_analyst"
    DATA_ENGINEER = "data_engineer"
    DATA_SCIENTIST = "data_scientist"
    ANALYTICS_VIEWER = "analytics_viewer"
    # Special-purpose role for personally identifiable information access.
    PII_ACCESS = "pii_access"

@dataclass
class DataAsset:
    """A governed data asset subject to access control."""
    name: str
    sensitivity: str  # public, internal, confidential, restricted
    owner: str  # presumably an owning user/team identifier — confirm with callers

class AccessControl:
    """Role-based access control with sensitivity-tier restrictions.

    Access requires BOTH the requested permission on the role AND the
    role being cleared for the asset's sensitivity level.
    """

    def __init__(self):
        # Base capability set granted to each role.
        self.role_permissions = {
            Role.DATA_ANALYST: {Permission.READ},
            Role.DATA_SCIENTIST: {Permission.READ},
            Role.DATA_ENGINEER: {Permission.READ, Permission.WRITE},
            Role.ANALYTICS_VIEWER: {Permission.READ},
            Role.PII_ACCESS: {Permission.READ}
        }

        # Roles cleared for each sensitivity tier; unknown tiers deny all.
        self.sensitivity_restrictions = {
            'restricted': {Role.PII_ACCESS, Role.DATA_ENGINEER},
            'confidential': {Role.DATA_ENGINEER, Role.DATA_SCIENTIST, Role.DATA_ANALYST},
            'internal': {Role.DATA_ENGINEER, Role.DATA_SCIENTIST, Role.DATA_ANALYST, Role.ANALYTICS_VIEWER},
            'public': {Role.DATA_ENGINEER, Role.DATA_SCIENTIST, Role.DATA_ANALYST, Role.ANALYTICS_VIEWER, Role.PII_ACCESS}
        }

    def check_access(self, user_role: Role, asset: DataAsset, permission: Permission) -> bool:
        """Check if user has access to data asset."""
        granted = self.role_permissions.get(user_role, set())
        if permission not in granted:
            return False

        # Sensitivity clearance is an independent, second gate.
        cleared_roles = self.sensitivity_restrictions.get(asset.sensitivity, set())
        return user_role in cleared_roles

    def get_accessible_assets(self, user_role: Role, all_assets: list) -> list:
        """Get all assets a role can read."""
        return [
            asset for asset in all_assets
            if self.check_access(user_role, asset, Permission.READ)
        ]

Column-Level Security

-- Snowflake row-level and column-level security
-- View-based column masking: consumers query the view, not the raw table,
-- so PII exposure is decided per-query by CURRENT_ROLE().

-- Create masked columns view
CREATE OR REPLACE VIEW analytics.customer_orders_v AS
SELECT
    order_id,
    customer_id,
    -- Mask PII columns for non-authorized users
    CASE 
        WHEN CURRENT_ROLE() IN ('PII_ACCESS_ROLE', 'ADMIN_ROLE') THEN email
        ELSE '***MASKED***'
    END AS email,
    -- Phone is suppressed entirely (NULL) rather than replaced with a placeholder
    CASE 
        WHEN CURRENT_ROLE() IN ('PII_ACCESS_ROLE', 'ADMIN_ROLE') THEN phone
        ELSE NULL
    END AS phone,
    -- Non-PII columns pass through unmasked
    total_amount,
    status,
    created_at
FROM raw.customer_orders;

Implementation Checklist

  • Document all data assets
  • Implement data catalog
  • Set up lineage tracking
  • Define roles and permissions
  • Configure column-level security
  • Establish data retention policies
  • Create compliance workflows

Summary

Data governance provides an essential framework for enterprise data management:

  1. Catalog: Enable discovery and understanding of available data
  2. Lineage: Track data flow for impact analysis and compliance
  3. Access Control: Implement RBAC and sensitivity-based restrictions

Modern tools like Amundsen, Apache Atlas, and cloud-native solutions (AWS Lake Formation, Azure Purview) provide integrated governance capabilities.


External Resources

Comments