Skip to main content
โšก Calmops

OAuth 2.0 and OpenID Connect: Modern Authentication and Authorization

Introduction

OAuth 2.0 and OpenID Connect (OIDC) are the foundational protocols for modern authentication and authorization. Whether you’re building a web application, mobile app, or API, understanding these protocols is essential for implementing secure user authentication and delegated authorization.

OAuth 2.0 enables applications to obtain limited access to user accounts on third-party services, while OIDC builds on OAuth 2.0 to provide identity verification and user profile information. Together, they power authentication for billions of users across the internet.

Understanding OAuth 2.0

The Authorization Problem

Before OAuth, applications used the “password anti-pattern”:

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚                    Old Pattern (Insecure)                       โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚                                                                 โ”‚
โ”‚  User: "I want to use this photo printing app"                โ”‚
โ”‚                                                                 โ”‚
โ”‚  Photo App: "Sure! Give me your Google password"               โ”‚
โ”‚                                                                 โ”‚
โ”‚  User: [Enters Google password]                                โ”‚
โ”‚                                                                 โ”‚
โ”‚  Photo App: Has full access to entire Google account!         โ”‚
โ”‚                                                                 โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

OAuth 2.0 Solution

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚                    OAuth 2.0 Pattern (Secure)                   โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚                                                                 โ”‚
โ”‚  User: "I want to use this photo printing app"                โ”‚
โ”‚                                                                 โ”‚
โ”‚  Photo App: "I'll ask Google for permission to print photos" โ”‚
โ”‚                                                                 โ”‚
โ”‚  Google: "User, do you want to let PhotoApp print your       โ”‚
โ”‚          photos? It CANNOT access your email or anything     โ”‚
โ”‚          else."                                                โ”‚
โ”‚                                                                 โ”‚
โ”‚  User: "Yes"                                                   โ”‚
โ”‚                                                                 โ”‚
โ”‚  Google: "Here's a limited access token for photos only"      โ”‚
โ”‚                                                                 โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€๏ฟฝ

OAuth 2.0 Terminology

Term Description
Resource Owner The user who owns the data
Client The application requesting access
Authorization Server The server that authenticates and issues tokens
Resource Server The server that hosts protected resources
Access Token Token that grants access to resources
Refresh Token Token used to obtain new access tokens
Authorization Code Temporary code exchanged for tokens
Redirect URI URL where authorization server sends the user

OAuth 2.0 Grant Types

1. Authorization Code Flow

The most secure flow for server-side applications:

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚                    Authorization Code Flow                                โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚                                                                          โ”‚
โ”‚  Client App                    Auth Server              Resource          โ”‚
โ”‚      โ”‚                            โ”‚                     Server            โ”‚
โ”‚      โ”‚โ”€โ”€(1) Authorization Requestโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ–ถโ”‚                 โ”‚
โ”‚      โ”‚                            โ”‚                     โ”‚                 โ”‚
โ”‚      โ”‚โ—€โ”€โ”€(2) Login Pageโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”‚                 โ”‚
โ”‚      โ”‚                            โ”‚                     โ”‚                 โ”‚
โ”‚      โ”‚โ”€โ”€(3) User Credentialsโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ–ถโ”‚                 โ”‚
โ”‚      โ”‚                            โ”‚                     โ”‚                 โ”‚
โ”‚      โ”‚โ—€โ”€(4) Auth Codeโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”‚                     โ”‚                 โ”‚
โ”‚      โ”‚                            โ”‚                     โ”‚                 โ”‚
โ”‚      โ”‚โ”€โ”€(5) Code + Client Secretโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ–ถโ”‚                 โ”‚
โ”‚      โ”‚                            โ”‚                     โ”‚                 โ”‚
โ”‚      โ”‚โ—€โ”€(6) Access Token + Refresh Tokenโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”‚                 โ”‚
โ”‚      โ”‚                            โ”‚                     โ”‚                 โ”‚
โ”‚      โ”‚โ”€โ”€(7) Access Tokenโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ–ถโ”‚                 โ”‚
โ”‚      โ”‚                            โ”‚                     โ”‚                 โ”‚
โ”‚      โ”‚โ—€โ”€(8) Protected Resourceโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”‚                 โ”‚
โ”‚                                                                          โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
import secrets
from datetime import datetime, timedelta
from typing import Optional

