Skip to main content
⚡ Calmops

Semaphores and Rate Limiting

Semaphores and Rate Limiting

Semaphores and rate limiting are essential for controlling concurrent access and throughput in Go applications.

Semaphore Basics

A semaphore is a counter that controls access to a shared resource.

Good: Basic Semaphore

package main

import (
	"fmt"
	"sync"
	"time"
)

// Semaphore is a counting semaphore backed by a buffered channel: every
// element sitting in the buffer represents one slot that is currently held.
type Semaphore struct {
	sem chan struct{}
}

// NewSemaphore returns a Semaphore that admits at most maxConcurrent
// simultaneous holders.
func NewSemaphore(maxConcurrent int) *Semaphore {
	slots := make(chan struct{}, maxConcurrent)
	return &Semaphore{sem: slots}
}

// Acquire takes one slot, blocking while every slot is already held.
func (s *Semaphore) Acquire() {
	var token struct{}
	s.sem <- token
}

// Release gives one slot back. It blocks if no slot is currently held,
// because it receives from the underlying channel.
func (s *Semaphore) Release() {
	<-s.sem
}

// main starts ten tasks but lets at most three of them run at the same time.
func main() {
	const (
		taskCount     = 10
		maxConcurrent = 3
	)

	limiter := NewSemaphore(maxConcurrent)

	var pending sync.WaitGroup
	pending.Add(taskCount)

	runTask := func(id int) {
		defer pending.Done()

		// Hold a slot for the whole task; the deferred Release frees it
		// when the task returns.
		limiter.Acquire()
		defer limiter.Release()

		fmt.Printf("Task %d started\n", id)
		time.Sleep(500 * time.Millisecond)
		fmt.Printf("Task %d finished\n", id)
	}

	for id := 0; id < taskCount; id++ {
		go runTask(id)
	}

	pending.Wait()
}

Bad: No Concurrency Control

// โŒ AVOID: Unlimited concurrent operations
package main

import (
	"fmt"
	"sync"
	"time"
)

// main is the counter-example: it starts one goroutine per task and nothing
// bounds how many of them are alive at once.
func main() {
	const taskCount = 10000

	var pending sync.WaitGroup
	pending.Add(taskCount)

	runTask := func(id int) {
		defer pending.Done()

		fmt.Printf("Task %d started\n", id)
		time.Sleep(500 * time.Millisecond)
		fmt.Printf("Task %d finished\n", id)
	}

	// All 10000 goroutines are started immediately - resource exhaustion!
	for id := 0; id < taskCount; id++ {
		go runTask(id)
	}

	pending.Wait()
}

Weighted Semaphore

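A weighted semaphore lets each acquisition consume a variable share of a fixed total capacity, so expensive operations count for more than cheap ones.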

Good: Weighted Semaphore

package main

import (
	"fmt"
	"sync"
	"time"
)

// WeightedSemaphore bounds the total weight of concurrently held
// acquisitions to a fixed maximum.
//
// The held weight is guarded by a mutex, and waiters sleep on a condition
// variable that is signalled on every Release, so blocked callers neither
// poll nor depend on any initial channel state.
type WeightedSemaphore struct {
	mu sync.Mutex
	// freed is broadcast whenever weight is released.
	freed *sync.Cond
	// used is the total weight currently held.
	used int
	// max is the upper bound on used.
	max int
}

// NewWeightedSemaphore returns a semaphore whose combined held weight never
// exceeds maxWeight.
func NewWeightedSemaphore(maxWeight int) *WeightedSemaphore {
	ws := &WeightedSemaphore{max: maxWeight}
	ws.freed = sync.NewCond(&ws.mu)
	return ws
}

// Acquire blocks until weight units are available and then claims them.
//
// It panics with "weight exceeds max" when weight is larger than the
// semaphore's capacity, because such a request could never be satisfied.
func (ws *WeightedSemaphore) Acquire(weight int) {
	if weight > ws.max {
		panic("weight exceeds max")
	}

	ws.mu.Lock()
	defer ws.mu.Unlock()

	// Wait releases the mutex while sleeping and re-acquires it before
	// returning, so the condition is always re-checked under the lock.
	for ws.used+weight > ws.max {
		ws.freed.Wait()
	}
	ws.used += weight
}

// Release returns weight units to the semaphore and wakes every waiter so
// each can re-check whether its own request now fits.
func (ws *WeightedSemaphore) Release(weight int) {
	ws.mu.Lock()
	ws.used -= weight
	ws.mu.Unlock()

	// Broadcast rather than Signal: waiters ask for different weights, and
	// the one that happens to be woken first may still not fit.
	ws.freed.Broadcast()
}

// main runs one heavy task and three light tasks against a shared weight
// budget of 100.
func main() {
	const (
		capacity    = 100
		heavyWeight = 50
		lightWeight = 10
		lightTasks  = 3
	)

	budget := NewWeightedSemaphore(capacity)

	var pending sync.WaitGroup
	pending.Add(1 + lightTasks)

	// Heavy task (weight 50)
	go func() {
		defer pending.Done()
		budget.Acquire(heavyWeight)
		defer budget.Release(heavyWeight)

		fmt.Println("Heavy task started")
		time.Sleep(500 * time.Millisecond)
		fmt.Println("Heavy task finished")
	}()

	// Light tasks (weight 10 each)
	runLight := func(id int) {
		defer pending.Done()
		budget.Acquire(lightWeight)
		defer budget.Release(lightWeight)

		fmt.Printf("Light task %d started\n", id)
		time.Sleep(300 * time.Millisecond)
		fmt.Printf("Light task %d finished\n", id)
	}
	for id := 0; id < lightTasks; id++ {
		go runLight(id)
	}

	pending.Wait()
}

Token Bucket Rate Limiter

Classic rate limiting algorithm.

Good: Token Bucket

package main

import (
	"fmt"
	"sync"
	"time"
)

// TokenBucket is a mutex-guarded token bucket. Tokens accrue continuously at
// refillRate per second, up to maxTokens, and are spent by Allow.
type TokenBucket struct {
	mu         sync.Mutex
	tokens     float64
	maxTokens  float64
	refillRate float64
	lastRefill time.Time
}

// NewTokenBucket returns a bucket that starts full with maxTokens tokens and
// refills at refillRate tokens per second.
func NewTokenBucket(maxTokens, refillRate float64) *TokenBucket {
	tb := new(TokenBucket)
	tb.maxTokens = maxTokens
	tb.refillRate = refillRate
	tb.tokens = maxTokens
	tb.lastRefill = time.Now()
	return tb
}

// refill credits the tokens earned since the previous refill, capped at
// maxTokens. The caller must hold tb.mu.
func (tb *TokenBucket) refill() {
	now := time.Now()
	elapsed := now.Sub(tb.lastRefill).Seconds()
	tb.lastRefill = now
	tb.tokens = min(tb.maxTokens, tb.tokens+elapsed*tb.refillRate)
}

// Allow reports whether tokens can be taken from the bucket right now and,
// if so, deducts them. It never blocks waiting for a refill.
func (tb *TokenBucket) Allow(tokens float64) bool {
	tb.mu.Lock()
	defer tb.mu.Unlock()

	tb.refill()

	enough := tb.tokens >= tokens
	if !enough {
		return false
	}
	tb.tokens -= tokens
	return true
}

// min returns a when a < b and b otherwise.
func min(a, b float64) float64 {
	smaller := b
	if a < b {
		smaller = a
	}
	return smaller
}

// main fires twenty requests at a bucket holding 10 tokens that refills at
// 10 tokens per second, pausing for one second after the tenth request.
func main() {
	const (
		burst         = 10 // bucket capacity
		perSecond     = 10 // refill rate
		totalRequests = 20
	)

	limiter := NewTokenBucket(burst, perSecond)

	for req := 0; req < totalRequests; req++ {
		switch {
		case limiter.Allow(1):
			fmt.Printf("Request %d allowed\n", req)
		default:
			fmt.Printf("Request %d denied\n", req)
		}

		if req == 9 {
			time.Sleep(1 * time.Second)
		}
	}
}

Leaky Bucket Rate Limiter

Alternative rate limiting algorithm.

Good: Leaky Bucket

package main

import (
	"fmt"
	"sync"
	"time"
)

// LeakyBucket admits requests while there is room in a fixed-size queue and
// drains ("leaks") one queued item per tick in a background goroutine.
//
// Call Stop when the bucket is no longer needed so the background goroutine
// and its ticker are released.
type LeakyBucket struct {
	queue    chan struct{}
	capacity int

	// done is closed by Stop to terminate the leak goroutine.
	done chan struct{}
	// stopOnce makes Stop safe to call more than once.
	stopOnce sync.Once
}

// NewLeakyBucket returns a bucket that holds at most capacity items and
// removes one item every leakRate.
//
// It panics if leakRate is not positive: the ticker is created here rather
// than inside the background goroutine, so the failure surfaces at the call
// site.
func NewLeakyBucket(capacity int, leakRate time.Duration) *LeakyBucket {
	lb := &LeakyBucket{
		queue:    make(chan struct{}, capacity),
		capacity: capacity,
		done:     make(chan struct{}),
	}

	ticker := time.NewTicker(leakRate)
	go lb.leak(ticker)

	return lb
}

// leak removes at most one queued item per tick until Stop is called.
func (lb *LeakyBucket) leak(ticker *time.Ticker) {
	defer ticker.Stop()

	for {
		select {
		case <-lb.done:
			return
		case <-ticker.C:
			select {
			case <-lb.queue:
				// Leak one item
			default:
				// Bucket already empty; nothing to leak on this tick.
			}
		}
	}
}

// Allow reports whether the request fits in the bucket. It never blocks:
// when the queue is full the request is rejected immediately.
func (lb *LeakyBucket) Allow() bool {
	select {
	case lb.queue <- struct{}{}:
		return true
	default:
		return false
	}
}

// Stop terminates the background leak goroutine. It is safe to call Stop
// multiple times. After Stop the bucket no longer drains, so Allow only
// succeeds while the remaining capacity lasts.
func (lb *LeakyBucket) Stop() {
	lb.stopOnce.Do(func() { close(lb.done) })
}

// main sends twenty requests, 50ms apart, to a bucket of capacity 5 that
// leaks one item every 100ms.
func main() {
	const (
		capacity      = 5
		totalRequests = 20
	)

	// Capacity 5, leak 1 item per 100ms
	limiter := NewLeakyBucket(capacity, 100*time.Millisecond)

	for req := 0; req < totalRequests; req++ {
		admitted := limiter.Allow()
		switch admitted {
		case true:
			fmt.Printf("Request %d allowed\n", req)
		case false:
			fmt.Printf("Request %d denied\n", req)
		}

		time.Sleep(50 * time.Millisecond)
	}
}

Sliding Window Rate Limiter

Time-window based rate limiting.

Good: Sliding Window

package main

import (
	"fmt"
	"sync"
	"time"
)

// SlidingWindow is a mutex-guarded sliding-window log limiter: it remembers
// the timestamp of every admitted request and admits a new one only while
// fewer than maxReqs timestamps fall inside the trailing window.
type SlidingWindow struct {
	mu       sync.Mutex
	requests []time.Time
	maxReqs  int
	window   time.Duration
}

// NewSlidingWindow returns a limiter that admits at most maxReqs requests
// within any trailing period of length window.
func NewSlidingWindow(maxReqs int, window time.Duration) *SlidingWindow {
	sw := &SlidingWindow{requests: []time.Time{}}
	sw.maxReqs = maxReqs
	sw.window = window
	return sw
}

// Allow reports whether a request may proceed now and, if it may, records
// its timestamp.
func (sw *SlidingWindow) Allow() bool {
	sw.mu.Lock()
	defer sw.mu.Unlock()

	now := time.Now()
	oldest := now.Add(-sw.window)

	// Timestamps are appended in the order requests are admitted, so the
	// expired ones form a prefix of the slice.
	expired := 0
	for _, ts := range sw.requests {
		if !ts.Before(oldest) {
			break
		}
		expired++
	}
	sw.requests = sw.requests[expired:]

	// Window already full: reject without recording anything.
	if len(sw.requests) >= sw.maxReqs {
		return false
	}

	sw.requests = append(sw.requests, now)
	return true
}

// main sends twenty requests, 100ms apart, to a limiter that admits five
// requests per second.
func main() {
	const (
		maxPerWindow  = 5
		totalRequests = 20
	)

	// 5 requests per second
	limiter := NewSlidingWindow(maxPerWindow, time.Second)

	for req := 0; req < totalRequests; req++ {
		if ok := limiter.Allow(); !ok {
			fmt.Printf("Request %d denied\n", req)
		} else {
			fmt.Printf("Request %d allowed\n", req)
		}

		time.Sleep(100 * time.Millisecond)
	}
}

Distributed Rate Limiting

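Rate limiting across multiple instances. The example below models only the per-instance quota bookkeeping in memory; enforcing a single limit across instances additionally requires keeping the counter in a store shared by all of them (for example Redis).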

Good: Distributed Limiter Pattern

package main

import (
	"fmt"
	"sync"
	"time"
)

// DistributedLimiter is a mutex-guarded fixed-window quota: each interval
// starts with the full configured quota, and Allow spends from it.
//
// All state lives in this process's memory; nothing here shares the counter
// with other instances.
type DistributedLimiter struct {
	mu sync.Mutex
	// capacity is the quota restored at the start of every window.
	capacity int
	// quota is what remains in the current window.
	quota int
	// refillAt is when the current window ends.
	refillAt time.Time
	interval time.Duration
}

// NewDistributedLimiter returns a limiter that grants quota tokens per
// interval. The first window starts immediately.
func NewDistributedLimiter(quota int, interval time.Duration) *DistributedLimiter {
	return &DistributedLimiter{
		capacity: quota,
		quota:    quota,
		refillAt: time.Now().Add(interval),
		interval: interval,
	}
}

// Allow reports whether tokens can be spent from the current window and, if
// so, deducts them.
//
// When the window has expired the quota is first restored to the configured
// capacity; the request is then checked and charged like any other, so a
// request arriving right after expiry is neither free nor exempt from the
// size check.
func (dl *DistributedLimiter) Allow(tokens int) bool {
	dl.mu.Lock()
	defer dl.mu.Unlock()

	// Refill if window expired
	if now := time.Now(); now.After(dl.refillAt) {
		dl.quota = dl.capacity
		dl.refillAt = now.Add(dl.interval)
	}

	// Check quota
	if tokens > dl.quota {
		return false
	}
	dl.quota -= tokens
	return true
}

// main sends twenty single-token requests to a limiter configured for 10
// tokens per second, pausing for one second after the tenth request.
func main() {
	const (
		quotaPerWindow = 10
		totalRequests  = 20
	)

	// 10 tokens per second
	limiter := NewDistributedLimiter(quotaPerWindow, time.Second)

	for req := 0; req < totalRequests; req++ {
		granted := limiter.Allow(1)
		if !granted {
			fmt.Printf("Request %d denied\n", req)
		}
		if granted {
			fmt.Printf("Request %d allowed\n", req)
		}

		if req == 9 {
			time.Sleep(1 * time.Second)
		}
	}
}

Adaptive Rate Limiting

Adjust rate based on system load.

Good: Adaptive Limiter

package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

// AdaptiveLimiter is a mutex-guarded limiter whose allowance moves between
// minRate and maxRate depending on how many goroutines the process is
// running, and which enforces that allowance over one-second windows.
//
// NOTE(review): currentRate is interpreted here as "requests per second";
// the original code adjusted it but never enforced it, so the unit is an
// assumption to confirm.
type AdaptiveLimiter struct {
	mu            sync.Mutex
	maxGoroutines int
	currentRate   int
	minRate       int
	maxRate       int

	// windowStart is when the current one-second window began.
	windowStart time.Time
	// used counts the requests admitted in the current window.
	used int
}

// NewAdaptiveLimiter returns a limiter that starts at maxRate and never
// drops below minRate. The goroutine threshold is 100 per CPU.
func NewAdaptiveLimiter(minRate, maxRate int) *AdaptiveLimiter {
	return &AdaptiveLimiter{
		currentRate:   maxRate,
		minRate:       minRate,
		maxRate:       maxRate,
		maxGoroutines: runtime.NumCPU() * 100,
	}
}

// Allow re-tunes the rate from the current goroutine count and then reports
// whether one more request fits in the current one-second window.
func (al *AdaptiveLimiter) Allow() bool {
	al.mu.Lock()
	defer al.mu.Unlock()

	al.adjustRate()

	// Start a fresh window once the current one is at least a second old.
	// The zero windowStart of a new limiter always satisfies this.
	now := time.Now()
	if now.Sub(al.windowStart) >= time.Second {
		al.windowStart = now
		al.used = 0
	}

	if al.used >= al.currentRate {
		return false
	}
	al.used++
	return true
}

// adjustRate lowers the rate by one when the process runs more goroutines
// than maxGoroutines and raises it by one when it runs fewer than half of
// that, clamped to [minRate, maxRate]. The caller must hold al.mu.
func (al *AdaptiveLimiter) adjustRate() {
	switch running := runtime.NumGoroutine(); {
	case running > al.maxGoroutines:
		// Reduce rate
		al.currentRate = max(al.minRate, al.currentRate-1)
	case running < al.maxGoroutines/2:
		// Increase rate
		al.currentRate = min(al.maxRate, al.currentRate+1)
	}
}

// min returns the smaller of a and b.
func min(a, b int) int {
	if a < b {
		return a
	}
	return b
}

// max returns the larger of a and b.
func max(a, b int) int {
	if a > b {
		return a
	}
	return b
}

// main sends twenty requests, 50ms apart, through an adaptive limiter with a
// rate range of 5 to 100 and prints each request that is admitted.
func main() {
	const (
		minRate       = 5
		maxRate       = 100
		totalRequests = 20
	)

	limiter := NewAdaptiveLimiter(minRate, maxRate)

	for req := 0; req < totalRequests; req++ {
		if admitted := limiter.Allow(); admitted {
			fmt.Printf("Request %d allowed\n", req)
		}

		time.Sleep(50 * time.Millisecond)
	}
}

Best Practices

  1. Choose the Right Algorithm: Token bucket for bursty traffic, leaky bucket for a smooth, constant output rate
  2. Set Appropriate Limits: Based on system capacity
  3. Monitor Metrics: Track rejection rates and latency
  4. Graceful Degradation: Handle rate-limit rejections gracefully
  5. Communicate Limits: Inform clients of rate limits
  6. Test Under Load: Verify behavior under stress
  7. Adjust Dynamically: Adapt to system conditions
  8. Document Limits: Make limits clear to users

Common Pitfalls

  • Too Strict Limits: Rejecting valid requests
  • Too Loose Limits: Not protecting system
  • No Backoff: Clients retry immediately
  • Unfair Distribution: Some clients get more quota
  • No Monitoring: Can’t detect issues

Resources

Summary

Semaphores and rate limiting are essential for controlling concurrent access and throughput. Use token bucket for bursty traffic, leaky bucket for smooth flow. Implement adaptive rate limiting to adjust to system conditions. Monitor metrics and test under load to ensure effectiveness.

Comments