Concurrency Patterns in Go

Well-designed concurrent code needs more than isolated goroutines and channels: it needs patterns that organize the workflow, handle errors, and allow clean cancellation. Go has several standard patterns that experienced developers recognize and apply consistently.

Pattern 1: Worker Pool

The worker pool limits concurrency by creating a fixed number of goroutines that process a work queue. It keeps goroutine creation bounded when processing large volumes of tasks:

func workerPool(numWorkers int, tasks []string) []string {
    work := make(chan string, len(tasks))
    results := make(chan string, len(tasks))

    // Launch a fixed number of workers (ranging over an int requires Go 1.22+)
    var wg sync.WaitGroup
    for i := range numWorkers {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for task := range work {
                result := fmt.Sprintf("Worker-%d processed: %s", id, task)
                results <- result
            }
        }(i)
    }

    // Send all tasks
    for _, t := range tasks {
        work <- t
    }
    close(work)

    // Wait and close results
    go func() {
        wg.Wait()
        close(results)
    }()

    // Collect
    var out []string
    for r := range results {
        out = append(out, r)
    }
    return out
}

When to use it: batch processing, HTTP request throttling, parallel file processing.
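
Note that workerPool above returns results in completion order, not input order. When order matters, one option is to tag each task with its index and write results into a pre-sized slice. The following is a minimal sketch under that assumption; the `job` type and the `orderedPool` name are illustrative and not part of the original example, and upper-casing stands in for real work.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// job pairs a task with its position in the input slice (hypothetical helper type).
type job struct {
	index int
	task  string
}

// orderedPool processes tasks with numWorkers goroutines and returns the
// results in the same order as the input. numWorkers must be at least 1.
func orderedPool(numWorkers int, tasks []string) []string {
	jobs := make(chan job)
	out := make([]string, len(tasks))

	var wg sync.WaitGroup
	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobs {
				// Each index is written by exactly one goroutine, so no lock is needed.
				out[j.index] = strings.ToUpper(j.task)
			}
		}()
	}

	for i, t := range tasks {
		jobs <- job{index: i, task: t}
	}
	close(jobs)
	wg.Wait() // all writes to out happen before Wait returns
	return out
}

func main() {
	fmt.Println(orderedPool(3, []string{"a", "b", "c", "d"})) // [A B C D]
}
```

Because the result slice is indexed rather than appended to, no results channel or collector goroutine is needed in this variant.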

Pattern 2: Fan-Out / Fan-In

Fan-out: distribute work from one channel to multiple goroutines. Fan-in: combine results from multiple channels into one.

// Fan-out: one channel → multiple goroutines
func fanOut(input <-chan int, numWorkers int) []<-chan int {
    outputs := make([]<-chan int, numWorkers)
    for i := range numWorkers {
        output := make(chan int)
        outputs[i] = output
        go func(out chan<- int) {
            for n := range input {
                out <- n * 2  // process
            }
            close(out)
        }(output)
    }
    return outputs
}

// Fan-in: multiple channels → one channel
func fanIn(channels ...<-chan int) <-chan int {
    combined := make(chan int)
    var wg sync.WaitGroup

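    // forward copies every value from ch onto combined.
    // It is named forward rather than copy to avoid shadowing the built-in copy.
    forward := func(ch <-chan int) {
        defer wg.Done()
        for n := range ch {
            combined <- n
        }
    }

    wg.Add(len(channels))
    for _, ch := range channels {
        go forward(ch)
    }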

    go func() {
        wg.Wait()
        close(combined)
    }()

    return combined
}
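
To show how the two halves fit together, here is a self-contained sketch that fans the numbers 1..n out to several workers and merges their outputs back into one stream. The names `double`, `merge`, and `sumDoubled` are illustrative; they mirror fanOut and fanIn above rather than reuse them, so the block compiles on its own.

```go
package main

import (
	"fmt"
	"sync"
)

// double starts one worker that reads from in, doubles each value,
// and closes its output once in is drained.
func double(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * 2
		}
	}()
	return out
}

// merge forwards values from every channel in chans onto one channel.
func merge(chans ...<-chan int) <-chan int {
	combined := make(chan int)
	var wg sync.WaitGroup
	wg.Add(len(chans))
	for _, ch := range chans {
		go func(c <-chan int) {
			defer wg.Done()
			for n := range c {
				combined <- n
			}
		}(ch)
	}
	go func() {
		wg.Wait()
		close(combined)
	}()
	return combined
}

// sumDoubled fans 1..n out to numWorkers workers and sums the merged results.
func sumDoubled(n, numWorkers int) int {
	input := make(chan int)
	go func() {
		defer close(input)
		for i := 1; i <= n; i++ {
			input <- i
		}
	}()

	workers := make([]<-chan int, numWorkers)
	for i := range workers {
		workers[i] = double(input) // fan-out: all workers share one input
	}

	total := 0
	for v := range merge(workers...) { // fan-in
		total += v
	}
	return total
}

func main() {
	fmt.Println(sumDoubled(10, 3)) // 2 * (1 + ... + 10) = 110
}
```

The order in which values arrive on the merged channel is not deterministic, which is why the example aggregates with a sum instead of printing individual values.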

Pattern 3: Pipeline

A pipeline chains processing stages, each receiving from the previous and sending to the next:

// Pipeline to process URLs
func fetchURL(urls <-chan string) <-chan []byte {
    out := make(chan []byte)
    go func() {
        defer close(out)
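        for url := range urls {
            resp, err := http.Get(url)
            if err != nil {
                continue // skip URLs that fail; the error is dropped here
            }
            body, err := io.ReadAll(resp.Body)
            resp.Body.Close()
            if err != nil {
                continue // skip bodies that could not be read completely
            }
            out <- body
        }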
    }()
    return out
}

func parseHTML(bodies <-chan []byte) <-chan string {
    out := make(chan string)
    go func() {
        defer close(out)
        for body := range bodies {
            title := extractTitle(body) // extractTitle is assumed to be defined elsewhere
            out <- title
        }
    }()
    return out
}

func main() {
    urlsCh := make(chan string, 10)
    go func() {
        urls := []string{"https://a.com", "https://b.com"}
        for _, u := range urls {
            urlsCh <- u
        }
        close(urlsCh)
    }()

    bodiesCh := fetchURL(urlsCh)
    titlesCh := parseHTML(bodiesCh)

    for title := range titlesCh {
        fmt.Println(title)
    }
}
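
The fetchURL stage above silently skips failures. One common alternative is to send a small struct that carries either a value or an error, so the final stage decides what to do with failures. The sketch below illustrates that idea with an in-memory pipeline that parses strings instead of fetching URLs; the `result` type and the stage names `emit`, `parse`, and `collect` are assumptions made for this example.

```go
package main

import (
	"fmt"
	"strconv"
)

// result carries either a parsed value or the error for one input (hypothetical type).
type result struct {
	input string
	value int
	err   error
}

// emit sends each string on a channel and then closes it.
func emit(items ...string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for _, s := range items {
			out <- s
		}
	}()
	return out
}

// parse converts each string to an int, forwarding failures instead of dropping them.
func parse(in <-chan string) <-chan result {
	out := make(chan result)
	go func() {
		defer close(out)
		for s := range in {
			v, err := strconv.Atoi(s)
			out <- result{input: s, value: v, err: err}
		}
	}()
	return out
}

// collect drains the pipeline and returns the sum of good values and the failure count.
func collect(in <-chan result) (sum, failed int) {
	for r := range in {
		if r.err != nil {
			failed++
			continue
		}
		sum += r.value
	}
	return sum, failed
}

func main() {
	sum, failed := collect(parse(emit("1", "2", "x", "4")))
	fmt.Println(sum, failed) // 7 1
}
```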

`context.Context`: Propagated Cancellation

context.Context is Go's standard mechanism for propagating cancellation, deadlines, and values through function calls and goroutines. It is ubiquitous in production Go code.

The Six Context Constructors

import "context"

// 1. Background — root of every context hierarchy
ctx := context.Background()

// 2. TODO — placeholder when you are not sure which context to use
ctx2 := context.TODO()

// 3. WithCancel — manual cancellation
ctx3, cancel := context.WithCancel(context.Background())
defer cancel()  // ALWAYS

// 4. WithTimeout — automatic cancellation after a duration
ctx4, cancel4 := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel4()

// 5. WithDeadline — cancellation at a specific moment
deadline := time.Now().Add(5 * time.Second)
ctx5, cancel5 := context.WithDeadline(context.Background(), deadline)
defer cancel5()

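// 6. WithValue: propagate values (only for request-scoped data, not options).
//    The string key keeps this line short; the context docs advise against
//    built-in key types, so real code should use an unexported key type.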
ctx6 := context.WithValue(context.Background(), "userID", 42)

Checking for Cancellation in Goroutines

func workWithContext(ctx context.Context) error {
    for {
        select {
        case <-ctx.Done():
            // ctx.Err() returns context.Canceled or context.DeadlineExceeded
            return fmt.Errorf("work cancelled: %w", ctx.Err())
        default:
            // do work...
            if err := workStep(); err != nil {
                return err
            }
        }
    }
}

// Pass context in long-running functions
func queryDB(ctx context.Context, query string) ([]Row, error) {
    // Standard DB libraries accept context
    return db.QueryContext(ctx, query)
}
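
A select with a default branch only notices cancellation between work steps. A goroutine that may block on a channel send needs ctx.Done() in the same select as the send, otherwise it can stay blocked forever once the consumer stops reading. The following sketch shows that shape; `count` and `firstN` are hypothetical names chosen for this example.

```go
package main

import (
	"context"
	"fmt"
)

// count sends 0, 1, 2, ... on the returned channel until ctx is cancelled.
func count(ctx context.Context) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 0; ; i++ {
			select {
			case out <- i: // the consumer took the value
			case <-ctx.Done(): // the consumer went away; stop instead of blocking forever
				return
			}
		}
	}()
	return out
}

// firstN reads n values from count and then cancels the producer.
func firstN(n int) []int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // unblocks the producer goroutine when we return

	ch := count(ctx)
	var got []int
	for len(got) < n {
		got = append(got, <-ch)
	}
	return got
}

func main() {
	fmt.Println(firstN(3)) // [0 1 2]
}
```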

Context Hierarchy

Contexts form a tree. Cancelling a parent automatically cancels all of its descendants, while cancelling a child leaves the parent untouched:

parent, cancelParent := context.WithCancel(context.Background())
child1, cancelChild1 := context.WithTimeout(parent, 3*time.Second)
child2, cancelChild2 := context.WithCancel(parent)

defer cancelParent()   // cancels parent, child1, and child2
defer cancelChild1()
defer cancelChild2()

// If we call cancelParent(), ctx.Done() fires for parent, child1 and child2
cancelParent()
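
The direction of propagation can be checked with a short runnable sketch. It cancels one child first, then the parent, and records which contexts report an error after each step. The helper names `isCancelled` and `hierarchyDemo` are illustrative.

```go
package main

import (
	"context"
	"fmt"
)

// isCancelled reports, for each context, whether it has been cancelled.
func isCancelled(parent, child1, child2 context.Context) [3]bool {
	return [3]bool{parent.Err() != nil, child1.Err() != nil, child2.Err() != nil}
}

// hierarchyDemo returns the cancellation state of (parent, child1, child2)
// after cancelling child1, and again after cancelling the parent.
func hierarchyDemo() (afterChild, afterParent [3]bool) {
	parent, cancelParent := context.WithCancel(context.Background())
	defer cancelParent()
	child1, cancelChild1 := context.WithCancel(parent)
	defer cancelChild1()
	child2, cancelChild2 := context.WithCancel(parent)
	defer cancelChild2()

	cancelChild1() // affects only child1
	afterChild = isCancelled(parent, child1, child2)

	cancelParent() // propagates down to every child
	afterParent = isCancelled(parent, child1, child2)
	return afterChild, afterParent
}

func main() {
	afterChild, afterParent := hierarchyDemo()
	fmt.Println(afterChild)  // [false true false]
	fmt.Println(afterParent) // [true true true]
}
```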

Propagating Values with Context (Use Sparingly)

type ctxKey string

const (
    ctxKeyUserID ctxKey = "userID"
    ctxKeyReqID  ctxKey = "requestID"
)

func handleRequest(w http.ResponseWriter, r *http.Request) {
    ctx := context.WithValue(r.Context(), ctxKeyUserID, getUserID(r))
    ctx = context.WithValue(ctx, ctxKeyReqID, uuid.New())

    result, err := service.Process(ctx)
    // ...
}

func (s *Service) Process(ctx context.Context) (*Result, error) {
    userID, ok := ctx.Value(ctxKeyUserID).(int)
    if !ok {
        return nil, errors.New("userID not in context")
    }
    // ...
}

`errgroup`: WaitGroup with Error Handling

The golang.org/x/sync/errgroup package combines WaitGroup with error propagation:

import "golang.org/x/sync/errgroup"

func fetchConcurrently(ctx context.Context, ids []int) ([]*Data, error) {
    g, ctx := errgroup.WithContext(ctx)
    data := make([]*Data, len(ids))

    for i, id := range ids {
        i, id := i, id  // capture variables (important before Go 1.22)
        g.Go(func() error {
            d, err := fetchData(ctx, id)
            if err != nil {
                return fmt.Errorf("fetchData id=%d: %w", id, err)
            }
            data[i] = d
            return nil
        })
    }

    // Waits for all goroutines; returns the first error
    if err := g.Wait(); err != nil {
        return nil, err
    }

    return data, nil
}

If any goroutine fails, the context is automatically cancelled (when using errgroup.WithContext), allowing other goroutines to stop gracefully.
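
To make the cancel-on-first-error behaviour concrete, the sketch below approximates it using only the standard library. This is not errgroup's implementation, just an illustration of the idea with sync.WaitGroup, sync.Once, and a cancellable context; `runAll`, `demo`, and `errBoom` are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// runAll runs every task concurrently, cancels the shared context on the
// first failure, waits for all tasks, and returns that first error.
func runAll(ctx context.Context, tasks ...func(context.Context) error) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	var (
		wg       sync.WaitGroup
		once     sync.Once
		firstErr error
	)
	for _, task := range tasks {
		wg.Add(1)
		go func(t func(context.Context) error) {
			defer wg.Done()
			if err := t(ctx); err != nil {
				once.Do(func() {
					firstErr = err
					cancel() // signal the remaining tasks to stop
				})
			}
		}(task)
	}
	wg.Wait()
	return firstErr
}

var errBoom = errors.New("boom")

// demo runs one failing task next to one task that waits for cancellation.
func demo() (siblingStopped bool, err error) {
	fail := func(ctx context.Context) error { return errBoom }
	wait := func(ctx context.Context) error {
		<-ctx.Done() // returns only once the sibling has failed
		siblingStopped = true
		return ctx.Err()
	}
	err = runAll(context.Background(), fail, wait)
	return siblingStopped, err
}

func main() {
	stopped, err := demo()
	fmt.Println(stopped, errors.Is(err, errBoom)) // true true
}
```

The waiting task stops only because it watches ctx.Done(). A task that ignores the context keeps running until it finishes on its own, with this sketch and with errgroup alike.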

Semaphore: Limiting Concurrency

A buffered channel works as a semaphore to limit concurrency:

// Semaphore with 5 slots — maximum 5 concurrent goroutines
sem := make(chan struct{}, 5)

var wg sync.WaitGroup
for _, task := range tasks {
    wg.Add(1)
    sem <- struct{}{}  // acquire slot
    go func(t Task) {
        defer wg.Done()
        defer func() { <-sem }()  // release slot
        processTask(t)
    }(task)
}
wg.Wait()
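
One way to convince yourself that the semaphore really caps concurrency is to track how many tasks run at once. The sketch below does that with atomic counters; `peakConcurrency` is an illustrative name, the one-millisecond sleep stands in for real work, and atomic.Int32 assumes Go 1.19 or newer.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// peakConcurrency runs n tasks with at most limit running at once and
// returns the highest number of tasks observed running simultaneously.
func peakConcurrency(n, limit int) int {
	sem := make(chan struct{}, limit)
	var (
		wg      sync.WaitGroup
		running atomic.Int32
		peak    atomic.Int32
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		sem <- struct{}{} // acquire: blocks while limit tasks are in flight
		go func() {
			defer wg.Done()
			defer func() { <-sem }() // release

			cur := running.Add(1)
			// Record a new peak with a compare-and-swap loop.
			for {
				old := peak.Load()
				if cur <= old || peak.CompareAndSwap(old, cur) {
					break
				}
			}
			time.Sleep(time.Millisecond) // simulated work
			running.Add(-1)
		}()
	}
	wg.Wait()
	return int(peak.Load())
}

func main() {
	fmt.Println(peakConcurrency(20, 3) <= 3) // true
}
```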

Pattern: Graceful Shutdown

func main() {
    server := &http.Server{Addr: ":8080"}

    // Channel for OS signals
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt, syscall.SIGTERM)

    // Channel for server errors
    errCh := make(chan error, 1)

    go func() {
        errCh <- server.ListenAndServe()
    }()

    select {
    case err := <-errCh:
        log.Fatalf("server error: %v", err)
    case <-signals:
        fmt.Println("Shutdown signal received, closing gracefully...")
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()
        if err := server.Shutdown(ctx); err != nil {
            log.Fatalf("error shutting down server: %v", err)
        }
        fmt.Println("Server closed cleanly")
    }
}

With these concurrency patterns in your toolkit, the next lesson covers how to organize and distribute Go code with the module and package system.

Always call cancel() with defer when creating a context
When you create a context with WithCancel, WithTimeout, or WithDeadline, you must call the returned cancel function, typically with defer. If you never call it, the context and its resources (including any timer) stay allocated until the parent is cancelled or the deadline passes, and goroutines waiting on ctx.Done() may stay blocked. The result is leaked goroutines and memory.

errgroup simplifies error management in goroutines
The golang.org/x/sync/errgroup package offers errgroup.Group, which combines sync.WaitGroup with error handling. With errgroup.WithContext, the first goroutine to return an error cancels the shared context, so the other goroutines can stop early as long as they check it. It is a widely used, idiomatic way to run multiple concurrent tasks with error handling.