Skip to main content
โšก Calmops

API Gateway Pattern: Unified Entry Point for Microservices

The API Gateway pattern provides a unified entry point for a system of microservices. It handles cross-cutting concerns like authentication, rate limiting, and request routing, simplifying client interactions and centralizing policy enforcement.

Understanding API Gateway

The Problem Without API Gateway

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚            Direct Client-to-Microservices Communication          โ”‚
โ”‚                                                                 โ”‚
โ”‚    Mobile App                                                   โ”‚
โ”‚        โ”‚                                                        โ”‚
โ”‚        โ”œโ”€โ”€โ”€โ–บ User Service    (auth, profile)                   โ”‚
โ”‚        โ”œโ”€โ”€โ”€โ–บ Order Service   (orders, history)                  โ”‚
โ”‚        โ”œโ”€โ”€โ”€โ–บ Payment Service (billing, cards)                  โ”‚
โ”‚        โ”œโ”€โ”€โ”€โ–บ Inventory       (stock, products)                  โ”‚
โ”‚        โ””โ”€โ”€โ”€โ–บ Notification    (email, push)                     โ”‚
โ”‚                                                                 โ”‚
โ”‚  Problems:                                                      โ”‚
โ”‚  โœ— Client knows about all service endpoints                     โ”‚
โ”‚  โœ— Each client implements same logic (auth, retry)             โ”‚
โ”‚  โœ— No centralized policy enforcement                           โ”‚
โ”‚  โœ— Cross-origin requests from browsers                         โ”‚
โ”‚  โœ— Tight coupling between clients and services                 โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

With API Gateway

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚                    API Gateway Architecture                      โ”‚
โ”‚                                                                 โ”‚
โ”‚    Mobile App                                                   โ”‚
โ”‚        โ”‚                                                        โ”‚
โ”‚        โ–ผ                                                        โ”‚
โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”   โ”‚
โ”‚  โ”‚                    API Gateway                            โ”‚   โ”‚
โ”‚  โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”          โ”‚   โ”‚
โ”‚  โ”‚  โ”‚ Auth    โ”‚ โ”‚ Rate    โ”‚ โ”‚ Route   โ”‚ โ”‚ Transformโ”‚          โ”‚   โ”‚
โ”‚  โ”‚  โ”‚ Handler โ”‚ โ”‚ Limiter โ”‚ โ”‚ Engine  โ”‚ โ”‚  Layer  โ”‚          โ”‚   โ”‚
โ”‚  โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜          โ”‚   โ”‚
โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜   โ”‚
โ”‚        โ”‚                                                        โ”‚
โ”‚        โ”œโ”€โ”€โ”€โ–บ User Service                                       โ”‚
โ”‚        โ”œโ”€โ”€โ”€โ–บ Order Service                                      โ”‚
โ”‚        โ”œโ”€โ”€โ”€โ–บ Payment Service                                    โ”‚
โ”‚        โ”œโ”€โ”€โ”€โ–บ Inventory                                         โ”‚
โ”‚        โ””โ”€โ”€โ”€โ–บ Notification                                      โ”‚
โ”‚                                                                 โ”‚
โ”‚  Benefits:                                                      โ”‚
โ”‚  โœ“ Single entry point for all clients                          โ”‚
โ”‚  โœ“ Centralized cross-cutting concerns                          โ”‚
โ”‚  โœ“ Service discovery and dynamic routing                       โ”‚
โ”‚  โœ“ Protocol translation (REST โ†” gRPC)                          โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

Gateway Responsibilities

Request Routing

class RouteConfig:
    def __init__(self):
        self.routes = [
            Route(
                path="/api/v1/users/*",
                service="user-service",
                timeout=5000,
                retry_policy={"attempts": 3, "backoff": "exponential"}
            ),
            Route(
                path="/api/v1/orders/*",
                service="order-service", 
                timeout=10000,
                retry_policy={"attempts": 2, "backoff": "linear"}
            ),
            Route(
                path="/api/v1/payments/*",
                service="payment-service",
                timeout=30000,
                retry_policy={"attempts": 1}  # No retry for payments
            ),
        ]

