On this page
Safe concurrency: threads, Arc, and channels
Fearless concurrency
Concurrency is the ability to make progress on multiple tasks during overlapping time periods (not necessarily running them at the same instant). In most languages, concurrency is a major source of bugs: data races, deadlocks, use-after-free across threads. Rust eliminates data races at compile time.
Rust's promise: fearless concurrency — you can write concurrent code without fear of data races, because the compiler detects them before your program ever runs.
Creating threads with std::thread::spawn
use std::thread;
use std::time::Duration;
fn main() {
    // Spawn a worker thread; spawn() hands back a JoinHandle.
    let worker = thread::spawn(|| {
        for i in 1..=5 {
            println!("Secondary thread: {i}");
            thread::sleep(Duration::from_millis(10));
        }
    });

    // Meanwhile the main thread keeps doing its own work.
    for i in 1..=3 {
        println!("Main thread: {i}");
        thread::sleep(Duration::from_millis(10));
    }

    // Block until the worker is done. Without join(), the worker may be
    // cut off when main exits.
    worker.join().unwrap();
    println!("All threads finished");
}

Move closures with threads
Threads need ownership of the data they use. The move keyword forces the closure to take ownership:
use std::thread;
fn main() {
    let numbers = vec![1, 2, 3, 4, 5];
    // Without `move` the closure would only borrow `numbers`, which may not
    // outlive this function — the compiler rejects it:
    // let worker = thread::spawn(|| println!("{:?}", numbers)); // Error
    // With `move`, the closure takes ownership of what it captures.
    let worker = thread::spawn(move || {
        println!("Data in thread: {:?}", numbers);
        let sum: i32 = numbers.iter().sum();
        println!("Sum: {sum}");
        sum
    });
    // `numbers` is no longer usable here (it was moved into the thread).
    // join() returns the closure's result wrapped in a Result.
    let sum = worker.join().unwrap();
    println!("Thread returned: {sum}");
}

