Skip to main content
⚡ Calmops

Concurrency Patterns in Rust Web Services

Concurrency Patterns in Rust Web Services

TL;DR: This guide covers essential concurrency patterns for building high-performance Rust web services. You’ll learn connection pools, worker pools, message queues, rate limiting, and graceful shutdown patterns.


Introduction

Rust’s async ecosystem makes building concurrent web services natural. Key patterns include:

  • Connection pooling for databases
  • Worker pools for background tasks
  • Message queues for async communication
  • Rate limiting for protection
  • Graceful shutdown handling

Connection Pools

SQLx Connection Pool

use sqlx::{postgres::PgPoolOptions, PgPool};
use std::time::Duration;

/// Build a Postgres connection pool with production-oriented defaults.
///
/// Caps the pool at 20 connections, keeps 5 warm, waits at most 3 s to
/// check one out, and recycles connections after 30 minutes so stale
/// server-side state doesn't accumulate.
pub async fn create_pool(database_url: &str) -> Result<PgPool, sqlx::Error> {
    let options = PgPoolOptions::new()
        .max_connections(20)
        .min_connections(5)
        .acquire_timeout(Duration::from_secs(3))
        .idle_timeout(Duration::from_secs(600))
        .max_lifetime(Duration::from_secs(1800));
    options.connect(database_url).await
}

Custom Connection Pool

use std::sync::Arc;
use tokio::sync::{Semaphore, RwLock};
use std::collections::VecDeque;
use std::time::{Duration, Instant};

pub struct ConnectionPool<T> {
    pool: Arc<RwLock<VecDeque<T>>>,
    semaphore: Arc<Semaphore>,
    max_size: usize,
    timeout: Duration,
}

impl<T: Clone> ConnectionPool<T> 
where 
    T: Connection + Send + Sync,
{
    pub fn new(max_size: usize, timeout: Duration) -> Self {
        Self {
            pool: Arc::new(RwLock::new(VecDeque::with_capacity(max_size))),
            semaphore: Arc::new(Semaphore::new(max_size)),
            max_size,
            timeout,
        }
    }
    
    pub async fn acquire(&self) -> Result<PooledConnection<T>, PoolError> {
        let _permit = self.semaphore.acquire().await
            .map_err(|_| PoolError::PoolExhausted)?;
        
        let mut pool = self.pool.write().await;
        
        let connection = if let Some(conn) = pool.pop_front() {
            conn
        } else {
            T::connect().await?
        };
        
        Ok(PooledConnection {
            connection,
            pool: self.pool.clone(),
            permit: _permit,
        })
    }
}

pub trait Connection {
    async fn connect() -> Result<Self, PoolError>
    where
        Self: Sized;
}

pub struct PooledConnection<T: Connection> {
    connection: T,
    pool: Arc<RwLock<VecDeque<T>>>,
    permit: tokio::sync::SemaphorePermit<'static>,
}

impl<T: Connection> Drop for PooledConnection<T> {
    fn drop(&mut self) {
        let pool = self.pool.clone();
        let conn = self.connection.clone();
        
        tokio::spawn(async move {
            let mut pool = pool.write().await;
            if pool.len() < 10 { // max idle
                pool.push_back(conn);
            }
        });
    }
}

Worker Pools

Background Task Worker

use tokio::sync::{mpsc, broadcast, oneshot};
use std::sync::Arc;

/// Fixed-size pool of background worker tasks fed through a bounded channel.
pub struct WorkerPool {
    /// Submitting side of the shared task queue.
    sender: mpsc::Sender<Task>,
    /// One handle per spawned worker, used to signal shutdown.
    workers: Vec<WorkerHandle>,
}

/// Handle to a single worker task.
struct WorkerHandle {
    /// Fired to ask the worker to stop; dropping the sender also wakes
    /// the worker's shutdown branch and stops it.
    shutdown: oneshot::Sender<()>,
}

/// Messages accepted by worker tasks.
// NOTE(review): the `Error` type is not defined in this excerpt —
// presumably declared elsewhere; confirm before compiling standalone.
enum Task {
    /// A job plus a one-shot channel for reporting its result back to
    /// the submitter.
    ProcessJob(Job, oneshot::Sender<Result<(), Error>>),
    /// Explicit stop request.
    Shutdown,
}

impl WorkerPool {
    pub fn new(num_workers: usize) -> Self {
        let (tx, rx) = mpsc::channel(100);
        let mut workers = Vec::new();
        
        for _ in 0..num_workers {
            let rx = rx.clone();
            let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
            
            tokio::spawn(async move {
                let mut rx = rx;
                
                loop {
                    tokio::select! {
                        _ = &mut shutdown_rx => {
                            break;
                        }
                        task = rx.recv() => {
                            match task {
                                Some(Task::ProcessJob(job, response)) => {
                                    let result = process_job(job).await;
                                    let _ = response.send(result);
                                }
                                None => break,
                                Some(Task::Shutdown) => break,
                            }
                        }
                    }
                }
            });
            
            workers.push(WorkerHandle { shutdown: shutdown_tx });
        }
        
        Self { sender: tx, workers }
    }
    
