Skip to main content
โšก Calmops

Authentication and Authorization: Security Patterns

Introduction

Authentication and authorization form the foundation of application security. Understanding these patterns is critical for building secure systems. This guide covers authentication methods, authorization patterns, and implementing robust identity systems.

Authentication verifies who you are. Authorization determines what you can do. Both are essential for application security.

Authentication Methods

Password-Based Authentication

# Secure password handling
import bcrypt
from dataclasses import dataclass

@dataclass
class User:
    id: str
    email: str
    password_hash: bytes
    salt: bytes

async def create_user(email: str, password: str) -> User:
    # Generate salt
    salt = bcrypt.gensalt()
    
    # Hash password
    password_hash = bcrypt.hashpw(password.encode(), salt)
    
    return User(
        id=generate_id(),
        email=email,
        password_hash=password_hash,
        salt=salt
    )

async def verify_password(user: User, password: str) -> bool:
    return bcrypt.checkpw(
        password.encode(),
        user.password_hash
    )

JWT Implementation

import jwt
from datetime import datetime, timedelta
from typing import Optional

class JWTManager:
    def __init__(self, secret_key: str, algorithm: str = "HS256"):
        self.secret_key = secret_key
        self.algorithm = algorithm
    
    def create_access_token(self, user_id: str, roles: list) -> str:
        payload = {
            "sub": user_id,
            "roles": roles,
            "exp": datetime.utcnow() + timedelta(hours=1),
            "iat": datetime.utcnow()
        }
        return jwt.encode(payload, self.secret_key, algorithm=self.algorithm)
    
    def create_refresh_token(self, user_id: str) -> str:
        payload = {
            "sub": user_id,
            "type": "refresh",
            "exp": datetime.utcnow() + timedelta(days=30),
            "iat": datetime.utcnow()
        }
        return jwt.encode(payload, self.secret_key, algorithm=self.algorithm)
    
    def verify_token(self, token: str) -> Optional[dict]:
        try:
            return jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
        except jwt.ExpiredSignatureError:
            return None
        except jwt.InvalidTokenError:
            return None

OAuth 2.0

# OAuth 2.0 Authorization Code Flow
oauth_flow = {
    "step1": "User clicks 'Login with Google'",
    "step2": "Redirect to Google OAuth with client_id and redirect_uri",
    "step3": "User authorizes application",
    "step4": "Google redirects with authorization code",
    "step5": "Exchange code for access token",
    "step6": "Use access token to get user info"
}

# Implementation
async def exchange_code_for_token(code: str, client_id: str, 
                                  client_secret: str, redirect_uri: str):
    response = await requests.post(
        "https://oauth2.googleapis.com/token",
        data={
            "code": code,
            "client_id": client_id,
            "client_secret": client_secret,
            "redirect_uri": redirect_uri,
            "grant_type": "authorization_code"
        }
    )
    return response.json()

async def get_user_info(access_token: str):
    response = await requests.get(
        "https://www.googleapis.com/oauth2/v2/userinfo",
        headers={"Authorization": f"Bearer {access_token}"}
    )
    return response.json()

Authorization Patterns

RBAC (Role-Based Access Control)

from enum import Enum
from typing import Set

class Role(Enum):
    ADMIN = "admin"
    USER = "user"
    MODERATOR = "moderator"

class Permission(Enum):
    READ_USERS = "read:users"
    WRITE_USERS = "write:users"
    READ_ORDERS = "read:orders"
    WRITE_ORDERS = "write:orders"
    DELETE_CONTENT = "delete:content"

# Role to permissions mapping
ROLE_PERMISSIONS: dict[Role, Set[Permission]] = {
    Role.ADMIN: {
        Permission.READ_USERS, Permission.WRITE_USERS,
        Permission.READ_ORDERS, Permission.WRITE_ORDERS,
        Permission.DELETE_CONTENT
    },
    Role.MODERATOR: {
        Permission.READ_USERS,
        Permission.READ_ORDERS,
        Permission.DELETE_CONTENT
    },
    Role.USER: {
        Permission.READ_ORDERS,
    }
}

