Introduction
Go’s concurrency model — goroutines and channels — is simple to start with, but subtle patterns separate correct, production-grade code from code that leaks goroutines or deadlocks. This guide covers the patterns you’ll use in real systems.
Prerequisites: Basic goroutines and channels. See Go by Example: Goroutines.
The select Statement
select is like a switch for channels — it blocks until one of its cases can proceed:
select {
case msg := <-ch1:
	// Receive case: ready when ch1 has a value (or is closed).
	fmt.Println("received from ch1:", msg)
case ch2 <- value:
	// Send case: ready when ch2 has a waiting receiver or buffer space.
	fmt.Println("sent to ch2")
case <-time.After(1 * time.Second):
	// Fires if nothing else becomes ready within the duration.
	fmt.Println("timeout")
default:
	// Runs immediately when no other case is ready, making the
	// whole select non-blocking.
	fmt.Println("no channel ready (non-blocking)")
}
Key behaviors:
- If multiple cases are ready, Go picks one at random
- `default` makes select non-blocking
- `<-time.After(d)` implements timeouts
The for-select Loop
The most common concurrency pattern in Go: a goroutine that processes events until told to stop.
// worker consumes jobs until the jobs channel is closed or a signal
// arrives on done, sending each processed result on results.
func worker(jobs <-chan Job, results chan<- Result, done <-chan struct{}) {
	for {
		select {
		case <-done:
			// Explicit cancellation signal from the owner.
			return
		case j, open := <-jobs:
			if !open {
				// Producer closed the channel: no more work.
				return
			}
			results <- process(j)
		}
	}
}
Cancellation with context.Context
context.Context is the standard way to cancel goroutines in Go. Always pass context as the first argument:
// fetchUser retrieves user id from the API, honoring ctx for
// cancellation and deadlines. It returns the decoded user or an error
// (including context.Canceled / context.DeadlineExceeded from the
// HTTP round trip).
func fetchUser(ctx context.Context, id int) (*User, error) {
	// Build the request with the caller's context so cancellation
	// propagates into the HTTP client.
	req, err := http.NewRequestWithContext(ctx, "GET",
		fmt.Sprintf("https://api.example.com/users/%d", id), nil)
	if err != nil {
		return nil, err
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, err // includes context.Canceled or context.DeadlineExceeded
	}
	defer resp.Body.Close()
	// Fix: don't decode an error page as a User — check the status
	// code before touching the body.
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("fetch user %d: unexpected status %s", id, resp.Status)
	}
	var user User
	if err := json.NewDecoder(resp.Body).Decode(&user); err != nil {
		return nil, fmt.Errorf("decode user %d: %w", id, err)
	}
	return &user, nil
}
// Caller controls the lifetime
func main() {
	// Give the whole operation a 5-second budget.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel() // always call cancel to release resources

	user, err := fetchUser(ctx, 42)
	switch {
	case errors.Is(err, context.DeadlineExceeded):
		// Distinguish a timeout from other failures before exiting.
		log.Println("request timed out")
		log.Fatal(err)
	case err != nil:
		log.Fatal(err)
	}
	fmt.Println(user.Name)
}
Propagating Cancellation
// processOrder runs the three steps of an order sequentially, passing
// the caller's ctx to each so a single cancellation stops the whole
// chain. Each failure is wrapped with the step name for diagnosis.
func processOrder(ctx context.Context, orderID string) error {
	// All sub-operations inherit the same cancellation
	// NOTE(review): getUser is called with orderID — presumably it
	// resolves the order's owner; confirm it doesn't expect a user id.
	user, err := getUser(ctx, orderID)
	if err != nil {
		return fmt.Errorf("getUser: %w", err)
	}
	inventory, err := checkInventory(ctx, orderID)
	if err != nil {
		return fmt.Errorf("checkInventory: %w", err)
	}
	// Payment is last so we never charge for unavailable stock.
	return chargePayment(ctx, user, inventory)
}
// If ctx is cancelled, all three calls stop immediately
Manual Cancellation
// longRunningTask performs 1000 units of work, checking for
// cancellation before each one so it stops promptly when ctx ends.
func longRunningTask(ctx context.Context) error {
	for i := 0; i < 1000; i++ {
		// ctx.Err() is non-nil exactly when ctx.Done() is closed, so
		// this is equivalent to a non-blocking select on Done().
		if err := ctx.Err(); err != nil {
			return err // context.Canceled or context.DeadlineExceeded
		}
		if err := doWork(i); err != nil {
			return err
		}
	}
	return nil
}
Worker Pool Pattern
Process N jobs with M workers — the most common concurrency pattern for I/O-bound work:
// workerPool processes jobs with numWorkers goroutines and returns the
// collected results. On ctx cancellation workers stop early, so the
// returned slice may contain fewer results than there were jobs.
func workerPool(ctx context.Context, jobs []Job, numWorkers int) []Result {
	// Buffers sized to len(jobs): neither the job sends below nor the
	// workers' result sends can ever block.
	jobCh := make(chan Job, len(jobs))
	resultCh := make(chan Result, len(jobs))
	// Start workers
	var wg sync.WaitGroup
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for job := range jobCh {
				select {
				case <-ctx.Done():
					return // abandon remaining jobs on cancellation
				default:
					resultCh <- processJob(job)
				}
			}
		}()
	}
	// Send jobs
	for _, job := range jobs {
		jobCh <- job
	}
	close(jobCh) // signal workers: no more jobs
	// Wait for all workers, then close results
	// (the close lets the collection loop below terminate).
	go func() {
		wg.Wait()
		close(resultCh)
	}()
	// Collect results
	var results []Result
	for result := range resultCh {
		results = append(results, result)
	}
	return results
}
Worker Pool with Error Handling
// Result carries either a successful value or the error from one job,
// letting both travel through a single results channel.
type Result struct {
	Value interface{} // job output; meaningful only when Err is nil
	Err error // per-job error; nil on success
}
// workerPoolWithErrors fetches every URL using a fixed-size pool of
// workers and returns one Result (value or error) per URL. Result
// order is nondeterministic.
func workerPoolWithErrors(ctx context.Context, urls []string, workers int) []Result {
	// Buffers sized to len(urls) so neither producers nor workers block.
	pending := make(chan string, len(urls))
	done := make(chan Result, len(urls))

	var wg sync.WaitGroup
	wg.Add(workers)
	for w := 0; w < workers; w++ {
		go func() {
			defer wg.Done()
			for u := range pending {
				body, err := fetch(ctx, u)
				done <- Result{Value: body, Err: err}
			}
		}()
	}

	// Queue all work, then close so the workers' range loops end.
	for _, u := range urls {
		pending <- u
	}
	close(pending)

	// Close the results channel once every worker has exited.
	go func() {
		wg.Wait()
		close(done)
	}()

	var collected []Result
	for r := range done {
		collected = append(collected, r)
	}
	return collected
}
Fan-Out / Fan-In
Fan-out: distribute work across multiple goroutines. Fan-in: merge results from multiple goroutines into one channel.
// Fan-out: send each item to a separate goroutine
func fanOut(ctx context.Context, input <-chan int, workers int) []<-chan int {
outputs := make([]<-chan int, workers)
for i := 0; i < workers; i++ {
ch := make(chan int)
outputs[i] = ch
go func(out chan<- int) {
defer close(out)
for v := range input {
select {
case out <- v * v: // square each number
case <-ctx.Done():
return
}
}
}(ch)
}
return outputs
}
// Fan-in: merge multiple channels into one
func fanIn(ctx context.Context, channels ...<-chan int) <-chan int {
merged := make(chan int)
var wg sync.WaitGroup
output := func(ch <-chan int) {
defer wg.Done()
for v := range ch {
select {
case merged <- v:
case <-ctx.Done():
return
}
}
}
wg.Add(len(channels))
for _, ch := range channels {
go output(ch)
}
go func() {
wg.Wait()
close(merged)
}()
return merged
}
The Ping-Pong Pattern
Demonstrates channel ownership and passing state between goroutines:
// Ball is the state handed back and forth between players; hits counts
// how many times it has been returned.
type Ball struct{ hits int }

