Introduction
Data governance is the foundation of a data-driven organization. Without proper governance, you risk regulatory penalties, data quality issues, and lost trust. This guide covers building a comprehensive data governance program.
Key Statistics:
- 68% of organizations cite data governance as a top priority
- Proper governance reduces data retrieval time by 40%
- GDPR violations cost an average of $4.2 million each
- Data catalogs improve analyst productivity by 30%
Data Governance Framework
┌─────────────────────────────────────────────────────────────────┐
│ Data Governance Framework │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Policy │ │ Standards │ │ Processes │ │
│ │ Definition │ │ & Models │ │ & Workflows │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │ │ │ │
│ └─────────────────┼──────────────────┘ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Technology & Tools │ │
│ │ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ │ │
│ │ │Catalog │ │Lineage │ │Quality │ │Access │ │ │
│ │ │ │ │ │ │ │ │Control │ │ │
│ │ └────────┘ └────────┘ └────────┘ └────────┘ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
Data Catalog Implementation
Amundsen Data Catalog
# docker-compose.yml for Amundsen
# Runs Elasticsearch plus the Amundsen metadata, frontend and databuilder
# containers.
# NOTE(review): confirm the image names/tags and the environment variable
# names against the Amundsen release you deploy.
version: '3.8'

services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.17.0
    environment:
      - discovery.type=single-node
      - ES_JAVA_OPTS=-Xms2g -Xmx2g
    ports:
      - "9200:9200"
    volumes:
      - es_data:/usr/share/elasticsearch/data

  amundsenmetadata:
    image: amundsen/metadata:3.5.0
    environment:
      - LOGS_FORMAT=json
      - SPRING_ES_HOSTS=elasticsearch:9200
    ports:
      - "5002:5002"

  amundsenfrontend:
    image: amundsen/frontend:3.5.0
    environment:
      - FRONTEND_SVC_CONFIG=localDevelopment
    ports:
      - "5000:5000"
    depends_on:
      - amundsenmetadata

  amundsendatabuilder:
    image: amundsen/databuilder:3.5.0
    environment:
      - LOGS_FORMAT=json
      - ES_HOSTS=elasticsearch:9200
      - METADATA_HOST=amundsenmetadata

# Named volume that persists the Elasticsearch data directory.
volumes:
  es_data:
#!/usr/bin/env python3
"""Amundsen data producer example."""
from databuilder.publisher.elasticsearch_publisher import ElasticsearchPublisher
from databuilder.task.task import DefaultTask
from databuilder.publisher.sql_alchemy_table_publisher import SQLAlchemyTablePublisher
# Example: Publish table metadata to Amundsen
def publish_table_metadata():
    """Build the example ``customer_orders`` table metadata record.

    The record is only constructed and returned; nothing is sent to a
    publisher here. The caller is responsible for handing it to a
    databuilder job/publisher.

    Returns:
        The ``TableMetadata`` instance describing
        ``snowflake://production.analytics/customer_orders``.
    """
    # Imported at function scope because the module-level imports of this
    # example do not bring TableMetadata into scope.
    from databuilder.models.table_metadata import TableMetadata

    # NOTE(review): databuilder's TableMetadata normally receives columns as
    # ColumnMetadata objects via `columns=`; confirm that the installed
    # version honors column_names / column_descriptions / owners as written.
    return TableMetadata(
        database='snowflake',
        cluster='production',
        schema='analytics',
        name='customer_orders',
        description='Customer order data with transaction details',
        column_names=[
            'customer_id',
            'order_id',
            'order_date',
            'amount',
            'status',
        ],
        column_descriptions=[
            'Unique customer identifier',
            'Unique order identifier',
            'Date of order',
            'Total order amount',
            'Order status (pending, shipped, delivered)',
        ],
        tags=['customer', 'orders', 'analytics'],
        owners=['[email protected]'],
    )
Amundsen Metadata Loader
# metadata_loader.py
from amundsen_common.models.api import request as models_request
from typing import Optional


