Introduction
Communication between microservices is the backbone of distributed systems. The choice of communication pattern affects latency, reliability, scalability, and maintainability. This guide covers synchronous and asynchronous communication patterns, implementation strategies, and best practices for building robust microservice architectures.
Communication Patterns Overview
| Pattern | Use Case | Latency | Complexity | Coupling |
|---|---|---|---|---|
| REST | CRUD operations | Medium | Low | Tight |
| gRPC | High-performance APIs | Low | Medium | Tight |
| GraphQL | Flexible queries | Medium | Medium | Tight |
| Message Queue | Async processing | High | Medium | Loose |
| Event-Driven | Real-time updates | Variable | High | Loose |
Synchronous Communication
REST API Client
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import logging
class APIClient:
    """HTTP client wrapping a pooled requests.Session with retries and timeouts.

    Transient server errors (429/5xx) are retried with exponential backoff.
    POST is deliberately excluded from retries: it is not idempotent, and a
    retry after an ambiguous failure could duplicate the side effect.
    """

    def __init__(self, base_url, timeout=30, max_retries=3):
        """
        Args:
            base_url: Scheme + host prefix, e.g. "http://users:8080".
            timeout: Per-request timeout in seconds.  Bug fix: the original
                accepted this parameter but never stored it, hardcoding 30
                in every method.
            max_retries: Total retry attempts for transient failures.
        """
        self.base_url = base_url
        self.timeout = timeout
        self.session = requests.Session()
        # Configure retry strategy (idempotent methods only).
        retry_strategy = Retry(
            total=max_retries,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["HEAD", "GET", "PUT", "DELETE", "OPTIONS", "TRACE"],
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)

    def _request(self, method, path, **kwargs):
        """Issue one request, raise on HTTP error, and decode the JSON body."""
        url = f"{self.base_url}{path}"
        response = self.session.request(method, url, timeout=self.timeout, **kwargs)
        response.raise_for_status()
        return response.json()

    def get(self, path, params=None, headers=None):
        """GET `path`; returns the decoded JSON response."""
        return self._request("GET", path, params=params, headers=headers)

    def post(self, path, data=None, json=None, headers=None):
        """POST `path` with form `data` or a `json` body; returns decoded JSON."""
        return self._request("POST", path, data=data, json=json, headers=headers)

    def put(self, path, data=None, json=None, headers=None):
        """PUT `path` with form `data` or a `json` body; returns decoded JSON."""
        return self._request("PUT", path, data=data, json=json, headers=headers)

    def delete(self, path, headers=None):
        """DELETE `path`; returns the decoded JSON response."""
        return self._request("DELETE", path, headers=headers)
Service Discovery with Consul
import consul
import requests
class ConsulServiceRegistry:
    """Service registration and healthy-instance lookup via a Consul agent."""

    def __init__(self, consul_host='localhost', consul_port=8500):
        self.consul = consul.Consul(host=consul_host, port=consul_port)

    def register_service(self, name, service_id, port, health_check=None):
        """Register one service instance with the local Consul agent.

        Args:
            name: Logical service name used for discovery.
            service_id: Unique ID for this instance.
            port: Port the instance listens on.
            health_check: Optional Consul check definition dict.
        """
        service_config = {
            'ID': service_id,
            'Name': name,
            'Port': port,
            'Meta': {
                'version': '1.0.0'
            }
        }
        if health_check:
            service_config['Check'] = health_check
        self.consul.agent.service.register(**service_config)

    def deregister_service(self, service_id):
        """Remove an instance from the registry."""
        self.consul.agent.service.deregister(service_id)

    def _healthy_entries(self, name):
        """Return raw health entries for passing instances only.

        Bug fix: python-consul's Health.service() keyword is ``passing`` —
        the original passed ``passing_only``, which raises TypeError.
        """
        _, services = self.consul.health.service(name, passing=True)
        return services

    def get_service(self, name):
        """Return host/port/id for the first healthy instance.

        Raises:
            ServiceUnavailable: when no passing instance exists.
        """
        services = self._healthy_entries(name)
        if not services:
            raise ServiceUnavailable(f"No healthy instances for {name}")
        # NOTE(review): Service.Address can be empty when the service
        # inherits the node address — confirm against your Consul setup.
        service = services[0]['Service']
        return {
            'host': service['Address'],
            'port': service['Port'],
            'id': service['ID']
        }

    def get_all_services(self, name):
        """Return host/port/id for every healthy instance (possibly empty)."""
        return [
            {
                'host': s['Service']['Address'],
                'port': s['Service']['Port'],
                'id': s['Service']['ID']
            }
            for s in self._healthy_entries(name)
        ]
