En esta página

Patrones de concurrencia

14 min lectura TextoCap. 4 — Concurrencia

Patrones de concurrencia en Go

La concurrencia bien diseñada requiere más que goroutines y channels aislados — necesitas patrones que organicen el flujo de trabajo, manejen errores y permitan la cancelación limpia. Go tiene varios patrones estándar que los desarrolladores experimentados reconocen y aplican.

Patrón 1: Worker Pool

El worker pool limita la concurrencia creando un número fijo de goroutines que procesan una cola de trabajo. Es esencial para evitar la creación descontrolada de goroutines cuando procesas grandes volúmenes de tareas:

func workerPool(numWorkers int, tareas []string) []string {
    trabajo := make(chan string, len(tareas))
    resultados := make(chan string, len(tareas))

    // Lanzar workers fijos
    var wg sync.WaitGroup
    for i := range numWorkers {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for tarea := range trabajo {
                // Procesar tarea
                resultado := fmt.Sprintf("Worker-%d procesó: %s", id, tarea)
                resultados <- resultado
            }
        }(i)
    }

    // Enviar todas las tareas
    for _, t := range tareas {
        trabajo <- t
    }
    close(trabajo)

    // Esperar y cerrar resultados
    go func() {
        wg.Wait()
        close(resultados)
    }()

    // Recolectar
    var out []string
    for r := range resultados {
        out = append(out, r)
    }
    return out
}

Cuándo usarlo: procesamiento en batch, throttling de requests HTTP, procesamiento de archivos en paralelo.

Patrón 2: Fan-Out / Fan-In

Fan-out: distribuir trabajo desde un canal a múltiples goroutines. Fan-in: combinar resultados de múltiples canales en uno solo.

// Fan-out: un canal → múltiples goroutines
func fanOut(entrada <-chan int, numWorkers int) []<-chan int {
    salidas := make([]<-chan int, numWorkers)
    for i := range numWorkers {
        salida := make(chan int)
        salidas[i] = salida
        go func(out chan<- int) {
            for n := range entrada {
                out <- n * 2  // procesar
            }
            close(out)
        }(salida)
    }
    return salidas
}

// Fan-in: múltiples canales → un canal
func fanIn(canales ...<-chan int) <-chan int {
    combinado := make(chan int)
    var wg sync.WaitGroup

    copiar := func(ch <-chan int) {
        defer wg.Done()
        for n := range ch {
            combinado <- n
        }
    }

    wg.Add(len(canales))
    for _, ch := range canales {
        go copiar(ch)
    }

    go func() {
        wg.Wait()
        close(combinado)
    }()

    return combinado
}

Patrón 3: Pipeline

Un pipeline encadena etapas de procesamiento, cada una recibe de la anterior y envía a la siguiente:

// Pipeline para procesar URLs
func fetchURL(urls <-chan string) <-chan []byte {
    out := make(chan []byte)
    go func() {
        defer close(out)
        for url := range urls {
            resp, err := http.Get(url)
            if err != nil {
                continue
            }
            body, _ := io.ReadAll(resp.Body)
            resp.Body.Close()
            out <- body
        }
    }()
    return out
}

func parsearHTML(cuerpos <-chan []byte) <-chan string {
    out := make(chan string)
    go func() {
        defer close(out)
        for body := range cuerpos {
            // extraer títulos del HTML
            título := extraerTítulo(body)
            out <- título
        }
    }()
    return out
}

func main() {
    urlsCh := make(chan string, 10)
    go func() {
        urls := []string{"https://a.com", "https://b.com"}
        for _, u := range urls {
            urlsCh <- u
        }
        close(urlsCh)
    }()

    cuerposCh := fetchURL(urlsCh)
    títulosCh := parsearHTML(cuerposCh)

    for título := range títulosCh {
        fmt.Println(título)
    }
}

`context.Context`: cancelación propagada

context.Context es el mecanismo estándar de Go para propagar cancelación, deadlines y valores a través de llamadas de función y goroutines. Es omnipresente en código de producción Go.

Los cuatro tipos de context

import "context"

// 1. Background — raíz de toda jerarquía de contexts
ctx := context.Background()

// 2. TODO — placeholder cuando no sabes qué context usar
ctx2 := context.TODO()

// 3. WithCancel — cancelación manual
ctx3, cancel := context.WithCancel(context.Background())
defer cancel()  // SIEMPRE

// 4. WithTimeout — cancelación automática después de una duración
ctx4, cancel4 := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel4()

// 5. WithDeadline — cancelación en un momento específico
deadline := time.Now().Add(5 * time.Second)
ctx5, cancel5 := context.WithDeadline(context.Background(), deadline)
defer cancel5()

// 6. WithValue — propagar valores (solo para datos de request, no para opciones)
ctx6 := context.WithValue(context.Background(), "userID", 42)

Verificar cancelación en goroutines

