Safe concurrency: threads, Arc, and channels

Ch. 5: Rust in Practice · 14 min read

Fearless concurrency

Concurrency is the ability to make progress on multiple tasks during overlapping periods of time; when those tasks literally run at the same instant on different cores, that is parallelism. In most languages, concurrency is a major source of bugs: data races, deadlocks, and use-after-free across threads. Safe Rust rules out data races at compile time.

Rust's promise is fearless concurrency: you can write concurrent code without fear of data races, because the compiler rejects them before your program ever runs.

Creating threads with std::thread::spawn

use std::thread;
use std::time::Duration;

fn main() {
    // Create a new thread
    let handle = thread::spawn(|| {
        for i in 1..=5 {
            println!("Secondary thread: {i}");
            thread::sleep(Duration::from_millis(10));
        }
    });
    
    // The main thread continues
    for i in 1..=3 {
        println!("Main thread: {i}");
        thread::sleep(Duration::from_millis(10));
    }
    
    // join() waits for the thread to finish
    // If you don't call join(), the thread may be cut off when main exits
    handle.join().unwrap();
    
    println!("All threads finished");
}
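The example above calls unwrap() on the result of join(). As a minimal sketch of what that Result carries: join() returns Err when the spawned thread panicked, so the caller can handle the failure instead of propagating it. The helper name run_and_report is hypothetical and used only for illustration.

```rust
use std::thread;

// Hypothetical helper: runs `work` on a new thread and converts a panic
// in that thread into an error value for the caller.
fn run_and_report<F>(work: F) -> Result<i32, String>
where
    F: FnOnce() -> i32 + Send + 'static,
{
    thread::spawn(work)
        .join() // Err(..) if the thread panicked
        .map_err(|_| "thread panicked".to_string())
}

fn main() {
    let ok = run_and_report(|| 2 + 2);
    println!("{ok:?}"); // Ok(4)

    // The panic message is still printed to stderr by the default panic hook,
    // but the main thread keeps running.
    let failed = run_and_report(|| panic!("boom"));
    println!("{failed:?}"); // Err("thread panicked")
}
```

A panic in a spawned thread does not take down the main thread; it only surfaces when the handle is joined.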

Move closures with threads

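A thread started with thread::spawn may outlive the function that created it, so its closure cannot borrow local variables; it has to own the data it uses. The move keyword forces the closure to take ownership of the values it captures: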

use std::thread;

fn main() {
    let data = vec![1, 2, 3, 4, 5];
    
    // Without move: compile error, because the closure would only borrow data
    // and the spawned thread may outlive the variable it borrows from
    // let handle = thread::spawn(|| println!("{:?}", data)); // Error
    
    // With move: data is moved into the thread
    let handle = thread::spawn(move || {
        println!("Data in thread: {:?}", data);
        let sum: i32 = data.iter().sum();
        println!("Sum: {sum}");
        sum
    });
    
    // data is no longer available here (it was moved)
    
    // join() returns the closure's return value wrapped in Result
    let sum = handle.join().unwrap();
    println!("Thread returned: {sum}");
}
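When the threads are guaranteed to finish before the data goes out of scope, moving ownership is not the only option. The sketch below uses std::thread::scope (available since Rust 1.63), which joins every thread it spawned before returning and therefore allows borrowing. The helper name sum_halves is an assumption made for this example.

```rust
use std::thread;

// Hypothetical helper: sums the two halves of a slice on two scoped threads.
// The threads only borrow the slice; nothing is moved or cloned.
fn sum_halves(data: &[i32]) -> (i32, i32) {
    let (left, right) = data.split_at(data.len() / 2);

    // All threads spawned on `s` are joined before scope() returns,
    // so borrowing `left` and `right` is accepted by the compiler.
    thread::scope(|s| {
        let l = s.spawn(|| left.iter().sum::<i32>());
        let r = s.spawn(|| right.iter().sum::<i32>());
        (l.join().unwrap(), r.join().unwrap())
    })
}

fn main() {
    let data = vec![1, 2, 3, 4, 5];
    let (a, b) = sum_halves(&data);
    println!("{a} + {b} = {}", a + b);

    // data was only borrowed, so it is still available here
    println!("{data:?}");
}
```

Scoped threads fit fork-join work like this; thread::spawn with move remains the right tool when a thread has to outlive the function that starts it.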

Arc: atomic reference counting

Rc<T> is not thread-safe: its reference count is updated non-atomically, so it implements neither Send nor Sync. To share ownership of data between threads, use Arc<T> (Atomically Reference Counted):

use std::sync::Arc;
use std::thread;

fn main() {
    // Arc allows sharing ownership between multiple threads
    let data = Arc::new(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
    
    let mut handles = Vec::new();
    
    for i in 0..4 {
        let data_clone = Arc::clone(&data); // Increments the reference count
        
        let handle = thread::spawn(move || {
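            // Each thread reads its own chunk. The chunk size is rounded up
            // (10 elements / 4 threads -> 3) so the last elements are not skipped.
            let len = data_clone.len();
            let chunk = (len + 3) / 4;
            let start = (i * chunk).min(len);
            let end = ((i + 1) * chunk).min(len);
            let slice = &data_clone[start..end];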
            let sum: i32 = slice.iter().sum();
            println!("Thread {i}: partial sum = {sum}");
            sum
        });
        
        handles.push(handle);
    }
    
    let total: i32 = handles.into_iter()
        .map(|h| h.join().unwrap())
        .sum();
    
    println!("Total sum: {total}");
}
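To make the reference counting visible, here is a small sketch that inspects the count with Arc::strong_count before and after a thread has used a clone. The function name count_while_shared is hypothetical.

```rust
use std::sync::Arc;
use std::thread;

// Hypothetical helper: returns the strong count while a clone exists
// and again after the thread that owned the clone has finished.
fn count_while_shared() -> (usize, usize) {
    let data = Arc::new(vec![1, 2, 3]);
    let clone = Arc::clone(&data);

    // Two owners at this point: `data` and `clone`
    let during = Arc::strong_count(&data);

    // The clone is moved into the thread and dropped when the closure ends
    let handle = thread::spawn(move || clone.len());
    handle.join().unwrap();

    // Only `data` is left
    let after = Arc::strong_count(&data);
    (during, after)
}

fn main() {
    let (during, after) = count_while_shared();
    println!("while shared: {during}, after join: {after}");
}
```

The vector itself is freed only when the last Arc pointing to it is dropped, regardless of which thread drops it.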

Mutex: exclusive data access

Mutex<T> (Mutual Exclusion) guarantees that only one thread accesses the data at any given moment:

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let results = Arc::new(Mutex::new(Vec::<String>::new()));
    let mut handles = Vec::new();
    
    for i in 0..5 {
        let results_clone = Arc::clone(&results);
        
        let handle = thread::spawn(move || {
            // lock() waits until exclusive access is obtained
            // Returns a MutexGuard that releases the lock when dropped
            let mut guard = results_clone.lock().unwrap();
            guard.push(format!("Thread {i} completed"));
        }); // guard is dropped here → lock released
        
        handles.push(handle);
    }
    
    for handle in handles {
        handle.join().unwrap();
    }
    
    let data = results.lock().unwrap();
    println!("Results ({} entries):", data.len());
    for entry in data.iter() {
        println!("  - {entry}");
    }
}
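Because the lock is held for as long as the MutexGuard lives, it usually pays to do the expensive work first and lock only for the update. A minimal sketch of that idea, with a hypothetical helper named collect_squares:

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical helper: each thread computes a value without holding the lock,
// then locks only for the duration of a single push.
fn collect_squares(n: u64) -> Vec<u64> {
    let results = Arc::new(Mutex::new(Vec::new()));

    let handles: Vec<_> = (0..n)
        .map(|i| {
            let results = Arc::clone(&results);
            thread::spawn(move || {
                let square = i * i; // work done outside the critical section
                // The guard is a temporary: it is dropped, and the lock
                // released, at the end of this statement.
                results.lock().unwrap().push(square);
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    // Threads finish in no particular order, so sort before returning
    let mut out = results.lock().unwrap().clone();
    out.sort_unstable();
    out
}

fn main() {
    println!("{:?}", collect_squares(4));
}
```

Short critical sections reduce the time other threads spend blocked in lock().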

Channels (mpsc): communication between threads

mpsc channels (multiple producer, single consumer) allow sending data between threads without sharing memory:

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    // Create channel: tx (transmitter/sender), rx (receiver)
    let (tx, rx) = mpsc::channel();
    
    // Producer thread
    let handle = thread::spawn(move || {
        let messages = vec!["hello", "from", "the", "thread"];
        
        for message in messages {
            println!("Sending: {message}");
            tx.send(message).unwrap();
            thread::sleep(Duration::from_millis(50));
        }
        // tx is dropped here — rx will detect this
    });
    
    // Main thread receives
    for received in rx {
        // rx iterates until tx is dropped
        println!("Received: {received}");
    }
    
    handle.join().unwrap();
    println!("Channel closed");
}
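mpsc::channel() is unbounded, so a fast producer can queue messages without limit. As a sketch of one alternative, mpsc::sync_channel creates a bounded channel whose send() blocks while the buffer is full. The buffer size of 2 and the helper name bounded_pipeline are choices made for this example only.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical helper: a producer sends n values through a bounded channel
// and the calling thread collects them in order.
fn bounded_pipeline(n: u32) -> Vec<u32> {
    // At most 2 messages can wait in the buffer
    let (tx, rx) = mpsc::sync_channel::<u32>(2);

    let producer = thread::spawn(move || {
        for i in 0..n {
            // Blocks while the buffer is full, which slows the producer
            // down to the pace of the consumer
            tx.send(i * 10).unwrap();
        }
        // tx is dropped here, which ends the receiver's iteration
    });

    let received: Vec<u32> = rx.iter().collect();
    producer.join().unwrap();
    received
}

fn main() {
    println!("{:?}", bounded_pipeline(5));
}
```

With a single producer the messages arrive in the order they were sent.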

Multiple producers

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel::<String>();
    
    let mut handles = Vec::new();
    
    // Multiple threads send to the same channel
    for i in 0..5 {
        let tx_clone = tx.clone();
        
        let handle = thread::spawn(move || {
            for j in 0..3 {
                let msg = format!("Thread {i}, message {j}");
                tx_clone.send(msg).unwrap();
            }
        });
        
        handles.push(handle);
    }
    
    // Release the original tx so rx can terminate when all clones are dropped
    drop(tx);
    
    for handle in handles {
        handle.join().unwrap();
    }
    
    // Collect all messages
    let mut messages: Vec<String> = rx.into_iter().collect();
    messages.sort();
    
    println!("Total messages: {}", messages.len());
    for msg in &messages[..5] { // Show only first 5
        println!("  {msg}");
    }
}

The Send and Sync traits

Rust uses two marker traits to guarantee concurrency safety:

  • Send: Ownership of a value of the type can be safely transferred (moved) to another thread. Almost all types are Send. Notable exceptions: Rc<T> (uses non-atomic reference counting) and raw pointers.

  • Sync: The type can be safely shared by reference between threads; T is Sync exactly when &T is Send. Exceptions: Cell<T> and RefCell<T> (non-thread-safe interior mutability), as well as Rc<T>.

use std::sync::{Arc, Mutex};

// Arc<T> is Send + Sync if T is Send + Sync
// Mutex<T> is Send + Sync if T is Send

// You can verify at compile time:
fn requires_send<T: Send>(_: T) {}
fn requires_sync<T: Sync>(_: &T) {}

fn main() {
    let arc = Arc::new(Mutex::new(42));
    requires_send(arc.clone()); // OK: Arc<Mutex<i32>> is Send
    requires_sync(&arc);        // OK: Arc<Mutex<i32>> is Sync
    
    // Rc is NOT Send
    let rc = std::rc::Rc::new(42);
    // requires_send(rc); // Compile-time error
    drop(rc);
}

Pattern: thread pool with channels

use std::sync::{Arc, Mutex, mpsc};
use std::thread;

type Task = Box<dyn FnOnce() + Send + 'static>;

struct ThreadPool {
    threads: Vec<thread::JoinHandle<()>>,
    tx: mpsc::Sender<Task>,
}

impl ThreadPool {
    fn new(size: usize) -> Self {
        let (tx, rx) = mpsc::channel::<Task>();
        let rx = Arc::new(Mutex::new(rx));
        let mut threads = Vec::with_capacity(size);
        
        for id in 0..size {
            let rx = Arc::clone(&rx);
            let handle = thread::spawn(move || {
                loop {
                    let task = rx.lock().unwrap().recv();
                    match task {
                        Ok(f) => {
                            println!("Worker {id}: executing task");
                            f();
                        }
                        Err(_) => {
                            println!("Worker {id}: channel closed, exiting");
                            break;
                        }
                    }
                }
            });
            threads.push(handle);
        }
        
        ThreadPool { threads, tx }
    }
    
    fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        self.tx.send(Box::new(f)).unwrap();
    }
}

fn main() {
    let pool = ThreadPool::new(4);
    
    for i in 0..8 {
        pool.execute(move || {
            println!("Task {i} executed on thread {:?}", thread::current().id());
        });
    }
    
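    // Crude wait: give workers time to process. The handles in `threads`
    // are never joined, so a production pool would join them on shutdown
    // instead of sleeping for a fixed time.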
    thread::sleep(std::time::Duration::from_millis(100));
}
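The fixed sleep in main can be avoided by joining the workers when the pool is dropped. The following is a sketch of one possible shutdown scheme, not the only one: the sender is wrapped in an Option so Drop can close the channel first, and an AtomicUsize counter (an addition for this example, as is the run_tasks helper) shows that every task ran.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

type Task = Box<dyn FnOnce() + Send + 'static>;

struct ThreadPool {
    threads: Vec<thread::JoinHandle<()>>,
    // Option so that Drop can take the sender and close the channel
    tx: Option<mpsc::Sender<Task>>,
}

impl ThreadPool {
    fn new(size: usize) -> Self {
        let (tx, rx) = mpsc::channel::<Task>();
        let rx = Arc::new(Mutex::new(rx));
        let threads = (0..size)
            .map(|_| {
                let rx = Arc::clone(&rx);
                thread::spawn(move || loop {
                    // The guard is a temporary, so the lock is released
                    // before the task itself runs
                    let task = rx.lock().unwrap().recv();
                    match task {
                        Ok(f) => f(),
                        Err(_) => break, // channel closed: no more tasks
                    }
                })
            })
            .collect();
        ThreadPool { threads, tx: Some(tx) }
    }

    fn execute<F: FnOnce() + Send + 'static>(&self, f: F) {
        let tx = self.tx.as_ref().expect("pool is shut down");
        tx.send(Box::new(f)).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        // Closing the channel makes recv() fail once the queue is empty
        drop(self.tx.take());
        for handle in self.threads.drain(..) {
            handle.join().unwrap();
        }
    }
}

// Hypothetical helper: runs `tasks` trivial tasks and counts completions.
fn run_tasks(workers: usize, tasks: usize) -> usize {
    let done = Arc::new(AtomicUsize::new(0));
    {
        let pool = ThreadPool::new(workers);
        for _ in 0..tasks {
            let done = Arc::clone(&done);
            pool.execute(move || {
                done.fetch_add(1, Ordering::SeqCst);
            });
        }
    } // pool is dropped here and waits for every queued task
    done.load(Ordering::SeqCst)
}

fn main() {
    println!("Completed tasks: {}", run_tasks(4, 8));
}
```

Tasks already in the queue are still delivered after the sender is dropped, so no submitted work is lost during shutdown.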

You have reached the end of the theoretical content of the course. The next and final lesson is the final project: a CLI notes application that integrates everything you have learned.

Send and Sync: concurrency safety in the type
Send means a value of the type can be transferred to another thread. Sync means it can be shared by reference between threads. The compiler checks these traits automatically: you cannot pass an Rc<T> to another thread (it is not Send), but you can pass an Arc<T>, which is Send + Sync whenever T is Send + Sync.
Watch out for deadlocks
A deadlock occurs when two threads each wait for the other to release a mutex. Rust prevents data races but does NOT prevent deadlocks (they are a logic problem, not a memory-safety problem). Always acquire mutexes in the same order in all threads.
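The lock-ordering advice can be sketched as follows, under the assumption that every lockable object carries a unique id that defines a global order. The Account type and the transfer and run_transfers functions are hypothetical and exist only for this illustration.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical type: the id gives every account a fixed place in the lock order.
struct Account {
    id: u32,
    balance: Mutex<i64>,
}

fn transfer(from: &Account, to: &Account, amount: i64) {
    // Locking the same mutex twice on one thread would block forever
    assert_ne!(from.id, to.id, "accounts must be distinct");

    // Always lock the account with the lower id first, regardless of
    // the direction of the transfer.
    let (first, second) = if from.id < to.id { (from, to) } else { (to, from) };
    let mut g1 = first.balance.lock().unwrap();
    let mut g2 = second.balance.lock().unwrap();

    if from.id < to.id {
        *g1 -= amount;
        *g2 += amount;
    } else {
        *g2 -= amount;
        *g1 += amount;
    }
}

// Two threads transfer in opposite directions. Locking in argument order
// could deadlock here; locking in id order cannot.
fn run_transfers() -> (i64, i64) {
    let a = Arc::new(Account { id: 1, balance: Mutex::new(100) });
    let b = Arc::new(Account { id: 2, balance: Mutex::new(100) });

    let (a1, b1) = (Arc::clone(&a), Arc::clone(&b));
    let t1 = thread::spawn(move || {
        for _ in 0..1000 {
            transfer(&a1, &b1, 1);
        }
    });

    let (a2, b2) = (Arc::clone(&a), Arc::clone(&b));
    let t2 = thread::spawn(move || {
        for _ in 0..1000 {
            transfer(&b2, &a2, 1);
        }
    });

    t1.join().unwrap();
    t2.join().unwrap();

    let x = *a.balance.lock().unwrap();
    let y = *b.balance.lock().unwrap();
    (x, y)
}

fn main() {
    let (x, y) = run_transfers();
    println!("a = {x}, b = {y}");
}
```

Both threads move the same total amount in opposite directions, so the balances end where they started.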
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    // Arc: atomically reference-counted pointer (for shared heap)
    // Mutex: mutual exclusion for safe access from multiple threads
    let counter = Arc::new(Mutex::new(0_u32));
    let mut handles = Vec::new();

    for i in 0..8 {
        // Clone the Arc — increments the reference count
        let counter_clone = Arc::clone(&counter);

        let handle = thread::spawn(move || {
            // lock() blocks until exclusive access is obtained
            let mut guard = counter_clone.lock().unwrap();
            *guard += 1;
            println!("Thread {i}: counter = {}", *guard);
        }); // guard is released here (Drop)

        handles.push(handle);
    }

    // Wait for all threads to finish
    for handle in handles {
        handle.join().unwrap();
    }

    let final_val = *counter.lock().unwrap();
    println!("Final value: {final_val}"); // Always 8
}