Introduction
Healthcare APIs power the modern health data ecosystem. From patient access apps to clinical integrations, securing these APIs while enabling innovation requires careful implementation.
Key Statistics:
- 85% of healthcare organizations expose APIs
- API-related breaches increased 300% in 2024
- SMART on FHIR is now required for EHR certification
- 73% of patients want app access to their data
SMART on FHIR Architecture
┌─────────────────────────────────────────────────────────────────┐
│                       SMART on FHIR Flow                        │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌─────────┐    ┌─────────┐    ┌─────────┐    ┌─────────┐       │
│  │  Third  │───▶│  Auth   │───▶│   EHR   │───▶│  Data   │       │
│  │  Party  │    │ Server  │    │ Server  │    │ Return  │       │
│  │   App   │    │ (OAuth2 │    │  (FHIR  │    │         │       │
│  └─────────┘    │ + OIDC) │    │   API)  │    └─────────┘       │
│       │         └─────────┘    └─────────┘                      │
│       │                                                         │
│       ▼                                                         │
│ ┌─────────────┐                                                 │
│ │   Launch    │                                                 │
│ │   Context   │                                                 │
│ │  (Patient/  │                                                 │
│ │  Encounter) │                                                 │
│ └─────────────┘                                                 │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
OAuth2 Implementation
Authorization Server
#!/usr/bin/env python3
"""SMART on FHIR Authorization Server."""
from authlib.integrations.flask_oauth2 import AuthorizationServer
from authlib.oauth2.auth import OAuth2Request
from datetime import datetime, timedelta
class FHIRAuthorizationServer:
    """SMART on FHIR compliant authorization server.

    Registered clients, grants and issued tokens are held in in-memory
    dictionaries on the instance; nothing is persisted by this class.
    """

    def __init__(self):
        # client_id -> client record (a dict; only 'client_secret' is read here)
        self.clients = {}
        self.grants = {}
        # access token string -> stored token dict (see save_token)
        self.tokens = {}

    def validate_client(self, client_id: str, client_secret: str = None) -> bool:
        """Validate a registered client.

        Args:
            client_id: Identifier to look up in ``self.clients``.
            client_secret: Secret presented by the client. When omitted
                (``None``) only registration is checked, which is what an
                authorization endpoint needs before any secret is sent.

        Returns:
            True when the client is registered and, if a secret was supplied,
            it matches the registered one.
        """
        import hmac

        client = self.clients.get(client_id)
        if not client:
            return False

        # Only skip the comparison when no secret was supplied at all; an
        # explicitly supplied empty string must still be checked.
        if client_secret is None:
            return True

        expected = client.get('client_secret')
        if isinstance(expected, str) and isinstance(client_secret, str):
            # Constant-time comparison so response timing does not reveal how
            # much of the secret matched. Encoding to bytes keeps non-ASCII
            # secrets working with compare_digest.
            return hmac.compare_digest(
                expected.encode('utf-8'), client_secret.encode('utf-8')
            )
        # Non-string values (e.g. a public client with no secret registered).
        return expected == client_secret

    def save_token(self, token: dict, request: "OAuth2Request"):
        """Save an access token, stamped with its owner and lifetime.

        Mutates ``token`` in place (adds ``client_id``, ``user_id``,
        ``created_at`` and ``expires_at``) and stores it in ``self.tokens``
        under its ``access_token`` value.

        Args:
            token: Token dict; must contain ``access_token``. ``expires_in``
                (seconds) defaults to 3600 when absent.
            request: Request exposing ``client.client_id`` and ``user.id``.
        """
        token['client_id'] = request.client.client_id
        token['user_id'] = request.user.id

        # Read the clock once so expires_at - created_at equals expires_in
        # exactly instead of drifting by the time between two now() calls.
        # NOTE(review): timestamps are naive local time, as before; consider
        # moving to timezone-aware UTC once all readers are known.
        issued_at = datetime.now()
        lifetime = timedelta(seconds=token.get('expires_in', 3600))
        token['created_at'] = issued_at.isoformat()
        token['expires_at'] = (issued_at + lifetime).isoformat()

        self.tokens[token['access_token']] = token

    def get_default_scopes(self) -> list:
        """Return the default SMART scopes used when a request names none."""
        return [
            'openid',
            'fhirUser',
            'launch/patient',
            'patient/*.read',
            'user/*.read',
        ]

    def build_authorization_response(self, request: "OAuth2Request") -> dict:
        """Build the authorization response with SMART launch context.

        Args:
            request: Request exposing ``scope`` (space-delimited string or
                falsy) and ``state``. The ``patient`` / ``encounter`` launch
                parameters are read via ``request.get(...)`` when the request
                offers it, otherwise from a ``request.data`` mapping.

        Returns:
            Dict with the authorization code, echoed state, requested scopes,
            the per-context scope mapping, and patient/encounter context
            (``None`` when not supplied).
        """
        scopes = request.scope.split() if request.scope else self.get_default_scopes()
        return {
            # generate_code is not defined in this class; it must be provided
            # by the surrounding module.
            'authorization_code': generate_code(),
            'state': request.state,
            'scopes': scopes,
            'fhir_scopes': self._map_to_fhir_scopes(scopes),
            'patient_context': self._launch_param(request, 'patient'),
            'encounter_context': self._launch_param(request, 'encounter'),
        }

    @staticmethod
    def _launch_param(request, name: str):
        """Read one launch-context parameter from a request, or None."""
        getter = getattr(request, 'get', None)
        if callable(getter):
            return getter(name)
        # Fallback for request objects without a dict-style get().
        # NOTE(review): assumes such requests expose parameters as a mapping
        # on ``.data`` -- confirm against the request class in use.
        data = getattr(request, 'data', None) or {}
        return data.get(name)

    def _map_to_fhir_scopes(self, scopes: list) -> dict:
        """Group scopes of the form ``context/rest`` by their context prefix.

        Scopes without a ``/`` (e.g. ``openid``) are skipped. Only the first
        ``/`` splits the scope, so a remainder that itself contains slashes
        (such as a query carrying a URL) is kept intact rather than raising.

        Returns:
            Mapping of prefix -> list of remainders, in input order, e.g.
            ``{'patient': ['*.read'], 'launch': ['patient']}``.
        """
        scope_mapping = {}
        for scope in scopes:
            context, slash, remainder = scope.partition('/')
            if slash:
                scope_mapping.setdefault(context, []).append(remainder)
        return scope_mapping