Arc: atomic reference counting
Rc<T> is not thread-safe (it doesn't implement Send). To share data between threads, use Arc<T> (Atomic Reference Counted):
use std::sync::Arc;
use std::thread;
fn main() {
    // Arc lets multiple threads share ownership of the same vector.
    let data = Arc::new(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
    let chunk = data.len() / 4;
    let mut handles = Vec::new();
    for i in 0..4 {
        let data_clone = Arc::clone(&data); // Increments the reference count
        let handle = thread::spawn(move || {
            // Each thread sums its own portion. The last thread also takes
            // the remainder, so no elements are skipped when the length is
            // not a multiple of 4. (The original computed end = (i+1)*(len/4)
            // for every thread, silently dropping the last len % 4 elements:
            // with 10 elements it printed Total sum: 36 instead of 55.)
            let start = i * chunk;
            let end = if i == 3 { data_clone.len() } else { (i + 1) * chunk };
            let sum: i32 = data_clone[start..end].iter().sum();
            println!("Thread {i}: partial sum = {sum}");
            sum
        });
        handles.push(handle);
    }
    // Collect and add the four partial sums.
    let total: i32 = handles.into_iter()
        .map(|h| h.join().unwrap())
        .sum();
    println!("Total sum: {total}");
}

Mutex: exclusive data access
Mutex<T> (Mutual Exclusion) guarantees that only one thread accesses the data at any given moment:
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
    let results = Arc::new(Mutex::new(Vec::<String>::new()));
    // Spawn five workers; each appends one entry while holding the lock.
    let handles: Vec<_> = (0..5)
        .map(|i| {
            let results = Arc::clone(&results);
            thread::spawn(move || {
                // lock() blocks until exclusive access is granted. It returns
                // a MutexGuard that releases the lock when dropped — here at
                // the end of this statement.
                results.lock().unwrap().push(format!("Thread {i} completed"));
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    let data = results.lock().unwrap();
    println!("Results ({} entries):", data.len());
    for entry in data.iter() {
        println!(" - {entry}");
    }
}

Channels (mpsc): communication between threads
mpsc channels (multiple producer, single consumer) allow sending data between threads without sharing memory:
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
    // channel() yields a sending half (tx) and a receiving half (rx).
    let (tx, rx) = mpsc::channel();
    // Producer thread: sends four words, then lets tx go out of scope.
    let producer = thread::spawn(move || {
        for message in vec!["hello", "from", "the", "thread"] {
            println!("Sending: {message}");
            tx.send(message).unwrap();
            thread::sleep(Duration::from_millis(50));
        }
        // tx is dropped here — rx will observe the disconnect.
    });
    // recv() keeps yielding values until every sender has been dropped.
    while let Ok(received) = rx.recv() {
        println!("Received: {received}");
    }
    producer.join().unwrap();
    println!("Channel closed");
}

Multiple producers
use std::sync::mpsc;
use std::thread;
fn main() {
    let (tx, rx) = mpsc::channel::<String>();
    // Five producer threads, each owning its own clone of the sender.
    let handles: Vec<_> = (0..5)
        .map(|i| {
            let tx = tx.clone();
            thread::spawn(move || {
                for j in 0..3 {
                    tx.send(format!("Thread {i}, message {j}")).unwrap();
                }
            })
        })
        .collect();
    // Drop the original sender: rx terminates once every clone is gone too.
    drop(tx);
    for handle in handles {
        handle.join().unwrap();
    }
    // Drain the whole channel into a sorted list.
    let mut messages: Vec<String> = rx.into_iter().collect();
    messages.sort();
    println!("Total messages: {}", messages.len());
    for msg in &messages[..5] { // Show only first 5
        println!(" {msg}");
    }
}

The Send and Sync traits
Rust uses two marker traits to guarantee concurrency safety:
Send: The type can be safely transferred (moved) to another thread. Almost all types are Send. Exception: Rc<T> (uses non-atomic reference counting). Sync: The type can be safely shared by reference between threads (T is Sync when &T is Send). Exceptions: Cell<T>, RefCell<T> (non-thread-safe interior mutability).
use std::sync::{Arc, Mutex};
// Arc<T> is Send + Sync if T is Send + Sync
// Mutex<T> is Send + Sync if T is Send
// You can verify at compile time:
// Compiles only for types that can be moved to another thread.
fn requires_send<T: Send>(_: T) {}
// Compiles only for types whose shared references can cross threads.
fn requires_sync<T: Sync>(_: &T) {}
fn main() {
    let arc = Arc::new(Mutex::new(42));
    // Both checks compile: Arc<Mutex<i32>> is Send and Sync.
    requires_send(arc.clone());
    requires_sync(&arc);
    // Rc uses non-atomic reference counting, so it is not Send:
    let rc = std::rc::Rc::new(42);
    // requires_send(rc); // Compile-time error
    drop(rc);
}

Pattern: thread pool with channels
use std::sync::{Arc, Mutex, mpsc};
use std::thread;
/// A unit of work: any one-shot closure that can be sent to a worker thread.
type Task = Box<dyn FnOnce() + Send + 'static>;

/// A fixed-size pool of worker threads fed through an mpsc channel.
struct ThreadPool {
    threads: Vec<thread::JoinHandle<()>>,
    // Option so Drop can take and drop the sender, closing the channel
    // before the workers are joined. (The original kept a bare Sender and
    // had no Drop impl, so dropping the pool detached the workers and
    // callers had to sleep to let tasks finish.)
    tx: Option<mpsc::Sender<Task>>,
}

impl ThreadPool {
    /// Spawns `size` workers that all pull tasks from one shared receiver.
    fn new(size: usize) -> Self {
        let (tx, rx) = mpsc::channel::<Task>();
        // Receiver is not Clone, so the workers share it behind a Mutex.
        let rx = Arc::new(Mutex::new(rx));
        let mut threads = Vec::with_capacity(size);
        for id in 0..size {
            let rx = Arc::clone(&rx);
            let handle = thread::spawn(move || {
                loop {
                    // The MutexGuard is a temporary inside this statement, so
                    // the lock is released before the task runs — other
                    // workers can fetch tasks while this one executes.
                    let task = rx.lock().unwrap().recv();
                    match task {
                        Ok(f) => {
                            println!("Worker {id}: executing task");
                            f();
                        }
                        Err(_) => {
                            // Every sender dropped: no more tasks will come.
                            println!("Worker {id}: channel closed, exiting");
                            break;
                        }
                    }
                }
            });
            threads.push(handle);
        }
        ThreadPool { threads, tx: Some(tx) }
    }

    /// Queues a task for execution on some worker thread.
    fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        self.tx
            .as_ref()
            .expect("sender is only taken in Drop")
            .send(Box::new(f))
            .unwrap();
    }
}

impl Drop for ThreadPool {
    /// Graceful shutdown: close the channel, then wait for every worker
    /// to finish its queued tasks.
    fn drop(&mut self) {
        // Dropping the sender disconnects the channel; recv() starts
        // returning Err and the workers break out of their loops.
        drop(self.tx.take());
        for handle in self.threads.drain(..) {
            let _ = handle.join();
        }
    }
}
fn main() {
    let pool = ThreadPool::new(4);
    // Submit eight tasks to four workers; each worker handles roughly two.
    (0..8).for_each(|i| {
        pool.execute(move || {
            println!("Task {i} executed on thread {:?}", thread::current().id());
        });
    });
    // Give the workers a moment to drain the queue before main exits.
    thread::sleep(std::time::Duration::from_millis(100));
}

You have reached the end of the theoretical content of the course. The next and final lesson is the final project: a CLI notes application that integrates everything you have learned.
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
    // Arc: shared ownership of heap data across threads.
    // Mutex: at most one thread can touch the value at a time.
    let counter = Arc::new(Mutex::new(0_u32));
    let handles: Vec<_> = (0..8)
        .map(|i| {
            // Cloning the Arc only bumps the reference count.
            let counter = Arc::clone(&counter);
            thread::spawn(move || {
                // lock() blocks until exclusive access is obtained; the
                // guard releases the lock when it goes out of scope.
                let mut guard = counter.lock().unwrap();
                *guard += 1;
                println!("Thread {i}: counter = {}", *guard);
            })
        })
        .collect();
    // Wait for every increment to land before reading the total.
    for handle in handles {
        handle.join().unwrap();
    }
    let final_val = *counter.lock().unwrap();
    println!("Final value: {final_val}"); // Always 8
}
Sign in to track your progress