Skip to main content
โšก Calmops

Microservices Communication Patterns: REST, gRPC, and Event-Driven

Introduction

Communication between microservices is the backbone of distributed systems. The choice of communication pattern affects latency, reliability, scalability, and maintainability. This guide covers synchronous and asynchronous communication patterns, implementation strategies, and best practices for building robust microservice architectures.

Communication Patterns Overview

Pattern Use Case Latency Complexity Coupling
REST CRUD operations Medium Low Tight
gRPC High-performance APIs Low Medium Tight
GraphQL Flexible queries Medium Medium Tight
Message Queue Async processing High Medium Loose
Event-Driven Real-time updates Variable High Loose

Synchronous Communication

REST API Client

import logging
import time

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

class APIClient:
    """Small JSON-over-HTTP client with connection pooling and automatic retries.

    Args:
        base_url: Scheme + host prefix, e.g. "https://api.example.com".
        timeout: Per-request timeout in seconds. FIX: the original accepted
            this argument but never stored it, hard-coding 30s in every call.
        max_retries: Total retry budget for transient failures.
    """

    # HTTP statuses considered transient and retried with exponential backoff.
    RETRY_STATUSES = (429, 500, 502, 503, 504)

    def __init__(self, base_url, timeout=30, max_retries=3):
        self.base_url = base_url
        self.timeout = timeout
        self.session = requests.Session()

        # FIX: POST removed from the retryable methods — retrying a
        # non-idempotent POST after a 5xx or dropped connection can
        # duplicate writes on the server.
        retry_strategy = Retry(
            total=max_retries,
            backoff_factor=1,
            status_forcelist=list(self.RETRY_STATUSES),
            allowed_methods=["HEAD", "GET", "PUT", "DELETE", "OPTIONS", "TRACE"]
        )

        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)

    def get(self, path, params=None, headers=None):
        """GET *path*; return the decoded JSON body. Raises HTTPError on 4xx/5xx."""
        url = f"{self.base_url}{path}"
        response = self.session.get(url, params=params, headers=headers,
                                    timeout=self.timeout)
        response.raise_for_status()
        return response.json()

    def post(self, path, data=None, json=None, headers=None):
        """POST to *path*; return the decoded JSON body. Raises HTTPError on 4xx/5xx."""
        url = f"{self.base_url}{path}"
        response = self.session.post(url, data=data, json=json, headers=headers,
                                     timeout=self.timeout)
        response.raise_for_status()
        return response.json()

    def put(self, path, data=None, json=None, headers=None):
        """PUT to *path*; return the decoded JSON body. Raises HTTPError on 4xx/5xx."""
        url = f"{self.base_url}{path}"
        response = self.session.put(url, data=data, json=json, headers=headers,
                                    timeout=self.timeout)
        response.raise_for_status()
        return response.json()

    def delete(self, path, headers=None):
        """DELETE *path*; return the decoded JSON body. Raises HTTPError on 4xx/5xx."""
        url = f"{self.base_url}{path}"
        response = self.session.delete(url, headers=headers, timeout=self.timeout)
        response.raise_for_status()
        return response.json()

Service Discovery with Consul

import consul
import requests

class ConsulServiceRegistry:
    """Registers and resolves service instances via a local Consul agent."""

    def __init__(self, consul_host='localhost', consul_port=8500):
        self.consul = consul.Consul(host=consul_host, port=consul_port)

    def register_service(self, name, service_id, port, health_check=None):
        """Register one service instance with the Consul agent.

        FIX: python-consul's Agent.Service.register() takes lowercase keyword
        arguments (name, service_id, port, check, meta). The original passed
        Consul HTTP-API field names ('ID', 'Name', 'Port', 'Check') as
        keywords, which raises TypeError.
        """
        self.consul.agent.service.register(
            name,
            service_id=service_id,
            port=port,
            check=health_check,
            meta={'version': '1.0.0'},
        )

    def deregister_service(self, service_id):
        """Remove a service instance from the agent's catalog."""
        self.consul.agent.service.deregister(service_id)

    def get_service(self, name):
        """Return host/port/id of the first healthy instance of *name*.

        Raises:
            ServiceUnavailable: when no passing instance exists.
                NOTE(review): ServiceUnavailable is not defined in this
                snippet — it must be provided elsewhere in the project.
        """
        # FIX: the keyword accepted by python-consul is `passing`, not
        # `passing_only`.
        _, services = self.consul.health.service(name, passing=True)

        if not services:
            raise ServiceUnavailable(f"No healthy instances for {name}")

        # Return first healthy instance (no load balancing at this layer).
        service = services[0]['Service']
        return {
            'host': service['Address'],
            'port': service['Port'],
            'id': service['ID'],
        }

    def get_all_services(self, name):
        """Return host/port/id for every passing instance of *name*."""
        _, services = self.consul.health.service(name, passing=True)

        return [
            {
                'host': s['Service']['Address'],
                'port': s['Service']['Port'],
                'id': s['Service']['ID'],
            }
            for s in services
        ]