Client-Side Load Balancing
import random
class LoadBalancer:
    """Client-side load balancer: round-robin over recently-healthy instances."""

    #: Seconds a positive health-check result stays valid.
    HEALTH_TTL = 30

    def __init__(self, service_registry):
        self.registry = service_registry
        self.instances = {}
        # "host:port" -> unix timestamp of last successful health check
        # (0 marks an instance explicitly unhealthy).
        self.health_checks = {}
        # service name -> monotonically increasing round-robin counter
        self._rr_counters = {}

    def get_instance(self, service_name):
        """Get healthy instance using round-robin.

        Bug fix: the original returned ``healthy[len(healthy) % len(healthy)]``,
        which is always index 0 — every request hit the same instance.  A
        per-service counter now rotates through the healthy set.

        Raises:
            ServiceUnavailable: if no instances, or none are healthy.
        """
        instances = self.registry.get_all_services(service_name)
        if not instances:
            raise ServiceUnavailable(f"No instances available for {service_name}")
        # Filter healthy instances
        healthy = [i for i in instances if self.is_healthy(i)]
        if not healthy:
            raise ServiceUnavailable(f"No healthy instances for {service_name}")
        index = self._rr_counters.get(service_name, 0)
        self._rr_counters[service_name] = index + 1
        return healthy[index % len(healthy)]

    def is_healthy(self, instance):
        """True if the instance passed a health check within HEALTH_TTL seconds."""
        key = f"{instance['host']}:{instance['port']}"
        last_check = self.health_checks.get(key, 0)
        return (time.time() - last_check) < self.HEALTH_TTL

    def record_health(self, instance, is_healthy):
        """Record a health check result for an instance."""
        key = f"{instance['host']}:{instance['port']}"
        if is_healthy:
            self.health_checks[key] = time.time()
        else:
            # Timestamp 0 guarantees is_healthy() is False until the next pass.
            self.health_checks[key] = 0
class ServiceClient:
    """HTTP client bound to one named service, resolved through the balancer."""

    def __init__(self, service_name, registry, load_balancer):
        self.service_name = service_name
        self.registry = registry
        self.load_balancer = load_balancer

    def request(self, method, path, **kwargs):
        """Resolve an instance and forward the HTTP request to it.

        Any transport failure marks the chosen instance unhealthy before the
        exception propagates to the caller.
        """
        target = self.load_balancer.get_instance(self.service_name)
        url = f"http://{target['host']}:{target['port']}{path}"
        try:
            return requests.request(method, url, **kwargs)
        except Exception:
            self.load_balancer.record_health(target, False)
            raise
gRPC Communication
Protocol Buffers Definition
// user_service.proto
// Contract for the user microservice: unary CRUD-style RPCs plus a
// server-streaming feed of per-user change events.
syntax = "proto3";

package user;

service UserService {
  // Fetch one user by ID; server responds NOT_FOUND when absent.
  rpc GetUser(GetUserRequest) returns (User);
  // Create a user from email + name.
  rpc CreateUser(CreateUserRequest) returns (User);
  // Page through users using an opaque continuation token.
  rpc ListUsers(ListUsersRequest) returns (ListUsersResponse);
  // Server-streaming: pushes events for one user as they occur.
  rpc StreamUserEvents(StreamUserEventsRequest) returns (stream UserEvent);
}

message User {
  string id = 1;
  string email = 2;
  string name = 3;
  int64 created_at = 4;  // creation time; epoch units chosen by the server
  UserStatus status = 5;
}