class MetadataLoader:
    """Register tables in, and search, an Amundsen catalog through a client.

    The client object is supplied by the caller; this class only calls its
    ``create_table_metadata`` and ``search_tables`` methods.
    """

    def __init__(self, amundsen_client):
        """Keep a reference to the client used for every catalog call."""
        self.client = amundsen_client

    def load_table_metadata(self, table_data: dict):
        """Load table metadata to catalog.

        Args:
            table_data: Mapping with required keys ``name`` and ``columns``
                (a list of dicts, each with ``name`` and optionally
                ``description``). Optional keys and their fallbacks:
                ``description`` (''), ``database`` ('postgres'),
                ``cluster`` ('production'), ``schema`` ('public'),
                ``tags`` ([]), ``owners`` ([]).

        Returns:
            Whatever the client's ``create_table_metadata`` returns.

        Raises:
            KeyError: If ``name`` or ``columns`` is missing, or a column
                entry has no ``name``.
        """
        columns = table_data['columns']
        table_request = models_request.TableMetadataRequest(
            table_name=table_data['name'],
            table_description=table_data.get('description', ''),
            column_names=[col['name'] for col in columns],
            column_descriptions=[col.get('description', '') for col in columns],
            database=table_data.get('database', 'postgres'),
            cluster=table_data.get('cluster', 'production'),
            schema_name=table_data.get('schema', 'public'),
            tags=table_data.get('tags', []),
            owners=table_data.get('owners', []),
        )
        return self.client.create_table_metadata(table_request)

    def search(self, query: str, filters: Optional[dict] = None) -> list:
        """Search the data catalog.

        Args:
            query: Search text passed through to the client.
            filters: Optional filter mapping; ``None`` is forwarded as-is.

        Returns:
            The result of the client's ``search_tables(query, filters)``.
        """
        return self.client.search_tables(query, filters)
# Usage
# `amundsen_client` is expected to exist already; it is not created in this
# snippet.
customer_orders_table = {
    'name': 'customer_orders',
    'description': 'Customer order transactions',
    'database': 'analytics',
    'schema': 'warehouse',
    'columns': [
        {'name': 'order_id', 'description': 'Unique order identifier'},
        {'name': 'customer_id', 'description': 'Foreign key to customers'},
        {'name': 'total_amount', 'description': 'Order total in USD'},
        {'name': 'status', 'description': 'Order status'},
        {'name': 'created_at', 'description': 'Order creation timestamp'},
    ],
    'tags': ['fact_table', 'customer', 'orders'],
    'owners': ['[email protected]'],
}

catalog = MetadataLoader(amundsen_client)

# Register a new table
catalog.load_table_metadata(customer_orders_table)
DataHub Integration
# DataHub ingestion configuration
# Pulls metadata from Snowflake and pushes it to DataHub over REST.
# Credentials and the token are read from environment variables.
source:
  type: snowflake
  config:
    account_id: "xy12345.us-east-1"
    username: "${SNOWFLAKE_USER}"
    password: "${SNOWFLAKE_PASSWORD}"
    warehouse: "ANALYTICS_WH"
    database: "ANALYTICS"

sink:
  type: datahub-rest
  config:
    server: "http://datahub-gms:8080"
    token: "${DATAHUB_TOKEN}"

# NOTE(review): confirm this transformer type and its config keys against
# the DataHub transformer list for the version in use.
transformers:
  - type: "datahub_transformer.add_dataset_business_owner"
    config:
      business_owner: "[email protected]"
      business_owner_email: "[email protected]"
