The API Gateway pattern provides a unified entry point for a system of microservices. It handles cross-cutting concerns like authentication, rate limiting, and request routing, simplifying client interactions and centralizing policy enforcement.
Understanding API Gateway
The Problem Without an API Gateway
Direct Client-to-Microservices Communication

    Mobile App
        │
        ├────► User Service      (auth, profile)
        ├────► Order Service     (orders, history)
        ├────► Payment Service   (billing, cards)
        ├────► Inventory         (stock, products)
        └────► Notification      (email, push)

    Problems:
    ✗ Client knows about all service endpoints
    ✗ Each client implements the same logic (auth, retry)
    ✗ No centralized policy enforcement
    ✗ Cross-origin requests from browsers
    ✗ Tight coupling between clients and services
With an API Gateway
API Gateway Architecture

    Mobile App
        │
        ▼
    ┌──────────────────────────────────────────────────┐
    │                   API Gateway                    │
    │  ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
    │  │  Auth   │ │  Rate   │ │  Route  │ │Transform│ │
    │  │ Handler │ │ Limiter │ │ Engine  │ │  Layer  │ │
    │  └─────────┘ └─────────┘ └─────────┘ └─────────┘ │
    └──────────────────────────────────────────────────┘
        │
        ├────► User Service
        ├────► Order Service
        ├────► Payment Service
        ├────► Inventory
        └────► Notification

    Benefits:
    ✓ Single entry point for all clients
    ✓ Centralized cross-cutting concerns
    ✓ Service discovery and dynamic routing
    ✓ Protocol translation (REST ↔ gRPC)
Gateway Responsibilities
Request Routing
class RouteConfig:
    def __init__(self):
        self.routes = [
            Route(
                path="/api/v1/users/*",
                service="user-service",
                timeout=5000,
                retry_policy={"attempts": 3, "backoff": "exponential"}
            ),
            Route(
                path="/api/v1/orders/*",
                service="order-service",
                timeout=10000,
                retry_policy={"attempts": 2, "backoff": "linear"}
            ),
            Route(
                path="/api/v1/payments/*",
                service="payment-service",
                timeout=30000,
                retry_policy={"attempts": 1}  # No retry for payments
            ),
        ]

class RoutingEngine:
    def __init__(self, route_config: RouteConfig, service_discovery: ServiceRegistry):
        self.routes = route_config.routes
        self.registry = service_discovery

    def resolve_route(self, request: Request) -> RouteResult:
        for route in self.routes:
            if self._match_path(request.path, route.path):
                service_host = self.registry.resolve(route.service)
                return RouteResult(
                    target_url=f"http://{service_host}{request.path}",
                    timeout=route.timeout,
                    retry_policy=route.retry_policy
                )
        return RouteResult(error="No route found", status_code=404)

    def _match_path(self, request_path: str, pattern: str) -> bool:
        if pattern.endswith("/*"):
            prefix = pattern[:-2]
            return request_path.startswith(prefix)
        return request_path == pattern
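A quick sanity check of the matching rule above, extracted as a standalone function (`Route` and `RouteResult` are assumed to be simple dataclasses elsewhere in the gateway codebase):

```python
def match_path(request_path: str, pattern: str) -> bool:
    """Match an exact path, or treat a trailing '/*' as a prefix wildcard."""
    if pattern.endswith("/*"):
        return request_path.startswith(pattern[:-2])
    return request_path == pattern

print(match_path("/api/v1/users/42", "/api/v1/users/*"))  # True: wildcard prefix
print(match_path("/api/v1/orders", "/api/v1/users/*"))    # False: wrong prefix
print(match_path("/health", "/health"))                   # True: exact match
```

Note that routes are checked in declaration order, so more specific patterns should be listed before broader ones.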
Authentication and Authorization
import json

import jwt  # PyJWT

class AuthHandler:
    def __init__(self, jwt_secret: str, user_cache: Redis):
        self.secret = jwt_secret
        self.cache = user_cache

    async def authenticate(self, request: Request) -> AuthResult:
        auth_header = request.headers.get("Authorization")
        if not auth_header or not auth_header.startswith("Bearer "):
            return AuthResult(success=False, error="Missing token", status=401)
        token = auth_header[7:]
        try:
            payload = jwt.decode(token, self.secret, algorithms=["HS256"])
            cached_user = await self.cache.get(f"user:{payload['sub']}")
            if cached_user:
                user = json.loads(cached_user)
            else:
                user = await self._fetch_user(payload['sub'])
                await self.cache.setex(
                    f"user:{payload['sub']}",
                    300,  # 5 min cache
                    json.dumps(user)
                )
            return AuthResult(
                success=True,
                user_id=payload['sub'],
                roles=user.get("roles", []),
                permissions=user.get("permissions", [])
            )
        except jwt.ExpiredSignatureError:
            return AuthResult(success=False, error="Token expired", status=401)
        except jwt.InvalidTokenError:
            return AuthResult(success=False, error="Invalid token", status=401)

    def authorize(self, auth_result: AuthResult, required_permission: str) -> bool:
        if not auth_result.success:
            return False
        if "admin" in auth_result.roles:
            return True
        return required_permission in auth_result.permissions
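The authorization rule reduces to two checks: admins bypass permission lookup, everyone else needs the exact permission. A minimal sketch of that rule (role and permission names here are illustrative, not part of the original):

```python
def authorize(roles: list[str], permissions: list[str], required: str) -> bool:
    # Same rule as AuthHandler.authorize: admin role short-circuits,
    # otherwise the caller must hold the required permission.
    return "admin" in roles or required in permissions

print(authorize(["admin"], [], "orders:write"))               # True: admin bypass
print(authorize(["user"], ["orders:read"], "orders:write"))   # False: missing permission
print(authorize(["user"], ["orders:write"], "orders:write"))  # True: explicit grant
```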
Rate Limiting
import time
from collections import defaultdict

class TokenBucketRateLimiter:
    def __init__(self, rate: int, capacity: int):
        self.rate = rate
        self.capacity = capacity
        self.buckets: dict[str, dict] = defaultdict(self._create_bucket)

    def _create_bucket(self):
        return {
            "tokens": self.capacity,
            "last_refill": time.time()
        }

    async def allow_request(self, key: str, cost: int = 1) -> bool:
        bucket = self.buckets[key]
        now = time.time()
        elapsed = now - bucket["last_refill"]
        bucket["tokens"] = min(
            self.capacity,
            bucket["tokens"] + elapsed * self.rate
        )
        bucket["last_refill"] = now
        if bucket["tokens"] >= cost:
            bucket["tokens"] -= cost
            return True
        return False
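To see the bucket mechanics in isolation, here is a synchronous version of the same refill-then-spend logic (a sketch for illustration, not the class used by the middleware):

```python
import time

class TokenBucket:
    """Synchronous sketch of the token-bucket logic above."""
    def __init__(self, rate: float, capacity: float):
        self.rate = rate            # tokens added per second
        self.capacity = capacity    # maximum burst size
        self.tokens = capacity
        self.last_refill = time.monotonic()

    def allow(self, cost: float = 1) -> bool:
        now = time.monotonic()
        # Refill proportionally to elapsed time, capped at capacity
        self.tokens = min(self.capacity,
                          self.tokens + (now - self.last_refill) * self.rate)
        self.last_refill = now
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False

bucket = TokenBucket(rate=10, capacity=2)  # burst of 2, refills at 10/sec
print([bucket.allow() for _ in range(3)])  # first two pass, third is throttled
```

The capacity bounds the burst a client can send at once; the rate bounds the sustained throughput.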
class RateLimitConfig:
    BY_USER = {"rate": 100, "capacity": 100}        # 100 req/sec per user
    BY_IP = {"rate": 1000, "capacity": 1000}        # 1000 req/sec per IP
    BY_API_KEY = {"rate": 10000, "capacity": 5000}  # Premium tier: 10k req/sec, 5k burst

class RateLimitMiddleware:
    def __init__(self):
        self.limiters = {
            "user": TokenBucketRateLimiter(**RateLimitConfig.BY_USER),
            "ip": TokenBucketRateLimiter(**RateLimitConfig.BY_IP),
            "api_key": TokenBucketRateLimiter(**RateLimitConfig.BY_API_KEY)
        }

    async def check_rate_limit(self, request: Request) -> RateLimitResult:
        client_ip = request.client_ip
        user_id = request.auth_user_id
        api_key = request.headers.get("X-API-Key")
        limits = []
        if api_key:
            allowed = await self.limiters["api_key"].allow_request(api_key)
            limits.append(("api_key", allowed))
            if not allowed:
                return RateLimitResult(
                    allowed=False,
                    retry_after=1,
                    limit_type="api_key"
                )
        if user_id:
            allowed = await self.limiters["user"].allow_request(user_id)
            limits.append(("user", allowed))
            if not allowed:
                return RateLimitResult(
                    allowed=False,
                    retry_after=1,
                    limit_type="user"
                )
        allowed = await self.limiters["ip"].allow_request(client_ip)
        limits.append(("ip", allowed))
        if not allowed:
            return RateLimitResult(
                allowed=False,
                retry_after=1,
                limit_type="ip"
            )
        return RateLimitResult(allowed=True, current_rates=dict(limits))
Request/Response Transformation
class RequestTransformer:
    def transform_incoming(self, request: Request, service_config: dict) -> TransformedRequest:
        transformed = Request(
            method=request.method,
            path=self._transform_path(request.path, service_config),
            headers=self._transform_headers(request.headers, service_config),
            body=self._transform_body(request.body, service_config, direction="in"),
            query_params=self._transform_query(request.query_params, service_config)
        )
        transformed.headers["X-Request-ID"] = request.request_id
        transformed.headers["X-Forwarded-For"] = request.client_ip
        transformed.headers["X-Gateway-Timestamp"] = str(int(time.time()))
        return transformed

    def _transform_path(self, path: str, config: dict) -> str:
        if "path_rewrite" in config:
            for pattern, replacement in config["path_rewrite"].items():
                if pattern in path:
                    return path.replace(pattern, replacement)
        return path

    def _transform_headers(self, headers: dict, config: dict) -> dict:
        transformed = {}
        header_mapping = config.get("header_mapping", {})
        for original, target in header_mapping.items():
            if original in headers:
                transformed[target] = headers[original]
        return transformed

    def _transform_body(self, body: dict, config: dict, direction: str) -> dict:
        if "body_transform" not in config:
            return body
        transform = config["body_transform"]
        if direction == "in" and "wrap" in transform:
            return {transform["wrap"]: body}
        if direction == "out" and "unwrap" in transform:
            return body.get(transform["unwrap"], body)
        return body

class ResponseTransformer:
    def transform_outgoing(
        self,
        response: Response,
        client_config: dict
    ) -> TransformedResponse:
        transformed = Response(
            status_code=self._map_status_code(response.status_code),
            headers=self._transform_headers(response.headers, client_config),
            body=self._transform_body(response.body, client_config)
        )
        transformed.headers["X-Gateway-Response-Time"] = str(response.processing_time)
        return transformed

    def _map_status_code(self, code: int) -> int:
        code_mapping = {
            501: 404,  # Not implemented upstream -> Not found
            502: 503,  # Bad gateway -> Service unavailable
            503: 504,  # Upstream unavailable -> Gateway timeout
        }
        return code_mapping.get(code, code)
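The wrap/unwrap body transform is the least obvious piece, so here is its behavior in isolation (the `payload`/`data` key names are illustrative, not from the original):

```python
def transform_body(body: dict, transform: dict, direction: str) -> dict:
    # Same wrap/unwrap rules as _transform_body above, extracted for illustration.
    if direction == "in" and "wrap" in transform:
        return {transform["wrap"]: body}          # nest client body under a key
    if direction == "out" and "unwrap" in transform:
        return body.get(transform["unwrap"], body)  # lift service payload out
    return body

config = {"wrap": "payload", "unwrap": "data"}
print(transform_body({"user_id": 7}, config, "in"))           # {'payload': {'user_id': 7}}
print(transform_body({"data": {"ok": True}}, config, "out"))  # {'ok': True}
```

This lets the gateway bridge clients and services whose envelope conventions differ, without either side changing its contract.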
Implementation Examples
Kong API Gateway Configuration
# kong.yml - Declarative configuration
_format_version: "3.0"

services:
  - name: user-service
    url: http://user-service:8080
    routes:
      - name: user-routes
        paths:
          - /api/v1/users
        methods:
          - GET
          - POST
          - PUT
          - DELETE
        strip_path: true
    plugins:
      - name: jwt
        config:
          key_claim_name: kid
          claims_to_verify:
            - exp
      - name: rate-limiting
        config:
          minute: 100
          hour: 10000
          policy: local
          fault_tolerant: true
      - name: cors
        config:
          origins:
            - "https://app.example.com"
          methods:
            - GET
            - POST
            - PUT
            - DELETE
            - OPTIONS
          headers:
            - Authorization
            - Content-Type
          exposed_headers:
            - X-Total-Count
          credentials: true
          max_age: 3600

  - name: order-service
    url: http://order-service:8081
    routes:
      - name: order-routes
        paths:
          - /api/v1/orders
        strip_path: true
    plugins:
      - name: jwt
      - name: rate-limiting
        config:
          minute: 50
          hour: 5000
      - name: request-transformer
        config:
          add:
            headers:
              - X-Service-Name:order-service
      - name: response-transformer
        config:
          add:
            headers:
              - X-Gateway:Kong

consumers:
  - username: mobile-app
    plugins:
      - name: rate-limiting
        config:
          minute: 1000
          hour: 50000
  - username: web-app
    plugins:
      - name: rate-limiting
        config:
          minute: 500
          hour: 20000
  - username: partner-api
    plugins:
      - name: rate-limiting
        config:
          minute: 5000
          hour: 200000
Custom Gateway Implementation
import asyncio
import logging
import time
import uuid

from aiohttp import web

class APIGateway:
    def __init__(
        self,
        routing_engine: RoutingEngine,
        auth_handler: AuthHandler,
        rate_limiter: RateLimitMiddleware,
        transformer: RequestTransformer,
        http_client: AsyncHTTPClient
    ):
        self.routes = routing_engine
        self.auth = auth_handler
        self.rate_limiter = rate_limiter
        self.transformer = transformer
        self.client = http_client
        self.metrics = MetricsCollector()

    async def handle_request(self, request: web.Request) -> web.Response:
        request_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))
        start_time = time.time()
        try:
            rate_result = await self.rate_limiter.check_rate_limit(request)
            if not rate_result.allowed:
                return web.Response(
                    status=429,
                    headers={
                        "Retry-After": str(rate_result.retry_after),
                        "X-RateLimit-Limit-Type": rate_result.limit_type
                    },
                    text="Rate limit exceeded"
                )
            auth_result = await self.auth.authenticate(request)
            if not auth_result.success:
                return web.Response(
                    status=auth_result.status,
                    text=auth_result.error
                )
            route_result = self.routes.resolve_route(request)
            if route_result.error:
                return web.Response(status=404, text=route_result.error)
            # route_result is assumed to carry the matched route's service_config
            transformed_req = self.transformer.transform_incoming(
                request,
                route_result.service_config
            )
            upstream_response = await self.client.request(
                method=transformed_req.method,
                url=route_result.target_url,
                headers=transformed_req.headers,
                data=transformed_req.body,
                timeout=route_result.timeout
            )
            transformed_resp = ResponseTransformer().transform_outgoing(
                upstream_response,
                {"accept": request.headers.get("Accept", "application/json")}
            )
            processing_time = (time.time() - start_time) * 1000
            self.metrics.record_request(
                service=route_result.service,
                status=upstream_response.status,
                duration=processing_time
            )
            return web.Response(
                status=transformed_resp.status_code,
                headers=transformed_resp.headers,
                body=transformed_resp.body
            )
        except asyncio.TimeoutError:
            return web.Response(status=504, text="Gateway timeout")
        except Exception as e:
            logging.error(f"Gateway error [{request_id}]: {e}")
            return web.Response(status=500, text="Internal gateway error")
Service Discovery Integration
import random

import consul  # python-consul

class ConsulServiceRegistry:
    def __init__(self, consul_host: str, consul_port: int):
        self.consul = consul.Consul(host=consul_host, port=consul_port)

    async def resolve(self, service_name: str) -> str:
        _, services = self.consul.health.service(service_name, passing=True)
        if not services:
            raise ServiceUnavailable(f"No instances of {service_name}")
        healthy = [s for s in services if self._is_healthy(s)]
        if not healthy:
            raise ServiceUnavailable(f"No healthy instances of {service_name}")
        instance = self._select_instance(healthy)
        svc = instance["Service"]
        # Fall back to the node address when the service registration omits one
        address = svc.get("Address") or instance["Node"]["Address"]
        return f"{address}:{svc['Port']}"

    def _is_healthy(self, service: dict) -> bool:
        checks = service.get("Checks", [])
        return any(c["Status"] == "passing" for c in checks)

    def _select_instance(self, instances: list) -> dict:
        # Random selection; swap in round-robin or least-connections as needed
        return random.choice(instances)

class KubernetesServiceRegistry:
    def __init__(self, kube_client):
        self.client = kube_client

    async def resolve(self, service_name: str, namespace: str = "default") -> str:
        # Raises if the Service object does not exist
        self.client.read_namespaced_service(
            name=service_name,
            namespace=namespace
        )
        endpoints = self.client.read_namespaced_endpoints(
            name=service_name,
            namespace=namespace
        )
        ready_addresses = [
            addr
            for subset in (endpoints.subsets or [])
            for addr in (subset.addresses or [])
        ]
        if not ready_addresses:
            raise ServiceUnavailable(f"No ready pods for {service_name}")
        # Use Kubernetes DNS; kube-proxy load-balances across ready pods
        return f"{service_name}.{namespace}.svc.cluster.local"
Circuit Breaker Integration
class GatewayCircuitBreaker:
    def __init__(self):
        self.breakers: dict[str, CircuitBreaker] = {}

    def get_breaker(self, service_name: str) -> CircuitBreaker:
        if service_name not in self.breakers:
            self.breakers[service_name] = CircuitBreaker(
                failure_threshold=5,
                recovery_timeout=30,
                half_open_requests=3
            )
        return self.breakers[service_name]

    async def call_service(
        self,
        service_name: str,
        request: Request
    ) -> Response:
        breaker = self.get_breaker(service_name)
        if not breaker.can_execute():
            return Response(
                status=503,
                body=json.dumps({"error": "Service temporarily unavailable"})
            )
        try:
            response = await self._make_request(request)
            breaker.record_success()
            return response
        except Exception:
            breaker.record_failure()
            raise

class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 30,
        half_open_requests: int = 3
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_requests = half_open_requests
        self.state = "CLOSED"
        self.failures = 0
        self.last_failure_time = None
        self.half_open_successes = 0

    def can_execute(self) -> bool:
        if self.state == "CLOSED":
            return True
        if self.state == "OPEN":
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = "HALF_OPEN"
                self.half_open_successes = 0
                return True
            return False
        if self.state == "HALF_OPEN":
            return True
        return False

    def record_success(self):
        if self.state == "HALF_OPEN":
            self.half_open_successes += 1
            if self.half_open_successes >= self.half_open_requests:
                self.state = "CLOSED"
                self.failures = 0
        else:
            self.failures = 0

    def record_failure(self):
        self.failures += 1
        self.last_failure_time = time.time()
        if self.state == "HALF_OPEN":
            self.state = "OPEN"
        elif self.failures >= self.failure_threshold:
            self.state = "OPEN"
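The CLOSED → OPEN → HALF_OPEN cycle can be walked through with a compressed reimplementation of the same state machine (thresholds and timeout shortened so the demo runs instantly; this is a sketch, not the class above):

```python
import time

class Breaker:
    """Compressed version of the circuit-breaker state machine, for illustration."""
    def __init__(self, failure_threshold=2, recovery_timeout=0.05, half_open_requests=1):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_requests = half_open_requests
        self.state, self.failures = "CLOSED", 0
        self.last_failure_time = 0.0
        self.half_open_successes = 0

    def can_execute(self) -> bool:
        # After the recovery timeout, OPEN transitions to HALF_OPEN to probe
        if self.state == "OPEN" and time.monotonic() - self.last_failure_time > self.recovery_timeout:
            self.state, self.half_open_successes = "HALF_OPEN", 0
        return self.state != "OPEN"

    def record_failure(self):
        self.failures += 1
        self.last_failure_time = time.monotonic()
        if self.state == "HALF_OPEN" or self.failures >= self.failure_threshold:
            self.state = "OPEN"

    def record_success(self):
        if self.state == "HALF_OPEN":
            self.half_open_successes += 1
            if self.half_open_successes >= self.half_open_requests:
                self.state, self.failures = "CLOSED", 0
        else:
            self.failures = 0

b = Breaker()
b.record_failure(); b.record_failure()  # two failures trip the breaker
print(b.state)                          # OPEN: requests rejected
time.sleep(0.06)                        # wait out the recovery timeout
print(b.can_execute(), b.state)         # probe allowed, now HALF_OPEN
b.record_success()
print(b.state)                          # CLOSED again
```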
Caching Strategies
import hashlib
import json

class GatewayCache:
    def __init__(self, redis_client: Redis, stats: StatsD):
        self.redis = redis_client
        self.stats = stats

    async def get_cached_response(self, cache_key: str) -> dict | None:
        cached = await self.redis.get(cache_key)
        if cached:
            self.stats.increment("gateway.cache.hit")
            return json.loads(cached)
        self.stats.increment("gateway.cache.miss")
        return None

    async def cache_response(
        self,
        cache_key: str,
        response: Response,
        ttl: int
    ):
        cache_data = {
            "status": response.status_code,
            "headers": dict(response.headers),
            "body": response.body
        }
        await self.redis.setex(
            cache_key,
            ttl,
            json.dumps(cache_data)
        )

class CacheKeyGenerator:
    def __init__(self, config: CacheConfig):
        self.config = config

    def generate_key(self, request: Request) -> str:
        components = [
            request.method,
            request.path,
        ]
        if self.config.vary_by_user:
            components.append(request.auth_user_id)
        if self.config.vary_by_headers:
            for header in self.config.vary_by_headers:
                if header in request.headers:
                    components.append(request.headers[header])
        key_string = ":".join(str(c) for c in components)
        return f"gateway:cache:{hashlib.md5(key_string.encode()).hexdigest()}"
Monitoring and Observability
from prometheus_client import Counter, Histogram

class GatewayMetrics:
    def __init__(self):
        self.requests_total = Counter(
            "gateway_requests_total",
            "Total requests",
            ["method", "path", "status", "service"]
        )
        self.request_duration = Histogram(
            "gateway_request_duration_ms",
            "Request duration in ms",
            ["method", "path", "service"]
        )
        self.rate_limit_hits = Counter(
            "gateway_rate_limit_hits_total",
            "Rate limit hits",
            ["limiter_type"]
        )
        self.upstream_status = Counter(
            "gateway_upstream_status_total",
            "Upstream service status",
            ["service", "status"]
        )

# Example Prometheus queries
PROMETHEUS_QUERIES = {
    "requests_per_second": """
        sum(rate(gateway_requests_total[5m])) by (service)
    """,
    "p99_latency": """
        histogram_quantile(0.99,
            sum(rate(gateway_request_duration_ms_bucket[5m]))
            by (le, service)
        )
    """,
    "error_rate": """
        sum(rate(gateway_requests_total{status=~"5.."}[5m]))
        / sum(rate(gateway_requests_total[5m]))
    """,
    "rate_limit_efficiency": """
        sum(rate(gateway_rate_limit_hits_total[5m]))
        / sum(rate(gateway_requests_total[5m]))
    """
}
Best Practices
Good Patterns
GOOD_PATTERNS = {
    "stateless_gateway": """
        # Keep gateway stateless for easy scaling
        ✅ Good:
        - Use external cache (Redis) for session/rate limit state
        - Store configurations in etcd/Consul
        - Scale horizontally with load balancer
        ❌ Bad:
        - In-memory state for rate limiting
        - Sticky sessions for gateway nodes
    """,
    "graceful_degradation": """
        # Implement circuit breakers and fallback responses
        ✅ Good:
        - Return cached data if service is down
        - Show degraded response instead of error
        - Have fallback services for critical paths
        ❌ Bad:
        - Propagate all failures to clients
        - No caching of any responses
        - Single point of failure in gateway
    """,
    "security_layers": """
        # Defense in depth for API security
        ✅ Good:
        - TLS termination at gateway
        - JWT validation with short expiry
        - Request validation with schema
        - IP allowlisting for admin routes
        ❌ Bad:
        - Trust all internal traffic
        - No input validation
        - Expose internal service names
    """
}
Bad Patterns
BAD_PATTERNS = {
    "single_point_of_failure": """
        ❌ Bad:
        - Gateway as single instance
        - No health checks
        - No automatic failover
        ✅ Good:
        - Multiple gateway instances
        - Health checks and auto-restart
        - Load balancer with auto-scaling
    """,
    "blocking_operations": """
        ❌ Bad:
        - Synchronous logging to disk
        - Blocking auth calls
        - No timeouts on upstream calls
        ✅ Good:
        - Async logging to external system
        - Cached auth tokens
        - Proper timeout configuration
    """,
    "bypassing_gateway": """
        ❌ Bad:
        - Services expose direct public endpoints
        - Clients connect directly to services
        - No gateway for internal traffic
        ✅ Good:
        - All traffic through gateway
        - Internal services not exposed
        - Consistent policy everywhere
    """
}
Summary
The API Gateway pattern is essential for microservices architectures:
- Single Entry Point - Simplifies client code and provides unified API
- Cross-Cutting Concerns - Centralized auth, rate limiting, logging
- Protocol Translation - REST to gRPC, SOAP to REST
- Service Discovery - Dynamic routing to healthy instances
- Circuit Breaking - Prevent cascade failures
- Caching - Reduce load on backend services
- Observability - Metrics, tracing, logging at the edge
Popular implementations include Kong, NGINX, Traefik, AWS API Gateway, and Azure API Management. Choose based on your scale, complexity, and managed vs. self-hosted preferences.