Skip to main content
⚡ Calmops

Data Privacy: PII Detection, Masking, Anonymization

Introduction

Data privacy is no longer optional—it's a legal requirement. With GDPR fines exceeding €2 billion and CCPA enforcement increasing, proper data privacy implementation is critical.

Key Statistics:

  • Average GDPR fine: €1.5 million
  • 67% of companies are not fully GDPR compliant
  • PII exposure costs average $165 per record
  • 80% of data breaches involve PII

PII Detection

Detection Patterns

#!/usr/bin/env python3
"""PII detection library."""

import re
from typing import List, Dict, Any

class PIIDetector:
    """Detect PII in free text and in tabular data.

    Detection is heuristic: text is matched against the regular expressions
    in ``PATTERNS`` and DataFrame columns are additionally classified by
    their names. Matches are candidates only; different patterns may match
    the same or overlapping spans of text.
    """

    PATTERNS = {
        'email': r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}',
        'phone_us': r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
        'phone_intl': r'\+?1?\d{9,15}',
        'ssn': r'\b\d{3}-\d{2}-\d{4}\b',
        'credit_card': r'\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b',
        'ip_address': r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b',
        'date_of_birth': r'\b(0[1-9]|1[0-2])/(0[1-9]|[12]\d|3[01])/\d{4}\b',
        'passport': r'\b[A-Z]{1,2}\d{6,9}\b',
        'driver_license': r'\b[A-Z]{1,2}\d{5,8}\b'
    }

    # (substring, inferred type) pairs tested in order against the
    # lower-cased column name. The generic 'address' and 'name' hints come
    # last so that more specific hints win (e.g. 'email_address' -> 'email').
    COLUMN_NAME_HINTS = (
        ('email', 'email'),
        ('phone', 'phone'),
        ('ssn', 'ssn'),
        ('dob', 'date_of_birth'),
        ('birth', 'date_of_birth'),
        ('passport', 'passport'),
        ('license', 'driver_license'),
        ('address', 'address'),
        ('name', 'name'),
    )

    # Number of leading rows inspected per column in sample-based detection.
    SAMPLE_SIZE = 100

    def __init__(self):
        # Compile once per instance; every pattern is matched
        # case-insensitively.
        self.compiled_patterns = {
            name: re.compile(pattern, re.IGNORECASE)
            for name, pattern in self.PATTERNS.items()
        }

    def scan_text(self, text: str) -> List[Dict[str, Any]]:
        """Scan text for PII.

        Args:
            text: The string to scan.

        Returns:
            One dict per regex match with keys ``type`` (pattern name),
            ``value`` (matched text), ``start`` and ``end`` (match offsets).
            Findings are grouped by pattern in ``PATTERNS`` order, then by
            position within the text.
        """
        return [
            {
                'type': pii_type,
                'value': match.group(),
                'start': match.start(),
                'end': match.end()
            }
            for pii_type, pattern in self.compiled_patterns.items()
            for match in pattern.finditer(text)
        ]

    def scan_dataframe(self, df):
        """Scan DataFrame columns for PII.

        A column whose name contains one of ``COLUMN_NAME_HINTS`` is reported
        with ``confidence='high'`` and the inferred type as a string.
        Otherwise the first ``SAMPLE_SIZE`` values are converted to strings
        and scanned with ``scan_text``; if anything matches, the column is
        reported with ``confidence='medium'`` and a sorted list of the
        matched pattern names.

        Args:
            df: A pandas DataFrame (only ``df.columns`` and ``df[column]``
                are used).

        Returns:
            Dict mapping each flagged column label to a dict with keys
            ``likely_type`` and ``confidence``. Unflagged columns are absent.
        """
        pii_columns = {}

        for column in df.columns:
            # _infer_pii_type stringifies the label, so integer or tuple
            # column labels no longer raise AttributeError here.
            inferred = self._infer_pii_type(column)

            if inferred != 'unknown':
                pii_columns[column] = {
                    'likely_type': inferred,
                    'confidence': 'high'
                }
                continue

            # Sample-based detection
            sample = df[column].astype(str).head(self.SAMPLE_SIZE)
            pii_types = {
                finding['type']
                for value in sample
                for finding in self.scan_text(value)
            }

            if pii_types:
                pii_columns[column] = {
                    # Sorted so the result is deterministic across runs.
                    'likely_type': sorted(pii_types),
                    'confidence': 'medium'
                }

        return pii_columns

    def _infer_pii_type(self, column_name: str) -> str:
        """Infer PII type from column name.

        Args:
            column_name: Column label; non-string labels are stringified.

        Returns:
            The type paired with the first matching entry of
            ``COLUMN_NAME_HINTS``, or ``'unknown'`` if none matches.
        """
        name_lower = str(column_name).lower()

        for hint, pii_type in self.COLUMN_NAME_HINTS:
            if hint in name_lower:
                return pii_type

        return 'unknown'

