Skip to main content
⚡ Calmops

Concurrency Performance Tuning

Concurrency Performance Tuning

Concurrency is Go’s strength, but improper use can hurt performance. This guide covers techniques to optimize concurrent applications.

Goroutine Pool Sizing

The number of goroutines affects performance significantly.

Good: Optimal Worker Pool

package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

// Job is one unit of work submitted to the worker pool.
type Job struct {
	id   int    // sequence number assigned by the producer
	data string // payload the worker transforms
}

// Result pairs a processed Job with its output string.
type Result struct {
	job    Job    // the job this result corresponds to
	result string // "processed_<data>" produced by the worker
}

// worker drains the jobs channel, transforms each job, and sends a
// Result on results. It marks wg done once jobs is closed and empty.
// The id parameter identifies the worker but is unused in the body.
func worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
	defer wg.Done()

	for j := range jobs {
		// Simulate work
		time.Sleep(10 * time.Millisecond)
		out := Result{
			job:    j,
			result: fmt.Sprintf("processed_%s", j.data),
		}
		results <- out
	}
}

// main demonstrates a fixed-size worker pool: numWorkers goroutines
// drain a shared jobs channel, and a separate goroutine closes results
// once every worker has exited so the collection loop can terminate.
func main() {
	numJobs := 1000
	numWorkers := runtime.NumCPU() // Optimal for CPU-bound work

	jobs := make(chan Job, numWorkers*2) // Buffer size = 2x workers
	results := make(chan Result, numWorkers*2)

	var wg sync.WaitGroup

	// Start workers
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go worker(i, jobs, results, &wg)
	}

	// Send jobs
	go func() {
		for i := 0; i < numJobs; i++ {
			jobs <- Job{id: i, data: fmt.Sprintf("job_%d", i)}
		}
		close(jobs) // lets each worker's range loop finish
	}()

	// Collect results
	go func() {
		wg.Wait()
		close(results) // safe: all senders (workers) have returned
	}()

	count := 0
	for result := range results {
		count++
		_ = result
	}

	fmt.Printf("Processed %d jobs\n", count)
}

Bad: Too Many Goroutines

// โŒ AVOID: Creating goroutine per task
package main

import (
	"fmt"
	"sync"
	"time"
)

// main spawns one goroutine per task to illustrate the overhead of
// unbounded goroutine creation compared to a fixed worker pool.
func main() {
	var wg sync.WaitGroup

	// Creates 10000 goroutines - wasteful!
	// Each goroutine costs a few KB of stack plus scheduler
	// bookkeeping, so one-goroutine-per-task scales poorly.
	for i := 0; i < 10000; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			time.Sleep(10 * time.Millisecond)
			_ = id // placeholder for real work
		}(i)
	}

	wg.Wait()
	fmt.Println("Done")
}

Channel Buffering

Buffer size affects throughput and latency.

Good: Optimal Buffer Size

package main

import (
	"fmt"
	"sync"
	"time"
)

// producer sends the integers 0 through 999 on ch and marks wg done
// when finished. Sends block while the channel buffer is full, which
// applies natural backpressure from slow consumers.
func producer(ch chan int, wg *sync.WaitGroup) {
	defer wg.Done()

	i := 0
	for i < 1000 {
		ch <- i
		i++
	}
}

// consumer drains ch until it is closed, spending 1ms per item to
// simulate processing, then marks wg done.
func consumer(ch chan int, wg *sync.WaitGroup) {
	defer wg.Done()

	for {
		_, ok := <-ch
		if !ok {
			return
		}
		time.Sleep(1 * time.Millisecond)
	}
}

// main wires 5 producers and 5 consumers through a buffered channel.
//
// Producers and consumers are tracked by separate WaitGroups so the
// channel can be closed exactly once, after the last producer exits.
// The original version used a single WaitGroup and never closed ch,
// so the consumers (which receive until the channel closes) blocked
// forever and wg.Wait() deadlocked the program.
func main() {
	// Buffer size = number of workers
	ch := make(chan int, 10)

	var prodWg, consWg sync.WaitGroup

	// Start producers
	for i := 0; i < 5; i++ {
		prodWg.Add(1)
		go producer(ch, &prodWg)
	}

	// Start consumers
	for i := 0; i < 5; i++ {
		consWg.Add(1)
		go consumer(ch, &consWg)
	}

	// Only close after every sender is done; closing signals the
	// consumers' receive loops to terminate.
	prodWg.Wait()
	close(ch)

	consWg.Wait()
	fmt.Println("Done")
}

Bad: Unbuffered Channel Bottleneck

// โŒ AVOID: Unbuffered channel with many goroutines
package main

import (
	"sync"
	"time"
)

