Introduction
API security is critical in modern applications. APIs expose functionality to clients, making them attractive targets for attackers. A single vulnerability can lead to data breaches, service disruption, and reputational damage. Understanding security patterns is essential for building robust APIs.
This guide covers authentication mechanisms, authorization patterns, rate limiting, and protection against common vulnerabilities like injection attacks and broken authentication.
Authentication Patterns
JWT Authentication
JSON Web Tokens (JWT) provide a compact, URL-safe way to transmit claims between parties. JWTs are commonly used for stateless authentication.
import jwt
import time
from dataclasses import dataclass
from typing import Optional, Dict, Any
from datetime import datetime, timedelta
@dataclass
class JWTConfig:
    """Settings used when signing and validating JWTs.

    Both lifetimes are expressed in seconds.
    """

    # Key used to sign tokens and to verify their signatures.
    secret_key: str
    # Signing algorithm name handed to the JWT library.
    algorithm: str = "HS256"
    # Access-token lifetime: 15 minutes.
    access_token_expire: int = 15 * 60
    # Refresh-token lifetime: 7 days.
    refresh_token_expire: int = 7 * 24 * 60 * 60
    # Value placed in the "iss" claim.
    issuer: str = "api"
class JWTHandler:
    """Issue and validate signed JWT access and refresh tokens.

    Every token is signed with ``config.secret_key`` using
    ``config.algorithm`` and carries ``config.issuer`` in its ``iss`` claim.
    """

    # Claims this class sets itself. ``additional_claims`` may not replace
    # them; otherwise a caller-supplied dict could silently change a token's
    # subject, lifetime, issuer or type.
    _RESERVED_CLAIMS = frozenset({"sub", "type", "iat", "exp", "iss"})

    def __init__(self, config: JWTConfig):
        """Store the configuration used by all later operations."""
        self.config = config

    def _base_claims(self, user_id: str, token_type: str, lifetime: int) -> Dict[str, Any]:
        """Build the claims common to both token types."""
        now = int(time.time())
        return {
            "sub": user_id,
            "type": token_type,
            "iat": now,
            "exp": now + lifetime,
            "iss": self.config.issuer,
        }

    def _sign(self, payload: Dict[str, Any]) -> str:
        """Encode *payload* with the configured key and algorithm."""
        return jwt.encode(
            payload, self.config.secret_key, algorithm=self.config.algorithm
        )

    def create_access_token(
        self,
        user_id: str,
        roles: Optional[list] = None,
        additional_claims: Optional[Dict] = None,
    ) -> str:
        """Create a signed access token for *user_id*.

        Args:
            user_id: Stored in the ``sub`` claim.
            roles: Stored in the ``roles`` claim; an empty list when omitted.
            additional_claims: Extra claims merged into the payload. Keys in
                ``_RESERVED_CLAIMS`` are ignored so they cannot override the
                values set here.

        Returns:
            The encoded token, valid for ``config.access_token_expire``
            seconds from now.
        """
        payload = self._base_claims(
            user_id, "access", self.config.access_token_expire
        )
        payload["roles"] = roles or []
        if additional_claims:
            payload.update(
                (name, value)
                for name, value in additional_claims.items()
                if name not in self._RESERVED_CLAIMS
            )
        return self._sign(payload)

    def create_refresh_token(self, user_id: str) -> str:
        """Create a signed refresh token for *user_id*.

        The token is valid for ``config.refresh_token_expire`` seconds and
        carries no ``roles`` claim.
        """
        return self._sign(
            self._base_claims(user_id, "refresh", self.config.refresh_token_expire)
        )

    def validate_token(
        self, token: str, expected_type: Optional[str] = None
    ) -> Optional[Dict]:
        """Verify *token* and return its claims.

        Args:
            token: The encoded JWT.
            expected_type: When given, the token's ``type`` claim must equal
                this value (``"access"`` or ``"refresh"``). Pass it wherever
                only one kind of token is acceptable, so that a long-lived
                refresh token cannot be presented as an access token. The
                default ``None`` accepts either type.

        Returns:
            The decoded claims, or ``None`` if the token is malformed, has a
            bad signature, is expired, has the wrong issuer, lacks an ``exp``
            claim, or does not match *expected_type*.
        """
        try:
            payload = jwt.decode(
                token,
                self.config.secret_key,
                algorithms=[self.config.algorithm],
                issuer=self.config.issuer,
                # Without this a token that has no "exp" would never expire.
                options={"require": ["exp"]},
            )
        except jwt.InvalidTokenError:
            # ExpiredSignatureError is a subclass, so this covers expiry too.
            return None
        if expected_type is not None and payload.get("type") != expected_type:
            return None
        return payload

    def refresh_access_token(self, refresh_token: str) -> Optional[Dict]:
        """Exchange a valid refresh token for a new access token.

        Returns:
            ``{"access_token": ..., "token_type": "bearer"}``, or ``None``
            when *refresh_token* is invalid or is not a refresh token.
        """
        payload = self.validate_token(refresh_token, expected_type="refresh")
        if not payload:
            return None
        # NOTE(review): create_refresh_token() writes no "roles" claim, so for
        # tokens issued by this class the lookup below always yields [] and
        # the refreshed access token has no roles. Callers that rely on roles
        # must re-load them for payload["sub"] from their user store.
        new_access_token = self.create_access_token(
            payload["sub"], payload.get("roles", [])
        )
        return {"access_token": new_access_token, "token_type": "bearer"}
