Skip to main content
⚡ Calmops

Sync Package: Mutexes and WaitGroups

Sync Package: Mutexes and WaitGroups

The sync package provides essential synchronization primitives for coordinating goroutines. This guide covers mutexes, RWMutex, WaitGroups, and other synchronization tools.

Mutex (Mutual Exclusion)

Basic Mutex Usage

package main

import (
    "fmt"
    "sync"
)

// Shared state for the example: counter is the value being incremented and
// mu is the lock that every access to counter must hold.
var (
    mu      sync.Mutex
    counter int
)

// increment adds one to counter while holding mu, so concurrent callers
// cannot interleave their read-modify-write of the shared value.
func increment() {
    mu.Lock()
    // Deferred so the lock is released on every exit path.
    defer mu.Unlock()

    counter++
}

// main starts 100 goroutines that each call increment once, waits for all
// of them to finish, and prints the resulting counter value.
func main() {
    const goroutines = 100

    var wg sync.WaitGroup
    wg.Add(goroutines)
    for n := 0; n < goroutines; n++ {
        go func() {
            defer wg.Done()
            increment()
        }()
    }
    wg.Wait()

    fmt.Println("Counter:", counter) // Always 100
}

Protecting Data Structures

package main

import (
    "fmt"
    "sync"
)

// SafeCounter is an integer counter whose methods may be called from
// multiple goroutines at once. The zero value is a counter at 0.
type SafeCounter struct {
    mu    sync.Mutex // guards value
    value int
}

// Increment adds one to the counter.
func (c *SafeCounter) Increment() {
    c.mu.Lock()
    defer c.mu.Unlock()

    c.value++
}

// Get returns the current count. It takes the lock as well, so the read
// cannot race with a concurrent Increment.
func (c *SafeCounter) Get() int {
    c.mu.Lock()
    defer c.mu.Unlock()

    return c.value
}

// main increments a single SafeCounter from 100 goroutines, waits for
// them all, and prints the final count.
func main() {
    const goroutines = 100

    counter := new(SafeCounter)

    var wg sync.WaitGroup
    wg.Add(goroutines)
    for n := 0; n < goroutines; n++ {
        go func() {
            defer wg.Done()
            counter.Increment()
        }()
    }
    wg.Wait()

    fmt.Println("Counter:", counter.Get()) // 100
}

RWMutex (Read-Write Mutex)

When to Use RWMutex

Use RWMutex when you have many readers and few writers:

package main

import (
    "fmt"
    "sync"
    "time"
)

// Cache is a string-to-string map that is safe for concurrent use.
// Lookups take the shared read lock, so any number of readers may proceed
// in parallel; writes take the exclusive lock. The zero value is an empty
// cache that is ready to use.
type Cache struct {
    mu   sync.RWMutex // guards data
    data map[string]string
}

// Get returns the value stored under key, or "" if key is not present.
func (c *Cache) Get(key string) string {
    c.mu.RLock()
    defer c.mu.RUnlock()
    // Reading from a nil map is allowed and yields the zero value.
    return c.data[key]
}

// Set stores value under key, replacing any previous value.
func (c *Cache) Set(key, value string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    if c.data == nil {
        // Writing to a nil map panics; allocate lazily so a zero-value
        // Cache works without an explicit make.
        c.data = make(map[string]string)
    }
    c.data[key] = value
}

// main exercises a Cache with many concurrent readers and a few writers,
// then waits for all of them to finish before returning.
func main() {
    cache := &Cache{data: make(map[string]string)}

    // Track every goroutine so main returns exactly when the work is done,
    // instead of sleeping for a fixed amount of time and hoping it was enough.
    var wg sync.WaitGroup

    // Many readers
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for j := 0; j < 100; j++ {
                cache.Get("key")
                time.Sleep(1 * time.Millisecond)
            }
        }()
    }

    // Few writers
    for i := 0; i < 2; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for j := 0; j < 10; j++ {
                cache.Set("key", fmt.Sprintf("value-%d", j))
                time.Sleep(10 * time.Millisecond)
            }
        }()
    }

    wg.Wait()
}