Client-Side Load Balancing

import random

class LoadBalancer:
    """Client-side load balancer: round-robin over healthy instances."""

    def __init__(self, service_registry):
        self.registry = service_registry
        self.instances = {}
        self.health_checks = {}
        # Per-service round-robin position.
        self._rr_counters = {}

    def get_instance(self, service_name):
        """Return the next healthy instance of *service_name* (round-robin).

        Raises:
            ServiceUnavailable: when no instance (or no healthy instance)
                exists. NOTE(review): ServiceUnavailable is not defined in
                this snippet — it must be provided elsewhere.
        """
        instances = self.registry.get_all_services(service_name)

        if not instances:
            raise ServiceUnavailable(f"No instances available for {service_name}")

        # Filter healthy instances
        healthy = [i for i in instances if self.is_healthy(i)]

        if not healthy:
            raise ServiceUnavailable(f"No healthy instances for {service_name}")

        # FIX: the original indexed healthy[len(healthy) % len(healthy)],
        # which is always healthy[0]. Keep a per-service counter so calls
        # actually rotate through the healthy instances.
        position = self._rr_counters.get(service_name, 0)
        self._rr_counters[service_name] = position + 1
        return healthy[position % len(healthy)]

    def is_healthy(self, instance):
        """Check if instance passed a recent health check."""
        key = f"{instance['host']}:{instance['port']}"

        # FIX: the original defaulted unknown instances to last_check=0,
        # so freshly discovered instances were always filtered out and a
        # fleet with no recorded checks could never serve traffic. Treat
        # never-checked instances as healthy until proven otherwise.
        if key not in self.health_checks:
            return True

        # Consider unhealthy if no successful check in the last 30 seconds.
        return (time.time() - self.health_checks[key]) < 30

    def record_health(self, instance, is_healthy):
        """Record a health check result for *instance*."""
        key = f"{instance['host']}:{instance['port']}"

        if is_healthy:
            self.health_checks[key] = time.time()
        else:
            # Timestamp 0 guarantees the 30-second window test fails.
            self.health_checks[key] = 0

class ServiceClient:
    """HTTP client that resolves a fresh service instance per request."""

    def __init__(self, service_name, registry, load_balancer):
        self.service_name = service_name
        self.registry = registry
        self.load_balancer = load_balancer

    def request(self, method, path, **kwargs):
        """Send an HTTP request to a healthy instance of the service.

        FIX: the original only recorded failures, so instances that were
        serving traffic successfully still aged out of the load balancer's
        30-second health window. Successful requests now refresh the
        instance's health timestamp.
        """
        instance = self.load_balancer.get_instance(self.service_name)
        url = f"http://{instance['host']}:{instance['port']}{path}"

        try:
            response = requests.request(method, url, **kwargs)
        except Exception:
            # Connection-level failure: mark instance unhealthy and re-raise.
            self.load_balancer.record_health(instance, False)
            raise

        # Mark the instance healthy on any completed HTTP exchange.
        self.load_balancer.record_health(instance, True)
        return response

gRPC Communication

Protocol Buffers Definition

// user_service.proto
// User management API: CRUD RPCs plus a server-streaming event feed.
syntax = "proto3";

package user;

service UserService {
    // Fetch a single user by id.
    rpc GetUser(GetUserRequest) returns (User);
    // Create a new user from email + name.
    rpc CreateUser(CreateUserRequest) returns (User);
    // Page through users using opaque page tokens.
    rpc ListUsers(ListUsersRequest) returns (ListUsersResponse);
    // Server-streaming feed of change events for one user.
    rpc StreamUserEvents(StreamUserEventsRequest) returns (stream UserEvent);
}

message User {
    string id = 1;
    string email = 2;
    string name = 3;
    // Creation time as int64; unit (seconds vs milliseconds) is not
    // specified here — callers must agree on one.
    int64 created_at = 4;
    UserStatus status = 5;
}

