Skip to main content
⚡ Calmops

Healthcare API Security: OAuth2, SMART on FHIR

Introduction

Healthcare APIs power the modern health data ecosystem. From patient access apps to clinical integrations, securing these APIs while enabling innovation requires careful implementation.

Key Statistics:

  • 85% of healthcare organizations expose APIs
  • API-related breaches increased 300% in 2024
  • SMART on FHIR is now required for EHR certification
  • 73% of patients want app access to their data

SMART on FHIR Architecture

┌──────────────────────────────────────────────────────────────────────┐
│                    SMART on FHIR Flow                                │
├──────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  ┌─────────┐    ┌─────────┐    ┌─────────┐    ┌─────────┐            │
│  │ Third   │───▶│ Auth    │───▶│ EHR     │───▶│ Data    │            │
│  │ Party   │    │ Server  │    │ Server  │    │ Return  │            │
│  │ App     │    │ (OAuth2 │    │ (FHIR   │    │         │            │
│  └─────────┘    │ + OIDC) │    │ API)    │    └─────────┘            │
│       │         └─────────┘    └─────────┘                           │
│       │                                                              │
│       ▼                                                              │
│  ┌─────────────┐                                                     │
│  │ Launch      │                                                     │
│  │ Context     │                                                     │
│  │ (Patient/   │                                                     │
│  │  Encounter) │                                                     │
│  └─────────────┘                                                     │
│                                                                      │
└──────────────────────────────────────────────────────────────────────┘

OAuth2 Implementation

Authorization Server

#!/usr/bin/env python3
"""SMART on FHIR Authorization Server."""

from authlib.integrations.flask_oauth2 import AuthorizationServer
from authlib.oauth2.auth import OAuth2Request
from datetime import datetime, timedelta

class FHIRAuthorizationServer:
    """SMART on FHIR compliant authorization server.

    Registered clients, grants and issued tokens are held in plain
    in-memory dictionaries on the instance.
    """

    def __init__(self):
        # client_id -> client record; only the 'client_secret' key of a
        # record is read by this class.
        self.clients = {}
        self.grants = {}
        # access token string -> token dict as enriched by save_token().
        self.tokens = {}

    def validate_client(self, client_id: str, client_secret: str = None) -> bool:
        """Validate registered client.

        Args:
            client_id: Identifier to look up in ``self.clients``.
            client_secret: Secret presented by the caller. When omitted (or
                empty) only registration is checked, which is what public
                clients and the authorization step rely on.

        Returns:
            True when the client is registered and, if a secret was
            presented, that secret matches the stored one.
        """
        import hmac

        client = self.clients.get(client_id)
        if not client:
            return False

        if not client_secret:
            # NOTE(review): a confidential client is accepted here without a
            # secret; callers authenticating at the token endpoint should
            # always pass one.
            return True

        stored_secret = client.get('client_secret')
        if isinstance(stored_secret, str) and isinstance(client_secret, str):
            # Constant-time comparison so response timing does not leak how
            # much of the secret matched. Encoding to bytes lets
            # compare_digest accept non-ASCII secrets.
            return hmac.compare_digest(stored_secret.encode('utf-8'),
                                       client_secret.encode('utf-8'))

        # Non-string values (e.g. no secret stored): plain equality, which
        # matches the previous behaviour.
        return stored_secret == client_secret

    def save_token(self, token: dict, request: OAuth2Request):
        """Save access token.

        Adds ``client_id``, ``user_id``, ``created_at`` and ``expires_at``
        to ``token`` in place and stores it under its ``access_token``.

        Args:
            token: Token dict; must contain ``access_token``. ``expires_in``
                (seconds) defaults to 3600 when absent.
            request: Object exposing ``client.client_id`` and ``user.id``.
        """
        # Read everything that can fail before mutating the token.
        client_id = request.client.client_id
        user_id = request.user.id
        lifetime = timedelta(seconds=token.get('expires_in', 3600))

        # Single clock read so expires_at - created_at is exactly the
        # lifetime. Timestamps are naive local time, as before.
        issued_at = datetime.now()

        token['client_id'] = client_id
        token['user_id'] = user_id
        token['created_at'] = issued_at.isoformat()
        token['expires_at'] = (issued_at + lifetime).isoformat()

        self.tokens[token['access_token']] = token

    def get_default_scopes(self) -> list:
        """Get default SMART scopes."""

        return [
            'openid',
            'fhirUser',
            'launch/patient',
            'patient/*.read',
            'user/*.read'
        ]

    def build_authorization_response(self, request: OAuth2Request) -> dict:
        """Build authorization response with SMART context.

        Uses the scopes on the request, falling back to the default SMART
        scopes when the request carries none.

        NOTE(review): ``generate_code`` is not defined in this snippet and
        must be supplied by the surrounding module; ``request.get`` is
        assumed to return launch-context values (or None) - confirm against
        the request type actually used.
        """

        scopes = request.scope.split() if request.scope else self.get_default_scopes()

        # Map to FHIR scopes
        fhir_scopes = self._map_to_fhir_scopes(scopes)

        return {
            'authorization_code': generate_code(),
            'state': request.state,
            'scopes': scopes,
            'fhir_scopes': fhir_scopes,
            'patient_context': request.get('patient'),  # From launch context
            'encounter_context': request.get('encounter')
        }

    def _map_to_fhir_scopes(self, scopes: list) -> dict:
        """Map OAuth scopes to FHIR resource scopes.

        Groups every scope containing a ``/`` by the text before the first
        ``/`` (e.g. ``patient``, ``user``, ``launch``); scopes without a
        ``/`` such as ``openid`` are skipped.

        Returns:
            Dict of prefix -> list of the remainders, in input order.
        """

        scope_mapping = {}

        for scope in scopes:
            # Split on the first '/' only: granular scopes can carry further
            # slashes (for instance a URL in a query part), which made the
            # old two-way unpack of split('/') raise ValueError.
            prefix, separator, permission = scope.partition('/')
            if not separator:
                continue
            scope_mapping.setdefault(prefix, []).append(permission)

        return scope_mapping