enum UserStatus {
  UNKNOWN = 0;  // proto3 zero value — treated as "unset"
  ACTIVE = 1;
  INACTIVE = 2;
  SUSPENDED = 3;
}

message GetUserRequest {
  string id = 1;
}

message CreateUserRequest {
  string email = 1;
  string name = 2;
}

message ListUsersRequest {
  int32 page_size = 1;    // maximum users per page
  string page_token = 2;  // empty for the first page
}

message ListUsersResponse {
  repeated User users = 1;
  string next_page_token = 2;  // empty when there are no more pages
}

message UserEvent {
  string user_id = 1;
  EventType type = 2;
  int64 timestamp = 3;
}

enum EventType {
  CREATED = 0;
  UPDATED = 1;
  DELETED = 2;
}

message StreamUserEventsRequest {
  string user_id = 1;
}
gRPC Server Implementation
from concurrent import futures
import grpc
import user_service_pb2
import user_service_pb2_grpc
class UserServiceServicer(user_service_pb2_grpc.UserServiceServicer):
    """gRPC servicer for UserService backed by a simple database object."""

    def __init__(self, database):
        self.db = database

    @staticmethod
    def _to_proto(user):
        """Map a database user to its protobuf representation.

        Consistency fix: status is now populated on every path — the
        original ListUsers omitted it while GetUser set it.
        NOTE(review): assumes every row returned by the db exposes
        ``.active`` the way get_user's result does — confirm.
        """
        return user_service_pb2.User(
            id=user.id,
            email=user.email,
            name=user.name,
            created_at=user.created_at,
            status=user_service_pb2.ACTIVE if user.active else user_service_pb2.INACTIVE,
        )

    def GetUser(self, request, context):
        """Return one user, or an empty User with status NOT_FOUND."""
        user = self.db.get_user(request.id)
        if not user:
            context.set_code(grpc.StatusCode.NOT_FOUND)
            context.set_details("User not found")
            return user_service_pb2.User()
        return self._to_proto(user)

    def CreateUser(self, request, context):
        """Create and return a new user."""
        user = self.db.create_user(email=request.email, name=request.name)
        return self._to_proto(user)

    def ListUsers(self, request, context):
        """Return one page of users plus an opaque continuation token."""
        users, next_token = self.db.list_users(
            limit=request.page_size,
            token=request.page_token,
        )
        return user_service_pb2.ListUsersResponse(
            users=[self._to_proto(u) for u in users],
            next_page_token=next_token,
        )
def serve(database=None, address='[::]:50051', max_workers=10):
    """Start the gRPC server and block until termination.

    Args:
        database: Storage backend handed to the servicer.  Bug fix: the
            original read an undefined module-level ``database`` name; it
            is now an explicit parameter (default keeps ``serve()`` callable).
        address: Bind address.  Insecure (no TLS) — see create_secure_server.
        max_workers: Thread pool size bounding concurrent RPCs.
    """
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers))
    user_service_pb2_grpc.add_UserServiceServicer_to_server(
        UserServiceServicer(database),
        server,
    )
    server.add_insecure_port(address)
    server.start()
    # Blocks the calling thread until the server is stopped.
    server.wait_for_termination()
gRPC Client
import grpc
import user_service_pb2
import user_service_pb2_grpc
class GrpcUserClient:
    """Blocking gRPC client for UserService over an insecure channel."""

    def __init__(self, host='localhost', port=50051):
        # Bug fix: keep a reference to the channel so it can be closed —
        # the original dropped it, leaking the connection.
        self.channel = grpc.insecure_channel(f'{host}:{port}')
        self.stub = user_service_pb2_grpc.UserServiceStub(self.channel)

    def get_user(self, user_id):
        """Return the User message, or None if the server reports NOT_FOUND."""
        request = user_service_pb2.GetUserRequest(id=user_id)
        try:
            return self.stub.GetUser(request)
        except grpc.RpcError as e:
            if e.code() == grpc.StatusCode.NOT_FOUND:
                return None
            raise

    def create_user(self, email, name):
        """Create a user and return the resulting User message."""
        request = user_service_pb2.CreateUserRequest(email=email, name=name)
        return self.stub.CreateUser(request)

    def list_users(self, page_size=10, page_token=''):
        """Return one ListUsersResponse page."""
        request = user_service_pb2.ListUsersRequest(
            page_size=page_size,
            page_token=page_token,
        )
        return self.stub.ListUsers(request)

    def close(self):
        """Close the underlying channel; the client is unusable afterwards."""
        self.channel.close()