RWMutex vs Mutex Performance

package main

import (
    "fmt"
    "sync"
    "time"
)

// benchmarkMutex reports the wall-clock time 100 goroutines need to each
// perform 1000 reads of a shared variable while holding an exclusive
// sync.Mutex.
func benchmarkMutex() time.Duration {
    const (
        goroutines = 100
        iterations = 1000
    )

    var (
        mu   sync.Mutex
        wg   sync.WaitGroup
        data int
    )

    // reader is the body run by every goroutine.
    reader := func() {
        defer wg.Done()
        for n := 0; n < iterations; n++ {
            mu.Lock()
            _ = data
            mu.Unlock()
        }
    }

    began := time.Now()

    wg.Add(goroutines)
    for g := 0; g < goroutines; g++ {
        go reader()
    }
    wg.Wait()

    return time.Since(began)
}

// benchmarkRWMutex reports the wall-clock time 100 goroutines need to each
// perform 1000 reads of a shared variable while holding the read side of a
// sync.RWMutex.
func benchmarkRWMutex() time.Duration {
    const (
        goroutines = 100
        iterations = 1000
    )

    var (
        mu   sync.RWMutex
        wg   sync.WaitGroup
        data int
    )

    // reader is the body run by every goroutine.
    reader := func() {
        defer wg.Done()
        for n := 0; n < iterations; n++ {
            mu.RLock()
            _ = data
            mu.RUnlock()
        }
    }

    began := time.Now()

    wg.Add(goroutines)
    for g := 0; g < goroutines; g++ {
        go reader()
    }
    wg.Wait()

    return time.Since(began)
}

// main runs the Mutex benchmark and then the RWMutex benchmark, printing
// the elapsed time of each.
func main() {
    mutexElapsed := benchmarkMutex()
    fmt.Println("Mutex:", mutexElapsed)

    rwElapsed := benchmarkRWMutex()
    fmt.Println("RWMutex:", rwElapsed)
}

WaitGroup

Basic WaitGroup Usage

package main

import (
    "fmt"
    "sync"
    "time"
)

// worker simulates one second of work for the worker identified by id,
// printing a line when it starts and another when it finishes. It calls
// wg.Done on return so a caller blocked in wg.Wait can proceed.
func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done()

    fmt.Printf("Worker %d starting\n", id)

    // Stand-in for real work.
    time.Sleep(time.Second)

    fmt.Printf("Worker %d done\n", id)
}

// main starts three workers (ids 1 through 3) and blocks until all of
// them have finished.
func main() {
    const workers = 3

    var wg sync.WaitGroup
    wg.Add(workers)
    for id := 1; id <= workers; id++ {
        go worker(id, &wg)
    }

    fmt.Println("Waiting for workers...")
    wg.Wait()
    fmt.Println("All workers done")
}

WaitGroup with Error Handling

package main

import (
    "fmt"
    "sync"
)

// task reports the outcome of the task identified by id on results:
// even ids send an error, odd ids send nil. It calls wg.Done on return.
func task(id int, wg *sync.WaitGroup, results chan<- error) {
    defer wg.Done()

    var outcome error
    if id%2 == 0 {
        outcome = fmt.Errorf("task %d failed", id)
    }
    results <- outcome
}

// main runs five tasks concurrently and prints the outcome of each as it
// arrives on the results channel.
func main() {
    const tasks = 5

    var wg sync.WaitGroup
    // Buffered to the number of tasks so no sender ever blocks.
    results := make(chan error, tasks)

    wg.Add(tasks)
    for id := 1; id <= tasks; id++ {
        go task(id, &wg, results)
    }

    // Close results once every task has reported; that ends the range below.
    go func() {
        wg.Wait()
        close(results)
    }()

    for err := range results {
        if err == nil {
            fmt.Println("Success")
            continue
        }
        fmt.Println("Error:", err)
    }
}

Other Sync Primitives

Once (Execute Once)

