Skip to main content

API Gateway Pattern: Unified Entry Point for Microservices

Created: February 28, 2026 Larry Qu 12 min read

The API Gateway pattern provides a unified entry point for a system of microservices. It handles cross-cutting concerns like authentication, rate limiting, and request routing, simplifying client interactions and centralizing policy enforcement.

Understanding API Gateway

The Problem Without API Gateway

┌─────────────────────────────────────────────────────────────────┐
│            Direct Client-to-Microservices Communication          │
│                                                                 │
│    Mobile App                                                   │
│        │                                                        │
│        ├───► User Service    (auth, profile)                   │
│        ├───► Order Service   (orders, history)                  │
│        ├───► Payment Service (billing, cards)                  │
│        ├───► Inventory       (stock, products)                  │
│        └───► Notification    (email, push)                     │
│                                                                 │
│  Problems:                                                      │
│  ✗ Client knows about all service endpoints                     │
│  ✗ Each client implements same logic (auth, retry)             │
│  ✗ No centralized policy enforcement                           │
│  ✗ Cross-origin requests from browsers                         │
│  ✗ Tight coupling between clients and services                 │
└─────────────────────────────────────────────────────────────────┘

With API Gateway

┌─────────────────────────────────────────────────────────────────┐
│                    API Gateway Architecture                      │
│                                                                 │
│    Mobile App                                                   │
│        │                                                        │
│        ▼                                                        │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │                    API Gateway                            │   │
│  │  ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐          │   │
│  │  │ Auth    │ │ Rate    │ │ Route   │ │ Transform│          │   │
│  │  │ Handler │ │ Limiter │ │ Engine  │ │  Layer  │          │   │
│  │  └─────────┘ └─────────┘ └─────────┘ └─────────┘          │   │
│  └─────────────────────────────────────────────────────────┘   │
│        │                                                        │
│        ├───► User Service                                       │
│        ├───► Order Service                                      │
│        ├───► Payment Service                                    │
│        ├───► Inventory                                         │
│        └───► Notification                                      │
│                                                                 │
│  Benefits:                                                      │
│  ✓ Single entry point for all clients                          │
│  ✓ Centralized cross-cutting concerns                          │
│  ✓ Service discovery and dynamic routing                       │
│  ✓ Protocol translation (REST ↔ gRPC)                          │
└─────────────────────────────────────────────────────────────────┘

Gateway Responsibilities

Request Routing

class RouteConfig:
    def __init__(self):
        self.routes = [
            Route(
                path="/api/v1/users/*",
                service="user-service",
                timeout=5000,
                retry_policy={"attempts": 3, "backoff": "exponential"}
            ),
            Route(
                path="/api/v1/orders/*",
                service="order-service", 
                timeout=10000,
                retry_policy={"attempts": 2, "backoff": "linear"}
            ),
            Route(
                path="/api/v1/payments/*",
                service="payment-service",
                timeout=30000,
                retry_policy={"attempts": 1}  # No retry for payments
            ),
        ]

class RoutingEngine:
    def __init__(self, route_config: RouteConfig, service_discovery: ServiceRegistry):
        self.routes = route_config.routes
        self.registry = service_discovery
    
    def resolve_route(self, request: Request) -> RouteResult:
        for route in self.routes:
            if self._match_path(request.path, route.path):
                service_host = self.registry.resolve(route.service)
                
                return RouteResult(
                    target_url=f"http://{service_host}{request.path}",
                    timeout=route.timeout,
                    retry_policy=route.retry_policy
                )
        
        return RouteResult(error="No route found", status_code=404)
    
    def _match_path(self, request_path: str, pattern: str) -> bool:
        if pattern.endswith("/*"):
            prefix = pattern[:-2]
            return request_path.startswith(prefix)
        return request_path == pattern

Authentication and Authorization