def has_permission(user_role: Role, permission: Permission) -> bool:
    return permission in ROLE_PERMISSIONS.get(user_role, set())

ABAC (Attribute-Based Access Control)

# ABAC policy engine
from dataclasses import dataclass
from datetime import datetime

@dataclass
class AccessRequest:
    subject: dict  # user attributes
    resource: dict  # resource attributes
    action: str
    context: dict  # time, location, etc.

class ABACPolicy:
    def __init__(self):
        self.policies = []
    
    def add_policy(self, policy: dict):
        self.policies.append(policy)
    
    def evaluate(self, request: AccessRequest) -> bool:
        for policy in self.policies:
            if self._matches_policy(request, policy):
                return policy.get("effect") == "PERMIT"
        return False
    
    def _matches_policy(self, request: AccessRequest, policy: dict) -> bool:
        # Check subject
        if "subject" in policy:
            if not self._match_attributes(request.subject, policy["subject"]):
                return False
        
        # Check resource
        if "resource" in policy:
            if not self._match_attributes(request.resource, policy["resource"]):
                return False
        
        # Check action
        if "action" in policy:
            if request.action not in policy["action"]:
                return False
        
        return True
    
    def _match_attributes(self, attributes: dict, conditions: dict) -> bool:
        for key, condition in conditions.items():
            value = attributes.get(key)
            if isinstance(condition, dict):
                # Handle operators like eq, gt, lt
                if "eq" in condition and value != condition["eq"]:
                    return False
            elif value != condition:
                return False
        return True

API Security

Middleware Implementation

# Authentication middleware
async def auth_middleware(request, call_next):
    # Extract token
    auth_header = request.headers.get("Authorization")
    
    if not auth_header:
        return Response({"error": "No token"}, status_code=401)
    
    if not auth_header.startswith("Bearer "):
        return Response({"error": "Invalid token format"}, status_code=401)
    
    token = auth_header[7:]
    
    # Verify token
    payload = jwt_manager.verify_token(token)
    if not payload:
        return Response({"error": "Invalid token"}, status_code=401)
    
    # Add user to request
    request.user_id = payload["sub"]
    request.roles = payload.get("roles", [])
    
    return await call_next(request)

# Authorization middleware
def require_permissions(*required_permissions):
    async def middleware(request, call_next):
        for role in request.roles:
            for permission in required_permissions:
                if has_permission(role, permission):
                    return await call_next(request)
        
        return Response({"error": "Forbidden"}, status_code=403)
    return middleware

Rate Limiting

# Token bucket rate limiting
from time import time

class RateLimiter:
    def __init__(self, capacity: int, refill_rate: float):
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.buckets = {}
    
    def allow_request(self, key: str) -> bool:
        now = time()
        
        if key not in self.buckets:
            self.buckets[key] = {
                "tokens": self.capacity,
                "last_refill": now
            }
        
        bucket = self.buckets[key]
        
        # Refill tokens
        elapsed = now - bucket["last_refill"]
        bucket["tokens"] = min(
            self.capacity,
            bucket["tokens"] + elapsed * self.refill_rate
        )
        bucket["last_refill"] = now
        
        # Check if request allowed
        if bucket["tokens"] >= 1:
            bucket["tokens"] -= 1
            return True
        
        return False

Best Practices

  1. Use strong password hashing: bcrypt, Argon2
  2. Implement MFA: Multi-factor authentication
  3. Use HTTPS: Always encrypt in transit
  4. Validate tokens: Check expiration, signature
  5. Implement proper logout: Invalidate tokens
  6. Log security events: Monitor for anomalies
  7. Principle of least privilege: Grant minimum access

Conclusion

Authentication and authorization are critical security components. By implementing robust patternsโ€”JWT for stateless auth, RBAC/ABAC for authorization, and proper token managementโ€”you can build secure systems that protect user data.

Comments