Skip to main content
โšก Calmops

Open Banking APIs: PSD2, Open Finance Implementation

Introduction

Open Banking has transformed financial services by enabling regulated third-party providers to access customer financial data and initiate payments. This article covers PSD2 compliance, API standards, and practical implementation patterns.

Key Statistics:

  • PSD2 covers 27 EU countries
  • UK Open Banking: 250+ regulated providers
  • Open Banking transactions: โ‚ฌ6.5B annually (EU)
  • 71% of European consumers use Open Banking

Regulatory Framework

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚                    Open Banking Regulatory Landscape                     โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚                                                                  โ”‚
โ”‚  European Union                                                   โ”‚
โ”‚  โ”œโ”€โ”€ PSD2 (Payment Services Directive 2)                       โ”‚
โ”‚  โ”œโ”€โ”€ PSD3 (Proposed)                                            โ”‚
โ”‚  โ”œโ”€โ”€ GDPR (Data Protection)                                    โ”‚
โ”‚  โ””โ”€โ”€ RTS (Regulatory Technical Standards)                      โ”‚
โ”‚                                                                  โ”‚
โ”‚  United Kingdom                                                   โ”‚
โ”‚  โ”œโ”€โ”€ CMA Order (Competition and Markets Authority)             โ”‚
โ”‚  โ”œโ”€โ”€ FCA Handbook                                               โ”‚
โ”‚  โ””โ”€โ”€ Open Banking Implementation Entity (OBIE)                 โ”‚
โ”‚                                                                  โ”‚
โ”‚  United States                                                   โ”‚
โ”‚  โ”œโ”€โ”€ CFPB Rule 1033 (Consumer Data Right)                      โ”‚
โ”‚  โ”œโ”€โ”€ State-level banking laws                                   โ”‚
โ”‚  โ””โ”€โ”€ CCPA (California)                                           โ”‚
โ”‚                                                                  โ”‚
โ”‚  Other Regions                                                   โ”‚
โ”‚  โ”œโ”€โ”€ Australia (CDR - Consumer Data Right)                     โ”‚
โ”‚  โ”œโ”€โ”€ Canada (Open Banking)                                      โ”‚
โ”‚  โ”œโ”€โ”€ Singapore (MAS Open API)                                  โ”‚
โ”‚  โ””โ”€โ”€ Japan (Open Banking)                                       โ”‚
โ”‚                                                                  โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

PSD2 Requirements

Roles and Permissions

#!/usr/bin/env python3
"""PSD2 roles and authorization scopes."""

from enum import Enum
from dataclasses import dataclass
from typing import List, Optional

class PSD2Role(Enum):
    """PSD2 defined roles."""
    
    # Account Information Service (read-only)
    AIS = "aisp"
    
    # Payment Initiation Service (initiate payments)
    PISP = "pisp"
    
    # Card-Based Payment Instrument Issuer
    CBPII = "cbpii"

@dataclass
class AccountAccess:
    """Account access permissions."""
    
    accounts: List[str]          # Specific accounts or "all"
    balances: bool = True         # Read balances
    transactions: bool = True     # Read transactions
    standing_orders: bool = False
    scheduled_payments: bool = False
    parties: bool = False        # Account parties/owners

@dataclass
class PSD2Consent:
    """PSD2 consent for TPP access."""
    
    consent_id: str
    consent_sequence: int
    access: AccountAccess
    recurring_indicator: bool
    valid_until: str              # ISO 8601 date
    frequency_per_day: int
    combined_service_indicator: bool  # For combined AIS+PISP