class OAuth2AuthorizationCodeFlow:
    def __init__(
        self,
        client_id: str,
        client_secret: str,
        redirect_uri: str,
        authorization_server_url: str,
        token_server_url: str,
        scopes: list[str],
    ):
        self.client_id = client_id
        self.client_secret = client_secret
        self.redirect_uri = redirect_uri
        self.authorization_server_url = authorization_server_url
        self.token_server_url = token_server_url
        self.scopes = scopes
        self.state = {}
    
    def generate_authorization_url(self, state: str = None) -> tuple[str, str]:
        """Generate the authorization URL for user redirect."""
        if state is None:
            state = secrets.token_urlsafe(32)
        
        # Store state for verification
        self.state[state] = {
            "created_at": datetime.utcnow(),
            "scopes": self.scopes,
        }
        
        params = {
            "response_type": "code",
            "client_id": self.client_id,
            "redirect_uri": self.redirect_uri,
            "scope": " ".join(self.scopes),
            "state": state,
        }
        
        query_string = "&".join(f"{k}={v}" for k, v in params.items())
        auth_url = f"{self.authorization_server_url}?{query_string}"
        
        return auth_url, state
    
    def exchange_code_for_tokens(self, code: str, state: str) -> dict:
        """Exchange authorization code for access and refresh tokens."""
        # Verify state
        if state not in self.state:
            raise ValueError("Invalid state parameter")
        
        # Clean up state
        del self.state[state]
        
        # Exchange code for tokens
        response = requests.post(
            self.token_server_url,
            data={
                "grant_type": "authorization_code",
                "code": code,
                "client_id": self.client_id,
                "client_secret": self.client_secret,
                "redirect_uri": self.redirect_uri,
            },
        )
        
        if response.status_code != 200:
            raise ValueError(f"Token exchange failed: {response.text}")
        
        tokens = response.json()
        
        return {
            "access_token": tokens["access_token"],
            "refresh_token": tokens.get("refresh_token"),
            "token_type": tokens.get("token_type", "Bearer"),
            "expires_in": tokens.get("expires_in", 3600),
            "scope": tokens.get("scope"),
        }
    
    def refresh_access_token(self, refresh_token: str) -> dict:
        """Use refresh token to get new access token."""
        response = requests.post(
            self.token_server_url,
            data={
                "grant_type": "refresh_token",
                "refresh_token": refresh_token,
                "client_id": self.client_id,
                "client_secret": self.client_secret,
            },
        )
        
        if response.status_code != 200:
            raise ValueError(f"Token refresh failed: {response.text}")
        
        tokens = response.json()
        
        return {
            "access_token": tokens["access_token"],
            "refresh_token": tokens.get("refresh_token", refresh_token),
            "token_type": tokens.get("token_type", "Bearer"),
            "expires_in": tokens.get("expires_in", 3600),
        }

2. PKCE (Proof Key for Code Exchange)

Enhanced security for public clients (SPA, mobile apps):

import hashlib
import secrets

class PKCEClient:
    def __init__(self, client_id: str, redirect_uri: str):
        self.client_id = client_id
        self.redirect_uri = redirect_uri
    
    def generate_code_verifier(self) -> str:
        """Generate a random code verifier."""
        return secrets.token_urlsafe(96)[:128]
    
    def generate_code_challenge(self, verifier: str) -> str:
        """Generate code challenge from verifier using SHA-256."""
        digest = hashlib.sha256(verifier.encode()).digest()
        return self._base64url_encode(digest)
    
    def _base64url_encode(self, data: bytes) -> str:
        """Base64 URL-safe encoding without padding."""
        import base64
        return base64.urlsafe_b64encode(data).rstrip(b'=').decode()
    
    def generate_authorization_url(self, verifier: str, state: str) -> str:
        """Generate authorization URL with PKCE parameters."""
        challenge = self.generate_code_challenge(verifier)
        
        params = {
            "response_type": "code",
            "client_id": self.client_id,
            "redirect_uri": self.redirect_uri,
            "scope": "openid profile email",
            "state": state,
            "code_challenge": challenge,
            "code_challenge_method": "S256",
        }
        
        return f"https://auth.example.com/authorize?{requests.compat.urlencode(params)}"
    
    def exchange_code_with_verifier(self, code: str, verifier: str) -> dict:
        """Exchange code with code verifier."""
        response = requests.post(
            "https://auth.example.com/token",
            data={
                "grant_type": "authorization_code",
                "code": code,
                "client_id": self.client_id,
                "redirect_uri": self.redirect_uri,
                "code_verifier": verifier,
            },
        )
        
        return response.json()