// player loops forever: take the ball off the table, hit it, pause,
// then put it back for the opponent.
func player(name string, table chan *Ball) {
	for {
		b := <-table // wait for our turn
		b.hits++
		fmt.Println(name, b.hits)
		time.Sleep(100 * time.Millisecond) // simulate reaction time
		table <- b // opponent's turn
	}
}
// Two players volley a single *Ball over an unbuffered channel; main
// starts the rally, lets it run for a second, then steals the ball.
func main() {
	// Unbuffered: the ball is always "owned" by exactly one goroutine.
	table := make(chan *Ball)
	go player("ping", table)
	go player("pong", table)
	table <- new(Ball) // start the game
	time.Sleep(1 * time.Second)
	// Taking the ball leaves both players blocked on an empty table;
	// they are cleaned up when main exits.
	<-table // stop the game
}
Semaphore: Limit Concurrent Operations
Use a buffered channel as a semaphore to limit concurrent goroutines:
const maxConcurrent = 5

// A buffered channel of empty structs acts as a counting semaphore:
// each send occupies a slot, each receive frees one.
sem := make(chan struct{}, maxConcurrent)
for _, url := range urls {
	sem <- struct{}{} // acquire slot (blocks if full)
	go func(u string) {
		defer func() { <-sem }() // release slot when done
		fetch(u)
	}(url)
}
// Wait for all goroutines to finish:
// refilling every slot proves each one has been released.
for i := 0; i < maxConcurrent; i++ {
	sem <- struct{}{}
}
Or use golang.org/x/sync/semaphore for a cleaner API:
import "golang.org/x/sync/semaphore"

