Skip to main content
⚡ Calmops

Advanced Go Concurrency Patterns: select, cancellation, and Worker Pools

Introduction

Go’s concurrency model — goroutines and channels — is simple to start with, but it has subtle patterns that separate correct, production-grade code from code that leaks goroutines or deadlocks. This guide covers the patterns you’ll use in real systems.

Prerequisites: Basic goroutines and channels. See Go by Example: Goroutines.

The select Statement

select is like a switch for channels — it blocks until one of its cases can proceed (or runs the default case immediately, if one is present):

// Shows every kind of case a select statement can contain.
//
// NOTE: because a default case is present, this select never blocks. The
// channel returned by time.After is created when the select is evaluated and
// is not ready yet, so in practice the timeout case is never chosen here.
// Drop the default case to get a blocking select with a one-second timeout.
select {
case msg := <-ch1:
    // Receive: chosen when a value is available on ch1.
    fmt.Println("received from ch1:", msg)
case ch2 <- value:
    // Send: chosen when ch2 can accept value.
    fmt.Println("sent to ch2")
case <-time.After(1 * time.Second):
    // Timeout: chosen when the one-second timer fires.
    fmt.Println("timeout")
default:
    // Runs immediately when none of the cases above is ready.
    fmt.Println("no channel ready (non-blocking)")
}

Key behaviors:

  • If multiple cases are ready, Go picks one at random
  • default makes select non-blocking
  • <-time.After(d) implements timeouts

The for-select Loop

The most common concurrency pattern in Go: a goroutine that processes events until told to stop.

// worker receives jobs until the jobs channel is closed or done is closed,
// and sends process(job) on results for every job it receives.
//
// Both the receive and the send are raced against done. Without the inner
// select, a worker blocked on results (because nobody is reading it) would
// never observe the cancellation signal and would leak.
func worker(jobs <-chan Job, results chan<- Result, done <-chan struct{}) {
    for {
        select {
        case <-done:
            return // cancellation signal
        case job, ok := <-jobs:
            if !ok {
                return // channel closed: no more work
            }
            // process(job) is evaluated once, when the select is entered;
            // the result is dropped if done wins the race.
            select {
            case results <- process(job):
            case <-done:
                return
            }
        }
    }
}

Cancellation with context.Context

context.Context is the standard way to cancel goroutines in Go. Always pass context as the first argument:

// fetchUser requests the user with the given id from the API and decodes the
// JSON response body into a User.
//
// The request is bound to ctx, so cancelling ctx (or exceeding its deadline)
// aborts the call and the returned error wraps context.Canceled or
// context.DeadlineExceeded.
//
// It returns a nil *User whenever the error is non-nil: on transport errors,
// on any non-2xx status, and when the body cannot be decoded.
func fetchUser(ctx context.Context, id int) (*User, error) {
    url := fmt.Sprintf("https://api.example.com/users/%d", id)
    req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
    if err != nil {
        return nil, err
    }

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return nil, err // includes context.Canceled or context.DeadlineExceeded
    }
    defer resp.Body.Close()

    // An error payload must not be decoded as if it were a user.
    if resp.StatusCode < 200 || resp.StatusCode > 299 {
        return nil, fmt.Errorf("fetch user %d: unexpected status %s", id, resp.Status)
    }

    var user User
    if err := json.NewDecoder(resp.Body).Decode(&user); err != nil {
        return nil, fmt.Errorf("fetch user %d: decode response: %w", id, err)
    }
    return &user, nil
}

// Caller controls the lifetime.
//
// main fetches user 42 with a five-second deadline and prints the user's
// name. On failure it logs the error and exits; a deadline overrun is
// additionally logged as "request timed out".
func main() {
    // Cancel after 5 seconds
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)

    user, err := fetchUser(ctx, 42)
    // Always call cancel to release the context's resources. It is called
    // explicitly instead of deferred because log.Fatal below exits through
    // os.Exit, which does not run deferred functions.
    cancel()

    if err != nil {
        if errors.Is(err, context.DeadlineExceeded) {
            log.Println("request timed out")
        }
        log.Fatal(err)
    }
    fmt.Println(user.Name)
}