class PSD2PermissionManager:
    """Manage PSD2 permissions and scopes."""
    
    # OAuth 2.0 scopes per PSD2
    SCOPES = {
        'accounts': 'ReadAccountsBasic',
        'accounts-detail': 'ReadAccountsDetail',
        'balances': 'ReadBalances',
        'transactions': 'ReadTransactionsBasic',
        'transactions-detail': 'ReadTransactionsDetail',
        'standing-orders': 'ReadStandingOrders',
        'scheduled-payments': 'ReadScheduledPayments',
        'parties': 'ReadParties',
        'payment-initiation': 'WritePayments',
    }
    
    def __init__(self, role: PSD2Role):
        self.role = role
    
    def build_scopes(self, access: AccountAccess) -> List[str]:
        """Build OAuth scopes from access permissions."""
        
        scopes = []
        
        if access.accounts:
            scopes.append(self.SCOPES['accounts'])
        
        if access.balances:
            scopes.append(self.SCOPES['balances'])
        
        if access.transactions:
            scopes.append(self.SCOPES['transactions'])
        
        if access.standing_orders:
            scopes.append(self.SCOPES['standing-orders'])
        
        if access.scheduled_payments:
            scopes.append(self.SCOPES['scheduled-payments'])
        
        if access.parties:
            scopes.append(self.SCOPES['parties'])
        
        return scopes
    
    def validate_consent(self, consent: PSD2Consent) -> tuple[bool, str]:
        """Validate PSD2 consent."""
        
        # Check role permissions
        if self.role == PSD2Role.AIS:
            if hasattr(consent, 'payment_initiation'):
                return False, "AIS cannot request payment initiation"
        
        if self.role == PSD2Role.PISP:
            if consent.access.transactions:
                return False, "PISP cannot request transaction access"
        
        # Check consent validity
        if consent.valid_until:
            from datetime import datetime
            expiry = datetime.fromisoformat(consent.valid_until.replace('Z', '+00:00'))
            if datetime.now(expiry.tzinfo) > expiry:
                return False, "Consent has expired"
        
        return True, "OK"

API Standards

Berlin Group NextGenPSD2

#!/usr/bin/env python3
"""Berlin Group NextGenPSD2 API implementation."""

from typing import Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime
import hashlib
import hmac

@dataclass
class AccountReference:
    """Account identification."""
    
    iban: Optional[str] = None
    bban: Optional[str] = None
    msisdn: Optional[str] = None
    mask: Optional[str] = None  # Masked PAN for cards

@dataclass
class Transaction:
    """Transaction details."""
    
    transaction_id: str
    account_id: str
    date: str
    booking_date: str
    value_date: Optional[str]
    amount: float
    currency: str
    creditor_name: Optional[str]
    creditor_account: Optional[AccountReference]
    debtor_name: Optional[str]
    debtor_account: Optional[AccountReference]
    purpose: Optional[str]
    mandate_id: Optional[str]
    proprietary_bank_transaction_code: Optional[str]

@dataclass
class Balance:
    """Account balance."""
    
    balance_amount: float
    balance_currency: str
    balance_type: str  # "closingBooked", "openingBooked", "interimAvailable"

