Concurrency Patterns in Rust Web Services
TL;DR: This guide covers essential concurrency patterns for building high-performance Rust web services. You’ll learn how to implement connection pools, worker pools, message queues, rate limiting, graceful shutdown, and round-robin load balancing.
Introduction
Rust’s async ecosystem makes building concurrent web services natural. Key patterns include:
- Connection pooling for databases
- Worker pools for background tasks
- Message queues for async communication
- Rate limiting for protection
- Graceful shutdown handling
Connection Pools
SQLx Connection Pool
use sqlx::{postgres::PgPoolOptions, PgPool};
use std::time::Duration;
/// Opens a PostgreSQL connection pool for `database_url`.
///
/// The pool is configured with 5 to 20 connections, a 3-second acquire
/// timeout, a 10-minute idle timeout and a 30-minute maximum connection
/// lifetime.
///
/// # Errors
///
/// Returns the `sqlx::Error` reported by `connect` when the pool cannot be
/// established.
pub async fn create_pool(database_url: &str) -> Result<PgPool, sqlx::Error> {
    const ACQUIRE_TIMEOUT: Duration = Duration::from_secs(3);
    const IDLE_TIMEOUT: Duration = Duration::from_secs(600);
    const MAX_LIFETIME: Duration = Duration::from_secs(1800);

    let options = PgPoolOptions::new()
        .max_connections(20)
        .min_connections(5)
        .acquire_timeout(ACQUIRE_TIMEOUT)
        .idle_timeout(IDLE_TIMEOUT)
        .max_lifetime(MAX_LIFETIME);

    options.connect(database_url).await
}
Custom Connection Pool
use std::sync::Arc;
use tokio::sync::{Semaphore, RwLock};
use std::collections::VecDeque;
use std::time::{Duration, Instant};
pub struct ConnectionPool<T> {
pool: Arc<RwLock<VecDeque<T>>>,
semaphore: Arc<Semaphore>,
max_size: usize,
timeout: Duration,
}
impl<T: Clone> ConnectionPool<T>
where
T: Connection + Send + Sync,
{
pub fn new(max_size: usize, timeout: Duration) -> Self {
Self {
pool: Arc::new(RwLock::new(VecDeque::with_capacity(max_size))),
semaphore: Arc::new(Semaphore::new(max_size)),
max_size,
timeout,
}
}
pub async fn acquire(&self) -> Result<PooledConnection<T>, PoolError> {
let _permit = self.semaphore.acquire().await
.map_err(|_| PoolError::PoolExhausted)?;
let mut pool = self.pool.write().await;
let connection = if let Some(conn) = pool.pop_front() {
conn
} else {
T::connect().await?
};
Ok(PooledConnection {
connection,
pool: self.pool.clone(),
permit: _permit,
})
}
}
pub trait Connection {
async fn connect() -> Result<Self, PoolError>
where
Self: Sized;
}
pub struct PooledConnection<T: Connection> {
connection: T,
pool: Arc<RwLock<VecDeque<T>>>,
permit: tokio::sync::SemaphorePermit<'static>,
}
impl<T: Connection> Drop for PooledConnection<T> {
fn drop(&mut self) {
let pool = self.pool.clone();
let conn = self.connection.clone();
tokio::spawn(async move {
let mut pool = pool.write().await;
if pool.len() < 10 { // max idle
pool.push_back(conn);
}
});
}
}
Worker Pools
Background Task Worker
use tokio::sync::{mpsc, broadcast, oneshot};
use std::sync::Arc;
/// A group of background workers that receive their work through a channel.
pub struct WorkerPool {
    /// Sending half of the task channel used to hand work to the workers.
    sender: mpsc::Sender<Task>,
    /// One handle per worker; holds that worker's shutdown sender.
    workers: Vec<WorkerHandle>,
}
/// Control handle for a single worker.
struct WorkerHandle {
    /// Sending half of the worker's one-shot shutdown channel.
    shutdown: oneshot::Sender<()>,
}
/// Messages carried on the worker task channel.
enum Task {
    /// A job to run, paired with the one-shot sender used to report its result.
    ProcessJob(Job, oneshot::Sender<Result<(), Error>>),
    /// Asks the receiving worker to stop.
    Shutdown,
}
impl WorkerPool {
pub fn new(num_workers: usize) -> Self {
let (tx, rx) = mpsc::channel(100);
let mut workers = Vec::new();
for _ in 0..num_workers {
let rx = rx.clone();
let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
tokio::spawn(async move {
let mut rx = rx;
loop {
tokio::select! {
_ = &mut shutdown_rx => {
break;
}
task = rx.recv() => {
match task {
Some(Task::ProcessJob(job, response)) => {
let result = process_job(job).await;
let _ = response.send(result);
}
None => break,
Some(Task::Shutdown) => break,
}
}
}
}
});
workers.push(WorkerHandle { shutdown: shutdown_tx });
}
Self { sender: tx, workers }
}
pub async fn submit(&self, job: Job) -> Result<(), Error> {
let (tx, rx) = oneshot::channel();
self.sender.send(Task::ProcessJob(job, tx))
.await
.map_err(|_| Error::PoolClosed)?;
rx.await.map_err(|_| Error::TaskCancelled)?
}
}
/// Runs one job.
///
/// Placeholder implementation: the job is accepted and reported as
/// successful without doing any work.
async fn process_job(job: Job) -> Result<(), Error> {
    // Process job
    let _ = &job;
    Ok(())
}
/// A unit of background work.
pub struct Job {
    /// Identifier of the job.
    pub id: String,
    /// Raw bytes carried by the job.
    pub payload: Vec<u8>,
}
Message Queues
Event Bus Implementation
use tokio::sync::{broadcast, mpsc};
use std::sync::Arc;
use std::collections::HashMap;
/// In-process publish/subscribe hub keyed by topic name.
pub struct EventBus {
    /// Broadcast sender for each topic, keyed by topic name.
    subscribers: Arc<RwLock<HashMap<String, broadcast::Sender<Event>>>>,
}
impl EventBus {
    /// Creates a bus with no topics registered.
    pub fn new() -> Self {
        let subscribers = Arc::new(RwLock::new(HashMap::new()));
        Self { subscribers }
    }