Data Lineage
OpenLineage Implementation
#!/usr/bin/env python3
"""OpenLineage integration for data lineage."""
from openlineage.client import OpenLineageClient
from openlineage.client.run import RunEvent, RunState
from datetime import datetime
client = OpenLineageClient.from_api_key("your-api-key")
def track_dagster_job():
    """Emit an OpenLineage START event for the ``daily_sales_aggregation`` job.

    The event declares one input dataset (``raw.sales.transactions``) and one
    output dataset (``analytics.daily_sales``), each with a schema facet, and
    is sent through the module-level ``client``.
    """
    # Function-scope imports: these names are not covered by the imports at
    # the top of this example.
    from datetime import timezone
    from uuid import uuid4

    from openlineage.client.facet import SchemaDatasetFacet, SchemaField
    from openlineage.client.run import InputDataset, Job, OutputDataset, Run

    # OpenLineage run ids must be UUIDs; an arbitrary string is rejected.
    run_id = str(uuid4())
    job_name = "daily_sales_aggregation"
    namespace = "analytics"
    dataset_namespace = "snowflake://analytics"

    source_schema = SchemaDatasetFacet(
        fields=[
            SchemaField(name="transaction_id", type="STRING"),
            SchemaField(name="amount", type="DECIMAL"),
            SchemaField(name="created_at", type="TIMESTAMP"),
        ]
    )
    target_schema = SchemaDatasetFacet(
        fields=[
            SchemaField(name="date", type="DATE"),
            SchemaField(name="total_sales", type="DECIMAL"),
            SchemaField(name="transaction_count", type="INTEGER"),
        ]
    )

    # Emit start event
    start_event = RunEvent(
        eventType=RunState.START,
        # Timezone-aware timestamp so the ISO-8601 string carries an offset.
        eventTime=datetime.now(timezone.utc).isoformat(),
        run=Run(runId=run_id),
        job=Job(namespace=namespace, name=job_name),
        # URI identifying the code that produced this event; replace with
        # the real location of your pipeline.
        producer="https://example.com/pipelines/daily_sales_aggregation",
        inputs=[
            InputDataset(
                namespace=dataset_namespace,
                name="raw.sales.transactions",
                facets={"schema": source_schema},
            )
        ],
        outputs=[
            OutputDataset(
                namespace=dataset_namespace,
                name="analytics.daily_sales",
                facets={"schema": target_schema},
            )
        ],
    )
    client.emit(start_event)
-- Query lineage in Snowflake
-- Returns the source/target table and column pairs, with the transform type
-- and logic, for every lineage row whose downstream table is 'daily_sales'.
-- analytics.table_metadata is joined twice: once through upstream_table_id
-- (alias upstreams) and once through downstream_table_id (alias downstreams).
-- NOTE(review): column_name is read from analytics.table_metadata, which
-- assumes that table carries column-level rows; confirm against the schema.
SELECT
upstreams.table_name as source_table,
upstreams.column_name as source_column,
downstreams.table_name as target_table,
downstreams.column_name as target_column,
t.transform_type,
t.transform_logic
FROM analytics.column_lineage t
JOIN analytics.table_metadata upstreams
ON t.upstream_table_id = upstreams.id
JOIN analytics.table_metadata downstreams
ON t.downstream_table_id = downstreams.id
WHERE downstreams.table_name = 'daily_sales';
Apache Atlas Lineage
# atlas_lineage.py
from atlas_client import Atlas
class DataLineageTracker:
    """Register processes and tables in Apache Atlas and query their lineage."""

    def __init__(self, atlas_config):
        """Connect and log in to Atlas.

        Args:
            atlas_config: Mapping with ``host``, ``port``, ``username`` and
                ``password`` keys.
        """
        self.client = Atlas(atlas_config['host'], atlas_config['port'])
        self.client.login(atlas_config['username'], atlas_config['password'])

    def register_process(self, process_data: dict):
        """Register a data process (ETL job, etc.).

        Args:
            process_data: Mapping with required keys ``name`` and ``cluster``.
                Optional keys and their fallbacks: ``description`` (''),
                ``input_tables`` ([]), ``output_tables`` ([]),
                ``run_id`` (''), ``start_time`` (0), ``end_time`` (0).

        Returns:
            The result of the client's ``entity.create_entity`` call.
        """
        name = process_data['name']
        process_def = {
            'typeName': 'Process',
            'attributes': {
                'name': name,
                'qualifiedName': f"{name}@{process_data['cluster']}",
                'description': process_data.get('description', ''),
                'inputs': process_data.get('input_tables', []),
                'outputs': process_data.get('output_tables', []),
                'runId': process_data.get('run_id', ''),
                'startTime': process_data.get('start_time', 0),
                'endTime': process_data.get('end_time', 0),
            },
        }
        return self.client.entity.create_entity(process_def)

    def register_table(self, table_data: dict):
        """Register a table with lineage context.

        Args:
            table_data: Mapping with required keys ``name``, ``schema`` and
                ``database``. Optional keys and their fallbacks:
                ``description`` (''), ``owner`` (''), ``type`` ('EXTERNAL'),
                ``columns`` ([]). Each column needs ``name`` and ``type``;
                its ``description`` falls back to ''.

        Returns:
            The result of the client's ``entity.create_entity`` call.
        """
        name = table_data['name']
        columns = [
            {
                'name': col['name'],
                'type': col['type'],
                'description': col.get('description', ''),
            }
            for col in table_data.get('columns', [])
        ]
        table_def = {
            'typeName': 'table',
            'attributes': {
                'name': name,
                # Built as <name>.<schema>.<database>, in that order.
                'qualifiedName': f"{name}.{table_data['schema']}.{table_data['database']}",
                'description': table_data.get('description', ''),
                'owner': table_data.get('owner', ''),
                'tableType': table_data.get('type', 'EXTERNAL'),
                'columns': columns,
            },
        }
        return self.client.entity.create_entity(table_def)

    def get_downstream_impact(self, table_qualified_name: str) -> list:
        """Get all downstream dependencies.

        Args:
            table_qualified_name: Identifier passed to the client's
                ``lineage.get_lineage``.

        Returns:
            The entries of the lineage's ``guidEntityMap`` whose
            ``lineageDirection`` is ``'OUTPUT'``; an empty list when the map
            is missing or has no such entries.
        """
        lineage = self.client.lineage.get_lineage(table_qualified_name)
        entities = lineage.get('guidEntityMap', {}).values()
        return [
            entity
            for entity in entities
            if entity.get('lineageDirection') == 'OUTPUT'
        ]