3. Client Credentials Flow

For machine-to-machine communication:

def client_credentials_flow(
    token_url: str,
    client_id: str,
    client_secret: str,
    scope: str = None,
) -> dict:
    """Get access token using client credentials."""
    data = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
    }
    
    if scope:
        data["scope"] = scope
    
    response = requests.post(token_url, data=data)
    response.raise_for_status()
    
    return response.json()


# Usage
token = client_credentials_flow(
    token_url="https://auth.example.com/oauth/token",
    client_id="my-api-client",
    client_secret="secret-key",
    scope="read write",
)

access_token = token["access_token"]

4. Device Code Flow

For devices without browsers:

def device_code_flow(auth_url: str, token_url: str, client_id: str) -> tuple[str, str]:
    """Start device code flow - get user code and verification URL."""
    response = requests.post(
        auth_url,
        data={
            "client_id": client_id,
            "scope": "openid profile",
        },
    )
    response.raise_for_status()
    
    data = response.json()
    return data["user_code"], data["verification_uri"]


def poll_for_token(token_url: str, client_id: str, device_code: str) -> dict:
    """Poll for token until user completes authorization."""
    while True:
        response = requests.post(
            token_url,
            data={
                "grant_type": "urn:ietf:params:oauth:grant-type:device_code",
                "device_code": device_code,
                "client_id": client_id,
            },
        )
        
        data = response.json()
        
        if data.get("error") == "authorization_pending":
            import time
            time.sleep(data.get("interval", 5))
        elif data.get("error"):
            raise ValueError(data.get("error_description"))
        else:
            return data

OpenID Connect (OIDC)

OIDC is an identity layer on top of OAuth 2.0:

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚                    OAuth 2.0 vs OIDC                             โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚                                                                 โ”‚
โ”‚  OAuth 2.0:                                                    โ”‚
โ”‚    - Access Token (what you can access)                        โ”‚
โ”‚                                                                 โ”‚
โ”‚  OIDC (OAuth 2.0 + Identity):                                  โ”‚
โ”‚    - Access Token (what you can access)                       โ”‚
โ”‚    - ID Token (who the user is)                                โ”‚
โ”‚    - UserInfo Endpoint (user profile)                          โ”‚
โ”‚                                                                 โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

ID Token Structure (JWT)

{
  "iss": "https://auth.example.com",
  "sub": "user123",
  "aud": ["client-app-id"],
  "exp": 1700000000,
  "iat": 1699999900,
  "auth_time": 1699999900,
  "nonce": "abc123",
  "name": "John Doe",
  "email": "[email protected]",
  "email_verified": true,
  "picture": "https://example.com/photo.jpg",
  "preferred_username": "johndoe"
}

OIDC Implementation

import jwt
import time
from typing import Optional

