Skip to main content
⚡ Calmops

Worker Pools and Concurrency Patterns

Worker Pools and Concurrency Patterns

Worker pools are a fundamental concurrency pattern in Go. They allow you to process multiple tasks concurrently with a fixed number of workers, preventing resource exhaustion. This guide covers worker pool implementation and related patterns.

Basic Worker Pool

Simple Worker Pool

package main

import (
    "fmt"
)

// worker reads jobs until the channel is closed, printing a progress
// line and sending job*2 on results for each job received.
func worker(id int, jobs <-chan int, results chan<- int) {
    for job := range jobs {
        fmt.Printf("Worker %d processing job %d\n", id, job)
        results <- job * 2
    }
}

func main() {
    // Buffered channels let the producer enqueue all jobs without
    // blocking on slow workers.
    jobs := make(chan int, 100)
    results := make(chan int, 100)

    // Start 3 workers
    for i := 1; i <= 3; i++ {
        go worker(i, jobs, results)
    }

    // Send 10 jobs, then close to tell the workers no more are coming.
    for j := 1; j <= 10; j++ {
        jobs <- j
    }
    close(jobs)

    // Collect exactly as many results as jobs sent; no further
    // synchronization is needed because the count is known.
    for i := 1; i <= 10; i++ {
        fmt.Println("Result:", <-results)
    }
}

Worker Pool with WaitGroup

package main

import (
    "fmt"
    "sync"
    "time"
)

// worker drains the jobs channel, doubling each job onto results, and
// marks itself finished on the WaitGroup when the channel closes.
func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()

    for {
        j, ok := <-jobs
        if !ok {
            return
        }
        fmt.Printf("Worker %d processing job %d\n", id, j)
        time.Sleep(100 * time.Millisecond)
        results <- j * 2
    }
}

func main() {
    jobs := make(chan int, 100)
    results := make(chan int, 100)

    var wg sync.WaitGroup

    // Launch a fixed pool of 3 workers.
    for w := 1; w <= 3; w++ {
        wg.Add(1)
        go worker(w, jobs, results, &wg)
    }

    // Producer: enqueue 10 jobs, then close to signal no more work.
    go func() {
        defer close(jobs)
        for n := 1; n <= 10; n++ {
            jobs <- n
        }
    }()

    // Closer: once every worker has exited, close results so the
    // range below can terminate.
    go func() {
        wg.Wait()
        close(results)
    }()

    for r := range results {
        fmt.Println("Result:", r)
    }
}

Advanced Patterns

Fan-Out/Fan-In Pattern

package main

import (
    "fmt"
    "sync"
)

// Fan-out: distribute work to multiple workers
func fanOut(input <-chan int, numWorkers int) []<-chan int {
    channels := make([]<-chan int, numWorkers)
    
    for i := 0; i < numWorkers; i++ {
        ch := make(chan int)
        channels[i] = ch
        
        go func(out chan<- int) {
            defer close(out)
            for val := range input {
                out <- val
            }
        }(ch)
    }
    
    return channels
}

// Fan-in: collect results from multiple channels
func fanIn(channels ...<-chan int) <-chan int {
    out := make(chan int)
    var wg sync.WaitGroup
    
    for _, ch := range channels {
        wg.Add(1)
        go func(c <-chan int) {
            defer wg.Done()
            for val := range c {
                out <- val
            }
        }(ch)
    }
    
    go func() {
        wg.Wait()
        close(out)
    }()
    
    return out
}

func main() {
    // Create input
    input := make(chan int)
    go func() {
        for i := 1; i <= 10; i++ {
            input <- i
        }
        close(input)
    }()
    
    // Fan-out to 3 workers
    channels := fanOut(input, 3)
    
    // Fan-in results
    results := fanIn(channels...)
    
    // Collect results
    for result := range results {
        fmt.Println("Result:", result)
    }
}

Pipeline Pattern

package main

import (
    "fmt"
)

// Stage 1: Generate numbers
func generate(max int) <-chan int {
    out := make(chan int)
    go func() {
        for i := 1; i <= max; i++ {
            out <- i
        }
        close(out)
    }()
    return out
}

// Stage 2: Square numbers
func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for num := range in {
            out <- num * num
        }
        close(out)
    }()
    return out
}

// Stage 3: Filter even numbers
func filterEven(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for num := range in {
            if num%2 == 0 {
                out <- num
            }
        }
        close(out)
    }()
    return out
}

func main() {
    // Create pipeline
    numbers := generate(10)
    squared := square(numbers)
    evens := filterEven(squared)
    
    // Consume results
    for result := range evens {
        fmt.Println(result)
    }
}

Rate Limiting Pattern

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    // Semaphore: at most 3 goroutines hold a slot at any moment.
    limiter := make(chan struct{}, 3)

    // A WaitGroup replaces the original fixed time.Sleep(2s): it waits
    // exactly as long as the work takes and cannot return early if the
    // machine is slow (or waste time if it is fast).
    var wg sync.WaitGroup

    for i := 1; i <= 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()

            limiter <- struct{}{}        // Acquire a slot
            defer func() { <-limiter }() // Release it on exit

            fmt.Printf("Processing %d\n", id)
            time.Sleep(100 * time.Millisecond)
        }(i)
    }

    // Block until every goroutine has finished.
    wg.Wait()
}

Timeout Pattern

package main

import (
    "fmt"
    "time"
)

// worker drains jobs until the channel closes, simulating 100ms of
// work per job and sending job*2 on results.
func worker(id int, jobs <-chan int, results chan<- int) {
    for {
        j, ok := <-jobs
        if !ok {
            return
        }
        fmt.Printf("Worker %d processing job %d\n", id, j)
        time.Sleep(100 * time.Millisecond)
        results <- j * 2
    }
}

