Worker Pools and Concurrency Patterns
Worker pools are a fundamental concurrency pattern in Go. They allow you to process multiple tasks concurrently with a fixed number of workers, preventing resource exhaustion. This guide covers worker pool implementation and related patterns.
Basic Worker Pool
Simple Worker Pool
package main

import (
	"fmt"
)

// worker doubles each job received on jobs and sends the result on
// results. It returns when jobs is closed and drained.
func worker(id int, jobs <-chan int, results chan<- int) {
	for job := range jobs {
		fmt.Printf("Worker %d processing job %d\n", id, job)
		results <- job * 2
	}
}

func main() {
	// Buffered channels let the producer queue every job without blocking.
	jobs := make(chan int, 100)
	results := make(chan int, 100)
	// Start 3 workers
	for i := 1; i <= 3; i++ {
		go worker(i, jobs, results)
	}
	// Send 10 jobs, then close to signal that no more work is coming.
	for j := 1; j <= 10; j++ {
		jobs <- j
	}
	close(jobs)
	// Collect exactly as many results as jobs were sent; no extra
	// synchronization is needed because the count is known up front.
	for i := 1; i <= 10; i++ {
		fmt.Println("Result:", <-results)
	}
}
Worker Pool with WaitGroup
package main

import (
	"fmt"
	"sync"
	"time"
)

// worker drains jobs until the channel is closed, simulating 100ms of
// work per job and emitting job*2 on results. Completion is signaled
// through wg.
func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	for j := range jobs {
		fmt.Printf("Worker %d processing job %d\n", id, j)
		time.Sleep(100 * time.Millisecond)
		results <- j * 2
	}
}

func main() {
	const (
		numWorkers = 3
		numJobs    = 10
	)
	jobs := make(chan int, 100)
	results := make(chan int, 100)

	var wg sync.WaitGroup
	wg.Add(numWorkers)
	for id := 1; id <= numWorkers; id++ {
		go worker(id, jobs, results, &wg)
	}

	// Producer: queue all jobs, then close to let the workers exit.
	go func() {
		defer close(jobs)
		for n := 1; n <= numJobs; n++ {
			jobs <- n
		}
	}()

	// Closer: once every worker has returned, close results so the
	// range below terminates.
	go func() {
		wg.Wait()
		close(results)
	}()

	for r := range results {
		fmt.Println("Result:", r)
	}
}
Advanced Patterns
Fan-Out/Fan-In Pattern
package main

import (
	"fmt"
	"sync"
)

// fanOut starts numWorkers goroutines that all drain input, each one
// forwarding the values it happens to receive onto its own output
// channel. Every output channel is closed when input is exhausted.
func fanOut(input <-chan int, numWorkers int) []<-chan int {
	outs := make([]<-chan int, numWorkers)
	for w := 0; w < numWorkers; w++ {
		c := make(chan int)
		outs[w] = c
		go func(dst chan<- int) {
			defer close(dst)
			for v := range input {
				dst <- v
			}
		}(c)
	}
	return outs
}

// fanIn merges all input channels onto a single output channel, which
// is closed once every input has been fully consumed.
func fanIn(channels ...<-chan int) <-chan int {
	merged := make(chan int)
	var pending sync.WaitGroup
	pending.Add(len(channels))
	for _, src := range channels {
		go func(in <-chan int) {
			defer pending.Done()
			for v := range in {
				merged <- v
			}
		}(src)
	}
	go func() {
		pending.Wait()
		close(merged)
	}()
	return merged
}

func main() {
	// Produce 1..10 on an unbuffered channel.
	input := make(chan int)
	go func() {
		defer close(input)
		for i := 1; i <= 10; i++ {
			input <- i
		}
	}()
	// Distribute across 3 workers, then merge their outputs and print.
	for result := range fanIn(fanOut(input, 3)...) {
		fmt.Println("Result:", result)
	}
}
Pipeline Pattern
package main