class AuthHandler:
    def __init__(self, jwt_secret: str, user_cache: Redis):
        self.secret = jwt_secret
        self.cache = user_cache
    
    async def authenticate(self, request: Request) -> AuthResult:
        auth_header = request.headers.get("Authorization")
        
        if not auth_header or not auth_header.startswith("Bearer "):
            return AuthResult(success=False, error="Missing token", status=401)
        
        token = auth_header[7:]
        
        try:
            payload = jwt.decode(token, self.secret, algorithms=["HS256"])
            
            cached_user = await self.cache.get(f"user:{payload['sub']}")
            if cached_user:
                user = json.loads(cached_user)
            else:
                user = await self._fetch_user(payload['sub'])
                await self.cache.setex(
                    f"user:{payload['sub']}", 
                    300,  # 5 min cache
                    json.dumps(user)
                )
            
            return AuthResult(
                success=True,
                user_id=payload['sub'],
                roles=user.get("roles", []),
                permissions=user.get("permissions", [])
            )
            
        except jwt.ExpiredSignatureError:
            return AuthResult(success=False, error="Token expired", status=401)
        except jwt.InvalidTokenError:
            return AuthResult(success=False, error="Invalid token", status=401)
    
    def authorize(self, auth_result: AuthResult, required_permission: str) -> bool:
        if not auth_result.success:
            return False
        
        if "admin" in auth_result.roles:
            return True
        
        return required_permission in auth_result.permissions

Rate Limiting

import time
from collections import defaultdict
import asyncio

class TokenBucketRateLimiter:
    def __init__(self, rate: int, capacity: int):
        self.rate = rate
        self.capacity = capacity
        self.buckets: dict[str, dict] = defaultdict(self._create_bucket)
    
    def _create_bucket(self):
        return {
            "tokens": self.capacity,
            "last_refill": time.time()
        }
    
    async def allow_request(self, key: str, cost: int = 1) -> bool:
        bucket = self.buckets[key]
        
        now = time.time()
        elapsed = now - bucket["last_refill"]
        
        bucket["tokens"] = min(
            self.capacity,
            bucket["tokens"] + elapsed * self.rate
        )
        bucket["last_refill"] = now
        
        if bucket["tokens"] >= cost:
            bucket["tokens"] -= cost
            return True
        
        return False

class RateLimitConfig:
    BY_USER = {"rate": 100, "capacity": 100}      # 100 req/sec per user
    BY_IP = {"rate": 1000, "capacity": 1000}      # 1000 req/sec per IP
    BY_API_KEY = {"rate": 10000, "capacity": 5000} # Premium tier

class RateLimitMiddleware:
    def __init__(self):
        self.limiters = {
            "user": TokenBucketRateLimiter(**RateLimitConfig.BY_USER),
            "ip": TokenBucketRateLimiter(**RateLimitConfig.BY_IP),
            "api_key": TokenBucketRateLimiter(**RateLimitConfig.BY_API_KEY)
        }
    
    async def check_rate_limit(self, request: Request) -> RateLimitResult:
        client_ip = request.client_ip
        user_id = request.auth_user_id
        api_key = request.headers.get("X-API-Key")
        
        limits = []
        
        if api_key:
            allowed = await self.limiters["api_key"].allow_request(api_key)
            limits.append(("api_key", allowed))
            if not allowed:
                return RateLimitResult(
                    allowed=False,
                    retry_after=1,
                    limit_type="api_key"
                )
        
        if user_id:
            allowed = await self.limiters["user"].allow_request(user_id)
            limits.append(("user", allowed))
            if not allowed:
                return RateLimitResult(
                    allowed=False,
                    retry_after=1,
                    limit_type="user"
                )
        
        allowed = await self.limiters["ip"].allow_request(client_ip)
        limits.append(("ip", allowed))
        
        return RateLimitResult(
            allowed=all(allowed for _, allowed in limits),
            current_rates={k: v for k, v in limits}
        )

Request/Response Transformation

