Introduction
Authentication and authorization form the foundation of application security. Understanding these patterns is critical for building secure systems. This guide covers authentication methods, authorization patterns, and implementing robust identity systems.
Authentication verifies who you are. Authorization determines what you can do. Both are essential for application security.
Authentication Methods
Password-Based Authentication
# Secure password handling
import bcrypt
from dataclasses import dataclass
@dataclass
class User:
id: str
email: str
password_hash: bytes
salt: bytes
async def create_user(email: str, password: str) -> User:
# Generate salt
salt = bcrypt.gensalt()
# Hash password
password_hash = bcrypt.hashpw(password.encode(), salt)
return User(
id=generate_id(),
email=email,
password_hash=password_hash,
salt=salt
)
async def verify_password(user: User, password: str) -> bool:
return bcrypt.checkpw(
password.encode(),
user.password_hash
)
JWT Implementation
import jwt
from datetime import datetime, timedelta
from typing import Optional
class JWTManager:
def __init__(self, secret_key: str, algorithm: str = "HS256"):
self.secret_key = secret_key
self.algorithm = algorithm
def create_access_token(self, user_id: str, roles: list) -> str:
payload = {
"sub": user_id,
"roles": roles,
"exp": datetime.utcnow() + timedelta(hours=1),
"iat": datetime.utcnow()
}
return jwt.encode(payload, self.secret_key, algorithm=self.algorithm)
def create_refresh_token(self, user_id: str) -> str:
payload = {
"sub": user_id,
"type": "refresh",
"exp": datetime.utcnow() + timedelta(days=30),
"iat": datetime.utcnow()
}
return jwt.encode(payload, self.secret_key, algorithm=self.algorithm)
def verify_token(self, token: str) -> Optional[dict]:
try:
return jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
except jwt.ExpiredSignatureError:
return None
except jwt.InvalidTokenError:
return None
OAuth 2.0
# OAuth 2.0 Authorization Code Flow
oauth_flow = {
"step1": "User clicks 'Login with Google'",
"step2": "Redirect to Google OAuth with client_id and redirect_uri",
"step3": "User authorizes application",
"step4": "Google redirects with authorization code",
"step5": "Exchange code for access token",
"step6": "Use access token to get user info"
}
# Implementation
async def exchange_code_for_token(code: str, client_id: str,
client_secret: str, redirect_uri: str):
response = await requests.post(
"https://oauth2.googleapis.com/token",
data={
"code": code,
"client_id": client_id,
"client_secret": client_secret,
"redirect_uri": redirect_uri,
"grant_type": "authorization_code"
}
)
return response.json()
async def get_user_info(access_token: str):
response = await requests.get(
"https://www.googleapis.com/oauth2/v2/userinfo",
headers={"Authorization": f"Bearer {access_token}"}
)
return response.json()
Authorization Patterns
RBAC (Role-Based Access Control)
from enum import Enum
from typing import Set
class Role(Enum):
ADMIN = "admin"
USER = "user"
MODERATOR = "moderator"
class Permission(Enum):
READ_USERS = "read:users"
WRITE_USERS = "write:users"
READ_ORDERS = "read:orders"
WRITE_ORDERS = "write:orders"
DELETE_CONTENT = "delete:content"
# Role to permissions mapping
ROLE_PERMISSIONS: dict[Role, Set[Permission]] = {
Role.ADMIN: {
Permission.READ_USERS, Permission.WRITE_USERS,
Permission.READ_ORDERS, Permission.WRITE_ORDERS,
Permission.DELETE_CONTENT
},
Role.MODERATOR: {
Permission.READ_USERS,
Permission.READ_ORDERS,
Permission.DELETE_CONTENT
},
Role.USER: {
Permission.READ_ORDERS,
}
}
def has_permission(user_role: Role, permission: Permission) -> bool:
return permission in ROLE_PERMISSIONS.get(user_role, set())
ABAC (Attribute-Based Access Control)
# ABAC policy engine
from dataclasses import dataclass
from datetime import datetime
@dataclass
class AccessRequest:
subject: dict # user attributes
resource: dict # resource attributes
action: str
context: dict # time, location, etc.
class ABACPolicy:
def __init__(self):
self.policies = []
def add_policy(self, policy: dict):
self.policies.append(policy)
def evaluate(self, request: AccessRequest) -> bool:
for policy in self.policies:
if self._matches_policy(request, policy):
return policy.get("effect") == "PERMIT"
return False
def _matches_policy(self, request: AccessRequest, policy: dict) -> bool:
# Check subject
if "subject" in policy:
if not self._match_attributes(request.subject, policy["subject"]):
return False
# Check resource
if "resource" in policy:
if not self._match_attributes(request.resource, policy["resource"]):
return False
# Check action
if "action" in policy:
if request.action not in policy["action"]:
return False
return True
def _match_attributes(self, attributes: dict, conditions: dict) -> bool:
for key, condition in conditions.items():
value = attributes.get(key)
if isinstance(condition, dict):
# Handle operators like eq, gt, lt
if "eq" in condition and value != condition["eq"]:
return False
elif value != condition:
return False
return True
API Security
Middleware Implementation
# Authentication middleware
async def auth_middleware(request, call_next):
# Extract token
auth_header = request.headers.get("Authorization")
if not auth_header:
return Response({"error": "No token"}, status_code=401)
if not auth_header.startswith("Bearer "):
return Response({"error": "Invalid token format"}, status_code=401)
token = auth_header[7:]
# Verify token
payload = jwt_manager.verify_token(token)
if not payload:
return Response({"error": "Invalid token"}, status_code=401)
# Add user to request
request.user_id = payload["sub"]
request.roles = payload.get("roles", [])
return await call_next(request)
# Authorization middleware
def require_permissions(*required_permissions):
async def middleware(request, call_next):
for role in request.roles:
for permission in required_permissions:
if has_permission(role, permission):
return await call_next(request)
return Response({"error": "Forbidden"}, status_code=403)
return middleware
Rate Limiting
# Token bucket rate limiting
from time import time
class RateLimiter:
def __init__(self, capacity: int, refill_rate: float):
self.capacity = capacity
self.refill_rate = refill_rate
self.buckets = {}
def allow_request(self, key: str) -> bool:
now = time()
if key not in self.buckets:
self.buckets[key] = {
"tokens": self.capacity,
"last_refill": now
}
bucket = self.buckets[key]
# Refill tokens
elapsed = now - bucket["last_refill"]
bucket["tokens"] = min(
self.capacity,
bucket["tokens"] + elapsed * self.refill_rate
)
bucket["last_refill"] = now
# Check if request allowed
if bucket["tokens"] >= 1:
bucket["tokens"] -= 1
return True
return False
Best Practices
- Use strong password hashing: bcrypt, Argon2
- Implement MFA: Multi-factor authentication
- Use HTTPS: Always encrypt in transit
- Validate tokens: Check expiration, signature
- Implement proper logout: Invalidate tokens
- Log security events: Monitor for anomalies
- Principle of least privilege: Grant minimum access
Conclusion
Authentication and authorization are critical security components. By implementing robust patternsโJWT for stateless auth, RBAC/ABAC for authorization, and proper token managementโyou can build secure systems that protect user data.
Comments