Introduction
Data privacy is no longer optional—it’s a legal requirement. With GDPR fines exceeding €2 billion and CCPA enforcement increasing, proper data privacy implementation is critical.
Key Statistics:
- Average GDPR fine: €1.5 million
- 67% of companies are not fully GDPR compliant
- PII exposure costs average $165 per record
- 80% of data breaches involve PII
PII Detection
Detection Patterns
#!/usr/bin/env python3
"""PII detection library."""
import re
from typing import List, Dict, Any
class PIIDetector:
    """Detect PII in free text and DataFrame columns via regex patterns
    and column-name heuristics.
    """

    # Regex patterns for common PII types.
    # NOTE: 'passport' and 'driver_license' overlap (e.g. "AB123456"
    # matches both), so one value may produce multiple findings.
    PATTERNS = {
        'email': r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}',
        'phone_us': r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
        'phone_intl': r'\+?1?\d{9,15}',
        'ssn': r'\b\d{3}-\d{2}-\d{4}\b',
        'credit_card': r'\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b',
        'ip_address': r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b',
        'date_of_birth': r'\b(0[1-9]|1[0-2])/(0[1-9]|[12]\d|3[01])/\d{4}\b',
        'passport': r'\b[A-Z]{1,2}\d{6,9}\b',
        'driver_license': r'\b[A-Z]{1,2}\d{5,8}\b'
    }

    def __init__(self):
        # Compile once up front; IGNORECASE lets e.g. upper-case emails
        # and lower-case passport prefixes still match.
        self.compiled_patterns = {
            name: re.compile(pattern, re.IGNORECASE)
            for name, pattern in self.PATTERNS.items()
        }

    def scan_text(self, text: str) -> List[Dict[str, Any]]:
        """Scan text for PII.

        Returns one finding dict per regex match with keys
        'type', 'value', 'start', 'end'.
        """
        findings = []
        for pii_type, pattern in self.compiled_patterns.items():
            for match in pattern.finditer(text):
                findings.append({
                    'type': pii_type,
                    'value': match.group(),
                    'start': match.start(),
                    'end': match.end()
                })
        return findings

    def scan_dataframe(self, df) -> Dict[str, Dict[str, Any]]:
        """Scan DataFrame columns for PII.

        Columns whose *name* contains a PII indicator are flagged with
        'high' confidence. Otherwise up to 100 non-null values are
        sampled and scanned with the text patterns ('medium' confidence).
        """
        pii_columns = {}
        # Hoisted out of the loop: the indicator list is invariant.
        pii_indicators = ['email', 'phone', 'ssn', 'dob', 'birth',
                          'address', 'name', 'passport', 'license']
        for column in df.columns:
            column_lower = column.lower()
            if any(indicator in column_lower for indicator in pii_indicators):
                pii_columns[column] = {
                    'likely_type': self._infer_pii_type(column),
                    'confidence': 'high'
                }
            else:
                # FIX: drop missing values before str conversion --
                # astype(str) would turn NaN into the literal 'nan'.
                sample = df[column].dropna().astype(str).head(100)
                pii_types = set()
                for value in sample:
                    for finding in self.scan_text(value):
                        pii_types.add(finding['type'])
                if pii_types:
                    pii_columns[column] = {
                        'likely_type': list(pii_types),
                        'confidence': 'medium'
                    }
        return pii_columns

    def _infer_pii_type(self, column_name: str) -> str:
        """Infer a single PII type from a column name (best-effort)."""
        name_lower = column_name.lower()
        if 'email' in name_lower:
            return 'email'
        elif 'phone' in name_lower:
            return 'phone'
        elif 'ssn' in name_lower:
            return 'ssn'
        elif any(x in name_lower for x in ['dob', 'birth']):
            return 'date_of_birth'
        elif 'passport' in name_lower:
            return 'passport'
        elif 'license' in name_lower:
            return 'driver_license'
        return 'unknown'
Data Masking
Static Masking
-- PostgreSQL Static Masking
-- Create a masked view over `customers`. Each CASE has no ELSE branch,
-- so a NULL source column yields NULL in the view as well.
CREATE VIEW customers_masked AS
SELECT
    customer_id,
    -- Email: keep the first 2 characters of the local part plus the domain
    CASE
        WHEN email IS NOT NULL
        THEN CONCAT(
            SUBSTRING(email FROM 1 FOR 2),
            '***@',
            SUBSTRING(email FROM POSITION('@' IN email) + 1)
        )
    END AS email,
    -- Phone: expose only the last 4 digits
    CASE
        WHEN phone IS NOT NULL
        THEN CONCAT('***-***-', RIGHT(phone, 4))
    END AS phone,
    -- SSN: expose only the last 4 digits
    CASE
        WHEN ssn IS NOT NULL
        THEN CONCAT('***-**-', RIGHT(ssn, 4))
    END AS ssn,
    -- Address: generalize to city/state only
    CONCAT(city, ', ', state) AS location,
    -- Amount: round to the nearest 100 to coarsen precision
    ROUND(amount, -2) AS amount