class RoutingEngine:
    def __init__(self, route_config: RouteConfig, service_discovery: ServiceRegistry):
        self.routes = route_config.routes
        self.registry = service_discovery
    
    def resolve_route(self, request: Request) -> RouteResult:
        for route in self.routes:
            if self._match_path(request.path, route.path):
                service_host = self.registry.resolve(route.service)
                
                return RouteResult(
                    target_url=f"http://{service_host}{request.path}",
                    timeout=route.timeout,
                    retry_policy=route.retry_policy
                )
        
        return RouteResult(error="No route found", status_code=404)
    
    def _match_path(self, request_path: str, pattern: str) -> bool:
        if pattern.endswith("/*"):
            prefix = pattern[:-2]
            return request_path.startswith(prefix)
        return request_path == pattern

Authentication and Authorization

class AuthHandler:
    def __init__(self, jwt_secret: str, user_cache: Redis):
        self.secret = jwt_secret
        self.cache = user_cache
    
    async def authenticate(self, request: Request) -> AuthResult:
        auth_header = request.headers.get("Authorization")
        
        if not auth_header or not auth_header.startswith("Bearer "):
            return AuthResult(success=False, error="Missing token", status=401)
        
        token = auth_header[7:]
        
        try:
            payload = jwt.decode(token, self.secret, algorithms=["HS256"])
            
            cached_user = await self.cache.get(f"user:{payload['sub']}")
            if cached_user:
                user = json.loads(cached_user)
            else:
                user = await self._fetch_user(payload['sub'])
                await self.cache.setex(
                    f"user:{payload['sub']}", 
                    300,  # 5 min cache
                    json.dumps(user)
                )
            
            return AuthResult(
                success=True,
                user_id=payload['sub'],
                roles=user.get("roles", []),
                permissions=user.get("permissions", [])
            )
            
        except jwt.ExpiredSignatureError:
            return AuthResult(success=False, error="Token expired", status=401)
        except jwt.InvalidTokenError:
            return AuthResult(success=False, error="Invalid token", status=401)
    
    def authorize(self, auth_result: AuthResult, required_permission: str) -> bool:
        if not auth_result.success:
            return False
        
        if "admin" in auth_result.roles:
            return True
        
        return required_permission in auth_result.permissions

Rate Limiting

import time
from collections import defaultdict
import asyncio

class TokenBucketRateLimiter:
    def __init__(self, rate: int, capacity: int):
        self.rate = rate
        self.capacity = capacity
        self.buckets: dict[str, dict] = defaultdict(self._create_bucket)
    
    def _create_bucket(self):
        return {
            "tokens": self.capacity,
            "last_refill": time.time()
        }
    
    async def allow_request(self, key: str, cost: int = 1) -> bool:
        bucket = self.buckets[key]
        
        now = time.time()
        elapsed = now - bucket["last_refill"]
        
        bucket["tokens"] = min(
            self.capacity,
            bucket["tokens"] + elapsed * self.rate
        )
        bucket["last_refill"] = now
        
        if bucket["tokens"] >= cost:
            bucket["tokens"] -= cost
            return True
        
        return False

class RateLimitConfig:
    BY_USER = {"rate": 100, "capacity": 100}      # 100 req/sec per user
    BY_IP = {"rate": 1000, "capacity": 1000}      # 1000 req/sec per IP
    BY_API_KEY = {"rate": 10000, "capacity": 5000} # Premium tier

