Introduction
Go’s concurrency model — goroutines and channels — is simple to start but has subtle patterns that separate correct, production-grade code from code that leaks goroutines or deadlocks. This guide covers the patterns you’ll use in real systems. See Go Installation Guide, Go Ecosystem Overview, Go Best Practices for more context.
Prerequisites: Basic goroutines and channels. See Go by Example: Goroutines.
The select Statement
select is like a switch for channels — it blocks until one of its cases can proceed:
// Wait on several channel operations at once; exactly one case body runs.
select {
case msg := <-ch1:
fmt.Println("received from ch1:", msg)
case ch2 <- value:
fmt.Println("sent to ch2")
// NOTE: because a default case is present, this select never blocks, so the
// one-second timer created here cannot have fired yet and this case is
// effectively unreachable. Drop the default case to get a real timeout.
case <-time.After(1 * time.Second):
fmt.Println("timeout")
// Runs immediately when none of the channel operations above is ready.
default:
fmt.Println("no channel ready (non-blocking)")
}
Key behaviors:
- If multiple cases are ready, Go picks one at random
- default makes select non-blocking
- <-time.After(d) implements timeouts
The for-select Loop
The most common concurrency pattern in Go: a goroutine that processes events until told to stop.
// worker consumes jobs until the jobs channel is closed or done is signalled.
//
// Each job is handed to process and the outcome is delivered on results. The
// delivery also watches done, so a worker never stays blocked on a results
// channel that nobody is reading after cancellation.
func worker(jobs <-chan Job, results chan<- Result, done <-chan struct{}) {
	for {
		select {
		case <-done:
			return // cancellation signal
		case job, ok := <-jobs:
			if !ok {
				return // channel closed
			}
			// process(job) is evaluated once, when this select is entered;
			// only the send itself races against done.
			select {
			case results <- process(job):
			case <-done:
				return // cancelled while waiting for a reader
			}
		}
	}
}
Cancellation with context.Context
context.Context is the standard way to cancel goroutines in Go. Always pass context as the first argument:
// fetchUser retrieves the user with the given id from the remote API and
// decodes the JSON response body.
//
// The request is bound to ctx, so cancelling ctx or exceeding its deadline
// aborts the call; the returned error then includes context.Canceled or
// context.DeadlineExceeded. A non-2xx response is reported as an error rather
// than being decoded, and on any error the returned *User is nil.
func fetchUser(ctx context.Context, id int) (*User, error) {
	// Create a request with the context
	url := fmt.Sprintf("https://api.example.com/users/%d", id)
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return nil, err
	}

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, err // includes context.Canceled or context.DeadlineExceeded
	}
	defer resp.Body.Close()

	// An error page must not be decoded into a User.
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return nil, fmt.Errorf("fetch user %d: unexpected status %s", id, resp.Status)
	}

	var user User
	if err := json.NewDecoder(resp.Body).Decode(&user); err != nil {
		// Never hand back a partially populated value together with an error.
		return nil, fmt.Errorf("decode user %d: %w", id, err)
	}
	return &user, nil
}
// Caller controls the lifetime
//
// main gives fetchUser at most five seconds, then either prints the user's
// name or logs the failure and exits.
func main() {
	const limit = 5 * time.Second

	ctx, cancel := context.WithTimeout(context.Background(), limit)
	defer cancel() // always call cancel to release resources

	user, err := fetchUser(ctx, 42)
	switch {
	case err == nil:
		fmt.Println(user.Name)
	case errors.Is(err, context.DeadlineExceeded):
		// Name the timeout explicitly before reporting the error itself.
		log.Println("request timed out")
		log.Fatal(err)
	default:
		log.Fatal(err)
	}
}
Propagating Cancellation
// processOrder runs the steps of an order in sequence: look up the user, check
// inventory, then charge the payment.
//
// Every step receives the same ctx, so all sub-operations inherit the same
// cancellation. The steps run one after another: when ctx is cancelled, the
// step in progress is expected to return early and the remaining steps are
// never started. Each failure is wrapped with the name of the step that
// produced it, and the original error stays reachable through errors.Is/As.
func processOrder(ctx context.Context, orderID string) error {
	user, err := getUser(ctx, orderID)
	if err != nil {
		return fmt.Errorf("getUser: %w", err)
	}

	inventory, err := checkInventory(ctx, orderID)
	if err != nil {
		return fmt.Errorf("checkInventory: %w", err)
	}

	// Wrapped like the two steps above so callers can tell which stage failed.
	if err := chargePayment(ctx, user, inventory); err != nil {
		return fmt.Errorf("chargePayment: %w", err)
	}
	return nil
}
Manual Cancellation
// longRunningTask performs 1000 units of work via doWork, polling ctx before
// each one.
//
// It returns ctx.Err() (context.Canceled or context.DeadlineExceeded) as soon
// as ctx is found to be done, the first error reported by doWork, or nil once
// every unit has completed.
func longRunningTask(ctx context.Context) error {
	const steps = 1000

	for step := 0; step < steps; step++ {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
			// Not cancelled: run this iteration's unit of work.
			if err := doWork(step); err != nil {
				return err
			}
		}
	}
	return nil
}
Worker Pool Pattern
Process N jobs with M workers — the most common concurrency pattern for I/O-bound work:
// workerPool runs processJob over jobs using numWorkers goroutines and returns
// the results in the order the workers produced them.
//
// Both channels are buffered to len(jobs), so queuing jobs and publishing
// results never block. A worker polls ctx after dequeuing each job and exits
// without processing it once ctx is done, so the returned slice may hold
// fewer entries than jobs.
func workerPool(ctx context.Context, jobs []Job, numWorkers int) []Result {
	total := len(jobs)
	jobCh := make(chan Job, total)
	resultCh := make(chan Result, total)

	var wg sync.WaitGroup
	work := func() {
		defer wg.Done()
		for job := range jobCh {
			select {
			case <-ctx.Done():
				return
			default:
			}
			resultCh <- processJob(job)
		}
	}

	// Start workers
	for started := 0; started < numWorkers; started++ {
		wg.Add(1)
		go work()
	}

	// Send jobs, then close to signal workers: no more jobs
	for i := range jobs {
		jobCh <- jobs[i]
	}
	close(jobCh)

	// Close the results channel once every worker has returned, which ends the
	// collection loop below.
	go func() {
		wg.Wait()
		close(resultCh)
	}()

	// Collect results
	var results []Result
	for {
		res, open := <-resultCh
		if !open {
			return results
		}
		results = append(results, res)
	}
}
Worker Pool with Error Handling
// Result pairs the outcome of one unit of work with the error it produced.
type Result struct {
// Value is the payload set by the producer (workerPoolWithErrors stores the
// data returned by fetch here).
Value interface{}
// Err is the error reported for this unit of work.
Err error
}
// workerPoolWithErrors fetches every URL using the given number of worker
// goroutines and returns one Result per processed URL, in the order the
// workers produced them.
//
// A failed fetch does not stop the pool: its error is recorded in the Err
// field of that URL's Result alongside whatever data fetch returned.
func workerPoolWithErrors(ctx context.Context, urls []string, workers int) []Result {
	pending := make(chan string, len(urls))
	done := make(chan Result, len(urls))

	var wg sync.WaitGroup
	fetchLoop := func() {
		defer wg.Done()
		for target := range pending {
			data, err := fetch(ctx, target)
			done <- Result{Value: data, Err: err}
		}
	}
	for n := 0; n < workers; n++ {
		wg.Add(1)
		go fetchLoop()
	}

	// Queue every URL up front; the buffer is large enough that this never blocks.
	for i := range urls {
		pending <- urls[i]
	}
	close(pending)

	// Closing done after the last worker exits terminates the collection loop.
	go func() {
		wg.Wait()
		close(done)
	}()

	var all []Result
	for {
		r, open := <-done
		if !open {
			return all
		}
		all = append(all, r)
	}
}
Fan-Out / Fan-In
Fan-out: distribute work across multiple goroutines. Fan-in: merge results from multiple goroutines into one channel.
// Fan-out: distribute the items of one input channel across a fixed set of
// worker goroutines.
//
// fanOut starts workers goroutines that all receive from input; each item is
// taken by exactly one of them, squared, and sent on that worker's own output
// channel. The returned slice holds one output channel per worker.
//
// A worker closes its output and exits when input is closed or ctx is done.
// Both the receive and the send watch ctx, so a worker cannot stay blocked on
// a stalled producer or an absent consumer after cancellation.
func fanOut(ctx context.Context, input <-chan int, workers int) []<-chan int {
	outputs := make([]<-chan int, workers)
	for i := range outputs {
		ch := make(chan int)
		outputs[i] = ch
		go func(out chan<- int) {
			defer close(out)
			for {
				select {
				case <-ctx.Done():
					return
				case v, ok := <-input:
					if !ok {
						return // input exhausted
					}
					select {
					case out <- v * v: // square each number
					case <-ctx.Done():
						return
					}
				}
			}
		}(ch)
	}
	return outputs
}
// Fan-in: merge multiple channels into one
//
// fanIn starts one forwarding goroutine per input channel and returns a single
// channel carrying every value they receive. The merged channel is closed
// once all forwarders have exited, which happens when every input is closed
// or ctx is done.
//
// Both the receive and the send watch ctx, so a forwarder cannot stay blocked
// on an input that has gone quiet or on a consumer that has stopped reading.
func fanIn(ctx context.Context, channels ...<-chan int) <-chan int {
	merged := make(chan int)
	var wg sync.WaitGroup

	forward := func(ch <-chan int) {
		defer wg.Done()
		for {
			select {
			case <-ctx.Done():
				return
			case v, ok := <-ch:
				if !ok {
					return // this input is exhausted
				}
				select {
				case merged <- v:
				case <-ctx.Done():
					return
				}
			}
		}
	}

	wg.Add(len(channels))
	for _, ch := range channels {
		go forward(ch)
	}

	// Close merged only after the last forwarder is done, so no send can hit
	// a closed channel.
	go func() {
		wg.Wait()
		close(merged)
	}()
	return merged
}
The Ping-Pong Pattern
Demonstrates channel ownership and passing state between goroutines:
// Ball is the state passed back and forth between players; hits counts how
// many times it has been received.
type Ball struct {
	hits int
}

