
Async Error Handling in Rust

Master error handling in async/await code with retry strategies, timeouts, and graceful degradation

Error handling in async Rust code is fundamentally different from synchronous code. When you’re juggling multiple concurrent operations, traditional try-catch or simple error returns aren’t enough. This article explores advanced error handling patterns for async code: retries, timeouts, exponential backoff, circuit breakers, and graceful degradation.


Why Async Error Handling Is Different

In synchronous code, an error simply propagates up a single call stack. Async code adds several complications:

  1. Multiple concurrent operations - Errors from different tasks need coordination
  2. Timeouts are critical - Operations can hang indefinitely
  3. Retry logic is common - Network calls fail temporarily
  4. Cascading failures - One error can trigger many others
  5. Resource cleanup - Tasks must be cancelled cleanly

Let’s explore how Rust’s type system and async runtime help us handle these challenges safely.


Core Concepts

The Result Type in Async Context

An async fn declared to return Result<T, E> actually produces a value implementing Future<Output = Result<T, E>>; awaiting it yields the Result. This nested type is the foundation of async error handling.

// filepath: src/async_basics.rs
use std::future::Future;

// An async function that might fail
async fn fetch_user(id: u32) -> Result<String, String> {
    if id == 0 {
        Err("Invalid user ID".to_string())
    } else {
        Ok(format!("User {}", id))
    }
}

// Async function composition
async fn process_user(id: u32) -> Result<String, String> {
    let user = fetch_user(id).await?;  // ? operator works in async
    Ok(format!("Processing: {}", user))
}

#[tokio::main]
async fn main() {
    match process_user(1).await {
        Ok(result) => println!("{}", result),
        Err(e) => eprintln!("Error: {}", e),
    }
}

The ? operator is just as powerful in async contexts. When an error occurs, it propagates up the call stack immediately.

Error Types in Async Code

// filepath: src/error_types.rs
use std::io;
use std::fmt;

// Custom error type combining multiple error sources
#[derive(Debug)]
pub enum ApiError {
    NetworkError(io::Error),
    Timeout,
    InvalidResponse(String),
    DatabaseError(String),
}

impl fmt::Display for ApiError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            ApiError::NetworkError(e) => write!(f, "Network error: {}", e),
            ApiError::Timeout => write!(f, "Request timed out"),
            ApiError::InvalidResponse(msg) => write!(f, "Invalid response: {}", msg),
            ApiError::DatabaseError(msg) => write!(f, "Database error: {}", msg),
        }
    }
}

impl std::error::Error for ApiError {}

// Implement From for easier error conversion
impl From<io::Error> for ApiError {
    fn from(err: io::Error) -> Self {
        ApiError::NetworkError(err)
    }
}

// Function using custom error type
async fn api_call() -> Result<String, ApiError> {
    Err(ApiError::Timeout)
}
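The From impl is what makes the ? operator ergonomic: at each ? site, an io::Error is converted into ApiError automatically. A minimal sketch (the file path and function name are hypothetical; shown synchronously for brevity, but the conversion works identically inside an async fn):

```rust
use std::io;

#[derive(Debug)]
enum ApiError {
    NetworkError(io::Error),
}

impl From<io::Error> for ApiError {
    fn from(err: io::Error) -> Self {
        ApiError::NetworkError(err)
    }
}

// `?` applies the From impl automatically: the io::Error from fs::read
// becomes ApiError::NetworkError with no explicit map_err.
fn read_token(path: &str) -> Result<String, ApiError> {
    let bytes = std::fs::read(path)?;
    Ok(String::from_utf8_lossy(&bytes).into_owned())
}

fn main() {
    // A hypothetical missing path: expect NetworkError wrapping NotFound.
    match read_token("/definitely/not/a/real/path") {
        Err(ApiError::NetworkError(e)) => assert_eq!(e.kind(), io::ErrorKind::NotFound),
        other => panic!("unexpected result: {:?}", other),
    }
    println!("From-based conversion worked");
}
```

This is why layered error enums usually carry one From impl per underlying error source: each layer converts at the ? boundary without boilerplate.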

Retry Patterns

Simple Retry Loop

The most straightforward retry pattern is a loop with a counter:

// filepath: src/simple_retry.rs
use tokio::time::{sleep, Duration};

async fn fetch_with_retry(url: &str, max_retries: u32) -> Result<String, String> {
    for attempt in 0..=max_retries {
        match fetch_url(url).await {
            Ok(response) => return Ok(response),
            Err(e) => {
                if attempt == max_retries {
                    return Err(format!("Failed after {} attempts: {}", max_retries + 1, e));
                }
                println!("Attempt {} failed: {}. Retrying...", attempt + 1, e);
                sleep(Duration::from_secs(1)).await;
            }
        }
    }
    unreachable!()
}

async fn fetch_url(url: &str) -> Result<String, String> {
    // Simulated network call
    Ok(format!("Response from {}", url))
}

#[tokio::main]
async fn main() {
    let result = fetch_with_retry("https://api.example.com", 3).await;
    println!("{:?}", result);
}

Exponential Backoff

Real-world systems use exponential backoff to avoid overwhelming struggling servers:

// filepath: src/exponential_backoff.rs
use tokio::time::{sleep, Duration};

pub struct ExponentialBackoff {
    initial_delay: Duration,
    max_delay: Duration,
    max_retries: u32,
}

impl ExponentialBackoff {
    pub fn new(initial_delay: Duration, max_delay: Duration, max_retries: u32) -> Self {
        ExponentialBackoff {
            initial_delay,
            max_delay,
            max_retries,
        }
    }

    // Each call to `operation` returns a freshly boxed future; boxing erases
    // the anonymous async-block type so a single trait bound can name it.
    pub async fn execute<F, T, E>(&self, mut operation: F) -> Result<T, E>
    where
        F: FnMut() -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<T, E>>>>,
    {
        let mut attempt = 0;
        let mut delay = self.initial_delay;

        loop {
            match operation().await {
                Ok(result) => return Ok(result),
                Err(e) => {
                    attempt += 1;
                    if attempt > self.max_retries {
                        return Err(e);
                    }

                    println!("Attempt {} failed. Waiting {:?} before retry.", attempt, delay);
                    sleep(delay).await;

                    // Double the delay, capped at max_delay
                    delay = delay.saturating_mul(2).min(self.max_delay);
                }
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let backoff = ExponentialBackoff::new(
        Duration::from_millis(100),
        Duration::from_secs(10),
        5,
    );

    let result = backoff.execute(|| {
        Box::pin(async {
            // Simulated operation
            Ok::<String, String>("Success".to_string())
        })
    }).await;

    println!("{:?}", result);
}

Jittered Backoff for Thundering Herd Prevention

// filepath: src/jittered_backoff.rs
use tokio::time::{sleep, Duration};
use rand::Rng;

pub async fn fetch_with_jitter(
    url: &str,
    max_retries: u32,
) -> Result<String, String> {
    let mut delay = Duration::from_millis(100);

    for attempt in 0..=max_retries {
        match fetch_url(url).await {
            Ok(response) => return Ok(response),
            Err(e) => {
                if attempt == max_retries {
                    return Err(format!("Failed after {} attempts: {}", max_retries + 1, e));
                }

                // Add random jitter (±10%). The RNG is created inside the loop:
                // ThreadRng is !Send, so holding it across an .await would make
                // this future unusable with tokio::spawn.
                let jitter_factor = rand::thread_rng().gen_range(0.9..=1.1);
                let jittered_delay = delay.mul_f64(jitter_factor);

                sleep(jittered_delay).await;

                // Exponential backoff, capped at 10 seconds
                delay = delay.saturating_mul(2).min(Duration::from_secs(10));
            }
        }
    }
    unreachable!()
}

async fn fetch_url(url: &str) -> Result<String, String> {
    Ok(format!("Response from {}", url))
}
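A widely used alternative is "full jitter" (popularized by an AWS architecture blog post), where each delay is drawn uniformly from zero up to the exponential ceiling, decorrelating clients even more aggressively. A sketch of just the delay computation, with the random draw passed in as a parameter so the function stays deterministic and testable (in real code you would pass in rand::thread_rng().gen_range(0.0..1.0)):

```rust
use std::time::Duration;

// Full jitter: sleep a uniformly random duration in [0, min(cap, base * 2^attempt)].
// `unit_random` is a value in [0.0, 1.0) supplied by the caller's RNG.
fn full_jitter_delay(base: Duration, cap: Duration, attempt: u32, unit_random: f64) -> Duration {
    let ceiling = base.saturating_mul(2u32.saturating_pow(attempt)).min(cap);
    ceiling.mul_f64(unit_random)
}

fn main() {
    let base = Duration::from_millis(100);
    let cap = Duration::from_secs(10);

    // The draw can land anywhere down to zero...
    assert_eq!(full_jitter_delay(base, cap, 3, 0.0), Duration::ZERO);
    // ...and the ceiling is capped no matter how large the attempt count grows.
    assert_eq!(full_jitter_delay(base, cap, 20, 1.0), cap);

    println!("full jitter delay computation OK");
}
```

The trade-off: full jitter retries sooner on average than the ±10% scheme above, but spreads a burst of synchronized clients across the whole window, which is exactly what prevents a thundering herd.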

Timeout Handling

Timeouts are essential in async code to prevent indefinite hangs:

// filepath: src/timeout_handling.rs
use tokio::time::{timeout, Duration};

#[derive(Debug)]
pub enum RequestError {
    Timeout,
    NetworkError(String),
}

pub async fn fetch_with_timeout(url: &str, timeout_secs: u64) -> Result<String, RequestError> {
    let timeout_duration = Duration::from_secs(timeout_secs);

    match timeout(timeout_duration, fetch_url(url)).await {
        Ok(Ok(response)) => Ok(response),
        Ok(Err(e)) => Err(RequestError::NetworkError(e)),
        Err(_) => Err(RequestError::Timeout),
    }
}

async fn fetch_url(url: &str) -> Result<String, String> {
    // Simulated network call
    Ok(format!("Response from {}", url))
}

#[tokio::main]
async fn main() {
    let result = fetch_with_timeout("https://api.example.com", 5).await;
    match result {
        Ok(data) => println!("Success: {}", data),
        Err(RequestError::Timeout) => println!("Request timed out"),
        Err(RequestError::NetworkError(e)) => println!("Network error: {}", e),
    }
}

Combining Retry and Timeout

Real applications need both timeout and retry logic:

// filepath: src/retry_with_timeout.rs
use tokio::time::{timeout, sleep, Duration};

pub async fn fetch_reliable(
    url: &str,
    max_retries: u32,
    timeout_secs: u64,
) -> Result<String, String> {
    let mut attempt = 0;
    let mut delay = Duration::from_millis(100);

    loop {
        attempt += 1;
        println!("Attempt {} of {}", attempt, max_retries + 1);

        // Apply timeout to each attempt
        let timeout_duration = Duration::from_secs(timeout_secs);
        match timeout(timeout_duration, fetch_url(url)).await {
            // Success
            Ok(Ok(response)) => return Ok(response),

            // Operation timed out
            Err(_) => {
                println!("Attempt {} timed out", attempt);
                if attempt > max_retries {
                    return Err("Timeout after all retries".to_string());
                }
            }

            // Other error
            Ok(Err(e)) => {
                println!("Attempt {} failed: {}", attempt, e);
                if attempt > max_retries {
                    return Err(format!("Failed after {} attempts: {}", max_retries + 1, e));
                }
            }
        }

        sleep(delay).await;
        delay = delay.saturating_mul(2).min(Duration::from_secs(5));
    }
}

async fn fetch_url(url: &str) -> Result<String, String> {
    Ok("Response".to_string())
}

Circuit Breaker Pattern

For handling cascading failures, implement a circuit breaker:

// filepath: src/circuit_breaker.rs
use tokio::time::{Instant, Duration};
use std::sync::{Arc, Mutex};

#[derive(Debug, Clone, Copy)]
pub enum CircuitState {
    Closed,      // Normal operation
    Open,        // Failing, reject requests
    HalfOpen,    // Testing if service recovered
}

pub struct CircuitBreaker {
    state: Arc<Mutex<CircuitState>>,
    failure_threshold: u32,
    success_threshold: u32,
    failure_count: Arc<Mutex<u32>>,
    success_count: Arc<Mutex<u32>>,
    last_failure_time: Arc<Mutex<Option<Instant>>>,
    timeout_duration: Duration,
}

impl CircuitBreaker {
    pub fn new(
        failure_threshold: u32,
        success_threshold: u32,
        timeout_secs: u64,
    ) -> Self {
        CircuitBreaker {
            state: Arc::new(Mutex::new(CircuitState::Closed)),
            failure_threshold,
            success_threshold,
            failure_count: Arc::new(Mutex::new(0)),
            success_count: Arc::new(Mutex::new(0)),
            last_failure_time: Arc::new(Mutex::new(None)),
            timeout_duration: Duration::from_secs(timeout_secs),
        }
    }

    pub async fn call<F, T>(&self, operation: F) -> Result<T, String>
    where
        F: std::future::Future<Output = Result<T, String>>,
    {
        // A std Mutex is fine here: every guard is released before any .await.
        let mut state = self.state.lock().unwrap();

        match *state {
            CircuitState::Open => {
                let last_failure = self.last_failure_time.lock().unwrap();
                if let Some(instant) = *last_failure {
                    if instant.elapsed() > self.timeout_duration {
                        println!("Circuit breaker: Transitioning to HalfOpen");
                        *state = CircuitState::HalfOpen;
                        *self.success_count.lock().unwrap() = 0;
                    } else {
                        return Err("Circuit breaker is open".to_string());
                    }
                }
            }
            _ => {}
        }

        drop(state);  // Release lock before operation

        match operation.await {
            Ok(result) => {
                self.on_success();
                Ok(result)
            }
            Err(e) => {
                self.on_failure();
                Err(e)
            }
        }
    }

    fn on_success(&self) {
        let mut state = self.state.lock().unwrap();
        let mut success_count = self.success_count.lock().unwrap();

        match *state {
            CircuitState::HalfOpen => {
                *success_count += 1;
                if *success_count >= self.success_threshold {
                    println!("Circuit breaker: Closed (recovered)");
                    *state = CircuitState::Closed;
                    *self.failure_count.lock().unwrap() = 0;
                }
            }
            CircuitState::Closed => {
                *self.failure_count.lock().unwrap() = 0;
            }
            _ => {}
        }
    }

    fn on_failure(&self) {
        let mut state = self.state.lock().unwrap();
        let mut failure_count = self.failure_count.lock().unwrap();

        *failure_count += 1;

        // A failure while HalfOpen reopens immediately; while Closed,
        // we only open once the failure threshold is reached.
        if matches!(*state, CircuitState::HalfOpen) || *failure_count >= self.failure_threshold {
            println!("Circuit breaker: Open (too many failures)");
            *state = CircuitState::Open;
            *self.last_failure_time.lock().unwrap() = Some(Instant::now());
        }
    }
}

#[tokio::main]
async fn main() {
    // Open after 3 failures, close again after 2 successes, retry after 1 second.
    let breaker = CircuitBreaker::new(3, 2, 1);

    for i in 0..10 {
        let result = breaker.call(async {
            if i < 3 {
                Err("Service unavailable".to_string())
            } else {
                Ok("Success".to_string())
            }
        }).await;

        println!("Call {}: {:?}", i, result);

        // Space the calls out so the breaker's timeout can elapse and the
        // HalfOpen probe actually runs; without this the loop finishes while
        // the circuit is still open and recovery is never observed.
        tokio::time::sleep(Duration::from_millis(300)).await;
    }
}

Graceful Degradation

// filepath: src/graceful_degradation.rs
use tokio::time::Duration;

#[derive(Debug)]
pub struct Response {
    pub data: String,
    pub is_cached: bool,
}

pub struct Service {
    cache: std::sync::Mutex<Option<String>>,
}

impl Service {
    pub fn new() -> Self {
        Service {
            cache: std::sync::Mutex::new(None),
        }
    }

    pub async fn fetch_with_fallback(&self, url: &str) -> Response {
        // Try primary source with timeout
        match tokio::time::timeout(
            Duration::from_secs(2),
            fetch_primary(url),
        ).await {
            Ok(Ok(data)) => {
                // Cache successful response
                *self.cache.lock().unwrap() = Some(data.clone());
                Response {
                    data,
                    is_cached: false,
                }
            }
            _ => {
                // Primary failed, try cache
                if let Some(cached) = self.cache.lock().unwrap().clone() {
                    println!("Using cached data");
                    Response {
                        data: cached,
                        is_cached: true,
                    }
                } else {
                    // No cache, return default
                    println!("Returning default data");
                    Response {
                        data: "Default data".to_string(),
                        is_cached: false,
                    }
                }
            }
        }
    }
}

async fn fetch_primary(url: &str) -> Result<String, String> {
    Ok(format!("Data from {}", url))
}
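The fallback ladder above (fresh data, then cache, then a default) can be isolated as a small pure function, which makes the degradation policy itself easy to unit-test independently of any network calls. A sketch with the primary result and cache state passed in (function name and default string are illustrative):

```rust
// Fallback ladder: prefer the primary result, else a cached value,
// else a hard-coded default. The bool flags whether the data is stale.
fn resolve(primary: Result<String, String>, cache: Option<String>) -> (String, bool) {
    match primary {
        Ok(data) => (data, false),
        Err(_) => match cache {
            Some(cached) => (cached, true), // degraded: serving stale data
            None => ("Default data".to_string(), false),
        },
    }
}

fn main() {
    // Primary healthy: cache is ignored.
    assert_eq!(
        resolve(Ok("fresh".to_string()), Some("stale".to_string())),
        ("fresh".to_string(), false)
    );
    // Primary down: fall back to cache, flagged as stale.
    assert_eq!(
        resolve(Err("down".to_string()), Some("stale".to_string())),
        ("stale".to_string(), true)
    );
    // Primary down and no cache: last-resort default.
    assert_eq!(
        resolve(Err("down".to_string()), None),
        ("Default data".to_string(), false)
    );
    println!("fallback ladder OK");
}
```

Keeping the policy pure like this means the async plumbing (timeout, cache locking) and the degradation decision can evolve and be tested separately.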

Common Pitfalls and Best Practices

❌ Pitfall 1: Missing Timeouts

// ❌ Bad: No timeout, can hang indefinitely
async fn bad_fetch(url: &str) -> Result<String, String> {
    make_request(url).await  // Could hang forever
}

// ✅ Good: Always include timeout
async fn good_fetch(url: &str) -> Result<String, String> {
    match timeout(Duration::from_secs(30), make_request(url)).await {
        Ok(result) => result,
        Err(_) => Err("Request timed out".to_string()),
    }
}

❌ Pitfall 2: Retry Without Backoff

// ❌ Bad: Tight retry loop burns CPU and hammers server
for _ in 0..10 {
    if operation().await.is_ok() {
        break;
    }
}

// ✅ Good: Use backoff
let mut delay = Duration::from_millis(100);
for _ in 0..10 {
    if operation().await.is_ok() {
        break;
    }
    sleep(delay).await;
    delay = Duration::from_millis((delay.as_millis() as u64).saturating_mul(2));
}

✅ Best Practice 1: Use Custom Error Types

#[derive(Debug)]
pub enum ApiError {
    Timeout,
    NetworkError(String),
    Retryable(String),
    NonRetryable(String),
}

impl ApiError {
    pub fn is_retryable(&self) -> bool {
        matches!(self, ApiError::Timeout | ApiError::Retryable(_))
    }
}
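The payoff comes when the retry loop consults the classification: transient errors are retried, permanent ones fail fast. A minimal synchronous sketch (the simulated flaky operation is hypothetical; the same shape works with async operations and a sleep between attempts):

```rust
#[derive(Debug)]
enum ApiError {
    Timeout,
    NonRetryable(String),
}

impl ApiError {
    fn is_retryable(&self) -> bool {
        matches!(self, ApiError::Timeout)
    }
}

// Retry only while the error says it is transient; give up immediately otherwise.
fn run_with_retry<F>(mut op: F, max_retries: u32) -> Result<String, ApiError>
where
    F: FnMut() -> Result<String, ApiError>,
{
    let mut last_err = ApiError::Timeout;
    for _ in 0..=max_retries {
        match op() {
            Ok(v) => return Ok(v),
            Err(e) if e.is_retryable() => last_err = e, // transient: try again
            Err(e) => return Err(e),                    // permanent: fail fast
        }
    }
    Err(last_err)
}

fn main() {
    // Simulated flaky service: two timeouts, then success.
    let mut calls = 0;
    let result = run_with_retry(
        || {
            calls += 1;
            if calls < 3 { Err(ApiError::Timeout) } else { Ok("ok".to_string()) }
        },
        5,
    );
    assert_eq!(result.unwrap(), "ok");
    assert_eq!(calls, 3);

    // A permanent error short-circuits on the first attempt.
    let mut calls = 0;
    let result = run_with_retry(
        || {
            calls += 1;
            Err(ApiError::NonRetryable("bad request".to_string()))
        },
        5,
    );
    assert!(result.is_err());
    assert_eq!(calls, 1);

    println!("retryable classification OK");
}
```

Without this split, a retry loop wastes attempts (and backoff delay) on errors that can never succeed, such as a 400-class response.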

✅ Best Practice 2: Structured Logging

use tracing::{error, warn, debug};

pub async fn fetch_with_logging(url: &str) -> Result<String, String> {
    debug!(url, "Starting fetch");

    match make_request(url).await {
        Ok(data) => {
            debug!(url, "Fetch succeeded");
            Ok(data)
        }
        Err(e) => {
            error!(url, error = %e, "Fetch failed");
            Err(e)
        }
    }
}

Comparison with Other Languages

Language   Approach                         Pros                               Cons
Rust       Result + async/await             Type-safe, zero-cost, no runtime   Syntax complex initially
Go         error returns + goroutines       Simple, readable                   Manual error handling
Python     try/except + asyncio             Easy to learn                      Dynamic, no compile-time safety
Java       try/catch + CompletableFuture    Type-safe                          Verbose, checked exceptions

Further Resources

Alternative Technologies

  • Async channels (tokio::sync::mpsc) - For task coordination
  • The tokio::select! macro - For racing multiple futures
  • Task spawning (tokio::spawn) - For independent async tasks
  • Cancellation tokens (tokio_util::sync::CancellationToken) - For graceful shutdown

Conclusion

Robust async error handling in Rust combines:

  1. Type-safe error types - Custom enums capturing all failure modes
  2. Timeouts - Always prevent indefinite hangs
  3. Retries with backoff - Recover from transient failures
  4. Circuit breakers - Prevent cascading failures
  5. Graceful degradation - Provide fallbacks when possible
  6. Structured logging - Track failures for debugging

By applying these patterns, your async Rust applications become resilient and maintainable.



Happy async coding! 🚀