class RateLimitMiddleware:
    def __init__(self):
        self.limiters = {
            "user": TokenBucketRateLimiter(**RateLimitConfig.BY_USER),
            "ip": TokenBucketRateLimiter(**RateLimitConfig.BY_IP),
            "api_key": TokenBucketRateLimiter(**RateLimitConfig.BY_API_KEY)
        }
    
    async def check_rate_limit(self, request: Request) -> RateLimitResult:
        client_ip = request.client_ip
        user_id = request.auth_user_id
        api_key = request.headers.get("X-API-Key")
        
        limits = []
        
        if api_key:
            allowed = await self.limiters["api_key"].allow_request(api_key)
            limits.append(("api_key", allowed))
            if not allowed:
                return RateLimitResult(
                    allowed=False,
                    retry_after=1,
                    limit_type="api_key"
                )
        
        if user_id:
            allowed = await self.limiters["user"].allow_request(user_id)
            limits.append(("user", allowed))
            if not allowed:
                return RateLimitResult(
                    allowed=False,
                    retry_after=1,
                    limit_type="user"
                )
        
        allowed = await self.limiters["ip"].allow_request(client_ip)
        limits.append(("ip", allowed))
        
        return RateLimitResult(
            allowed=all(allowed for _, allowed in limits),
            current_rates={k: v for k, v in limits}
        )

Request/Response Transformation

class RequestTransformer:
    def transform_incoming(self, request: Request, service_config: dict) -> TransformedRequest:
        transformed = Request(
            method=request.method,
            path=self._transform_path(request.path, service_config),
            headers=self._transform_headers(request.headers, service_config),
            body=self._transform_body(request.body, service_config, direction="in"),
            query_params=self._transform_query(request.query_params, service_config)
        )
        
        transformed.headers["X-Request-ID"] = request.request_id
        transformed.headers["X-Forwarded-For"] = request.client_ip
        transformed.headers["X-Gateway-Timestamp"] = str(int(time.time()))
        
        return transformed
    
    def _transform_path(self, path: str, config: dict) -> str:
        if "path_rewrite" in config:
            for pattern, replacement in config["path_rewrite"].items():
                if pattern in path:
                    return path.replace(pattern, replacement)
        return path
    
    def _transform_headers(self, headers: dict, config: dict) -> dict:
        transformed = {}
        
        header_mapping = config.get("header_mapping", {})
        for original, target in header_mapping.items():
            if original in headers:
                transformed[target] = headers[original]
        
        return transformed
    
    def _transform_body(self, body: dict, config: dict, direction: str) -> dict:
        if "body_transform" not in config:
            return body
        
        transform = config["body_transform"]
        
        if direction == "in" and "wrap" in transform:
            return {transform["wrap"]: body}
        
        if direction == "out" and "unwrap" in transform:
            return body.get(transform["unwrap"], body)
        
        return body


class ResponseTransformer:
    def transform_outgoing(
        self, 
        response: Response, 
        client_config: dict
    ) -> TransformedResponse:
        transformed = Response(
            status_code=self._map_status_code(response.status_code),
            headers=self._transform_headers(response.headers, client_config),
            body=self._transform_body(response.body, client_config)
        )
        
        transformed.headers["X-Gateway-Response-Time"] = str(response.processing_time)
        
        return transformed
    
    def _map_status_code(self, code: int) -> int:
        code_mapping = {
            501: 404,  # Service not implemented -> Not found
            502: 503,  # Bad gateway -> Service unavailable
            503: 504,  # Service unavailable -> Gateway timeout
        }
        return code_mapping.get(code, code)

Implementation Examples

Kong API Gateway Configuration

# kong.yml - Declarative configuration
_format_version: "3.0"

services:
  - name: user-service
    url: http://user-service:8080
    routes:
      - name: user-routes
        paths:
          - /api/v1/users
        methods:
          - GET
          - POST
          - PUT
          - DELETE
        strip_path: true
    plugins:
      - name: jwt
        config:
          key_claim_name: kid
          claims_to_verify:
            - exp
      - name: rate-limiting
        config:
          minute: 100
          hour: 10000
          policy: local
          fault_tolerant: true
      - name: cors
        config:
          origins:
            - "https://app.example.com"
          methods:
            - GET
            - POST
            - PUT
            - DELETE
            - OPTIONS
          headers:
            - Authorization
            - Content-Type
          exposed_headers:
            - X-Total-Count
          credentials: true
          max_age: 3600

  - name: order-service
    url: http://order-service:8081
    routes:
      - name: order-routes
        paths:
          - /api/v1/orders
        strip_path: true
    plugins:
      - name: jwt
      - name: rate-limiting
        config:
          minute: 50
          hour: 5000
      - name: request-transformer
        config:
          add:
            headers:
              - X-Service-Name:order-service
      - name: response-transformer
        config:
          add:
            headers:
              - X-Gateway:Kong