class NextGenPSD2API:
    """Berlin Group NextGenPSD2 API client."""
    
    BASE_URL = "https://api.bank.example.com/v1"
    
    def __init__(self, api_client_id: str, api_secret: str):
        self.client_id = api_client_id
        self.client_secret = api_secret
        self.access_token: Optional[str] = None
    
    def get_accounts(self, consent_id: str) -> List[Dict]:
        """GET /accounts - List accounts."""
        
        # Request
        # GET /v1/accounts
        # Authorization: Bearer {access_token}
        # X-Request-ID: {uuid}
        # Consent-ID: {consent_id}
        
        response = self._request(
            'GET',
            '/accounts',
            headers={'Consent-ID': consent_id}
        )
        
        return response.get('accounts', [])
    
    def get_account_balance(self, account_id: str, 
                            consent_id: str) -> List[Balance]:
        """GET /accounts/{account-id}/balances - Get balances."""
        
        response = self._request(
            'GET',
            f'/accounts/{account_id}/balances',
            headers={'Consent-ID': consent_id}
        )
        
        balances = []
        for bal in response.get('balances', []):
            balances.append(Balance(
                balance_amount=float(bal['balanceAmount']['amount']),
                balance_currency=bal['balanceAmount']['currency'],
                balance_type=bal['balanceType']
            ))
        
        return balances
    
    def get_transactions(self, account_id: str, consent_id: str,
                        date_from: Optional[str] = None,
                        date_to: Optional[str] = None) -> List[Transaction]:
        """GET /accounts/{account-id}/transactions - Get transactions."""
        
        params = {}
        if date_from:
            params['dateFrom'] = date_from
        if date_to:
            params['dateTo'] = date_to
        
        response = self._request(
            'GET',
            f'/accounts/{account_id}/transactions',
            params=params,
            headers={'Consent-ID': consent_id}
        )
        
        transactions = []
        for txn in response.get('transactions', []):
            transactions.append(self._parse_transaction(txn))
        
        return transactions
    
    def _parse_transaction(self, txn: Dict) -> Transaction:
        """Parse transaction from API response."""
        
        amount = float(txn['transactionAmount']['amount'])
        currency = txn['transactionAmount']['currency']
        
        return Transaction(
            transaction_id=txn.get('transactionId', ''),
            account_id=txn.get('accountId', ''),
            date=txn.get('date', ''),
            booking_date=txn.get('bookingDate', ''),
            value_date=txn.get('valueDate'),
            amount=amount,
            currency=currency,
            creditor_name=txn.get('creditorName'),
            creditor_account=txn.get('creditorAccount'),
            debtor_name=txn.get('debtorName'),
            debtor_account=txn.get('debtorAccount'),
            purpose=txn.get('purpose'),
            mandate_id=txn.get('mandateId'),
            proprietary_bank_transaction_code=txn.get(
                'proprietaryBankTransactionCode'
            )
        )
    
    def initiate_payment(self, consent_id: str, 
                        payment_request: Dict) -> Dict:
        """POST /payments - Initiate payment."""
        
        # Request
        # POST /v1/payments/sepa-credit-transfers
        # Authorization: Bearer {access_token}
        # X-Request-ID: {uuid}
        # Consent-ID: {consent_id}
        # Content-Type: application/json
        
        # {
        #   "instructedAmount": {
        #     "amount": "100.00",
        #     "currency": "EUR"
        #   },
        #   "debtorAccount": {
        #     "iban": "DE1234567890"
        #   },
        #   "creditorAccount": {
        #     "iban": "DE0987654321"
        #   },
        #   "creditorName": "Merchant Ltd",
        #   "remittanceInformationUnstructured": "Ref 12345"
        # }
        
        response = self._request(
            'POST',
            '/payments/sepa-credit-transfers',
            json=payment_request,
            headers={'Consent-ID': consent_id}
        )
        
        return response
    
    def _request(self, method: str, path: str, **kwargs) -> Dict:
        """Make authenticated API request."""
        
        # Implementation would use requests library
        # Includes OAuth 2.0 token management
        # Signing with OAuth 1.0a for older APIs
        
        return {"accounts": [], "transactions": []}

UK Open Banking Standards

# UK Open Banking API Configuration
open_banking:
  api_version: "v3.1"
  base_url: "https://api.openbanking.org.uk"
  
  endpoints:
    accounts:
      - "/accounts"
      - "/accounts/{AccountId}"
      - "/accounts/{AccountId}/balances"
      - "/accounts/{AccountId}/transactions"
      - "/accounts/{AccountId}/direct-debits"
      - "/accounts/{AccountId}/standing-orders"
      - "/accounts/{AccountId}/scheduled-payments"
      
    payments:
      - "/domestic-payments"
      - "/domestic-scheduled-payments"
      - "/domestic-standing-orders"
      - "/international-payments"
      
    events:
      - "/events"
      - "/events/{EventSubscriptionId}"
  
  authentication:
    method: "OAuth 2.0"
    signing: "JWT"
    token_endpoint: "https://auth.openbanking.org.uk/token"
    authorization_endpoint: "https://auth.openbanking.org.uk/authorise"
  
  data_types:
    account:
      - "AccountId"
      - "Nickname"
      - "AccountType"
      - "AccountSubType"
      - "Currency"
      - "AccountNumber"
      
    transaction:
      - "TransactionId"
      - "AccountId"
      - "Amount"
      - "CreditDebitIndicator"
      - "Status"
      - "BookingDateTime"
      - "ValueDateTime"

OAuth 2.0 Implementation

Authorization Flow

#!/usr/bin/env python3
"""Open Banking OAuth 2.0 authorization flow."""

import uuid
import hashlib
import base64
from urllib.parse import urlencode, urljoin
from dataclasses import dataclass
from typing import Optional, Dict

