The API Gateway pattern provides a unified entry point for a system of microservices. It handles cross-cutting concerns like authentication, rate limiting, and request routing, simplifying client interactions and centralizing policy enforcement.
Understanding API Gateway
The Problem Without API Gateway
┌─────────────────────────────────────────────────────────────────┐
│ Direct Client-to-Microservices Communication │
│ │
│ Mobile App │
│ │ │
│ ├───► User Service (auth, profile) │
│ ├───► Order Service (orders, history) │
│ ├───► Payment Service (billing, cards) │
│ ├───► Inventory (stock, products) │
│ └───► Notification (email, push) │
│ │
│ Problems: │
│ ✗ Client knows about all service endpoints │
│ ✗ Each client implements same logic (auth, retry) │
│ ✗ No centralized policy enforcement │
│ ✗ Cross-origin requests from browsers │
│ ✗ Tight coupling between clients and services │
└─────────────────────────────────────────────────────────────────┘
With API Gateway
┌─────────────────────────────────────────────────────────────────┐
│ API Gateway Architecture │
│ │
│ Mobile App │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ API Gateway │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ Auth │ │ Rate │ │ Route │ │ Transform│ │ │
│ │ │ Handler │ │ Limiter │ │ Engine │ │ Layer │ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ ├───► User Service │
│ ├───► Order Service │
│ ├───► Payment Service │
│ ├───► Inventory │
│ └───► Notification │
│ │
│ Benefits: │
│ ✓ Single entry point for all clients │
│ ✓ Centralized cross-cutting concerns │
│ ✓ Service discovery and dynamic routing │
│ ✓ Protocol translation (REST ↔ gRPC) │
└─────────────────────────────────────────────────────────────────┘
Gateway Responsibilities
Request Routing
class RouteConfig:
    def __init__(self):
        # Route is assumed to be a simple record type; timeouts are in milliseconds.
        self.routes = [
            Route(
                path="/api/v1/users/*",
                service="user-service",
                timeout=5000,
                retry_policy={"attempts": 3, "backoff": "exponential"}
            ),
            Route(
                path="/api/v1/orders/*",
                service="order-service",
                timeout=10000,
                retry_policy={"attempts": 2, "backoff": "linear"}
            ),
            Route(
                path="/api/v1/payments/*",
                service="payment-service",
                timeout=30000,
                retry_policy={"attempts": 1}  # Single attempt: never retry payments
            ),
        ]


class RoutingEngine:
    def __init__(self, route_config: RouteConfig, service_discovery: ServiceRegistry):
        self.routes = route_config.routes
        self.registry = service_discovery

    def resolve_route(self, request: Request) -> RouteResult:
        # Routes are checked in order, so list more specific patterns first.
        for route in self.routes:
            if self._match_path(request.path, route.path):
                service_host = self.registry.resolve(route.service)
                return RouteResult(
                    target_url=f"http://{service_host}{request.path}",
                    timeout=route.timeout,
                    retry_policy=route.retry_policy
                )
        return RouteResult(error="No route found", status_code=404)

    def _match_path(self, request_path: str, pattern: str) -> bool:
        if pattern.endswith("/*"):
            prefix = pattern[:-2]
            # Match on a segment boundary so "/api/v1/users/*" does not
            # also capture "/api/v1/users-export".
            return request_path == prefix or request_path.startswith(prefix + "/")
        return request_path == pattern
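The routing engine returns the first route that matches, so overlapping patterns depend on list order. As an alternative, here is a minimal sketch of segment-aware matching that picks the longest matching pattern. The `ROUTES` table, the `admin-service` name, and the `matches` and `resolve` helpers are assumptions made for illustration, not part of the configuration above.

```python
from typing import Optional

# Hypothetical route table: wildcard pattern -> service name.
ROUTES = {
    "/api/v1/users/*": "user-service",
    "/api/v1/users/admin/*": "admin-service",
    "/api/v1/orders/*": "order-service",
}


def matches(request_path: str, pattern: str) -> bool:
    """Return True if the path equals the pattern or falls under its '/*' prefix."""
    if pattern.endswith("/*"):
        prefix = pattern[:-2]
        # Compare on a segment boundary so "/users" does not match "/users-export".
        return request_path == prefix or request_path.startswith(prefix + "/")
    return request_path == pattern


def resolve(request_path: str, routes: dict[str, str] = ROUTES) -> Optional[str]:
    """Pick the most specific (longest) matching pattern, or None if nothing matches."""
    candidates = [pattern for pattern in routes if matches(request_path, pattern)]
    if not candidates:
        return None
    return routes[max(candidates, key=len)]
```

With this table, `/api/v1/users/admin/roles` resolves to the admin route even though the broader user pattern also matches it.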
Authentication and Authorization
import json

import jwt  # PyJWT


class AuthHandler:
    def __init__(self, jwt_secret: str, user_cache: Redis):
        self.secret = jwt_secret
        self.cache = user_cache

    async def authenticate(self, request: Request) -> AuthResult:
        auth_header = request.headers.get("Authorization")
        if not auth_header or not auth_header.startswith("Bearer "):
            return AuthResult(success=False, error="Missing token", status=401)
        token = auth_header[7:]  # Strip the "Bearer " prefix
        try:
            # Pin the algorithm and require the claims that are read below,
            # so a token without "sub" is rejected instead of raising KeyError.
            payload = jwt.decode(
                token,
                self.secret,
                algorithms=["HS256"],
                options={"require": ["exp", "sub"]}
            )
            user_id = payload["sub"]
            cached_user = await self.cache.get(f"user:{user_id}")
            if cached_user:
                user = json.loads(cached_user)
            else:
                user = await self._fetch_user(user_id)
                await self.cache.setex(
                    f"user:{user_id}",
                    300,  # 5 min cache
                    json.dumps(user)
                )
            return AuthResult(
                success=True,
                user_id=user_id,
                roles=user.get("roles", []),
                permissions=user.get("permissions", [])
            )
        except jwt.ExpiredSignatureError:
            return AuthResult(success=False, error="Token expired", status=401)
        except jwt.InvalidTokenError:
            return AuthResult(success=False, error="Invalid token", status=401)

    def authorize(self, auth_result: AuthResult, required_permission: str) -> bool:
        if not auth_result.success:
            return False
        # Admins bypass per-permission checks.
        if "admin" in auth_result.roles:
            return True
        return required_permission in auth_result.permissions
Rate Limiting
import time
from collections import defaultdict


class TokenBucketRateLimiter:
    """Token bucket: refills `rate` tokens per second up to `capacity`."""

    def __init__(self, rate: int, capacity: int):
        self.rate = rate
        self.capacity = capacity
        # State lives in this process only. Behind a load balancer each gateway
        # node enforces its own limit unless the buckets move to a shared store.
        self.buckets: dict[str, dict] = defaultdict(self._create_bucket)

    def _create_bucket(self) -> dict:
        return {
            "tokens": self.capacity,
            "last_refill": time.monotonic()
        }

    async def allow_request(self, key: str, cost: int = 1) -> bool:
        bucket = self.buckets[key]
        now = time.monotonic()  # Monotonic: unaffected by wall-clock adjustments
        elapsed = now - bucket["last_refill"]
        bucket["tokens"] = min(
            self.capacity,
            bucket["tokens"] + elapsed * self.rate
        )
        bucket["last_refill"] = now
        if bucket["tokens"] >= cost:
            bucket["tokens"] -= cost
            return True
        return False


class RateLimitConfig:
    BY_USER = {"rate": 100, "capacity": 100}        # 100 req/sec per user
    BY_IP = {"rate": 1000, "capacity": 1000}        # 1000 req/sec per IP
    BY_API_KEY = {"rate": 10000, "capacity": 5000}  # Premium tier: 10000 req/sec, burst of 5000


class RateLimitMiddleware:
    def __init__(self):
        self.limiters = {
            "user": TokenBucketRateLimiter(**RateLimitConfig.BY_USER),
            "ip": TokenBucketRateLimiter(**RateLimitConfig.BY_IP),
            "api_key": TokenBucketRateLimiter(**RateLimitConfig.BY_API_KEY)
        }

    async def check_rate_limit(self, request: Request) -> RateLimitResult:
        # Check the most specific identity first and stop at the first denial.
        checks = [
            ("api_key", request.headers.get("X-API-Key")),
            ("user", request.auth_user_id),
            ("ip", request.client_ip),
        ]
        results = {}
        for limit_type, key in checks:
            if not key:
                continue
            allowed = await self.limiters[limit_type].allow_request(key)
            results[limit_type] = allowed
            if not allowed:
                # Every denial, including the IP limit, reports retry_after
                # and limit_type so the caller can build a complete 429.
                return RateLimitResult(
                    allowed=False,
                    retry_after=1,
                    limit_type=limit_type
                )
        return RateLimitResult(
            allowed=True,
            current_rates=results
        )
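The middleware above always reports `retry_after=1`. With a token bucket the wait can be derived from the token deficit instead. The sketch below assumes a single bucket and takes the clock reading as an argument so it can be tested; the `TokenBucket` class and its `try_acquire` method are illustrative names, not part of the limiter above.

```python
import math


class TokenBucket:
    """Single token bucket; the caller supplies the clock reading in seconds."""

    def __init__(self, rate: float, capacity: float, now: float = 0.0):
        self.rate = rate            # Tokens added per second
        self.capacity = capacity    # Maximum burst size
        self.tokens = capacity
        self.last_refill = now

    def try_acquire(self, now: float, cost: float = 1.0) -> tuple[bool, int]:
        """Return (allowed, retry_after_seconds)."""
        elapsed = max(0.0, now - self.last_refill)
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last_refill = now
        if self.tokens >= cost:
            self.tokens -= cost
            return True, 0
        # Time needed to refill the missing tokens, rounded up to whole seconds.
        deficit = cost - self.tokens
        return False, math.ceil(deficit / self.rate)
```

The second element of the result can be sent as the `Retry-After` header of a 429 response.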
Request/Response Transformation
import time


class RequestTransformer:
    def transform_incoming(self, request: Request, service_config: dict) -> TransformedRequest:
        transformed = Request(
            method=request.method,
            path=self._transform_path(request.path, service_config),
            headers=self._transform_headers(request.headers, service_config),
            body=self._transform_body(request.body, service_config, direction="in"),
            query_params=self._transform_query(request.query_params, service_config)
        )
        transformed.headers["X-Request-ID"] = request.request_id
        # Append to any existing chain instead of overwriting earlier proxies.
        forwarded = request.headers.get("X-Forwarded-For")
        transformed.headers["X-Forwarded-For"] = (
            f"{forwarded}, {request.client_ip}" if forwarded else request.client_ip
        )
        transformed.headers["X-Gateway-Timestamp"] = str(int(time.time()))
        return transformed

    def _transform_path(self, path: str, config: dict) -> str:
        if "path_rewrite" in config:
            for pattern, replacement in config["path_rewrite"].items():
                if pattern in path:
                    # Rewrite only the first occurrence.
                    return path.replace(pattern, replacement, 1)
        return path

    def _transform_headers(self, headers: dict, config: dict) -> dict:
        # Pass headers through, renaming those listed in header_mapping.
        # Keeping only mapped headers would also drop Content-Type and similar.
        header_mapping = config.get("header_mapping", {})
        return {header_mapping.get(name, name): value for name, value in headers.items()}

    def _transform_query(self, query_params: dict, config: dict) -> dict:
        # No query rewriting in this example; parameters pass through unchanged.
        return dict(query_params)

    def _transform_body(self, body: dict, config: dict, direction: str) -> dict:
        if "body_transform" not in config:
            return body
        transform = config["body_transform"]
        if direction == "in" and "wrap" in transform:
            return {transform["wrap"]: body}
        if direction == "out" and "unwrap" in transform:
            return body.get(transform["unwrap"], body)
        return body


class ResponseTransformer(RequestTransformer):
    # Inherits the header and body helpers defined on RequestTransformer.
    def transform_outgoing(
        self,
        response: Response,
        client_config: dict
    ) -> TransformedResponse:
        transformed = Response(
            status_code=self._map_status_code(response.status_code),
            headers=self._transform_headers(response.headers, client_config),
            body=self._transform_body(response.body, client_config, direction="out")
        )
        transformed.headers["X-Gateway-Response-Time"] = str(response.processing_time)
        return transformed

    def _map_status_code(self, code: int) -> int:
        code_mapping = {
            501: 404,  # Not Implemented upstream -> Not Found for the client
            502: 503,  # Bad Gateway upstream -> Service Unavailable
            # 503 passes through unchanged: 504 Gateway Timeout is reserved
            # for upstream calls that actually time out.
        }
        return code_mapping.get(code, code)
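One header transformation a proxying gateway usually needs, in addition to the mapping above, is dropping hop-by-hop headers, which describe a single connection and should not be forwarded upstream. A minimal sketch follows; the function name `forwardable_headers` is an assumption, and the header list follows the standard HTTP hop-by-hop set.

```python
# Hop-by-hop headers apply to one connection and are not forwarded.
HOP_BY_HOP = {
    "connection", "keep-alive", "proxy-authenticate", "proxy-authorization",
    "te", "trailer", "transfer-encoding", "upgrade",
}


def forwardable_headers(headers: dict[str, str]) -> dict[str, str]:
    """Return a copy of the headers without hop-by-hop entries."""
    # Headers named in "Connection" are hop-by-hop for this request as well.
    nominated = {
        token.strip().lower()
        for name, value in headers.items()
        if name.lower() == "connection"
        for token in value.split(",")
        if token.strip()
    }
    blocked = HOP_BY_HOP | nominated
    return {name: value for name, value in headers.items() if name.lower() not in blocked}
```

Header names are compared case-insensitively, and the original casing of the forwarded headers is preserved.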
Implementation Examples
Kong API Gateway Configuration
# kong.yml - Declarative configuration
_format_version: "3.0"

services:
  - name: user-service
    url: http://user-service:8080
    routes:
      - name: user-routes
        paths:
          - /api/v1/users
        methods:
          - GET
          - POST
          - PUT
          - DELETE
        strip_path: true
    plugins:
      - name: jwt
        config:
          key_claim_name: kid
          claims_to_verify:
            - exp
      - name: rate-limiting
        config:
          minute: 100
          hour: 10000
          policy: local
          fault_tolerant: true
      - name: cors
        config:
          origins:
            - "https://app.example.com"
          methods:
            - GET
            - POST
            - PUT
            - DELETE
            - OPTIONS
          headers:
            - Authorization
            - Content-Type
          exposed_headers:
            - X-Total-Count
          credentials: true
          max_age: 3600

  - name: order-service
    url: http://order-service:8081
    routes:
      - name: order-routes
        paths:
          - /api/v1/orders
        strip_path: true
    plugins:
      - name: jwt
      - name: rate-limiting
        config:
          minute: 50
          hour: 5000
      - name: request-transformer
        config:
          add:
            headers:
              - "X-Service-Name:order-service"
      - name: response-transformer
        config:
          add:
            headers:
              - "X-Gateway:Kong"

consumers:
  - username: mobile-app
    plugins:
      - name: rate-limiting
        config:
          minute: 1000
          hour: 50000
  - username: web-app
    plugins:
      - name: rate-limiting
        config:
          minute: 500
          hour: 20000
  - username: partner-api
    plugins:
      - name: rate-limiting
        config:
          minute: 5000
          hour: 200000
Custom Gateway Implementation
import asyncio
import logging
import time
import uuid

from aiohttp import web


class APIGateway:
    def __init__(
        self,
        router: RoutingEngine,
        auth_handler: AuthHandler,
        rate_limiter: RateLimitMiddleware,
        transformer: RequestTransformer,
        http_client: AsyncHTTPClient
    ):
        # Takes the RoutingEngine, which owns resolve_route(), not the raw RouteConfig.
        self.router = router
        self.auth = auth_handler
        self.rate_limiter = rate_limiter
        self.transformer = transformer
        self.client = http_client
        self.metrics = MetricsCollector()

    async def handle_request(self, request: web.Request) -> web.Response:
        # The handlers above expect the gateway's own Request abstraction
        # (client_ip, auth_user_id, request_id); adapting aiohttp's
        # web.Request to it is omitted here.
        request_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))
        start_time = time.time()
        try:
            # Runs before authentication, so only the IP and API-key limits apply
            # unless an earlier step has already set auth_user_id.
            rate_result = await self.rate_limiter.check_rate_limit(request)
            if not rate_result.allowed:
                return web.Response(
                    status=429,
                    headers={
                        "Retry-After": str(rate_result.retry_after),
                        "X-RateLimit-Limit-Type": rate_result.limit_type
                    },
                    text="Rate limit exceeded"
                )
            auth_result = await self.auth.authenticate(request)
            if not auth_result.success:
                return web.Response(
                    status=auth_result.status,
                    text=auth_result.error
                )
            route_result = self.router.resolve_route(request)
            if route_result.error:
                return web.Response(status=404, text=route_result.error)
            transformed_req = self.transformer.transform_incoming(
                request,
                route_result.service_config
            )
            upstream_response = await self.client.request(
                method=transformed_req.method,
                url=route_result.target_url,
                headers=transformed_req.headers,
                data=transformed_req.body,
                timeout=route_result.timeout
            )
            # transform_outgoing expects a config dict, not the Accept header value.
            transformed_resp = ResponseTransformer().transform_outgoing(
                upstream_response,
                route_result.service_config
            )
            processing_time = (time.time() - start_time) * 1000  # Milliseconds
            self.metrics.record_request(
                service=route_result.service,
                status=upstream_response.status_code,
                duration=processing_time
            )
            return web.Response(
                status=transformed_resp.status_code,
                # Echo the request ID so clients can correlate logs.
                headers={**transformed_resp.headers, "X-Request-ID": request_id},
                body=transformed_resp.body
            )
        except asyncio.TimeoutError:
            return web.Response(status=504, text="Gateway timeout")
        except Exception:
            # logging.exception records the traceback along with the message.
            logging.exception("Gateway error (request_id=%s)", request_id)
            return web.Response(status=500, text="Internal gateway error")
Service Discovery Integration
import random

import consul  # python-consul


class ConsulServiceRegistry:
    def __init__(self, consul_host: str, consul_port: int):
        self.consul = consul.Consul(host=consul_host, port=consul_port)

    def resolve(self, service_name: str) -> str:
        # python-consul's standard client is blocking. In an asyncio gateway,
        # call this through asyncio.to_thread or cache the result.
        _, entries = self.consul.health.service(service_name, passing=True)
        if not entries:
            raise ServiceUnavailable(f"No instances of {service_name}")
        healthy = [e for e in entries if self._is_healthy(e)]
        if not healthy:
            raise ServiceUnavailable(f"No healthy instances of {service_name}")
        instance = self._select_instance(healthy)
        # Health entries nest the address and port under "Service".
        service = instance["Service"]
        # Fall back to the node address when the service registers without one.
        address = service.get("Address") or instance["Node"]["Address"]
        return f"{address}:{service['Port']}"

    def _is_healthy(self, entry: dict) -> bool:
        # Every registered check must pass, not just one of them.
        checks = entry.get("Checks", [])
        return bool(checks) and all(c["Status"] == "passing" for c in checks)

    def _select_instance(self, instances: list) -> dict:
        # Random selection; round-robin or least-connections are alternatives.
        return random.choice(instances)


class KubernetesServiceRegistry:
    def __init__(self, kube_client):
        self.client = kube_client  # Assumed to be a kubernetes CoreV1Api instance

    def resolve(self, service_name: str, namespace: str = "default") -> str:
        endpoints = self.client.read_namespaced_endpoints(
            name=service_name,
            namespace=namespace
        )
        # "addresses" holds the ready endpoints; unready ones are listed
        # separately under "not_ready_addresses".
        ready_addresses = [
            address.ip
            for subset in (endpoints.subsets or [])
            for address in (subset.addresses or [])
        ]
        if not ready_addresses:
            raise ServiceUnavailable(f"No ready pods for {service_name}")
        # Use Kubernetes DNS and let the Service load-balance across pods
        return f"{service_name}.{namespace}.svc.cluster.local"
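The Consul registry above picks an instance at random. A round-robin selector is a small change; the sketch below assumes instances are plain `host:port` strings, and the class name `RoundRobinSelector` is hypothetical.

```python
import itertools
from collections import defaultdict


class RoundRobinSelector:
    """Rotate through the instances of each service in turn."""

    def __init__(self):
        # One independent counter per service name, each starting at 0.
        self._counters = defaultdict(itertools.count)

    def select(self, service_name: str, instances: list[str]) -> str:
        if not instances:
            raise LookupError(f"No instances of {service_name}")
        # Taking the modulus at call time keeps this safe when the list size changes.
        index = next(self._counters[service_name]) % len(instances)
        return instances[index]
```

Because each counter only ever increases, instances that join or leave between calls shift the rotation but never cause an out-of-range index.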
Circuit Breaker Integration
import json
import time


class GatewayCircuitBreaker:
    def __init__(self):
        # One breaker per upstream service.
        self.breakers: dict[str, CircuitBreaker] = {}

    def get_breaker(self, service_name: str) -> CircuitBreaker:
        if service_name not in self.breakers:
            self.breakers[service_name] = CircuitBreaker(
                failure_threshold=5,
                recovery_timeout=30,
                half_open_requests=3
            )
        return self.breakers[service_name]

    async def call_service(
        self,
        service_name: str,
        request: Request
    ) -> Response:
        breaker = self.get_breaker(service_name)
        if not breaker.can_execute():
            # Fail fast while the breaker is open.
            return Response(
                status=503,
                body=json.dumps({"error": "Service temporarily unavailable"})
            )
        try:
            # _make_request is assumed to perform the upstream HTTP call.
            response = await self._make_request(request)
            breaker.record_success()
            return response
        except Exception:
            breaker.record_failure()
            raise


class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 30,
        half_open_requests: int = 3
    ):
        self.failure_threshold = failure_threshold    # Consecutive failures before opening
        self.recovery_timeout = recovery_timeout      # Seconds to stay open
        self.half_open_requests = half_open_requests  # Successes needed to close again
        self.state = "CLOSED"
        self.failures = 0
        self.last_failure_time = None
        self.half_open_successes = 0

    def can_execute(self) -> bool:
        if self.state == "CLOSED":
            return True
        if self.state == "OPEN":
            if time.monotonic() - self.last_failure_time > self.recovery_timeout:
                self.state = "HALF_OPEN"
                self.half_open_successes = 0
                return True
            return False
        if self.state == "HALF_OPEN":
            # Note: this admits every request while half-open, not just a few probes.
            return True
        return False

    def record_success(self):
        if self.state == "HALF_OPEN":
            self.half_open_successes += 1
            if self.half_open_successes >= self.half_open_requests:
                self.state = "CLOSED"
                self.failures = 0
        else:
            # A success in the closed state resets the consecutive-failure count.
            self.failures = 0

    def record_failure(self):
        self.failures += 1
        self.last_failure_time = time.monotonic()
        if self.state == "HALF_OPEN":
            # Any failure while probing reopens the circuit immediately.
            self.state = "OPEN"
        elif self.failures >= self.failure_threshold:
            self.state = "OPEN"
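In the breaker above, the half-open state lets every request through until enough successes accumulate. A variant that admits only a fixed number of probe requests can be sketched as follows. It assumes the caller passes in the clock reading, which keeps the state machine testable; the class name `ProbeLimitedBreaker` and its default values are illustrative.

```python
class ProbeLimitedBreaker:
    """Circuit breaker that admits at most `half_open_requests` probes."""

    def __init__(self, failure_threshold: int = 2, recovery_timeout: float = 30.0,
                 half_open_requests: int = 1):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_requests = half_open_requests
        self.state = "CLOSED"
        self.failures = 0
        self.opened_at = 0.0
        self.probes_admitted = 0
        self.probe_successes = 0

    def can_execute(self, now: float) -> bool:
        if self.state == "OPEN" and now - self.opened_at >= self.recovery_timeout:
            # The recovery window has elapsed: start a fresh probing round.
            self.state = "HALF_OPEN"
            self.probes_admitted = 0
            self.probe_successes = 0
        if self.state == "CLOSED":
            return True
        if self.state == "HALF_OPEN" and self.probes_admitted < self.half_open_requests:
            self.probes_admitted += 1  # Reserve one probe slot
            return True
        return False

    def record_success(self) -> None:
        if self.state == "HALF_OPEN":
            self.probe_successes += 1
            if self.probe_successes >= self.half_open_requests:
                self.state = "CLOSED"
                self.failures = 0
        else:
            self.failures = 0

    def record_failure(self, now: float) -> None:
        self.failures += 1
        if self.state == "HALF_OPEN" or self.failures >= self.failure_threshold:
            self.state = "OPEN"
            self.opened_at = now
```

Requests beyond the probe budget are rejected while half-open, which protects a recovering service from a sudden return of full traffic.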
Caching Strategies
import hashlib
import json


class GatewayCache:
    def __init__(self, redis_client: Redis, stats: StatsD):
        self.redis = redis_client
        self.stats = stats

    async def get_cached_response(self, cache_key: str) -> dict | None:
        # Returns the stored {"status", "headers", "body"} dict, or None on a miss.
        cached = await self.redis.get(cache_key)
        if cached:
            self.stats.increment("gateway.cache.hit")
            return json.loads(cached)
        self.stats.increment("gateway.cache.miss")
        return None

    async def cache_response(
        self,
        cache_key: str,
        response: Response,
        ttl: int
    ):
        # The body must be JSON-serializable (str or dict); decode bytes first.
        cache_data = {
            "status": response.status_code,
            "headers": dict(response.headers),
            "body": response.body
        }
        await self.redis.setex(
            cache_key,
            ttl,
            json.dumps(cache_data)
        )


class CacheKeyGenerator:
    def __init__(self, config: CacheConfig):
        self.config = config

    def generate_key(self, request: Request) -> str:
        components = [
            request.method,
            request.path,
            # Include the query string so /items?page=1 and /items?page=2 do not
            # share an entry; sorting makes the key independent of parameter order.
            sorted(request.query_params.items()),
        ]
        if self.config.vary_by_user:
            components.append(request.auth_user_id)
        if self.config.vary_by_headers:
            for header in self.config.vary_by_headers:
                if header in request.headers:
                    components.append(request.headers[header])
        key_string = ":".join(str(c) for c in components)
        return f"gateway:cache:{hashlib.sha256(key_string.encode()).hexdigest()}"
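Before storing a response, a gateway also has to decide whether it may be cached at all. The following is a deliberately conservative sketch of that decision. The function name `is_cacheable` and its `vary_by_user` flag are assumptions, and real HTTP caching rules cover more cases than the ones handled here.

```python
def is_cacheable(method: str, status: int, response_headers: dict[str, str],
                 vary_by_user: bool = False) -> bool:
    """Decide whether a shared gateway cache may store this response."""
    if method.upper() not in {"GET", "HEAD"}:
        return False  # Only safe, read-only methods
    if status != 200:
        return False  # Keep the sketch simple: cache plain successes only
    cache_control = next(
        (value for name, value in response_headers.items() if name.lower() == "cache-control"),
        "",
    )
    # Reduce "max-age=60, private" to the directive names {"max-age", "private"}.
    directives = {
        part.strip().lower().split("=")[0]
        for part in cache_control.split(",")
        if part.strip()
    }
    if "no-store" in directives:
        return False
    # "private" responses may only be kept when the cache key varies by user.
    if "private" in directives and not vary_by_user:
        return False
    return True
```

A check like this would run before `cache_response`, so that writes and explicitly uncacheable responses never reach Redis.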
Monitoring and Observability
from prometheus_client import Counter, Histogram


class GatewayMetrics:
    def __init__(self):
        self.requests_total = Counter(
            "gateway_requests_total",
            "Total requests",
            # "service" is included so the queries below can group by it;
            # "path" should be the route template, not the raw URL.
            ["method", "path", "service", "status"]
        )
        self.request_duration = Histogram(
            "gateway_request_duration_ms",
            "Request duration in ms",
            ["method", "path", "service"],
            # The library's default buckets assume seconds, so set millisecond bounds.
            buckets=(5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000, 10000)
        )
        self.rate_limit_hits = Counter(
            "gateway_rate_limit_hits_total",
            "Rate limit hits",
            ["limiter_type"]
        )
        self.upstream_status = Counter(
            "gateway_upstream_status_total",
            "Upstream service status",
            ["service", "status"]
        )


# Example Prometheus queries
PROMETHEUS_QUERIES = {
    "requests_per_second": """
        sum(rate(gateway_requests_total[5m])) by (service)
    """,
    "p99_latency": """
        histogram_quantile(0.99,
            sum(rate(gateway_request_duration_ms_bucket[5m]))
            by (le, service)
        )
    """,
    "error_rate": """
        sum(rate(gateway_requests_total{status=~"5.."}[5m]))
        / sum(rate(gateway_requests_total[5m]))
    """,
    # Share of requests rejected by a rate limiter
    "rate_limited_ratio": """
        sum(rate(gateway_rate_limit_hits_total[5m]))
        / sum(rate(gateway_requests_total[5m]))
    """
}
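The `p99_latency` query depends on how a quantile is estimated from histogram buckets. Here is a simplified sketch of that estimate, using linear interpolation inside the bucket that contains the target rank. It takes cumulative `(upper_bound, count)` pairs as input, and the function is an illustration of the idea rather than Prometheus's actual implementation.

```python
import math


def histogram_quantile(q: float, buckets: list[tuple[float, float]]) -> float:
    """Estimate a quantile from cumulative (upper_bound, count) buckets.

    Buckets must be sorted by upper bound and end with (math.inf, total).
    """
    total = buckets[-1][1]
    if total == 0:
        return math.nan
    rank = q * total
    lower_bound, lower_count = 0.0, 0.0
    for upper_bound, count in buckets:
        if count >= rank:
            if math.isinf(upper_bound) or count == lower_count:
                # Overflow bucket (or empty bucket): report the last finite bound.
                return lower_bound
            # Assume observations are spread evenly inside the bucket.
            fraction = (rank - lower_count) / (count - lower_count)
            return lower_bound + (upper_bound - lower_bound) * fraction
        lower_bound, lower_count = upper_bound, count
    return lower_bound
```

The estimate is only as precise as the bucket boundaries, which is why the bounds should bracket the latencies the gateway actually sees.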
Best Practices
Good Patterns
GOOD_PATTERNS = {
"stateless_gateway": """
# Keep gateway stateless for easy scaling
✅ Good:
- Use external cache (Redis) for session/rate limit state
- Store configurations in etcd/Consul
- Scale horizontally with load balancer
❌ Bad:
- In-memory state for rate limiting
- Sticky sessions for gateway nodes
""",
"graceful_degradation": """
# Implement circuit breakers and fallback responses
✅ Good:
- Return cached data if service is down
- Show degraded response instead of error
- Have fallback services for critical paths
❌ Bad:
- Propagate all failures to clients
- No caching of any responses
- Single point of failure in gateway
""",
"security_layers": """
# Defense in depth for API security
✅ Good:
- TLS termination at gateway
- JWT validation with short expiry
- Request validation with schema
- IP allowlisting for admin routes
❌ Bad:
- Trust all internal traffic
- No input validation
- Expose internal service names
"""
}
Bad Patterns
BAD_PATTERNS = {
"single_point_of_failure": """
❌ Bad:
- Gateway as single instance
- No health checks
- No automatic failover
✅ Good:
- Multiple gateway instances
- Health checks and auto-restart
- Load balancer with auto-scaling
""",
"blocking_operations": """
❌ Bad:
- Synchronous logging to disk
- Blocking auth calls
- No timeouts on upstream calls
✅ Good:
- Async logging to external system
- Cached auth tokens
- Proper timeout configuration
""",
"bypassing_gateway": """
❌ Bad:
- Services expose direct public endpoints
- Clients connect directly to services
- No gateway for internal traffic
✅ Good:
- All traffic through gateway
- Internal services not exposed
- Consistent policy everywhere
"""
}
GraphQL Federation at the Gateway
Modern gateways increasingly support GraphQL federation, composing multiple subgraphs into a unified schema:
# Supergraph schema (composed by gateway)
type Query {
  user(id: ID!): User    # From user-subgraph
  order(id: ID!): Order  # From order-subgraph
}

# User subgraph
type User @key(fields: "id") {
  id: ID!
  name: String!
  orders: [Order!]!      # Resolved across subgraphs
}

# Order subgraph
type Order @key(fields: "id") {
  id: ID!
  total: Float!
  userId: ID!
}
Tools like Apollo Router and WunderGraph operate as dedicated federation gateways: they serve a supergraph schema composed from the subgraphs, plan each query across them, and check subgraph health.
Backend for Frontend (BFF) Pattern
When different clients need different API surfaces, a BFF layer tailors responses per client type:
| Client | Concerns | BFF Responsibilities |
|---|---|---|
| Mobile | Minimize payload size | Data aggregation, image resizing |
| Web | Rich data for UIs | GraphQL federation, server-side rendering |
| IoT | Tiny payloads, binary protocols | Protocol translation, data filtering |
| Partner | Full access, webhook delivery | API key management, rate limit tiers |
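As a small illustration of the table, a BFF can shape one upstream resource differently for each client type. The field lists and the `shape_for_client` helper below are hypothetical; in practice a BFF would also aggregate data from several services.

```python
# Hypothetical per-client field selections; the names are illustrative only.
CLIENT_FIELDS = {
    "mobile": ["id", "name", "thumbnail_url"],
    "web": ["id", "name", "thumbnail_url", "description", "reviews"],
    "iot": ["id"],
}


def shape_for_client(client_type: str, resource: dict) -> dict:
    """Return only the fields the given client type is configured to receive."""
    fields = CLIENT_FIELDS.get(client_type)
    if fields is None:
        raise ValueError(f"Unknown client type: {client_type}")
    # Fields missing from the resource are skipped rather than filled in.
    return {name: resource[name] for name in fields if name in resource}
```

A mobile client then receives a smaller payload than a web client for the same product, without the upstream service knowing about either.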
Emerging Gateway Trends
- AI Gateway: Gateways that handle LLM API consumption—token rate limiting, prompt injection detection, cost tracking, and response caching for OpenAI/Anthropic APIs.
- WebAssembly (Wasm) Plugins: Run custom middleware as Wasm modules for isolation and polyglot support (e.g., Envoy Wasm filters).
- eBPF-Based Gateways: Kernel-level traffic filtering and forwarding that keeps routing decisions out of user-space proxies, as in Cilium.
- Service Mesh Integration: Gateways as the ingress tier of a service mesh, delegating mTLS, traffic splitting, and observability to the mesh sidecar.
Conclusion
The API Gateway pattern is a foundational building block for microservices architectures. A well-designed gateway provides:
- Single Entry Point — Simplifies client code and presents a unified API surface
- Cross-Cutting Concerns — Centralized authentication, rate limiting, logging, and request validation
- Protocol Translation — Bridging REST, gRPC, GraphQL, and WebSocket clients to backend services
- Resilience — Circuit breakers, retries, timeouts, and edge caching prevent cascade failures
- Observability — Unified metrics, tracing, and logging across all upstream services
- Evolving Capabilities — GraphQL federation, BFF per client type, Wasm plugins, and AI gateway patterns
Popular implementations include Kong, NGINX, Traefik, Envoy, AWS API Gateway, and Azure API Management. The right choice depends on your scale, deployment model (self-hosted vs. managed), and feature requirements. As API ecosystems grow more complex—spanning microservices, third-party integrations, and AI agents—the gateway’s role as a unified control point becomes increasingly valuable.