En esta página
Concurrencia segura: threads, Arc y canales
Concurrencia sin miedo
La concurrencia es la capacidad de ejecutar múltiples tareas simultáneamente. En la mayoría de lenguajes, la concurrencia es una fuente importante de bugs: condiciones de carrera, deadlocks, use-after-free en threads. Rust elimina las condiciones de carrera de datos en tiempo de compilación.
La promesa de Rust: fearless concurrency — puedes escribir código concurrente sin temer a condiciones de carrera, porque el compilador las detecta antes de que tu programa se ejecute.
Crear threads con std::thread::spawn
use std::thread;
use std::time::Duration;
fn main() {
// Crear un nuevo thread
let handle = thread::spawn(|| {
for i in 1..=5 {
println!("Thread secundario: {i}");
thread::sleep(Duration::from_millis(10));
}
});
// El thread principal continúa
for i in 1..=3 {
println!("Thread principal: {i}");
thread::sleep(Duration::from_millis(10));
}
// join() espera que el thread termine
// Si no llamas join(), el thread puede cortarse cuando main termina
handle.join().unwrap();
println!("Todos los threads terminaron");
}Move closures con threads
Los threads necesitan ownership de los datos que usan. La palabra clave move fuerza que el closure tome ownership:
use std::thread;
fn main() {
let datos = vec![1, 2, 3, 4, 5];
// Sin move: error — datos puede salir de scope antes que el thread
// let handle = thread::spawn(|| println!("{:?}", datos)); // Error
// Con move: datos se mueve al thread
let handle = thread::spawn(move || {
println!("Datos en el thread: {:?}", datos);
let suma: i32 = datos.iter().sum();
println!("Suma: {suma}");
suma
});
// datos ya no está disponible aquí (fue movido)
// join() retorna el valor de retorno del closure envuelto en Result
let suma = handle.join().unwrap();
println!("El thread retornó: {suma}");
}Arc: referencia contada atómica
Rc<T> no es thread-safe (no implementa Send). Para compartir datos entre threads, usa Arc<T> (Atomic Reference Counted):
use std::sync::Arc;
use std::thread;
fn main() {
// Arc permite compartir ownership entre múltiples threads
let datos = Arc::new(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
let mut handles = Vec::new();
for i in 0..4 {
let datos_clone = Arc::clone(&datos); // Incrementa el contador
let handle = thread::spawn(move || {
// Cada thread lee su porción
let inicio = i * (datos_clone.len() / 4);
let fin = (i + 1) * (datos_clone.len() / 4);
let porcion = &datos_clone[inicio..fin];
let suma: i32 = porcion.iter().sum();
println!("Thread {i}: suma parcial = {suma}");
suma
});
handles.push(handle);
}
let suma_total: i32 = handles.into_iter()
.map(|h| h.join().unwrap())
.sum();
println!("Suma total: {suma_total}");
}Mutex: acceso exclusivo a datos compartidos
Mutex<T> (Mutual Exclusion) garantiza que solo un thread acceda al dato en cualquier momento:
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let resultado = Arc::new(Mutex::new(Vec::<String>::new()));
let mut handles = Vec::new();
for i in 0..5 {
let resultado_clone = Arc::clone(&resultado);
let handle = thread::spawn(move || {
// lock() espera hasta obtener acceso exclusivo
// Retorna un MutexGuard que libera el lock al salir de scope
let mut guard = resultado_clone.lock().unwrap();
guard.push(format!("Thread {i} completado"));
}); // guard se dropea aquí → lock liberado
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
let datos = resultado.lock().unwrap();
println!("Resultados ({} entradas):", datos.len());
for entrada in datos.iter() {
println!(" - {entrada}");
}
}Canales (mpsc): comunicación entre threads
Los canales mpsc (multiple producer, single consumer) permiten enviar datos entre threads sin compartir memoria:
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
// Crear canal: tx (transmitter/sender), rx (receiver)
let (tx, rx) = mpsc::channel();
// Thread productor
let handle = thread::spawn(move || {
let mensajes = vec!["hola", "desde", "el", "thread"];
for mensaje in mensajes {
println!("Enviando: {mensaje}");
tx.send(mensaje).unwrap();
thread::sleep(Duration::from_millis(50));
}
// tx se dropea aquí — rx lo detectará
});
// Thread principal recibe
for recibido in rx {
// rx se itera hasta que tx se dropea
println!("Recibido: {recibido}");
}
handle.join().unwrap();
println!("Canal cerrado");
}Múltiples productores
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel::<String>();
let mut handles = Vec::new();
// Múltiples threads envían al mismo canal
for i in 0..5 {
let tx_clone = tx.clone();
let handle = thread::spawn(move || {
for j in 0..3 {
let msg = format!("Thread {i}, mensaje {j}");
tx_clone.send(msg).unwrap();
}
});
handles.push(handle);
}
// Liberar el tx original para que rx pueda terminar cuando todos los clones se cierren
drop(tx);
for handle in handles {
handle.join().unwrap();
}
// Recoger todos los mensajes
let mut mensajes: Vec<String> = rx.into_iter().collect();
mensajes.sort();
println!("Total mensajes: {}", mensajes.len());
for msg in &mensajes[..5] { // Mostrar solo los primeros 5
println!(" {msg}");
}
}Los traits Send y Sync
Rust usa dos markers traits para garantizar la seguridad en concurrencia:
Send: El tipo puede transferirse (moved) de forma segura a otro thread. Casi todos los tipos sonSend. Excepción:Rc<T>(usa conteo de referencias no atómico).Sync: El tipo puede compartirse por referencia de forma segura entre threads (&TesSend). Excepción:Cell<T>,RefCell<T>(mutabilidad interior no thread-safe).
use std::sync::{Arc, Mutex};
// Arc<T> es Send + Sync si T es Send + Sync
// Mutex<T> es Send + Sync si T es Send
// Puedes verificar en tiempo de compilación:
fn requiere_send<T: Send>(_: T) {}
fn requiere_sync<T: Sync>(_: &T) {}
fn main() {
let arc = Arc::new(Mutex::new(42));
requiere_send(arc.clone()); // OK: Arc<Mutex<i32>> es Send
requiere_sync(&arc); // OK: Arc<Mutex<i32>> es Sync
// Rc NO es Send
let rc = std::rc::Rc::new(42);
// requiere_send(rc); // Error en tiempo de compilación
drop(rc);
}Patrón: Pool de threads con canales
use std::sync::{Arc, Mutex, mpsc};
use std::thread;
type Tarea = Box<dyn FnOnce() + Send + 'static>;
struct PoolDeThreads {
threads: Vec<thread::JoinHandle<()>>,
tx: mpsc::Sender<Tarea>,
}
impl PoolDeThreads {
fn nuevo(tamanio: usize) -> Self {
let (tx, rx) = mpsc::channel::<Tarea>();
let rx = Arc::new(Mutex::new(rx));
let mut threads = Vec::with_capacity(tamanio);
for id in 0..tamanio {
let rx = Arc::clone(&rx);
let handle = thread::spawn(move || {
loop {
let tarea = rx.lock().unwrap().recv();
match tarea {
Ok(f) => {
println!("Worker {id}: ejecutando tarea");
f();
}
Err(_) => {
println!("Worker {id}: canal cerrado, saliendo");
break;
}
}
}
});
threads.push(handle);
}
PoolDeThreads { threads, tx }
}
fn ejecutar<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
self.tx.send(Box::new(f)).unwrap();
}
}
fn main() {
let pool = PoolDeThreads::nuevo(4);
for i in 0..8 {
pool.ejecutar(move || {
println!("Tarea {i} ejecutada en thread {:?}", thread::current().id());
});
}
// Dar tiempo a los workers
thread::sleep(std::time::Duration::from_millis(100));
}Has llegado al final del contenido teórico del curso. La siguiente y última lección es el proyecto final: una aplicación CLI de notas que integra todo lo aprendido.
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
// Arc: puntero de conteo de referencias atómico (para heap compartido)
// Mutex: exclusión mutua para acceso seguro desde múltiples threads
let contador = Arc::new(Mutex::new(0_u32));
let mut handles = Vec::new();
for i in 0..8 {
// Clonar el Arc — incrementa el contador de referencias
let contador_clone = Arc::clone(&contador);
let handle = thread::spawn(move || {
// lock() bloquea hasta obtener acceso exclusivo
let mut guard = contador_clone.lock().unwrap();
*guard += 1;
println!("Thread {i}: contador = {}", *guard);
}); // guard se libera aquí (Drop)
handles.push(handle);
}
// Esperar que todos los threads terminen
for handle in handles {
handle.join().unwrap();
}
let final_val = *contador.lock().unwrap();
println!("Valor final: {final_val}"); // Siempre 8
}
Inicia sesión para guardar tu progreso