gRPC with TLS
# Server with TLS
def create_secure_server(cert_file, key_file):
with open(cert_file, 'rb') as f:
cert = f.read()
with open(key_file, 'rb') as f:
key = f.read()
server_credentials = grpc.ssl_server_credentials([(key, cert)])
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
server.add_secure_port('[::]:50051', server_credentials)
return server
# Client with TLS
def create_secure_channel(cert_file):
with open(cert_file, 'rb') as f:
cert = f.read()
credentials = grpc.ssl_channel_credentials(root_certificates=cert)
return grpc.secure_channel('server:50051', credentials)
Asynchronous Communication
Message Queue Producer
import pika
import json
class MessageProducer:
    """Publishes JSON messages to a durable RabbitMQ topic exchange."""

    def __init__(self, host='localhost', exchange='events'):
        params = pika.ConnectionParameters(host=host)
        self.connection = pika.BlockingConnection(params)
        self.channel = self.connection.channel()
        self.exchange = exchange
        # A durable topic exchange survives broker restarts.
        self.channel.exchange_declare(
            exchange=exchange,
            exchange_type='topic',
            durable=True,
        )

    def publish(self, routing_key, message, persistent=True):
        """Publish message to exchange."""
        # delivery_mode 2 asks the broker to persist the message to disk.
        delivery_mode = 2 if persistent else 1
        props = pika.BasicProperties(
            delivery_mode=delivery_mode,
            content_type='application/json',
            message_id=message.get('id'),
            timestamp=message.get('timestamp', int(time.time() * 1000)),
        )
        body = json.dumps(message)
        self.channel.basic_publish(
            exchange=self.exchange,
            routing_key=routing_key,
            body=body,
            properties=props,
        )

    def close(self):
        """Close the broker connection."""
        self.connection.close()
# Usage
# Example: connect to the local broker and emit a user-created event.
producer = MessageProducer()
producer.publish(
    'user.created',
    {
        'id': 'user-123',
        'email': '[email protected]',
        # Millisecond epoch timestamp; assumes `time` is imported — confirm.
        'timestamp': int(time.time() * 1000)
    }
)
Message Queue Consumer
import pika
import json
class MessageConsumer:
    """Consumes JSON messages from a durable RabbitMQ queue."""

    def __init__(self, host='localhost', queue='user_events', exchange='events'):
        """
        Args:
            host: RabbitMQ broker host.
            queue: Durable queue to consume from.
            exchange: Topic exchange to bind against.  Previously hard-coded
                as 'events' inside consume(); now configurable, default
                unchanged.
        """
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=host)
        )
        self.channel = self.connection.channel()
        self.queue = queue
        self.exchange = exchange
        # Durable queue survives broker restarts.
        self.channel.queue_declare(queue=queue, durable=True)

    def consume(self, callback, routing_keys=None):
        """Bind the queue and process messages until interrupted (blocking).

        Args:
            callback: Called with each decoded message dict.
            routing_keys: Topic patterns to bind; defaults to ('user.*',).
                Bug fix: the original used a mutable list as the default
                argument value.
        """
        if routing_keys is None:
            routing_keys = ('user.*',)
        for routing_key in routing_keys:
            self.channel.queue_bind(
                exchange=self.exchange,
                queue=self.queue,
                routing_key=routing_key,
            )

        def on_message(channel, method, properties, body):
            try:
                message = json.loads(body)
                callback(message)
                channel.basic_ack(delivery_tag=method.delivery_tag)
            except Exception as e:
                # NOTE(review): requeue=True redelivers immediately, so a
                # poison message loops forever — consider a dead-letter
                # exchange or a retry limit.
                print(f"Error processing message: {e}")
                channel.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

        # prefetch_count=1: fair dispatch, at most one unacked message here.
        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(queue=self.queue, on_message_callback=on_message)
        print('Waiting for messages...')
        self.channel.start_consuming()

    def close(self):
        """Close the broker connection."""
        self.connection.close()