class OIDCAuthentication:
    def __init__(
        self,
        issuer: str,
        client_id: str,
        client_secret: str,
        redirect_uri: str,
        scopes: list[str] = None,
    ):
        self.issuer = issuer
        self.client_id = client_id
        self.client_secret = client_secret
        self.redirect_uri = redirect_uri
        self.scopes = scopes or ["openid", "profile", "email"]
        
        # Will be fetched from issuer's discovery endpoint
        self.issuer_config = None
    
    def discover(self):
        """Fetch OIDC discovery document."""
        import requests
        
        response = requests.get(f"{self.issuer}/.well-known/openid-configuration")
        response.raise_for_status()
        
        self.issuer_config = response.json()
        
        return self.issuer_config
    
    def get_authorization_url(self, state: str, nonce: str = None) -> tuple[str, str]:
        """Generate authorization URL."""
        import secrets
        
        if nonce is None:
            nonce = secrets.token_urlsafe(32)
        
        if self.issuer_config is None:
            self.discover()
        
        params = {
            "response_type": "code",
            "client_id": self.client_id,
            "redirect_uri": self.redirect_uri,
            "scope": " ".join(self.scopes),
            "state": state,
            "nonce": nonce,
            "response_mode": "query",
        }
        
        auth_url = f"{self.issuer_config['authorization_endpoint']}?{requests.compat.urlencode(params)}"
        
        return auth_url, nonce
    
    def exchange_and_verify_tokens(self, code: str, nonce: str) -> dict:
        """Exchange code for tokens and verify ID token."""
        import requests
        
        if self.issuer_config is None:
            self.discover()
        
        # Exchange code for tokens
        token_response = requests.post(
            self.issuer_config["token_endpoint"],
            data={
                "grant_type": "authorization_code",
                "code": code,
                "redirect_uri": self.redirect_uri,
                "client_id": self.client_id,
                "client_secret": self.client_secret,
            },
        )
        token_response.raise_for_status()
        
        tokens = token_response.json()
        
        # Decode and verify ID token
        id_token = self.verify_id_token(
            tokens["id_token"],
            nonce,
        )
        
        return {
            "id_token_claims": id_token,
            "access_token": tokens["access_token"],
            "refresh_token": tokens.get("refresh_token"),
            "token_type": tokens.get("token_type", "Bearer"),
            "expires_in": tokens.get("expires_in"),
        }
    
    def verify_id_token(self, id_token: str, nonce: str) -> dict:
        """Verify and decode the ID token."""
        import requests
        
        # Fetch JWKS
        jwks_response = requests.get(self.issuer_config["jwks_uri"])
        jwks = jwks_response.json()
        
        # Get signing key
        unverified_header = jwt.get_unverified_header(id_token)
        signing_key = None
        for key in jwks.get("keys", []):
            if key.get("kid") == unverified_header.get("kid"):
                signing_key = jwt.algorithms.RSAAlgorithm.from_jwk(key)
                break
        
        if signing_key is None:
            raise ValueError("Unable to find signing key")
        
        # Decode and verify token
        claims = jwt.decode(
            id_token,
            signing_key,
            algorithms=["RS256"],
            audience=self.client_id,
            issuer=self.issuer,
            options={
                "verify_exp": True,
                "verify_iat": True,
                "verify_auth_time": True,
            },
        )
        
        # Verify nonce
        if claims.get("nonce") != nonce:
            raise ValueError("Invalid nonce")
        
        return claims
    
    def get_userinfo(self, access_token: str) -> dict:
        """Fetch user info from UserInfo endpoint."""
        import requests
        
        if self.issuer_config is None:
            self.discover()
        
        response = requests.get(
            self.issuer_config["userinfo_endpoint"],
            headers={"Authorization": f"Bearer {access_token}"},
        )
        response.raise_for_status()
        
        return response.json()

Token Management

