Skip to main content

Worker Pools and Concurrency Patterns

Created: May 8, 2026 7 min read

Worker pools are a fundamental concurrency pattern in Go. They allow you to process multiple tasks concurrently with a fixed number of workers, preventing resource exhaustion. This guide covers worker pool implementation and related patterns. For more context, see Go Installation Guide, Go Ecosystem Overview, Go Best Practices.

Basic Worker Pool

Simple Worker Pool

package main

import (
    "fmt"
    "sync"
)

func worker(id int, jobs <-chan int, results chan<- int) {
    for job := range jobs {
        fmt.Printf("Worker %d processing job %d\n", id, job)
        results <- job * 2
    }
}

func main() {
    jobs := make(chan int, 100)
    results := make(chan int, 100)
    
    // Start 3 workers
    for i := 1; i <= 3; i++ {
        go worker(i, jobs, results)
    }
    
    // Send 10 jobs
    for j := 1; j <= 10; j++ {
        jobs <- j
    }
    close(jobs)
    
    // Collect results
    for i := 1; i <= 10; i++ {
        fmt.Println("Result:", <-results)
    }
}

Worker Pool with WaitGroup

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    
    for job := range jobs {
        fmt.Printf("Worker %d processing job %d\n", id, job)
        time.Sleep(100 * time.Millisecond)
        results <- job * 2
    }
}

func main() {
    jobs := make(chan int, 100)
    results := make(chan int, 100)
    
    var wg sync.WaitGroup
    
    // Start 3 workers
    for i := 1; i <= 3; i++ {
        wg.Add(1)
        go worker(i, jobs, results, &wg)
    }
    
    // Send 10 jobs
    go func() {
        for j := 1; j <= 10; j++ {
            jobs <- j
        }
        close(jobs)
    }()
    
    // Wait for workers to finish
    go func() {
        wg.Wait()
        close(results)
    }()
    
    // Collect results
    for result := range results {
        fmt.Println("Result:", result)
    }
}

Advanced Patterns

Fan-Out/Fan-In Pattern

package main

import (
    "fmt"
    "sync"
)

// Fan-out: distribute work to multiple workers
func fanOut(input <-chan int, numWorkers int) []<-chan int {
    channels := make([]<-chan int, numWorkers)
    
    for i := 0; i < numWorkers; i++ {
        ch := make(chan int)
        channels[i] = ch
        
        go func(out chan<- int) {
            defer close(out)
            for val := range input {
                out <- val
            }
        }(ch)
    }
    
    return channels
}

// Fan-in: collect results from multiple channels
func fanIn(channels ...<-chan int) <-chan int {
    out := make(chan int)
    var wg sync.WaitGroup
    
    for _, ch := range channels {
        wg.Add(1)
        go func(c <-chan int) {
            defer wg.Done()
            for val := range c {
                out <- val
            }
        }(ch)
    }
    
    go func() {
        wg.Wait()
        close(out)
    }()
    
    return out
}

func main() {
    // Create input
    input := make(chan int)
    go func() {
        for i := 1; i <= 10; i++ {
            input <- i
        }
        close(input)
    }()
    
    // Fan-out to 3 workers
    channels := fanOut(input, 3)
    
    // Fan-in results
    results := fanIn(channels...)
    
    // Collect results
    for result := range results {
        fmt.Println("Result:", result)
    }
}

Pipeline Pattern

package main

import (
    "fmt"
)

// Stage 1: Generate numbers
func generate(max int) <-chan int {
    out := make(chan int)
    go func() {
        for i := 1; i <= max; i++ {
            out <- i
        }
        close(out)
    }()
    return out
}

// Stage 2: Square numbers
func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for num := range in {
            out <- num * num
        }
        close(out)
    }()
    return out
}

// Stage 3: Filter even numbers
func filterEven(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for num := range in {
            if num%2 == 0 {
                out <- num
            }
        }
        close(out)
    }()
    return out
}

func main() {
    // Create pipeline
    numbers := generate(10)
    squared := square(numbers)
    evens := filterEven(squared)
    
    // Consume results
    for result := range evens {
        fmt.Println(result)
    }
}

Rate Limiting Pattern

package main

import (
    "fmt"
    "time"
)

func main() {
    // Limit to 3 concurrent operations
    limiter := make(chan struct{}, 3)
    
    for i := 1; i <= 10; i++ {
        go func(id int) {
            limiter <- struct{}{}  // Acquire
            defer func() { <-limiter }()  // Release
            
            fmt.Printf("Processing %d\n", id)
            time.Sleep(100 * time.Millisecond)
        }(i)
    }
    
    time.Sleep(2 * time.Second)
}

Timeout Pattern

package main

import (
    "fmt"
    "time"
)