@dataclass
class AuthorizationRequest:
    """OAuth 2.0 authorization request."""
    
    response_type: str = "code"  # or "code id_token" for hybrid
    client_id: str
    redirect_uri: str
    scope: str
    state: str
    nonce: str
    response_mode: str = "jwt"
    prompt: Optional[str] = None

class OpenBankingAuth:
    """Handle Open Banking OAuth 2.0 flows."""
    
    def __init__(self, issuer_url: str, client_id: str, 
                 client_secret: str, redirect_uri: str):
        self.issuer_url = issuer_url
        self.client_id = client_id
        self.client_secret = client_secret
        self.redirect_uri = redirect_uri
    
    def build_authorization_url(self, consent: Dict) -> str:
        """Build OAuth authorization URL."""
        
        # Generate PKCE challenge
        code_verifier = self._generate_code_verifier()
        code_challenge = self._generate_code_challenge(code_verifier)
        
        # Store verifier for token exchange
        self._cache_code_verifier(code_verifier)
        
        # Build authorization request
        params = {
            'response_type': 'code id_token',
            'client_id': self.client_id,
            'redirect_uri': self.redirect_uri,
            'scope': 'openid accounts payments',
            'state': str(uuid.uuid4()),
            'nonce': str(uuid.uuid4()),
            'response_mode': 'jwt',
            'code_challenge': code_challenge,
            'code_challenge_method': 'S256',
            'consent_id': consent.get('consent_id'),
        }
        
        auth_url = urljoin(self.issuer_url, 'authorize')
        return f"{auth_url}?{urlencode(params)}"
    
    def _generate_code_verifier(self) -> str:
        """Generate PKCE code verifier."""
        
        # 32-128 characters, URL-safe
        return base64.urlsafe_b64encode(
            uuid.uuid4().bytes + uuid.uuid4().bytes
        ).decode('utf-8').rstrip('=')[:128]
    
    def _generate_code_challenge(self, verifier: str) -> str:
        """Generate PKCE code challenge from verifier."""
        
        # SHA256 hash, base64url encode
        digest = hashlib.sha256(verifier.encode('utf-8')).digest()
        return base64.urlsafe_b64encode(digest).decode('utf-8').rstrip('=')
    
    def _cache_code_verifier(self, verifier: str):
        """Cache code verifier (use Redis in production)."""
        
        # In production, store in secure session/Redis
        # with client_id as key
        self._code_verifier = verifier
    
    def exchange_code_for_token(self, auth_code: str) -> Dict:
        """Exchange authorization code for tokens."""
        
        import requests
        
        token_url = urljoin(self.issuer_url, 'token')
        
        data = {
            'grant_type': 'authorization_code',
            'code': auth_code,
            'redirect_uri': self.redirect_uri,
            'client_id': self.client_id,
            'code_verifier': getattr(self, '_code_verifier', ''),
        }
        
        # Basic auth or client secret
        auth = (self.client_id, self.client_secret)
        
        response = requests.post(token_url, data=data, auth=auth)
        response.raise_for_status()
        
        tokens = response.json()
        
        return {
            'access_token': tokens.get('access_token'),
            'id_token': tokens.get('id_token'),
            'refresh_token': tokens.get('refresh_token'),
            'expires_in': tokens.get('expires_in'),
            'token_type': tokens.get('token_type'),
        }
    
    def refresh_access_token(self, refresh_token: str) -> Dict:
        """Refresh expired access token."""
        
        import requests
        
        token_url = urljoin(self.issuer_url, 'token')
        
        data = {
            'grant_type': 'refresh_token',
            'refresh_token': refresh_token,
            'client_id': self.client_id,
        }
        
        response = requests.post(token_url, data=data)
        response.raise_for_status()
        
        return response.json()

Strong Customer Authentication (SCA)

Implementation

#!/usr/bin/env python3
"""Strong Customer Authentication (SCA) for Open Banking."""

import hashlib
import hmac
import json
from typing import Dict, Optional
from dataclasses import dataclass
from enum import Enum

