Introduction
Data governance provides the framework for managing data assets across an organization. It encompasses cataloging what’s available, understanding how data flows through systems, and controlling who can access what. This guide covers practical implementations.
Data Catalog
Purpose and Benefits
- Discovery: Find relevant datasets
- Understanding: Document meaning and usage
- Trust: Establish data quality signals
- Compliance: Track sensitive data
Implementation with Amundsen
# metadata_loader.py
from amundsen_common.models.api import request as models_request
class MetadataLoader:
    """Thin wrapper around an Amundsen client for publishing and searching
    table metadata in the data catalog."""

    def __init__(self, amundsen_client):
        # The client is injected so it can be swapped or stubbed in tests.
        self.client = amundsen_client

    def load_table_metadata(self, table_data: dict):
        """Publish one table's metadata to the catalog.

        ``table_data`` must contain 'name' and 'columns'; every other key
        falls back to a default. Returns the client's create response.
        """
        columns = table_data['columns']
        request = models_request.TableMetadataRequest(
            table_name=table_data['name'],
            table_description=table_data.get('description', ''),
            column_names=[column['name'] for column in columns],
            column_descriptions=[column.get('description', '') for column in columns],
            database=table_data.get('database', 'postgres'),
            cluster=table_data.get('cluster', 'production'),
            schema_name=table_data.get('schema', 'public'),
            tags=table_data.get('tags', []),
            owners=table_data.get('owners', []),
        )
        return self.client.create_table_metadata(request)

    def search(self, query: str, filters: dict = None) -> list:
        """Search the data catalog; delegates directly to the client."""
        return self.client.search_tables(query, filters)
# Usage: register a new table in the catalog.
catalog = MetadataLoader(amundsen_client)

# Declarative table spec; 'name' and 'columns' are required, the rest optional.
table_spec = {
    'name': 'customer_orders',
    'description': 'Customer order transactions',
    'database': 'analytics',
    'schema': 'warehouse',
    'columns': [
        {'name': 'order_id', 'description': 'Unique order identifier'},
        {'name': 'customer_id', 'description': 'Foreign key to customers'},
        {'name': 'total_amount', 'description': 'Order total in USD'},
        {'name': 'status', 'description': 'Order status'},
        {'name': 'created_at', 'description': 'Order creation timestamp'},
    ],
    'tags': ['fact_table', 'customer', 'orders'],
    'owners': ['[email protected]'],
}
catalog.load_table_metadata(table_spec)
Data Lineage
Purpose and Benefits
- Impact Analysis: Understand change implications
- Root Cause: Trace data quality issues
- Compliance: Meet regulatory requirements
- Optimization: Identify data flow bottlenecks
Implementation with Apache Atlas
# atlas_lineage.py
from atlas_client import Atlas
class DataLineageTracker:
    """Registers processes and tables in Apache Atlas and queries lineage."""

    def __init__(self, atlas_config):
        # atlas_config keys used here: host, port, username, password.
        self.client = Atlas(atlas_config['host'], atlas_config['port'])
        self.client.login(atlas_config['username'], atlas_config['password'])

    def register_process(self, process_data: dict):
        """Register a data process (ETL job, etc.) as an Atlas entity.

        ``process_data`` must contain 'name' and 'cluster'; the remaining
        keys are optional and default to empty/zero values.
        """
        process_def = {
            'typeName': 'Process',
            'attributes': {
                'name': process_data['name'],
                # qualifiedName is Atlas's unique key: name@cluster.
                'qualifiedName': f"{process_data['name']}@{process_data['cluster']}",
                'description': process_data.get('description', ''),
                'inputs': process_data.get('input_tables', []),
                'outputs': process_data.get('output_tables', []),
                'runId': process_data.get('run_id', ''),
                'startTime': process_data.get('start_time', 0),
                'endTime': process_data.get('end_time', 0),
            },
        }
        return self.client.entity.create_entity(process_def)

    def register_table(self, table_data: dict):
        """Register a table entity with enough context for lineage edges.

        ``table_data`` must contain 'name', 'schema' and 'database'.
        """
        table_def = {
            'typeName': 'table',
            'attributes': {
                'name': table_data['name'],
                # Table key convention here is name.schema.database.
                'qualifiedName': f"{table_data['name']}.{table_data['schema']}.{table_data['database']}",
                'description': table_data.get('description', ''),
                'owner': table_data.get('owner', ''),
                'tableType': table_data.get('type', 'EXTERNAL'),
                'columns': [
                    {
                        'name': col['name'],
                        'type': col['type'],
                        'description': col.get('description', ''),
                    }
                    for col in table_data.get('columns', [])
                ],
            },
        }
        return self.client.entity.create_entity(table_def)

    def get_downstream_impact(self, table_qualified_name: str) -> list:
        """Return all entities downstream of the given table.

        Bug fix: the original had a stray ')' in the direction check
        (a syntax error) and named the entity dicts 'guid'.
        """
        lineage = self.client.lineage.get_lineage(table_qualified_name)
        entity_map = lineage.get('guidEntityMap', {})
        # Entities flagged OUTPUT are downstream consumers of this table.
        return [
            entity
            for entity in entity_map.values()
            if entity.get('lineageDirection') == 'OUTPUT'
        ]