enum UserStatus {
    // Zero value: what an unset status field decodes to in proto3.
    UNKNOWN = 0;
    ACTIVE = 1;
    INACTIVE = 2;
    SUSPENDED = 3;
}

message GetUserRequest {
    string id = 1;
}

message CreateUserRequest {
    string email = 1;
    string name = 2;
}

message ListUsersRequest {
    int32 page_size = 1;
    // Opaque continuation token from a previous ListUsersResponse.
    string page_token = 2;
}

message ListUsersResponse {
    repeated User users = 1;
    // Empty when there are no further pages.
    string next_page_token = 2;
}

message UserEvent {
    string user_id = 1;
    EventType type = 2;
    int64 timestamp = 3;
}

enum EventType {
    CREATED = 0;
    UPDATED = 1;
    DELETED = 2;
}

message StreamUserEventsRequest {
    string user_id = 1;
}

gRPC Server Implementation

from concurrent import futures
import grpc
import user_service_pb2
import user_service_pb2_grpc

class UserServiceServicer(user_service_pb2_grpc.UserServiceServicer):
    """gRPC servicer that maps UserService RPCs onto the database layer."""

    def __init__(self, database):
        self.db = database

    def GetUser(self, request, context):
        """Return a single user, or NOT_FOUND with an empty User message."""
        record = self.db.get_user(request.id)

        if not record:
            context.set_code(grpc.StatusCode.NOT_FOUND)
            context.set_details("User not found")
            return user_service_pb2.User()

        status = (
            user_service_pb2.ACTIVE
            if record.active
            else user_service_pb2.INACTIVE
        )
        return user_service_pb2.User(
            id=record.id,
            email=record.email,
            name=record.name,
            created_at=record.created_at,
            status=status,
        )

    def CreateUser(self, request, context):
        """Create a user; newly created users are always ACTIVE."""
        record = self.db.create_user(email=request.email, name=request.name)

        return user_service_pb2.User(
            id=record.id,
            email=record.email,
            name=record.name,
            created_at=record.created_at,
            status=user_service_pb2.ACTIVE,
        )

    def ListUsers(self, request, context):
        """Return one page of users plus a continuation token.

        NOTE(review): unlike GetUser, these User messages carry no status
        field (it decodes as UNKNOWN) — confirm this is intentional.
        """
        records, next_token = self.db.list_users(
            limit=request.page_size,
            token=request.page_token,
        )

        page = [
            user_service_pb2.User(
                id=rec.id,
                email=rec.email,
                name=rec.name,
                created_at=rec.created_at,
            )
            for rec in records
        ]
        return user_service_pb2.ListUsersResponse(
            users=page,
            next_page_token=next_token,
        )

def serve(database=None, port=50051, max_workers=10):
    """Build and run the UserService gRPC server (blocks until termination).

    Args:
        database: Storage backend handed to UserServiceServicer. FIX: the
            original read an undefined module-level global named
            ``database`` and crashed with NameError; it is now an explicit
            parameter (optional, so zero-argument callers still work).
        port: TCP port to listen on.
        max_workers: Size of the request-handling thread pool.
    """
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers))
    user_service_pb2_grpc.add_UserServiceServicer_to_server(
        UserServiceServicer(database),
        server
    )

    # NOTE: plaintext (non-TLS) listener — acceptable for local development
    # only; see create_secure_server for the TLS variant.
    server.add_insecure_port(f'[::]:{port}')
    server.start()
    server.wait_for_termination()

gRPC Client

import grpc
import user_service_pb2
import user_service_pb2_grpc

class GrpcUserClient:
    """Thin convenience wrapper around the generated UserService stub."""

    def __init__(self, host='localhost', port=50051):
        # Plaintext channel; swap in create_secure_channel for TLS.
        self.stub = user_service_pb2_grpc.UserServiceStub(
            grpc.insecure_channel(f'{host}:{port}')
        )

    def get_user(self, user_id):
        """Fetch a user by id; returns None when the server reports NOT_FOUND."""
        try:
            return self.stub.GetUser(user_service_pb2.GetUserRequest(id=user_id))
        except grpc.RpcError as err:
            if err.code() != grpc.StatusCode.NOT_FOUND:
                raise
            return None

    def create_user(self, email, name):
        """Create a user and return the server's User message."""
        req = user_service_pb2.CreateUserRequest(email=email, name=name)
        return self.stub.CreateUser(req)

    def list_users(self, page_size=10, page_token=''):
        """Return one page of users; page_token='' requests the first page."""
        req = user_service_pb2.ListUsersRequest(
            page_size=page_size,
            page_token=page_token,
        )
        return self.stub.ListUsers(req)