Propagating Cancellation

// processOrder runs the three order steps in sequence — user lookup,
// inventory check, payment — passing the same ctx to each so they share one
// cancellation signal. The first failing step ends the sequence; lookup and
// inventory errors are wrapped with the name of the step, while the payment
// error is returned as is.
func processOrder(ctx context.Context, orderID string) error {
    customer, lookupErr := getUser(ctx, orderID)
    if lookupErr != nil {
        return fmt.Errorf("getUser: %w", lookupErr)
    }

    stock, stockErr := checkInventory(ctx, orderID)
    if stockErr != nil {
        return fmt.Errorf("checkInventory: %w", stockErr)
    }

    chargeErr := chargePayment(ctx, customer, stock)
    return chargeErr
}

// If ctx is cancelled, the call currently in flight returns early and the
// remaining calls fail fast — provided each of them honors ctx.

Manual Cancellation

// longRunningTask calls doWork for steps 0 through 999, polling ctx before
// every step. It returns ctx's error (context.Canceled or
// context.DeadlineExceeded) as soon as ctx is done, the first error reported
// by doWork, or nil when every step completes.
func longRunningTask(ctx context.Context) error {
    const steps = 1000

    for step := 0; step < steps; step++ {
        // Non-blocking cancellation check: Err is non-nil exactly when
        // ctx.Done() has been closed.
        if cancelErr := ctx.Err(); cancelErr != nil {
            return cancelErr
        }

        workErr := doWork(step)
        if workErr != nil {
            return workErr
        }
    }
    return nil
}

Worker Pool Pattern

Process N jobs with M workers — the most common concurrency pattern for I/O-bound work:

// workerPool runs processJob over jobs using a bounded number of goroutines
// and returns the collected results. Results arrive in completion order, not
// in the order of jobs.
//
// numWorkers is clamped to the range [1, len(jobs)]: a non-positive value
// would otherwise start no workers and silently drop every job, and more
// workers than jobs would only create idle goroutines.
//
// Each worker checks ctx before starting a job. Once ctx is done, jobs that
// have not been started are skipped, so the returned slice may be shorter
// than jobs. A job that is already running is not interrupted.
func workerPool(ctx context.Context, jobs []Job, numWorkers int) []Result {
    if len(jobs) == 0 {
        return nil
    }
    if numWorkers < 1 {
        numWorkers = 1
    }
    if numWorkers > len(jobs) {
        numWorkers = len(jobs)
    }

    // Both channels can hold every job/result, so neither the producer nor
    // the workers ever block on a send.
    jobCh := make(chan Job, len(jobs))
    resultCh := make(chan Result, len(jobs))

    for _, job := range jobs {
        jobCh <- job
    }
    close(jobCh) // signal workers: no more jobs

    var wg sync.WaitGroup
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for job := range jobCh {
                if ctx.Err() != nil {
                    return
                }
                resultCh <- processJob(job)
            }
        }()
    }

    // Workers cannot block on resultCh, so it is safe to wait for them here
    // before draining the results.
    wg.Wait()
    close(resultCh)

    var results []Result
    for result := range resultCh {
        results = append(results, result)
    }
    return results
}

Worker Pool with Error Handling

// Result carries the outcome of a single unit of work: the value it produced
// together with the error it returned.
type Result struct {
    Value interface{} // value produced by the work item
    Err   error       // error returned by the work item
}

// workerPoolWithErrors fetches every URL in urls using a bounded number of
// goroutines and returns one Result per URL, in completion order. A failed
// fetch does not stop the pool; its error is reported in that Result's Err
// field.
//
// workers is clamped to the range [1, len(urls)]: a non-positive value would
// otherwise start no workers and silently return no results at all.
//
// ctx is passed to every fetch call; the pool itself does not inspect it.
func workerPoolWithErrors(ctx context.Context, urls []string, workers int) []Result {
    if len(urls) == 0 {
        return nil
    }
    if workers < 1 {
        workers = 1
    }
    if workers > len(urls) {
        workers = len(urls)
    }

    // Both channels can hold one entry per URL, so no send ever blocks.
    jobs := make(chan string, len(urls))
    results := make(chan Result, len(urls))

    for _, url := range urls {
        jobs <- url
    }
    close(jobs)

    var wg sync.WaitGroup
    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for url := range jobs {
                data, err := fetch(ctx, url)
                results <- Result{Value: data, Err: err}
            }
        }()
    }

    // Workers cannot block on results, so waiting before draining is safe.
    wg.Wait()
    close(results)

    var all []Result
    for r := range results {
        all = append(all, r)
    }
    return all
}