import (
	"fmt"
)

// generate emits the integers 1..max on the returned channel, then
// closes it.
func generate(max int) <-chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		for n := 1; n <= max; n++ {
			ch <- n
		}
	}()
	return ch
}

// square reads every value from in and emits its square.
func square(in <-chan int) <-chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		for v := range in {
			ch <- v * v
		}
	}()
	return ch
}

// filterEven passes through only the even values from in.
func filterEven(in <-chan int) <-chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		for v := range in {
			if v%2 == 0 {
				ch <- v
			}
		}
	}()
	return ch
}

func main() {
	// Compose the stages: generate -> square -> filterEven.
	for result := range filterEven(square(generate(10))) {
		fmt.Println(result)
	}
}
Rate Limiting Pattern
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	// Semaphore: a buffered channel of capacity 3 limits how many
	// goroutines execute the work section concurrently.
	limiter := make(chan struct{}, 3)

	var wg sync.WaitGroup
	for i := 1; i <= 10; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			limiter <- struct{}{}        // Acquire a slot
			defer func() { <-limiter }() // Release it on exit
			fmt.Printf("Processing %d\n", id)
			time.Sleep(100 * time.Millisecond)
		}(i)
	}
	// Wait for all goroutines instead of sleeping for a fixed duration:
	// a fixed sleep may return before the work finishes (dropping output)
	// or waste time after it is already done.
	wg.Wait()
}
Timeout Pattern
package main

import (
	"fmt"
	"time"
)

// worker doubles each job from jobs onto results, simulating 100ms of
// processing time per job. It returns when jobs is closed and drained.
func worker(id int, jobs <-chan int, results chan<- int) {
	for j := range jobs {
		fmt.Printf("Worker %d processing job %d\n", id, j)
		time.Sleep(100 * time.Millisecond)
		results <- j * 2
	}
}

func main() {
	const (
		workers = 3
		total   = 10
	)
	jobs := make(chan int, 100)
	results := make(chan int, 100)

	for id := 1; id <= workers; id++ {
		go worker(id, jobs, results)
	}
	for n := 1; n <= total; n++ {
		jobs <- n
	}
	close(jobs)

	// Read results, but give up if nothing arrives before the deadline;
	// a single time.After channel is shared by every select iteration.
	deadline := time.After(5 * time.Second)
	for got := 0; got < total; got++ {
		select {
		case r := <-results:
			fmt.Println("Result:", r)
		case <-deadline:
			fmt.Println("Timeout!")
			return
		}
	}
}
Practical Examples
HTTP Request Worker Pool
package main

import (
	"fmt"
	"net/http"
	"sync"
	"time"
)

// Request describes one URL to fetch, tagged with an ID so its response
// can be correlated back to it.
type Request struct {
	URL string
	ID  int
}

// Response carries the HTTP status (or the error) for the request with
// the matching ID.
type Response struct {
	ID     int
	Status int
	Error  error
}

// httpClient bounds every request; http.Get uses the default client,
// which has no timeout and can block a worker forever on an
// unresponsive server.
var httpClient = &http.Client{Timeout: 10 * time.Second}

// httpWorker fetches each queued request and reports a Response on
// responses. wg.Done is called once requests is closed and drained.
func httpWorker(id int, requests <-chan Request, responses chan<- Response, wg *sync.WaitGroup) {
	defer wg.Done()
	for req := range requests {
		resp, err := httpClient.Get(req.URL)
		var status int
		if err == nil {
			status = resp.StatusCode
			// Close the body so the transport can reuse the connection.
			resp.Body.Close()
		}
		responses <- Response{
			ID:     req.ID,
			Status: status,
			Error:  err,
		}
	}
}

