Skip to main content
⚡ Calmops

Microservices Communication Patterns: Synchronous vs Asynchronous

Microservices must communicate to fulfill user requests. This guide covers synchronous and asynchronous communication patterns, their use cases, and implementation.

Communication Patterns Overview

# The two communication styles compared in this guide. Each entry gives a
# one-line description, example protocols, and the situations it suits.
synchronous:
  description: "Request-response, blocking"
  protocols:
    - "REST/HTTP"
    - "gRPC"
    - "GraphQL"
  use_when:
    - "Need immediate response"
    - "Simple queries"
    - "User-facing operations"

asynchronous:
  description: "Non-blocking, message-based"
  protocols:
    - "Message queues (RabbitMQ, Kafka)"
    - "Event streaming"
    - "Webhooks"
  use_when:
    - "Long-running operations"
    - "Fire-and-forget"
    - "High scalability needed"

Synchronous Communication

REST API Communication

# Service A calling Service B via REST

import logging
from typing import Optional

import requests

class UserServiceClient:
    """Synchronous REST client for the User Service.

    Both operations apply the configured timeout and never raise on
    transport errors, HTTP error statuses, or undecodable response bodies:
    the failure is logged and ``None`` is returned instead.
    """

    def __init__(self, base_url: str, timeout: int = 5):
        """Store the connection settings.

        Args:
            base_url: Root URL of the User Service. Trailing slashes are
                stripped so request paths can be appended directly.
            timeout: Value passed as ``timeout`` to every ``requests`` call.
        """
        self.base_url = base_url.rstrip('/')
        self.timeout = timeout

    def get_user(self, user_id: str) -> Optional[dict]:
        """Get user by ID.

        Args:
            user_id: Identifier appended to the ``/users/`` path.

        Returns:
            The decoded JSON response body, or ``None`` if the call failed.
        """
        return self._send(
            requests.get,
            f"/users/{user_id}",
            "Failed to get user",
        )

    def create_user(self, user_data: dict) -> Optional[dict]:
        """Create new user.

        Args:
            user_data: Mapping sent as the JSON request body.

        Returns:
            The decoded JSON response body, or ``None`` if the call failed.
        """
        return self._send(
            requests.post,
            "/users",
            "Failed to create user",
            json=user_data,
        )

    def _send(self, method, path: str, failure_message: str, **kwargs) -> Optional[dict]:
        """Issue one request and translate any failure into ``None``.

        Args:
            method: The ``requests`` function to call (``requests.get`` or
                ``requests.post``).
            path: Path appended to ``base_url``; must start with ``/``.
            failure_message: Prefix of the log line written on failure.
            **kwargs: Extra keyword arguments forwarded to ``method``.
        """
        try:
            response = method(
                f"{self.base_url}{path}",
                timeout=self.timeout,
                **kwargs
            )
            response.raise_for_status()
            return response.json()
        except (requests.RequestException, ValueError) as e:
            # ValueError covers a body that is not valid JSON on requests
            # versions whose JSON decode error is not a RequestException.
            logging.error("%s: %s", failure_message, e)
            return None

gRPC Communication

// user.proto
syntax = "proto3";

// RPC surface of the User Service.
service UserService {
    // Unary call: takes a GetUserRequest and returns one User.
    rpc GetUser (GetUserRequest) returns (User);
    // Unary call: takes a CreateUserRequest and returns the resulting User.
    rpc CreateUser (CreateUserRequest) returns (User);
    // Server-streaming call: returns a stream of User messages.
    rpc StreamUsers (GetUsersRequest) returns (stream User);
}

// Request for GetUser.
message GetUserRequest {
    string user_id = 1;
}

// Request for CreateUser. The service referenced this type without
// defining it, which makes the file fail to compile.
// NOTE(review): the fields mirror the non-id fields of User; confirm
// against the real service contract.
message CreateUserRequest {
    string name = 1;
    string email = 2;
}

// Request for StreamUsers. Also referenced but previously undefined.
// NOTE(review): left empty as the minimal valid definition; add filter or
// paging fields here if the service needs them.
message GetUsersRequest {
}

// A user record as returned by every RPC above.
message User {
    string id = 1;
    string name = 2;
    string email = 3;
}
# gRPC client

import grpc
import user_pb2
import user_pb2_grpc