package main

import (
    "fmt"
    "sync"
)

// State for the lazily initialized value: once ensures initInstance runs
// a single time, and instance holds the result.
var (
    once     sync.Once
    instance string
)

// initInstance sets instance and announces that initialization ran.
func initInstance() {
    instance = "Initialized"
    fmt.Println("Initializing...")
}

// getInstance returns the shared instance value, running initInstance on
// the first call only. sync.Once makes this safe when many goroutines call
// getInstance at the same time.
func getInstance() string {
    once.Do(initInstance)
    return instance
}

// main calls getInstance from five goroutines and prints each result,
// showing that initialization happens only once.
func main() {
    const callers = 5

    var wg sync.WaitGroup
    wg.Add(callers)
    for n := 0; n < callers; n++ {
        go func() {
            defer wg.Done()
            value := getInstance()
            fmt.Println(value)
        }()
    }
    wg.Wait()
}

// Output:
// Initializing...
// Initialized
// Initialized
// Initialized
// Initialized
// Initialized

Cond (Condition Variable)

package main

import (
    "fmt"
    "sync"
    "time"
)

// main demonstrates sync.Cond: one goroutine waits until ready becomes
// true, and main sets it and signals after a one-second delay. main then
// waits for the waiter to finish before returning.
func main() {
    var mu sync.Mutex
    cond := sync.NewCond(&mu)

    var ready bool

    // done is closed when the waiter has finished, so main can wait for it
    // deterministically instead of sleeping and hoping it has printed.
    done := make(chan struct{})

    // Waiter
    go func() {
        defer close(done)

        mu.Lock()
        defer mu.Unlock()

        // Re-check the condition in a loop after every wake-up, as the
        // sync.Cond documentation recommends.
        for !ready {
            cond.Wait()
        }
        fmt.Println("Waiter: Ready!")
    }()

    // Signaler
    time.Sleep(1 * time.Second) // simulated delay before the condition changes
    mu.Lock()
    ready = true
    cond.Signal()
    mu.Unlock()

    <-done
}

Pool (Object Pool)

package main

import (
    "fmt"
    "sync"
)

// Buffer is the pooled object: a wrapper around a reusable byte slice.
type Buffer struct {
    data []byte
}

// main demonstrates sync.Pool: it takes a Buffer from the pool, writes to
// it, returns it, and takes a Buffer again.
func main() {
    bufPool := sync.Pool{
        // New is called by Get when the pool has nothing to hand out.
        New: func() interface{} {
            return &Buffer{data: make([]byte, 1024)}
        },
    }

    // Get from pool
    first := bufPool.Get().(*Buffer)
    fmt.Println("Got buffer:", len(first.data))

    // Use buffer
    first.data[0] = 'A'

    // Return to pool
    bufPool.Put(first)

    // Get again (might be same instance)
    second := bufPool.Get().(*Buffer)
    fmt.Println("Got buffer:", len(second.data))
}

Practical Examples

Thread-Safe Map

package main

import (
    "fmt"
    "sync"
)

// SafeMap is a string-keyed map that is safe for concurrent use. Reads
// take the shared read lock; Set and Delete take the exclusive lock. The
// zero value is an empty map that is ready to use.
type SafeMap struct {
    mu    sync.RWMutex // guards items
    items map[string]interface{}
}

// Set stores value under key, replacing any previous value.
func (sm *SafeMap) Set(key string, value interface{}) {
    sm.mu.Lock()
    defer sm.mu.Unlock()
    if sm.items == nil {
        // Writing to a nil map panics; allocate lazily so a zero-value
        // SafeMap works without an explicit make.
        sm.items = make(map[string]interface{})
    }
    sm.items[key] = value
}

// Get returns the value stored under key and whether the key was present.
func (sm *SafeMap) Get(key string) (interface{}, bool) {
    sm.mu.RLock()
    defer sm.mu.RUnlock()
    val, ok := sm.items[key]
    return val, ok
}