OAuth 2.0 Implementation
from enum import Enum
from dataclasses import dataclass
from urllib.parse import urlencode
import secrets
class GrantType(Enum):
    """Grant types, each member holding its string identifier."""

    AUTHORIZATION_CODE = "authorization_code"
    CLIENT_CREDENTIALS = "client_credentials"
    REFRESH_TOKEN = "refresh_token"
@dataclass
class AuthorizationCode:
    """Record of one issued authorization code and the request it belongs to."""

    # The code value itself.
    code: str
    # Client the code was issued for.
    client_id: str
    # Redirect URI supplied when the code was created.
    redirect_uri: str
    # User on whose behalf the code was issued.
    user_id: str
    # Expiry moment as a ``time.time()``-style timestamp.
    expires_at: float
    # Scope string requested for the resulting tokens.
    scope: str
class OAuth2Provider:
    """In-memory OAuth 2.0 authorization server.

    Registered clients and outstanding authorization codes live in plain
    dictionaries on the instance.
    """

    def __init__(self, jwt_handler: "JWTHandler", token_store):
        """Keep the token issuer and start with no clients or codes."""
        self.jwt = jwt_handler
        self.token_store = token_store  # stored; not used by the methods below
        self.auth_codes: Dict[str, AuthorizationCode] = {}
        self.clients: Dict[str, dict] = {}

    def register_client(
        self,
        client_id: str,
        client_secret: str,
        redirect_uris: list,
        name: str
    ) -> dict:
        """Register (or overwrite) an OAuth client and return its record.

        NOTE(review): the secret is kept as given, in clear text. Consider
        storing only a hash if this registry is ever persisted.
        """
        record = {
            "client_id": client_id,
            "client_secret": client_secret,
            "redirect_uris": redirect_uris,
            "name": name,
        }
        self.clients[client_id] = record
        return record

    def create_authorization_code(
        self,
        client_id: str,
        redirect_uri: str,
        user_id: str,
        scope: str
    ) -> str:
        """Issue a single-use authorization code valid for 10 minutes.

        NOTE: this method does not check that *client_id* is registered or
        that *redirect_uri* is one of the client's registered URIs; the
        caller must do so before issuing a code.
        """
        code = secrets.token_urlsafe(32)
        self.auth_codes[code] = AuthorizationCode(
            code=code,
            client_id=client_id,
            redirect_uri=redirect_uri,
            user_id=user_id,
            expires_at=time.time() + 600,  # 10 minutes
            scope=scope,
        )
        return code

    def _authenticate_client(self, client_id: str, client_secret: str) -> Optional[dict]:
        """Return the client record if the credentials match, else ``None``."""
        client = self.clients.get(client_id)
        if not client:
            return None
        expected = client["client_secret"]
        if not isinstance(expected, str) or not isinstance(client_secret, str):
            return None
        # Constant-time comparison so response timing does not reveal how
        # much of a guessed secret was correct.
        if not secrets.compare_digest(
            expected.encode("utf-8"), client_secret.encode("utf-8")
        ):
            return None
        return client

    def _access_token_lifetime(self) -> int:
        """Seconds reported as ``expires_in``, taken from the JWT config."""
        return self.jwt.config.access_token_expire

    def exchange_code_for_tokens(
        self,
        code: str,
        client_id: str,
        client_secret: str,
        redirect_uri: str
    ) -> Optional[dict]:
        """Redeem an authorization code for an access and a refresh token.

        Returns:
            A token response dict, or ``None`` when the code is unknown or
            expired, the client credentials are wrong, the code was issued to
            a different client, or *redirect_uri* differs from the one the
            code was created with. A successfully redeemed code is deleted.
        """
        auth_code = self.auth_codes.get(code)
        if not auth_code:
            return None

        if time.time() > auth_code.expires_at:
            self.auth_codes.pop(code, None)
            return None

        if self._authenticate_client(client_id, client_secret) is None:
            return None

        # A code may only be redeemed by the client it was issued to
        # (RFC 6749, section 4.1.3); otherwise any registered client could
        # redeem a code intercepted from another client.
        if auth_code.client_id != client_id:
            return None

        if redirect_uri != auth_code.redirect_uri:
            return None

        # Codes are single-use.
        self.auth_codes.pop(code, None)

        access_token = self.jwt.create_access_token(
            user_id=auth_code.user_id,
            roles=[],  # Add roles from user
            additional_claims={"scope": auth_code.scope},
        )
        refresh_token = self.jwt.create_refresh_token(auth_code.user_id)

        return {
            "access_token": access_token,
            "refresh_token": refresh_token,
            "token_type": "bearer",
            "expires_in": self._access_token_lifetime(),
        }

    def client_credentials_grant(
        self,
        client_id: str,
        client_secret: str,
        scope: str
    ) -> Optional[dict]:
        """Issue an access token to an authenticated client acting for itself.

        Returns:
            A token response dict (no refresh token), or ``None`` when the
            client credentials are wrong.
        """
        if self._authenticate_client(client_id, client_secret) is None:
            return None

        # In client credentials flow there is no user, so the client itself
        # becomes the token subject.
        access_token = self.jwt.create_access_token(
            user_id=client_id,
            roles=["client"],
            additional_claims={"scope": scope},
        )

        return {
            "access_token": access_token,
            "token_type": "bearer",
            "expires_in": self._access_token_lifetime(),
        }