Token Endpoint
#!/usr/bin/env python3
"""SMART on FHIR token endpoint."""
def generate_tokens(client_id: str, user_id: str, scopes: list) -> dict:
    """Generate the access, refresh and ID tokens for a token response.

    Args:
        client_id: Client the tokens are issued to; used as the ID token
            audience (``aud``).
        user_id: Authenticated user; used as the ID token subject (``sub``)
            and passed to ``get_patient_context``.
        scopes: Granted scopes; joined with spaces in the response.

    Returns:
        Token response dict with ``access_token``, ``token_type``,
        ``expires_in``, ``refresh_token``, ``id_token``, ``scope`` and
        ``patient``.

    ``create_jwt`` and ``get_patient_context`` are not defined here and must
    be provided by the surrounding module.
    """
    import secrets
    from datetime import datetime, timedelta, timezone

    # Single source of truth for the lifetime, so the ID token `exp` and the
    # advertised `expires_in` cannot drift apart.
    expires_in = 3600

    # Timezone-aware UTC; datetime.utcnow() is deprecated and returns a naive
    # value that is easy to misread as local time.
    issued_at = datetime.now(timezone.utc)

    # Access token (short-lived)
    access_token = secrets.token_urlsafe(32)
    # Refresh token (long-lived)
    refresh_token = secrets.token_urlsafe(32)

    # ID token for OpenID Connect
    id_token = create_jwt({
        'sub': user_id,
        'iss': 'https://fhir.example.com/auth',
        'aud': client_id,
        # `iat` is a required ID token claim in OpenID Connect Core.
        'iat': issued_at,
        'exp': issued_at + timedelta(seconds=expires_in),
        'fhirContext': [
            'http://hl7.org/fhir/StructureDefinition/Patient',
            'http://hl7.org/fhir/StructureDefinition/Encounter',
        ],
    })

    return {
        'access_token': access_token,
        'token_type': 'Bearer',
        'expires_in': expires_in,
        'refresh_token': refresh_token,
        'id_token': id_token,
        'scope': ' '.join(scopes),
        'patient': get_patient_context(user_id, scopes),
    }