FROM customers;

-- Column-level security: analysts may query only the masked view,
-- never the base table.
GRANT SELECT ON customers_masked TO analyst_role;
REVOKE SELECT ON customers FROM analyst_role;
Dynamic Masking
-- Snowflake Dynamic Masking
-- Masking policy for email: privileged roles see the raw value;
-- everyone else gets a partially redacted address. COALESCE supplies
-- placeholder text when SUBSTRING returns NULL (e.g. no '@' present).
CREATE MASKING POLICY email_mask
AS (val VARCHAR) RETURNS VARCHAR ->
CASE
    WHEN CURRENT_ROLE() IN ('ANALYST_ROLE', 'HR_ROLE') THEN val
    WHEN CURRENT_ROLE() = 'SELF' THEN val -- Users see their own data
    ELSE CONCAT(
        COALESCE(SUBSTRING(val, 1, 2), '**'),
        '***@',
        COALESCE(SUBSTRING(val, POSITION('@' IN val) + 1), '***')
    )
END;

-- Attach the policy to the column; masking is then evaluated at query
-- time based on CURRENT_ROLE().
ALTER TABLE customers
ALTER COLUMN email
SET MASKING POLICY email_mask;

-- Conditional masking for SSN: only ADMIN_ROLE sees the full value,
-- all other roles see just the last 4 digits.
CREATE MASKING POLICY ssn_mask
AS (val VARCHAR) RETURNS VARCHAR ->
CASE
    WHEN CURRENT_ROLE() = 'ADMIN_ROLE' THEN val
    ELSE CONCAT('***-**-', RIGHT(val, 4))
END;
Application-Level Masking
#!/usr/bin/env python3
"""Data masking utilities."""
import hashlib
import re
from typing import Any, Optional
class DataMasker:
    """Mask sensitive values for display/logging, keeping only a few
    identifying characters.
    """

    @staticmethod
    def mask_email(email: str, show_chars: int = 2) -> str:
        """Mask the local part of an email address.

        Keeps the first `show_chars` characters of the local part and
        the full domain, e.g. 'john@x.com' -> 'jo**@x.com'.
        """
        if not email or '@' not in email:
            return '***@***.***'
        # FIX: split on the first '@' only -- a value like 'a@b@c'
        # previously raised ValueError from tuple unpacking.
        local, domain = email.split('@', 1)
        # max() guards against a negative repeat count for short locals.
        masked_local = local[:show_chars] + '*' * max(len(local) - show_chars, 0)
        return f"{masked_local}@{domain}"

    @staticmethod
    def mask_phone(phone: str, visible_digits: int = 4) -> str:
        """Mask a phone number, keeping only the last `visible_digits` digits."""
        if not phone:
            return '***-***-****'
        digits = re.sub(r'\D', '', phone)
        if len(digits) <= visible_digits:
            return '*' * len(digits)
        return '*' * (len(digits) - visible_digits) + digits[-visible_digits:]

    @staticmethod
    def mask_ssn(ssn: str) -> str:
        """Mask an SSN, keeping only the last 4 digits."""
        if not ssn:
            return '***-**-****'
        return '***-**-' + ssn.replace('-', '')[-4:]

    @staticmethod
    def mask_credit_card(card: str, visible: int = 4) -> str:
        """Mask a credit card number, keeping only the last `visible` digits."""
        if not card:
            return '**** **** **** ****'
        digits = re.sub(r'\D', '', card)
        # FIX: short numbers (len <= visible) were previously returned
        # fully UNmasked, because '*' * negative == ''. Mask them fully.
        if len(digits) <= visible:
            return '*' * len(digits)
        return '*' * (len(digits) - visible) + digits[-visible:]

    @staticmethod
    def pseudonymize(value: str, salt: str = '') -> str:
        """Pseudonymize with a consistent salted SHA-256 hash.

        The same (value, salt) pair always maps to the same
        16-hex-character token, so joins across datasets remain possible.
        """
        if not value:
            return ''
        digest = hashlib.sha256(f"{value}{salt}".encode()).hexdigest()
        return digest[:16]
