En esta página

Concurrencia segura: threads, Arc y canales

14 min lectura TextoCap. 5 — Rust en práctica

Concurrencia sin miedo

La concurrencia es la capacidad de ejecutar múltiples tareas simultáneamente. En la mayoría de lenguajes, la concurrencia es una fuente importante de bugs: condiciones de carrera, deadlocks, use-after-free en threads. Rust elimina las condiciones de carrera de datos en tiempo de compilación.

La promesa de Rust: fearless concurrency — puedes escribir código concurrente sin temer a condiciones de carrera, porque el compilador las detecta antes de que tu programa se ejecute.

Crear threads con std::thread::spawn

use std::thread;
use std::time::Duration;

fn main() {
    // Crear un nuevo thread
    let handle = thread::spawn(|| {
        for i in 1..=5 {
            println!("Thread secundario: {i}");
            thread::sleep(Duration::from_millis(10));
        }
    });
    
    // El thread principal continúa
    for i in 1..=3 {
        println!("Thread principal: {i}");
        thread::sleep(Duration::from_millis(10));
    }
    
    // join() espera que el thread termine
    // Si no llamas join(), el thread puede cortarse cuando main termina
    handle.join().unwrap();
    
    println!("Todos los threads terminaron");
}

Move closures con threads

Los threads necesitan ownership de los datos que usan. La palabra clave move fuerza que el closure tome ownership:

use std::thread;

fn main() {
    let datos = vec![1, 2, 3, 4, 5];
    
    // Sin move: error — datos puede salir de scope antes que el thread
    // let handle = thread::spawn(|| println!("{:?}", datos)); // Error
    
    // Con move: datos se mueve al thread
    let handle = thread::spawn(move || {
        println!("Datos en el thread: {:?}", datos);
        let suma: i32 = datos.iter().sum();
        println!("Suma: {suma}");
        suma
    });
    
    // datos ya no está disponible aquí (fue movido)
    
    // join() retorna el valor de retorno del closure envuelto en Result
    let suma = handle.join().unwrap();
    println!("El thread retornó: {suma}");
}

Arc: referencia contada atómica

Rc<T> no es thread-safe (no implementa Send). Para compartir datos entre threads, usa Arc<T> (Atomic Reference Counted):

use std::sync::Arc;
use std::thread;

fn main() {
    // Arc permite compartir ownership entre múltiples threads
    let datos = Arc::new(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
    
    let mut handles = Vec::new();
    
    for i in 0..4 {
        let datos_clone = Arc::clone(&datos); // Incrementa el contador
        
        let handle = thread::spawn(move || {
            // Cada thread lee su porción
            let inicio = i * (datos_clone.len() / 4);
            let fin = (i + 1) * (datos_clone.len() / 4);
            let porcion = &datos_clone[inicio..fin];
            let suma: i32 = porcion.iter().sum();
            println!("Thread {i}: suma parcial = {suma}");
            suma
        });
        
        handles.push(handle);
    }
    
    let suma_total: i32 = handles.into_iter()
        .map(|h| h.join().unwrap())
        .sum();
    
    println!("Suma total: {suma_total}");
}

Mutex: acceso exclusivo a datos compartidos

Mutex<T> (Mutual Exclusion) garantiza que solo un thread acceda al dato en cualquier momento:

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let resultado = Arc::new(Mutex::new(Vec::<String>::new()));
    let mut handles = Vec::new();
    
    for i in 0..5 {
        let resultado_clone = Arc::clone(&resultado);
        
        let handle = thread::spawn(move || {
            // lock() espera hasta obtener acceso exclusivo
            // Retorna un MutexGuard que libera el lock al salir de scope
            let mut guard = resultado_clone.lock().unwrap();
            guard.push(format!("Thread {i} completado"));
        }); // guard se dropea aquí → lock liberado
        
        handles.push(handle);
    }
    
    for handle in handles {
        handle.join().unwrap();
    }
    
    let datos = resultado.lock().unwrap();
    println!("Resultados ({} entradas):", datos.len());
    for entrada in datos.iter() {
        println!("  - {entrada}");
    }
}

Canales (mpsc): comunicación entre threads

Los canales mpsc (multiple producer, single consumer) permiten enviar datos entre threads sin compartir memoria:

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    // Crear canal: tx (transmitter/sender), rx (receiver)
    let (tx, rx) = mpsc::channel();
    
    // Thread productor
    let handle = thread::spawn(move || {
        let mensajes = vec!["hola", "desde", "el", "thread"];
        
        for mensaje in mensajes {
            println!("Enviando: {mensaje}");
            tx.send(mensaje).unwrap();
            thread::sleep(Duration::from_millis(50));
        }
        // tx se dropea aquí — rx lo detectará
    });
    
    // Thread principal recibe
    for recibido in rx {
        // rx se itera hasta que tx se dropea
        println!("Recibido: {recibido}");
    }
    
    handle.join().unwrap();
    println!("Canal cerrado");
}