class RequestTransformer:
    def transform_incoming(self, request: Request, service_config: dict) -> TransformedRequest:
        transformed = Request(
            method=request.method,
            path=self._transform_path(request.path, service_config),
            headers=self._transform_headers(request.headers, service_config),
            body=self._transform_body(request.body, service_config, direction="in"),
            query_params=self._transform_query(request.query_params, service_config)
        )
        
        transformed.headers["X-Request-ID"] = request.request_id
        transformed.headers["X-Forwarded-For"] = request.client_ip
        transformed.headers["X-Gateway-Timestamp"] = str(int(time.time()))
        
        return transformed
    
    def _transform_path(self, path: str, config: dict) -> str:
        if "path_rewrite" in config:
            for pattern, replacement in config["path_rewrite"].items():
                if pattern in path:
                    return path.replace(pattern, replacement)
        return path
    
    def _transform_headers(self, headers: dict, config: dict) -> dict:
        transformed = {}
        
        header_mapping = config.get("header_mapping", {})
        for original, target in header_mapping.items():
            if original in headers:
                transformed[target] = headers[original]
        
        return transformed
    
    def _transform_body(self, body: dict, config: dict, direction: str) -> dict:
        if "body_transform" not in config:
            return body
        
        transform = config["body_transform"]
        
        if direction == "in" and "wrap" in transform:
            return {transform["wrap"]: body}
        
        if direction == "out" and "unwrap" in transform:
            return body.get(transform["unwrap"], body)
        
        return body


class ResponseTransformer:
    def transform_outgoing(
        self, 
        response: Response, 
        client_config: dict
    ) -> TransformedResponse:
        transformed = Response(
            status_code=self._map_status_code(response.status_code),
            headers=self._transform_headers(response.headers, client_config),
            body=self._transform_body(response.body, client_config)
        )
        
        transformed.headers["X-Gateway-Response-Time"] = str(response.processing_time)
        
        return transformed
    
    def _map_status_code(self, code: int) -> int:
        code_mapping = {
            501: 404,  # Service not implemented -> Not found
            502: 503,  # Bad gateway -> Service unavailable
            503: 504,  # Service unavailable -> Gateway timeout
        }
        return code_mapping.get(code, code)

Implementation Examples

Kong API Gateway Configuration

# kong.yml - Declarative configuration
_format_version: "3.0"

services:
  - name: user-service
    url: http://user-service:8080
    routes:
      - name: user-routes
        paths:
          - /api/v1/users
        methods:
          - GET
          - POST
          - PUT
          - DELETE
        strip_path: true
    plugins:
      - name: jwt
        config:
          key_claim_name: kid
          claims_to_verify:
            - exp
      - name: rate-limiting
        config:
          minute: 100
          hour: 10000
          policy: local
          fault_tolerant: true
      - name: cors
        config:
          origins:
            - "https://app.example.com"
          methods:
            - GET
            - POST
            - PUT
            - DELETE
            - OPTIONS
          headers:
            - Authorization
            - Content-Type
          exposed_headers:
            - X-Total-Count
          credentials: true
          max_age: 3600

  - name: order-service
    url: http://order-service:8081
    routes:
      - name: order-routes
        paths:
          - /api/v1/orders
        strip_path: true
    plugins:
      - name: jwt
      - name: rate-limiting
        config:
          minute: 50
          hour: 5000
      - name: request-transformer
        config:
          add:
            headers:
              - X-Service-Name:order-service
      - name: response-transformer
        config:
          add:
            headers:
              - X-Gateway:Kong

consumers:
  - username: mobile-app
    plugins:
      - name: rate-limiting
        config:
          minute: 1000
          hour: 50000

  - username: web-app
    plugins:
      - name: rate-limiting
        config:
          minute: 500
          hour: 20000

  - username: partner-api
    plugins:
      - name: rate-limiting
        config:
          minute: 5000
          hour: 200000

Custom Gateway Implementation

import asyncio
from aiohttp import web
from aiohttp_middlewares import cors_middleware
import logging