Anonymization
k-Anonymity
#!/usr/bin/env python3
"""K-anonymity implementation."""
import pandas as pd
from typing import List, Set
class KAnonymity:
    """Achieve and verify k-anonymity on tabular datasets.

    A dataset is k-anonymous when every combination of quasi-identifier
    values appears in at least k rows.
    """

    def __init__(self, k: int = 5):
        # Minimum group size required per quasi-identifier combination.
        self.k = k

    def generalize_quasi_identifiers(self, df: pd.DataFrame,
                                     qi_columns: List[str],
                                     hierarchies: dict = None) -> pd.DataFrame:
        """Generalize quasi-identifier columns to reduce re-identification risk.

        If `hierarchies` contains a value->generalized-value mapping for a
        column, it is applied; otherwise the default strategy is chosen by
        dtype: numeric -> 5 equal-width bins, datetime -> month period,
        anything else -> first 2 characters + '*'.
        """
        result = df.copy()
        for col in qi_columns:
            if hierarchies and col in hierarchies:
                # NOTE: values missing from the hierarchy map become NaN.
                result[col] = result[col].map(hierarchies[col])
            elif pd.api.types.is_numeric_dtype(result[col]):
                result[col] = pd.cut(result[col], bins=5)
            elif pd.api.types.is_datetime64_any_dtype(result[col]):
                result[col] = result[col].dt.to_period('M')
            else:
                result[col] = result[col].astype(str).str[:2] + '*'
        return result

    def check_k_anonymity(self, df: pd.DataFrame,
                          qi_columns: List[str],
                          k: int = None) -> pd.DataFrame:
        """Return per-group sizes for the quasi-identifier combinations.

        Adds a 'meets_k' column indicating whether each group reaches the
        threshold (`k` if given, else self.k). The `k` override is new and
        backward-compatible.
        """
        threshold = self.k if k is None else k
        groups = df.groupby(qi_columns).size().reset_index(name='count')
        groups['meets_k'] = groups['count'] >= threshold
        return groups

    def anonymize(self, df: pd.DataFrame,
                  qi_columns: List[str],
                  sensitive_columns: List[str],
                  k: int = 5) -> pd.DataFrame:
        """Generalize quasi-identifiers, then verify k-anonymity.

        Sensitive columns are left untouched: the original "add noise"
        step was a no-op (it added a Series of zeros); real perturbation
        belongs in a dedicated differential-privacy mechanism. Prints a
        warning when some groups fall below `k`.
        """
        result = self.generalize_quasi_identifiers(df, qi_columns)
        # FIX: check against the caller-supplied k -- previously the check
        # used self.k while the warning message reported `k`.
        groups = self.check_k_anonymity(result, qi_columns, k=k)
        if not groups['meets_k'].all():
            unmet = int((~groups['meets_k']).sum())
            print(f"Warning: {unmet} groups don't meet k={k}")
        return result
Differential Privacy
#!/usr/bin/env python3
"""Differential privacy implementation."""
import numpy as np
from typing import Any
class DifferentialPrivacy:
    """Laplace-mechanism differential privacy for simple aggregate queries."""

    def __init__(self, epsilon: float = 1.0):
        """epsilon is the privacy budget; smaller means more noise.

        Raises ValueError for non-positive epsilon, which would produce
        a negative or undefined noise scale.
        """
        if epsilon <= 0:
            raise ValueError("epsilon must be positive")
        self.epsilon = epsilon

    def laplace_noise(self, sensitivity: float) -> float:
        """Draw one Laplace noise sample with scale = sensitivity / epsilon."""
        scale = sensitivity / self.epsilon
        return np.random.laplace(0, scale)

    def add_noise_to_count(self, count: int, sensitivity: float = 1.0) -> int:
        """Noisy count query, clamped to be non-negative."""
        noisy_count = count + self.laplace_noise(sensitivity)
        return max(0, int(round(noisy_count)))

    def add_noise_to_sum(self, values: list,
                         sensitivity: Optional[float] = None) -> float:
        """Noisy sum query.

        When `sensitivity` is omitted it is estimated as the data range
        (max - min). Raises ValueError for an empty list.
        """
        # FIX: explicit empty-input guard (max() would raise a cryptic
        # ValueError); annotation corrected from `float = None`.
        if not values:
            raise ValueError("values must be non-empty")
        if sensitivity is None:
            sensitivity = max(values) - min(values)
        return sum(values) + self.laplace_noise(sensitivity)

    def add_noise_to_mean(self, values: list) -> float:
        """Noisy mean query with sensitivity = range / n.

        Raises ValueError for an empty list (previously np.mean([]) emitted
        a nan/RuntimeWarning before max() raised).
        """
        if not values:
            raise ValueError("values must be non-empty")
        sensitivity = (max(values) - min(values)) / len(values)
        return float(np.mean(values)) + self.laplace_noise(sensitivity)
Compliance Mapping
GDPR Rights
| Right | Implementation | Technical Approach |
|---|---|---|
| Access | Data Subject Access Request (DSAR) | Export pipelines, audit logs |
| Rectification | Update incorrect data | Update pipelines, correction flows |
| Erasure | Right to be forgotten | Deletion pipelines, soft-delete |
| Portability | Export in machine-readable format | JSON/CSV export APIs |
| Processing | Consent management | Consent tracking, opt-out flows |
CCPA Requirements
| Requirement | Implementation |
|---|---|
| Opt-out | Do Not Sell My Personal Information |
| Disclosure | Privacy policy, notice at collection |
| Non-discrimination | Equal service regardless of opt-out |
Comments