FHIR Resource Security
Scope Enforcement
#!/usr/bin/env python3
"""FHIR scope enforcement."""
class FHIRScopeEnforcer:
    """Enforce SMART on FHIR scopes.

    Scopes are strings of the form ``<context>/<resource>.<action>`` where
    context is ``user`` or ``patient``. A ``*`` in the resource or action
    position is treated as a wildcard, so ``patient/*.read`` grants read on
    every resource type and ``user/Observation.*`` grants read and write on
    Observation.
    """

    # Resource to scope mapping (reference table; not consulted by the
    # permission checks below).
    SCOPE_MAP = {
        'patient': {
            'Patient': 'patient/*.read',
            'Observation': 'patient/Observation.read',
            'MedicationRequest': 'patient/MedicationRequest.read'
        },
        'user': {
            'Patient': 'user/*.read',
            'Observation': 'user/Observation.read'
        }
    }

    def __init__(self, scopes: list):
        """Store the granted scopes and pre-split them by context.

        Args:
            scopes: Granted scopes as a list of strings. A space-delimited
                string (the form used in token responses) and ``None`` are
                also accepted.
        """
        if isinstance(scopes, str):
            # Iterating a str would yield characters and silently grant nothing.
            scopes = scopes.split()
        self.scopes = list(scopes) if scopes else []
        self.user_scopes = self._parse_user_scopes()
        self.patient_scopes = self._parse_patient_scopes()

    def can_read(self, resource_type: str, patient_id: str = None) -> bool:
        """Check if read is allowed.

        User-level scopes are checked first and grant access outright.
        Patient-level scopes grant access too, but when ``patient_id`` is
        given the result is deferred to ``_has_patient_access``.
        """
        if self._scope_allows(self.user_scopes, 'user', resource_type, 'read'):
            return True
        if self._scope_allows(self.patient_scopes, 'patient', resource_type, 'read'):
            if patient_id:
                return self._has_patient_access(patient_id)
            return True
        return False

    def can_write(self, resource_type: str) -> bool:
        """Check if write is allowed by a user-level or patient-level scope."""
        return (
            self._scope_allows(self.user_scopes, 'user', resource_type, 'write')
            or self._scope_allows(self.patient_scopes, 'patient', resource_type, 'write')
        )

    def filter_results(self, resources: list, resource_type: str) -> list:
        """Filter resources based on patient context.

        Without any patient-level scope the input list is returned unchanged.
        Otherwise only dicts whose ``resourceType`` equals ``resource_type``
        and whose ``id`` is in ``_get_allowed_patients()`` are kept.
        """
        if not self.patient_scopes:
            return resources
        # NOTE(review): this matches the resource's own `id` against patient
        # IDs, which only lines up for Patient resources -- confirm how other
        # resource types should be tied to a patient.
        allowed_patients = self._get_allowed_patients()
        return [
            r for r in resources
            if r.get('resourceType') == resource_type
            and r.get('id') in allowed_patients
        ]

    @staticmethod
    def _scope_allows(granted: set, context: str, resource_type: str, action: str) -> bool:
        """Return True if ``granted`` holds an exact or wildcard scope for the action."""
        candidates = (
            f'{context}/{resource_type}.{action}',  # exact
            f'{context}/*.{action}',                # any resource, this action
            f'{context}/{resource_type}.*',         # this resource, any action
            f'{context}/*.*',                       # everything in this context
        )
        return any(candidate in granted for candidate in candidates)

    def _parse_user_scopes(self) -> set:
        """Return the scopes that start with ``user/``."""
        return {s for s in self.scopes if s.startswith('user/')}

    def _parse_patient_scopes(self) -> set:
        """Return the scopes that start with ``patient/``."""
        return {s for s in self.scopes if s.startswith('patient/')}

    def _has_patient_access(self, patient_id: str) -> bool:
        """Check if patient access is allowed.

        Placeholder: always returns True. In practice, check against the
        patient's assigned apps.
        """
        return True  # Simplified

    def _get_allowed_patients(self) -> set:
        """Get the set of allowed patient IDs.

        Placeholder: returns an empty set, so ``filter_results`` drops every
        resource whenever a patient-level scope is present. Implement based
        on patient context.
        """
        return set()
API Gateway
Security Implementation
# Kong gateway configuration for FHIR API (declarative format)
_format_version: "3.0"

services:
  - name: fhir-api
    url: http://fhir-server:8080/fhir
    routes:
      - name: fhir-routes
        paths:
          - /fhir
        methods:
          - GET
          - POST
          - PUT
          - DELETE
    plugins:
      # OAuth2/JWT validation
      - name: jwt
        config:
          # NOTE(review): this also accepts the token from a `?jwt=` query
          # parameter, which can end up in access logs and browser history.
          # Consider requiring the Authorization header only.
          uri_param_names:
            - jwt
          # The jwt plugin can only verify the `exp` and `nbf` claims here.
          # `iss` is used to look up the consumer credential (it must equal a
          # credential `key` below), and `aud` has to be validated by the
          # upstream FHIR server.
          claims_to_verify:
            - exp
          run_on_preflight: true

      # Rate limiting
      # NOTE(review): no `limit_by` is set, so limits are not keyed per
      # patient; with authenticated consumers Kong's default counts per
      # consumer. Set `limit_by` explicitly if another key is intended.
      - name: rate-limiting
        config:
          minute: 100
          policy: redis
          redis_host: redis
          redis_port: 6379
          # Requests are still proxied when Redis is unreachable.
          fault_tolerant: true
          hide_client_headers: false

      # Request transformer
      - name: request-transformer
        config:
          add:
            headers:
              - "X-Forwarded-Proto:https"
          rename:
            headers:
              # Entries are "old-name:new-name" strings.
              - "X-Patient-Context:X-FHIR-Patient"

      # Response transformer
      - name: response-transformer
        config:
          add:
            headers:
              - "Access-Control-Expose-Headers:X-Total-Count"

      # IP restriction
      - name: ip-restriction
        config:
          allow:
            - 10.0.0.0/8
            - 172.16.0.0/12

      # Bot detection
      - name: bot-detection
        config:
          allow: []
          deny:
            - curl
            - wget