    /// Returns a receiver for `topic`.
    ///
    /// If the topic does not exist yet, a broadcast channel with capacity 100
    /// is created for it.
    pub async fn subscribe(&self, topic: &str) -> broadcast::Receiver<Event> {
        let mut topics = self.subscribers.write().await;
        match topics.get(topic) {
            Some(existing) => existing.subscribe(),
            None => {
                let (sender, receiver) = broadcast::channel(100);
                topics.insert(topic.to_string(), sender);
                receiver
            }
        }
    }

    /// Sends `event` to the current subscribers of `topic`.
    ///
    /// The event is dropped when the topic is not registered, and the result
    /// of the send is ignored.
    pub async fn publish(&self, topic: &str, event: Event) {
        let topics = self.subscribers.read().await;
        let Some(sender) = topics.get(topic) else {
            return;
        };
        let _ = sender.send(event);
    }
}
/// A message delivered through the event bus.
#[derive(Debug, Clone)]
pub struct Event {
    /// Name of the topic the event belongs to.
    pub topic: String,
    /// Raw bytes carried by the event.
    pub payload: Vec<u8>,
    /// Monotonic timestamp attached to the event.
    pub timestamp: std::time::Instant,
}
Using the Event Bus
/// Publishes and listens for user notifications on an event bus.
pub struct NotificationService {
    /// Shared bus the notifications travel over.
    bus: Arc<EventBus>,
}
impl NotificationService {
    /// Topic every notification is published on.
    const TOPIC: &'static str = "notifications";

    /// Creates a service that talks to `bus`.
    pub fn new(bus: Arc<EventBus>) -> Self {
        Self { bus }
    }

    /// Publishes a notification for `user_id` containing `message`.
    ///
    /// The payload is a JSON object with `user_id` and `message` keys,
    /// serialized to bytes.
    pub async fn notify_user(&self, user_id: &str, message: &str) {
        let body = serde_json::json!({
            "user_id": user_id,
            "message": message,
        });
        let event = Event {
            topic: Self::TOPIC.to_string(),
            payload: body.to_string().into_bytes(),
            timestamp: std::time::Instant::now(),
        };
        self.bus.publish(Self::TOPIC, event).await;
    }

    /// Subscribes to the notifications topic and returns the receiver.
    pub async fn listen(&self) -> broadcast::Receiver<Event> {
        self.bus.subscribe(Self::TOPIC).await
    }
}
Rate Limiting
Token Bucket for Requests
use std::sync::Arc;
use tokio::sync::RwLock;
use std::collections::HashMap;
use std::time::Instant;
/// Per-key rate limiter that keeps one token bucket for each key.
pub struct RequestRateLimiter {
    /// Bucket state for every key seen so far.
    buckets: Arc<RwLock<HashMap<String, TokenBucket>>>,
    /// Maximum number of tokens a bucket can hold.
    capacity: u64,
    /// Tokens added to a bucket per second of elapsed time.
    refill_rate: f64,
}
/// State of a single key's bucket.
struct TokenBucket {
    /// Tokens currently available; fractional while refilling.
    tokens: f64,
    /// Moment the bucket was last topped up.
    last_refill: Instant,
}
impl RequestRateLimiter {
    /// Creates a limiter whose buckets hold at most `capacity` tokens and gain
    /// `refill_rate` tokens per second.
    pub fn new(capacity: u64, refill_rate: f64) -> Self {
        Self {
            buckets: Arc::new(RwLock::new(HashMap::new())),
            capacity,
            refill_rate,
        }
    }

