Skip to main content

Advanced Go Concurrency Patterns: select, cancellation, and Worker Pools

Created: April 25, 2022 6 min read

Introduction

Go’s concurrency model — goroutines and channels — is simple to get started with, but it has subtle patterns that separate correct, production-grade code from code that leaks goroutines or deadlocks. This guide covers the patterns you’ll use in real systems. See the Go Installation Guide, Go Ecosystem Overview, and Go Best Practices for more context.

Prerequisites: Basic goroutines and channels. See Go by Example: Goroutines.

The select Statement

select is like a switch for channels — it blocks until one of its cases can proceed:

// Shows every kind of case a select can hold: receive, send, timeout, default.
// This is a syntax tour; real code rarely combines a timeout with default.
select {
case msg := <-ch1:
    // Receive case: runs when a value is available on ch1.
    fmt.Println("received from ch1:", msg)
case ch2 <- value:
    // Send case: runs when ch2 can accept value.
    fmt.Println("sent to ch2")
case <-time.After(1 * time.Second):
    // Timeout case. With a default case present, select never waits, so the
    // freshly created timer is never ready and this branch cannot be chosen.
    // Remove default to get a real one-second timeout.
    fmt.Println("timeout")
default:
    // Runs immediately when no other case is ready.
    fmt.Println("no channel ready (non-blocking)")
}

Key behaviors:

  • If multiple cases are ready, Go picks one at random
  • default makes select non-blocking
  • <-time.After(d) implements timeouts — but only in a select without default, because with default the select never waits for the timer

The for-select Loop

The most common concurrency pattern in Go: a goroutine that processes events until told to stop.

// worker processes jobs until the jobs channel is closed or done is signalled.
//
// Each received job is passed to process and the outcome is sent on results.
// Both the receive and the send are guarded by done, so the goroutine exits
// promptly on cancellation even when nobody is reading results any more.
func worker(jobs <-chan Job, results chan<- Result, done <-chan struct{}) {
    for {
        select {
        case job, ok := <-jobs:
            if !ok {
                return // channel closed
            }
            // A bare send would block forever if the consumer of results has
            // already gone away; racing it against done prevents that leak.
            select {
            case results <- process(job):
            case <-done:
                return // cancelled while delivering the result
            }

        case <-done:
            return // cancellation signal
        }
    }
}

Cancellation with context.Context

context.Context is the standard way to cancel goroutines in Go. Always pass context as the first argument:

// fetchUser requests the user with the given id from the example API and
// decodes the JSON response body into a User.
//
// ctx bounds the whole HTTP exchange: if it is cancelled or its deadline
// passes, the request is aborted and the returned error wraps
// context.Canceled or context.DeadlineExceeded.
//
// A non-2xx response or an undecodable body yields a nil *User and a non-nil
// error; the user is never partially populated.
func fetchUser(ctx context.Context, id int) (*User, error) {
    // Create a request bound to the context.
    url := fmt.Sprintf("https://api.example.com/users/%d", id)
    req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
    if err != nil {
        return nil, err
    }

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return nil, err // includes context.Canceled or context.DeadlineExceeded
    }
    defer resp.Body.Close()

    // Do reports transport failures only; an HTTP error status still arrives
    // as a normal response and must be rejected before decoding.
    if resp.StatusCode < 200 || resp.StatusCode > 299 {
        return nil, fmt.Errorf("fetch user %d: unexpected status %s", id, resp.Status)
    }

    var user User
    if err := json.NewDecoder(resp.Body).Decode(&user); err != nil {
        return nil, fmt.Errorf("fetch user %d: decode response: %w", id, err)
    }
    return &user, nil
}

// The caller controls the lifetime: main gives fetchUser a five-second budget,
// prints the user's name on success, and exits with the error otherwise.
func main() {
    const budget = 5 * time.Second

    ctx, cancel := context.WithTimeout(context.Background(), budget)
    defer cancel() // always call cancel to release the timer

    user, err := fetchUser(ctx, 42)
    if err == nil {
        fmt.Println(user.Name)
        return
    }

    if errors.Is(err, context.DeadlineExceeded) {
        log.Println("request timed out")
    }
    // log.Fatal exits the process without running deferred calls; skipping
    // cancel is harmless here only because the process is terminating.
    log.Fatal(err)
}