class APIGateway:
    def __init__(
        self,
        route_config: RouteConfig,
        auth_handler: AuthHandler,
        rate_limiter: RateLimitMiddleware,
        transformer: RequestTransformer,
        http_client: AsyncHTTPClient
    ):
        self.routes = route_config
        self.auth = auth_handler
        self.rate_limiter = rate_limiter
        self.transformer = transformer
        self.client = http_client
        self.metrics = MetricsCollector()
    
    async def handle_request(self, request: web.Request) -> web.Response:
        request_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))
        
        start_time = time.time()
        
        try:
            rate_result = await self.rate_limiter.check_rate_limit(request)
            if not rate_result.allowed:
                return web.Response(
                    status=429,
                    headers={
                        "Retry-After": str(rate_result.retry_after),
                        "X-RateLimit-Limit-Type": rate_result.limit_type
                    },
                    text="Rate limit exceeded"
                )
            
            auth_result = await self.auth.authenticate(request)
            if not auth_result.success:
                return web.Response(
                    status=auth_result.status,
                    text=auth_result.error
                )
            
            route_result = self.routes.resolve_route(request)
            if route_result.error:
                return web.Response(status=404, text=route_result.error)
            
            transformed_req = self.transformer.transform_incoming(
                request, 
                route_result.service_config
            )
            
            upstream_response = await self.client.request(
                method=transformed_req.method,
                url=route_result.target_url,
                headers=transformed_req.headers,
                data=transformed_req.body,
                timeout=route_result.timeout
            )
            
            transformed_resp = ResponseTransformer().transform_outgoing(
                upstream_response,
                request.headers.get("Accept", "application/json")
            )
            
            processing_time = (time.time() - start_time) * 1000
            self.metrics.record_request(
                service=route_result.service,
                status=upstream_response.status,
                duration=processing_time
            )
            
            return web.Response(
                status=transformed_resp.status_code,
                headers=transformed_resp.headers,
                body=transformed_resp.body
            )
            
        except asyncio.TimeoutError:
            return web.Response(status=504, text="Gateway timeout")
        except Exception as e:
            logging.error(f"Gateway error: {e}")
            return web.Response(status=500, text="Internal gateway error")

Service Discovery Integration

class ConsulServiceRegistry:
    def __init__(self, consul_host: str, consul_port: int):
        self.consul = consul.Consul(host=consul_host, port=consul_port)
    
    async def resolve(self, service_name: str) -> str:
        _, services = self.consul.health.service(service_name, passing=True)
        
        if not services:
            raise ServiceUnavailable(f"No instances of {service_name}")
        
        healthy = [s for s in services if self._is_healthy(s)]
        
        if not healthy:
            raise ServiceUnavailable(f"No healthy instances of {service_name}")
        
        instance = self._select_instance(healthy)
        return f"{instance['Address']}:{instance['ServicePort']}"
    
    def _is_healthy(self, service: dict) -> bool:
        checks = service.get("Checks", [])
        passing = [c for c in checks if c["Status"] == "passing"]
        return len(passing) > 0
    
    def _select_instance(self, instances: list) -> dict:
        # Simple round-robin or choose least connections
        return random.choice(instances)


class KubernetesServiceRegistry:
    def __init__(self, kube_client):
        self.client = kube_client
    
    async def resolve(self, service_name: str, namespace: str = "default") -> str:
        service = self.client.read_namespaced_service(
            name=service_name,
            namespace=namespace
        )
        
        endpoints = self.client.read_namespaced_endpoints(
            name=service_name,
            namespace=namespace
        )
        
        ready_addresses = [
            subset.addresses[0].target_ref.name
            for subset in endpoints.subsets
            if subset.ready_addresses
        ]
        
        if not ready_addresses:
            raise ServiceUnavailable(f"No ready pods for {service_name}")
        
        # Use Kubernetes DNS
        return f"{service_name}.{namespace}.svc.cluster.local"

Circuit Breaker Integration

class GatewayCircuitBreaker:
    def __init__(self):
        self.breakers: dict[str, CircuitBreaker] = {}
    
    def get_breaker(self, service_name: str) -> CircuitBreaker:
        if service_name not in self.breakers:
            self.breakers[service_name] = CircuitBreaker(
                failure_threshold=5,
                recovery_timeout=30,
                half_open_requests=3
            )
        return self.breakers[service_name]
    
    async def call_service(
        self, 
        service_name: str, 
        request: Request
    ) -> Response:
        breaker = self.get_breaker(service_name)
        
        if not breaker.can_execute():
            return Response(
                status=503,
                body=json.dumps({"error": "Service temporarily unavailable"})
            )
        
        try:
            response = await self._make_request(request)
            breaker.record_success()
            return response
        except Exception as e:
            breaker.record_failure()
            raise