    /// Tries to spend `cost` tokens from the bucket belonging to `key`.
    ///
    /// A key seen for the first time starts with a full bucket. On success the
    /// number of whole tokens left is returned.
    ///
    /// # Errors
    ///
    /// Returns `RateLimitError::Exceeded` when the bucket holds fewer than
    /// `cost` tokens. `retry_after` is the number of seconds, rounded up,
    /// until the refill covers the shortfall; it saturates at `u64::MAX` when
    /// `refill_rate` is zero.
    pub async fn check(&self, key: &str, cost: u64) -> Result<u64, RateLimitError> {
        let capacity = self.capacity as f64;
        let cost = cost as f64;

        let mut buckets = self.buckets.write().await;
        // Read the clock once, under the lock, so the elapsed time that is
        // credited and the stored refill timestamp describe the same instant.
        let now = Instant::now();
        let bucket = buckets.entry(key.to_string()).or_insert_with(|| TokenBucket {
            tokens: capacity,
            last_refill: now,
        });

        let elapsed = now.saturating_duration_since(bucket.last_refill).as_secs_f64();
        bucket.tokens = (bucket.tokens + elapsed * self.refill_rate).min(capacity);
        bucket.last_refill = now;

        if bucket.tokens >= cost {
            bucket.tokens -= cost;
            Ok(bucket.tokens as u64)
        } else {
            // Round up: truncating a sub-second wait to 0 would tell the
            // caller to retry immediately, only to be rejected again.
            let wait_secs = ((cost - bucket.tokens) / self.refill_rate).ceil();
            Err(RateLimitError::Exceeded {
                retry_after: wait_secs as u64,
            })
        }
    }
}
Graceful Shutdown
Handling Shutdown Signals
use tokio::signal;
use tokio::sync::broadcast;
/// Broadcasts a shutdown signal to every subscribed receiver.
pub struct Shutdown {
    /// Sending half of the shutdown broadcast channel.
    shutdown_tx: broadcast::Sender<()>,
}
impl Shutdown {
    /// Creates a shutdown notifier backed by a broadcast channel of capacity 1.
    pub fn new() -> Self {
        // The initial receiver is discarded; listeners obtain their own
        // through `subscribe`.
        let (shutdown_tx, _) = broadcast::channel(1);
        Self { shutdown_tx }
    }

    /// Returns a new receiver that will observe the shutdown signal.
    pub fn subscribe(&self) -> broadcast::Receiver<()> {
        self.shutdown_tx.subscribe()
    }

    /// Broadcasts the shutdown signal.
    ///
    /// Despite its name, this does not wait for anything: it sends one `()`
    /// to the current subscribers and ignores the result of the send.
    pub async fn wait(&self) {
        let _ = self.shutdown_tx.send(());
    }
}
/// Runs `app` until it finishes on its own or Ctrl+C is received, then calls
/// `app.shutdown()`.
///
/// A server error is printed to stderr; progress messages go to stdout.
pub async fn run_with_shutdown(app: App) {
    let shutdown = Shutdown::new();
    // Keep one receiver alive for the duration of the run.
    let _rx = shutdown.subscribe();

    tokio::select! {
        outcome = app.run() => {
            match outcome {
                Ok(_) => {}
                Err(e) => eprintln!("Server error: {}", e),
            }
        }
        _ = signal::ctrl_c() => {
            println!("Received Ctrl+C, shutting down...");
            shutdown.wait().await;
        }
    }

    println!("Starting graceful shutdown...");
    app.shutdown().await;
    println!("Shutdown complete");
}
Load Balancing
Round-Robin Load Balancer
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
/// Hands out backends in rotation.
pub struct LoadBalancer<T> {
    /// Backends to rotate through.
    backends: Arc<RwLock<Vec<T>>>,
    /// Running count of selections made; its remainder picks the next backend.
    current: AtomicUsize,
}
impl<T> LoadBalancer<T> {
    /// Creates a balancer over `backends`, starting the rotation at the first.
    pub fn new(backends: Vec<T>) -> Self {
        let backends = Arc::new(RwLock::new(backends));
        let current = AtomicUsize::new(0);
        Self { backends, current }
    }

    /// Returns a clone of the next backend in rotation, or `None` when there
    /// are no backends.
    pub async fn next(&self) -> Option<T>
    where
        T: Clone,
    {
        let pool = self.backends.read().await;
        let count = pool.len();
        if count == 0 {
            return None;
        }
        // Every call takes a fresh ticket; the remainder maps it to a slot.
        let ticket = self.current.fetch_add(1, Ordering::SeqCst);
        pool.get(ticket % count).cloned()
    }
}
Conclusion
Key concurrency patterns for web services:
- Connection Pools - Reuse database connections efficiently
- Worker Pools - Background task processing
- Message Queues - Async event communication
- Rate Limiting - Protect against abuse
- Graceful Shutdown - Release resources cleanly on exit
Related Articles
- Advanced Async/Await Patterns in Rust
- Building REST APIs with Axum and Actix-web
- Concurrency in Rust
Comments