Access Control
Row-Level Security
-- Snowflake Row Access Policies
-- Create row access policy
-- The policy body follows the `->` arrow and must evaluate to a BOOLEAN:
-- TRUE keeps the row visible for the current role, FALSE hides it.
CREATE ROW ACCESS POLICY analytics.user_region_policy
AS (user_region VARCHAR) RETURNS BOOLEAN ->
CASE
    -- Analysts and admins see every row.
    WHEN CURRENT_ROLE() IN ('ANALYST_ROLE', 'ADMIN_ROLE') THEN TRUE
    -- NOTE(review): CURRENT_REGION() reports the Snowflake account's
    -- deployment region, not a business region such as 'US' or 'EU';
    -- confirm this comparison can ever match the values stored in `region`.
    WHEN CURRENT_ROLE() = 'SALES_ROLE' THEN
        user_region = CURRENT_REGION()
    WHEN CURRENT_ROLE() = 'MARKETING_ROLE' THEN
        user_region IN ('US', 'EU', 'APAC')
    -- Every other role sees no rows.
    ELSE FALSE
END;

-- Apply to table
ALTER TABLE analytics.customer_data
    ADD ROW ACCESS POLICY analytics.user_region_policy
    ON (region);
Column-Level Security
-- BigQuery Column-Level Security
-- Defines analytics.customer_pii, then grants SELECT on three columns to
-- analyst_role and denies SELECT on the phone and address columns.
-- NOTE(review): require_partition_filter is documented as a table-level
-- option in BigQuery, and `labels` as a table/dataset option; confirm these
-- column OPTIONS are accepted by the target engine.
CREATE TABLE analytics.customer_pii (
customer_id STRING,
email STRING OPTIONS (require_partition_filter=true),
phone STRING OPTIONS (labels=[('pii', 'true')]),
address STRING OPTIONS (labels=[('pii', 'true')]),
created_at TIMESTAMP
);
-- Column-level IAM
-- NOTE(review): BigQuery column-level access is normally configured with
-- policy tags; confirm that column-list GRANT and DENY statements are
-- supported before relying on the statements below.
-- Grant access at column level
GRANT SELECT(customer_id, email, created_at)
ON analytics.customer_pii
TO ROLE analyst_role;
-- Deny PII columns
DENY SELECT(phone, address)
ON analytics.customer_pii
TO ROLE analyst_role;
Dynamic Masking
-- Dynamic Data Masking in Snowflake
-- Create masking policy
-- ADMIN_ROLE and HR_ROLE receive the stored value unchanged; every other
-- role receives the first 2 characters, '***', and the last 4 characters.
CREATE MASKING POLICY analytics.email_mask
AS (val VARCHAR) RETURNS VARCHAR ->
CASE
WHEN CURRENT_ROLE() IN ('ADMIN_ROLE', 'HR_ROLE') THEN val
ELSE CONCAT(LEFT(val, 2), '***', RIGHT(val, 4))
END;
-- Apply to column
ALTER TABLE analytics.users
ALTER COLUMN email
SET MASKING POLICY analytics.email_mask;
-- Apply a separate policy to another column.
-- analytics.phone_mask is not created in this snippet; it must exist
-- before this statement runs.
ALTER TABLE analytics.users
ALTER COLUMN phone
SET MASKING POLICY analytics.phone_mask;
Role-Based Access Control (RBAC)
# access_control.py
from enum import Enum
from dataclasses import dataclass
from typing import Set
class Permission(Enum):
    """Actions that can be requested on a data asset."""

    READ = "read"
    WRITE = "write"
    DELETE = "delete"
    ADMIN = "admin"