consumers:
  - username: mobile-app
    plugins:
      - name: rate-limiting
        config:
          minute: 1000
          hour: 50000

  - username: web-app
    plugins:
      - name: rate-limiting
        config:
          minute: 500
          hour: 20000

  - username: partner-api
    plugins:
      - name: rate-limiting
        config:
          minute: 5000
          hour: 200000

Custom Gateway Implementation

import asyncio
from aiohttp import web
from aiohttp_middlewares import cors_middleware
import logging

class APIGateway:
    def __init__(
        self,
        route_config: RouteConfig,
        auth_handler: AuthHandler,
        rate_limiter: RateLimitMiddleware,
        transformer: RequestTransformer,
        http_client: AsyncHTTPClient
    ):
        self.routes = route_config
        self.auth = auth_handler
        self.rate_limiter = rate_limiter
        self.transformer = transformer
        self.client = http_client
        self.metrics = MetricsCollector()
    
    async def handle_request(self, request: web.Request) -> web.Response:
        request_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))
        
        start_time = time.time()
        
        try:
            rate_result = await self.rate_limiter.check_rate_limit(request)
            if not rate_result.allowed:
                return web.Response(
                    status=429,
                    headers={
                        "Retry-After": str(rate_result.retry_after),
                        "X-RateLimit-Limit-Type": rate_result.limit_type
                    },
                    text="Rate limit exceeded"
                )
            
            auth_result = await self.auth.authenticate(request)
            if not auth_result.success:
                return web.Response(
                    status=auth_result.status,
                    text=auth_result.error
                )
            
            route_result = self.routes.resolve_route(request)
            if route_result.error:
                return web.Response(status=404, text=route_result.error)
            
            transformed_req = self.transformer.transform_incoming(
                request, 
                route_result.service_config
            )
            
            upstream_response = await self.client.request(
                method=transformed_req.method,
                url=route_result.target_url,
                headers=transformed_req.headers,
                data=transformed_req.body,
                timeout=route_result.timeout
            )
            
            transformed_resp = ResponseTransformer().transform_outgoing(
                upstream_response,
                request.headers.get("Accept", "application/json")
            )
            
            processing_time = (time.time() - start_time) * 1000
            self.metrics.record_request(
                service=route_result.service,
                status=upstream_response.status,
                duration=processing_time
            )
            
            return web.Response(
                status=transformed_resp.status_code,
                headers=transformed_resp.headers,
                body=transformed_resp.body
            )
            
        except asyncio.TimeoutError:
            return web.Response(status=504, text="Gateway timeout")
        except Exception as e:
            logging.error(f"Gateway error: {e}")
            return web.Response(status=500, text="Internal gateway error")

Service Discovery Integration

class ConsulServiceRegistry:
    def __init__(self, consul_host: str, consul_port: int):
        self.consul = consul.Consul(host=consul_host, port=consul_port)
    
    async def resolve(self, service_name: str) -> str:
        _, services = self.consul.health.service(service_name, passing=True)
        
        if not services:
            raise ServiceUnavailable(f"No instances of {service_name}")
        
        healthy = [s for s in services if self._is_healthy(s)]
        
        if not healthy:
            raise ServiceUnavailable(f"No healthy instances of {service_name}")
        
        instance = self._select_instance(healthy)
        return f"{instance['Address']}:{instance['ServicePort']}"
    
    def _is_healthy(self, service: dict) -> bool:
        checks = service.get("Checks", [])
        passing = [c for c in checks if c["Status"] == "passing"]
        return len(passing) > 0
    
    def _select_instance(self, instances: list) -> dict:
        # Simple round-robin or choose least connections
        return random.choice(instances)