Propagating Cancellation

// processOrder runs the three steps of an order in sequence: look up the user,
// check inventory, then charge payment.
//
// The same ctx is handed to every step, so they all share one cancellation
// signal. The first failing step aborts the order, and its error is returned
// wrapped with the step name (use errors.Is / errors.As to inspect it).
func processOrder(ctx context.Context, orderID string) error {
    // All sub-operations inherit the same cancellation
    user, err := getUser(ctx, orderID)
    if err != nil {
        return fmt.Errorf("getUser: %w", err)
    }

    inventory, err := checkInventory(ctx, orderID)
    if err != nil {
        return fmt.Errorf("checkInventory: %w", err)
    }

    // Wrap like the other steps so the failing stage is always identifiable.
    if err := chargePayment(ctx, user, inventory); err != nil {
        return fmt.Errorf("chargePayment: %w", err)
    }
    return nil
}

// If ctx is cancelled, the call in flight returns early and the remaining calls are never started (provided each callee honors ctx)

Manual Cancellation

// longRunningTask calls doWork for i = 0 through 999, stopping at the first
// error it returns or as soon as ctx is found to be done.
//
// Cancellation is polled once before each unit of work, so a cancelled ctx is
// noticed between calls to doWork, not in the middle of one.
func longRunningTask(ctx context.Context) error {
    const iterations = 1000

    for step := 0; step < iterations; step++ {
        // ctx.Err is non-nil exactly when ctx.Done is closed, so this is a
        // non-blocking cancellation check.
        if err := ctx.Err(); err != nil {
            return err // context.Canceled or context.DeadlineExceeded
        }

        if err := doWork(step); err != nil {
            return err
        }
    }
    return nil
}

Worker Pool Pattern

Process N jobs with M workers — the most common concurrency pattern for I/O-bound work:

// workerPool runs processJob over jobs using numWorkers goroutines and returns
// the results in the order workers delivered them (not input order).
//
// Each worker checks ctx before starting a job and exits once ctx is done, so
// after cancellation the returned slice may hold fewer results than jobs.
func workerPool(ctx context.Context, jobs []Job, numWorkers int) []Result {
    // Both channels are buffered to len(jobs), so neither the producer below
    // nor any worker can ever block on a send.
    queue := make(chan Job, len(jobs))
    out := make(chan Result, len(jobs))

    // Queue every job up front, then close to tell workers there is no more.
    for _, j := range jobs {
        queue <- j
    }
    close(queue)

    var workers sync.WaitGroup
    for w := 0; w < numWorkers; w++ {
        workers.Add(1)
        go func() {
            defer workers.Done()
            for j := range queue {
                if ctx.Err() != nil {
                    return // cancelled: leave the remaining jobs unprocessed
                }
                out <- processJob(j)
            }
        }()
    }

    // Workers never block on out, so waiting inline cannot deadlock.
    workers.Wait()
    close(out)

    // Drain whatever the workers produced.
    var results []Result
    for r := range out {
        results = append(results, r)
    }
    return results
}

Worker Pool with Error Handling

// Result pairs the value produced by one unit of work with the error, if any,
// that the work returned.
type Result struct {
    Value interface{} // payload produced by the work
    Err   error       // error returned by the work, if any
}