gRPC with TLS

# Server with TLS
def create_secure_server(cert_file, key_file):
    """Build a gRPC server that terminates TLS on port 50051.

    Note: the caller must still register servicers and start the server.
    """
    with open(cert_file, 'rb') as cert_fp, open(key_file, 'rb') as key_fp:
        cert_bytes = cert_fp.read()
        key_bytes = key_fp.read()

    # Private key comes first in each (key, certificate) pair.
    credentials = grpc.ssl_server_credentials([(key_bytes, cert_bytes)])

    secure_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    secure_server.add_secure_port('[::]:50051', credentials)

    return secure_server

# Client with TLS
def create_secure_channel(cert_file):
    """Open a TLS channel to server:50051, trusting *cert_file* as the root CA."""
    with open(cert_file, 'rb') as fp:
        root_ca = fp.read()

    creds = grpc.ssl_channel_credentials(root_certificates=root_ca)
    return grpc.secure_channel('server:50051', creds)

Asynchronous Communication

Message Queue Producer

import pika
import json

class MessageProducer:
    """Publishes JSON messages to a durable RabbitMQ topic exchange."""

    def __init__(self, host='localhost', exchange='events'):
        params = pika.ConnectionParameters(host=host)
        self.connection = pika.BlockingConnection(params)
        self.channel = self.connection.channel()
        self.exchange = exchange

        # Durable topic exchange: survives broker restarts.
        self.channel.exchange_declare(
            exchange=exchange,
            exchange_type='topic',
            durable=True,
        )

    def publish(self, routing_key, message, persistent=True):
        """Serialize *message* as JSON and publish it under *routing_key*."""
        # delivery_mode 2 asks the broker to persist the message to disk.
        mode = 2 if persistent else 1
        props = pika.BasicProperties(
            delivery_mode=mode,
            content_type='application/json',
            message_id=message.get('id'),
            timestamp=message.get('timestamp', int(time.time() * 1000)),
        )

        body = json.dumps(message)
        self.channel.basic_publish(
            exchange=self.exchange,
            routing_key=routing_key,
            body=body,
            properties=props,
        )

    def close(self):
        """Close the underlying AMQP connection."""
        self.connection.close()

# Usage
producer = MessageProducer()

# "user.created" matches any queue bound with a pattern like "user.*".
# NOTE(review): this snippet calls time.time() but does not import time —
# ensure `import time` is in scope.
producer.publish(
    'user.created',
    {
        'id': 'user-123',
        'email': '[email protected]',
        'timestamp': int(time.time() * 1000)  # milliseconds since epoch
    }
)

Message Queue Consumer

import pika
import json

class MessageConsumer:
    """Consumes JSON messages from a durable RabbitMQ queue."""

    def __init__(self, host='localhost', queue='user_events', exchange='events'):
        """Connect to RabbitMQ and declare the durable queue.

        FIX: the exchange name was hard-coded to 'events' inside consume();
        it is now a constructor parameter whose default preserves the old
        behavior.
        """
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=host)
        )
        self.channel = self.connection.channel()
        self.queue = queue
        self.exchange = exchange

        # Durable queue survives broker restarts.
        self.channel.queue_declare(queue=queue, durable=True)

    def consume(self, callback, routing_keys=('user.*',)):
        """Bind the queue to the exchange and process messages (blocks).

        FIX: routing_keys default is now an immutable tuple — the original
        used a mutable list as a default argument.

        Args:
            callback: Called with the decoded message dict; a raising
                callback causes the message to be nack'd and requeued.
            routing_keys: Topic patterns to bind the queue with.
        """
        for routing_key in routing_keys:
            self.channel.queue_bind(
                exchange=self.exchange,
                queue=self.queue,
                routing_key=routing_key
            )

        def on_message(channel, method, properties, body):
            try:
                message = json.loads(body)
                callback(message)
                channel.basic_ack(delivery_tag=method.delivery_tag)
            except Exception as e:
                # NOTE(review): requeue=True redelivers failed messages
                # forever, so a poison message loops indefinitely — consider
                # a dead-letter exchange or a retry limit.
                print(f"Error processing message: {e}")
                channel.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

        # prefetch_count=1: fair dispatch, at most one unacked message here.
        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(queue=self.queue, on_message_callback=on_message)

        print('Waiting for messages...')
        self.channel.start_consuming()

    def close(self):
        """Close the underlying AMQP connection."""
        self.connection.close()