// player loops forever: it takes the ball off the table, records and prints
// the hit under its name, pauses for 100ms, and puts the ball back. Whoever
// holds the ball is the only goroutine touching it, so no lock is used.
func player(name string, table chan *Ball) {
	const pause = 100 * time.Millisecond

	for {
		b := <-table
		b.hits++
		fmt.Println(name, b.hits)
		time.Sleep(pause)
		table <- b
	}
}
// main starts two players sharing one unbuffered table, serves a ball, lets
// them rally for a second, and then takes the ball off the table.
func main() {
	table := make(chan *Ball)
	for _, name := range []string{"ping", "pong"} {
		go player(name, table)
	}

	table <- &Ball{} // start the game
	time.Sleep(time.Second)
	<-table // stop the game
}
Semaphore: Limit Concurrent Operations
Use a buffered channel as a semaphore to limit concurrent goroutines:
// Cap the number of fetches in flight with a buffered channel used as a
// counting semaphore: a send takes a slot, a receive gives it back.
const maxConcurrent = 5
sem := make(chan struct{}, maxConcurrent)
release := func() { <-sem }

for _, url := range urls {
	sem <- struct{}{} // acquire slot (blocks if full)
	go func(u string) {
		defer release() // release slot when done
		fetch(u)
	}(url)
}