func main() {
    jobs := make(chan int, 100)
    results := make(chan int, 100)

    // Pool of 3 workers.
    for w := 1; w <= 3; w++ {
        go worker(w, jobs, results)
    }

    // Enqueue 10 jobs and close the queue.
    for n := 1; n <= 10; n++ {
        jobs <- n
    }
    close(jobs)

    // Collect up to 10 results, bailing out if the overall deadline
    // of 5 seconds elapses first.
    deadline := time.After(5 * time.Second)
    for received := 0; received < 10; {
        select {
        case r := <-results:
            fmt.Println("Result:", r)
            received++
        case <-deadline:
            fmt.Println("Timeout!")
            return
        }
    }
}

Practical Examples

HTTP Request Worker Pool

package main

import (
    "fmt"
    "net/http"
    "sync"
)

// Request is one unit of work for the HTTP worker pool: a URL to
// fetch plus an ID used to correlate the resulting Response.
type Request struct {
    URL string
    ID  int
}

// Response carries the outcome of one Request. When Error is non-nil,
// Status is left at its zero value (0) — see httpWorker, which only
// assigns StatusCode on a successful fetch.
type Response struct {
    ID     int
    Status int
    Error  error
}

// httpWorker fetches each queued Request with http.Get and reports a
// Response carrying either the status code or the error. It signals
// the WaitGroup when the request channel closes.
func httpWorker(id int, requests <-chan Request, responses chan<- Response, wg *sync.WaitGroup) {
    defer wg.Done()

    for r := range requests {
        var code int
        resp, err := http.Get(r.URL)
        if err == nil {
            code = resp.StatusCode
            // Close the body so the connection can be reused.
            resp.Body.Close()
        }

        responses <- Response{ID: r.ID, Status: code, Error: err}
    }
}

func main() {
    requests := make(chan Request, 100)
    responses := make(chan Response, 100)

    var wg sync.WaitGroup

    // Pool of 5 concurrent fetchers.
    for w := 1; w <= 5; w++ {
        wg.Add(1)
        go httpWorker(w, requests, responses, &wg)
    }

    // Producer: one request per URL, then close the queue.
    go func() {
        defer close(requests)
        urls := []string{
            "https://golang.org",
            "https://github.com",
            "https://google.com",
        }
        for i, u := range urls {
            requests <- Request{URL: u, ID: i}
        }
    }()

    // Close responses once every worker has exited so the range
    // below can terminate.
    go func() {
        wg.Wait()
        close(responses)
    }()

    for r := range responses {
        if r.Error != nil {
            fmt.Printf("Request %d: Error - %v\n", r.ID, r.Error)
        } else {
            fmt.Printf("Request %d: Status %d\n", r.ID, r.Status)
        }
    }
}

Task Queue with Priority Metadata (note: a plain channel delivers tasks in FIFO order; true priority scheduling would need a heap-based queue, e.g. container/heap)

package main

import (
    "fmt"
    "sync"
)

// Task is a unit of work: an ID, a Priority label, and the Work
// function to run. NOTE(review): Priority is informational only in
// this example — tasks travel through a plain channel and are
// executed in FIFO order, not priority order.
type Task struct {
    ID       int
    Priority int
    Work     func()
}

// priorityWorker executes tasks from the channel until it closes,
// logging each one, then signals the WaitGroup.
func priorityWorker(id int, tasks <-chan Task, wg *sync.WaitGroup) {
    defer wg.Done()

    for {
        task, ok := <-tasks
        if !ok {
            return
        }
        fmt.Printf("Worker %d executing task %d (priority %d)\n", id, task.ID, task.Priority)
        task.Work()
    }
}

func main() {
    tasks := make(chan Task, 100)

    var wg sync.WaitGroup

    // Fixed pool of 3 workers.
    for w := 1; w <= 3; w++ {
        wg.Add(1)
        go priorityWorker(w, tasks, &wg)
    }

    // Producer: 10 tasks with a rotating priority label, then close
    // the channel to let the workers exit.
    go func() {
        defer close(tasks)
        for n := 1; n <= 10; n++ {
            tasks <- Task{
                ID:       n,
                Priority: n % 3,
                Work: func() {
                    fmt.Println("Task executed")
                },
            }
        }
    }()

    wg.Wait()
}

Best Practices

✅ Good Practices

  1. Use buffered channels - For job queues
  2. Close channels from sender - Signal completion
  3. Use WaitGroup for synchronization - Track completion
  4. Implement timeouts - Prevent hangs
  5. Monitor worker health - Detect failures
  6. Use directional channels - Clear intent
  7. Document patterns - Explain design
  8. Test concurrency - Use race detector

โŒ Anti-Patterns

// โŒ Bad: No timeout
for result := range results {
    // Could hang forever
}

// ✅ Good: With timeout
select {
case result := <-results:
    // Process
case <-time.After(timeout):
    // Handle timeout
}

// โŒ Bad: Unbuffered job channel
jobs := make(chan int)

// ✅ Good: Buffered job channel
jobs := make(chan int, 100)

// โŒ Bad: No synchronization
go worker(jobs, results)
// No way to know when done

// ✅ Good: Use WaitGroup
var wg sync.WaitGroup
wg.Add(1)
go worker(jobs, results, &wg)
wg.Wait()

Resources and References

Official Documentation

Tools and Resources

Summary

Worker pools and concurrency patterns are essential:

  • Implement worker pools for controlled concurrency
  • Use fan-out/fan-in for distribution
  • Implement pipelines for data processing
  • Add rate limiting for resource control
  • Use timeouts to prevent hangs
  • Synchronize with WaitGroup
  • Test with race detector

Master these patterns for robust concurrent Go programs.

Comments