# Usage
consumer = MessageConsumer(queue='user_events')

def handle_user_created(message):
    """Handle one decoded user event dict."""
    print(f"Processing user: {message['id']}")
    # Process the message

# Blocks forever; matching messages are dispatched to the handler.
consumer.consume(handle_user_created, routing_keys=['user.created', 'user.updated'])

Kafka Producer

from kafka import KafkaProducer
import json

class KafkaMessageProducer:
    """JSON-serializing Kafka producer configured for durability (acks='all')."""

    def __init__(self, bootstrap_servers=('localhost:9092',)):
        """Create the producer.

        FIX: the default was a mutable list ([...]) used as a default
        argument; it is now an immutable tuple (normalized to a list before
        being handed to kafka-python, so list-passing callers still work).
        """
        self.producer = KafkaProducer(
            bootstrap_servers=list(bootstrap_servers),
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8') if k else None,
            acks='all',  # wait until all in-sync replicas have the record
            retries=3,
            compression_type='lz4'
        )

    def send(self, topic, key, value):
        """Send synchronously; blocks up to 10s for the broker's acknowledgement.

        Returns:
            dict with the written record's topic, partition and offset.
        """
        future = self.producer.send(
            topic,
            key=key,
            value=value
        )

        # Wait for send to complete
        record_metadata = future.get(timeout=10)

        return {
            'topic': record_metadata.topic,
            'partition': record_metadata.partition,
            'offset': record_metadata.offset
        }

    def send_async(self, topic, key, value, callback=None, errback=None):
        """Send without blocking.

        FIX: added an optional *errback* so asynchronous send failures are
        not silently dropped (backward compatible — defaults to None).
        """
        future = self.producer.send(topic, key=key, value=value)

        if callback:
            future.add_callback(callback)
        if errback:
            future.add_errback(errback)

        return future

    def close(self):
        """Flush buffered records, then close the producer."""
        self.producer.flush()
        self.producer.close()

# Usage
producer = KafkaMessageProducer()

# Synchronous send: blocks up to 10 seconds for the broker's acknowledgement.
producer.send(
    'user-events',
    key='user-123',  # key selects the partition, preserving per-user ordering
    value={
        'type': 'USER_CREATED',
        'user_id': 'user-123',
        'email': '[email protected]',
        'timestamp': 1699999999
    }
)

Kafka Consumer

from kafka import KafkaConsumer

class KafkaMessageConsumer:
    """JSON-deserializing Kafka consumer participating in a consumer group."""

    def __init__(self, topics, bootstrap_servers=('localhost:9092',), group_id='my-group'):
        """Create the consumer and subscribe to *topics*.

        FIX: bootstrap_servers default was a mutable list used as a default
        argument; it is now an immutable tuple (normalized to a list for
        kafka-python, so list-passing callers still work).

        NOTE(review): enable_auto_commit=True can commit offsets before a
        message is processed (at-most-once delivery); switch to manual
        commits if messages must never be lost.
        """
        self.consumer = KafkaConsumer(
            *topics,
            bootstrap_servers=list(bootstrap_servers),
            group_id=group_id,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            auto_offset_reset='earliest',  # new groups start from oldest data
            enable_auto_commit=True,
            auto_commit_interval_ms=5000
        )

    def consume(self, callback):
        """Dispatch each message's decoded value (plus the raw record) to *callback*.

        Processing errors are printed and the loop continues with the next
        message.
        """
        for message in self.consumer:
            try:
                callback(message.value, message)
            except Exception as e:
                print(f"Error processing: {e}")

    def consume_batch(self, batch_size=100, timeout_ms=5000):
        """Yield lists of up to *batch_size* decoded message values, forever."""
        batch = []

        while True:
            polled = self.consumer.poll(timeout_ms=timeout_ms, max_records=batch_size)

            for _topic_partition, records in polled.items():
                for record in records:
                    batch.append(record.value)

            if batch:
                yield batch
                batch = []

    def close(self):
        """Close the consumer and leave the group."""
        self.consumer.close()

# Usage
consumer = KafkaMessageConsumer(['user-events'], group_id='user-processor')