# Usage
consumer = MessageConsumer(queue='user_events')

def handle_user_created(message):
    """Handle one decoded user event (message is a dict)."""
    print(f"Processing user: {message['id']}")
    # Process the message

# Blocks forever: acks on success, nacks + requeues on failure.
consumer.consume(handle_user_created, routing_keys=['user.created', 'user.updated'])
Kafka Producer
from kafka import KafkaProducer
import json
class KafkaMessageProducer:
    """JSON-serializing Kafka producer configured for durability."""

    def __init__(self, bootstrap_servers=None):
        """
        Args:
            bootstrap_servers: Broker address list; defaults to
                ['localhost:9092'].  Bug fix: the original used a mutable
                list as the default argument value — replaced with the
                None sentinel.
        """
        if bootstrap_servers is None:
            bootstrap_servers = ['localhost:9092']
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8') if k else None,
            acks='all',  # wait until all in-sync replicas have the record
            retries=3,
            compression_type='lz4'
        )

    def send(self, topic, key, value):
        """Send synchronously and return the record's topic/partition/offset.

        Raises:
            KafkaError: if the send does not complete within 10 seconds.
        """
        future = self.producer.send(
            topic,
            key=key,
            value=value
        )
        # Block until the broker acknowledges (or 10 s elapse).
        record_metadata = future.get(timeout=10)
        return {
            'topic': record_metadata.topic,
            'partition': record_metadata.partition,
            'offset': record_metadata.offset
        }

    def send_async(self, topic, key, value, callback=None):
        """Send without blocking; optional callback fires on success."""
        future = self.producer.send(topic, key=key, value=value)
        if callback:
            future.add_callback(callback)
        return future

    def close(self):
        """Flush buffered records, then close the producer."""
        self.producer.flush()
        self.producer.close()
# Usage
producer = KafkaMessageProducer()
# The key "user-123" pins all of this user's events to one partition,
# preserving per-user ordering.
producer.send(
    'user-events',
    key='user-123',
    value={
        'type': 'USER_CREATED',
        'user_id': 'user-123',
        'email': '[email protected]',
        'timestamp': 1699999999
    }
)
Kafka Consumer
from kafka import KafkaConsumer
class KafkaMessageConsumer:
    """JSON-deserializing Kafka consumer with periodic auto-commit."""

    def __init__(self, topics, bootstrap_servers=None, group_id='my-group'):
        """
        Args:
            topics: Iterable of topic names to subscribe to.
            bootstrap_servers: Broker address list; defaults to
                ['localhost:9092'].  Bug fix: the original used a mutable
                list as the default argument value — replaced with the
                None sentinel.
            group_id: Consumer group for offset tracking and load sharing.
        """
        if bootstrap_servers is None:
            bootstrap_servers = ['localhost:9092']
        self.consumer = KafkaConsumer(
            *topics,
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            auto_offset_reset='earliest',  # new groups start from oldest data
            enable_auto_commit=True,
            auto_commit_interval_ms=5000
        )

    def consume(self, callback):
        """Process messages forever; per-message errors are logged and skipped.

        NOTE(review): with auto-commit enabled, a failed message's offset may
        still be committed — use manual commits if at-least-once processing
        is required.
        """
        for message in self.consumer:
            try:
                callback(message.value, message)
            except Exception as e:
                print(f"Error processing: {e}")

    def consume_batch(self, batch_size=100, timeout_ms=5000):
        """Yield lists of up to batch_size deserialized values, forever."""
        batch = []
        while True:
            messages = self.consumer.poll(timeout_ms=timeout_ms, max_records=batch_size)
            for topic_partition, records in messages.items():
                for record in records:
                    batch.append(record.value)
            if batch:
                yield batch
                batch = []

    def close(self):
        """Close the consumer and leave the group."""
        self.consumer.close()