Token Endpoint

#!/usr/bin/env python3
"""SMART on FHIR token endpoint."""

def generate_tokens(client_id: str, user_id: str, scopes: list,
                    expires_in: int = 3600) -> dict:
    """Generate access and refresh tokens.

    Args:
        client_id: Client the tokens are issued to; used as the ID token
            audience.
        user_id: Subject of the ID token.
        scopes: Granted scopes; returned space-joined in ``scope``.
        expires_in: Lifetime in seconds, used for both the ``expires_in``
            response field and the ID token ``exp`` claim so the two cannot
            drift apart. Defaults to one hour.

    Returns:
        Token response dict with ``access_token``, ``token_type``,
        ``expires_in``, ``refresh_token``, ``id_token``, ``scope`` and
        ``patient``.

    NOTE(review): ``create_jwt`` and ``get_patient_context`` are not defined
    in this snippet. ``create_jwt`` is assumed to convert datetime claims to
    numeric timestamps - confirm. A refresh token is issued regardless of
    which scopes were granted; confirm that is intended.
    """

    import secrets
    from datetime import datetime, timedelta, timezone

    # Access token (short-lived)
    access_token = secrets.token_urlsafe(32)

    # Refresh token (long-lived)
    refresh_token = secrets.token_urlsafe(32)

    # Timezone-aware UTC instead of the deprecated naive utcnow(), so any
    # timestamp conversion downstream is unambiguous.
    issued_at = datetime.now(timezone.utc)

    # ID token for OpenID Connect
    id_token = create_jwt({
        'sub': user_id,
        'iss': 'https://fhir.example.com/auth',
        'aud': client_id,
        # 'iat' is a required ID token claim in OpenID Connect.
        'iat': issued_at,
        'exp': issued_at + timedelta(seconds=expires_in),
        'fhirContext': [
            'http://hl7.org/fhir/StructureDefinition/Patient',
            'http://hl7.org/fhir/StructureDefinition/Encounter'
        ]
    })

    return {
        'access_token': access_token,
        'token_type': 'Bearer',
        'expires_in': expires_in,
        'refresh_token': refresh_token,
        'id_token': id_token,
        'scope': ' '.join(scopes),
        'patient': get_patient_context(user_id, scopes)
    }

FHIR Resource Security

Scope Enforcement

#!/usr/bin/env python3
"""FHIR scope enforcement."""

