Healthcare Analytics: HIPAA-Compliant Data Warehousing

Introduction

Healthcare analytics has to balance insight with privacy. HIPAA compliance and the protection of protected health information (PHI) are non-negotiable, so analytical infrastructure must be designed around them from the start.

Key Statistics:

  • Healthcare data breaches increased 55% in 2024
  • Average HIPAA fine: $1.5 million
  • Healthcare analytics market: $50B by 2027
  • Proper de-identification reduces risk by 95%

HIPAA Compliance Framework

┌──────────────────────────────────────────────────────────────────┐
│                  HIPAA Compliance Requirements                   │
├──────────────────────────────────────────────────────────────────┤
│                                                                  │
│  Technical Safeguards (164.312)                                  │
│  ├── Access Control (unique user ID, auto-logoff)                │
│  ├── Audit Controls (logging, monitoring)                        │
│  ├── Integrity (PHI not improperly altered)                      │
│  ├── Transmission Security (encryption)                          │
│  └── Person or Entity Authentication                             │
│                                                                  │
│  Administrative Safeguards (164.308)                             │
│  ├── Security Management Process                                 │
│  ├── Workforce Security                                          │
│  ├── Information Access Management                               │
│  └── Risk Analysis                                               │
│                                                                  │
│  Physical Safeguards (164.310)                                   │
│  ├── Facility Access Controls                                    │
│  └── Workstation Use & Security                                  │
│                                                                  │
└──────────────────────────────────────────────────────────────────┘

Data Model

Healthcare Data Model

-- Healthcare data warehouse schema (PostgreSQL syntax)

-- Patient dimension (de-identified)
CREATE TABLE dim_patient (
    patient_key SERIAL PRIMARY KEY,
    patient_id TEXT,  -- De-identified
    birth_year INT,
    gender TEXT,
    race TEXT,
    ethnicity TEXT,
    zip_code TEXT,  -- First 3 digits only
    effective_date DATE,
    expiration_date DATE,
    current_flag BOOLEAN DEFAULT TRUE
);

-- Provider dimension
CREATE TABLE dim_provider (
    provider_key SERIAL PRIMARY KEY,
    provider_id TEXT,
    provider_type TEXT,
    specialty TEXT,
    facility_name TEXT,
    state TEXT,
    effective_date DATE
);

-- Encounter fact
CREATE TABLE fact_encounters (
    encounter_key SERIAL PRIMARY KEY,
    patient_key INT REFERENCES dim_patient(patient_key),
    provider_key INT REFERENCES dim_provider(provider_key),
    encounter_date DATE,
    encounter_type TEXT,
    diagnosis_codes TEXT[],
    procedure_codes TEXT[],
    admission_type TEXT,
    discharge_status TEXT,
    length_of_stay INT,
    total_charges DECIMAL(12,2)
);

-- Diagnosis dimension
CREATE TABLE dim_diagnosis (
    diagnosis_key SERIAL PRIMARY KEY,
    icd10_code TEXT,
    diagnosis_description TEXT,
    diagnosis_category TEXT,
    is_primary BOOLEAN
);

-- Procedure dimension
CREATE TABLE dim_procedure (
    procedure_key SERIAL PRIMARY KEY,
    cpt_code TEXT,
    procedure_description TEXT,
    procedure_category TEXT
);
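
The effective_date, expiration_date, and current_flag columns on dim_patient suggest a Type 2 slowly changing dimension, although the schema does not say so explicitly. The sketch below shows that update pattern under that assumption, with an in-memory SQLite table standing in for the warehouse. The function name apply_patient_change and the reduced column set are hypothetical and exist only for illustration.

```python
import sqlite3


def apply_patient_change(conn, patient_id, zip3, change_date):
    """Expire the current row for patient_id and insert a new current row."""
    cur = conn.cursor()
    # Close out the existing current version, if there is one.
    cur.execute(
        "UPDATE dim_patient SET expiration_date = ?, current_flag = 0 "
        "WHERE patient_id = ? AND current_flag = 1",
        (change_date, patient_id),
    )
    # Insert the new version as the current row.
    cur.execute(
        "INSERT INTO dim_patient (patient_id, zip_code, effective_date, "
        "expiration_date, current_flag) VALUES (?, ?, ?, NULL, 1)",
        (patient_id, zip3, change_date),
    )
    conn.commit()
    return cur.lastrowid


# SQLite stand-in for the warehouse table (reduced column set).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE dim_patient ("
    "patient_key INTEGER PRIMARY KEY AUTOINCREMENT, "
    "patient_id TEXT, zip_code TEXT, effective_date TEXT, "
    "expiration_date TEXT, current_flag INTEGER DEFAULT 1)"
)

first_key = apply_patient_change(conn, "a1b2c3", "941", "2024-01-01")
second_key = apply_patient_change(conn, "a1b2c3", "100", "2025-03-15")

rows = conn.execute(
    "SELECT patient_key, zip_code, expiration_date, current_flag "
    "FROM dim_patient ORDER BY patient_key"
).fetchall()
for row in rows:
    print(row)
```

If the dimension is maintained this way, fact rows keep the patient_key that was current when they were loaded, so a join that filters on current_flag returns only encounters attached to the latest version of each patient.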

PHI De-identification

Safe Harbor Implementation

#!/usr/bin/env python3
"""HIPAA Safe Harbor de-identification."""

import hashlib
import os
import re
from datetime import datetime, date
from typing import Any, Dict, List

class HIPAADeidentifier:
    """Apply HIPAA Safe Harbor de-identification."""
    
    # 18 identifiers that must be removed
    DIRECT_IDENTIFIERS = [
        'name', 'geographic_subdivision', 'dates', 'phone', 'fax',
        'email', 'ssn', 'medical_record', 'health_plan', 
        'account_number', 'license_number', 'vehicle_id',
        'device_id', 'url', 'ip_address', 'biometric', 'full_photo',
        'any_unique_identifier'
    ]
    
    def deidentify_record(self, record: Dict) -> Dict:
        """De-identify a single record."""
        
        deidentified = {}
        
        for key, value in record.items():
            if key.lower() in self.DIRECT_IDENTIFIERS:
                # Remove or transform
                deidentified[key] = self._transform_identifier(key, value)
            elif key.lower() in ['zip', 'zip_code']:
                # Keep only first 3 digits
                deidentified[key] = self._truncate_zip(value)
            elif key.lower() in ['birth_date', 'dob', 'date_of_birth']:
                # Remove or shift to year
                deidentified[key] = self._generalize_date(value)
            else:
                # Keep as-is (assuming not PHI)
                deidentified[key] = value
        
        return deidentified
    
    def _transform_identifier(self, key: str, value: Any) -> Any:
        """Transform direct identifier."""
        if value is None:
            return None
        
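        # Hash with a secret salt. Do not fall back to a default value:
        # a guessable salt lets anyone recompute the hashes.
        salt = os.environ.get('DEID_SALT')
        if not salt:
            raise RuntimeError('DEID_SALT environment variable must be set')
        return hashlib.sha256(f"{value}{salt}".encode()).hexdigest()[:16]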
    
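    def _truncate_zip(self, zip_code: str) -> str:
        """Truncate ZIP to first 3 digits."""
        if not zip_code:
            return None
        
        # Remove non-digits
        digits = re.sub(r'\D', '', zip_code)
        
        if len(digits) >= 3:
            # Safe Harbor keeps only the 3-digit prefix; prefixes covering
            # 20,000 or fewer people must additionally be replaced with '000'
            return digits[:3]
        
        return '000'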
    
    def _generalize_date(self, date_value: Any) -> Any:
        """Generalize date to year or age range."""
        if isinstance(date_value, date):
            return date_value.year
        elif isinstance(date_value, str):
            try:
                dt = datetime.strptime(date_value, '%Y-%m-%d')
                return dt.year
            except ValueError:
                # Unparseable date strings are dropped rather than passed through
                return None
        
        return None

class ExpertDetermination:
    """Expert determination method (less common, more rigorous)."""
    
    def __init__(self, expert):
        self.expert = expert
    
    def evaluate_disclosure(self, data, public_release):
        """Evaluate if disclosure poses minimal risk."""
        
        # Expert must:
        # 1. Apply statistical/scientific principles
        # 2. Document methodology
        # 3. Certify analysis
        
        risk_score = self.expert.analyze_risk(data, public_release)
        
        return {
            # Illustrative threshold only: HIPAA requires the risk to be
            # "very small" and does not define a numeric cutoff
            'meets_standard': risk_score < 0.1,
            'risk_score': risk_score,
            'methodology': self.expert.get_methodology(),
            'certification': self.expert.certify()
        }
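
Safe Harbor has two population-based rules that are easy to overlook: a 3-digit ZIP prefix covering 20,000 or fewer people must be replaced with 000, and ages above 89 must be collapsed into a single "90 or older" category. The sketch below illustrates both. The function names are hypothetical, and the prefix list is the one cited in HHS guidance from 2000 Census data, so it should be treated as an assumption and rechecked against current Census figures before use.

```python
from datetime import date

# ZIP3 prefixes with 20,000 or fewer residents, per HHS guidance based on
# 2000 Census data. Assumption: verify against current Census data.
RESTRICTED_ZIP3 = {
    "036", "059", "063", "102", "203", "556", "692", "790", "821",
    "823", "830", "831", "878", "879", "884", "890", "893",
}


def safe_harbor_zip3(zip_code):
    """Return the 3-digit ZIP prefix, '000' if restricted, None if unusable."""
    digits = "".join(ch for ch in str(zip_code) if ch.isdigit())
    if len(digits) < 3:
        return None
    prefix = digits[:3]
    return "000" if prefix in RESTRICTED_ZIP3 else prefix


def safe_harbor_age(birth_year, as_of=None):
    """Return age in years, with ages above 89 collapsed into '90+'.

    Only the birth year is known, so the year difference is used. It can
    overstate the true age by one year, which errs toward generalizing.
    """
    as_of = as_of or date.today()
    age = as_of.year - birth_year
    return "90+" if age > 89 else age


print(safe_harbor_zip3("94110-1234"))
print(safe_harbor_zip3("03601"))
print(safe_harbor_age(1930, as_of=date(2025, 1, 1)))
print(safe_harbor_age(1980, as_of=date(2025, 1, 1)))
```

A birth year stored in the patient dimension reveals age, so the same 90+ rule would apply to that column as well.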

HIPAA-Compliant Infrastructure

Secure Data Pipeline

# HIPAA-compliant data pipeline
terraform {
  required_version = ">= 1.0"
  
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 5.0"
    }
  }
}

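# Encrypted S3 bucket for PHI
resource "aws_s3_bucket" "phi_storage" {
  bucket = "hipaa-compliant-phi-storage"
}

# Default server-side encryption (a separate resource in AWS provider v4 and later)
resource "aws_s3_bucket_server_side_encryption_configuration" "phi_storage" {
  bucket = aws_s3_bucket.phi_storage.id

  rule {
    apply_server_side_encryption_by_default {
      sse_algorithm = "AES256"
    }
  }
}

resource "aws_s3_bucket_versioning" "phi_storage" {
  bucket = aws_s3_bucket.phi_storage.id

  versioning_configuration {
    status = "Enabled"
  }
}

resource "aws_s3_bucket_lifecycle_configuration" "phi_storage" {
  bucket = aws_s3_bucket.phi_storage.id

  rule {
    id     = "phi-retention"
    status = "Enabled"

    # Empty filter applies the rule to every object in the bucket
    filter {}

    noncurrent_version_transition {
      noncurrent_days = 30
      storage_class   = "GLACIER"
    }

    expiration {
      # About 7 years. HIPAA requires 6 years for compliance documentation;
      # confirm medical record retention against applicable state law.
      days = 2555
    }
  }
}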

# Bucket policy - deny any request that is not sent over TLS
resource "aws_s3_bucket_policy" "phi_policy" {
  bucket = aws_s3_bucket.phi_storage.id
  
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid       = "DenyInsecureTransport"
        Effect    = "Deny"
        Principal = "*"
        Action    = "s3:*"
        Resource = [
          aws_s3_bucket.phi_storage.arn,
          "${aws_s3_bucket.phi_storage.arn}/*"
        ]
        Condition = {
          Bool = {
            "aws:SecureTransport" = "false"
          }
        }
      }
    ]
  })
}

# CloudTrail logging
# Note: aws_s3_bucket.audit_logs is not defined in this snippet and must be declared elsewhere.
resource "aws_cloudtrail" "phi_audit" {
  name                  = "hipaa-compliant-trail"
  s3_bucket_name        = aws_s3_bucket.audit_logs.id
  is_multi_region_trail = true
  
  event_selector {
    read_write_type           = "All"
    include_management_events = true
    
    data_resource {
      type = "AWS::S3::Object"
      # Bucket ARN with a trailing slash (empty prefix) logs all objects
      values = ["${aws_s3_bucket.phi_storage.arn}/"]
    }
  }
}
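
A transport-security policy is only useful if it actually ends up in the deployed document, so it can help to test the rendered JSON. The sketch below is one way to do that in plain Python. The function name denies_insecure_transport and the sample policy are hypothetical, and the check covers only the single condition key discussed here, not full IAM policy evaluation.

```python
import json


def denies_insecure_transport(policy_json):
    """Return True if a Deny statement matches aws:SecureTransport = false."""
    policy = json.loads(policy_json)
    statements = policy.get("Statement", [])
    # A policy may carry a single statement object instead of a list.
    if isinstance(statements, dict):
        statements = [statements]
    for stmt in statements:
        if stmt.get("Effect") != "Deny":
            continue
        bool_cond = stmt.get("Condition", {}).get("Bool", {})
        if str(bool_cond.get("aws:SecureTransport", "")).lower() == "false":
            return True
    return False


# Hypothetical rendered policy, shaped like the Terraform jsonencode() output.
sample_policy = json.dumps({
    "Version": "2012-10-17",
    "Statement": [{
        "Sid": "DenyInsecureTransport",
        "Effect": "Deny",
        "Principal": "*",
        "Action": "s3:*",
        "Resource": "arn:aws:s3:::example-bucket/*",
        "Condition": {"Bool": {"aws:SecureTransport": "false"}},
    }],
})
empty_policy = json.dumps({"Version": "2012-10-17", "Statement": []})

print(denies_insecure_transport(sample_policy))
print(denies_insecure_transport(empty_policy))
```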

Access Control

#!/usr/bin/env python3
"""HIPAA-compliant access control."""

import logging
from datetime import datetime, timezone
from typing import Dict, List, Set

logger = logging.getLogger("hipaa.audit")

# Data classes that analysts must not read at the individual level
ANALYST_RESTRICTED = ('patient_details', 'demographics')


def store_audit_log(entry: Dict) -> None:
    """Placeholder sink; replace with an encrypted, append-only store."""
    logger.info("AUDIT %s", entry)


class HIPAAAccessControl:
    """Implement minimum necessary access."""
    
    def __init__(self):
        self.role_permissions: Dict[str, Set[str]] = {}
    
    def define_role(self, role: str, permissions: Set[str]):
        """Define role with permissions of the form "data_class:action"."""
        
        # Ensure minimum necessary. Raise instead of assert, because
        # assertions are stripped when Python runs with -O.
        if role == 'analyst':
            # Analysts get aggregated, not individual PHI
            for perm in permissions:
                if perm.split(':', 1)[0] in ANALYST_RESTRICTED:
                    raise ValueError(f"analyst role may not hold {perm!r}")
        
        self.role_permissions[role] = set(permissions)
    
    def check_access(self, user_role: str, data_class: str, action: str) -> bool:
        """Check if user can access data."""
        
        required_perm = f"{data_class}:{action}"
        
        return required_perm in self.role_permissions.get(user_role, set())
    
    def audit_access(self, user: str, role: str, data_accessed: List[str]):
        """Log all data access for audit."""
        
        # Store in audit log (encrypted, retained 6 years)
        audit_log = {
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'user': user,
            'role': role,
            'data_classes': data_accessed,
            'action': 'read'
        }
        
        store_audit_log(audit_log)
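
Audit entries only support the Integrity and Audit Controls safeguards if later edits to them can be detected. One common technique, which the original code does not include, is to chain each entry to the previous one with a hash. The sketch below uses hypothetical helper names (append_entry, verify_chain) and an in-memory list in place of a real log store.

```python
import hashlib
import json

GENESIS_HASH = "0" * 64  # placeholder "previous hash" for the first entry


def _digest(prev_hash, entry):
    """Hash the previous hash together with a canonical form of the entry."""
    payload = json.dumps(entry, sort_keys=True)
    return hashlib.sha256((prev_hash + payload).encode()).hexdigest()


def append_entry(chain, entry):
    """Append an audit entry linked to the hash of the previous record."""
    prev_hash = chain[-1]["hash"] if chain else GENESIS_HASH
    record = {
        "entry": entry,
        "prev_hash": prev_hash,
        "hash": _digest(prev_hash, entry),
    }
    chain.append(record)
    return record["hash"]


def verify_chain(chain):
    """Recompute every hash; return False if any record was altered."""
    prev_hash = GENESIS_HASH
    for record in chain:
        if record["prev_hash"] != prev_hash:
            return False
        if record["hash"] != _digest(prev_hash, record["entry"]):
            return False
        prev_hash = record["hash"]
    return True


log = []
append_entry(log, {"user": "alice", "role": "analyst", "action": "read"})
append_entry(log, {"user": "bob", "role": "clinician", "action": "read"})
intact = verify_chain(log)

# Simulate someone editing an earlier entry after the fact.
log[0]["entry"]["user"] = "mallory"
tamper_detected = not verify_chain(log)

print(intact, tamper_detected)
```

Chaining detects modification but does not prevent it, so it complements access restrictions and encryption on the log store; it does not replace them.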

Healthcare Analytics Queries

-- Patient population analysis (de-identified)
SELECT 
    (p.birth_year / 10) * 10 AS birth_year_decade,
    p.gender,
    p.race,
    COUNT(DISTINCT p.patient_id) AS patient_count,
    AVG(e.length_of_stay) AS avg_length_of_stay,
    SUM(e.total_charges) AS total_charges
FROM fact_encounters e
JOIN dim_patient p ON e.patient_key = p.patient_key
WHERE e.encounter_date >= '2025-01-01'
  AND p.current_flag = TRUE
GROUP BY 
    (p.birth_year / 10) * 10,
    p.gender,
    p.race
ORDER BY patient_count DESC;

-- Diagnosis trends (no PHI)
-- Assumes the first element of diagnosis_codes is the primary diagnosis
SELECT 
    d.diagnosis_category,
    e.encounter_date,
    COUNT(*) AS encounter_count,
    COUNT(DISTINCT p.patient_id) AS unique_patients
FROM fact_encounters e
JOIN dim_patient p ON e.patient_key = p.patient_key
JOIN dim_diagnosis d ON d.icd10_code = e.diagnosis_codes[1]
WHERE e.encounter_date >= DATE_TRUNC('year', CURRENT_DATE - INTERVAL '1 year')
GROUP BY d.diagnosis_category, e.encounter_date
ORDER BY encounter_count DESC;

-- Readmission analysis (admission to admission, within 30 days)
WITH index_admissions AS (
    SELECT 
        p.patient_id,
        e.encounter_date AS admission_date,
        e.encounter_key
    FROM fact_encounters e
    JOIN dim_patient p ON e.patient_key = p.patient_key
    WHERE e.encounter_type = 'inpatient'
),
readmissions AS (
    SELECT 
        a.patient_id,
        a.admission_date AS index_date,
        b.admission_date AS readmit_date,
        -- Subtracting two DATE values yields whole days in PostgreSQL
        b.admission_date - a.admission_date AS days_to_readmit
    FROM index_admissions a
    JOIN index_admissions b ON a.patient_id = b.patient_id
        AND b.admission_date > a.admission_date
        AND b.admission_date - a.admission_date <= 30
)
SELECT 
    COUNT(DISTINCT patient_id) AS readmitted_patients,
    COUNT(*) AS total_readmissions,
    AVG(days_to_readmit) AS avg_days_to_readmit
FROM readmissions;
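
The self-join in the readmission query can be hard to reason about, so it may help to see the same window logic on a few rows. The sketch below pairs each inpatient admission with any later admission for the same patient within 30 days. The function name find_readmissions and the sample data are hypothetical.

```python
from datetime import date


def find_readmissions(admissions, window_days=30):
    """Pair each admission with later ones for the same patient in the window.

    admissions: iterable of (patient_id, admission_date) for inpatient stays.
    Returns a list of (patient_id, index_date, readmit_date, days_to_readmit).
    """
    by_patient = {}
    for patient_id, admitted in admissions:
        by_patient.setdefault(patient_id, []).append(admitted)

    pairs = []
    for patient_id, dates in by_patient.items():
        dates.sort()
        for i, index_date in enumerate(dates):
            for later in dates[i + 1:]:
                gap = (later - index_date).days
                # Strictly later, and no more than window_days apart.
                if 0 < gap <= window_days:
                    pairs.append((patient_id, index_date, later, gap))
    return pairs


sample = [
    ("p1", date(2025, 1, 1)),
    ("p1", date(2025, 1, 20)),
    ("p1", date(2025, 3, 1)),
    ("p2", date(2025, 2, 1)),
]
pairs = find_readmissions(sample)
readmitted_patients = len({p[0] for p in pairs})

print(pairs)
print(readmitted_patients)
```

Like the query, this measures the gap between admission dates. A measure that counts from discharge would need the discharge date, which could be derived from encounter_date and length_of_stay if encounter_date is the admission date.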
