Concurrency in Rust - Message Passing

While creating threads allows you to run code concurrently, a common challenge is coordinating and communicating between those threads. One popular and safe approach is message passing, where threads communicate by sending each other data without sharing memory directly. This model aligns with the Go programming language’s slogan: “Do not communicate by sharing memory; instead, share memory by communicating.”

Rust implements this concept in its standard library through channels. A channel is a one-way conduit that allows a “transmitter” to send data to a “receiver.”

Creating a Channel with mpsc

Rust’s standard channel implementation is in the std::sync::mpsc module. The name mpsc stands for multiple producer, single consumer. This means you can have many threads sending data, but only one thread receiving it.

You create a channel using the mpsc::channel() function, which returns a tuple containing a transmitter and a receiver.

use std::sync::mpsc;
use std::thread;

fn main() {
    // Create a new channel. `tx` is the transmitter, `rx` is the receiver.
    let (tx, rx) = mpsc::channel();

    // Spawn a new thread and move the transmitter into it.
    thread::spawn(move || {
        let val = String::from("hi");
        // Send a message. This takes ownership of `val`.
        tx.send(val).unwrap();
        // println!("val is {}", val); // This would not compile because `val` was moved.
    });

    // Block the main thread and wait for a value.
    // `recv()` returns a Result: Ok with the next message, or Err once every
    // transmitter has been dropped and no messages remain in the channel.
    let received = rx.recv().unwrap();
    println!("Got: {}", received);
}

In this example:

  1. We create a channel. The transmitter tx is moved into the spawned thread.
  2. The spawned thread sends a String through the channel using tx.send(). The send method takes ownership of the value, preventing us from accidentally using it again in the spawned thread after sending it.
  3. The main thread calls rx.recv(), which blocks execution and waits for a message to arrive.
  4. Once the message is received, it’s printed.
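
The comment on recv() mentions the Err case without showing it. The following is a minimal sketch of that case, assuming a hypothetical helper named drain_until_closed that is not part of the original example. It uses only std::sync::mpsc and std::thread. The worker sends one message and then exits, which drops its transmitter.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical helper: collects every message from one worker thread and
/// reports whether `recv()` eventually signalled that the channel closed.
fn drain_until_closed() -> (Vec<String>, bool) {
    let (tx, rx) = mpsc::channel();

    let handle = thread::spawn(move || {
        tx.send(String::from("hi")).unwrap();
        // `tx` is dropped here, when the closure returns.
    });
    // Wait for the worker so its transmitter is definitely gone.
    handle.join().unwrap();

    let mut messages = Vec::new();
    let mut closed = false;
    loop {
        match rx.recv() {
            // Messages sent before the transmitter was dropped are still delivered.
            Ok(msg) => messages.push(msg),
            // No transmitters left and nothing buffered: the channel is closed.
            Err(_) => {
                closed = true;
                break;
            }
        }
    }
    (messages, closed)
}

fn main() {
    let (messages, closed) = drain_until_closed();
    println!("messages: {:?}, closed: {}", messages, closed);
}
```

Matching on the Result instead of calling unwrap() lets the receiving thread treat a closed channel as a normal end-of-input signal rather than a panic.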

Multiple Producers, Single Consumer

The “multiple producer” capability of mpsc channels is enabled by cloning the transmitter.

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    // Clone the transmitter for a second thread.
    let tx1 = tx.clone();

    // First producer thread
    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    // Second producer thread
    thread::spawn(move || {
        let vals = vec![
            String::from("more"),
            String::from("messages"),
            String::from("for"),
            String::from("you"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    // The receiver can be treated as an iterator.
    // The loop will end when all transmitters have been dropped.
    for received in rx {
        println!("Got: {}", received);
    }
}

In this example:

  1. We clone the transmitter tx to create tx1.
  2. We spawn two threads, giving each one a transmitter.
  3. Both threads send messages concurrently, so the order in which messages from the two threads interleave can vary from run to run.
  4. The main thread iterates over the receiver rx. This is a convenient way to handle incoming messages. The loop automatically ends when all transmitters are dropped (i.e., when both spawned threads finish).
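
Because the receiving loop ends only once every transmitter is dropped, a transmitter that stays alive in the main thread would keep the loop waiting forever. The sketch below shows one way to handle that when spawning producers in a loop. The function name collect_from_workers and the worker_count parameter are hypothetical, chosen for illustration.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical helper: spawns `worker_count` producers that each send their
/// own id, then returns every received id in sorted order.
fn collect_from_workers(worker_count: usize) -> Vec<usize> {
    let (tx, rx) = mpsc::channel();

    for id in 0..worker_count {
        // Each producer gets its own clone of the transmitter.
        let tx = tx.clone();
        thread::spawn(move || {
            tx.send(id).unwrap();
        });
    }

    // Drop the original transmitter. Without this, one transmitter would
    // stay alive in this thread and the iteration below would never end.
    drop(tx);

    // Iterate until all clones have been dropped by their threads.
    let mut received: Vec<usize> = rx.iter().collect();
    // Arrival order depends on thread scheduling, so sort for a stable result.
    received.sort();
    received
}

fn main() {
    let ids = collect_from_workers(3);
    println!("Got: {:?}", ids);
}
```

In the two-producer example above this step is unnecessary, because both tx and tx1 are moved into spawned threads and the main thread holds no transmitter of its own.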

Conclusion

Message-passing channels are a fundamental tool for safe concurrency in Rust. Because send() moves ownership of a value into the channel, the compiler rejects any later use of that value by the sending thread, which rules out data races on it at compile time. Channels also give threads a clean way to synchronize: a receiver blocks until a message arrives, and it learns that its producers have finished when the channel closes.