class FHIRScopeEnforcer:
    """Enforce SMART on FHIR scopes.

    Understands scopes of the form ``<context>/<resource>.<action>`` where
    context is ``user`` or ``patient``, and honours the ``*`` wildcard in
    the resource and action positions.
    """

    # Resource to scope mapping
    SCOPE_MAP = {
        'patient': {
            'Patient': 'patient/*.read',
            'Observation': 'patient/Observation.read',
            'MedicationRequest': 'patient/MedicationRequest.read'
        },
        'user': {
            'Patient': 'user/*.read',
            'Observation': 'user/Observation.read'
        }
    }

    def __init__(self, scopes: list):
        """Store the granted scopes and pre-split them by context.

        Args:
            scopes: Granted scope strings. A single space-delimited string
                (the form used in token responses) is also accepted.
        """
        if isinstance(scopes, str):
            # Iterating a str would yield single characters, so split first.
            scopes = scopes.split()
        self.scopes = scopes
        self.user_scopes = self._parse_user_scopes()
        self.patient_scopes = self._parse_patient_scopes()

    def can_read(self, resource_type: str, patient_id: str = None) -> bool:
        """Check if read is allowed.

        A user-level grant allows the read outright. A patient-level grant
        allows it too, subject to ``_has_patient_access`` when a
        ``patient_id`` is supplied.
        """

        # Check user-level scopes
        if self._grants(self.user_scopes, 'user', resource_type, 'read'):
            return True

        # Check patient-level scopes
        if self._grants(self.patient_scopes, 'patient', resource_type, 'read'):
            if patient_id:
                return self._has_patient_access(patient_id)
            return True

        return False

    def can_write(self, resource_type: str) -> bool:
        """Check if write is allowed."""

        return (self._grants(self.user_scopes, 'user', resource_type, 'write') or
                self._grants(self.patient_scopes, 'patient', resource_type, 'write'))

    def filter_results(self, resources: list, resource_type: str) -> list:
        """Filter resources based on patient context.

        With no patient-level scopes the input list is returned unchanged.
        Otherwise only resources of ``resource_type`` that belong to a
        patient in ``_get_allowed_patients()`` are kept.
        """

        if not self.patient_scopes:
            return resources

        # Filter to accessible patients
        allowed_patients = self._get_allowed_patients()

        return [
            resource for resource in resources
            if resource.get('resourceType') == resource_type and
               self._patient_id_of(resource) in allowed_patients
        ]

    @staticmethod
    def _grants(granted: set, context: str, resource_type: str, action: str) -> bool:
        """Return True if ``granted`` covers the action on the resource type.

        Exact string matching alone never matched wildcard scopes such as
        ``patient/*.read``, so every wildcard form is tried as well.
        """
        candidates = (
            f'{context}/{resource_type}.{action}',
            f'{context}/*.{action}',
            f'{context}/{resource_type}.*',
            f'{context}/*.*',
        )
        return any(candidate in granted for candidate in candidates)

    @staticmethod
    def _patient_id_of(resource: dict):
        """Return the id of the patient a resource belongs to, or None.

        A Patient resource is identified by its own ``id``. For other
        resources the ``subject`` / ``patient`` reference is used; comparing
        the resource's own ``id`` to patient ids would be wrong there.
        NOTE(review): only these two reference fields are inspected - extend
        if other patient-linking fields are in use.
        """
        if resource.get('resourceType') == 'Patient':
            return resource.get('id')

        for field in ('subject', 'patient'):
            holder = resource.get(field)
            reference = holder.get('reference') if isinstance(holder, dict) else None
            if not isinstance(reference, str):
                continue
            # Handles 'Patient/123', absolute URLs and versioned references
            # by taking the segment that follows 'Patient'.
            segments = reference.split('/')
            if 'Patient' in segments[:-1]:
                return segments[segments.index('Patient') + 1]

        return None

    def _parse_user_scopes(self) -> set:
        """Parse user-level scopes."""

        return {s for s in self.scopes if s.startswith('user/')}

    def _parse_patient_scopes(self) -> set:
        """Parse patient-level scopes."""

        return {s for s in self.scopes if s.startswith('patient/')}

    def _has_patient_access(self, patient_id: str) -> bool:
        """Check if patient access is allowed."""

        # In practice, check against patient's assigned apps
        return True  # Simplified

    def _get_allowed_patients(self) -> set:
        """Get list of allowed patient IDs."""

        return set()  # Implement based on patient context

API Gateway

Security Implementation

# Kong gateway configuration for FHIR API
_format_version: "3.0"

services:
  - name: fhir-api
    url: http://fhir-server:8080/fhir
    routes:
      - name: fhir-routes
        paths:
          - /fhir
        methods:
          - GET
          - POST
          - PUT
          - DELETE
    plugins:
      # OAuth2/JWT validation
      - name: jwt
        config:
          # NOTE(review): this lets tokens arrive as a ?jwt= query parameter,
          # where they can end up in access logs; consider restricting
          # clients to the Authorization header.
          uri_param_names:
            - jwt
          # The jwt plugin only accepts `exp` and `nbf` here. The issuer is
          # matched implicitly: the token's key claim (`iss` by default) must
          # equal a consumer credential `key` below. Audience (`aud`) is not
          # checked by this plugin and must be enforced elsewhere.
          claims_to_verify:
            - exp
          run_on_preflight: true
      
      # Rate limiting
      # NOTE(review): no `limit_by` is configured, so Kong's default
      # aggregation applies; this is not keyed per patient.
      - name: rate-limiting
        config:
          minute: 100
          policy: redis
          redis_host: redis
          redis_port: 6379
          fault_tolerant: true
          hide_client_headers: false
      
      # Request transformer
      - name: request-transformer
        config:
          add:
            headers:
              - X-Forwarded-Proto:https
          rename:
            headers:
              # Renames use the "old-name:new-name" string form.
              - X-Patient-Context:X-FHIR-Patient
      
      # Response transformer
      - name: response-transformer
        config:
          add:
            headers:
              - Access-Control-Expose-Headers:X-Total-Count
      
      # IP restriction
      - name: ip-restriction
        config:
          allow:
            - 10.0.0.0/8
            - 172.16.0.0/12
      
      # Bot detection
      - name: bot-detection
        config:
          allow: []
          deny:
            - curl
            - wget