JWT Token Structure

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚                    JWT Token Structure                          โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚                                                                 โ”‚
โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”‚
โ”‚  โ”‚   Header       โ”‚ โ”‚   Payload      โ”‚ โ”‚   Signature        โ”‚  โ”‚
โ”‚  โ”‚ (JSON)         โ”‚ โ”‚ (JSON)         โ”‚ โ”‚ (HMAC/RSA/ECDSA)   โ”‚  โ”‚
โ”‚  โ”‚                โ”‚ โ”‚                โ”‚ โ”‚                    โ”‚  โ”‚
โ”‚  โ”‚ {              โ”‚ โ”‚ {              โ”‚ โ”‚ [Signature]       โ”‚  โ”‚
โ”‚  โ”‚   "alg":       โ”‚ โ”‚   "sub":       โ”‚ โ”‚                    โ”‚  โ”‚
โ”‚  โ”‚     "RS256",   โ”‚ โ”‚     "user123", โ”‚ โ”‚                    โ”‚  โ”‚
โ”‚  โ”‚   "typ":       โ”‚ โ”‚   "exp":       โ”‚ โ”‚                    โ”‚  โ”‚
โ”‚  โ”‚     "JWT"      โ”‚ โ”‚     1700000000,โ”‚ โ”‚                    โ”‚  โ”‚
โ”‚  โ”‚ }              โ”‚ โ”‚   "iat":       โ”‚ โ”‚                    โ”‚  โ”‚
โ”‚  โ”‚                โ”‚ โ”‚     1699999900 โ”‚ โ”‚                    โ”‚  โ”‚
โ”‚  โ”‚                โ”‚ โ”‚ }              โ”‚ โ”‚                    โ”‚  โ”‚
โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ”‚
โ”‚                                                                 โ”‚
โ”‚  Base64URL โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ Base64URL โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ Base64URL             โ”‚
โ”‚                                                                 โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

Token Validation

from functools import wraps
from typing import Callable
import jwt
import requests

class TokenValidator:
    def __init__(self, issuer: str, jwks_uri: str, audience: str):
        self.issuer = issuer
        self.jwks_uri = jwks_uri
        self.audience = audience
        self._jwks = None
        self._jwks_expiry = 0
    
    def get_signing_key(self, token: str) -> jwt.api.RSAAlgorithm:
        """Get the signing key from JWKS."""
        import time
        
        # Refresh JWKS if expired
        if time.time() > self._jwks_expiry:
            response = requests.get(self.jwks_uri)
            self._jwks = response.json()
            self._jwks_expiry = time.time() + 3600  # Cache for 1 hour
        
        # Find the key
        unverified_header = jwt.get_unverified_header(token)
        
        for key in self._jwks.get("keys", []):
            if key.get("kid") == unverified_header.get("kid"):
                return jwt.algorithms.RSAAlgorithm.from_jwk(key)
        
        raise ValueError("Unable to find signing key")
    
    def validate(self, token: str) -> dict:
        """Validate token and return claims."""
        key = self.get_signing_key(token)
        
        return jwt.decode(
            token,
            key,
            algorithms=["RS256", "ES256"],
            audience=self.audience,
            issuer=self.issuer,
            options={
                "verify_exp": True,
                "verify_iat": True,
                "verify_aud": True,
                "verify_iss": True,
                "require": ["exp", "iat", "iss", "sub"],
            },
        )


def require_auth(validator: TokenValidator):
    """Decorator to require authentication."""
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(request, *args, **kwargs):
            auth_header = request.headers.get("Authorization")
            
            if not auth_header or not auth_header.startswith("Bearer "):
                return {"error": "missing_token"}, 401
            
            token = auth_header[7:]  # Remove "Bearer "
            
            try:
                claims = validator.validate(token)
                request.user = claims
            except jwt.ExpiredSignatureError:
                return {"error": "token_expired"}, 401
            except jwt.InvalidTokenError as e:
                return {"error": "invalid_token", "message": str(e)}, 401
            
            return func(request, *args, **kwargs)
        
        return wrapper
    return decorator

Refresh Token Rotation

import secrets

class TokenManager:
    def __init__(self, db_pool):
        self.db = db_pool
    
    async def store_refresh_token(
        self,
        user_id: str,
        client_id: str,
        refresh_token: str,
        token_hash: str,
        expires_at: datetime,
    ):
        """Store refresh token in database."""
        async with self.db.acquire() as conn:
            await conn.execute(
                """
                INSERT INTO refresh_tokens 
                (user_id, client_id, token_hash, expires_at, created_at)
                VALUES ($1, $2, $3, $4, NOW())
                """,
                user_id, client_id, token_hash, expires_at,
            )
    
    async def revoke_token(self, token_hash: str):
        """Revoke a refresh token."""
        async with self.db.acquire() as conn:
            await conn.execute(
                "DELETE FROM refresh_tokens WHERE token_hash = $1",
                token_hash,
            )
    
    async def revoke_all_user_tokens(self, user_id: str, client_id: str = None):
        """Revoke all tokens for a user."""
        async with self.db.acquire() as conn:
            if client_id:
                await conn.execute(
                    "DELETE FROM refresh_tokens WHERE user_id = $1 AND client_id = $2",
                    user_id, client_id,
                )
            else:
                await conn.execute(
                    "DELETE FROM refresh_tokens WHERE user_id = $1",
                    user_id,
                )
    
    def hash_token(self, token: str) -> str:
        """Hash token for secure storage."""
        import hashlib
        return hashlib.sha256(token.encode()).hexdigest()