func trabajarConContext(ctx context.Context) error {
    for {
        select {
        case <-ctx.Done():
            // ctx.Err() retorna context.Canceled o context.DeadlineExceeded
            return fmt.Errorf("trabajo cancelado: %w", ctx.Err())
        default:
            // hacer trabajo...
            if err := pasoDeTrabajo(); err != nil {
                return err
            }
        }
    }
}

// Pasar context en funciones de larga duración
func buscarEnBD(ctx context.Context, query string) ([]Fila, error) {
    // Las librerías de DB estándar aceptan context
    return db.QueryContext(ctx, query)
}

Jerarquía de contexts

Los contexts forman un árbol: cancelar un padre cancela todos los hijos automáticamente:

padre, cancelPadre := context.WithCancel(context.Background())
hijo1, cancelHijo1 := context.WithTimeout(padre, 3*time.Second)
hijo2, cancelHijo2 := context.WithCancel(padre)

defer cancelPadre()  // cancela padre, hijo1, e hijo2
defer cancelHijo1()
defer cancelHijo2()

// Si llamamos cancelPadre(), ctx.Done() se activa en padre, hijo1 y hijo2
cancelPadre()

Propagar valores con context (con moderación)

type ctxKey string

const (
    ctxKeyUserID ctxKey = "userID"
    ctxKeyRqID   ctxKey = "requestID"
)

func procesarRequest(w http.ResponseWriter, r *http.Request) {
    ctx := context.WithValue(r.Context(), ctxKeyUserID, obtenerUserID(r))
    ctx = context.WithValue(ctx, ctxKeyRqID, uuid.New())

    resultado, err := servicio.Procesar(ctx)
    // ...
}

func (s *Servicio) Procesar(ctx context.Context) (*Resultado, error) {
    userID, ok := ctx.Value(ctxKeyUserID).(int)
    if !ok {
        return nil, errors.New("userID no en context")
    }
    // ...
}

`errgroup`: WaitGroup con manejo de errores

El paquete golang.org/x/sync/errgroup combina WaitGroup con propagación de errores:

import "golang.org/x/sync/errgroup"

func obtenerDatosConcurrentes(ctx context.Context, ids []int) ([]*Dato, error) {
    g, ctx := errgroup.WithContext(ctx)
    datos := make([]*Dato, len(ids))

    for i, id := range ids {
        i, id := i, id  // captura de variables (importante antes de Go 1.22)
        g.Go(func() error {
            dato, err := buscarDato(ctx, id)
            if err != nil {
                return fmt.Errorf("buscarDato id=%d: %w", id, err)
            }
            datos[i] = dato
            return nil
        })
    }

    // Espera a todas las goroutines; retorna el primer error
    if err := g.Wait(); err != nil {
        return nil, err
    }

    return datos, nil
}

Si cualquier goroutine falla, el context se cancela automáticamente (cuando se usa errgroup.WithContext), lo que permite que las demás goroutines se detengan con gracia.

Semáforo: limitar concurrencia

Un canal con buffer funciona como semáforo para limitar concurrencia:

// Semáforo de 5 slots — máximo 5 goroutines concurrentes
sem := make(chan struct{}, 5)

var wg sync.WaitGroup
for _, tarea := range tareas {
    wg.Add(1)
    sem <- struct{}{}  // adquirir slot
    go func(t Tarea) {
        defer wg.Done()
        defer func() { <-sem }()  // liberar slot
        procesarTarea(t)
    }(tarea)
}
wg.Wait()

Patrón: graceful shutdown

func main() {
    servidor := &http.Server{Addr: ":8080"}

    // Canal para señales del OS
    señales := make(chan os.Signal, 1)
    signal.Notify(señales, os.Interrupt, syscall.SIGTERM)

    // Canal para errores del servidor
    errCh := make(chan error, 1)

    go func() {
        errCh <- servidor.ListenAndServe()
    }()

    select {
    case err := <-errCh:
        log.Fatal("error del servidor:", err)
    case <-señales:
        fmt.Println("Señal de parada recibida, cerrando con gracia...")
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()
        if err := servidor.Shutdown(ctx); err != nil {
            log.Fatal("error al cerrar servidor:", err)
        }
        fmt.Println("Servidor cerrado correctamente")
    }
}

Con estos patrones de concurrencia en tu arsenal, en la siguiente lección aprenderemos cómo organizar y distribuir código Go con el sistema de módulos y paquetes.

Siempre llama cancel() con defer al crear un context
Cuando creas un context con WithCancel, WithTimeout o WithDeadline, SIEMPRE debes llamar a la función cancel() retornada, normalmente con defer. Si no la llamas, el context y sus recursos no se liberan hasta que el context padre expire. Esto causa fugas de goroutines y memoria.
errgroup simplifica la gestión de errores en goroutines
El paquete golang.org/x/sync/errgroup ofrece errgroup.Group, que combina sync.WaitGroup con manejo de errores. Si cualquier goroutine retorna un error, las demás pueden ser canceladas automáticamente via context. Es la forma más idiomática de ejecutar múltiples tareas concurrentes con manejo de errores.