Skip to main content
⚡ Calmops

Microservices Communication Patterns: Synchronous and Asynchronous

Introduction

Communication between microservices is one of the most challenging aspects of distributed systems. How services exchange data, handle failures, and maintain consistency determines system reliability, performance, and scalability. Understanding communication patterns is essential for building robust microservices architectures.

This guide examines microservices communication patterns. We explore synchronous communication (REST, gRPC), asynchronous patterns (messaging, events), and supporting patterns like service discovery and circuit breakers. Whether designing new services or improving existing architectures, this guide provides the knowledge necessary for success.

Communication Patterns Overview

Synchronous vs. Asynchronous

| Pattern | Characteristics | Use Cases |
|---|---|---|
| Synchronous | Request-response, blocking | Simple queries, user-facing APIs |
| Asynchronous | Message-based, non-blocking | Background processing, eventual consistency |

Synchronous Communication

REST APIs

// Express.js REST API
const express = require('express');
const app = express();

// GET /api/orders/:id — responds with the order merged with its customer record,
// 404 when the order does not exist, and 500 on any lookup failure.
app.get('/api/orders/:id', async (req, res) => {
    const { id: orderId } = req.params;

    try {
        const order = await orderService.getOrder(orderId);
        if (!order) {
            return res.status(404).json({ error: 'Order not found' });
        }

        // The customer lookup needs order.customerId, so the two calls stay sequential.
        const customer = await customerService.getCustomer(order.customerId);
        const payload = { ...order, customer };

        res.json(payload);
    } catch (error) {
        logger.error('Failed to get order', { orderId, error });
        res.status(500).json({ error: 'Internal server error' });
    }
});

// POST /api/orders — creates an order, announces it on the event bus, and
// responds 201 with the created order (400 with the error message on failure).
app.post('/api/orders', async (req, res) => {
    try {
        const order = await orderService.createOrder(req.body);

        // emit() invokes listeners synchronously, so a throwing listener would
        // otherwise land in the outer catch and report a 400 for an order that
        // was in fact created (inviting a duplicate on client retry).
        try {
            eventBus.emit('order.created', order);
        } catch (emitError) {
            logger.error('Failed to emit order.created', { error: emitError });
        }

        res.status(201).json(order);
    } catch (error) {
        // Logged for parity with the GET handler; the response shape is unchanged.
        logger.error('Failed to create order', { error });
        res.status(400).json({ error: error.message });
    }
});

gRPC Services

// order.proto
// gRPC contract for the order service: unary lookups/creation plus a
// server-streaming feed of orders.
syntax = "proto3";

package order;

service OrderService {
    // Fetches a single order by its identifier.
    rpc GetOrder (GetOrderRequest) returns (Order);
    // Creates an order for a customer from a list of items.
    rpc CreateOrder (CreateOrderRequest) returns (Order);
    // Streams orders back to the caller.
    rpc StreamOrders (StreamRequest) returns (stream Order);
}

message GetOrderRequest {
    string order_id = 1;
}

message CreateOrderRequest {
    string customer_id = 1;
    repeated OrderItem items = 2;
}

// Request for StreamOrders. It was referenced by the service but never
// declared, which makes the file fail to compile.
// NOTE(review): intentionally empty — add filter fields once the streaming
// criteria are decided.
message StreamRequest {
}

// Lifecycle state of an order. It was referenced by Order.status but never
// declared. proto3 requires the first enum value to be zero.
// NOTE(review): the value set beyond UNSPECIFIED is a placeholder — align it
// with the statuses the domain model actually uses.
enum OrderStatus {
    ORDER_STATUS_UNSPECIFIED = 0;
    ORDER_STATUS_PENDING = 1;
    ORDER_STATUS_CONFIRMED = 2;
    ORDER_STATUS_CANCELLED = 3;
}

message Order {
    string id = 1;
    string customer_id = 2;
    repeated OrderItem items = 3;
    OrderStatus status = 4;
    int64 created_at = 5;
}

message OrderItem {
    string product_id = 1;
    int32 quantity = 2;
    double price = 3;
}
// Go gRPC server
package main

import (
    "context"
    "log"
    "net"

    "google.golang.org/grpc"
    "order/orderpb"
)

// orderRecord is the domain-side order that GetOrder maps onto the wire type.
// NOTE(review): placeholder shape derived from the fields GetOrder reads
// (ID, CustomerID, Status) — swap in the real domain model.
type orderRecord struct {
    ID         string
    CustomerID string
    Status     int32
}

// orderGetter is the lookup dependency the gRPC layer delegates to.
type orderGetter interface {
    GetOrder(ctx context.Context, orderID string) (*orderRecord, error)
}