    pub async fn submit(&self, job: Job) -> Result<(), Error> {
        let (tx, rx) = oneshot::channel();
        self.sender.send(Task::ProcessJob(job, tx))
            .await
            .map_err(|_| Error::PoolClosed)?;
        rx.await.map_err(|_| Error::TaskCancelled)?
    }
}

async fn process_job(job: Job) -> Result<(), Error> {
    // Process job
    Ok(())
}

/// Unit of background work: an opaque payload identified by `id`.
pub struct Job {
    pub id: String,
    pub payload: Vec<u8>,
}

Message Queues

Event Bus Implementation

use tokio::sync::{broadcast, mpsc};
use std::sync::Arc;
use std::collections::HashMap;

/// Topic-based publish/subscribe bus built on broadcast channels.
// NOTE(review): relies on `tokio::sync::RwLock` being in scope, but this
// snippet's `use` lines don't import it — add it when lifting this code.
pub struct EventBus {
    /// Topic name → broadcast sender; entries are created lazily on the
    /// first subscription to a topic.
    subscribers: Arc<RwLock<HashMap<String, broadcast::Sender<Event>>>>,
}

impl EventBus {
    /// Create a bus with no topics registered.
    pub fn new() -> Self {
        let topics = HashMap::new();
        Self {
            subscribers: Arc::new(RwLock::new(topics)),
        }
    }

    /// Subscribe to `topic`, creating its channel on first use.
    ///
    /// The write lock covers the lookup-or-insert so two concurrent
    /// first subscribers cannot race to create separate channels.
    pub async fn subscribe(&self, topic: &str) -> broadcast::Receiver<Event> {
        let mut subs = self.subscribers.write().await;
        match subs.get(topic) {
            Some(sender) => sender.subscribe(),
            None => {
                let (sender, receiver) = broadcast::channel(100);
                subs.insert(topic.to_string(), sender);
                receiver
            }
        }
    }

    /// Publish `event` to `topic`; events on unknown topics (or topics
    /// whose receivers are all gone) are silently dropped.
    pub async fn publish(&self, topic: &str, event: Event) {
        if let Some(sender) = self.subscribers.read().await.get(topic) {
            // `send` fails only when no receiver is listening.
            let _ = sender.send(event);
        }
    }
}

/// Message delivered to topic subscribers.
#[derive(Debug, Clone)]
pub struct Event {
    pub topic: String,
    pub payload: Vec<u8>,
    /// When the event was created (monotonic clock).
    pub timestamp: std::time::Instant,
}

Using the Event Bus

/// Publishes user notifications over the shared [`EventBus`].
pub struct NotificationService {
    bus: Arc<EventBus>,
}

impl NotificationService {
    /// Wrap an existing event bus.
    pub fn new(bus: Arc<EventBus>) -> Self {
        Self { bus }
    }

    /// Publish a notification for `user_id` on the "notifications" topic.
    pub async fn notify_user(&self, user_id: &str, message: &str) {
        // JSON-encode the notification body as raw bytes.
        let payload = serde_json::json!({
            "user_id": user_id,
            "message": message,
        })
        .to_string()
        .into_bytes();

        let event = Event {
            topic: "notifications".to_string(),
            payload,
            timestamp: std::time::Instant::now(),
        };

        self.bus.publish("notifications", event).await;
    }

    /// Obtain a receiver for the stream of notification events.
    pub async fn listen(&self) -> broadcast::Receiver<Event> {
        self.bus.subscribe("notifications").await
    }
}

Rate Limiting

Token Bucket for Requests

use std::sync::Arc;
use tokio::sync::RwLock;
use std::collections::HashMap;
use std::time::Instant;

/// Per-key token-bucket rate limiter.
pub struct RequestRateLimiter {
    /// One bucket per caller key — presumably a user id or client IP;
    /// confirm against the callers.
    buckets: Arc<RwLock<HashMap<String, TokenBucket>>>,
    /// Maximum tokens a bucket can hold (the allowed burst size).
    capacity: u64,
    /// Tokens restored per second.
    refill_rate: f64,
}

/// State for a single key's bucket.
struct TokenBucket {
    /// Current balance; fractional because refill is continuous.
    tokens: f64,
    /// Instant of the last refill computation.
    last_refill: Instant,
}

impl RequestRateLimiter {
    /// `capacity` is the burst size; `refill_rate` is tokens per second.
    pub fn new(capacity: u64, refill_rate: f64) -> Self {
        Self {
            buckets: Arc::new(RwLock::new(HashMap::new())),
            capacity,
            refill_rate,
        }
    }