Security Best Practices

Authorization Code Flow Security

def secure_authorization_flow():
    """Security best practices for authorization flow."""
    
    # 1. Use PKCE even for confidential clients
    code_verifier = generate_code_verifier()
    code_challenge = generate_code_challenge(code_verifier)
    
    # 2. Generate and verify state
    state = secrets.token_urlsafe(32)
    
    # 3. Use nonce to prevent replay attacks
    nonce = secrets.token_urlsafe(32)
    
    # 4. Validate redirect_uri matches registered URI
    # 5. Use HTTPS for all redirects
    # 6. Short authorization codes (expires quickly)
    # 7. Bind token to client_id

Token Security

def token_security_practices():
    """Best practices for token management."""
    
    # Access tokens:
    # - Short expiry (5-15 minutes)
    # - Store in memory, not localStorage (for browser)
    # - Send only over HTTPS
    
    # Refresh tokens:
    # - Longer expiry (days/weeks)
    # - One-time use (rotation)
    # - Store securely (encrypted at rest)
    # - Bind to client
    
    # Always use secure cookies for browser
    # httponly=True, secure=True, SameSite=Strict

CSRF Protection

# State parameter verification
def verify_state(state: str, stored_state: str) -> bool:
    """Verify state parameter to prevent CSRF."""
    if not state or not stored_state:
        return False
    
    # Use constant-time comparison
    import hmac
    return hmac.compare_digest(state, stored_state)

Token Introspection

def introspect_token(
    introspection_url: str,
    token: str,
    client_id: str,
    client_secret: str,
) -> dict:
    """Introspect token with authorization server."""
    import requests
    
    response = requests.post(
        introspection_url,
        data={"token": token},
        auth=(client_id, client_secret),
    )
    
    return response.json()

Implementation Examples

Flask Integration

from flask import Flask, request, jsonify, session, redirect
import requests
from functools import wraps

app = Flask(__name__)
app.secret_key = secrets.token_urlsafe(32)

# OIDC Configuration
OIDC_CONFIG = {
    "issuer": "https://auth.example.com",
    "client_id": "my-flask-app",
    "client_secret": "secret",
    "redirect_uri": "http://localhost:5000/callback",
    "scopes": ["openid", "profile", "email"],
}

class OIDCAuth:
    def __init__(self, config: dict):
        self.config = config
        self._discover()
    
    def _discover(self):
        response = requests.get(
            f"{self.config['issuer']}/.well-known/openid-configuration"
        )
        self.discovery = response.json()
    
    def login(self):
        import secrets
        state = secrets.token_urlsafe(32)
        nonce = secrets.token_urlsafe(32)
        
        session["oauth_state"] = state
        session["oauth_nonce"] = nonce
        
        params = {
            "response_type": "code",
            "client_id": self.config["client_id"],
            "redirect_uri": self.config["redirect_uri"],
            "scope": " ".join(self.config["scopes"]),
            "state": state,
            "nonce": nonce,
        }
        
        return f"{self.discovery['authorization_endpoint']}?{requests.compat.urlencode(params)}"
    
    def callback(self, code: str, state: str) -> dict:
        import jwt
        
        if state != session.get("oauth_state"):
            raise ValueError("Invalid state")
        
        # Exchange code for tokens
        token_response = requests.post(
            self.discovery["token_endpoint"],
            data={
                "grant_type": "authorization_code",
                "code": code,
                "redirect_uri": self.config["redirect_uri"],
                "client_id": self.config["client_id"],
                "client_secret": self.config["client_secret"],
            },
        )
        
        tokens = token_response.json()
        
        # Verify ID token
        # ... (verification code)
        
        # Get userinfo
        userinfo_response = requests.get(
            self.discovery["userinfo_endpoint"],
            headers={"Authorization": f"Bearer {tokens['access_token']}"},
        )
        
        userinfo = userinfo_response.json()
        
        # Store in session
        session["user"] = userinfo
        session["access_token"] = tokens["access_token"]
        
        return userinfo