Data Masking

Static Masking

-- PostgreSQL Static Masking

-- Create masked view
-- Exposes customers with direct identifiers partially masked; NULL inputs
-- stay NULL for email, phone and ssn.
CREATE VIEW customers_masked AS
SELECT
    customer_id,
    -- Email: keep at most the first two characters of the local part,
    -- mask the rest of it, and keep the domain.
    CASE
        WHEN email IS NULL THEN NULL
        -- Without an '@', POSITION returns 0 and the domain SUBSTRING below
        -- would start at character 1, i.e. emit the whole unmasked value.
        WHEN POSITION('@' IN email) = 0 THEN '***'
        ELSE CONCAT(
            -- LEAST keeps the prefix from running into the '@' when the
            -- local part is shorter than two characters.
            LEFT(email, LEAST(2, POSITION('@' IN email) - 1)),
            '***@',
            SUBSTRING(email FROM POSITION('@' IN email) + 1)
        )
    END AS email,
    -- Phone: show only the last four characters
    CASE 
        WHEN phone IS NOT NULL 
        THEN CONCAT('***-***-', RIGHT(phone, 4))
    END AS phone,
    -- SSN: show only the last four characters
    CASE 
        WHEN ssn IS NOT NULL 
        THEN CONCAT('***-**-', RIGHT(ssn, 4))
    END AS ssn,
    -- Address: keep only city/state
    CONCAT(city, ', ', state) AS location,
    -- Amount: round to the nearest hundred
    ROUND(amount, -2) AS amount
FROM customers;

-- View-level access control: analyst_role may read the masked view
-- but loses direct SELECT access to the underlying customers table.
GRANT SELECT ON customers_masked TO analyst_role;
REVOKE SELECT ON customers FROM analyst_role;

Dynamic Masking

-- Snowflake Dynamic Masking

-- Create masking policy for email
-- ANALYST_ROLE, HR_ROLE and SELF see the raw value. Every other role sees at
-- most the first two characters of the local part, then '***@' and the domain.
CREATE MASKING POLICY email_mask
AS (val VARCHAR) RETURNS VARCHAR ->
  CASE
    WHEN CURRENT_ROLE() IN ('ANALYST_ROLE', 'HR_ROLE') THEN val
    -- NOTE(review): this matches only a role literally named 'SELF'; nothing
    -- here ties the row to the querying user.
    WHEN CURRENT_ROLE() = 'SELF' THEN val
    -- NULL input keeps the fully masked placeholder.
    WHEN val IS NULL THEN '*****@***'
    -- Without an '@', POSITION returns 0 and the domain SUBSTRING below would
    -- start at character 1, i.e. emit the whole unmasked value.
    WHEN POSITION('@' IN val) = 0 THEN '***'
    ELSE CONCAT(
        -- LEAST keeps the prefix from including the '@' when the local part
        -- is shorter than two characters.
        SUBSTRING(val, 1, LEAST(2, POSITION('@' IN val) - 1)),
        '***@',
        SUBSTRING(val, POSITION('@' IN val) + 1)
    )
  END;

-- Apply to column: attach the email_mask policy to customers.email
ALTER TABLE customers
ALTER COLUMN email
SET MASKING POLICY email_mask;

-- Conditional masking for SSN
-- ADMIN_ROLE sees the raw value; every other role sees '***-**-' followed by
-- the last four characters of the value.
-- NOTE(review): a value of four characters or fewer would appear in full
-- after the prefix — confirm stored SSNs are always full-length.
-- NOTE(review): no ALTER TABLE ... SET MASKING POLICY for ssn_mask appears in
-- this snippet, so the policy is defined here but not attached to a column.
CREATE MASKING POLICY ssn_mask
AS (val VARCHAR) RETURNS VARCHAR ->
  CASE
    WHEN CURRENT_ROLE() = 'ADMIN_ROLE' THEN val
    ELSE CONCAT('***-**-', RIGHT(val, 4))
  END;