    /// Try to spend `cost` tokens from `key`'s bucket.
    ///
    /// Returns the remaining whole-token balance on success, or
    /// `RateLimitError::Exceeded` with a `retry_after` in seconds.
    ///
    /// NOTE(review): buckets are never evicted, so the map grows with
    /// the number of distinct keys — add TTL-based cleanup before
    /// exposing this to unbounded key spaces.
    pub async fn check(&self, key: &str, cost: u64) -> Result<u64, RateLimitError> {
        let mut buckets = self.buckets.write().await;

        // New keys start with a full bucket.
        let bucket = buckets.entry(key.to_string()).or_insert_with(|| TokenBucket {
            tokens: self.capacity as f64,
            last_refill: Instant::now(),
        });

        // Lazily refill from elapsed time, capped at capacity.
        let elapsed = bucket.last_refill.elapsed().as_secs_f64();
        bucket.tokens = (bucket.tokens + elapsed * self.refill_rate)
            .min(self.capacity as f64);
        bucket.last_refill = Instant::now();

        if bucket.tokens >= cost as f64 {
            bucket.tokens -= cost as f64;
            Ok(bucket.tokens as u64)
        } else {
            // Round the wait UP: the previous `as u64` truncation told
            // callers to retry too early (e.g. a 0.4 s deficit became
            // `retry_after: 0`), guaranteeing a second rejection.
            let deficit = cost as f64 - bucket.tokens;
            Err(RateLimitError::Exceeded {
                retry_after: (deficit / self.refill_rate).ceil() as u64,
            })
        }
    }
}

Graceful Shutdown

Handling Shutdown Signals

use tokio::signal;
use tokio::sync::broadcast;

/// Broadcasts a one-shot shutdown signal to interested components.
pub struct Shutdown {
    /// Fan-out channel; every `subscribe()` call gets its own receiver.
    shutdown_tx: broadcast::Sender<()>,
}

impl Shutdown {
    /// Create a shutdown broadcaster with no listeners yet.
    pub fn new() -> Self {
        // Capacity 1 is enough: only a single `()` is ever sent.
        let (shutdown_tx, _initial_rx) = broadcast::channel(1);
        Self { shutdown_tx }
    }

    /// Obtain a receiver that resolves once shutdown is signalled.
    pub fn subscribe(&self) -> broadcast::Receiver<()> {
        self.shutdown_tx.subscribe()
    }

    /// Broadcast the shutdown signal to every subscriber.
    ///
    /// NOTE(review): despite the name, this *sends* the signal rather
    /// than waiting for one — a rename to `notify` would read better,
    /// but would break existing callers.
    pub async fn wait(&self) {
        // An Err here only means nobody is subscribed; nothing to do.
        let _ = self.shutdown_tx.send(());
    }
}

/// Run `app` until it exits on its own or Ctrl+C arrives, then perform
/// a graceful shutdown.
pub async fn run_with_shutdown(app: App) {
    let shutdown = Shutdown::new();
    // (Removed an unused `shutdown.subscribe()` binding: components that
    // need the signal should take their own subscription.)

    tokio::select! {
        // The app finished (or crashed) on its own.
        result = app.run() => {
            if let Err(e) = result {
                eprintln!("Server error: {}", e);
            }
        }
        // The operator asked us to stop: fan the signal out to any
        // components holding a subscription.
        _ = signal::ctrl_c() => {
            println!("Received Ctrl+C, shutting down...");
            shutdown.wait().await;
        }
    }

    println!("Starting graceful shutdown...");
    app.shutdown().await;
    println!("Shutdown complete");
}

Load Balancing

Round-Robin Load Balancer

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

/// Round-robin load balancer over a shared, lock-protected backend list.
// NOTE(review): relies on `tokio::sync::RwLock` being in scope, which
// this snippet's `use` lines don't import — add it when lifting this code.
pub struct LoadBalancer<T> {
    backends: Arc<RwLock<Vec<T>>>,
    /// Monotonically increasing counter; next index = counter % len.
    current: AtomicUsize,
}

impl<T> LoadBalancer<T> {
    /// Create a balancer over the given backend list.
    pub fn new(backends: Vec<T>) -> Self {
        Self {
            backends: Arc::new(RwLock::new(backends)),
            current: AtomicUsize::new(0),
        }
    }

    /// Return a clone of the next backend in round-robin order, or
    /// `None` when no backends are registered.
    pub async fn next(&self) -> Option<T>
    where
        T: Clone,
    {
        let backends = self.backends.read().await;
        let len = backends.len();
        if len == 0 {
            return None;
        }
        // fetch_add wraps on overflow, which keeps rotation valid.
        let idx = self.current.fetch_add(1, Ordering::SeqCst) % len;
        backends.get(idx).cloned()
    }
}

Conclusion

Key concurrency patterns for web services:

  1. Connection Pools - Reuse database connections efficiently
  2. Worker Pools - Background task processing
  3. Message Queues - Async event communication
  4. Rate Limiting - Protect against abuse
  5. Graceful Shutdown - Clean resource cleanup

Comments