// orderServer implements orderpb.OrderServiceServer on top of an orderGetter.
type orderServer struct {
    orderpb.UnimplementedOrderServiceServer

    // service performs the actual order lookup. GetOrder used s.service but
    // the struct never declared it, so the original did not compile.
    // It must be non-nil before GetOrder is called.
    service orderGetter
}

// GetOrder looks up the order named by req.OrderId and converts it to the
// protobuf Order message. Lookup errors are returned to the caller unchanged.
// Only Id, CustomerId and Status are populated on the response.
func (s *orderServer) GetOrder(ctx context.Context, req *orderpb.GetOrderRequest) (*orderpb.Order, error) {
    order, err := s.service.GetOrder(ctx, req.OrderId)
    if err != nil {
        return nil, err
    }

    return &orderpb.Order{
        Id:         order.ID,
        CustomerId: order.CustomerID,
        Status:     orderpb.OrderStatus(order.Status),
    }, nil
}

// main binds a TCP listener on :50051, registers the order service, and
// serves gRPC requests until the server stops.
func main() {
    // A failed bind (port in use, missing permission) used to be discarded,
    // leaving a nil listener to be passed to Serve.
    lis, err := net.Listen("tcp", ":50051")
    if err != nil {
        log.Fatalf("failed to listen on :50051: %v", err)
    }

    server := grpc.NewServer()
    orderpb.RegisterOrderServiceServer(server, &orderServer{})

    // Serve blocks; surface the reason it returned instead of exiting silently.
    if err := server.Serve(lis); err != nil {
        log.Fatalf("gRPC server stopped: %v", err)
    }
}

Service Discovery

# Kubernetes service discovery
# Service that gives the order-service pods a single stable in-cluster name.
apiVersion: v1
kind: Service
metadata:
  name: order-service
spec:
  # Traffic is routed to pods labelled app=order-service.
  selector:
    app: order-service
  ports:
    # Clients connect to the Service on port 80; it forwards to port 8080 on the pods.
    - port: 80
      targetPort: 8080
  # ClusterIP exposes the Service on a cluster-internal address only.
  type: ClusterIP
---
# DNS resolution: order-service.default.svc.cluster.local
# Client-side service discovery
import socket

class ServiceDiscoveryError(Exception):
    """Raised when a service name cannot be resolved through cluster DNS."""


class ServiceDiscovery:
    """Resolve a Kubernetes Service by its cluster DNS name and call it over HTTP.

    Args:
        service_name: Name of the Kubernetes Service to resolve.
        namespace: Namespace the Service lives in (previously hard-coded to
            "default").
        port: Service port to connect to (previously hard-coded to 80).
    """

    def __init__(self, service_name, namespace="default", port=80):
        self.service_name = service_name
        self.namespace = namespace
        self.port = port

    def get_service_url(self):
        """Return ``http://<ip>:<port>`` for the first resolved IPv4 address.

        Raises:
            ServiceDiscoveryError: if the name does not resolve.
        """
        fqdn = f"{self.service_name}.{self.namespace}.svc.cluster.local"
        # This is a plain address (A record) lookup, not an SRV query: the
        # port in the result is simply the one passed in.
        try:
            addresses = socket.getaddrinfo(
                fqdn,
                self.port,
                socket.AF_INET,
                socket.SOCK_STREAM
            )
        except socket.gaierror as exc:
            raise ServiceDiscoveryError(
                f"Service {self.service_name} not found"
            ) from exc

        if not addresses:
            raise ServiceDiscoveryError(f"Service {self.service_name} not found")

        host, port = addresses[0][4]
        return f"http://{host}:{port}"

    async def call_service(self, path, method="GET"):
        """Send ``method`` to ``path`` on the resolved service and return the JSON body."""
        # Imported here because the snippet uses aiohttp without ever importing it.
        import aiohttp

        url = self.get_service_url()
        async with aiohttp.ClientSession() as session:
            async with session.request(method, f"{url}{path}") as response:
                return await response.json()

Asynchronous Communication

Message Queues

# RabbitMQ producer
import pika
import json

def publish_order_created(order):
    """Publish ``order`` as a persistent JSON message on the ``orders`` topic exchange.

    Opens a fresh blocking connection per call, declares the durable exchange,
    publishes with routing key ``order.created``, and always closes the
    connection afterwards.

    Args:
        order: JSON-serializable order payload.
    """
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('rabbitmq-cluster.cluster.local')
    )
    try:
        channel = connection.channel()

        channel.exchange_declare(
            exchange='orders',
            exchange_type='topic',
            durable=True
        )

        channel.basic_publish(
            exchange='orders',
            routing_key='order.created',
            body=json.dumps(order),
            properties=pika.BasicProperties(
                delivery_mode=2,  # Persistent
                content_type='application/json'
            )
        )
    finally:
        # Previously a failure in declare/publish/serialization leaked the
        # connection. Skip close() when the broker already dropped it so the
        # original error is not masked.
        if connection.is_open:
            connection.close()