class SCAAuthenticationMethod(Enum):
    """SCA authentication methods."""
    
    SCA_METHOD_SCA_RELYING_PARTY = "SCA_RP"
    SCA_METHOD_OTP = "OTP"
    SCA_METHOD_OTHER = "OTHER"

class SCABase(Enum):
    """SCA base classes."""
    
    BASE_1 = "01"  # Single transaction
    BASE_2 = "02"  # Recurring payments (same amount)
    BASE_3 = "03"  # Recurring payments (variable amount)
    BASE_4 = "04"  # Account listing

@dataclass
class AuthenticationObject:
    """SCA authentication method."""
    
    authentication_method: str
    authentication_version: str
    authentication_data: str

@dataclass
class SCACredential:
    """SCA credential (e.g., OTP)."""
    
    credential_id: Optional[str]
    credential_data: str

class OpenBankingSCA:
    """Handle SCA for Open Banking payments."""
    
    # Authentication approaches
    APPROACH_REDIRECT = "redirect"
    APPROACH_DECOUPLED = "decoupled"
    APPROACH_EMBEDDED = "embedded"
    
    def __init__(self, aspsp_id: str):
        self.aspsp_id = aspsp_id
    
    def build_sca_authentication(self, payment: Dict, 
                                 approach: str = APPROACH_REDIRECT) -> Dict:
        """Build SCA authentication request."""
        
        # Determine authentication approach
        if approach == self.APPROACH_REDIRECT:
            return self._build_redirect_auth(payment)
        elif approach == self.APPROACH_DECOUPLED:
            return self._build_decoupled_auth(payment)
        elif approach == self.APPROACH_EMBEDDED:
            return self._build_embedded_auth(payment)
    
    def _build_redirect_auth(self, payment: Dict) -> Dict:
        """Build redirect authentication flow."""
        
        return {
            "scaApproach": self.APPROACH_REDIRECT,
            "authenticationMethods": [
                {
                    "authenticationMethod": "SCA_METHOD_SCA_RELYING_PARTY",
                    "authenticationVersion": "1.0",
                    "authenticationData": ""
                }
            ],
            "scaStatus": "started",
            "redirect": {
                "redirectURI": "https://bank.example.com/auth",
                "expirationDateTime": self._get_expiry_time(5),
            }
        }
    
    def _build_decoupled_auth(self, payment: Dict) -> Dict:
        """Build decoupled authentication (e.g., mobile app)."""
        
        return {
            "scaApproach": self.APPROACH_DECOUPLED,
            "authenticationMethods": [
                {
                    "authenticationMethod": "SCA_METHOD_OTP",
                    "authenticationVersion": "1.0",
                    "authenticationData": ""
                }
            ],
            "scaStatus": "started",
            "decoupled": {
                "issuer": "Bank Mobile App",
                "expirationDateTime": self._get_expiry_time(5),
                "info": "Please authenticate in your mobile app"
            }
        }
    
    def _build_embedded_auth(self, payment: Dict) -> Dict:
        """Build embedded authentication flow."""
        
        return {
            "scaApproach": self.APPROACH_EMBEDDED,
            "authenticationMethods": [
                {
                    "authenticationMethod": "SCA_METHOD_OTP",
                    "authenticationVersion": "1.0",
                    "authenticationData": ""
                }
            ],
            "scaStatus": "started",
            "chosenSCAMethod": {
                "authenticationMethod": "SCA_METHOD_OTP"
            }
        }
    
    def _get_expiry_time(self, minutes: int) -> str:
        """Calculate expiry timestamp."""
        
        from datetime import datetime, timedelta
        expiry = datetime.utcnow() + timedelta(minutes=minutes)
        return expiry.strftime('%Y-%m-%dT%H:%M:%SZ')
    
    def verify_sca_result(self, sca_result: Dict) -> tuple[bool, str]:
        """Verify SCA result from ASPSP."""
        
        # Check SCA status
        sca_status = sca_result.get('scaStatus')
        
        if sca_status == 'exempted':
            return True, "SCA exempted"
        
        if sca_status == 'failed':
            return False, "SCA failed"
        
        if sca_status == 'attempted':
            # Verify authentication data
            if self._verify_authentication(sca_result):
                return True, "SCA successful"
        
        if sca_status == 'finalised':
            return True, "SCA completed"
        
        return False, "Unknown SCA status"
    
    def _verify_authentication(self, sca_result: Dict) -> bool:
        """Verify authentication data."""
        
        # Verify signature/JWT
        # Check timestamp
        # Verify authentication method
        
        return True
    
    def apply_sca_exemption(self, payment: Dict, 
                           exemption_type: str) -> Dict:
        """Apply SCA exemption (limited cases)."""
        
        valid_exemptions = [
            'low_value',
            'trusted_beneficiary',
            'recurring',
            'corporate',
            'secure_corporate'
        ]
        
        if exemption_type not in valid_exemptions:
            raise ValueError(f"Invalid exemption type: {exemption_type}")
        
        return {
            "scaExemptionType": exemption_type,
            "scaStatus": "exempted"
        }