Application-Level Masking

#!/usr/bin/env python3
"""Data masking utilities."""

import hashlib
import re
from typing import Any, Optional

class DataMasker:
    """Mask sensitive data.

    All methods are static and return a new string; empty or missing input
    yields a fixed, fully masked placeholder (or ``''`` for ``pseudonymize``).
    """

    @staticmethod
    def _mask_all_but_tail(digits: str, visible: int) -> str:
        """Replace all but the last ``visible`` characters with ``*``.

        Everything is masked when ``visible`` is not positive or when the
        input is no longer than ``visible``, so the whole value is never
        returned in the clear.
        """
        # The explicit guard matters: digits[-0:] is the *entire* string.
        if visible <= 0 or len(digits) <= visible:
            return '*' * len(digits)
        return '*' * (len(digits) - visible) + digits[-visible:]

    @staticmethod
    def mask_email(email: str, show_chars: int = 2) -> str:
        """Mask email address.

        Args:
            email: Address to mask.
            show_chars: Maximum number of leading local-part characters to
                keep. At least one character of a non-empty local part is
                always masked.

        Returns:
            The address with the rest of the local part replaced by ``*``
            and the domain unchanged, or ``'***@***.***'`` when ``email`` is
            empty or contains no ``@``.
        """
        if not email or '@' not in email:
            return '***@***.***'

        # Split on the last '@' so addresses containing several '@' do not
        # raise on unpacking.
        local, domain = email.rsplit('@', 1)
        # Never reveal the whole local part, and ignore negative show_chars.
        visible = max(0, min(show_chars, len(local) - 1))
        masked_local = local[:visible] + '*' * (len(local) - visible)

        return f"{masked_local}@{domain}"

    @staticmethod
    def mask_phone(phone: str, visible_digits: int = 4) -> str:
        """Mask phone number.

        Args:
            phone: Phone number in any format; non-digits are discarded.
            visible_digits: Number of trailing digits to keep.

        Returns:
            A string of ``*`` followed by the last ``visible_digits`` digits,
            all ``*`` when there are no more digits than that, or
            ``'***-***-****'`` for empty input.
        """
        if not phone:
            return '***-***-****'

        digits = re.sub(r'\D', '', phone)

        return DataMasker._mask_all_but_tail(digits, visible_digits)

    @staticmethod
    def mask_ssn(ssn: str) -> str:
        """Mask SSN.

        Args:
            ssn: SSN in any format; non-digits are discarded.

        Returns:
            ``'***-**-'`` followed by the last four digits, or
            ``'***-**-****'`` when the input is empty or has four or fewer
            digits (so a short value is never shown in full).
        """
        if not ssn:
            return '***-**-****'

        # Use digits only so spaces or stray characters cannot end up in,
        # or shift, the visible tail.
        digits = re.sub(r'\D', '', ssn)
        if len(digits) <= 4:
            return '***-**-****'

        return '***-**-' + digits[-4:]

    @staticmethod
    def mask_credit_card(card: str, visible: int = 4) -> str:
        """Mask credit card.

        Args:
            card: Card number in any format; non-digits are discarded.
            visible: Number of trailing digits to keep.

        Returns:
            A string of ``*`` followed by the last ``visible`` digits, all
            ``*`` when there are no more digits than that, or
            ``'**** **** **** ****'`` for empty input.
        """
        if not card:
            return '**** **** **** ****'

        digits = re.sub(r'\D', '', card)

        return DataMasker._mask_all_but_tail(digits, visible)

    @staticmethod
    def pseudonymize(value: str, salt: str = '') -> str:
        """Pseudonymize with consistent hashing.

        Args:
            value: Value to pseudonymize.
            salt: Optional string appended to ``value`` before hashing.

        Returns:
            The first 16 hex characters of the SHA-256 digest of
            ``value + salt``, or ``''`` for empty input. The same
            ``(value, salt)`` always yields the same token.
        """
        if not value:
            return ''

        # NOTE(review): with the default empty salt the token is a plain
        # truncated SHA-256, so low-entropy inputs can be recovered by
        # hashing candidate values; callers should supply a secret salt.
        # The construction is left unchanged so existing tokens stay stable.
        combined = f"{value}{salt}"
        hash_obj = hashlib.sha256(combined.encode())

        return hash_obj.hexdigest()[:16]