Access Control
Role-Based Access Control (RBAC)
# access_control.py
from enum import Enum
from dataclasses import dataclass
from typing import Set
class Permission(Enum):
    """Operations that can be granted on a data asset."""
    READ = "read"
    WRITE = "write"
    DELETE = "delete"
    ADMIN = "admin"


class Role(Enum):
    """Organizational roles recognized by the access-control layer."""
    DATA_ANALYST = "data_analyst"
    DATA_ENGINEER = "data_engineer"
    DATA_SCIENTIST = "data_scientist"
    ANALYTICS_VIEWER = "analytics_viewer"
    PII_ACCESS = "pii_access"


@dataclass
class DataAsset:
    """A governed data asset and its classification."""
    name: str
    # One of: public, internal, confidential, restricted.
    sensitivity: str
    owner: str


class AccessControl:
    """RBAC checks combining per-role permissions with sensitivity tiers."""

    def __init__(self):
        # Baseline capabilities per role, independent of data sensitivity.
        self.role_permissions = {
            Role.DATA_ANALYST: {Permission.READ},
            Role.DATA_SCIENTIST: {Permission.READ},
            Role.DATA_ENGINEER: {Permission.READ, Permission.WRITE},
            Role.ANALYTICS_VIEWER: {Permission.READ},
            Role.PII_ACCESS: {Permission.READ},
        }
        # Roles allowed to see data at each sensitivity tier.
        self.sensitivity_restrictions = {
            'restricted': {Role.PII_ACCESS, Role.DATA_ENGINEER},
            'confidential': {Role.DATA_ENGINEER, Role.DATA_SCIENTIST, Role.DATA_ANALYST},
            'internal': {Role.DATA_ENGINEER, Role.DATA_SCIENTIST, Role.DATA_ANALYST, Role.ANALYTICS_VIEWER},
            'public': {Role.DATA_ENGINEER, Role.DATA_SCIENTIST, Role.DATA_ANALYST, Role.ANALYTICS_VIEWER, Role.PII_ACCESS},
        }

    def check_access(self, user_role: Role, asset: DataAsset, permission: Permission) -> bool:
        """True iff the role holds the permission AND may see the asset's tier."""
        granted = self.role_permissions.get(user_role, set())
        if permission not in granted:
            return False
        visible_to = self.sensitivity_restrictions.get(asset.sensitivity, set())
        return user_role in visible_to

    def get_accessible_assets(self, user_role: Role, all_assets: list) -> list:
        """Filter ``all_assets`` down to those the role can READ."""
        return [
            asset
            for asset in all_assets
            if self.check_access(user_role, asset, Permission.READ)
        ]
Column-Level Security
-- Snowflake row-level and column-level security
-- Create masked columns view
-- Secure view over raw.customer_orders: PII columns (email, phone) are
-- exposed only when the session role is PII_ACCESS_ROLE or ADMIN_ROLE;
-- non-PII columns pass through unchanged.
-- NOTE(review): email masks to the literal '***MASKED***' while phone
-- masks to NULL — confirm this inconsistency is intentional.
CREATE OR REPLACE VIEW analytics.customer_orders_v AS
SELECT
order_id,
customer_id,
-- Mask PII columns for non-authorized users
CASE
WHEN CURRENT_ROLE() IN ('PII_ACCESS_ROLE', 'ADMIN_ROLE') THEN email
ELSE '***MASKED***'
END AS email,
CASE
WHEN CURRENT_ROLE() IN ('PII_ACCESS_ROLE', 'ADMIN_ROLE') THEN phone
ELSE NULL
END AS phone,
total_amount,
status,
created_at
FROM raw.customer_orders;
Implementation Checklist
- Document all data assets
- Implement data catalog
- Set up lineage tracking
- Define roles and permissions
- Configure column-level security
- Establish data retention policies
- Create compliance workflows
Summary
Data governance provides an essential framework for enterprise data management:
- Catalog: Enable discovery and understanding of available data
- Lineage: Track data flow for impact analysis and compliance
- Access Control: Implement RBAC and sensitivity-based restrictions
Modern tools like Amundsen, Apache Atlas, and cloud-native solutions (AWS Lake Formation, Azure Purview) provide integrated governance capabilities.
Comments