// workerPoolWithErrors fetches every URL using the given number of worker
// goroutines and returns one Result per fetch, in the order workers delivered
// them (not input order).
//
// A failed fetch does not stop the pool: its error is recorded in the Result's
// Err field alongside whatever value fetch returned.
func workerPoolWithErrors(ctx context.Context, urls []string, workers int) []Result {
    // Buffered to len(urls): queuing below never blocks.
    pending := make(chan string, len(urls))
    for _, u := range urls {
        pending <- u
    }
    close(pending)

    // Buffered to len(urls) as well, so workers never block on delivery.
    fetched := make(chan Result, len(urls))

    var running sync.WaitGroup
    for n := 0; n < workers; n++ {
        running.Add(1)
        go func() {
            defer running.Done()
            for u := range pending {
                body, err := fetch(ctx, u)
                fetched <- Result{Value: body, Err: err}
            }
        }()
    }

    // Safe to wait inline because no worker can block on fetched.
    running.Wait()
    close(fetched)

    var all []Result
    for res := range fetched {
        all = append(all, res)
    }
    return all
}

Fan-Out / Fan-In

Fan-out: distribute work across multiple goroutines. Fan-in: merge results from multiple goroutines into one channel.

// fanOut starts `workers` goroutines that compete for values from the shared
// input channel. Each value is taken by exactly one worker, squared, and sent
// on that worker's own output channel.
//
// It returns one receive-only channel per worker. A worker closes its channel
// and exits when input is closed or ctx is done, whichever comes first. Both
// the receive and the send observe ctx, so a worker cannot be left blocked on
// an input that has stalled without being closed.
func fanOut(ctx context.Context, input <-chan int, workers int) []<-chan int {
    outputs := make([]<-chan int, workers)
    for i := 0; i < workers; i++ {
        ch := make(chan int)
        outputs[i] = ch
        go func(out chan<- int) {
            defer close(out)
            for {
                select {
                case v, ok := <-input:
                    if !ok {
                        return // input closed: no more work
                    }
                    select {
                    case out <- v * v: // square each number
                    case <-ctx.Done():
                        return
                    }
                case <-ctx.Done():
                    return // cancelled while waiting for input
                }
            }
        }(ch)
    }
    return outputs
}

// fanIn merges any number of input channels into a single output channel.
//
// One goroutine per input forwards its values to the returned channel; values
// from different inputs interleave in no particular order. The returned
// channel is closed once every forwarder has exited, which happens when its
// input is closed or ctx is done. Both the receive and the send observe ctx,
// so cancellation releases forwarders even if an input is never closed.
func fanIn(ctx context.Context, channels ...<-chan int) <-chan int {
    merged := make(chan int)
    var wg sync.WaitGroup

    forward := func(ch <-chan int) {
        defer wg.Done()
        for {
            select {
            case v, ok := <-ch:
                if !ok {
                    return // this input is exhausted
                }
                select {
                case merged <- v:
                case <-ctx.Done():
                    return
                }
            case <-ctx.Done():
                return // cancelled while waiting for input
            }
        }
    }

    wg.Add(len(channels))
    for _, ch := range channels {
        go forward(ch)
    }

    // Close merged only after every forwarder is done, so no send can hit a
    // closed channel.
    go func() {
        wg.Wait()
        close(merged)
    }()

    return merged
}

The Ping-Pong Pattern

Demonstrates channel ownership and passing state between goroutines:

// Ball is the token passed back and forth between players; hits counts how
// many times it has been received and returned.
type Ball struct{ hits int }

// player loops forever: it takes the ball off the table, records and prints
// the hit under its name, pauses briefly, and puts the ball back.
//
// Only the goroutine currently holding the ball touches it, so no lock is
// needed around hits. The loop has no exit; it ends when the program does.
func player(name string, table chan *Ball) {
    const swingDelay = 100 * time.Millisecond

    for {
        b := <-table // wait for our turn
        b.hits++
        fmt.Println(name, b.hits)
        time.Sleep(swingDelay)
        table <- b // hand the ball to whoever is waiting
    }
}

// main starts two players on a shared unbuffered table, serves one ball, lets
// the rally run for a second, then takes the ball back to end the game.
func main() {
    table := make(chan *Ball)
    for _, name := range []string{"ping", "pong"} {
        go player(name, table)
    }

    table <- &Ball{} // start the game
    time.Sleep(time.Second)
    <-table // stop the game: with the ball gone, both players block
}

Semaphore: Limit Concurrent Operations

Use a buffered channel as a semaphore to limit concurrent goroutines:

// Cap the number of fetches in flight: each running goroutine holds one slot
// in sem, and the channel's capacity is the limit.
const maxConcurrent = 5
sem := make(chan struct{}, maxConcurrent)

for _, url := range urls {
    sem <- struct{}{} // take a slot; blocks while maxConcurrent fetches are running
    go func(target string) {
        defer func() { <-sem }() // give the slot back on exit
        fetch(target)
    }(url)
}

// Wait for all goroutines to finish: the buffer can only be filled completely
// once every goroutine has released its slot.
for held := 0; held < cap(sem); held++ {
    sem <- struct{}{}
}

Or use golang.org/x/sync/semaphore for a cleaner API:

import "golang.org/x/sync/semaphore"

// Same limiter built on semaphore.Weighted: each fetch holds a weight of 1
// out of maxInFlight.
const maxInFlight = 5
sem := semaphore.NewWeighted(maxInFlight)

for _, url := range urls {
    // Acquire fails only when ctx is done; stop launching new work then.
    if sem.Acquire(ctx, 1) != nil {
        break
    }
    go func(target string) {
        defer sem.Release(1)
        fetch(target)
    }(url)
}

// Wait for all to finish: claiming the full weight succeeds only after every
// goroutine has released its slot. The error is deliberately discarded here;
// note that if ctx is done first, this returns early and goroutines may still
// be running.
_ = sem.Acquire(ctx, maxInFlight)

errgroup: Goroutines with Error Propagation

golang.org/x/sync/errgroup runs goroutines and collects the first error:

import "golang.org/x/sync/errgroup"

// fetchAll fetches every URL concurrently and returns the bodies in the same
// order as urls.
//
// The fetches share a context derived from ctx that the errgroup cancels as
// soon as any fetch fails. On failure the first error is returned, wrapped
// with the offending URL, and no bodies are returned.
func fetchAll(ctx context.Context, urls []string) ([][]byte, error) {
    group, gctx := errgroup.WithContext(ctx)
    bodies := make([][]byte, len(urls))

    for idx := range urls {
        idx, target := idx, urls[idx] // per-iteration copies for the closure
        group.Go(func() error {
            body, err := fetch(gctx, target)
            if err != nil {
                return fmt.Errorf("fetch %s: %w", target, err)
            }
            // Each goroutine writes only its own index, so no lock is needed.
            bodies[idx] = body
            return nil
        })
    }

    if err := group.Wait(); err != nil {
        return nil, err // first error from any goroutine
    }
    return bodies, nil
}

Common Mistakes

Goroutine Leak

// BAD: goroutine runs forever if nobody reads from ch
//
// leak starts a goroutine that sends one value on ch and then returns without
// ever receiving from it. Kept as-is on purpose: it demonstrates the bug.
func leak() {
    ch := make(chan int) // unbuffered: a send completes only when a receiver is ready
    go func() {
        ch <- expensiveComputation()  // blocks forever if caller returns
    }()
    // caller returns without reading ch
}

// GOOD: use context for cancellation
//
// noLeak runs expensiveComputation in a goroutine and returns its result, or
// ctx.Err() if ctx is done first. Cancellation only stops the waiting: the
// computation itself still runs to completion in the background.
func noLeak(ctx context.Context) (int, error) {
    // Capacity 1 lets the goroutine deliver its value and exit even when the
    // caller has already given up and nobody will ever receive it.
    done := make(chan int, 1)
    go func() { done <- expensiveComputation() }()

    select {
    case <-ctx.Done():
        return 0, ctx.Err()
    case v := <-done:
        return v, nil
    }
}

Closing a Channel Twice

// BAD: panics if close is called twice
close(ch)
close(ch)  // panic: close of closed channel

// GOOD: use sync.Once
// once.Do runs its function at most once, so every call to closeOnce after
// the first is a no-op and ch is closed exactly one time, even when several
// goroutines call closeOnce concurrently.
var once sync.Once
closeOnce := func() { once.Do(func() { close(ch) }) }

Resources

Comments

Share this article

Scan to read on mobile