Anonymization

k-Anonymity

#!/usr/bin/env python3
"""K-anonymity implementation."""

import pandas as pd
from typing import List, Set

class KAnonymity:
    """Implement k-anonymity for datasets.

    A dataset is k-anonymous with respect to a set of quasi-identifier (QI)
    columns when every combination of QI values that occurs is shared by at
    least ``k`` rows.
    """

    def __init__(self, k: int = 5):
        # Default group-size threshold used by check_k_anonymity/anonymize.
        self.k = k

    def generalize_quasi_identifiers(self, df: pd.DataFrame, 
                                    qi_columns: List[str],
                                    hierarchies: dict = None) -> pd.DataFrame:
        """Generalize quasi-identifiers to achieve k-anonymity.

        Args:
            df: Input data; it is not modified.
            qi_columns: Columns to generalize.
            hierarchies: Optional ``{column: mapping}``; a column listed here
                is generalized with ``Series.map(mapping)`` (values absent
                from the mapping become NaN).

        Returns:
            A copy of ``df`` with each QI column generalized. Without a
            hierarchy, numeric columns are cut into 5 equal-width bins,
            datetime columns are truncated to monthly periods, and all other
            columns keep their first two characters followed by ``'*'``.
        """
        result = df.copy()

        for col in qi_columns:
            if hierarchies and col in hierarchies:
                result[col] = result[col].map(hierarchies[col])
            elif pd.api.types.is_numeric_dtype(result[col]):
                # Bin into ranges
                result[col] = pd.cut(result[col], bins=5)
            elif pd.api.types.is_datetime64_any_dtype(result[col]):
                # Truncate to year/month
                result[col] = result[col].dt.to_period('M')
            else:
                # Keep the first two characters and mask the remainder
                result[col] = result[col].astype(str).str[:2] + '*'

        return result

    def check_k_anonymity(self, df: pd.DataFrame, 
                         qi_columns: List[str]) -> pd.DataFrame:
        """Check k-anonymity for dataset.

        Args:
            df: Data to check (typically already generalized).
            qi_columns: Quasi-identifier columns to group by.

        Returns:
            One row per QI combination that occurs in ``df``, with the QI
            columns, ``count`` (rows in the group) and ``meets_k``
            (``count >= self.k``).
        """
        # observed=True: binned columns are categorical, and unobserved
        # category combinations would otherwise show up as empty groups that
        # can never meet k.
        # dropna=False: rows whose QI value is NaN (e.g. unmapped by a
        # hierarchy) still form a group and are checked.
        groups = (
            df.groupby(qi_columns, observed=True, dropna=False)
            .size()
            .reset_index(name='count')
        )

        # Check which groups meet k threshold
        groups['meets_k'] = groups['count'] >= self.k

        return groups

    def anonymize(self, df: pd.DataFrame,
                  qi_columns: List[str],
                  sensitive_columns: List[str],
                  k: int = None) -> pd.DataFrame:
        """Generalize the quasi-identifiers and verify k-anonymity.

        Args:
            df: Input data; it is not modified.
            qi_columns: Quasi-identifier columns to generalize.
            sensitive_columns: Sensitive attribute columns. They must exist
                in ``df`` and are returned unchanged.
            k: Threshold for the verification step; defaults to ``self.k``.

        Returns:
            The generalized copy of ``df``. When some groups are smaller
            than ``k`` a warning is printed; rows are never suppressed.

        Raises:
            KeyError: If a sensitive column is missing from ``df``.
        """
        threshold = self.k if k is None else k

        # generalize_quasi_identifiers already works on a copy of df.
        result = self.generalize_quasi_identifiers(df, qi_columns)

        # Sensitive values are intentionally left as-is. Adding a fresh,
        # default-indexed Series here would be aligned on index labels and
        # turn every value into NaN for frames with a non-default index.
        for col in sensitive_columns:
            if col not in result.columns:
                raise KeyError(col)

        # Verify k-anonymity against the same threshold that is reported.
        groups = self.check_k_anonymity(result, qi_columns)
        failing = int((groups['count'] < threshold).sum())

        if failing:
            # Need more generalization
            print(f"Warning: {failing} groups don't meet k={threshold}")

        return result