class CircuitBreaker:
    def __init__(
        self, 
        failure_threshold: int = 5,
        recovery_timeout: int = 30,
        half_open_requests: int = 3
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_requests = half_open_requests
        
        self.state = "CLOSED"
        self.failures = 0
        self.last_failure_time = None
        self.half_open_successes = 0
    
    def can_execute(self) -> bool:
        if self.state == "CLOSED":
            return True
        
        if self.state == "OPEN":
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = "HALF_OPEN"
                self.half_open_successes = 0
                return True
            return False
        
        if self.state == "HALF_OPEN":
            return True
        
        return False
    
    def record_success(self):
        if self.state == "HALF_OPEN":
            self.half_open_successes += 1
            if self.half_open_successes >= self.half_open_requests:
                self.state = "CLOSED"
                self.failures = 0
        else:
            self.failures = 0
    
    def record_failure(self):
        self.failures += 1
        self.last_failure_time = time.time()
        
        if self.state == "HALF_OPEN":
            self.state = "OPEN"
        elif self.failures >= self.failure_threshold:
            self.state = "OPEN"

Caching Strategies

class GatewayCache:
    def __init__(self, redis_client: Redis, stats: StatsD):
        self.redis = redis_client
        self.stats = stats
    
    async def get_cached_response(self, cache_key: str) -> Response | None:
        cached = await self.redis.get(cache_key)
        
        if cached:
            self.stats.increment("gateway.cache.hit")
            return json.loads(cached)
        
        self.stats.increment("gateway.cache.miss")
        return None
    
    async def cache_response(
        self, 
        cache_key: str, 
        response: Response, 
        ttl: int
    ):
        cache_data = {
            "status": response.status_code,
            "headers": dict(response.headers),
            "body": response.body
        }
        
        await self.redis.setex(
            cache_key,
            ttl,
            json.dumps(cache_data)
        )


class CacheKeyGenerator:
    def __init__(self, config: CacheConfig):
        self.config = config
    
    def generate_key(self, request: Request) -> str:
        components = [
            request.method,
            request.path,
        ]
        
        if self.config.vary_by_user:
            components.append(request.auth_user_id)
        
        if self.config.vary_by_headers:
            for header in self.config.vary_by_headers:
                if header in request.headers:
                    components.append(request.headers[header])
        
        key_string = ":".join(str(c) for c in components)
        return f"gateway:cache:{hashlib.md5(key_string.encode()).hexdigest()}"

Monitoring and Observability

class GatewayMetrics:
    def __init__(self, prometheus_client: Prometheus):
        self.requests_total = Counter(
            "gateway_requests_total",
            "Total requests",
            ["method", "path", "status"]
        )
        
        self.request_duration = Histogram(
            "gateway_request_duration_ms",
            "Request duration in ms",
            ["method", "path", "service"]
        )
        
        self.rate_limit_hits = Counter(
            "gateway_rate_limit_hits_total",
            "Rate limit hits",
            ["limiter_type"]
        )
        
        self.upstream_status = Counter(
            "gateway_upstream_status_total",
            "Upstream service status",
            ["service", "status"]
        )

# Example Prometheus queries
PROMETHEUS_QUERIES = {
    "requests_per_second": """
        sum(rate(gateway_requests_total[5m])) by (service)
    """,
    
    "p99_latency": """
        histogram_quantile(0.99, 
            sum(rate(gateway_request_duration_ms_bucket[5m])) 
            by (le, service)
        )
    """,
    
    "error_rate": """
        sum(rate(gateway_requests_total{status=~"5.."}[5m])) 
        / sum(rate(gateway_requests_total[5m]))
    """,
    
    "rate_limit_efficiency": """
        sum(rate(gateway_rate_limit_hits_total[5m])) 
        / sum(rate(gateway_requests_total[5m]))
    """
}

Best Practices

Good Patterns

GOOD_PATTERNS = {
    "stateless_gateway": """
# Keep gateway stateless for easy scaling

✅ Good:
- Use external cache (Redis) for session/rate limit state
- Store configurations in etcd/Consul
- Scale horizontally with load balancer

❌ Bad:
- In-memory state for rate limiting
- Sticky sessions for gateway nodes
""",
    
    "graceful_degradation": """
# Implement circuit breakers and fallback responses

✅ Good:
- Return cached data if service is down
- Show degraded response instead of error
- Have fallback services for critical paths

❌ Bad:
- Propagate all failures to clients
- No caching of any responses
- Single point of failure in gateway
""",
    
    "security_layers": """
# Defense in depth for API security

✅ Good:
- TLS termination at gateway
- JWT validation with short expiry
- Request validation with schema
- IP allowlisting for admin routes

❌ Bad:
- Trust all internal traffic
- No input validation
- Expose internal service names
"""
}

Bad Patterns

BAD_PATTERNS = {
    "single_point_of_failure": """
❌ Bad:
- Gateway as single instance
- No health checks
- No automatic failover

✅ Good:
- Multiple gateway instances
- Health checks and auto-restart
- Load balancer with auto-scaling
""",
    
    "blocking_operations": """
❌ Bad:
- Synchronous logging to disk
- Blocking auth calls
- No timeouts on upstream calls

✅ Good:
- Async logging to external system
- Cached auth tokens
- Proper timeout configuration
""",
    
    "bypassing_gateway": """
❌ Bad:
- Services expose direct public endpoints
- Clients connect directly to services
- No gateway for internal traffic

✅ Good:
- All traffic through gateway
- Internal services not exposed
- Consistent policy everywhere
"""
}

GraphQL Federation at the Gateway

Modern gateways increasingly support GraphQL federation, composing multiple subgraphs into a unified schema:

# Supergraph schema (composed by gateway)
type Query {
  user(id: ID!): User    # From user-subgraph
  order(id: ID!): Order  # From order-subgraph
}

# User subgraph
type User @key(fields: "id") {
  id: ID!
  name: String!
  orders: [Order!]!       # Resolved across subgraphs
}

# Order subgraph  
type Order @key(fields: "id") {
  id: ID!
  total: Float!
  userId: ID!
}

Tools like Apollo Router and WunderGraph operate as dedicated federation gateways, handling schema stitching, query planning, and subgraph health checks.

Backend for Frontend (BFF) Pattern

When different clients need different API surfaces, a BFF layer tailors responses per client type:

Client Concerns BFF Responsibilities
Mobile Minimize payload size Data aggregation, image resizing
Web Rich data for UIs GraphQL federation, server-side rendering
IoT Tiny payloads, binary protocols Protocol translation, data filtering
Partner Full access, webhook delivery API key management, rate limit tiers
  • AI Gateway: Gateways that handle LLM API consumption—token rate limiting, prompt injection detection, cost tracking, and response caching for OpenAI/Anthropic APIs.
  • WebAssembly (Wasm) Plugins: Run custom middleware as Wasm modules for isolation and polyglot support (e.g., Envoy Wasm filters).
  • eBPF-Based Gateways: Kernel-level traffic filtering for sub-millisecond routing decisions, used in Cilium and Istio ambient mesh.
  • Service Mesh Integration: Gateways as the ingress tier of a service mesh, delegating mTLS, traffic splitting, and observability to the mesh sidecar.

Conclusion

The API Gateway pattern is a foundational building block for microservices architectures. A well-designed gateway provides:

  • Single Entry Point — Simplifies client code and presents a unified API surface
  • Cross-Cutting Concerns — Centralized authentication, rate limiting, logging, and request validation
  • Protocol Translation — Bridging REST, gRPC, GraphQL, and WebSocket clients to backend services
  • Resilience — Circuit breakers, retries, timeouts, and edge caching prevent cascade failures
  • Observability — Unified metrics, tracing, and logging across all upstream services
  • Evolving Capabilities — GraphQL federation, BFF per client type, Wasm plugins, and AI gateway patterns

Popular implementations include Kong, NGINX, Traefik, Envoy, AWS API Gateway, and Azure API Management. The right choice depends on your scale, deployment model (self-hosted vs. managed), and feature requirements. As API ecosystems grow more complex—spanning microservices, third-party integrations, and AI agents—the gateway’s role as a unified control point becomes increasingly valuable.

Comments

Share this article

Scan to read on mobile

👍 Was this article helpful?