# NOTE(review): this iterates the underlying KafkaConsumer directly instead
# of calling consume(), so callback-level error handling is bypassed.
for message in consumer.consumer:
    print(f"Received: {message.value}")

Circuit Breaker Pattern

import time
from enum import Enum

class CircuitState(Enum):
    """Lifecycle states of a circuit breaker."""
    CLOSED = 0     # normal operation: calls pass through
    OPEN = 1       # failing fast: calls are rejected
    HALF_OPEN = 2  # probing: a single call is allowed to test recovery


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open.

    FIX: the original raised this name without ever defining it, so a
    tripped breaker raised NameError instead of a catchable error.
    """


class CircuitBreaker:
    """Circuit breaker guarding calls to an unreliable dependency.

    After `failure_threshold` failures the circuit opens and calls fail
    fast with CircuitOpenError. After `recovery_timeout` seconds a single
    probe call is allowed (HALF_OPEN): success closes the circuit, failure
    re-opens it immediately.
    """

    def __init__(self, failure_threshold=5, recovery_timeout=30, expected_exception=Exception):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception

        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    def call(self, func, *args, **kwargs):
        """Invoke func(*args, **kwargs) through the breaker.

        Raises:
            CircuitOpenError: when the circuit is open and the recovery
                window has not yet elapsed.
            Whatever *func* raises: failures are counted, then re-raised.
        """
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time >= self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
            else:
                raise CircuitOpenError("Circuit is open")

        try:
            result = func(*args, **kwargs)
        except self.expected_exception:
            self.record_failure()
            raise

        # A successful probe in HALF_OPEN closes the circuit.
        if self.state == CircuitState.HALF_OPEN:
            self.reset()

        return result

    def record_failure(self):
        """Count a failure and open the circuit if warranted."""
        self.failure_count += 1
        self.last_failure_time = time.time()

        # FIX: a failure during the HALF_OPEN probe must re-open the circuit
        # immediately; the original only opened once failure_count reached
        # the threshold again, leaving the breaker half-open and letting
        # traffic through to a still-broken dependency.
        if (self.state == CircuitState.HALF_OPEN
                or self.failure_count >= self.failure_threshold):
            self.state = CircuitState.OPEN

    def reset(self):
        """Return to CLOSED and clear the failure count."""
        self.failure_count = 0
        self.state = CircuitState.CLOSED

Service Mesh Integration

Istio Virtual Service

# Istio VirtualService: header-gated canary routing for user-service.
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: user-service
spec:
  hosts:
  - user-service
  http:
  # Requests carrying x-canary: "true" are routed entirely to subset v2.
  - match:
    - headers:
        x-canary:
          exact: "true"
    route:
    - destination:
        host: user-service
        subset: v2
      weight: 100
  # All other traffic: 90/10 weighted split between v1 and v2.
  - route:
    - destination:
        host: user-service
        subset: v1
      weight: 90
    - destination:
        host: user-service
        subset: v2
      weight: 10

Retry and Timeout Configuration

# Istio VirtualService: retry and timeout policy for user-service.
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: user-service
spec:
  hosts:
  - user-service
  http:
  - route:
    - destination:
        host: user-service
    retries:
      attempts: 3
      # Each individual attempt is capped at 2s...
      perTryTimeout: 2s
      # NOTE(review): "retriable-status-500" is not a documented Envoy/Istio
      # retryOn condition — likely intended "5xx" or "retriable-status-codes"
      # (paired with retriableStatusCodes). Confirm against the Istio docs.
      retryOn: connect-failure,refused-stream,unavailable,cancelled,retriable-status-500
    # ...while the overall request (all retries included) is capped at 10s.
    timeout: 10s

Best Practices

  1. Choose Communication Style Based on Use Case: Sync for user-facing APIs, async for background processing
  2. Implement Circuit Breakers: Prevent cascade failures
  3. Use Service Discovery: Enable dynamic service scaling
  4. Add Timeouts: Never wait forever
  5. Implement Retries with Backoff: Handle transient failures
  6. Monitor Communication: Track latency, error rates, and throughput
  7. Use Message Queues for Reliability: Decouple services and handle load spikes

Conclusion

Microservices communication requires careful consideration of latency, reliability, and coupling. Start with REST for simple synchronous needs, graduate to gRPC for high-performance scenarios, and leverage message queues for asynchronous processing. Always implement circuit breakers, timeouts, and proper monitoring to build resilient distributed systems.

Comments