Differential Privacy

#!/usr/bin/env python3
"""Differential privacy implementation."""

import numpy as np
from typing import Any

class DifferentialPrivacy:
    """Implement differential privacy with the Laplace mechanism.

    Noise is drawn from a Laplace distribution with scale
    ``sensitivity / epsilon``; a smaller ``epsilon`` means more noise.
    """

    def __init__(self, epsilon: float = 1.0):
        """Store the privacy budget.

        Args:
            epsilon: Privacy parameter; must be greater than zero.

        Raises:
            ValueError: If ``epsilon`` is not a positive number.
        """
        # `not epsilon > 0` also rejects NaN.
        if not epsilon > 0:
            raise ValueError(f"epsilon must be positive, got {epsilon!r}")
        self.epsilon = epsilon

    def laplace_noise(self, sensitivity: float) -> float:
        """Generate Laplace noise for differential privacy.

        Args:
            sensitivity: Query sensitivity; must be non-negative.

        Returns:
            One sample from Laplace(0, sensitivity / epsilon) as a float.

        Raises:
            ValueError: If ``sensitivity`` is negative.
        """
        if sensitivity < 0:
            raise ValueError(
                f"sensitivity must be non-negative, got {sensitivity!r}"
            )
        scale = sensitivity / self.epsilon
        return float(np.random.laplace(0, scale))

    def add_noise_to_count(self, count: int, sensitivity: float = 1.0) -> int:
        """Add noise to a count query.

        Returns:
            The noisy count rounded to an integer and clamped at zero.
        """
        noisy_count = count + self.laplace_noise(sensitivity)
        return max(0, int(round(noisy_count)))

    def add_noise_to_sum(self, values: list, sensitivity: float = None) -> float:
        """Add noise to a sum query.

        Args:
            values: Numbers to sum.
            sensitivity: Sensitivity of the sum. When omitted it falls back
                to ``max(values) - min(values)``.

        Returns:
            ``sum(values)`` plus Laplace noise.

        Raises:
            ValueError: If ``values`` is empty and no sensitivity is given.
        """
        if sensitivity is None:
            if len(values) == 0:
                raise ValueError(
                    "cannot derive sensitivity from empty values; "
                    "pass sensitivity explicitly"
                )
            # NOTE(review): this fallback is computed from the data itself,
            # so it depends on the very records being protected (and is 0
            # when all values are equal, adding no noise). Prefer passing a
            # bound known independently of the data.
            sensitivity = max(values) - min(values)

        noisy_sum = sum(values) + self.laplace_noise(sensitivity)
        return noisy_sum

    def add_noise_to_mean(self, values: list, sensitivity: float = None) -> float:
        """Add noise to a mean query.

        Args:
            values: Numbers to average; must be non-empty.
            sensitivity: Sensitivity of the mean. When omitted it falls back
                to ``(max(values) - min(values)) / len(values)``.

        Returns:
            The mean of ``values`` plus Laplace noise, as a float.

        Raises:
            ValueError: If ``values`` is empty.
        """
        if len(values) == 0:
            raise ValueError("cannot compute the mean of empty values")

        mean = float(np.mean(values))
        if sensitivity is None:
            # NOTE(review): data-derived like the sum fallback; prefer a
            # bound known independently of the data.
            sensitivity = (max(values) - min(values)) / len(values)

        noisy_mean = mean + self.laplace_noise(sensitivity)
        return noisy_mean

Compliance Mapping

GDPR Rights

| Right | Implementation | Technical Approach |
|---|---|---|
| Access | Data Subject Access Request (DSAR) | Export pipelines, audit logs |
| Rectification | Update incorrect data | Update pipelines, correction flows |
| Erasure | Right to be forgotten | Deletion pipelines, soft-delete |
| Portability | Export in machine-readable format | JSON/CSV export APIs |
| Processing | Consent management | Consent tracking, opt-out flows |

CCPA Requirements

| Requirement | Implementation |
|---|---|
| Opt-out | Do Not Sell My Personal Information |
| Disclosure | Privacy policy, notice at collection |
| Non-discrimination | Equal service regardless of opt-out |

External Resources


Comments