class UserServiceClient:
    """gRPC client for the User Service.

    Holds one insecure channel for its lifetime. Call ``close()`` or use
    the instance as a context manager to release the channel.
    """

    def __init__(self, host: str, port: int, timeout: Optional[float] = None):
        """Open a channel to ``host:port`` and build the service stub.

        Args:
            host: Host name or address of the User Service.
            port: TCP port of the User Service.
            timeout: Per-call deadline in seconds applied to every RPC.
                ``None`` (the default) sets no deadline.
        """
        # Keep the channel on the instance so it can be closed later; as a
        # local variable it could never be released by the caller.
        self.channel = grpc.insecure_channel(f"{host}:{port}")
        self.stub = user_pb2_grpc.UserServiceStub(self.channel)
        self.timeout = timeout

    def get_user(self, user_id: str):
        """Fetch a user by ID.

        Returns:
            The ``User`` message returned by the service, or ``None`` if the
            call raised ``grpc.RpcError`` (the error is logged).
        """
        request = user_pb2.GetUserRequest(user_id=user_id)
        try:
            return self.stub.GetUser(request, timeout=self.timeout)
        except grpc.RpcError as e:
            logging.error("gRPC error: %s - %s", e.code(), e.details())
            return None

    def close(self) -> None:
        """Close the underlying channel."""
        self.channel.close()

    def __enter__(self):
        """Return the client itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the channel when the ``with`` block exits."""
        self.close()

Asynchronous Communication

Message Queue Pattern

# Async communication via message queue

import pika
import json

class MessagePublisher:
    """Publish JSON events to the durable ``events`` topic exchange."""

    def __init__(self, rabbitmq_url: str):
        """Connect to RabbitMQ and declare the ``events`` exchange.

        Args:
            rabbitmq_url: Either an AMQP URL (anything containing ``://``,
                e.g. ``amqp://user:pass@host:5672/vhost``) or a bare host
                name, which is still accepted for backward compatibility.
        """
        self.connection = pika.BlockingConnection(
            self._connection_parameters(rabbitmq_url)
        )
        self.channel = self.connection.channel()

        # Declare exchange
        self.channel.exchange_declare(
            exchange='events',
            exchange_type='topic',
            durable=True
        )

    @staticmethod
    def _connection_parameters(rabbitmq_url: str):
        """Build pika connection parameters from a URL or a bare host name."""
        # ConnectionParameters takes a host name as its first argument, so
        # a real URL has to be parsed by URLParameters instead.
        if '://' in rabbitmq_url:
            return pika.URLParameters(rabbitmq_url)
        return pika.ConnectionParameters(rabbitmq_url)

    def publish(self, event_type: str, payload: dict):
        """Publish one event.

        Args:
            event_type: Used as the routing key on the ``events`` exchange.
            payload: Event data; serialized with ``json.dumps``.
        """
        message = json.dumps(payload)

        self.channel.basic_publish(
            exchange='events',
            routing_key=event_type,
            body=message,
            properties=pika.BasicProperties(
                delivery_mode=2,  # Persistent
                content_type='application/json'
            )
        )

    def close(self) -> None:
        """Close the broker connection if it is still open."""
        if self.connection.is_open:
            self.connection.close()


class EventConsumer:
    """Consume JSON events from a durable queue bound to a topic exchange."""

    def __init__(self, rabbitmq_url: str, queue_name: str,
                 exchange: str = 'events', binding_key: str = '#'):
        """Connect to RabbitMQ, declare the queue and bind it.

        Args:
            rabbitmq_url: Either an AMQP URL (anything containing ``://``)
                or a bare host name.
            queue_name: Durable queue to declare and consume from.
            exchange: Topic exchange to bind the queue to. Pass an empty
                string to skip declaring and binding.
            binding_key: Routing-key pattern for the binding; the default
                ``'#'`` matches every routing key.
        """
        # ConnectionParameters takes a host name as its first argument, so
        # a real URL has to be parsed by URLParameters instead.
        if '://' in rabbitmq_url:
            parameters = pika.URLParameters(rabbitmq_url)
        else:
            parameters = pika.ConnectionParameters(rabbitmq_url)
        self.connection = pika.BlockingConnection(parameters)
        self.channel = self.connection.channel()
        # Remembered because basic_consume needs the queue name.
        self.queue_name = queue_name

        self.channel.queue_declare(queue=queue_name, durable=True)
        if exchange:
            # Without a binding, messages published to the exchange are
            # never routed to this queue.
            self.channel.exchange_declare(
                exchange=exchange,
                exchange_type='topic',
                durable=True
            )
            self.channel.queue_bind(
                queue=queue_name,
                exchange=exchange,
                routing_key=binding_key
            )

    def consume(self, callback):
        """Start consuming messages; blocks until consuming stops.

        Args:
            callback: Called with the decoded JSON payload of each message.
        """
        def on_message(ch, method, properties, body):
            self._handle_message(callback, ch, method, properties, body)

        # One unacknowledged message at a time.
        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(
            queue=self.queue_name,
            on_message_callback=on_message
        )
        self.channel.start_consuming()

    def _handle_message(self, callback, ch, method, properties, body):
        """Decode one delivery, run ``callback`` and ack or nack it."""
        try:
            payload = json.loads(body)
        except (TypeError, ValueError) as e:
            # A body that cannot be decoded will never succeed, so requeueing
            # it would redeliver the same poison message forever.
            logging.error("Discarding malformed message: %s", e)
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
            return

        try:
            callback(payload)
        except Exception as e:
            # Handler failures may be transient, so the message is requeued.
            logging.error("Error processing message: %s", e)
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
        else:
            ch.basic_ack(delivery_tag=method.delivery_tag)

