Skip to main content
โšก Calmops

API Security Patterns: Authentication, Authorization, and Protection Strategies

Introduction

API security is critical in modern applications. APIs expose functionality to clients, making them attractive targets for attackers. A single vulnerability can lead to data breaches, service disruption, and reputational damage. Understanding security patterns is essential for building robust APIs.

This guide covers authentication mechanisms, authorization patterns, rate limiting, and protection against common vulnerabilities like injection attacks and broken authentication.

Authentication Patterns

JWT Authentication

JSON Web Tokens (JWT) provide a compact, URL-safe way to transmit claims between parties. JWTs are commonly used for stateless authentication.

import jwt
import time
from dataclasses import dataclass
from typing import Optional, Dict, Any
from datetime import datetime, timedelta

@dataclass
class JWTConfig:
    secret_key: str
    algorithm: str = "HS256"
    access_token_expire: int = 900  # 15 minutes
    refresh_token_expire: int = 604800  # 7 days
    issuer: str = "api"

class JWTHandler:
    """JWT token generation and validation."""
    
    def __init__(self, config: JWTConfig):
        self.config = config
    
    def create_access_token(
        self,
        user_id: str,
        roles: list = None,
        additional_claims: Dict = None
    ) -> str:
        """Create an access token."""
        now = int(time.time())
        
        payload = {
            "sub": user_id,
            "type": "access",
            "iat": now,
            "exp": now + self.config.access_token_expire,
            "iss": self.config.issuer,
            "roles": roles or []
        }
        
        if additional_claims:
            payload.update(additional_claims)
        
        return jwt.encode(payload, self.config.secret_key, algorithm=self.config.algorithm)
    
    def create_refresh_token(self, user_id: str) -> str:
        """Create a refresh token."""
        now = int(time.time())
        
        payload = {
            "sub": user_id,
            "type": "refresh",
            "iat": now,
            "exp": now + self.config.refresh_token_expire,
            "iss": self.config.issuer
        }
        
        return jwt.encode(payload, self.config.secret_key, algorithm=self.config.algorithm)
    
    def validate_token(self, token: str) -> Optional[Dict]:
        """Validate and decode a token."""
        try:
            payload = jwt.decode(
                token,
                self.config.secret_key,
                algorithms=[self.config.algorithm],
                issuer=self.config.issuer
            )
            return payload
        except jwt.ExpiredSignatureError:
            return None
        except jwt.InvalidTokenError:
            return None
    
    def refresh_access_token(self, refresh_token: str) -> Optional[Dict]:
        """Use refresh token to get new access token."""
        payload = self.validate_token(refresh_token)
        
        if not payload or payload.get("type") != "refresh":
            return None
        
        return {
            "access_token": self.create_access_token(
                payload["sub"],
                payload.get("roles", [])
            ),
            "token_type": "bearer"
        }

OAuth 2.0 Implementation

from enum import Enum
from dataclasses import dataclass
from urllib.parse import urlencode
import secrets

class GrantType(Enum):
    AUTHORIZATION_CODE = "authorization_code"
    CLIENT_CREDENTIALS = "client_credentials"
    REFRESH_TOKEN = "refresh_token"

@dataclass
class AuthorizationCode:
    code: str
    client_id: str
    redirect_uri: str
    user_id: str
    expires_at: float
    scope: str

class OAuth2Provider:
    """OAuth 2.0 authorization server."""
    
    def __init__(self, jwt_handler: JWTHandler, token_store):
        self.jwt = jwt_handler
        self.token_store = token_store
        self.auth_codes: Dict[str, AuthorizationCode] = {}
        self.clients: Dict[str, dict] = {}
    
    def register_client(
        self,
        client_id: str,
        client_secret: str,
        redirect_uris: list,
        name: str
    ) -> dict:
        """Register a new OAuth client."""
        self.clients[client_id] = {
            "client_id": client_id,
            "client_secret": client_secret,
            "redirect_uris": redirect_uris,
            "name": name
        }
        return self.clients[client_id]
    
    def create_authorization_code(
        self,
        client_id: str,
        redirect_uri: str,
        user_id: str,
        scope: str
    ) -> str:
        """Create an authorization code for authorization code flow."""
        code = secrets.token_urlsafe(32)
        
        self.auth_codes[code] = AuthorizationCode(
            code=code,
            client_id=client_id,
            redirect_uri=redirect_uri,
            user_id=user_id,
            expires_at=time.time() + 600,  # 10 minutes
            scope=scope
        )
        
        return code
    
    def exchange_code_for_tokens(
        self,
        code: str,
        client_id: str,
        client_secret: str,
        redirect_uri: str
    ) -> Optional[dict]:
        """Exchange authorization code for tokens."""
        auth_code = self.auth_codes.get(code)
        
        if not auth_code:
            return None
        
        if time.time() > auth_code.expires_at:
            del self.auth_codes[code]
            return None
        
        client = self.clients.get(client_id)
        if not client or client["client_secret"] != client_secret:
            return None
        
        if redirect_uri != auth_code.redirect_uri:
            return None
        
        # Delete used code
        del self.auth_codes[code]
        
        # Generate tokens
        access_token = self.jwt.create_access_token(
            user_id=auth_code.user_id,
            roles=[],  # Add roles from user
            additional_claims={"scope": auth_code.scope}
        )
        refresh_token = self.jwt.create_refresh_token(auth_code.user_id)
        
        return {
            "access_token": access_token,
            "refresh_token": refresh_token,
            "token_type": "bearer",
            "expires_in": 900
        }
    
    def client_credentials_grant(
        self,
        client_id: str,
        client_secret: str,
        scope: str
    ) -> Optional[dict]:
        """Handle client credentials grant."""
        client = self.clients.get(client_id)
        if not client or client["client_secret"] != client_secret:
            return None
        
        # In client credentials flow, there's no user
        access_token = self.jwt.create_access_token(
            user_id=client_id,
            roles=["client"],
            additional_claims={"scope": scope}
        )
        
        return {
            "access_token": access_token,
            "token_type": "bearer",
            "expires_in": 900
        }