TPP Integration

Registration and Onboarding

#!/usr/bin/env python3
"""Third Party Provider (TPP) registration and onboarding."""

import requests
from typing import Dict, List, Optional
from dataclasses import dataclass

@dataclass
class TPPDetails:
    """TPP registration details."""
    
    name: str
    registration_id: str
    role: str  # AISP, PISP, CBPII
    email: str
    website: str
    org_id: str  # Legal entity identifier

@dataclass
class CertificateDetails:
    """QWAC/QSealC certificate details."""
    
    certificate: str
    certificate_type: str  # QWAC or QSealC
    validity_start: str
    validity_end: str

class TPPManager:
    """Manage TPP registration and access."""
    
    def __init__(self, api_base_url: str, tpp_details: TPPDetails):
        self.api_base_url = api_base_url
        self.tpp = tpp_details
        self.certificate: Optional[CertificateDetails] = None
    
    def register_with_nca(self, national_competent_authority: str) -> Dict:
        """Register TPP with National Competent Authority."""
        
        registration_data = {
            "name": self.tpp.name,
            "registration_id": self.tpp.registration_id,
            "role": self.tpp.role,
            "email": self.tpp.email,
            "website": self.tpp.website,
            "org_id": self.tpp.org_id,
            "country": national_competent_authority,
        }
        
        # Submit to NCA register
        # POST https://register.nca.example.gov
        
        return {
            "status": "registered",
            "registration_number": "TPP-REG-001",
            "effective_date": "2026-01-01"
        }
    
    def setup_qsealc_certificate(self, cert_path: str) -> CertificateDetails:
        """Setup QSealC certificate for signing."""
        
        # In production: Load certificate from secure storage
        # Validate certificate chain
        # Check NCA registration
        
        self.certificate = CertificateDetails(
            certificate=cert_path,
            certificate_type="QSealC",
            validity_start="2026-01-01",
            validity_end="2028-01-01"
        )
        
        return self.certificate
    
    def setup_qwac_certificate(self, cert_path: str) -> CertificateDetails:
        """Setup QWAC certificate for TLS."""
        
        self.certificate = CertificateDetails(
            certificate=cert_path,
            certificate_type="QWAC",
            validity_start="2026-01-01",
            validity_end="2028-01-01"
        )
        
        return self.certificate
    
    def create_api_client(self, bank_id: str) -> Dict:
        """Create API client for specific bank."""
        
        # Register with bank's developer portal
        # Obtain client credentials
        # Configure redirect URIs
        
        return {
            "client_id": f"tpp-{bank_id}-{self.tpp.registration_id}",
            "client_secret": "generated-secret",
            "bank_id": bank_id,
            "registered_redirect_uris": [
                "https://tpp.example.com/callback"
            ],
            "status": "active"
        }
    
    def generate_client_assertion(self, client_id: str) -> str:
        """Generate JWT client assertion for token requests."""
        
        import jwt
        from datetime import datetime, timedelta
        
        # Create JWT with QSealC certificate
        assertion = jwt.encode(
            {
                "iss": client_id,
                "sub": client_id,
                "aud": "https://auth.bank.example.com",
                "jti": f"assertion-{datetime.utcnow().timestamp()}",
                "exp": datetime.utcnow() + timedelta(minutes=5),
                "iat": datetime.utcnow(),
            },
            self.certificate.certificate,
            algorithm="RS256",
            headers={
                "typ": "JWT",
                "alg": "RS256",
                "x5c": [self.certificate.certificate]  # Certificate chain
            }
        )
        
        return assertion