# Consumer
def consume_orders():
    """Consume ``order.*`` messages from the ``order_processing`` queue until stopped.

    Declares the durable ``orders`` topic exchange and ``order_processing``
    queue, binds them, and processes one unacknowledged message at a time.
    Each message is acknowledged only after ``process_order`` returns, so a
    failure during processing leaves it unacknowledged. The connection is
    closed when consuming ends for any reason.
    """
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('rabbitmq-cluster.cluster.local')
    )
    try:
        channel = connection.channel()

        channel.exchange_declare(
            exchange='orders',
            exchange_type='topic',
            durable=True
        )

        channel.queue_declare(queue='order_processing', durable=True)
        channel.queue_bind(
            exchange='orders',
            queue='order_processing',
            routing_key='order.*'
        )

        def callback(ch, method, properties, body):
            order = json.loads(body)
            process_order(order)
            # Ack last: an exception above leaves the message unacknowledged.
            ch.basic_ack(delivery_tag=method.delivery_tag)

        # At most one unacknowledged message is delivered to this consumer at a time.
        channel.basic_qos(prefetch_count=1)
        channel.basic_consume(queue='order_processing', on_message_callback=callback)
        channel.start_consuming()
    finally:
        # Previously the connection was never closed when consuming stopped or
        # a handler raised. Skip close() if the broker already dropped it.
        if connection.is_open:
            connection.close()

Event-Driven Architecture

%% Event flow: the Order Service publishes order.created to the Event Bus,
%% which fans events out to Inventory, Notification and Analytics.
graph LR
    Order[Order Service] -->|order.created| EventBus[Event Bus]
    EventBus -->|Events| Inventory[Inventory Service]
    EventBus -->|Events| Notification[Notification Service]
    EventBus -->|Events| Analytics[Analytics Service]
    
    %% Return path: Inventory publishes an update that the bus delivers back to Order.
    Inventory -->|Inventory Updated| EventBus
    EventBus -->|Event| Order
# Event-driven with Apache Kafka
from kafka import KafkaProducer, KafkaConsumer

# Producer
def _encode_event(payload):
    """Serialize an event payload to UTF-8 encoded JSON bytes."""
    return json.dumps(payload).encode('utf-8')


# Shared producer: every value passed to send() is JSON-encoded before it is sent.
producer = KafkaProducer(
    value_serializer=_encode_event,
    bootstrap_servers=['kafka:9092'],
)

def emit_order_created(order):
    """Send an ``ORDER_CREATED`` event for ``order`` to the ``orders`` topic.

    The message key is the UTF-8 encoded order id.

    Args:
        order: Mapping with at least ``id``, ``customer_id`` and ``total``.
    """
    # Imported locally because the snippet never imports datetime.
    from datetime import datetime, timezone

    producer.send(
        'orders',
        key=order['id'].encode('utf-8'),
        value={
            'event_type': 'ORDER_CREATED',
            'order_id': order['id'],
            'customer_id': order['customer_id'],
            'total': order['total'],
            # Timezone-aware UTC: utcnow() is deprecated and produced a naive
            # timestamp with no offset for consumers to rely on.
            'timestamp': datetime.now(timezone.utc).isoformat()
        }
    )

# Consumer
def _decode_event(raw):
    """Decode a UTF-8 JSON message value into a Python object."""
    return json.loads(raw.decode('utf-8'))


# Consumer in group 'order-processor' subscribed to the 'orders' topic.
consumer = KafkaConsumer(
    'orders',
    group_id='order-processor',
    auto_offset_reset='earliest',
    bootstrap_servers=['kafka:9092'],
    value_deserializer=_decode_event,
)

# Dispatch loop: only ORDER_CREATED events are handled; everything else is skipped.
for record in consumer:
    payload = record.value
    if payload['event_type'] != 'ORDER_CREATED':
        continue
    process_new_order(payload)

Resilience Patterns

Circuit Breaker

# Circuit breaker implementation
import time
from enum import Enum

class CircuitOpenError(Exception):
    """Raised by CircuitBreaker.call while the circuit is open."""


class CircuitState(Enum):
    """States of a circuit breaker."""
    CLOSED = "closed"        # calls pass through
    OPEN = "open"            # calls are rejected without being attempted
    HALF_OPEN = "half_open"  # one trial call decides whether to close or reopen