class KubernetesServiceRegistry:
    def __init__(self, kube_client):
        self.client = kube_client
    
    async def resolve(self, service_name: str, namespace: str = "default") -> str:
        service = self.client.read_namespaced_service(
            name=service_name,
            namespace=namespace
        )
        
        endpoints = self.client.read_namespaced_endpoints(
            name=service_name,
            namespace=namespace
        )
        
        ready_addresses = [
            subset.addresses[0].target_ref.name
            for subset in endpoints.subsets
            if subset.ready_addresses
        ]
        
        if not ready_addresses:
            raise ServiceUnavailable(f"No ready pods for {service_name}")
        
        # Use Kubernetes DNS
        return f"{service_name}.{namespace}.svc.cluster.local"

Circuit Breaker Integration

class GatewayCircuitBreaker:
    def __init__(self):
        self.breakers: dict[str, CircuitBreaker] = {}
    
    def get_breaker(self, service_name: str) -> CircuitBreaker:
        if service_name not in self.breakers:
            self.breakers[service_name] = CircuitBreaker(
                failure_threshold=5,
                recovery_timeout=30,
                half_open_requests=3
            )
        return self.breakers[service_name]
    
    async def call_service(
        self, 
        service_name: str, 
        request: Request
    ) -> Response:
        breaker = self.get_breaker(service_name)
        
        if not breaker.can_execute():
            return Response(
                status=503,
                body=json.dumps({"error": "Service temporarily unavailable"})
            )
        
        try:
            response = await self._make_request(request)
            breaker.record_success()
            return response
        except Exception as e:
            breaker.record_failure()
            raise