func worker(id int, jobs <-chan int, results chan<- int) {
    for job := range jobs {
        fmt.Printf("Worker %d processing job %d\n", id, job)
        time.Sleep(100 * time.Millisecond)
        results <- job * 2
    }
}

func main() {
    jobs := make(chan int, 100)
    results := make(chan int, 100)
    
    // Start workers
    for i := 1; i <= 3; i++ {
        go worker(i, jobs, results)
    }
    
    // Send jobs
    for j := 1; j <= 10; j++ {
        jobs <- j
    }
    close(jobs)
    
    // Collect results with timeout
    timeout := time.After(5 * time.Second)
    collected := 0
    
    for collected < 10 {
        select {
        case result := <-results:
            fmt.Println("Result:", result)
            collected++
        case <-timeout:
            fmt.Println("Timeout!")
            return
        }
    }
}

Practical Examples

HTTP Request Worker Pool

package main

import (
    "fmt"
    "net/http"
    "sync"
)

type Request struct {
    URL string
    ID  int
}

type Response struct {
    ID     int
    Status int
    Error  error
}

func httpWorker(id int, requests <-chan Request, responses chan<- Response, wg *sync.WaitGroup) {
    defer wg.Done()
    
    for req := range requests {
        resp, err := http.Get(req.URL)
        
        var status int
        if err == nil {
            status = resp.StatusCode
            resp.Body.Close()
        }
        
        responses <- Response{
            ID:     req.ID,
            Status: status,
            Error:  err,
        }
    }
}

func main() {
    requests := make(chan Request, 100)
    responses := make(chan Response, 100)
    
    var wg sync.WaitGroup
    
    // Start 5 workers
    for i := 1; i <= 5; i++ {
        wg.Add(1)
        go httpWorker(i, requests, responses, &wg)
    }
    
    // Send requests
    go func() {
        urls := []string{
            "https://golang.org",
            "https://github.com",
            "https://google.com",
        }
        
        for i, url := range urls {
            requests <- Request{URL: url, ID: i}
        }
        close(requests)
    }()
    
    // Wait and close responses
    go func() {
        wg.Wait()
        close(responses)
    }()
    
    // Collect responses
    for resp := range responses {
        if resp.Error != nil {
            fmt.Printf("Request %d: Error - %v\n", resp.ID, resp.Error)
        } else {
            fmt.Printf("Request %d: Status %d\n", resp.ID, resp.Status)
        }
    }
}

Task Queue with Priority

package main

import (
    "fmt"
    "sync"
)

type Task struct {
    ID       int
    Priority int
    Work     func()
}

func priorityWorker(id int, tasks <-chan Task, wg *sync.WaitGroup) {
    defer wg.Done()
    
    for task := range tasks {
        fmt.Printf("Worker %d executing task %d (priority %d)\n", id, task.ID, task.Priority)
        task.Work()
    }
}

func main() {
    tasks := make(chan Task, 100)
    
    var wg sync.WaitGroup
    
    // Start workers
    for i := 1; i <= 3; i++ {
        wg.Add(1)
        go priorityWorker(i, tasks, &wg)
    }
    
    // Send tasks
    go func() {
        for i := 1; i <= 10; i++ {
            tasks <- Task{
                ID:       i,
                Priority: i % 3,
                Work: func() {
                    fmt.Println("Task executed")
                },
            }
        }
        close(tasks)
    }()
    
    wg.Wait()
}

Best Practices

✅ Good Practices

  1. Use buffered channels - For job queues
  2. Close channels from sender - Signal completion
  3. Use WaitGroup for synchronization - Track completion
  4. Implement timeouts - Prevent hangs
  5. Monitor worker health - Detect failures
  6. Use directional channels - Clear intent
  7. Document patterns - Explain design
  8. Test concurrency - Use race detector

❌ Anti-Patterns

// ❌ Bad: No timeout
for result := range results {
    // Could hang forever
}

// ✅ Good: With timeout
select {
case result := <-results:
    // Process
case <-time.After(timeout):
    // Handle timeout
}

// ❌ Bad: Unbuffered job channel
jobs := make(chan int)

// ✅ Good: Buffered job channel
jobs := make(chan int, 100)

// ❌ Bad: No synchronization
go worker(jobs, results)
// No way to know when done

// ✅ Good: Use WaitGroup
var wg sync.WaitGroup
wg.Add(1)
go worker(jobs, results, &wg)
wg.Wait()

Resources and References

Official Documentation

Tools and Resources

Summary

Worker pools and concurrency patterns are essential:

  • Implement worker pools for controlled concurrency
  • Use fan-out/fan-in for distribution
  • Implement pipelines for data processing
  • Add rate limiting for resource control
  • Use timeouts to prevent hangs
  • Synchronize with WaitGroup
  • Test with race detector

Master these patterns for robust concurrent Go programs.

Resources

Comments

Share this article

Scan to read on mobile