Event-Driven Communication

# Event-driven architecture

class OrderService:
    """Order-creation service that announces each new order as an event."""

    def __init__(self, event_publisher):
        """Remember the publisher used to emit order events."""
        self.publisher = event_publisher

    def create_order(self, order_data: dict):
        """Persist an order, publish ``order.created`` for it, and return it.

        Args:
            order_data: Passed unchanged to ``_persist_order``.

        Returns:
            The object returned by ``_persist_order``.
        """
        saved = self._persist_order(order_data)

        # Announce the new order with the fields carried in the event.
        publish = self.publisher.publish
        event_body = {
            'order_id': saved.id,
            'customer_id': saved.customer_id,
            'total': saved.total,
            'items': saved.items,
        }
        publish('order.created', event_body)

        return saved


class NotificationService:
    """Handles order events by sending an email through ``send_email``."""

    def __init__(self, event_consumer):
        """Remember the consumer that delivers event payloads."""
        self.consumer = event_consumer

    def start(self):
        """Begin consuming, routing each payload to ``handle_order_created``."""
        consume = self.consumer.consume
        consume(self.handle_order_created)

    def handle_order_created(self, payload: dict):
        """Send the confirmation email for one order event.

        Args:
            payload: Event data; ``customer_id`` and ``order_id`` are read.
        """
        send = self.send_email
        recipient = payload['customer_id']
        order_id = payload['order_id']
        send(to=recipient, subject=f"Order {order_id} confirmed")

Choreography vs Orchestration

# Trade-offs of the two ways to coordinate a multi-service workflow: each
# entry gives a one-line description plus its pros and cons.
choreography:
  description: "Services communicate via events"
  pros:
    - "Loose coupling"
    - "No central coordinator"
    - "Independent services"
  cons:
    - "Hard to trace"
    - "Complex debugging"
    - "Implicit flow"

orchestration:
  description: "Central orchestrator coordinates"
  pros:
    - "Clear flow"
    - "Easy to debug"
    - "Transaction management"
  cons:
    - "Central point of failure"
    - "Orchestrator complexity"

Service Mesh

# Service mesh for microservices communication

# Capabilities listed for a service mesh.
service_mesh_features:
  - "Service discovery"
  - "Load balancing"
  - "Mutual TLS"
  - "Traffic management"
  - "Observability"

# Service mesh implementations named by this guide.
tools:
  - "Istio"
  - "Linkerd"
  - "Consul Connect"
# Kubernetes service mesh example (Istio)

# VirtualService for the "reviews" host: requests whose "end-user" header is
# exactly "jason" go to subset v2; every other request goes to subset v1.
# NOTE(review): subsets v1/v2 are not defined in this manifest; presumably a
# DestinationRule declares them elsewhere - confirm.
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: reviews
spec:
  hosts:
  - reviews
  http:
  # Rule 1: header-based match, routed to v2.
  - match:
    - headers:
        end-user:
          exact: jason
    route:
    - destination:
        host: reviews
        subset: v2
  # Rule 2: no match clause, so it is the fallback route to v1.
  - route:
    - destination:
        host: reviews
        subset: v1

Best Practices

# Microservices communication best practices

# Checklist grouped by concern: design, resilience, and documentation.
design:
  - "Use asynchronous for non-blocking operations"
  - "Implement circuit breakers"
  - "Add timeouts to all calls"
  - "Log all cross-service communication"

resilience:
  - "Retry with exponential backoff"
  - "Implement dead letter queues"
  - "Monitor service health"

documentation:
  - "Document all APIs"
  - "Provide OpenAPI specs"
  - "Share service contracts"

Conclusion

Choose a communication pattern based on your requirements:

  • Synchronous: REST/gRPC for immediate responses
  • Asynchronous: Message queues for scalability and decoupling
  • Event-driven: For loose coupling and scalability

Use a service mesh for cross-cutting concerns such as mTLS and observability.


Comments