Introduction
Data governance is the foundation of a data-driven organization. Without it, you risk regulatory penalties, data quality issues, and lost stakeholder trust. This guide covers building a comprehensive data governance program: cataloging, lineage, access control, and data quality.
Key Statistics (figures commonly cited in industry surveys):
- 68% of organizations cite data governance as a top priority
- Proper governance can reduce data retrieval time by 40%
- GDPR fines average $4.2 million per violation
- Data catalogs improve analyst productivity by 30%
Data Governance Framework
+-------------------------------------------------------------------+
|                     Data Governance Framework                     |
+-------------------------------------------------------------------+
|                                                                   |
|   +--------------+    +--------------+    +--------------+        |
|   |    Policy    |    |  Standards   |    |  Processes   |        |
|   |  Definition  |    |   & Models   |    |  & Workflows |        |
|   +--------------+    +--------------+    +--------------+        |
|          |                   |                   |                |
|          +-------------------+-------------------+                |
|                              |                                    |
|                              v                                    |
|   +-----------------------------------------------------------+   |
|   |                    Technology & Tools                     |   |
|   |  +---------+  +---------+  +---------+  +-----------+     |   |
|   |  | Catalog |  | Lineage |  | Quality |  |  Access   |     |   |
|   |  |         |  |         |  |         |  |  Control  |     |   |
|   |  +---------+  +---------+  +---------+  +-----------+     |   |
|   +-----------------------------------------------------------+   |
|                                                                   |
+-------------------------------------------------------------------+
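Each pillar above should be concrete enough for tooling to act on. As an illustrative sketch (the GovernancePolicy class and its fields are hypothetical, not from any particular tool), policy definitions can be captured as code so the catalog and access-control layers can look them up:

#!/usr/bin/env python3
"""Illustrative policy-as-code registry (hypothetical schema)."""
from dataclasses import dataclass, field


@dataclass
class GovernancePolicy:
    name: str
    owner: str                 # accountable data steward
    applies_to: list[str]      # dataset URIs or tags
    classification: str        # e.g. 'public', 'internal', 'pii'
    retention_days: int
    controls: list[str] = field(default_factory=list)


POLICIES = [
    GovernancePolicy(
        name="customer-pii",
        owner="[email protected]",
        applies_to=["snowflake://production.analytics/customer_orders"],
        classification="pii",
        retention_days=365 * 7,
        controls=["row_access_policy", "dynamic_masking"],
    ),
]


def policies_for(dataset_uri: str) -> list[GovernancePolicy]:
    """Return every policy governing a given dataset."""
    return [p for p in POLICIES if dataset_uri in p.applies_to]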
Data Catalog Implementation
Amundsen Data Catalog
# docker-compose.yml for Amundsen (simplified; the official stack also
# runs a search service alongside Neo4j, which backs the metadata service)
version: '3.8'
services:
  neo4j:
    image: neo4j:3.5
    environment:
      - NEO4J_AUTH=neo4j/test
    ports:
      - "7474:7474"
      - "7687:7687"
    volumes:
      - neo4j_data:/data
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.17.0
    environment:
      - discovery.type=single-node
      - ES_JAVA_OPTS=-Xms2g -Xmx2g
    ports:
      - "9200:9200"
    volumes:
      - es_data:/usr/share/elasticsearch/data
  amundsenmetadata:
    image: amundsen/metadata:3.5.0
    depends_on:
      - neo4j
    environment:
      - LOGS_FORMAT=json
      # Metadata is stored in Neo4j, not Elasticsearch
      - PROXY_HOST=bolt://neo4j
      - PROXY_PORT=7687
    ports:
      - "5002:5002"
  amundsenfrontend:
    image: amundsen/frontend:3.5.0
    depends_on:
      - amundsenmetadata
    environment:
      - METADATASERVICE_BASE=http://amundsenmetadata:5002
      - FRONTEND_SVC_CONFIG_MODULE_CLASS=amundsen_application.config.LocalConfig
    ports:
      - "5000:5000"
# Note: amundsen/databuilder is a library run as ad-hoc ingestion jobs
# (see the Python example below), not a long-running service.
volumes:
  es_data:
  neo4j_data:
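Once the stack is up, a quick smoke test confirms each published port answers HTTP. A minimal sketch (the URLs assume the ports above on localhost; it checks reachability only, not full health):

#!/usr/bin/env python3
"""Smoke-test the compose stack above."""
import requests

SERVICES = {
    "elasticsearch": "http://localhost:9200",
    "amundsenmetadata": "http://localhost:5002",
    "amundsenfrontend": "http://localhost:5000",
}

for name, url in SERVICES.items():
    try:
        # Any HTTP response means the container is at least listening
        status = requests.get(url, timeout=5).status_code
        print(f"{name}: HTTP {status}")
    except requests.RequestException as exc:
        print(f"{name}: unreachable ({exc})")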
#!/usr/bin/env python3
"""Amundsen databuilder example: describing a table for the catalog."""
from databuilder.models.table_metadata import ColumnMetadata, TableMetadata


def build_table_metadata() -> TableMetadata:
    """Build a TableMetadata record for ingestion into Amundsen.

    In a full pipeline this record flows through a databuilder job
    (extractor -> loader -> publisher) rather than being built by hand.
    """
    columns = [
        ColumnMetadata('customer_id', 'Unique customer identifier', 'varchar', 0),
        ColumnMetadata('order_id', 'Unique order identifier', 'varchar', 1),
        ColumnMetadata('order_date', 'Date of order', 'date', 2),
        ColumnMetadata('amount', 'Total order amount', 'decimal', 3),
        ColumnMetadata('status', 'Order status (pending, shipped, delivered)', 'varchar', 4),
    ]
    return TableMetadata(
        database='snowflake',
        cluster='production',
        schema='analytics',
        name='customer_orders',
        description='Customer order data with transaction details',
        columns=columns,
        # Ownership ([email protected]) is modeled separately via
        # databuilder.models.table_owner.TableOwner.
        tags=['customer', 'orders', 'analytics'],
    )
DataHub Integration
# DataHub ingestion recipe (run with: datahub ingest -c recipe.yml)
source:
  type: snowflake
  config:
    account_id: "xy12345.us-east-1"
    username: "${SNOWFLAKE_USER}"
    password: "${SNOWFLAKE_PASSWORD}"
    warehouse: "ANALYTICS_WH"
    database_pattern:
      allow:
        - "ANALYTICS"
sink:
  type: datahub-rest
  config:
    server: "http://datahub-gms:8080"
    token: "${DATAHUB_TOKEN}"
transformers:
  - type: "simple_add_dataset_ownership"
    config:
      owner_urns:
        - "urn:li:corpuser:[email protected]"
      ownership_type: "BUSINESS_OWNER"
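The same recipe can be run programmatically, which is handy for scheduling ingestion from an orchestrator. A sketch using DataHub's Pipeline API, mirroring the YAML above (credentials still come from the environment):

#!/usr/bin/env python3
"""Run the DataHub ingestion recipe from Python."""
import os

from datahub.ingestion.run.pipeline import Pipeline

# Mirrors the YAML recipe above; transformers omitted for brevity
pipeline = Pipeline.create(
    {
        "source": {
            "type": "snowflake",
            "config": {
                "account_id": "xy12345.us-east-1",
                "username": os.environ["SNOWFLAKE_USER"],
                "password": os.environ["SNOWFLAKE_PASSWORD"],
                "warehouse": "ANALYTICS_WH",
                "database_pattern": {"allow": ["ANALYTICS"]},
            },
        },
        "sink": {
            "type": "datahub-rest",
            "config": {
                "server": "http://datahub-gms:8080",
                "token": os.environ["DATAHUB_TOKEN"],
            },
        },
    }
)
pipeline.run()
pipeline.raise_from_status()  # raise if ingestion reported failures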
Data Lineage
OpenLineage Implementation
#!/usr/bin/env python3
"""OpenLineage integration for data lineage."""
from datetime import datetime, timezone
from uuid import uuid4

from openlineage.client import OpenLineageClient
from openlineage.client.facet import SchemaDatasetFacet, SchemaField
from openlineage.client.run import (
    InputDataset,
    Job,
    OutputDataset,
    Run,
    RunEvent,
    RunState,
)

# Reads OPENLINEAGE_URL / OPENLINEAGE_API_KEY from the environment
client = OpenLineageClient.from_environment()

PRODUCER = "https://example.com/jobs/daily_sales_aggregation"


def track_job():
    """Emit a START lineage event for the daily aggregation job
    (e.g. called from a Dagster op)."""
    run_id = str(uuid4())  # OpenLineage run IDs must be UUIDs
    job_name = "daily_sales_aggregation"
    namespace = "analytics"

    start_event = RunEvent(
        eventType=RunState.START,
        eventTime=datetime.now(timezone.utc).isoformat(),
        run=Run(runId=run_id),
        job=Job(namespace=namespace, name=job_name),
        producer=PRODUCER,
        inputs=[
            InputDataset(
                namespace="snowflake://analytics",
                name="raw.sales.transactions",
                facets={
                    "schema": SchemaDatasetFacet(
                        fields=[
                            SchemaField(name="transaction_id", type="STRING"),
                            SchemaField(name="amount", type="DECIMAL"),
                            SchemaField(name="created_at", type="TIMESTAMP"),
                        ]
                    )
                },
            )
        ],
        outputs=[
            OutputDataset(
                namespace="snowflake://analytics",
                name="analytics.daily_sales",
                facets={
                    "schema": SchemaDatasetFacet(
                        fields=[
                            SchemaField(name="date", type="DATE"),
                            SchemaField(name="total_sales", type="DECIMAL"),
                            SchemaField(name="transaction_count", type="INTEGER"),
                        ]
                    )
                },
            )
        ],
    )
    client.emit(start_event)
-- Query column-level lineage (assumes lineage metadata has been loaded
-- into custom analytics.column_lineage / analytics.table_metadata tables)
SELECT
    upstreams.table_name    AS source_table,
    upstreams.column_name   AS source_column,
    downstreams.table_name  AS target_table,
    downstreams.column_name AS target_column,
    t.transform_type,
    t.transform_logic
FROM analytics.column_lineage t
JOIN analytics.table_metadata upstreams
    ON t.upstream_table_id = upstreams.id
JOIN analytics.table_metadata downstreams
    ON t.downstream_table_id = downstreams.id
WHERE downstreams.table_name = 'daily_sales';
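Rows shaped like this query's output form a directed graph, which makes transitive questions ("what feeds this column?") easy to answer. A sketch using networkx, with a few hard-coded example edges standing in for real query results:

#!/usr/bin/env python3
"""Walk column lineage as a graph."""
import networkx as nx

# (source_column, target_column) pairs, as returned by the query above
EDGES = [
    ("raw.sales.transactions.amount", "analytics.daily_sales.total_sales"),
    ("raw.sales.transactions.transaction_id", "analytics.daily_sales.transaction_count"),
    ("raw.sales.transactions.created_at", "analytics.daily_sales.date"),
]

graph = nx.DiGraph(EDGES)


def upstream_of(column: str) -> set[str]:
    """All columns that feed into the given column, transitively."""
    return nx.ancestors(graph, column)


print(upstream_of("analytics.daily_sales.total_sales"))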
Access Control
Row-Level Security
-- Snowflake Row Access Policies
-- Create row access policy
CREATE ROW ACCESS POLICY analytics.user_region_policy
AS (user_region VARCHAR) RETURNS BOOLEAN ->
  CASE
    WHEN CURRENT_ROLE() IN ('ANALYST_ROLE', 'ADMIN_ROLE') THEN TRUE
    WHEN CURRENT_ROLE() = 'SALES_ROLE' THEN
      -- CURRENT_REGION() returns the account's cloud region, not the
      -- user's territory, so map users to regions via a lookup table
      -- (analytics.user_regions is a hypothetical mapping table)
      user_region = (
        SELECT home_region
        FROM analytics.user_regions
        WHERE user_name = CURRENT_USER()
      )
    WHEN CURRENT_ROLE() = 'MARKETING_ROLE' THEN
      user_region IN ('US', 'EU', 'APAC')
    ELSE FALSE
  END;

-- Apply to table
ALTER TABLE analytics.customer_data
ADD ROW ACCESS POLICY analytics.user_region_policy
ON (region);
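Policies like this deserve automated verification. A hedged sketch using snowflake-connector-python: connect under each role and compare the row counts each one can see (credentials are assumed to be in the environment):

#!/usr/bin/env python3
"""Verify the row access policy by role."""
import os

import snowflake.connector


def visible_rows(role: str) -> int:
    """Count rows the given role can see after the policy is applied."""
    conn = snowflake.connector.connect(
        account=os.environ["SNOWFLAKE_ACCOUNT"],
        user=os.environ["SNOWFLAKE_USER"],
        password=os.environ["SNOWFLAKE_PASSWORD"],
        role=role,
        warehouse="ANALYTICS_WH",
    )
    try:
        cur = conn.cursor()
        cur.execute("SELECT COUNT(*) FROM analytics.customer_data")
        return cur.fetchone()[0]
    finally:
        conn.close()


for role in ("ADMIN_ROLE", "SALES_ROLE", "MARKETING_ROLE"):
    print(f"{role}: {visible_rows(role)} rows visible")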
Column-Level Security
-- BigQuery Column-Level Security
-- BigQuery has no per-column GRANT/DENY. Sensitive columns are protected
-- with Data Catalog policy tags, or by exposing a view that omits them.
CREATE TABLE analytics.customer_pii (
  customer_id STRING,
  email STRING OPTIONS (description = 'PII: email address'),
  phone STRING OPTIONS (description = 'PII: phone number'),
  address STRING OPTIONS (description = 'PII: postal address'),
  created_at TIMESTAMP
);

-- Option 1: attach policy tags from a Data Catalog taxonomy to the PII
-- columns (via the console, bq CLI, or API) and grant the Fine-Grained
-- Reader role only to principals allowed to read them.

-- Option 2: expose a view without the PII columns and grant analysts
-- read access (e.g. roles/bigquery.dataViewer) on the view instead of
-- the base table.
CREATE VIEW analytics.customer_no_pii AS
SELECT customer_id, email, created_at
FROM analytics.customer_pii;
Dynamic Masking
-- Dynamic Data Masking in Snowflake
-- Create masking policy
CREATE MASKING POLICY analytics.email_mask
AS (val VARCHAR) RETURNS VARCHAR ->
  CASE
    WHEN CURRENT_ROLE() IN ('ADMIN_ROLE', 'HR_ROLE') THEN val
    ELSE CONCAT(LEFT(val, 2), '***', RIGHT(val, 4))
  END;

-- Apply to column
ALTER TABLE analytics.users
ALTER COLUMN email
SET MASKING POLICY analytics.email_mask;

-- Apply to multiple columns (assumes analytics.phone_mask was created
-- the same way)
ALTER TABLE analytics.users
ALTER COLUMN phone
SET MASKING POLICY analytics.phone_mask;
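When applications also redact data client-side, it helps to keep a tested mirror of each policy's logic. An illustrative sketch (mask_phone mirrors the hypothetical phone_mask policy referenced above):

#!/usr/bin/env python3
"""Application-side mirrors of the masking policies, for unit tests."""


def mask_email(val: str) -> str:
    """Same shape as analytics.email_mask: keep 2 leading and 4 trailing chars."""
    return f"{val[:2]}***{val[-4:]}"


def mask_phone(val: str) -> str:
    """Mirrors the hypothetical analytics.phone_mask: expose last 4 digits only."""
    return f"***-***-{val[-4:]}"


assert mask_email("[email protected]") == "al***.com"
assert mask_phone("555-867-5309") == "***-***-5309"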
Data Quality
Great Expectations Integration
# great_expectations.yml
config_version: 3.0
datasources:
  snowflake:
    class_name: Datasource
    execution_engine:
      class_name: SqlAlchemyExecutionEngine
      connection_string: ${SNOWFLAKE_CONN}
    data_connectors:
      default_runtime_data_connector_name:
        class_name: RuntimeDataConnector
        batch_identifiers:
          - default_identifier_name
stores:
  expectations_store:
    class_name: ExpectationsStore
    store_backend:
      class_name: TupleFilesystemStoreBackend
      base_directory: expectations/
  validations_store:
    class_name: ValidationsStore
    store_backend:
      class_name: TupleFilesystemStoreBackend
      base_directory: uncommitted/validations/
#!/usr/bin/env python3
"""Great Expectations data quality checks (V3 batch request API)."""
import great_expectations as ge
from great_expectations.core.batch import RuntimeBatchRequest

context = ge.get_context()

# Describe the data to validate: a runtime query against the Snowflake
# datasource configured in great_expectations.yml
batch_request = RuntimeBatchRequest(
    datasource_name="snowflake",
    data_connector_name="default_runtime_data_connector_name",
    data_asset_name="customers",
    runtime_parameters={"query": "SELECT * FROM analytics.customers"},
    batch_identifiers={"default_identifier_name": "default"},
)

# Get a validator bound to a new expectation suite
validator = context.get_validator(
    batch_request=batch_request,
    create_expectation_suite_with_name="customer_data_quality",
)

# Add expectations
validator.expect_column_values_to_not_be_null(column="customer_id")
validator.expect_column_values_to_be_unique(column="customer_id")
validator.expect_column_value_lengths_to_be_between(
    column="email", min_value=5, max_value=100
)
validator.expect_column_values_to_match_regex(
    column="email",
    regex=r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$',
)

# Validate the batch
results = validator.validate()
failed = [r for r in results.results if not r.success]
print(f"Success: {results.success}")
print(f"Failed expectations: {len(failed)}")
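Validation results should gate the pipeline, not just print. A small helper, assuming results is the validation result object from the run above: it lists the failing expectations and exits non-zero so an orchestrator can halt downstream loads:

#!/usr/bin/env python3
"""Fail the pipeline when data quality checks fail."""


def report_failures(results) -> None:
    """List failed expectations, then exit non-zero if any exist."""
    failures = [r for r in results.results if not r.success]
    for r in failures:
        cfg = r.expectation_config
        print(f"FAILED: {cfg.expectation_type} {cfg.kwargs}")
    if failures:
        raise SystemExit(f"{len(failures)} data quality checks failed")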