Introduction
OAuth 2.0 and OpenID Connect (OIDC) are the foundational protocols for modern authentication and authorization. Whether you’re building a web application, mobile app, or API, understanding these protocols is essential for implementing secure user authentication and delegated authorization.
OAuth 2.0 enables applications to obtain limited access to user accounts on third-party services, while OIDC builds on OAuth 2.0 to provide identity verification and user profile information. Together, they power authentication for billions of users across the internet.
Understanding OAuth 2.0
The Authorization Problem
Before OAuth, applications used the “password anti-pattern”:
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ Old Pattern (Insecure) โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ โ
โ User: "I want to use this photo printing app" โ
โ โ
โ Photo App: "Sure! Give me your Google password" โ
โ โ
โ User: [Enters Google password] โ
โ โ
โ Photo App: Has full access to entire Google account! โ
โ โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
OAuth 2.0 Solution
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ OAuth 2.0 Pattern (Secure) โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ โ
โ User: "I want to use this photo printing app" โ
โ โ
โ Photo App: "I'll ask Google for permission to print photos" โ
โ โ
โ Google: "User, do you want to let PhotoApp print your โ
โ photos? It CANNOT access your email or anything โ
โ else." โ
โ โ
โ User: "Yes" โ
โ โ
โ Google: "Here's a limited access token for photos only" โ
โ โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ๏ฟฝ
OAuth 2.0 Terminology
| Term | Description |
|---|---|
| Resource Owner | The user who owns the data |
| Client | The application requesting access |
| Authorization Server | The server that authenticates and issues tokens |
| Resource Server | The server that hosts protected resources |
| Access Token | Token that grants access to resources |
| Refresh Token | Token used to obtain new access tokens |
| Authorization Code | Temporary code exchanged for tokens |
| Redirect URI | URL where authorization server sends the user |
OAuth 2.0 Grant Types
1. Authorization Code Flow
The most secure flow for server-side applications:
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ Authorization Code Flow โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ โ
โ Client App Auth Server Resource โ
โ โ โ Server โ
โ โโโ(1) Authorization Requestโโโโโโโโโโโโโโโโโโโโโถโ โ
โ โ โ โ โ
โ โโโโ(2) Login Pageโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โ โ โ โ โ
โ โโโ(3) User Credentialsโโโโโโโโโโโโโโโโโโโโโโโโโโโโโถโ โ
โ โ โ โ โ
โ โโโ(4) Auth Codeโโโโโโโโโโโโโโ โ โ
โ โ โ โ โ
โ โโโ(5) Code + Client Secretโโโโโโโโโโโโโโโโโโโโโโโโโถโ โ
โ โ โ โ โ
โ โโโ(6) Access Token + Refresh Tokenโโโโโโโโโโโโโโโโโ โ
โ โ โ โ โ
โ โโโ(7) Access Tokenโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโถโ โ
โ โ โ โ โ
โ โโโ(8) Protected Resourceโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โ โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
import secrets
from datetime import datetime, timedelta
from typing import Optional
class OAuth2AuthorizationCodeFlow:
def __init__(
self,
client_id: str,
client_secret: str,
redirect_uri: str,
authorization_server_url: str,
token_server_url: str,
scopes: list[str],
):
self.client_id = client_id
self.client_secret = client_secret
self.redirect_uri = redirect_uri
self.authorization_server_url = authorization_server_url
self.token_server_url = token_server_url
self.scopes = scopes
self.state = {}
def generate_authorization_url(self, state: str = None) -> tuple[str, str]:
"""Generate the authorization URL for user redirect."""
if state is None:
state = secrets.token_urlsafe(32)
# Store state for verification
self.state[state] = {
"created_at": datetime.utcnow(),
"scopes": self.scopes,
}
params = {
"response_type": "code",
"client_id": self.client_id,
"redirect_uri": self.redirect_uri,
"scope": " ".join(self.scopes),
"state": state,
}
query_string = "&".join(f"{k}={v}" for k, v in params.items())
auth_url = f"{self.authorization_server_url}?{query_string}"
return auth_url, state
def exchange_code_for_tokens(self, code: str, state: str) -> dict:
"""Exchange authorization code for access and refresh tokens."""
# Verify state
if state not in self.state:
raise ValueError("Invalid state parameter")
# Clean up state
del self.state[state]
# Exchange code for tokens
response = requests.post(
self.token_server_url,
data={
"grant_type": "authorization_code",
"code": code,
"client_id": self.client_id,
"client_secret": self.client_secret,
"redirect_uri": self.redirect_uri,
},
)
if response.status_code != 200:
raise ValueError(f"Token exchange failed: {response.text}")
tokens = response.json()
return {
"access_token": tokens["access_token"],
"refresh_token": tokens.get("refresh_token"),
"token_type": tokens.get("token_type", "Bearer"),
"expires_in": tokens.get("expires_in", 3600),
"scope": tokens.get("scope"),
}
def refresh_access_token(self, refresh_token: str) -> dict:
"""Use refresh token to get new access token."""
response = requests.post(
self.token_server_url,
data={
"grant_type": "refresh_token",
"refresh_token": refresh_token,
"client_id": self.client_id,
"client_secret": self.client_secret,
},
)
if response.status_code != 200:
raise ValueError(f"Token refresh failed: {response.text}")
tokens = response.json()
return {
"access_token": tokens["access_token"],
"refresh_token": tokens.get("refresh_token", refresh_token),
"token_type": tokens.get("token_type", "Bearer"),
"expires_in": tokens.get("expires_in", 3600),
}
2. PKCE (Proof Key for Code Exchange)
Enhanced security for public clients (SPA, mobile apps):
import hashlib
import secrets
class PKCEClient:
def __init__(self, client_id: str, redirect_uri: str):
self.client_id = client_id
self.redirect_uri = redirect_uri
def generate_code_verifier(self) -> str:
"""Generate a random code verifier."""
return secrets.token_urlsafe(96)[:128]
def generate_code_challenge(self, verifier: str) -> str:
"""Generate code challenge from verifier using SHA-256."""
digest = hashlib.sha256(verifier.encode()).digest()
return self._base64url_encode(digest)
def _base64url_encode(self, data: bytes) -> str:
"""Base64 URL-safe encoding without padding."""
import base64
return base64.urlsafe_b64encode(data).rstrip(b'=').decode()
def generate_authorization_url(self, verifier: str, state: str) -> str:
"""Generate authorization URL with PKCE parameters."""
challenge = self.generate_code_challenge(verifier)
params = {
"response_type": "code",
"client_id": self.client_id,
"redirect_uri": self.redirect_uri,
"scope": "openid profile email",
"state": state,
"code_challenge": challenge,
"code_challenge_method": "S256",
}
return f"https://auth.example.com/authorize?{requests.compat.urlencode(params)}"
def exchange_code_with_verifier(self, code: str, verifier: str) -> dict:
"""Exchange code with code verifier."""
response = requests.post(
"https://auth.example.com/token",
data={
"grant_type": "authorization_code",
"code": code,
"client_id": self.client_id,
"redirect_uri": self.redirect_uri,
"code_verifier": verifier,
},
)
return response.json()
3. Client Credentials Flow
For machine-to-machine communication:
def client_credentials_flow(
token_url: str,
client_id: str,
client_secret: str,
scope: str = None,
) -> dict:
"""Get access token using client credentials."""
data = {
"grant_type": "client_credentials",
"client_id": client_id,
"client_secret": client_secret,
}
if scope:
data["scope"] = scope
response = requests.post(token_url, data=data)
response.raise_for_status()
return response.json()
# Usage
token = client_credentials_flow(
token_url="https://auth.example.com/oauth/token",
client_id="my-api-client",
client_secret="secret-key",
scope="read write",
)
access_token = token["access_token"]
4. Device Code Flow
For devices without browsers:
def device_code_flow(auth_url: str, token_url: str, client_id: str) -> tuple[str, str]:
"""Start device code flow - get user code and verification URL."""
response = requests.post(
auth_url,
data={
"client_id": client_id,
"scope": "openid profile",
},
)
response.raise_for_status()
data = response.json()
return data["user_code"], data["verification_uri"]
def poll_for_token(token_url: str, client_id: str, device_code: str) -> dict:
"""Poll for token until user completes authorization."""
while True:
response = requests.post(
token_url,
data={
"grant_type": "urn:ietf:params:oauth:grant-type:device_code",
"device_code": device_code,
"client_id": client_id,
},
)
data = response.json()
if data.get("error") == "authorization_pending":
import time
time.sleep(data.get("interval", 5))
elif data.get("error"):
raise ValueError(data.get("error_description"))
else:
return data
OpenID Connect (OIDC)
OIDC is an identity layer on top of OAuth 2.0:
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ OAuth 2.0 vs OIDC โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ โ
โ OAuth 2.0: โ
โ - Access Token (what you can access) โ
โ โ
โ OIDC (OAuth 2.0 + Identity): โ
โ - Access Token (what you can access) โ
โ - ID Token (who the user is) โ
โ - UserInfo Endpoint (user profile) โ
โ โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
ID Token Structure (JWT)
{
"iss": "https://auth.example.com",
"sub": "user123",
"aud": ["client-app-id"],
"exp": 1700000000,
"iat": 1699999900,
"auth_time": 1699999900,
"nonce": "abc123",
"name": "John Doe",
"email": "[email protected]",
"email_verified": true,
"picture": "https://example.com/photo.jpg",
"preferred_username": "johndoe"
}
OIDC Implementation
import jwt
import time
from typing import Optional
class OIDCAuthentication:
def __init__(
self,
issuer: str,
client_id: str,
client_secret: str,
redirect_uri: str,
scopes: list[str] = None,
):
self.issuer = issuer
self.client_id = client_id
self.client_secret = client_secret
self.redirect_uri = redirect_uri
self.scopes = scopes or ["openid", "profile", "email"]
# Will be fetched from issuer's discovery endpoint
self.issuer_config = None
def discover(self):
"""Fetch OIDC discovery document."""
import requests
response = requests.get(f"{self.issuer}/.well-known/openid-configuration")
response.raise_for_status()
self.issuer_config = response.json()
return self.issuer_config
def get_authorization_url(self, state: str, nonce: str = None) -> tuple[str, str]:
"""Generate authorization URL."""
import secrets
if nonce is None:
nonce = secrets.token_urlsafe(32)
if self.issuer_config is None:
self.discover()
params = {
"response_type": "code",
"client_id": self.client_id,
"redirect_uri": self.redirect_uri,
"scope": " ".join(self.scopes),
"state": state,
"nonce": nonce,
"response_mode": "query",
}
auth_url = f"{self.issuer_config['authorization_endpoint']}?{requests.compat.urlencode(params)}"
return auth_url, nonce
def exchange_and_verify_tokens(self, code: str, nonce: str) -> dict:
"""Exchange code for tokens and verify ID token."""
import requests
if self.issuer_config is None:
self.discover()
# Exchange code for tokens
token_response = requests.post(
self.issuer_config["token_endpoint"],
data={
"grant_type": "authorization_code",
"code": code,
"redirect_uri": self.redirect_uri,
"client_id": self.client_id,
"client_secret": self.client_secret,
},
)
token_response.raise_for_status()
tokens = token_response.json()
# Decode and verify ID token
id_token = self.verify_id_token(
tokens["id_token"],
nonce,
)
return {
"id_token_claims": id_token,
"access_token": tokens["access_token"],
"refresh_token": tokens.get("refresh_token"),
"token_type": tokens.get("token_type", "Bearer"),
"expires_in": tokens.get("expires_in"),
}
def verify_id_token(self, id_token: str, nonce: str) -> dict:
"""Verify and decode the ID token."""
import requests
# Fetch JWKS
jwks_response = requests.get(self.issuer_config["jwks_uri"])
jwks = jwks_response.json()
# Get signing key
unverified_header = jwt.get_unverified_header(id_token)
signing_key = None
for key in jwks.get("keys", []):
if key.get("kid") == unverified_header.get("kid"):
signing_key = jwt.algorithms.RSAAlgorithm.from_jwk(key)
break
if signing_key is None:
raise ValueError("Unable to find signing key")
# Decode and verify token
claims = jwt.decode(
id_token,
signing_key,
algorithms=["RS256"],
audience=self.client_id,
issuer=self.issuer,
options={
"verify_exp": True,
"verify_iat": True,
"verify_auth_time": True,
},
)
# Verify nonce
if claims.get("nonce") != nonce:
raise ValueError("Invalid nonce")
return claims
def get_userinfo(self, access_token: str) -> dict:
"""Fetch user info from UserInfo endpoint."""
import requests
if self.issuer_config is None:
self.discover()
response = requests.get(
self.issuer_config["userinfo_endpoint"],
headers={"Authorization": f"Bearer {access_token}"},
)
response.raise_for_status()
return response.json()
Token Management
JWT Token Structure
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ JWT Token Structure โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ โ
โ โโโโโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโโโโโโโโ โ
โ โ Header โ โ Payload โ โ Signature โ โ
โ โ (JSON) โ โ (JSON) โ โ (HMAC/RSA/ECDSA) โ โ
โ โ โ โ โ โ โ โ
โ โ { โ โ { โ โ [Signature] โ โ
โ โ "alg": โ โ "sub": โ โ โ โ
โ โ "RS256", โ โ "user123", โ โ โ โ
โ โ "typ": โ โ "exp": โ โ โ โ
โ โ "JWT" โ โ 1700000000,โ โ โ โ
โ โ } โ โ "iat": โ โ โ โ
โ โ โ โ 1699999900 โ โ โ โ
โ โ โ โ } โ โ โ โ
โ โโโโโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโโโโโโโโ โ
โ โ
โ Base64URL โโโโโโโโโโ Base64URL โโโโโโโโ Base64URL โ
โ โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
Token Validation
from functools import wraps
from typing import Callable
import jwt
import requests
class TokenValidator:
def __init__(self, issuer: str, jwks_uri: str, audience: str):
self.issuer = issuer
self.jwks_uri = jwks_uri
self.audience = audience
self._jwks = None
self._jwks_expiry = 0
def get_signing_key(self, token: str) -> jwt.api.RSAAlgorithm:
"""Get the signing key from JWKS."""
import time
# Refresh JWKS if expired
if time.time() > self._jwks_expiry:
response = requests.get(self.jwks_uri)
self._jwks = response.json()
self._jwks_expiry = time.time() + 3600 # Cache for 1 hour
# Find the key
unverified_header = jwt.get_unverified_header(token)
for key in self._jwks.get("keys", []):
if key.get("kid") == unverified_header.get("kid"):
return jwt.algorithms.RSAAlgorithm.from_jwk(key)
raise ValueError("Unable to find signing key")
def validate(self, token: str) -> dict:
"""Validate token and return claims."""
key = self.get_signing_key(token)
return jwt.decode(
token,
key,
algorithms=["RS256", "ES256"],
audience=self.audience,
issuer=self.issuer,
options={
"verify_exp": True,
"verify_iat": True,
"verify_aud": True,
"verify_iss": True,
"require": ["exp", "iat", "iss", "sub"],
},
)
def require_auth(validator: TokenValidator):
"""Decorator to require authentication."""
def decorator(func: Callable) -> Callable:
@wraps(func)
def wrapper(request, *args, **kwargs):
auth_header = request.headers.get("Authorization")
if not auth_header or not auth_header.startswith("Bearer "):
return {"error": "missing_token"}, 401
token = auth_header[7:] # Remove "Bearer "
try:
claims = validator.validate(token)
request.user = claims
except jwt.ExpiredSignatureError:
return {"error": "token_expired"}, 401
except jwt.InvalidTokenError as e:
return {"error": "invalid_token", "message": str(e)}, 401
return func(request, *args, **kwargs)
return wrapper
return decorator
Refresh Token Rotation
import secrets
class TokenManager:
def __init__(self, db_pool):
self.db = db_pool
async def store_refresh_token(
self,
user_id: str,
client_id: str,
refresh_token: str,
token_hash: str,
expires_at: datetime,
):
"""Store refresh token in database."""
async with self.db.acquire() as conn:
await conn.execute(
"""
INSERT INTO refresh_tokens
(user_id, client_id, token_hash, expires_at, created_at)
VALUES ($1, $2, $3, $4, NOW())
""",
user_id, client_id, token_hash, expires_at,
)
async def revoke_token(self, token_hash: str):
"""Revoke a refresh token."""
async with self.db.acquire() as conn:
await conn.execute(
"DELETE FROM refresh_tokens WHERE token_hash = $1",
token_hash,
)
async def revoke_all_user_tokens(self, user_id: str, client_id: str = None):
"""Revoke all tokens for a user."""
async with self.db.acquire() as conn:
if client_id:
await conn.execute(
"DELETE FROM refresh_tokens WHERE user_id = $1 AND client_id = $2",
user_id, client_id,
)
else:
await conn.execute(
"DELETE FROM refresh_tokens WHERE user_id = $1",
user_id,
)
def hash_token(self, token: str) -> str:
"""Hash token for secure storage."""
import hashlib
return hashlib.sha256(token.encode()).hexdigest()
Security Best Practices
Authorization Code Flow Security
def secure_authorization_flow():
"""Security best practices for authorization flow."""
# 1. Use PKCE even for confidential clients
code_verifier = generate_code_verifier()
code_challenge = generate_code_challenge(code_verifier)
# 2. Generate and verify state
state = secrets.token_urlsafe(32)
# 3. Use nonce to prevent replay attacks
nonce = secrets.token_urlsafe(32)
# 4. Validate redirect_uri matches registered URI
# 5. Use HTTPS for all redirects
# 6. Short authorization codes (expires quickly)
# 7. Bind token to client_id
Token Security
def token_security_practices():
"""Best practices for token management."""
# Access tokens:
# - Short expiry (5-15 minutes)
# - Store in memory, not localStorage (for browser)
# - Send only over HTTPS
# Refresh tokens:
# - Longer expiry (days/weeks)
# - One-time use (rotation)
# - Store securely (encrypted at rest)
# - Bind to client
# Always use secure cookies for browser
# httponly=True, secure=True, SameSite=Strict
CSRF Protection
# State parameter verification
def verify_state(state: str, stored_state: str) -> bool:
"""Verify state parameter to prevent CSRF."""
if not state or not stored_state:
return False
# Use constant-time comparison
import hmac
return hmac.compare_digest(state, stored_state)
Token Introspection
def introspect_token(
introspection_url: str,
token: str,
client_id: str,
client_secret: str,
) -> dict:
"""Introspect token with authorization server."""
import requests
response = requests.post(
introspection_url,
data={"token": token},
auth=(client_id, client_secret),
)
return response.json()
Implementation Examples
Flask Integration
from flask import Flask, request, jsonify, session, redirect
import requests
from functools import wraps
app = Flask(__name__)
app.secret_key = secrets.token_urlsafe(32)
# OIDC Configuration
OIDC_CONFIG = {
"issuer": "https://auth.example.com",
"client_id": "my-flask-app",
"client_secret": "secret",
"redirect_uri": "http://localhost:5000/callback",
"scopes": ["openid", "profile", "email"],
}
class OIDCAuth:
def __init__(self, config: dict):
self.config = config
self._discover()
def _discover(self):
response = requests.get(
f"{self.config['issuer']}/.well-known/openid-configuration"
)
self.discovery = response.json()
def login(self):
import secrets
state = secrets.token_urlsafe(32)
nonce = secrets.token_urlsafe(32)
session["oauth_state"] = state
session["oauth_nonce"] = nonce
params = {
"response_type": "code",
"client_id": self.config["client_id"],
"redirect_uri": self.config["redirect_uri"],
"scope": " ".join(self.config["scopes"]),
"state": state,
"nonce": nonce,
}
return f"{self.discovery['authorization_endpoint']}?{requests.compat.urlencode(params)}"
def callback(self, code: str, state: str) -> dict:
import jwt
if state != session.get("oauth_state"):
raise ValueError("Invalid state")
# Exchange code for tokens
token_response = requests.post(
self.discovery["token_endpoint"],
data={
"grant_type": "authorization_code",
"code": code,
"redirect_uri": self.config["redirect_uri"],
"client_id": self.config["client_id"],
"client_secret": self.config["client_secret"],
},
)
tokens = token_response.json()
# Verify ID token
# ... (verification code)
# Get userinfo
userinfo_response = requests.get(
self.discovery["userinfo_endpoint"],
headers={"Authorization": f"Bearer {tokens['access_token']}"},
)
userinfo = userinfo_response.json()
# Store in session
session["user"] = userinfo
session["access_token"] = tokens["access_token"]
return userinfo
auth = OIDCAuth(OIDC_CONFIG)
@app.route("/login")
def login():
return redirect(auth.login())
@app.route("/callback")
def callback():
code = request.args.get("code")
state = request.args.get("state")
auth.callback(code, state)
return redirect("/profile")
@app.route("/profile")
def profile():
if "user" not in session:
return redirect("/login")
return jsonify(session["user"])
@app.route("/logout")
def logout():
session.clear()
return redirect("/")
# Protected route decorator
def login_required(f):
@wraps(f)
def decorated(*args, **kwargs):
if "user" not in session:
return redirect("/login")
return f(*args, **kwargs)
return decorated
@app.route("/api/data")
@login_required
def api_data():
return jsonify({"data": "secure data"})
if __name__ == "__main__":
app.run(debug=True)
FastAPI Integration
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from pydantic import BaseModel
from typing import Optional
import requests
import jwt
app = FastAPI()
# OAuth2 scheme
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# OIDC Configuration
OIDC_CONFIG = {
"issuer": "https://auth.example.com",
"client_id": "my-fastapi-app",
"jwks_uri": "https://auth.example.com/.well-known/jwks.json",
}
# Cache for JWKS
jwks_cache = {"keys": None, "expiry": 0}
class User(BaseModel):
sub: str
name: Optional[str] = None
email: Optional[str] = None
picture: Optional[str] = None
async def get_jwks():
import time
if jwks_cache["keys"] is None or time.time() > jwks_cache["expiry"]:
response = requests.get(OIDC_CONFIG["jwks_uri"])
jwks_cache["keys"] = response.json()["keys"]
jwks_cache["expiry"] = time.time() + 3600
return jwks_cache["keys"]
async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
"""Validate token and return current user."""
try:
# Get JWKS
jwks = await get_jwks()
# Get signing key
unverified_header = jwt.get_unverified_header(token)
signing_key = None
for key in jwks:
if key.get("kid") == unverified_header.get("kid"):
signing_key = jwt.algorithms.RSAAlgorithm.from_jwk(key)
break
if signing_key is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Unable to find signing key",
)
# Decode and verify token
claims = jwt.decode(
token,
signing_key,
algorithms=["RS256"],
audience="my-fastapi-app",
issuer=OIDC_CONFIG["issuer"],
)
return User(
sub=claims["sub"],
name=claims.get("name"),
email=claims.get("email"),
picture=claims.get("picture"),
)
except jwt.ExpiredSignatureError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token has expired",
)
except jwt.InvalidTokenError as e:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"Invalid token: {str(e)}",
)
@app.get("/protected")
async def protected_route(user: User = Depends(get_current_user)):
"""Protected route that requires authentication."""
return {
"message": f"Hello, {user.name}!",
"user": user.dict(),
}
@app.get("/public")
async def public_route():
"""Public route that doesn't require authentication."""
return {"message": "This is a public route"}
Conclusion
OAuth 2.0 and OpenID Connect are essential protocols for modern authentication and authorization. OAuth 2.0 provides the foundation for secure delegated access, while OIDC adds an identity layer on top for user authentication.
Key takeaways:
- Use Authorization Code flow with PKCE for web and mobile applications
- Always validate tokens, including signature, expiration, audience, and issuer
- Implement proper token storage and refresh mechanisms
- Use short-lived access tokens with refresh token rotation
- Follow security best practices: HTTPS, state parameters, nonce validation
- OIDC simplifies user authentication by providing standardized user info
Resources
- OAuth 2.0 Specification
- OpenID Connect Specification
- OAuth 2.0 Security Best Practices
- Auth0 Documentation
- Okta Developer
- JWT.io
Comments