Sync Package: Mutexes and WaitGroups
The sync package provides essential synchronization primitives for coordinating goroutines. This guide covers mutexes, RWMutex, WaitGroups, and other synchronization tools.
Mutex (Mutual Exclusion)
Basic Mutex Usage
package main
import (
"fmt"
"sync"
)
var (
counter = 0
mu sync.Mutex
)
func increment() {
mu.Lock()
defer mu.Unlock()
counter++
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
increment()
}()
}
wg.Wait()
fmt.Println("Counter:", counter) // Always 100
}
Protecting Data Structures
package main
import (
"fmt"
"sync"
)
type SafeCounter struct {
mu sync.Mutex
value int
}
func (sc *SafeCounter) Increment() {
sc.mu.Lock()
defer sc.mu.Unlock()
sc.value++
}
func (sc *SafeCounter) Get() int {
sc.mu.Lock()
defer sc.mu.Unlock()
return sc.value
}
func main() {
counter := &SafeCounter{}
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter.Increment()
}()
}
wg.Wait()
fmt.Println("Counter:", counter.Get()) // 100
}
RWMutex (Read-Write Mutex)
When to Use RWMutex
Use RWMutex when you have many readers and few writers:
package main
import (
"fmt"
"sync"
"time"
)
type Cache struct {
mu sync.RWMutex
data map[string]string
}
func (c *Cache) Get(key string) string {
c.mu.RLock()
defer c.mu.RUnlock()
return c.data[key]
}
func (c *Cache) Set(key, value string) {
c.mu.Lock()
defer c.mu.Unlock()
c.data[key] = value
}
func main() {
cache := &Cache{data: make(map[string]string)}
// Many readers
for i := 0; i < 10; i++ {
go func(id int) {
for j := 0; j < 100; j++ {
cache.Get("key")
time.Sleep(1 * time.Millisecond)
}
}(i)
}
// Few writers
for i := 0; i < 2; i++ {
go func(id int) {
for j := 0; j < 10; j++ {
cache.Set("key", fmt.Sprintf("value-%d", j))
time.Sleep(10 * time.Millisecond)
}
}(i)
}
time.Sleep(2 * time.Second)
}
RWMutex vs Mutex Performance
package main
import (
"fmt"
"sync"
"time"
)
// Benchmark with Mutex
func benchmarkMutex() time.Duration {
var mu sync.Mutex
data := 0
start := time.Now()
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < 1000; j++ {
mu.Lock()
_ = data
mu.Unlock()
}
}()
}
wg.Wait()
return time.Since(start)
}
// Benchmark with RWMutex
func benchmarkRWMutex() time.Duration {
var mu sync.RWMutex
data := 0
start := time.Now()
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < 1000; j++ {
mu.RLock()
_ = data
mu.RUnlock()
}
}()
}
wg.Wait()
return time.Since(start)
}
func main() {
fmt.Println("Mutex:", benchmarkMutex())
fmt.Println("RWMutex:", benchmarkRWMutex())
}
WaitGroup
Basic WaitGroup Usage
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("Worker %d starting\n", id)
time.Sleep(1 * time.Second)
fmt.Printf("Worker %d done\n", id)
}
func main() {
var wg sync.WaitGroup
for i := 1; i <= 3; i++ {
wg.Add(1)
go worker(i, &wg)
}
fmt.Println("Waiting for workers...")
wg.Wait()
fmt.Println("All workers done")
}
WaitGroup with Error Handling
package main
import (
"fmt"
"sync"
)
func task(id int, wg *sync.WaitGroup, results chan<- error) {
defer wg.Done()
if id%2 == 0 {
results <- fmt.Errorf("task %d failed", id)
} else {
results <- nil
}
}
func main() {
var wg sync.WaitGroup
results := make(chan error, 5)
for i := 1; i <= 5; i++ {
wg.Add(1)
go task(i, &wg, results)
}
go func() {
wg.Wait()
close(results)
}()
for err := range results {
if err != nil {
fmt.Println("Error:", err)
} else {
fmt.Println("Success")
}
}
}
Other Sync Primitives
Once (Execute Once)
package main
import (
"fmt"
"sync"
)
var (
instance string
once sync.Once
)
func getInstance() string {
once.Do(func() {
instance = "Initialized"
fmt.Println("Initializing...")
})
return instance
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
fmt.Println(getInstance())
}()
}
wg.Wait()
}
// Output:
// Initializing...
// Initialized
// Initialized
// Initialized
// Initialized
Cond (Condition Variable)
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var mu sync.Mutex
cond := sync.NewCond(&mu)
var ready bool
// Waiter
go func() {
mu.Lock()
for !ready {
cond.Wait()
}
fmt.Println("Waiter: Ready!")
mu.Unlock()
}()
// Signaler
time.Sleep(1 * time.Second)
mu.Lock()
ready = true
cond.Signal()
mu.Unlock()
time.Sleep(100 * time.Millisecond)
}
Pool (Object Pool)
package main
import (
"fmt"
"sync"
)
type Buffer struct {
data []byte
}
func main() {
pool := &sync.Pool{
New: func() interface{} {
return &Buffer{data: make([]byte, 1024)}
},
}
// Get from pool
buf := pool.Get().(*Buffer)
fmt.Println("Got buffer:", len(buf.data))
// Use buffer
buf.data[0] = 'A'
// Return to pool
pool.Put(buf)
// Get again (might be same instance)
buf2 := pool.Get().(*Buffer)
fmt.Println("Got buffer:", len(buf2.data))
}
Practical Examples
Thread-Safe Map
package main
import (
"fmt"
"sync"
)
type SafeMap struct {
mu sync.RWMutex
items map[string]interface{}
}
func (sm *SafeMap) Set(key string, value interface{}) {
sm.mu.Lock()
defer sm.mu.Unlock()
sm.items[key] = value
}
func (sm *SafeMap) Get(key string) (interface{}, bool) {
sm.mu.RLock()
defer sm.mu.RUnlock()
val, ok := sm.items[key]
return val, ok
}
func (sm *SafeMap) Delete(key string) {
sm.mu.Lock()
defer sm.mu.Unlock()
delete(sm.items, key)
}
func main() {
m := &SafeMap{items: make(map[string]interface{})}
var wg sync.WaitGroup
// Writers
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
m.Set(fmt.Sprintf("key-%d", id), id)
}(i)
}
// Readers
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
val, ok := m.Get(fmt.Sprintf("key-%d", id))
if ok {
fmt.Printf("Got: %v\n", val)
}
}(i)
}
wg.Wait()
}
Semaphore Pattern
package main
import (
"fmt"
"sync"
"time"
)
type Semaphore struct {
sem chan struct{}
}
func NewSemaphore(max int) *Semaphore {
return &Semaphore{
sem: make(chan struct{}, max),
}
}
func (s *Semaphore) Acquire() {
s.sem <- struct{}{}
}
func (s *Semaphore) Release() {
<-s.sem
}
func main() {
sem := NewSemaphore(3)
var wg sync.WaitGroup
for i := 1; i <= 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
sem.Acquire()
defer sem.Release()
fmt.Printf("Worker %d working\n", id)
time.Sleep(100 * time.Millisecond)
fmt.Printf("Worker %d done\n", id)
}(i)
}
wg.Wait()
}
Best Practices
โ Good Practices
- Use defer to unlock - Ensures unlock even on panic
- Keep critical sections small - Minimize lock time
- Use RWMutex for read-heavy workloads - Better performance
- Use channels for communication - Preferred in Go
- Use WaitGroup for synchronization - Track completion
- Use Once for initialization - Thread-safe singleton
- Document synchronization - Explain why locks are needed
- Test with race detector - Catch bugs early
โ Anti-Patterns
// โ Bad: Forgetting to unlock
mu.Lock()
data = value
// Forgot to unlock!
// โ
Good: Use defer
mu.Lock()
defer mu.Unlock()
data = value
// โ Bad: Holding lock too long
mu.Lock()
doExpensiveOperation()
mu.Unlock()
// โ
Good: Minimize critical section
mu.Lock()
data = value
mu.Unlock()
doExpensiveOperation()
// โ Bad: Not using WaitGroup
go worker()
go worker()
// No way to know when done
// โ
Good: Use WaitGroup
var wg sync.WaitGroup
wg.Add(2)
go worker(&wg)
go worker(&wg)
wg.Wait()
Resources and References
Official Documentation
- sync Package - Complete reference
- Effective Go - Concurrency - Best practices
- Race Detector - Detect race conditions
Recommended Reading
- Go Concurrency Patterns - Rob Pike talk (video)
- Advanced Go Concurrency Patterns - Rob Pike talk (video)
- Concurrency is not Parallelism - Rob Pike talk (video)
Tools and Resources
- Race Detector - Detect race conditions
- Delve Debugger - Debug goroutines
- Go Playground - Online Go editor
Summary
The sync package provides essential synchronization tools:
- Mutex for mutual exclusion
- RWMutex for read-heavy workloads
- WaitGroup for synchronization
- Once for one-time initialization
- Cond for condition variables
- Pool for object pooling
Master these primitives for correct concurrent Go programs.
Comments