// Wait for all goroutines to finish: once every slot can be taken here, no
// fetch is still holding one.
for taken := 0; taken < cap(sem); taken++ {
	sem <- struct{}{}
}
Or use golang.org/x/sync/semaphore for a cleaner API:
import "golang.org/x/sync/semaphore"
// Limit the number of fetches in flight with a weighted semaphore of size 5.
sem := semaphore.NewWeighted(5)
for _, url := range urls {
	// Acquire fails only when ctx is done; stop launching new work then.
	if err := sem.Acquire(ctx, 1); err != nil {
		break
	}
	go func(u string) {
		defer sem.Release(1)
		fetch(u)
	}(url)
}

// Wait for all to finish by taking the full capacity. This deliberately uses
// a background context: with an already-cancelled ctx, Acquire would return
// an error right away and goroutines still running would not be waited for.
// The error is ignored because it can only be the context's error, and the
// background context is never done.
_ = sem.Acquire(context.Background(), 5)
errgroup: Goroutines with Error Propagation
golang.org/x/sync/errgroup runs goroutines and collects the first error:
import "golang.org/x/sync/errgroup"
// fetchAll downloads every URL concurrently and returns the bodies in the
// same order as urls.
//
// The goroutines share the context derived by errgroup.WithContext. If any
// fetch fails, fetchAll returns a nil slice and the first error reported by
// the group, wrapped with the URL it belongs to.
func fetchAll(ctx context.Context, urls []string) ([][]byte, error) {
	g, ctx := errgroup.WithContext(ctx)
	results := make([][]byte, len(urls))

	for idx := range urls {
		idx, target := idx, urls[idx] // per-iteration copies for the closure
		g.Go(func() error {
			body, err := fetch(ctx, target)
			if err != nil {
				return fmt.Errorf("fetch %s: %w", target, err)
			}
			// Each goroutine writes only its own slot, so no lock is needed.
			results[idx] = body
			return nil
		})
	}

	err := g.Wait()
	if err != nil {
		return nil, err // first error from any goroutine
	}
	return results, nil
}
Common Mistakes
Goroutine Leak
// BAD: goroutine runs forever if nobody reads from ch
//
// leak is a deliberate counter-example. ch is unbuffered, so the send inside
// the goroutine can only complete when a receiver is ready, and leak returns
// without ever receiving.
func leak() {
ch := make(chan int)
go func() {
ch <- expensiveComputation() // blocks forever if caller returns
}()
// caller returns without reading ch
}
// GOOD: use context for cancellation
//
// noLeak runs expensiveComputation in a goroutine and returns its result, or
// ctx.Err() if ctx is done first. The result channel has capacity 1, so the
// goroutine's send completes even when nobody is left to receive it and the
// goroutine can exit instead of blocking forever.
func noLeak(ctx context.Context) (int, error) {
	out := make(chan int, 1) // buffered: goroutine won't block
	go func() { out <- expensiveComputation() }()

	select {
	case <-ctx.Done():
		return 0, ctx.Err()
	case v := <-out:
		return v, nil
	}
}
Closing a Channel Twice
// BAD: panics if close is called twice
close(ch)
close(ch) // panic: close of closed channel
// GOOD: use sync.Once
// once.Do runs the wrapped close at most once, so closeOnce can be called
// repeatedly without triggering the double-close panic.
var once sync.Once
closeOnce := func() { once.Do(func() { close(ch) }) }
Resources
- Go Blog: Concurrency Patterns
- Go Blog: Context
- Advanced Go Concurrency Patterns (talk)
- golang.org/x/sync
Comments