Fan-Out / Fan-In

Fan-out: distribute work across multiple goroutines. Fan-in: merge results from multiple goroutines into one channel.

// Fan-out: start a fixed set of workers that share one input channel.
//
// fanOut launches `workers` goroutines and returns one output channel per
// goroutine. The goroutines compete for values from input, so each value is
// handled by exactly one of them, which sends its square on its own output
// channel.
//
// A goroutine stops, and closes its output channel, when input is closed or
// when ctx is done. Both the receive and the send are raced against
// ctx.Done(), so a worker waiting on an idle input — or on an output nobody
// reads — still exits on cancellation instead of leaking.
func fanOut(ctx context.Context, input <-chan int, workers int) []<-chan int {
    outputs := make([]<-chan int, workers)
    for i := range outputs {
        ch := make(chan int)
        outputs[i] = ch
        go func(out chan<- int) {
            defer close(out)
            for {
                select {
                case <-ctx.Done():
                    return
                case v, ok := <-input:
                    if !ok {
                        return // input closed: no more work
                    }
                    select {
                    case out <- v * v: // square each number
                    case <-ctx.Done():
                        return
                    }
                }
            }
        }(ch)
    }
    return outputs
}

// Fan-in: merge multiple channels into one.
//
// fanIn starts one forwarding goroutine per source channel and returns a
// single channel carrying every value they receive. The returned channel is
// closed once all forwarders have stopped: either every source was closed or
// ctx is done.
//
// Both the receive and the send are raced against ctx.Done(), so a forwarder
// waiting on a source that never closes still exits on cancellation and the
// merged channel is closed.
func fanIn(ctx context.Context, channels ...<-chan int) <-chan int {
    merged := make(chan int)
    var wg sync.WaitGroup

    forward := func(ch <-chan int) {
        defer wg.Done()
        for {
            select {
            case <-ctx.Done():
                return
            case v, ok := <-ch:
                if !ok {
                    return // this source is exhausted
                }
                select {
                case merged <- v:
                case <-ctx.Done():
                    return
                }
            }
        }
    }

    wg.Add(len(channels))
    for _, ch := range channels {
        go forward(ch)
    }

    // Close merged only after every forwarder is done sending on it.
    go func() {
        wg.Wait()
        close(merged)
    }()

    return merged
}

The Ping-Pong Pattern

Demonstrates channel ownership and passing state between goroutines:

// Ball is the value passed back and forth between players; hits counts how
// many times it has been played.
type Ball struct{ hits int }

// player loops forever: it takes the ball from table, records and prints the
// hit under the given name, holds the ball for 100ms, then puts it back on
// table for whoever receives next.
func player(name string, table chan *Ball) {
    // hit plays one stroke and hands the same ball back.
    hit := func(b *Ball) *Ball {
        b.hits++
        fmt.Println(name, b.hits)
        time.Sleep(100 * time.Millisecond)
        return b
    }

    for {
        table <- hit(<-table)
    }
}

// main starts the two players on a shared unbuffered table, serves a ball,
// lets the rally run for one second, and then takes the ball off the table
// so neither player can receive it again.
func main() {
    table := make(chan *Ball)
    for _, name := range []string{"ping", "pong"} {
        go player(name, table)
    }

    table <- &Ball{} // start the game
    time.Sleep(time.Second)
    <-table // stop the game
}

Semaphore: Limit Concurrent Operations

Use a buffered channel as a semaphore to limit concurrent goroutines:

// A buffered channel used as a counting semaphore: each buffered element is
// one occupied slot, so at most maxConcurrent goroutines run at a time.
const maxConcurrent = 5
sem := make(chan struct{}, maxConcurrent)