Authorization Patterns
Role-Based Access Control (RBAC)
from enum import Enum
from dataclasses import dataclass
from typing import List, Set
class Permission(Enum):
    """Actions that a role can grant."""

    READ = "read"
    WRITE = "write"
    DELETE = "delete"
    ADMIN = "admin"


@dataclass
class Role:
    """A named set of permissions that may also inherit from other roles."""

    name: str
    permissions: Set[Permission]
    # Names of roles whose permissions this role also receives; None for none.
    inherits: Optional[List[str]] = None


class RBACService:
    """Role-based access control service.

    Holds role definitions and user-to-role assignments in memory and
    resolves a user's effective permissions, following role inheritance.
    """

    def __init__(self):
        self.roles: Dict[str, Role] = {}
        self.user_roles: Dict[str, Set[str]] = {}

    def define_role(
        self,
        name: str,
        permissions: Set[Permission],
        inherits: Optional[List[str]] = None,
    ) -> Role:
        """Define (or redefine) the role *name* and return it."""
        role = Role(name, permissions, inherits)
        self.roles[name] = role
        return role

    def assign_role(self, user_id: str, role_name: str) -> None:
        """Assign a role to a user; assigning the same role twice is a no-op."""
        self.user_roles.setdefault(user_id, set()).add(role_name)

    def get_user_permissions(self, user_id: str) -> Set[Permission]:
        """Return the union of permissions from all of the user's roles.

        Users with no assigned roles get an empty set.
        """
        permissions: Set[Permission] = set()
        for role_name in self.user_roles.get(user_id, ()):
            permissions |= self._get_role_permissions(role_name)
        return permissions

    def _get_role_permissions(self, role_name: str) -> Set[Permission]:
        """Return a role's own permissions plus everything it inherits.

        Role names that are not defined contribute nothing. The walk keeps
        track of roles already visited, so cyclic ``inherits`` chains
        (``a -> b -> a``) terminate instead of recursing without bound.
        """
        granted: Set[Permission] = set()
        visited: Set[str] = set()
        pending = [role_name]
        while pending:
            current = pending.pop()
            if current in visited:
                continue
            visited.add(current)
            role = self.roles.get(current)
            if role is None:
                continue
            granted.update(role.permissions)
            pending.extend(role.inherits or ())
        return granted

    def has_permission(self, user_id: str, permission: Permission) -> bool:
        """Check if the user has a specific permission."""
        return permission in self.get_user_permissions(user_id)

    def has_any_permission(self, user_id: str, permissions: List[Permission]) -> bool:
        """Check if the user has at least one of the given permissions."""
        return not self.get_user_permissions(user_id).isdisjoint(permissions)
