Microservices must communicate to fulfill user requests. This guide covers synchronous and asynchronous communication patterns, their use cases, and implementation.
Communication Patterns Overview
synchronous:
description: "Request-response, blocking"
protocols:
- "REST/HTTP"
- "gRPC"
- "GraphQL"
use_when:
- "Need immediate response"
- "Simple queries"
- "User-facing operations"
asynchronous:
description: "Non-blocking, message-based"
protocols:
- "Message queues (RabbitMQ, Kafka)"
- "Event streaming"
- "Webhooks"
use_when:
- "Long-running operations"
- "Fire-and-forget"
- "High scalability needed"
Synchronous Communication
REST API Communication
# Service A calling Service B via REST
import requests
from typing import Optional
class UserServiceClient:
"""Client for User Service"""
def __init__(self, base_url: str, timeout: int = 5):
self.base_url = base_url.rstrip('/')
self.timeout = timeout
def get_user(self, user_id: str) -> Optional[dict]:
"""Get user by ID"""
try:
response = requests.get(
f"{self.base_url}/users/{user_id}",
timeout=self.timeout
)
response.raise_for_status()
return response.json()
except requests.RequestException as e:
logging.error(f"Failed to get user: {e}")
return None
def create_user(self, user_data: dict) -> Optional[dict]:
"""Create new user"""
try:
response = requests.post(
f"{self.base_url}/users",
json=user_data,
timeout=self.timeout
)
response.raise_for_status()
return response.json()
except requests.RequestException as e:
logging.error(f"Failed to create user: {e}")
return None
gRPC Communication
// user.proto
syntax = "proto3";
service UserService {
rpc GetUser (GetUserRequest) returns (User);
rpc CreateUser (CreateUserRequest) returns (User);
rpc StreamUsers (GetUsersRequest) returns (stream User);
}
message GetUserRequest {
string user_id = 1;
}
message User {
string id = 1;
string name = 2;
string email = 3;
}
# gRPC client
import grpc
import user_pb2
import user_pb2_grpc
class UserServiceClient:
def __init__(self, host: str, port: int):
channel = grpc.insecure_channel(f"{host}:{port}")
self.stub = user_pb2_grpc.UserServiceStub(channel)
def get_user(self, user_id: str):
request = user_pb2.GetUserRequest(user_id=user_id)
try:
return self.stub.GetUser(request)
except grpc.RpcError as e:
logging.error(f"gRPC error: {e.code()} - {e.details()}")
return None
Asynchronous Communication
Message Queue Pattern
# Async communication via message queue
import pika
import json
class MessagePublisher:
"""Publish events to message queue"""
def __init__(self, rabbitmq_url: str):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(rabbitmq_url)
)
self.channel = self.connection.channel()
# Declare exchange
self.channel.exchange_declare(
exchange='events',
exchange_type='topic',
durable=True
)
def publish(self, event_type: str, payload: dict):
"""Publish event to queue"""
message = json.dumps(payload)
self.channel.basic_publish(
exchange='events',
routing_key=event_type,
body=message,
properties=pika.BasicProperties(
delivery_mode=2, # Persistent
content_type='application/json'
)
)
class EventConsumer:
"""Consume events from queue"""
def __init__(self, rabbitmq_url: str, queue_name: str):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(rabbitmq_url)
)
self.channel = self.connection.channel()
self.channel.queue_declare(queue=queue_name, durable=True)
def consume(self, callback):
"""Start consuming messages"""
def wrapper(ch, method, properties, body):
try:
payload = json.loads(body)
callback(payload)
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
logging.error(f"Error processing message: {e}")
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
self.channel.basic_qos(prefetch_count=1)
self.channel.basic_consume(on_message_callback=wrapper)
self.channel.start_consuming()
Event-Driven Communication
# Event-driven architecture
class OrderService:
"""Publishes events when orders are created"""
def __init__(self, event_publisher):
self.publisher = event_publisher
def create_order(self, order_data: dict):
# Create order
order = self._persist_order(order_data)
# Publish event
self.publisher.publish('order.created', {
'order_id': order.id,
'customer_id': order.customer_id,
'total': order.total,
'items': order.items
})
return order
class NotificationService:
"""Consumes order events"""
def __init__(self, event_consumer):
self.consumer = event_consumer
def start(self):
self.consumer.consume(self.handle_order_created)
def handle_order_created(self, payload: dict):
# Send notification
self.send_email(
to=payload['customer_id'],
subject=f"Order {payload['order_id']} confirmed"
)
Choreography vs Orchestration
choreography:
description: "Services communicate via events"
pros:
- "Loose coupling"
- "No central coordinator"
- "Independent services"
cons:
- "Hard to trace"
- "Complex debugging"
- "Implicit flow"
orchestration:
description: "Central orchestrator coordinates"
pros:
- "Clear flow"
- "Easy to debug"
- "Transaction management"
cons:
- "Central point of failure"
- "Orchestrator complexity"
Service Mesh
# Service mesh for microservices communication
service_mesh_features:
- "Service discovery"
- "Load balancing"
- "Mutual TLS"
- "Traffic management"
- "Observability"
tools:
- "Istio"
- "Linkerd"
- "Consul Connect"
# Kubernetes service mesh example (Istio)
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
name: reviews
spec:
hosts:
- reviews
http:
- match:
- headers:
end-user:
exact: jason
route:
- destination:
host: reviews
subset: v2
- route:
- destination:
host: reviews
subset: v1
Best Practices
# Microservices communication best practices
design:
- "Use asynchronous for non-blocking operations"
- "Implement circuit breakers"
- "Add timeouts to all calls"
- "Log all cross-service communication"
resilience:
- "Retry with exponential backoff"
- "Implement dead letter queues"
- "Monitor service health"
documentation:
- "Document all APIs"
- "Provide OpenAPI specs"
- "Share service contracts"
Conclusion
Choose communication pattern based on requirements:
- Synchronous: REST/gRPC for immediate responses
- Asynchronous: Message queues for scalability and decoupling
- Event-driven: For loose coupling and scalability
Use service mesh for cross-cutting concerns like mTLS and observability.
Comments