En esta página
Patrones de concurrencia
Patrones de concurrencia en Go
La concurrencia bien diseñada requiere más que goroutines y channels aislados — necesitas patrones que organicen el flujo de trabajo, manejen errores y permitan la cancelación limpia. Go tiene varios patrones estándar que los desarrolladores experimentados reconocen y aplican.
Patrón 1: Worker Pool
El worker pool limita la concurrencia creando un número fijo de goroutines que procesan una cola de trabajo. Es esencial para evitar la creación descontrolada de goroutines cuando procesas grandes volúmenes de tareas:
func workerPool(numWorkers int, tareas []string) []string {
trabajo := make(chan string, len(tareas))
resultados := make(chan string, len(tareas))
// Lanzar workers fijos
var wg sync.WaitGroup
for i := range numWorkers {
wg.Add(1)
go func(id int) {
defer wg.Done()
for tarea := range trabajo {
// Procesar tarea
resultado := fmt.Sprintf("Worker-%d procesó: %s", id, tarea)
resultados <- resultado
}
}(i)
}
// Enviar todas las tareas
for _, t := range tareas {
trabajo <- t
}
close(trabajo)
// Esperar y cerrar resultados
go func() {
wg.Wait()
close(resultados)
}()
// Recolectar
var out []string
for r := range resultados {
out = append(out, r)
}
return out
}Cuándo usarlo: procesamiento en batch, throttling de requests HTTP, procesamiento de archivos en paralelo.
Patrón 2: Fan-Out / Fan-In
Fan-out: distribuir trabajo desde un canal a múltiples goroutines. Fan-in: combinar resultados de múltiples canales en uno solo.
// Fan-out: un canal → múltiples goroutines
func fanOut(entrada <-chan int, numWorkers int) []<-chan int {
salidas := make([]<-chan int, numWorkers)
for i := range numWorkers {
salida := make(chan int)
salidas[i] = salida
go func(out chan<- int) {
for n := range entrada {
out <- n * 2 // procesar
}
close(out)
}(salida)
}
return salidas
}
// Fan-in: múltiples canales → un canal
func fanIn(canales ...<-chan int) <-chan int {
combinado := make(chan int)
var wg sync.WaitGroup
copiar := func(ch <-chan int) {
defer wg.Done()
for n := range ch {
combinado <- n
}
}
wg.Add(len(canales))
for _, ch := range canales {
go copiar(ch)
}
go func() {
wg.Wait()
close(combinado)
}()
return combinado
}Patrón 3: Pipeline
Un pipeline encadena etapas de procesamiento, cada una recibe de la anterior y envía a la siguiente:
// Pipeline para procesar URLs
func fetchURL(urls <-chan string) <-chan []byte {
out := make(chan []byte)
go func() {
defer close(out)
for url := range urls {
resp, err := http.Get(url)
if err != nil {
continue
}
body, _ := io.ReadAll(resp.Body)
resp.Body.Close()
out <- body
}
}()
return out
}
func parsearHTML(cuerpos <-chan []byte) <-chan string {
out := make(chan string)
go func() {
defer close(out)
for body := range cuerpos {
// extraer títulos del HTML
título := extraerTítulo(body)
out <- título
}
}()
return out
}
func main() {
urlsCh := make(chan string, 10)
go func() {
urls := []string{"https://a.com", "https://b.com"}
for _, u := range urls {
urlsCh <- u
}
close(urlsCh)
}()
cuerposCh := fetchURL(urlsCh)
títulosCh := parsearHTML(cuerposCh)
for título := range títulosCh {
fmt.Println(título)
}
}`context.Context`: cancelación propagada
context.Context es el mecanismo estándar de Go para propagar cancelación, deadlines y valores a través de llamadas de función y goroutines. Es omnipresente en código de producción Go.
Los cuatro tipos de context
import "context"
// 1. Background — raíz de toda jerarquía de contexts
ctx := context.Background()
// 2. TODO — placeholder cuando no sabes qué context usar
ctx2 := context.TODO()
// 3. WithCancel — cancelación manual
ctx3, cancel := context.WithCancel(context.Background())
defer cancel() // SIEMPRE
// 4. WithTimeout — cancelación automática después de una duración
ctx4, cancel4 := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel4()
// 5. WithDeadline — cancelación en un momento específico
deadline := time.Now().Add(5 * time.Second)
ctx5, cancel5 := context.WithDeadline(context.Background(), deadline)
defer cancel5()
// 6. WithValue — propagar valores (solo para datos de request, no para opciones)
ctx6 := context.WithValue(context.Background(), "userID", 42)Verificar cancelación en goroutines
func trabajarConContext(ctx context.Context) error {
for {
select {
case <-ctx.Done():
// ctx.Err() retorna context.Canceled o context.DeadlineExceeded
return fmt.Errorf("trabajo cancelado: %w", ctx.Err())
default:
// hacer trabajo...
if err := pasoDeTrabajo(); err != nil {
return err
}
}
}
}
// Pasar context en funciones de larga duración
func buscarEnBD(ctx context.Context, query string) ([]Fila, error) {
// Las librerías de DB estándar aceptan context
return db.QueryContext(ctx, query)
}Jerarquía de contexts
Los contexts forman un árbol: cancelar un padre cancela todos los hijos automáticamente:
padre, cancelPadre := context.WithCancel(context.Background())
hijo1, cancelHijo1 := context.WithTimeout(padre, 3*time.Second)
hijo2, cancelHijo2 := context.WithCancel(padre)
defer cancelPadre() // cancela padre, hijo1, e hijo2
defer cancelHijo1()
defer cancelHijo2()
// Si llamamos cancelPadre(), ctx.Done() se activa en padre, hijo1 y hijo2
cancelPadre()Propagar valores con context (con moderación)
type ctxKey string
const (
ctxKeyUserID ctxKey = "userID"
ctxKeyRqID ctxKey = "requestID"
)
func procesarRequest(w http.ResponseWriter, r *http.Request) {
ctx := context.WithValue(r.Context(), ctxKeyUserID, obtenerUserID(r))
ctx = context.WithValue(ctx, ctxKeyRqID, uuid.New())
resultado, err := servicio.Procesar(ctx)
// ...
}
func (s *Servicio) Procesar(ctx context.Context) (*Resultado, error) {
userID, ok := ctx.Value(ctxKeyUserID).(int)
if !ok {
return nil, errors.New("userID no en context")
}
// ...
}`errgroup`: WaitGroup con manejo de errores
El paquete golang.org/x/sync/errgroup combina WaitGroup con propagación de errores:
import "golang.org/x/sync/errgroup"
func obtenerDatosConcurrentes(ctx context.Context, ids []int) ([]*Dato, error) {
g, ctx := errgroup.WithContext(ctx)
datos := make([]*Dato, len(ids))
for i, id := range ids {
i, id := i, id // captura de variables (importante antes de Go 1.22)
g.Go(func() error {
dato, err := buscarDato(ctx, id)
if err != nil {
return fmt.Errorf("buscarDato id=%d: %w", id, err)
}
datos[i] = dato
return nil
})
}
// Espera a todas las goroutines; retorna el primer error
if err := g.Wait(); err != nil {
return nil, err
}
return datos, nil
}Si cualquier goroutine falla, el context se cancela automáticamente (cuando se usa errgroup.WithContext), lo que permite que las demás goroutines se detengan con gracia.
Semáforo: limitar concurrencia
Un canal con buffer funciona como semáforo para limitar concurrencia:
// Semáforo de 5 slots — máximo 5 goroutines concurrentes
sem := make(chan struct{}, 5)
var wg sync.WaitGroup
for _, tarea := range tareas {
wg.Add(1)
sem <- struct{}{} // adquirir slot
go func(t Tarea) {
defer wg.Done()
defer func() { <-sem }() // liberar slot
procesarTarea(t)
}(tarea)
}
wg.Wait()Patrón: graceful shutdown
func main() {
servidor := &http.Server{Addr: ":8080"}
// Canal para señales del OS
señales := make(chan os.Signal, 1)
signal.Notify(señales, os.Interrupt, syscall.SIGTERM)
// Canal para errores del servidor
errCh := make(chan error, 1)
go func() {
errCh <- servidor.ListenAndServe()
}()
select {
case err := <-errCh:
log.Fatal("error del servidor:", err)
case <-señales:
fmt.Println("Señal de parada recibida, cerrando con gracia...")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := servidor.Shutdown(ctx); err != nil {
log.Fatal("error al cerrar servidor:", err)
}
fmt.Println("Servidor cerrado correctamente")
}
}Con estos patrones de concurrencia en tu arsenal, en la siguiente lección aprenderemos cómo organizar y distribuir código Go con el sistema de módulos y paquetes.
Inicia sesión para guardar tu progreso