func main() {
	requests := make(chan Request, 100)
	responses := make(chan Response, 100)

	var wg sync.WaitGroup
	// Start 5 workers
	for i := 1; i <= 5; i++ {
		wg.Add(1)
		go httpWorker(i, requests, responses, &wg)
	}

	// Producer: queue the URLs, then close so the workers can exit.
	go func() {
		urls := []string{
			"https://golang.org",
			"https://github.com",
			"https://google.com",
		}
		for i, url := range urls {
			requests <- Request{URL: url, ID: i}
		}
		close(requests)
	}()

	// Close responses once all workers finish so the range terminates.
	go func() {
		wg.Wait()
		close(responses)
	}()

	// Collect responses
	for resp := range responses {
		if resp.Error != nil {
			fmt.Printf("Request %d: Error - %v\n", resp.ID, resp.Error)
		} else {
			fmt.Printf("Request %d: Status %d\n", resp.ID, resp.Status)
		}
	}
}
Task Queue with Priority
package main

import (
	"fmt"
	"sync"
)

// Task is one unit of work: an ID, a priority tag, and the function to
// run. Note that the channel below delivers tasks in FIFO order, so
// Priority here is informational only and does not reorder execution.
type Task struct {
	ID       int
	Priority int
	Work     func()
}

// priorityWorker executes every task received on tasks until the
// channel is closed, then signals completion through wg.
func priorityWorker(id int, tasks <-chan Task, wg *sync.WaitGroup) {
	defer wg.Done()
	for task := range tasks {
		fmt.Printf("Worker %d executing task %d (priority %d)\n", id, task.ID, task.Priority)
		task.Work()
	}
}

func main() {
	tasks := make(chan Task, 100)

	const workers = 3
	var wg sync.WaitGroup
	wg.Add(workers)
	for id := 1; id <= workers; id++ {
		go priorityWorker(id, tasks, &wg)
	}

	// Producer: enqueue 10 tasks, then close the channel.
	go func() {
		defer close(tasks)
		for n := 1; n <= 10; n++ {
			tasks <- Task{
				ID:       n,
				Priority: n % 3,
				Work:     func() { fmt.Println("Task executed") },
			}
		}
	}()

	wg.Wait()
}
Best Practices
✅ Good Practices
- Use buffered channels - For job queues
- Close channels from sender - Signal completion
- Use WaitGroup for synchronization - Track completion
- Implement timeouts - Prevent hangs
- Monitor worker health - Detect failures
- Use directional channels - Clear intent
- Document patterns - Explain design
- Test concurrency - Use race detector
❌ Anti-Patterns
// ❌ Bad: No timeout
for result := range results {
// Could hang forever
}
// ✅ Good: With timeout
select {
case result := <-results:
// Process
case <-time.After(timeout):
// Handle timeout
}
// ❌ Bad: Unbuffered job channel
jobs := make(chan int)
// ✅ Good: Buffered job channel
jobs := make(chan int, 100)
// ❌ Bad: No synchronization
go worker(jobs, results)
// No way to know when done
// ✅ Good: Use WaitGroup
var wg sync.WaitGroup
wg.Add(1)
go worker(jobs, results, &wg)
wg.Wait()
Resources and References
Official Documentation
- Concurrency Patterns - Official blog series
- Effective Go - Concurrency - Best practices
- sync Package - Synchronization primitives
Recommended Reading
- Go Concurrency Patterns - Rob Pike talk (video)
- Advanced Go Concurrency Patterns - Rob Pike talk (video)
- Concurrency is not Parallelism - Rob Pike talk (video)
Tools and Resources
- Race Detector - Detect race conditions
- Delve Debugger - Debug goroutines
- Go Playground - Online Go editor
Summary
Worker pools and concurrency patterns are essential:
- Implement worker pools for controlled concurrency
- Use fan-out/fan-in for distribution
- Implement pipelines for data processing
- Add rate limiting for resource control
- Use timeouts to prevent hangs
- Synchronize with WaitGroup
- Test with race detector
Master these patterns for robust concurrent Go programs.
Comments