class Role(Enum):
    """Roles a user can hold when requesting access."""

    DATA_ANALYST = "data_analyst"
    DATA_ENGINEER = "data_engineer"
    DATA_SCIENTIST = "data_scientist"
    ANALYTICS_VIEWER = "analytics_viewer"
    PII_ACCESS = "pii_access"


@dataclass
class DataAsset:
    """A governed data asset with its sensitivity classification and owner."""

    name: str
    sensitivity: str  # public, internal, confidential, restricted
    owner: str


class AccessControl:
    """Role-based access checks combining permissions and asset sensitivity.

    Access is granted only when the role holds the requested permission AND
    the role is allowed for the asset's sensitivity level.
    """

    def __init__(self):
        """Build the static permission and sensitivity tables."""
        # Permissions each role may exercise. No role is granted DELETE or
        # ADMIN in this table.
        self.role_permissions = {
            Role.DATA_ANALYST: {Permission.READ},
            Role.DATA_SCIENTIST: {Permission.READ},
            Role.DATA_ENGINEER: {Permission.READ, Permission.WRITE},
            Role.ANALYTICS_VIEWER: {Permission.READ},
            Role.PII_ACCESS: {Permission.READ},
        }
        # Roles allowed to touch assets at each sensitivity level.
        self.sensitivity_restrictions = {
            'restricted': {Role.PII_ACCESS, Role.DATA_ENGINEER},
            'confidential': {
                Role.DATA_ENGINEER,
                Role.DATA_SCIENTIST,
                Role.DATA_ANALYST,
            },
            'internal': {
                Role.DATA_ENGINEER,
                Role.DATA_SCIENTIST,
                Role.DATA_ANALYST,
                Role.ANALYTICS_VIEWER,
            },
            'public': {
                Role.DATA_ENGINEER,
                Role.DATA_SCIENTIST,
                Role.DATA_ANALYST,
                Role.ANALYTICS_VIEWER,
                Role.PII_ACCESS,
            },
        }

    def check_access(self, user_role: Role, asset: DataAsset, permission: Permission) -> bool:
        """Check if user has access to data asset.

        Args:
            user_role: Role making the request.
            asset: Asset being accessed; its ``sensitivity`` selects the
                allowed-role set.
            permission: Action being requested.

        Returns:
            True only if the role holds ``permission`` and is listed for the
            asset's sensitivity. Unknown roles and unknown sensitivity
            values are denied.
        """
        # Check basic permission
        granted = self.role_permissions.get(user_role, set())
        if permission not in granted:
            return False

        # Check sensitivity level
        allowed_roles = self.sensitivity_restrictions.get(asset.sensitivity, set())
        return user_role in allowed_roles

    def get_accessible_assets(self, user_role: Role, all_assets: list) -> list:
        """Get all assets a role can access.

        Args:
            user_role: Role to evaluate.
            all_assets: Candidate ``DataAsset`` objects.

        Returns:
            The assets, in input order, for which READ access is granted.
        """
        return [
            asset
            for asset in all_assets
            if self.check_access(user_role, asset, Permission.READ)
        ]