// Delete removes key from the map. Deleting an absent key is a no-op.
func (sm *SafeMap) Delete(key string) {
    sm.mu.Lock()
    defer sm.mu.Unlock()
    delete(sm.items, key)
}

// main runs five writers and five readers against one SafeMap at the same
// time. A reader prints a value only if its key has already been written.
func main() {
    const perGroup = 5

    m := &SafeMap{items: make(map[string]interface{})}
    keyFor := func(id int) string { return fmt.Sprintf("key-%d", id) }

    var wg sync.WaitGroup

    // Writers
    wg.Add(perGroup)
    for id := 0; id < perGroup; id++ {
        go func(id int) {
            defer wg.Done()
            m.Set(keyFor(id), id)
        }(id)
    }

    // Readers
    wg.Add(perGroup)
    for id := 0; id < perGroup; id++ {
        go func(id int) {
            defer wg.Done()
            if val, ok := m.Get(keyFor(id)); ok {
                fmt.Printf("Got: %v\n", val)
            }
        }(id)
    }

    wg.Wait()
}

Semaphore Pattern

package main

import (
    "fmt"
    "sync"
    "time"
)

// Semaphore is a counting semaphore built on a buffered channel: every
// value sitting in the channel represents one slot currently held.
type Semaphore struct {
    sem chan struct{}
}

// NewSemaphore returns a Semaphore whose channel has capacity max, so at
// most max slots can be held at once.
func NewSemaphore(max int) *Semaphore {
    slots := make(chan struct{}, max)
    return &Semaphore{sem: slots}
}

// Acquire takes a slot, blocking while the channel is full.
func (s *Semaphore) Acquire() {
    s.sem <- struct{}{}
}

// Release frees a slot by receiving from the channel. It blocks if no slot
// is currently held.
func (s *Semaphore) Release() {
    <-s.sem
}

// main starts ten workers but uses a Semaphore of size three so that at
// most three of them are inside the working section at any moment.
func main() {
    const (
        limit = 3
        jobs  = 10
    )

    sem := NewSemaphore(limit)

    var wg sync.WaitGroup
    wg.Add(jobs)
    for id := 1; id <= jobs; id++ {
        go func(id int) {
            defer wg.Done()

            // Hold a slot for the duration of the work.
            sem.Acquire()
            defer sem.Release()

            fmt.Printf("Worker %d working\n", id)
            time.Sleep(100 * time.Millisecond)
            fmt.Printf("Worker %d done\n", id)
        }(id)
    }

    wg.Wait()
}

Best Practices

✅ Good Practices

  1. Use defer to unlock - Ensures unlock even on panic
  2. Keep critical sections small - Minimize lock time
  3. Use RWMutex for read-heavy workloads - Better performance
  4. Use channels for communication - Preferred in Go
  5. Use WaitGroup for synchronization - Track completion
  6. Use Once for initialization - Thread-safe singleton
  7. Document synchronization - Explain why locks are needed
  8. Test with race detector - Catch bugs early

โŒ Anti-Patterns

// โŒ Bad: Forgetting to unlock
mu.Lock()
data = value
// Forgot to unlock!

// ✅ Good: Use defer
mu.Lock()
defer mu.Unlock()
data = value

// โŒ Bad: Holding lock too long
mu.Lock()
doExpensiveOperation()
mu.Unlock()

// ✅ Good: Minimize critical section
mu.Lock()
data = value
mu.Unlock()
doExpensiveOperation()

// โŒ Bad: Not using WaitGroup
go worker()
go worker()
// No way to know when done

// ✅ Good: Use WaitGroup
var wg sync.WaitGroup
wg.Add(2)
go worker(&wg)
go worker(&wg)
wg.Wait()

Resources and References

Official Documentation

Tools and Resources

Summary

The sync package provides essential synchronization tools:

  • Mutex for mutual exclusion
  • RWMutex for read-heavy workloads
  • WaitGroup for synchronization
  • Once for one-time initialization
  • Cond for condition variables
  • Pool for object pooling

Master these primitives for correct concurrent Go programs.

Comments