# JWT credentials are declared under `jwt_secrets`. Each `key` must be unique
# and is the value tokens carry in their key claim. RS256 credentials need the
# signer's public key; the PEM bodies below are placeholders to replace.
consumers:
  - username: patient-portal
    jwt_secrets:
      - key: "patient-portal-key"
        algorithm: RS256
        rsa_public_key: |
          -----BEGIN PUBLIC KEY-----
          REPLACE_WITH_PATIENT_PORTAL_PUBLIC_KEY
          -----END PUBLIC KEY-----

  - username: clinical-app
    jwt_secrets:
      - key: "clinical-app-key"
        algorithm: RS256
        rsa_public_key: |
          -----BEGIN PUBLIC KEY-----
          REPLACE_WITH_CLINICAL_APP_PUBLIC_KEY
          -----END PUBLIC KEY-----

Audit Logging

#!/usr/bin/env python3
"""FHIR API audit logging."""

import json
from datetime import datetime

class FHIRAuditLogger:
    """Log FHIR API access for compliance.

    Each call to ``log_request`` builds one audit entry (who / what /
    where / outcome), hands it to the audit store, and raises a security
    alert when the entry looks like a violation.
    """

    # Responses returning more resources than this are treated as
    # excessive access unless another limit is passed to __init__.
    DEFAULT_MAX_RESOURCES = 10000

    def __init__(self, audit_store, max_resources: int = DEFAULT_MAX_RESOURCES):
        """
        Args:
            audit_store: Object with a ``save(entry)`` method.
            max_resources: Largest ``resource_count`` a single response may
                report before it is flagged as a violation.
        """
        self.audit_store = audit_store
        self.max_resources = max_resources

    def log_request(self, request: dict, response: dict,
                   user_context: dict):
        """Log FHIR API access.

        Args:
            request: Reads ``resource_type``, ``resource_id``, ``method``,
                ``source_ip`` and ``user_agent``.
            response: Reads ``status_code`` and ``resource_count``
                (0 when the key is absent).
            user_context: Reads ``user_id``, ``role``, ``organization``,
                ``scopes`` (``[]`` when absent) and ``client_id``.
        """
        from datetime import timezone

        # Same naive-UTC ISO string that utcnow() produced, without calling
        # the deprecated utcnow().
        timestamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

        audit_entry = {
            'timestamp': timestamp,

            # Who
            'user': {
                'id': user_context.get('user_id'),
                'role': user_context.get('role'),
                'organization': user_context.get('organization')
            },

            # What
            'action': {
                'resource_type': request.get('resource_type'),
                'resource_id': request.get('resource_id'),
                'method': request.get('method'),
                'scopes': user_context.get('scopes', [])
            },

            # Where
            'source': {
                'ip': request.get('source_ip'),
                'user_agent': request.get('user_agent'),
                'application': user_context.get('client_id')
            },

            # Outcome
            'outcome': {
                'status_code': response.get('status_code'),
                'resources_returned': response.get('resource_count', 0)
            }
        }

        # Persist first so the entry is recorded even if alerting fails.
        self.audit_store.save(audit_entry)

        # Check for access violations
        if self._is_violation(audit_entry):
            self._trigger_security_alert(audit_entry)

    def _is_violation(self, audit_entry: dict) -> bool:
        """Detect potential security violations.

        Currently flags only responses whose ``resources_returned`` exceeds
        ``max_resources``.
        """

        # Check for excessive access. The count can be None (key present but
        # unset) or non-numeric; comparing that with an int would raise.
        returned = audit_entry['outcome']['resources_returned']
        if isinstance(returned, (int, float)) and returned > self.max_resources:
            return True

        # Check for unusual patterns
        # Implement anomaly detection

        return False

    def _trigger_security_alert(self, audit_entry: dict):
        """Trigger security alert."""

        print(f"SECURITY ALERT: {audit_entry}")
        # Send to SIEM, notify security team

External Resources


Comments