Worker pools are a fundamental concurrency pattern in Go. They let you process many tasks concurrently with a fixed number of workers, preventing resource exhaustion. This guide covers worker pool implementation and related patterns. For background, see the Go Installation Guide, the Go Ecosystem Overview, and the Go Best Practices guide.
Basic Worker Pool
Simple Worker Pool
package main
import (
"fmt"
"sync"
)
// worker drains jobs until the channel is closed, doubling each value
// and publishing the product on results. Progress is logged to stdout.
func worker(id int, jobs <-chan int, results chan<- int) {
	for j := range jobs {
		fmt.Printf("Worker %d processing job %d\n", id, j)
		doubled := j * 2
		results <- doubled
	}
}
// main wires up a fixed pool of three workers, feeds them ten jobs,
// and prints each doubled result as it arrives. The counted receive
// loop doubles as the completion signal, so no WaitGroup is needed.
//
// Note for reviewers: this standalone example does not use the sync
// package; the "sync" import above is unused and would fail to compile
// on its own — the WaitGroup variant below is the one that needs it.
func main() {
	const (
		numWorkers = 3
		numJobs    = 10
	)
	jobs := make(chan int, 100)
	results := make(chan int, 100)

	// Launch the pool.
	for w := 1; w <= numWorkers; w++ {
		go worker(w, jobs, results)
	}

	// Enqueue the work, then close so workers exit their range loops.
	for j := 1; j <= numJobs; j++ {
		jobs <- j
	}
	close(jobs)

	// Exactly numJobs results will arrive; receive them all.
	for n := 0; n < numJobs; n++ {
		fmt.Println("Result:", <-results)
	}
}
Worker Pool with WaitGroup
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
fmt.Printf("Worker %d processing job %d\n", id, job)
time.Sleep(100 * time.Millisecond)
results <- job * 2
}
}
// main runs three workers over a ten-job queue. A producer goroutine
// feeds and closes the jobs channel; a closer goroutine closes results
// once every worker has exited, which lets main safely range results.
func main() {
	const (
		workerCount = 3
		jobCount    = 10
	)
	jobs := make(chan int, 100)
	results := make(chan int, 100)

	var wg sync.WaitGroup
	wg.Add(workerCount)
	for id := 1; id <= workerCount; id++ {
		go worker(id, jobs, results, &wg)
	}

	// Producer: enqueue all jobs, then close to release the workers.
	go func() {
		for n := 1; n <= jobCount; n++ {
			jobs <- n
		}
		close(jobs)
	}()

	// Closer: after every worker returns, no further results can arrive.
	go func() {
		wg.Wait()
		close(results)
	}()

	for r := range results {
		fmt.Println("Result:", r)
	}
}
Advanced Patterns
Fan-Out/Fan-In Pattern
package main
import (
"fmt"
"sync"
)
// fanOut starts numWorkers goroutines that compete for values on input
// and forward each value they win onto their own output channel. Every
// output channel is closed once input is exhausted.
func fanOut(input <-chan int, numWorkers int) []<-chan int {
	outs := make([]<-chan int, numWorkers)
	forward := func(dst chan<- int) {
		defer close(dst)
		for v := range input {
			dst <- v
		}
	}
	for i := 0; i < numWorkers; i++ {
		c := make(chan int)
		outs[i] = c
		go forward(c)
	}
	return outs
}
// fanIn merges the given channels onto a single output channel, which
// is closed only after every source channel has been fully drained.
func fanIn(channels ...<-chan int) <-chan int {
	merged := make(chan int)
	var pending sync.WaitGroup
	pending.Add(len(channels))
	for _, src := range channels {
		go func(in <-chan int) {
			defer pending.Done()
			for v := range in {
				merged <- v
			}
		}(src)
	}
	// Close merged once all forwarding goroutines have finished.
	go func() {
		pending.Wait()
		close(merged)
	}()
	return merged
}
func main() {
// Create input
input := make(chan int)
go func() {
for i := 1; i <= 10; i++ {
input <- i
}
close(input)
}()
// Fan-out to 3 workers
channels := fanOut(input, 3)
// Fan-in results
results := fanIn(channels...)
// Collect results
for result := range results {
fmt.Println("Result:", result)
}
}
Pipeline Pattern
package main
import (
"fmt"
)
// generate is the pipeline source: it returns a channel that yields
// the integers 1..max in order and is closed afterwards.
func generate(max int) <-chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		for n := 1; n <= max; n++ {
			ch <- n
		}
	}()
	return ch
}
// square is a pipeline stage: it reads values from in, emits each one
// multiplied by itself, and closes its output when in is exhausted.
func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			out <- v * v
		}
	}()
	return out
}
// filterEven is a pipeline stage that forwards only the even values
// from in, closing its output channel once in is exhausted.
func filterEven(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			if v%2 != 0 {
				continue // drop odd values
			}
			out <- v
		}
	}()
	return out
}
func main() {
// Create pipeline
numbers := generate(10)
squared := square(numbers)
evens := filterEven(squared)
// Consume results
for result := range evens {
fmt.Println(result)
}
}
Rate Limiting Pattern
package main
import (
"fmt"
"time"
)
// main runs ten tasks concurrently while allowing at most three to be
// in flight at once, using a buffered channel as a counting semaphore.
//
// Fixed: the original waited with time.Sleep(2 * time.Second), which is
// racy (slow tasks could outlive the sleep and be killed mid-work) and
// wasteful when tasks finish early. A done channel now gives a
// deterministic per-task completion signal with no new imports.
func main() {
	const (
		maxConcurrent = 3
		totalTasks    = 10
	)
	limiter := make(chan struct{}, maxConcurrent) // counting semaphore
	done := make(chan struct{})                   // one signal per finished task

	for i := 1; i <= totalTasks; i++ {
		go func(id int) {
			// Registered first so it runs last (after the release below).
			defer func() { done <- struct{}{} }()
			limiter <- struct{}{}        // Acquire
			defer func() { <-limiter }() // Release
			fmt.Printf("Processing %d\n", id)
			time.Sleep(100 * time.Millisecond)
		}(i)
	}

	// Wait for every task instead of sleeping for a guessed duration.
	for i := 0; i < totalTasks; i++ {
		<-done
	}
}
Timeout Pattern
package main
import (
"fmt"
"time"
)
// worker consumes jobs until the channel is closed, spending 100ms per
// job to simulate work and publishing job*2 on results.
func worker(id int, jobs <-chan int, results chan<- int) {
	for {
		job, open := <-jobs
		if !open {
			return
		}
		fmt.Printf("Worker %d processing job %d\n", id, job)
		time.Sleep(100 * time.Millisecond)
		results <- job * 2
	}
}
// main starts three workers on ten jobs and collects the results, but
// bounds the total wait: if all ten results have not arrived within
// five seconds it prints "Timeout!" and gives up.
func main() {
	const (
		workerCount = 3
		jobCount    = 10
	)
	jobs := make(chan int, 100)
	results := make(chan int, 100)

	for id := 1; id <= workerCount; id++ {
		go worker(id, jobs, results)
	}

	for job := 1; job <= jobCount; job++ {
		jobs <- job
	}
	close(jobs)

	// One shared deadline for the whole collection phase.
	deadline := time.After(5 * time.Second)
	for received := 0; received < jobCount; received++ {
		select {
		case r := <-results:
			fmt.Println("Result:", r)
		case <-deadline:
			fmt.Println("Timeout!")
			return
		}
	}
}
Practical Examples
HTTP Request Worker Pool
package main
import (
"fmt"
"net/http"
"sync"
)
// Request describes one unit of work for the HTTP worker pool: the URL
// to fetch plus a caller-assigned ID used to match it to its Response.
type Request struct {
URL string
ID int
}
// Response reports the outcome of one Request: the originating ID, the
// HTTP status code (left 0 when Error is set), and any request error.
type Response struct {
ID int
Status int
Error error
}
// httpWorker performs GET requests read from the requests channel and
// sends exactly one Response per Request on responses. It signals wg
// when the requests channel has been closed and drained.
//
// Fixed: the original called http.Get, i.e. http.DefaultClient, which
// has no timeout — one stalled server would block the worker forever.
// Each worker now uses a client with a 10-second overall timeout.
// (id is kept for interface compatibility; it is not used in the body.)
func httpWorker(id int, requests <-chan Request, responses chan<- Response, wg *sync.WaitGroup) {
	defer wg.Done()
	client := &http.Client{Timeout: 10 * time.Second}
	for req := range requests {
		resp, err := client.Get(req.URL)
		var status int
		if err == nil {
			status = resp.StatusCode
			// Close the body so the transport can reuse the connection.
			resp.Body.Close()
		}
		responses <- Response{
			ID:     req.ID,
			Status: status,
			Error:  err,
		}
	}
}
// main fans three URLs out to a pool of five HTTP workers and prints
// the status code (or error) for each completed request.
func main() {
	requests := make(chan Request, 100)
	responses := make(chan Response, 100)
	var wg sync.WaitGroup

	const poolSize = 5
	for id := 1; id <= poolSize; id++ {
		wg.Add(1)
		go httpWorker(id, requests, responses, &wg)
	}

	// Producer: enqueue every URL, then close so workers can exit.
	go func() {
		urls := []string{
			"https://golang.org",
			"https://github.com",
			"https://google.com",
		}
		for i, url := range urls {
			requests <- Request{URL: url, ID: i}
		}
		close(requests)
	}()

	// Closer: once all workers return, no more responses can arrive.
	go func() {
		wg.Wait()
		close(responses)
	}()

	for r := range responses {
		if r.Error != nil {
			fmt.Printf("Request %d: Error - %v\n", r.ID, r.Error)
		} else {
			fmt.Printf("Request %d: Status %d\n", r.ID, r.Status)
		}
	}
}
Task Queue with Priority Metadata (note: tasks carry a priority field, but a channel delivers them in FIFO order — no reordering is performed)
package main
import (
"fmt"
"sync"
)
// Task is a queued unit of work: an ID, a Priority tag, and the Work
// function to execute. Note: Priority is carried as data only —
// nothing in this type enforces priority ordering.
type Task struct {
ID int
Priority int
Work func()
}
// priorityWorker executes each Task received on tasks, logging the
// task's ID and priority before invoking its Work function. wg is
// signalled when the channel is closed and drained.
func priorityWorker(id int, tasks <-chan Task, wg *sync.WaitGroup) {
	defer wg.Done()
	for t := range tasks {
		fmt.Printf("Worker %d executing task %d (priority %d)\n", id, t.ID, t.Priority)
		t.Work()
	}
}
// main starts three workers and feeds them ten tasks tagged with a
// priority of i%3. The channel delivers tasks in FIFO order, so the
// Priority field is illustrative rather than an ordering guarantee.
func main() {
	tasks := make(chan Task, 100)
	var wg sync.WaitGroup

	const workerCount = 3
	wg.Add(workerCount)
	for id := 1; id <= workerCount; id++ {
		go priorityWorker(id, tasks, &wg)
	}

	// Producer: enqueue ten tasks, then close to let the workers exit.
	go func() {
		defer close(tasks)
		for n := 1; n <= 10; n++ {
			tasks <- Task{
				ID:       n,
				Priority: n % 3,
				Work: func() {
					fmt.Println("Task executed")
				},
			}
		}
	}()

	wg.Wait()
}
Best Practices
✅ Good Practices
- Use buffered channels - For job queues
- Close channels from sender - Signal completion
- Use WaitGroup for synchronization - Track completion
- Implement timeouts - Prevent hangs
- Monitor worker health - Detect failures
- Use directional channels - Clear intent
- Document patterns - Explain design
- Test concurrency - Use race detector
❌ Anti-Patterns
// ❌ Bad: No timeout
for result := range results {
// Could hang forever
}
// ✅ Good: With timeout
select {
case result := <-results:
// Process
case <-time.After(timeout):
// Handle timeout
}
// ❌ Bad: Unbuffered job channel
jobs := make(chan int)
// ✅ Good: Buffered job channel
jobs := make(chan int, 100)
// ❌ Bad: No synchronization
go worker(jobs, results)
// No way to know when done
// ✅ Good: Use WaitGroup
var wg sync.WaitGroup
wg.Add(1)
go worker(jobs, results, &wg)
wg.Wait()
Resources and References
Official Documentation
- Concurrency Patterns - Official blog series
- Effective Go - Concurrency - Best practices
- sync Package - Synchronization primitives
Recommended Reading
- Go Concurrency Patterns - Rob Pike talk (video)
- Advanced Go Concurrency Patterns - Rob Pike talk (video)
- Concurrency is not Parallelism - Rob Pike talk (video)
Tools and Resources
- Race Detector - Detect race conditions
- Delve Debugger - Debug goroutines
- Go Playground - Online Go editor
Summary
Worker pools and concurrency patterns are essential:
- Implement worker pools for controlled concurrency
- Use fan-out/fan-in for distribution
- Implement pipelines for data processing
- Add rate limiting for resource control
- Use timeouts to prevent hangs
- Synchronize with WaitGroup
- Test with race detector
Master these patterns for robust concurrent Go programs.
Comments