// release frees the slot held by the calling goroutine.
release := func() { <-sem }

for _, url := range urls {
    sem <- struct{}{} // acquire slot (blocks while all slots are taken)
    go func(u string) {
        defer release() // release slot when done
        fetch(u)
    }(url)
}

// Wait for all goroutines to finish: the semaphore can only be filled to
// capacity once every running goroutine has released its slot.
for free := cap(sem); free > 0; free-- {
    sem <- struct{}{}
}

Or use golang.org/x/sync/semaphore for a cleaner API:

import (
    "context"

    "golang.org/x/sync/semaphore"
)

// limit is both the semaphore's capacity and the weight acquired at the end
// to wait for every goroutine; keeping them in one constant prevents the two
// from drifting apart.
const limit = 5
sem := semaphore.NewWeighted(limit)

for _, url := range urls {
    // Acquire fails only when ctx is done; stop launching new work then.
    if err := sem.Acquire(ctx, 1); err != nil {
        break
    }
    go func(u string) {
        defer sem.Release(1)
        fetch(u)
    }(url)
}

// Wait for all goroutines to finish by acquiring the full weight. This must
// not use ctx: if ctx is already cancelled, Acquire returns ctx.Err() at once
// and the goroutines started above would not be waited for. With a
// background context and a weight no larger than the capacity, Acquire
// cannot fail, so its error is deliberately ignored.
_ = sem.Acquire(context.Background(), limit)

errgroup: Goroutines with Error Propagation

golang.org/x/sync/errgroup runs goroutines and collects the first error:

import "golang.org/x/sync/errgroup"

// fetchAll fetches every URL concurrently, one goroutine per URL, and returns
// the bodies in the same order as urls. Each goroutine writes only to its own
// index of the result slice, so no further synchronization is needed.
//
// The goroutines share the context derived by errgroup.WithContext. If any
// fetch fails, fetchAll returns a nil slice and the first error, wrapped with
// the URL that produced it.
func fetchAll(ctx context.Context, urls []string) ([][]byte, error) {
    group, groupCtx := errgroup.WithContext(ctx)
    bodies := make([][]byte, len(urls))

    for idx := range urls {
        idx, target := idx, urls[idx] // per-iteration copies for the closure
        group.Go(func() error {
            body, fetchErr := fetch(groupCtx, target)
            if fetchErr != nil {
                return fmt.Errorf("fetch %s: %w", target, fetchErr)
            }
            bodies[idx] = body
            return nil
        })
    }

    waitErr := group.Wait()
    if waitErr != nil {
        return nil, waitErr // first error from any goroutine
    }
    return bodies, nil
}

Common Mistakes

Goroutine Leak

// BAD: goroutine runs forever if nobody reads from ch
//
// leak starts a goroutine that sends its result on an unbuffered channel and
// then returns without ever receiving from that channel. A send on an
// unbuffered channel completes only when a receiver is ready, so the
// goroutine stays blocked on the send and is never released.
func leak() {
    ch := make(chan int) // unbuffered: the send needs a waiting receiver
    go func() {
        ch <- expensiveComputation()  // blocks forever if caller returns
    }()
    // caller returns without reading ch
}

// GOOD: use context for cancellation
//
// noLeak runs expensiveComputation in a goroutine and waits for whichever
// comes first: the result, or ctx being done. On cancellation it returns 0
// and ctx's error. The result channel has room for one value, so the
// goroutine can always complete its send and exit even when the caller has
// already given up.
func noLeak(ctx context.Context) (int, error) {
    out := make(chan int, 1) // buffered: goroutine won't block
    go func() { out <- expensiveComputation() }()

    select {
    case <-ctx.Done():
        return 0, ctx.Err()
    case value := <-out:
        return value, nil
    }
}

Closing a Channel Twice

// BAD: panics if close is called twice
close(ch)
close(ch)  // panic: close of closed channel

// GOOD: use sync.Once
// once.Do runs its argument only on the first call, so closeOnce can be
// invoked any number of times and ch is closed exactly once.
var once sync.Once
closeOnce := func() { once.Do(func() { close(ch) }) }

Resources

Comments