consumers:
  # JWT credentials live under `jwt_secrets`. Each `key` must be unique and
  # must match the `iss` claim of the tokens issued to that consumer. RS256
  # credentials need the signer's public key; the PEM blocks below are
  # placeholders and must be replaced before this file will load.
  - username: patient-portal
    jwt_secrets:
      - key: "patient-portal-test-key"
        algorithm: RS256
        rsa_public_key: |
          -----BEGIN PUBLIC KEY-----
          REPLACE_WITH_PATIENT_PORTAL_PUBLIC_KEY
          -----END PUBLIC KEY-----
  - username: clinical-app
    jwt_secrets:
      - key: "clinical-app-test-key"
        algorithm: RS256
        rsa_public_key: |
          -----BEGIN PUBLIC KEY-----
          REPLACE_WITH_CLINICAL_APP_PUBLIC_KEY
          -----END PUBLIC KEY-----
Audit Logging
#!/usr/bin/env python3
"""FHIR API audit logging."""
import json
from datetime import datetime
class FHIRAuditLogger:
    """Log FHIR API access for compliance.

    Each request is turned into a who/what/where/outcome audit entry, saved
    to the injected store, and then checked for a possible violation.
    """

    # Default ceiling on resources returned by a single request before the
    # request is flagged as a potential bulk-exfiltration violation.
    DEFAULT_MAX_RESOURCES = 10000

    def __init__(self, audit_store, max_resources_per_request: int = DEFAULT_MAX_RESOURCES):
        """Create the logger.

        Args:
            audit_store: Object with a ``save(entry)`` method; receives every
                audit entry dict.
            max_resources_per_request: Requests returning more resources than
                this are reported as violations.
        """
        self.audit_store = audit_store
        self.max_resources_per_request = max_resources_per_request

    def log_request(self, request: dict, response: dict,
                    user_context: dict):
        """Log FHIR API access.

        Builds the audit entry from the three dicts (missing keys become
        ``None``), saves it, then raises a security alert if the entry looks
        like a violation. The entry is saved before the check so the record
        exists even if alerting fails.

        Args:
            request: Reads ``resource_type``, ``resource_id``, ``method``,
                ``source_ip`` and ``user_agent``.
            response: Reads ``status_code`` and ``resource_count``.
            user_context: Reads ``user_id``, ``role``, ``organization``,
                ``scopes`` and ``client_id``.
        """
        from datetime import datetime, timezone

        audit_entry = {
            # Timezone-aware UTC so the stored value carries its offset;
            # datetime.utcnow() is deprecated and yields a naive timestamp.
            'timestamp': datetime.now(timezone.utc).isoformat(),
            # Who
            'user': {
                'id': user_context.get('user_id'),
                'role': user_context.get('role'),
                'organization': user_context.get('organization')
            },
            # What
            'action': {
                'resource_type': request.get('resource_type'),
                'resource_id': request.get('resource_id'),
                'method': request.get('method'),
                'scopes': user_context.get('scopes', [])
            },
            # Where
            'source': {
                'ip': request.get('source_ip'),
                'user_agent': request.get('user_agent'),
                'application': user_context.get('client_id')
            },
            # Outcome
            'outcome': {
                'status_code': response.get('status_code'),
                # `or 0` also covers a present-but-None count, which would
                # otherwise break the numeric violation check.
                'resources_returned': response.get('resource_count') or 0
            }
        }
        self.audit_store.save(audit_entry)

        # Check for access violations
        if self._is_violation(audit_entry):
            self._trigger_security_alert(audit_entry)

    def _is_violation(self, audit_entry: dict) -> bool:
        """Detect potential security violations.

        Currently only flags excessive access: more resources returned than
        ``max_resources_per_request``.
        """
        returned = audit_entry['outcome']['resources_returned'] or 0
        if returned > self.max_resources_per_request:
            return True
        # Check for unusual patterns
        # Implement anomaly detection
        return False

    def _trigger_security_alert(self, audit_entry: dict):
        """Trigger security alert.

        Emits the entry through the ``logging`` module at CRITICAL level so
        it reaches whatever handlers the deployment has configured instead of
        being written to stdout only.
        """
        import logging

        logging.getLogger(__name__).critical("SECURITY ALERT: %s", audit_entry)
        # Send to SIEM, notify security team
Comments