Column-Level Security with Views
-- Snowflake column-level security through a masking view
-- (the view has no WHERE clause, so it does not filter rows)
-- Create masked columns view
CREATE OR REPLACE VIEW analytics.customer_orders_v AS
SELECT
order_id,
customer_id,
-- Mask PII columns for non-authorized users
-- email: shown to PII_ACCESS_ROLE and ADMIN_ROLE, '***MASKED***' otherwise
CASE
WHEN CURRENT_ROLE() IN ('PII_ACCESS_ROLE', 'ADMIN_ROLE') THEN email
ELSE '***MASKED***'
END AS email,
-- phone: shown to the same two roles, NULL otherwise
CASE
WHEN CURRENT_ROLE() IN ('PII_ACCESS_ROLE', 'ADMIN_ROLE') THEN phone
ELSE NULL
END AS phone,
total_amount,
status,
created_at
FROM raw.customer_orders;
Implementation Checklist
- Document all data assets
- Implement data catalog
- Set up lineage tracking
- Define roles and permissions
- Configure column-level security
- Establish data retention policies
- Create compliance workflows
Data Quality
Great Expectations Integration
# great_expectations.yml
# One SQLAlchemy-backed datasource plus filesystem stores for expectation
# suites and validation results.
config_version: 3.0

datasources:
  snowflake:
    data_connectors:
      default_runtime_data_connector_name:
        class_name: RuntimeDataConnector
        batch_identifiers:
          - default_identifier_name
    execution_engine:
      class_name: SqlAlchemyExecutionEngine
      # Connection string is read from the environment.
      connection_string: ${SNOWFLAKE_CONN}

stores:
  expectations_store:
    class_name: ExpectationsStore
    store_backend:
      class_name: TupleFilesystemStoreBackend
      base_directory: expectations/
  validations_store:
    class_name: ValidationsStore
    store_backend:
      class_name: TupleFilesystemStoreBackend
      base_directory: uncommitted/validations/
#!/usr/bin/env python3
"""Great Expectations data quality checks."""
import great_expectations as ge
# Create expectation suite
context = ge.get_context()
suite = context.add_expectation_suite("customer_data_quality")

# Add expectations
# NOTE(review): the Expect* classes below are not imported anywhere in this
# snippet, and `context.read_sql` / `df.validate` should be checked against
# the installed Great Expectations version before running.
suite.add_expectation(
    ExpectColumnValuesToNotBeNull(column="customer_id")
)
suite.add_expectation(
    ExpectColumnValueLengthsToBeBetween(
        column="email",
        min_value=5,
        max_value=100,
    )
)
suite.add_expectation(
    ExpectColumnValuesToBeUnique(column="customer_id")
)
suite.add_expectation(
    ExpectColumnValuesToMatchRegex(
        column="email",
        regex=r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$',
    )
)

# Validate data
df = context.read_sql("SELECT * FROM analytics.customers")
results = df.validate(
    expectation_suite="customer_data_quality",
    only_return_failures=False,
)

# `results.results` holds every expectation outcome because
# only_return_failures=False, so count only the unsuccessful ones.
failed_count = sum(1 for outcome in results.results if not outcome.success)

print(f"Success: {results.success}")
print(f"Failed expectations: {failed_count}")
External Resources
- Apache Atlas
- Amundsen Data Catalog
- AWS Lake Formation
- Azure Purview
- DataHub
- Great Expectations
- OpenLineage
Comments