# Usage
consumer = KafkaMessageConsumer(['user-events'], group_id='user-processor')
# Iterating the underlying KafkaConsumer blocks, yielding records whose
# .value has already been JSON-deserialized.
for message in consumer.consumer:
    print(f"Received: {message.value}")
Circuit Breaker Pattern
import time
from enum import Enum
class CircuitState(Enum):
    """Lifecycle states of a circuit breaker."""
    CLOSED = 0     # normal operation, calls pass through
    OPEN = 1       # calls are rejected immediately
    HALF_OPEN = 2  # one trial call is allowed through


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open.

    (The original referenced this name without defining it anywhere visible.)
    """


class CircuitBreaker:
    """Consecutive-failure circuit breaker.

    Opens after `failure_threshold` consecutive failures; after
    `recovery_timeout` seconds one trial call is allowed (HALF_OPEN) and a
    success fully closes the circuit again.
    """

    def __init__(self, failure_threshold=5, recovery_timeout=30, expected_exception=Exception):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    def call(self, func, *args, **kwargs):
        """Invoke func through the breaker.

        Raises:
            CircuitOpenError: if the circuit is open and the recovery
                timeout has not yet elapsed.
            expected_exception: re-raised from func after being counted.
        """
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time >= self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
            else:
                raise CircuitOpenError("Circuit is open")
        try:
            result = func(*args, **kwargs)
        except self.expected_exception:
            self.record_failure()
            raise
        # Bug fix: any success resets the failure count.  The original only
        # reset in HALF_OPEN, so intermittent failures accumulated forever
        # and eventually opened the circuit even for a mostly-healthy
        # service — the threshold now means *consecutive* failures.
        self.reset()
        return result

    def record_failure(self):
        """Count a failure and open the circuit once the threshold is hit."""
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

    def reset(self):
        """Return to CLOSED with a clean failure count."""
        self.failure_count = 0
        self.state = CircuitState.CLOSED
Service Mesh Integration
Istio Virtual Service
# Canary routing for user-service: requests carrying the header
# x-canary: "true" go entirely to subset v2; all other traffic is
# split 90/10 between v1 and v2.
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: user-service
spec:
  hosts:
    - user-service
  http:
    # Rule 1: header-based canary override (match rules are evaluated in order).
    - match:
        - headers:
            x-canary:
              exact: "true"
      route:
        - destination:
            host: user-service
            subset: v2
          weight: 100
    # Rule 2: default weighted split for everyone else.
    - route:
        - destination:
            host: user-service
            subset: v1
          weight: 90
        - destination:
            host: user-service
            subset: v2
          weight: 10
Retry and Timeout Configuration
# Retry and timeout policy for user-service.
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: user-service
spec:
  hosts:
    - user-service
  http:
    - route:
        - destination:
            host: user-service
      retries:
        attempts: 3
        perTryTimeout: 2s  # per-attempt budget; must fit within `timeout`
        # Fix: "retriable-status-500" is not a valid retry policy token.
        # "5xx" retries any 5xx response; use "retriable-status-codes"
        # (with explicit codes) for finer control.
        retryOn: connect-failure,refused-stream,unavailable,cancelled,5xx
      timeout: 10s  # overall deadline across all retry attempts
Best Practices
- Choose Communication Style Based on Use Case: Sync for user-facing APIs, async for background processing
- Implement Circuit Breakers: Prevent cascade failures
- Use Service Discovery: Enable dynamic service scaling
- Add Timeouts: Never wait forever
- Implement Retries with Backoff: Handle transient failures
- Monitor Communication: Track latency, error rates, and throughput
- Use Message Queues for Reliability: Decouple services and handle load spikes
Conclusion
Microservices communication requires careful consideration of latency, reliability, and coupling. Start with REST for simple synchronous needs, graduate to gRPC for high-performance scenarios, and leverage message queues for asynchronous processing. Always implement circuit breakers, timeouts, and proper monitoring to build resilient distributed systems.
Comments