auth = OIDCAuth(OIDC_CONFIG)

@app.route("/login")
def login():
    return redirect(auth.login())

@app.route("/callback")
def callback():
    code = request.args.get("code")
    state = request.args.get("state")
    
    auth.callback(code, state)
    return redirect("/profile")

@app.route("/profile")
def profile():
    if "user" not in session:
        return redirect("/login")
    
    return jsonify(session["user"])

@app.route("/logout")
def logout():
    session.clear()
    return redirect("/")


# Protected route decorator
def login_required(f):
    @wraps(f)
    def decorated(*args, **kwargs):
        if "user" not in session:
            return redirect("/login")
        return f(*args, **kwargs)
    return decorated


@app.route("/api/data")
@login_required
def api_data():
    return jsonify({"data": "secure data"})


if __name__ == "__main__":
    app.run(debug=True)

FastAPI Integration

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from pydantic import BaseModel
from typing import Optional
import requests
import jwt

app = FastAPI()

# OAuth2 scheme
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# OIDC Configuration
OIDC_CONFIG = {
    "issuer": "https://auth.example.com",
    "client_id": "my-fastapi-app",
    "jwks_uri": "https://auth.example.com/.well-known/jwks.json",
}

# Cache for JWKS
jwks_cache = {"keys": None, "expiry": 0}


class User(BaseModel):
    sub: str
    name: Optional[str] = None
    email: Optional[str] = None
    picture: Optional[str] = None


async def get_jwks():
    import time
    
    if jwks_cache["keys"] is None or time.time() > jwks_cache["expiry"]:
        response = requests.get(OIDC_CONFIG["jwks_uri"])
        jwks_cache["keys"] = response.json()["keys"]
        jwks_cache["expiry"] = time.time() + 3600
    
    return jwks_cache["keys"]


async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
    """Validate token and return current user."""
    try:
        # Get JWKS
        jwks = await get_jwks()
        
        # Get signing key
        unverified_header = jwt.get_unverified_header(token)
        
        signing_key = None
        for key in jwks:
            if key.get("kid") == unverified_header.get("kid"):
                signing_key = jwt.algorithms.RSAAlgorithm.from_jwk(key)
                break
        
        if signing_key is None:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Unable to find signing key",
            )
        
        # Decode and verify token
        claims = jwt.decode(
            token,
            signing_key,
            algorithms=["RS256"],
            audience="my-fastapi-app",
            issuer=OIDC_CONFIG["issuer"],
        )
        
        return User(
            sub=claims["sub"],
            name=claims.get("name"),
            email=claims.get("email"),
            picture=claims.get("picture"),
        )
    
    except jwt.ExpiredSignatureError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token has expired",
        )
    except jwt.InvalidTokenError as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"Invalid token: {str(e)}",
        )


@app.get("/protected")
async def protected_route(user: User = Depends(get_current_user)):
    """Protected route that requires authentication."""
    return {
        "message": f"Hello, {user.name}!",
        "user": user.dict(),
    }


@app.get("/public")
async def public_route():
    """Public route that doesn't require authentication."""
    return {"message": "This is a public route"}

Conclusion

OAuth 2.0 and OpenID Connect are essential protocols for modern authentication and authorization. OAuth 2.0 provides the foundation for secure delegated access, while OIDC adds an identity layer on top for user authentication.

Key takeaways:

  • Use Authorization Code flow with PKCE for web and mobile applications
  • Always validate tokens, including signature, expiration, audience, and issuer
  • Implement proper token storage and refresh mechanisms
  • Use short-lived access tokens with refresh token rotation
  • Follow security best practices: HTTPS, state parameters, nonce validation
  • OIDC simplifies user authentication by providing standardized user info

Resources

Comments