class CircuitBreaker:
    def __init__(
        self, 
        failure_threshold: int = 5,
        recovery_timeout: int = 30,
        half_open_requests: int = 3
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_requests = half_open_requests
        
        self.state = "CLOSED"
        self.failures = 0
        self.last_failure_time = None
        self.half_open_successes = 0
    
    def can_execute(self) -> bool:
        if self.state == "CLOSED":
            return True
        
        if self.state == "OPEN":
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = "HALF_OPEN"
                self.half_open_successes = 0
                return True
            return False
        
        if self.state == "HALF_OPEN":
            return True
        
        return False
    
    def record_success(self):
        if self.state == "HALF_OPEN":
            self.half_open_successes += 1
            if self.half_open_successes >= self.half_open_requests:
                self.state = "CLOSED"
                self.failures = 0
        else:
            self.failures = 0
    
    def record_failure(self):
        self.failures += 1
        self.last_failure_time = time.time()
        
        if self.state == "HALF_OPEN":
            self.state = "OPEN"
        elif self.failures >= self.failure_threshold:
            self.state = "OPEN"

Caching Strategies

class GatewayCache:
    def __init__(self, redis_client: Redis, stats: StatsD):
        self.redis = redis_client
        self.stats = stats
    
    async def get_cached_response(self, cache_key: str) -> Response | None:
        cached = await self.redis.get(cache_key)
        
        if cached:
            self.stats.increment("gateway.cache.hit")
            return json.loads(cached)
        
        self.stats.increment("gateway.cache.miss")
        return None
    
    async def cache_response(
        self, 
        cache_key: str, 
        response: Response, 
        ttl: int
    ):
        cache_data = {
            "status": response.status_code,
            "headers": dict(response.headers),
            "body": response.body
        }
        
        await self.redis.setex(
            cache_key,
            ttl,
            json.dumps(cache_data)
        )


class CacheKeyGenerator:
    def __init__(self, config: CacheConfig):
        self.config = config
    
    def generate_key(self, request: Request) -> str:
        components = [
            request.method,
            request.path,
        ]
        
        if self.config.vary_by_user:
            components.append(request.auth_user_id)
        
        if self.config.vary_by_headers:
            for header in self.config.vary_by_headers:
                if header in request.headers:
                    components.append(request.headers[header])
        
        key_string = ":".join(str(c) for c in components)
        return f"gateway:cache:{hashlib.md5(key_string.encode()).hexdigest()}"

Monitoring and Observability

class GatewayMetrics:
    def __init__(self, prometheus_client: Prometheus):
        self.requests_total = Counter(
            "gateway_requests_total",
            "Total requests",
            ["method", "path", "status"]
        )
        
        self.request_duration = Histogram(
            "gateway_request_duration_ms",
            "Request duration in ms",
            ["method", "path", "service"]
        )
        
        self.rate_limit_hits = Counter(
            "gateway_rate_limit_hits_total",
            "Rate limit hits",
            ["limiter_type"]
        )
        
        self.upstream_status = Counter(
            "gateway_upstream_status_total",
            "Upstream service status",
            ["service", "status"]
        )

# Example Prometheus queries
PROMETHEUS_QUERIES = {
    "requests_per_second": """
        sum(rate(gateway_requests_total[5m])) by (service)
    """,
    
    "p99_latency": """
        histogram_quantile(0.99, 
            sum(rate(gateway_request_duration_ms_bucket[5m])) 
            by (le, service)
        )
    """,
    
    "error_rate": """
        sum(rate(gateway_requests_total{status=~"5.."}[5m])) 
        / sum(rate(gateway_requests_total[5m]))
    """,
    
    "rate_limit_efficiency": """
        sum(rate(gateway_rate_limit_hits_total[5m])) 
        / sum(rate(gateway_requests_total[5m]))
    """
}

Best Practices

Good Patterns

GOOD_PATTERNS = {
    "stateless_gateway": """
# Keep gateway stateless for easy scaling

โœ… Good:
- Use external cache (Redis) for session/rate limit state
- Store configurations in etcd/Consul
- Scale horizontally with load balancer

โŒ Bad:
- In-memory state for rate limiting
- Sticky sessions for gateway nodes
""",
    
    "graceful_degradation": """
# Implement circuit breakers and fallback responses

โœ… Good:
- Return cached data if service is down
- Show degraded response instead of error
- Have fallback services for critical paths

โŒ Bad:
- Propagate all failures to clients
- No caching of any responses
- Single point of failure in gateway
""",
    
    "security_layers": """
# Defense in depth for API security

โœ… Good:
- TLS termination at gateway
- JWT validation with short expiry
- Request validation with schema
- IP allowlisting for admin routes

โŒ Bad:
- Trust all internal traffic
- No input validation
- Expose internal service names
"""
}

Bad Patterns

BAD_PATTERNS = {
    "single_point_of_failure": """
โŒ Bad:
- Gateway as single instance
- No health checks
- No automatic failover

โœ… Good:
- Multiple gateway instances
- Health checks and auto-restart
- Load balancer with auto-scaling
""",
    
    "blocking_operations": """
โŒ Bad:
- Synchronous logging to disk
- Blocking auth calls
- No timeouts on upstream calls

โœ… Good:
- Async logging to external system
- Cached auth tokens
- Proper timeout configuration
""",
    
    "bypassing_gateway": """
โŒ Bad:
- Services expose direct public endpoints
- Clients connect directly to services
- No gateway for internal traffic

โœ… Good:
- All traffic through gateway
- Internal services not exposed
- Consistent policy everywhere
"""
}

Summary

The API Gateway pattern is essential for microservices architectures:

  • Single Entry Point - Simplifies client code and provides unified API
  • Cross-Cutting Concerns - Centralized auth, rate limiting, logging
  • Protocol Translation - REST to gRPC, SOAP to REST
  • Service Discovery - Dynamic routing to healthy instances
  • Circuit Breaking - Prevent cascade failures
  • Caching - Reduce load on backend services
  • Observability - Metrics, tracing, logging at the edge

Popular implementations include Kong, NGINX, Traefik, AWS API Gateway, and Azure API Management. Choose based on your scale, complexity, and managed vs. self-hosted preferences.

Comments