class CircuitBreaker:
    """Stop calling a failing dependency until a cool-down period has passed.

    Args:
        failure_threshold: Consecutive failures that open the circuit.
        timeout: Seconds the circuit stays open before a trial call is allowed.
        clock: Zero-argument callable returning seconds. Defaults to
            ``time.monotonic`` so wall-clock adjustments cannot shorten or
            stretch the open period.
    """

    def __init__(self, failure_threshold=5, timeout=60, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failure_count = 0
        # Reading of ``clock`` at the most recent failure (None before any failure).
        self.last_failure_time = None
        self.state = CircuitState.CLOSED
        self._clock = clock

    def call(self, func, *args, **kwargs):
        """Invoke ``func(*args, **kwargs)`` through the breaker.

        Returns:
            Whatever ``func`` returns.

        Raises:
            CircuitOpenError: if the circuit is open and the timeout has not elapsed.
            Exception: any exception raised by ``func`` is recorded as a
                failure and re-raised unchanged.
        """
        if self.state == CircuitState.OPEN:
            if self._clock() - self.last_failure_time > self.timeout:
                # Cool-down elapsed: allow a trial call.
                self.state = CircuitState.HALF_OPEN
            else:
                raise CircuitOpenError("Circuit is open")

        try:
            result = func(*args, **kwargs)
        except Exception:
            self.on_failure()
            raise

        self.on_success()
        return result

    def on_success(self):
        """Reset the failure count and close the circuit."""
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def on_failure(self):
        """Record a failure and open the circuit once the threshold is reached."""
        self.failure_count += 1
        self.last_failure_time = self._clock()

        # A failed trial call reopens immediately; otherwise wait for the threshold.
        if (self.state == CircuitState.HALF_OPEN
                or self.failure_count >= self.failure_threshold):
            self.state = CircuitState.OPEN

# Usage with requests
cb = CircuitBreaker(failure_threshold=5)

def call_order_service(order_id):
    """Fetch an order from the order service through the shared circuit breaker.

    Args:
        order_id: Identifier interpolated into the request path.

    Returns:
        The HTTP response object for non-5xx responses.

    Raises:
        Whatever ``cb.call`` raises, including the HTTP error for 5xx responses.
    """
    # Imported locally because the snippet never imports requests.
    import requests

    def fetch():
        # Without a timeout a hung server blocks forever and never counts as a failure.
        response = requests.get(f"http://orders/api/{order_id}", timeout=5)
        # requests.get does not raise on HTTP errors, so 5xx responses used to
        # count as successes and could never trip the breaker. 4xx responses
        # are the caller's problem and are returned as before.
        if response.status_code >= 500:
            response.raise_for_status()
        return response

    return cb.call(fetch)

Retry with Exponential Backoff

# Retry decorator with exponential backoff
import time
import functools

def retry(max_attempts=3, backoff_factor=2, exceptions=(Exception,)):
    """Decorator factory: retry a function with exponential backoff.

    Args:
        max_attempts: Total number of calls to make (must be >= 1).
        backoff_factor: Base of the delay; the wait after the n-th failed
            attempt is ``backoff_factor ** n`` seconds.
        exceptions: Exception types that trigger a retry. Any other exception
            propagates immediately.

    Returns:
        A decorator whose wrapper returns the wrapped function's result, or
        re-raises the last exception once the attempts are exhausted.

    Raises:
        ValueError: if ``max_attempts`` is less than 1.
    """
    # With max_attempts <= 0 the loop never ran, so the wrapper returned None
    # without ever calling the function. Fail loudly at decoration time instead.
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    # Out of attempts: surface the original exception.
                    if attempt == max_attempts:
                        raise

                    delay = backoff_factor ** attempt
                    logger.warning(
                        f"Attempt {attempt} failed: {e}. "
                        f"Retrying in {delay}s..."
                    )
                    time.sleep(delay)

        return wrapper
    return decorator

# requests raises its own exception classes; they do not derive from the
# built-in ConnectionError/TimeoutError, so the original tuple never matched
# and no retry was ever attempted.
@retry(
    max_attempts=3,
    exceptions=(requests.exceptions.ConnectionError, requests.exceptions.Timeout),
)
def call_service(url):
    """GET ``url`` and return the decoded JSON body.

    Connection failures and timeouts are retried with backoff; HTTP error
    statuses raise via ``raise_for_status`` and are not retried.
    """
    response = requests.get(url, timeout=5)
    response.raise_for_status()
    return response.json()

Timeout and Bulkhead

# Timeout and bulkhead patterns
import asyncio
from concurrent.futures import ThreadPoolExecutor

class Bulkhead:
    """Cap how many blocking calls run at once so one dependency cannot use every worker.

    Args:
        max_concurrent: Size of both the semaphore and the thread pool.
    """

    def __init__(self, max_concurrent=10):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.executor = ThreadPoolExecutor(max_workers=max_concurrent)

    async def execute(self, func, *args, **kwargs):
        """Run blocking ``func(*args, **kwargs)`` on the pool and return its result.

        Waits for a free slot when ``max_concurrent`` calls are already in flight.
        """
        async with self.semaphore:
            # Inside a coroutine the running loop is the right one;
            # get_event_loop() is the legacy accessor with deprecated fallbacks.
            loop = asyncio.get_running_loop()
            return await loop.run_in_executor(
                self.executor,
                lambda: func(*args, **kwargs)
            )

# Usage
bulkhead = Bulkhead(max_concurrent=10)

async def call_service(service_func):
    """Run ``service_func`` through the shared bulkhead with a 5-second deadline.

    Returns the call's result. On timeout the error is logged and
    ``asyncio.TimeoutError`` is re-raised to the caller.
    """
    try:
        return await asyncio.wait_for(bulkhead.execute(service_func), timeout=5.0)
    except asyncio.TimeoutError:
        logger.error("Service call timed out")
        raise

API Gateway Integration

# API Gateway routing to microservices
resources:
  # GET /users is proxied straight through to the user service.
  /users:
    get:
      x-amazon-apigateway-integration:
        type: http_proxy
        uri: http://user-service/users
        httpMethod: GET

  # GET and POST live under ONE /orders key. The original declared /orders
  # twice; duplicate mapping keys are invalid YAML and parsers that accept
  # them keep only the last entry, which silently dropped the GET route.
  /orders:
    get:
      x-amazon-apigateway-integration:
        type: http_proxy
        uri: http://order-service/orders
        httpMethod: GET
      responses:
        400:
          statusCode: 400
        500:
          statusCode: 500
    post:
      x-amazon-apigateway-integration:
        type: aws
        uri: arn:aws:apigateway:us-east-1:lambda:path/2015-03-31/functions/${CreateOrderFunction.Arn}/invocations
        httpMethod: POST
        # NOTE(review): this is the Lambda function's ARN; credentials is
        # expected to name an IAM role that API Gateway assumes — confirm and
        # replace with the invoke role's ARN.
        credentials: ${CreateOrderFunction.Arn}
        requestTemplates:
          application/json: '{"order_id": "$input.params("order_id")"}'

Contract Testing

# Consumer-driven contract testing with Pact
import pact

# Define consumer expectations
# Define consumer expectations
consumer = pact.Consumer('OrderClient')
# Was `provider('OrderService')`, which calls an undefined name.
provider = pact.Provider('OrderService')

# Interactions are recorded on the Pact object returned by has_pact_with(),
# not on the Consumer. The mock provider listens on port 8080 to match the
# URL requested below.
contract = consumer.has_pact_with(provider, port=8080)

contract.start_service()
try:
    # Parenthesised so the chain is one expression; lines starting with a
    # bare `.` were a syntax error.
    (contract
        .given('an order exists')
        .upon_receiving('a request for the order')
        .with_request(method='GET', path='/orders/123')
        .will_respond_with(status=200, body={
            'id': '123',
            'customer_id': 'customer-456',
            'status': 'pending'
        }))

    # Verify: inside the context the mock provider serves the interaction
    # above and checks on exit that it was actually requested.
    with contract:
        result = requests.get('http://localhost:8080/orders/123')
        assert result.status_code == 200
finally:
    contract.stop_service()

# Publish contract
# NOTE(review): confirm that Consumer exposes publish() with these arguments in
# the installed pact-python version; publishing may need the Broker API or the
# pact-broker CLI instead.
consumer.publish(
    provider='OrderService',
    version='1.0.0',
    publish_verification_results=True
)

Conclusion

Microservices communication patterns provide the foundation for building reliable distributed systems. Understanding synchronous and asynchronous patterns, implementing resilience patterns like circuit breakers and retries, and using appropriate tooling enables building robust architectures.

Key practices include choosing the right communication pattern for each use case, implementing resilience patterns to handle failures gracefully, using service discovery for loose coupling, and testing integrations with contract testing. The investment in communication patterns pays dividends through improved reliability and maintainability.

As distributed systems grow in complexity, communication patterns become increasingly critical. Choose patterns that match your requirements and invest in building robust, observable communication infrastructure.


Resources

Comments