Authorization Patterns

Role-Based Access Control (RBAC)

from enum import Enum
from dataclasses import dataclass
from typing import List, Set

class Permission(Enum):
    READ = "read"
    WRITE = "write"
    DELETE = "delete"
    ADMIN = "admin"

@dataclass
class Role:
    name: str
    permissions: Set[Permission]
    inherits: List[str] = None

class RBACService:
    """Role-based access control service."""
    
    def __init__(self):
        self.roles: Dict[str, Role] = {}
        self.user_roles: Dict[str, Set[str]] = {}
    
    def define_role(self, name: str, permissions: Set[Permission], inherits: List[str] = None) -> Role:
        """Define a new role."""
        self.roles[name] = Role(name, permissions, inherits)
        return self.roles[name]
    
    def assign_role(self, user_id: str, role_name: str) -> None:
        """Assign a role to a user."""
        if user_id not in self.user_roles:
            self.user_roles[user_id] = set()
        self.user_roles[user_id].add(role_name)
    
    def get_user_permissions(self, user_id: str) -> Set[Permission]:
        """Get all permissions for a user."""
        permissions = set()
        user_role_names = self.user_roles.get(user_id, set())
        
        for role_name in user_role_names:
            permissions.update(self._get_role_permissions(role_name))
        
        return permissions
    
    def _get_role_permissions(self, role_name: str) -> Set[Permission]:
        """Get permissions for a role, including inherited."""
        role = self.roles.get(role_name)
        if not role:
            return set()
        
        permissions = set(role.permissions)
        
        if role.inherits:
            for inherited_role in role.inherits:
                permissions.update(self._get_role_permissions(inherited_role))
        
        return permissions
    
    def has_permission(self, user_id: str, permission: Permission) -> bool:
        """Check if user has a specific permission."""
        user_permissions = self.get_user_permissions(user_id)
        return permission in user_permissions
    
    def has_any_permission(self, user_id: str, permissions: List[Permission]) -> bool:
        """Check if user has any of the specified permissions."""
        user_permissions = self.get_user_permissions(user_id)
        return bool(user_permissions.intersection(set(permissions)))

API Key Authentication

import hashlib
import secrets
from datetime import datetime, timedelta

@dataclass
class APIKey:
    key_id: str
    key_hash: str
    user_id: str
    name: str
    permissions: set
    created_at: datetime
    expires_at: datetime
    last_used: datetime
    is_active: bool