Error Handling

Error Codes

#!/usr/bin/env python3
"""Open Banking error handling."""

from typing import Optional, Dict
from dataclasses import dataclass

@dataclass
class OBError:
    """Open Banking error response."""
    
    error: str
    error_description: str
    tpp_message: Optional[str] = None
    additional_information: Optional[Dict] = None

class OpenBankingError(Exception):
    """Base exception for Open Banking errors."""
    
    def __init__(self, error_code: str, message: str, 
                 tpp_message: Optional[str] = None):
        self.error_code = error_code
        self.message = message
        self.tpp_message = tpp_message
        super().__init__(f"{error_code}: {message}")

class ConsentError(OpenBankingError):
    """Consent-related errors."""
    
    CONSENT_INVALID = "CONSENT_INVALID"
    CONSENT_EXPIRED = "CONSENT_EXPIRED"
    CONSENT_NOT_FOUND = "CONSENT_NOT_FOUND"
    CONSENT_ALREADY_REVOKED = "CONSENT_ALREADY_REVOKED"

class AuthenticationError(OpenBankingError):
    """Authentication errors."""
    
    INVALID_TOKEN = "INVALID_TOKEN"
    TOKEN_EXPIRED = "TOKEN_EXPIRED"
    UNAUTHORIZED = "UNAUTHORIZED"
    INVALID_SIGNATURE = "INVALID_SIGNATURE"

class PaymentError(OpenBankingError):
    """Payment processing errors."""
    
    PAYMENT_INVALID = "PAYMENT_INVALID"
    PAYMENT_FAILED = "PAYMENT_FAILED"
    INVALID_AMOUNT = "INVALID_AMOUNT"
    INVALID_CURRENCY = "INVALID_CURRENCY"
    ACCOUNT_NOT_FOUND = "ACCOUNT_NOT_FOUND"

ERROR_MESSAGES = {
    "CONSENT_INVALID": "The consent provided is invalid or malformed",
    "CONSENT_EXPIRED": "The consent has expired and is no longer valid",
    "TOKEN_EXPIRED": "The access token has expired",
    "INVALID_TOKEN": "The access token is invalid",
    "PAYMENT_FAILED": "The payment could not be processed",
    "ACCOUNT_NOT_FOUND": "The specified account was not found",
    "INVALID_SCA": "Strong Customer Authentication failed",
}

def handle_api_error(response: Dict) -> OpenBankingError:
    """Convert API error response to exception."""
    
    error_code = response.get('error', 'UNKNOWN')
    error_desc = response.get('error_description', 'Unknown error')
    
    # Map to specific exception type
    if 'CONSENT' in error_code:
        return ConsentError(error_code, error_desc)
    elif 'TOKEN' in error_code or 'UNAUTHORIZED' in error_code:
        return AuthenticationError(error_code, error_desc)
    elif 'PAYMENT' in error_code or 'ACCOUNT' in error_code:
        return PaymentError(error_code, error_desc)
    else:
        return OpenBankingError(error_code, error_desc)

Security Best Practices

# Open Banking Security Checklist
security:
  certificate_management:
    - "Use qualified certificates (QWAC/QSealC in EU)"
    - "Rotate certificates before expiry"
    - "Secure private key storage (HSM)"
    - "Certificate chain validation"
    
  token_security:
    - "Store tokens securely (encrypted at rest)"
    - "Use short-lived access tokens"
    - "Implement token refresh"
    - "Invalidate tokens on logout"
    
  api_security:
    - "TLS 1.3 minimum"
    - "Request signing (JWS)"
    - "Rate limiting"
    - "Input validation"
    - "Output encoding"
    
  sca_implementation:
    - "Implement 2FA for all payments"
    - "Use dynamic linking"
    - "Transaction data in SCA"
    - "Fallback procedures"
    
  monitoring:
    - "Log all API calls"
    - "Alert on anomalies"
    - "Fraud detection"
    - "Compliance reporting"

External Resources


Comments