API Key Authentication
import hashlib
import secrets
from datetime import datetime, timedelta
@dataclass
class APIKey:
    """Metadata describing one API key."""

    # Public identifier of the key.
    key_id: str
    # Digest of the secret key material (the raw key itself is not a field).
    key_hash: str
    # Owner of the key.
    user_id: str
    # Human-readable label.
    name: str
    # Permissions granted to requests that use this key.
    permissions: set
    # Creation time.
    created_at: datetime
    # Moment after which the key is no longer valid.
    expires_at: datetime
    # Time of the most recent use.
    last_used: datetime
    # False once the key has been revoked.
    is_active: bool
class APIKeyManager:
    """Create, validate and revoke API keys stored in the ``api_keys`` table.

    A key handed to a client has the form ``<key_id>_sk_<secret>``. Only the
    SHA-256 hex digest of the ``sk_<secret>`` part is stored, so the full key
    cannot be recovered from the database.
    """

    # Prefix of the secret part of every key.
    _SECRET_PREFIX = "sk_"

    def __init__(self, db_connection):
        """Keep the database handle.

        The handle must provide ``execute(sql, *params)`` and
        ``query_one(sql, *params)``.
        """
        self.db = db_connection

    @staticmethod
    def _digest(raw_key: str) -> str:
        """Return the hex SHA-256 digest stored for *raw_key*."""
        return hashlib.sha256(raw_key.encode()).hexdigest()

    def create_key(
        self,
        user_id: str,
        name: str,
        permissions: set,
        expires_in_days: int = 365
    ) -> str:
        """Create a new API key. Returns the raw key (shown only once).

        Args:
            user_id: Owner of the key.
            name: Human-readable label.
            permissions: Permission names, stored comma-separated.
            expires_in_days: Lifetime of the key in days.

        Returns:
            The full key ``<key_id>_sk_<secret>`` to hand to the client.
        """
        raw_key = f"{self._SECRET_PREFIX}{secrets.token_urlsafe(48)}"
        # Hex never contains "_", so validate_key() can split the full key at
        # its first underscore without ambiguity (token_urlsafe output may
        # itself contain "_").
        key_id = secrets.token_hex(16)
        key_hash = self._digest(raw_key)
        now = datetime.utcnow()

        self.db.execute(
            """INSERT INTO api_keys
            (key_id, key_hash, user_id, name, permissions, created_at, expires_at, last_used, is_active)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            key_id, key_hash, user_id, name, ",".join(sorted(permissions)),
            now, now + timedelta(days=expires_in_days), now, True
        )

        return f"{key_id}_{raw_key}"

    def validate_key(self, key: str) -> Optional[dict]:
        """Validate an API key produced by :meth:`create_key`.

        Returns:
            ``{"user_id", "permissions", "name"}`` for a key that is
            well-formed, active, unexpired and whose digest matches the
            stored one; ``None`` otherwise. A successful validation also
            updates the key's ``last_used`` column.
        """
        if not key or not isinstance(key, str):
            return None

        # Full key layout: "<key_id>_sk_<secret>".
        key_id, separator, raw_key = key.partition("_")
        if not separator or not key_id or not raw_key.startswith(self._SECRET_PREFIX):
            return None

        row = self.db.query_one(
            "SELECT * FROM api_keys WHERE key_id = ? AND is_active = 1", key_id
        )
        if not row:
            return None

        # Constant-time comparison of the digests.
        if not secrets.compare_digest(self._digest(raw_key), row["key_hash"]):
            return None

        expires_at = row["expires_at"]
        if isinstance(expires_at, str):
            # Some drivers hand timestamps back as ISO-8601 text.
            expires_at = datetime.fromisoformat(expires_at)
        now = datetime.utcnow()
        if expires_at < now:
            return None

        # Update last used
        self.db.execute(
            "UPDATE api_keys SET last_used = ? WHERE key_id = ?",
            now, key_id
        )

        return {
            "user_id": row["user_id"],
            # Drop empty items so a key created with no permissions yields
            # set() rather than {""}.
            "permissions": {p for p in row["permissions"].split(",") if p},
            "name": row["name"]
        }

    def revoke_key(self, key_id: str) -> bool:
        """Deactivate the key *key_id*.

        Returns:
            ``True`` when the value returned by ``db.execute`` is greater
            than zero.
        """
        result = self.db.execute(
            "UPDATE api_keys SET is_active = 0 WHERE key_id = ?", key_id
        )
        return result > 0
Rate Limiting
import time
from dataclasses import dataclass
from typing import Dict, Tuple
@dataclass
class RateLimitConfig:
    """Limit definition: how many requests are allowed per time window."""

    # Maximum number of requests permitted within one window.
    requests: int
    # Length of the window, in seconds.
    window_seconds: int
class TokenBucketRateLimiter:
    """Redis-backed request rate limiter.

    Despite the class name, the algorithm is a sliding-window log rather than
    a token bucket: each request is recorded in a per-identifier sorted set
    scored by its timestamp, entries older than the window are trimmed, and
    the request is allowed while the number of remaining entries does not
    exceed ``config.requests``.

    Rejected requests are recorded as well, so they keep counting against the
    limit until they age out of the window.
    """

    def __init__(self, redis_client, config: "RateLimitConfig"):
        self.redis = redis_client
        self.config = config

    def is_allowed(self, identifier: str) -> Tuple[bool, Dict]:
        """Record a request for *identifier* and decide whether to allow it.

        Returns:
            ``(allowed, headers)`` where *headers* holds the
            ``X-RateLimit-*`` response headers, plus ``Retry-After`` when the
            request is rejected.
        """
        limit = self.config.requests
        window = self.config.window_seconds
        key = f"ratelimit:{identifier}"
        now = time.time()

        # Sorted-set members must be unique. Using the bare timestamp would
        # let two requests that observe the same clock value overwrite each
        # other and be counted once, so a random suffix is appended.
        member = f"{now}:{secrets.token_hex(8)}"

        pipe = self.redis.pipeline()
        pipe.zadd(key, {member: now})
        pipe.zremrangebyscore(key, 0, now - window)
        pipe.zcard(key)
        pipe.expire(key, window)  # idle keys clean themselves up
        current_count = pipe.execute()[2]  # result of zcard

        headers = {
            "X-RateLimit-Limit": str(limit),
            "X-RateLimit-Remaining": str(max(0, limit - current_count)),
            "X-RateLimit-Reset": str(int(now + window)),
        }
        if current_count > limit:
            headers["Retry-After"] = str(window)
            return False, headers
        return True, headers
Security Best Practices
Input Validation
import html
import re
from typing import Optional

from pydantic import BaseModel, validator, EmailStr
class UserCreateRequest(BaseModel):
    # Request body for creating a user. The e-mail format is checked by the
    # EmailStr field type; name and password rules are enforced by the
    # validators below.
    email: EmailStr
    name: str
    password: str

    @validator("name")
    def validate_name(cls, v):
        # Accept 2-100 characters drawn from letters, whitespace, hyphen and
        # apostrophe; anything else is rejected.
        if not 2 <= len(v) <= 100:
            raise ValueError("Name must be 2-100 characters")
        if re.match(r"^[a-zA-Z\s\-']+$", v) is None:
            raise ValueError("Name contains invalid characters")
        return v

    @validator("password")
    def validate_password(cls, v):
        # Minimum length first, then one character from each required class,
        # checked in a fixed order so the first failing rule is reported.
        if len(v) < 8:
            raise ValueError("Password must be at least 8 characters")
        character_rules = (
            (r"[A-Z]", "Password must contain uppercase letter"),
            (r"[a-z]", "Password must contain lowercase letter"),
            (r"[0-9]", "Password must contain number"),
        )
        for pattern, message in character_rules:
            if not re.search(pattern, v):
                raise ValueError(message)
        return v
class Sanitizer:
    """Input sanitization utilities."""

    @staticmethod
    def sanitize_html(value: str) -> str:
        """Escape HTML metacharacters so *value* renders as literal text.

        ``&``, ``<``, ``>``, ``"`` and ``'`` are replaced by their HTML
        entities, which makes the result safe to place in element content or
        in a quoted attribute value.

        This escapes everything; it does not keep a safe subset of markup.
        To allow some tags while stripping dangerous ones, use a dedicated
        allow-list sanitizer library such as bleach.
        """
        return html.escape(value, quote=True)

    @staticmethod
    def sanitize_sql(value: str) -> str:
        """Double every single quote in *value*.

        WARNING: this is not a reliable defence against SQL injection. It
        does nothing for values used outside a quoted string literal and
        does not account for database-specific escape rules. Pass values to
        the database as bound parameters (``?`` / ``%s`` placeholders)
        instead of building SQL text from them.
        """
        return value.replace("'", "''")
Conclusion
API security requires defense in depth: multiple layers of protection. Authentication verifies identity, authorization controls access, rate limiting prevents abuse, and input validation prevents injection attacks. No single measure is sufficient; together they create robust security.
Key practices: use established protocols (OAuth 2.0, JWT), implement proper rate limiting, validate all input, use HTTPS, and keep dependencies updated. Security is not a one-time effort but an ongoing process.
Resources
- OWASP API Security Top 10
- OAuth 2.0 RFC 6749
- JWT RFC 7519
- “Security Engineering” by Ross Anderson
Comments