class APIKeyManager:
    """Manage API keys for service authentication."""
    
    def __init__(self, db_connection):
        self.db = db_connection
    
    def create_key(
        self,
        user_id: str,
        name: str,
        permissions: set,
        expires_in_days: int = 365
    ) -> str:
        """Create a new API key. Returns the raw key (shown only once)."""
        raw_key = f"sk_{secrets.token_urlsafe(48)}"
        key_id = secrets.token_urlsafe(16)
        key_hash = hashlib.sha256(raw_key.encode()).hexdigest()
        
        now = datetime.utcnow()
        
        self.db.execute(
            """INSERT INTO api_keys
               (key_id, key_hash, user_id, name, permissions, created_at, expires_at, last_used, is_active)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            key_id, key_hash, user_id, name, ",".join(permissions),
            now, now + timedelta(days=expires_in_days), now, True
        )
        
        return f"{key_id}_{raw_key}"
    
    def validate_key(self, key: str) -> Optional[dict]:
        """Validate an API key."""
        if not key.startswith("sk_"):
            return None
        
        parts = key.split("_", 1)
        if len(parts) != 2:
            return None
        
        key_id, raw_key = parts
        key_hash = hashlib.sha256(raw_key.encode()).hexdigest()
        
        row = self.db.query_one(
            "SELECT * FROM api_keys WHERE key_id = ? AND is_active = 1", key_id
        )
        
        if not row:
            return None
        
        stored_hash = row["key_hash"]
        if not secrets.compare_digest(key_hash, stored_hash):
            return None
        
        if datetime.fromisoformat(row["expires_at"]) < datetime.utcnow():
            return None
        
        # Update last used
        self.db.execute(
            "UPDATE api_keys SET last_used = ? WHERE key_id = ?",
            datetime.utcnow(), key_id
        )
        
        return {
            "user_id": row["user_id"],
            "permissions": set(row["permissions"].split(",")),
            "name": row["name"]
        }
    
    def revoke_key(self, key_id: str) -> bool:
        """Revoke an API key."""
        result = self.db.execute(
            "UPDATE api_keys SET is_active = 0 WHERE key_id = ?", key_id
        )
        return result > 0

Rate Limiting

import time
from dataclasses import dataclass
from typing import Dict, Tuple

@dataclass
class RateLimitConfig:
    requests: int
    window_seconds: int

class TokenBucketRateLimiter:
    """Token bucket rate limiter using Redis."""
    
    def __init__(self, redis_client, config: RateLimitConfig):
        self.redis = redis_client
        self.config = config
    
    def is_allowed(self, identifier: str) -> Tuple[bool, Dict]:
        """Check if request is allowed. Returns (allowed, headers)."""
        key = f"ratelimit:{identifier}"
        
        now = time.time()
        window_start = now - self.config.window_seconds
        
        pipe = self.redis.pipeline()
        pipe.zadd(key, {str(now): now})
        pipe.zremrangebyscore(key, 0, window_start)
        pipe.zcard(key)
        pipe.expire(key, self.config.window_seconds)
        results = pipe.execute()
        
        current_count = results[2]
        remaining = max(0, self.config.requests - current_count)
        
        if current_count > self.config.requests:
            return False, {
                "X-RateLimit-Limit": str(self.config.requests),
                "X-RateLimit-Remaining": "0",
                "X-RateLimit-Reset": str(int(now + self.config.window_seconds)),
                "Retry-After": str(self.config.window_seconds)
            }
        
        return True, {
            "X-RateLimit-Limit": str(self.config.requests),
            "X-RateLimit-Remaining": str(remaining),
            "X-RateLimit-Reset": str(int(now + self.config.window_seconds))
        }

Security Best Practices

Input Validation

import re
from pydantic import BaseModel, validator, EmailStr
from typing import Optional

class UserCreateRequest(BaseModel):
    email: EmailStr
    name: str
    password: str
    
    @validator("name")
    def validate_name(cls, v):
        if len(v) < 2 or len(v) > 100:
            raise ValueError("Name must be 2-100 characters")
        if not re.match(r"^[a-zA-Z\s\-']+$", v):
            raise ValueError("Name contains invalid characters")
        return v
    
    @validator("password")
    def validate_password(cls, v):
        if len(v) < 8:
            raise ValueError("Password must be at least 8 characters")
        if not re.search(r"[A-Z]", v):
            raise ValueError("Password must contain uppercase letter")
        if not re.search(r"[a-z]", v):
            raise ValueError("Password must contain lowercase letter")
        if not re.search(r"[0-9]", v):
            raise ValueError("Password must contain number")
        return v

class Sanitizer:
    """Input sanitization utilities."""
    
    @staticmethod
    def sanitize_html(value: str) -> str:
        """Remove potentially dangerous HTML."""
        # In production, use a library like bleach
        return value.replace("<", "&lt;").replace(">", "&gt;")
    
    @staticmethod
    def sanitize_sql(value: str) -> str:
        """Prevent SQL injection by escaping."""
        return value.replace("'", "''")

Conclusion

API security requires defense in depth: multiple layers of protection. Authentication verifies identity, authorization controls access, rate limiting prevents abuse, and input validation prevents injection attacks. No single measure is sufficient; together they create robust security.

Key practices: use established protocols (OAuth 2.0, JWT), implement proper rate limiting, validate all input, use HTTPS, and keep dependencies updated. Security is not a one-time effort but an ongoing process.

Resources

  • OWASP API Security Top 10
  • OAuth 2.0 RFC 6749
  • JWT RFC 7519
  • “Security Engineering” by Ross Anderson

Comments