// main illustrates the bottleneck: with an unbuffered channel every
// send must rendezvous with a receive, so 100 producer goroutines sit
// parked until a matching consumer is scheduled, and vice versa.
func main() {
	ch := make(chan int) // Unbuffered - synchronous

	var wg sync.WaitGroup

	// Producers block waiting for consumers
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			ch <- id // Blocks!
		}(i)
	}

	// Consumers block waiting for producers
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-ch // Blocks!
			time.Sleep(10 * time.Millisecond)
		}()
	}

	wg.Wait()
}

Lock Contention Optimization

Minimize lock contention for better performance.

Good: Fine-Grained Locking

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Counter is a mutex-protected counter. The RWMutex lets concurrent
// readers share Get while Increment takes the exclusive write lock.
type Counter struct {
	mu    sync.RWMutex
	value int64
}

// Increment adds one to the counter under the write lock.
func (c *Counter) Increment() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.value++
}

// Get returns the current value under the read lock.
func (c *Counter) Get() int64 {
	c.mu.RLock()
	v := c.value
	c.mu.RUnlock()
	return v
}

// Better: Use atomic for simple counters
// AtomicCounter is a lock-free counter built on the typed
// atomic.Int64 wrapper — equivalent to calling atomic.AddInt64 /
// atomic.LoadInt64 on a bare int64 field, with type safety for free.
type AtomicCounter struct {
	value atomic.Int64
}

// Increment atomically adds one to the counter.
func (ac *AtomicCounter) Increment() {
	ac.value.Add(1)
}

// Get atomically loads the current value.
func (ac *AtomicCounter) Get() int64 {
	return ac.value.Load()
}

// main hammers an AtomicCounter from 100 goroutines x 1000 increments
// each; the atomic add guarantees a final value of exactly 100000
// without any locking.
func main() {
	counter := &AtomicCounter{}

	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 1000; j++ {
				counter.Increment()
			}
		}()
	}

	wg.Wait()
	fmt.Printf("Counter: %d\n", counter.Get())
}

Bad: Coarse-Grained Locking

// โŒ AVOID: Holding lock during I/O
package main

import (
	"sync"
	"time"
)

// Cache is a mutex-guarded map. The mutex protects items, but see
// GetAndFetch for why holding it across slow work is the problem
// this example demonstrates.
type Cache struct {
	mu    sync.Mutex
	items map[string]string
}

// GetAndFetch looks up key while holding the lock for the entire
// simulated fetch. Every concurrent caller serializes on mu, so ten
// callers take ~10 x 100ms instead of overlapping.
func (c *Cache) GetAndFetch(key string) string {
	c.mu.Lock()
	defer c.mu.Unlock()

	// Holding lock during I/O - blocks other operations!
	time.Sleep(100 * time.Millisecond)

	return c.items[key]
}

// main starts 10 goroutines that all call GetAndFetch; because the
// cache holds its lock during the simulated I/O they execute one at a
// time rather than concurrently.
func main() {
	cache := &Cache{items: make(map[string]string)}

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = cache.GetAndFetch("key")
		}()
	}

	wg.Wait()
}

Context-Based Cancellation

Use context for efficient cancellation and timeouts.

Good: Context-Based Cancellation

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// worker repeatedly publishes its id on results until ctx is
// cancelled, then logs that it stopped and returns. The cancellation
// check is non-blocking, so one unit of work may complete after ctx
// is done before the worker notices.
func worker(ctx context.Context, id int, results chan<- int) {
	for {
		select {
		case <-ctx.Done():
			fmt.Printf("Worker %d stopped\n", id)
			return
		default:
		}
		// Do work
		results <- id
		time.Sleep(100 * time.Millisecond)
	}
}

// main runs 5 workers under a 500ms timeout. When the context
// expires the workers return, the waiter goroutine closes results,
// and the range loop below terminates.
func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()

	results := make(chan int, 10)

	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			worker(ctx, id, results)
		}(i)
	}

	// Collect results
	go func() {
		wg.Wait()
		close(results) // all senders have exited, safe to close
	}()

	for result := range results {
		fmt.Println("Result:", result)
	}
}

Bad: Manual Cancellation

// โŒ AVOID: Manual cancellation with channels
package main

import (
	"fmt"
	"sync"
	"time"
)

// worker polls the done channel between work iterations and exits
// once it is closed. This works, but unlike context it carries no
// deadline, no cancellation cause, and does not compose across APIs.
func worker(done chan bool, id int) {
	for {
		select {
		case <-done:
			fmt.Printf("Worker %d stopped\n", id)
			return
		default:
			// Do work
			time.Sleep(100 * time.Millisecond)
		}
	}
}