sem := semaphore.NewWeighted(5)
for _, url := range urls {
	if err := sem.Acquire(ctx, 1); err != nil {
		// Acquire fails only when ctx is cancelled.
		break
	}
	go func(u string) {
		defer sem.Release(1)
		fetch(u)
	}(url)
}
// Acquiring the full weight blocks until every worker has released.
// NOTE(review): the error return is ignored here — in production,
// check it (non-nil if ctx is cancelled before the workers finish).
sem.Acquire(ctx, 5) // wait for all to finish
errgroup: Goroutines with Error Propagation
golang.org/x/sync/errgroup runs goroutines and collects the first error:
import "golang.org/x/sync/errgroup"
// fetchAll downloads every URL concurrently. The first failure cancels
// the group's derived context and is returned after all goroutines
// settle; on success the result slice is index-aligned with urls.
func fetchAll(ctx context.Context, urls []string) ([][]byte, error) {
	g, gctx := errgroup.WithContext(ctx)
	out := make([][]byte, len(urls))
	for idx, u := range urls {
		idx, u := idx, u // shadow for pre-1.22 loop-capture semantics
		g.Go(func() error {
			body, err := fetch(gctx, u)
			if err != nil {
				return fmt.Errorf("fetch %s: %w", u, err)
			}
			// Each goroutine writes a distinct index: no data race.
			out[idx] = body
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		return nil, err // first error from any goroutine
	}
	return out, nil
}
Common Mistakes
Goroutine Leak
// BAD: goroutine runs forever if nobody reads from ch
// BAD: goroutine runs forever if nobody reads from ch
func leak() {
	ch := make(chan int) // unbuffered: the send below needs a receiver
	go func() {
		ch <- expensiveComputation() // blocks forever if caller returns
	}()
	// caller returns without reading ch — the goroutine (and anything
	// it references) is never released.
}
// GOOD: use context for cancellation
// GOOD: a buffered channel plus select means the helper goroutine can
// always complete its send, while the caller can bail out on
// cancellation without leaking it.
func noLeak(ctx context.Context) (int, error) {
	result := make(chan int, 1) // capacity 1: the send never blocks
	go func() {
		// Even if the caller has already returned, this send lands in
		// the buffer and the goroutine exits cleanly.
		result <- expensiveComputation()
	}()

	select {
	case v := <-result:
		return v, nil
	case <-ctx.Done():
		return 0, ctx.Err()
	}
}
Closing a Channel Twice
// BAD: panics if close is called twice
close(ch)
close(ch) // panic: close of closed channel

// GOOD: use sync.Once — Do runs the function at most once, even when
// called concurrently from multiple goroutines.
var once sync.Once
closeOnce := func() { once.Do(func() { close(ch) }) }
Resources
- Go Blog: Concurrency Patterns
- Go Blog: Context
- Advanced Go Concurrency Patterns (talk)
- golang.org/x/sync
Comments