Múltiples productores

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel::<String>();
    
    let mut handles = Vec::new();
    
    // Múltiples threads envían al mismo canal
    for i in 0..5 {
        let tx_clone = tx.clone();
        
        let handle = thread::spawn(move || {
            for j in 0..3 {
                let msg = format!("Thread {i}, mensaje {j}");
                tx_clone.send(msg).unwrap();
            }
        });
        
        handles.push(handle);
    }
    
    // Liberar el tx original para que rx pueda terminar cuando todos los clones se cierren
    drop(tx);
    
    for handle in handles {
        handle.join().unwrap();
    }
    
    // Recoger todos los mensajes
    let mut mensajes: Vec<String> = rx.into_iter().collect();
    mensajes.sort();
    
    println!("Total mensajes: {}", mensajes.len());
    for msg in &mensajes[..5] { // Mostrar solo los primeros 5
        println!("  {msg}");
    }
}

Los traits Send y Sync

Rust usa dos markers traits para garantizar la seguridad en concurrencia:

  • Send: El tipo puede transferirse (moved) de forma segura a otro thread. Casi todos los tipos son Send. Excepción: Rc<T> (usa conteo de referencias no atómico).

  • Sync: El tipo puede compartirse por referencia de forma segura entre threads (&T es Send). Excepción: Cell<T>, RefCell<T> (mutabilidad interior no thread-safe).

use std::sync::{Arc, Mutex};

// Arc<T> es Send + Sync si T es Send + Sync
// Mutex<T> es Send + Sync si T es Send

// Puedes verificar en tiempo de compilación:
fn requiere_send<T: Send>(_: T) {}
fn requiere_sync<T: Sync>(_: &T) {}

fn main() {
    let arc = Arc::new(Mutex::new(42));
    requiere_send(arc.clone()); // OK: Arc<Mutex<i32>> es Send
    requiere_sync(&arc);        // OK: Arc<Mutex<i32>> es Sync
    
    // Rc NO es Send
    let rc = std::rc::Rc::new(42);
    // requiere_send(rc); // Error en tiempo de compilación
    drop(rc);
}

Patrón: Pool de threads con canales

use std::sync::{Arc, Mutex, mpsc};
use std::thread;

type Tarea = Box<dyn FnOnce() + Send + 'static>;

struct PoolDeThreads {
    threads: Vec<thread::JoinHandle<()>>,
    tx: mpsc::Sender<Tarea>,
}

impl PoolDeThreads {
    fn nuevo(tamanio: usize) -> Self {
        let (tx, rx) = mpsc::channel::<Tarea>();
        let rx = Arc::new(Mutex::new(rx));
        let mut threads = Vec::with_capacity(tamanio);
        
        for id in 0..tamanio {
            let rx = Arc::clone(&rx);
            let handle = thread::spawn(move || {
                loop {
                    let tarea = rx.lock().unwrap().recv();
                    match tarea {
                        Ok(f) => {
                            println!("Worker {id}: ejecutando tarea");
                            f();
                        }
                        Err(_) => {
                            println!("Worker {id}: canal cerrado, saliendo");
                            break;
                        }
                    }
                }
            });
            threads.push(handle);
        }
        
        PoolDeThreads { threads, tx }
    }
    
    fn ejecutar<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        self.tx.send(Box::new(f)).unwrap();
    }
}

fn main() {
    let pool = PoolDeThreads::nuevo(4);
    
    for i in 0..8 {
        pool.ejecutar(move || {
            println!("Tarea {i} ejecutada en thread {:?}", thread::current().id());
        });
    }
    
    // Dar tiempo a los workers
    thread::sleep(std::time::Duration::from_millis(100));
}

Has llegado al final del contenido teórico del curso. La siguiente y última lección es el proyecto final: una aplicación CLI de notas que integra todo lo aprendido.

Send y Sync: seguridad de concurrencia en el tipo
Send significa que el tipo puede transferirse a otro thread. Sync significa que puede compartirse por referencia entre threads. El compilador verifica automáticamente estos traits — no puedes pasar un Rc<T> a otro thread (no es Send), pero sí un Arc<T> (que sí es Send + Sync).
Cuidado con deadlocks
Un deadlock ocurre cuando dos threads esperan mutuamente que el otro libere un mutex. Rust previene condiciones de carrera de datos pero NO previene deadlocks (son un problema de lógica, no de memoria). Siempre adquiere los mutexes en el mismo orden en todos los threads.
rust
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    // Arc: puntero de conteo de referencias atómico (para heap compartido)
    // Mutex: exclusión mutua para acceso seguro desde múltiples threads
    let contador = Arc::new(Mutex::new(0_u32));
    let mut handles = Vec::new();

    for i in 0..8 {
        // Clonar el Arc — incrementa el contador de referencias
        let contador_clone = Arc::clone(&contador);

        let handle = thread::spawn(move || {
            // lock() bloquea hasta obtener acceso exclusivo
            let mut guard = contador_clone.lock().unwrap();
            *guard += 1;
            println!("Thread {i}: contador = {}", *guard);
        }); // guard se libera aquí (Drop)

        handles.push(handle);
    }

    // Esperar que todos los threads terminen
    for handle in handles {
        handle.join().unwrap();
    }

    let final_val = *contador.lock().unwrap();
    println!("Valor final: {final_val}"); // Siempre 8
}