// main stops 5 workers by closing the done channel after 500ms; a
// closed channel is immediately readable by every worker at once,
// which is what makes close() a broadcast signal.
func main() {
	done := make(chan bool)

	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			worker(done, id)
		}(i)
	}

	time.Sleep(500 * time.Millisecond)
	close(done)

	wg.Wait()
}

Batch Processing

Process items in batches for better throughput.

Good: Batch Processing

package main

import (
	"fmt"
	"sync"
	"time"
)

// Batch groups items so consumers amortize per-message channel
// overhead across many items.
type Batch struct {
	items []int // items accumulated by the producer, up to batchSize
}

// processBatch sums every item in the batch and prints the total,
// handling the whole batch in a single pass.
func processBatch(batch Batch) {
	// Process entire batch at once
	total := 0
	for i := range batch.items {
		total += batch.items[i]
	}
	fmt.Printf("Batch sum: %d\n", total)
}

// main streams 1000 ints as batches of 100 through a channel to 4
// consumer goroutines. The single producer owns the channel and
// closes it after flushing the final (possibly partial) batch.
func main() {
	batchSize := 100
	batches := make(chan Batch, 10)

	var wg sync.WaitGroup

	// Producer
	wg.Add(1)
	go func() {
		defer wg.Done()

		batch := Batch{items: make([]int, 0, batchSize)}
		for i := 0; i < 1000; i++ {
			batch.items = append(batch.items, i)

			if len(batch.items) == batchSize {
				batches <- batch
				// Fresh backing array so the sent batch keeps its own data
				batch = Batch{items: make([]int, 0, batchSize)}
			}
		}

		// Flush any final partial batch before closing
		if len(batch.items) > 0 {
			batches <- batch
		}
		close(batches)
	}()

	// Consumers
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for batch := range batches {
				processBatch(batch)
			}
		}()
	}

	wg.Wait()
}

Semaphore Pattern

Limit concurrent operations with semaphores.

Good: Semaphore for Rate Limiting

package main

import (
	"fmt"
	"sync"
	"time"
)

// Semaphore bounds how many goroutines may be inside a critical
// section at once, using a buffered channel of empty-struct tokens.
type Semaphore struct {
	sem chan struct{}
}

// NewSemaphore returns a semaphore that admits at most maxConcurrent
// concurrent holders.
func NewSemaphore(maxConcurrent int) *Semaphore {
	s := &Semaphore{sem: make(chan struct{}, maxConcurrent)}
	return s
}

// Acquire blocks until a slot is free, then claims it.
func (s *Semaphore) Acquire() {
	s.sem <- struct{}{}
}

// Release frees a slot claimed by a prior Acquire.
func (s *Semaphore) Release() {
	<-s.sem
}

// main launches 10 tasks but the semaphore admits at most 3 into the
// timed section at a time, bounding concurrency without a worker pool.
func main() {
	sem := NewSemaphore(3) // Max 3 concurrent operations

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()

			sem.Acquire()
			defer sem.Release() // slot is freed even if the task panics

			fmt.Printf("Task %d started\n", id)
			time.Sleep(100 * time.Millisecond)
			fmt.Printf("Task %d finished\n", id)
		}(i)
	}

	wg.Wait()
}

Profiling Concurrency

Goroutine Profiling

# Get goroutine profile
go tool pprof http://localhost:6060/debug/pprof/goroutine

# Check for goroutine leaks
go tool pprof http://localhost:6060/debug/pprof/goroutine?debug=1

Contention Profiling

# Enable mutex profiling in code before the workload runs:
#   runtime.SetMutexProfileFraction(1)

# Analyze contention (with net/http/pprof imported)
go tool pprof http://localhost:6060/debug/pprof/mutex

Best Practices

  1. Right-Size Worker Pools: Use runtime.NumCPU() for CPU-bound work
  2. Buffer Channels Appropriately: Buffer size affects throughput
  3. Use Atomic Operations: For simple counters, use atomic instead of locks
  4. Minimize Lock Duration: Hold locks for minimal time
  5. Use Context for Cancellation: Prefer context over manual channels
  6. Batch When Possible: Process items in batches for better throughput
  7. Profile Concurrency: Use profiling to identify bottlenecks
  8. Monitor Goroutines: Watch for goroutine leaks

Common Pitfalls

  • Too Many Goroutines: Creates overhead and reduces performance
  • Unbuffered Channels: Can cause unnecessary blocking
  • Lock Contention: Holding locks too long hurts performance
  • Goroutine Leaks: Goroutines that never exit waste resources
  • Inefficient Synchronization: Using channels when atomics would be better

Resources

Summary

Concurrency performance tuning requires careful consideration of goroutine count, channel buffering, and lock contention. Use worker pools with appropriate sizing, buffer channels for throughput, and minimize lock duration. Profile your concurrent code to identify bottlenecks and validate optimizations.

Comments