Semaphores and Rate Limiting
Semaphores and rate limiting are essential for controlling concurrent access and throughput in Go applications.
Semaphore Basics
A semaphore is a counter that controls access to a shared resource.
Good: Basic Semaphore
package main
import (
"fmt"
"sync"
"time"
)
type Semaphore struct {
sem chan struct{}
}
func NewSemaphore(maxConcurrent int) *Semaphore {
return &Semaphore{
sem: make(chan struct{}, maxConcurrent),
}
}
func (s *Semaphore) Acquire() {
s.sem <- struct{}{}
}
func (s *Semaphore) Release() {
<-s.sem
}
// main demonstrates capping fan-out: ten tasks are launched, but the
// semaphore ensures no more than three execute at any moment.
func main() {
	sem := NewSemaphore(3) // Max 3 concurrent operations

	var wg sync.WaitGroup
	for id := 0; id < 10; id++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()

			// Claim a slot before doing work; release it on the way out.
			sem.Acquire()
			defer sem.Release()

			fmt.Printf("Task %d started\n", id)
			time.Sleep(500 * time.Millisecond)
			fmt.Printf("Task %d finished\n", id)
		}(id)
	}
	wg.Wait()
}
Bad: No Concurrency Control
// ❌ AVOID: Unlimited concurrent operations
package main
import (
"fmt"
"sync"
"time"
)
// main shows the anti-pattern: spawning one goroutine per task with no
// concurrency cap. Do NOT copy this — it is the "before" picture.
func main() {
	var wg sync.WaitGroup
	// 10000 goroutines at once — resource exhaustion: each costs stack
	// memory and scheduler work, and nothing bounds the fan-out.
	for id := 0; id < 10000; id++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			fmt.Printf("Task %d started\n", id)
			time.Sleep(500 * time.Millisecond)
			fmt.Printf("Task %d finished\n", id)
		}(id)
	}
	wg.Wait()
}
Weighted Semaphore
Semaphore with variable resource costs.
Good: Weighted Semaphore
package main
import (
"fmt"
"sync"
"time"
)
type WeightedSemaphore struct {
sem chan int
max int
}
func NewWeightedSemaphore(maxWeight int) *WeightedSemaphore {
return &WeightedSemaphore{
sem: make(chan int, 1),
max: maxWeight,
}
}
func (ws *WeightedSemaphore) Acquire(weight int) {
if weight > ws.max {
panic("weight exceeds max")
}
current := <-ws.sem
for current+weight > ws.max {
ws.sem <- current
time.Sleep(10 * time.Millisecond)
current = <-ws.sem
}
ws.sem <- current + weight
}
func (ws *WeightedSemaphore) Release(weight int) {
current := <-ws.sem
ws.sem <- current - weight
}
// main runs one heavy task (half the pool) alongside three light tasks,
// showing how weighted acquisition shares a 100-unit capacity.
func main() {
	sem := NewWeightedSemaphore(100)
	var wg sync.WaitGroup

	// One heavy task consuming weight 50.
	wg.Add(1)
	go func() {
		defer wg.Done()
		sem.Acquire(50)
		defer sem.Release(50)
		fmt.Println("Heavy task started")
		time.Sleep(500 * time.Millisecond)
		fmt.Println("Heavy task finished")
	}()

	// Three light tasks, weight 10 apiece.
	for id := 0; id < 3; id++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			sem.Acquire(10)
			defer sem.Release(10)
			fmt.Printf("Light task %d started\n", id)
			time.Sleep(300 * time.Millisecond)
			fmt.Printf("Light task %d finished\n", id)
		}(id)
	}
	wg.Wait()
}
Token Bucket Rate Limiter
Classic rate limiting algorithm.
Good: Token Bucket
package main
import (
"fmt"
"sync"
"time"
)
// TokenBucket is a thread-safe token-bucket rate limiter: tokens
// accumulate at refillRate per second up to maxTokens (the burst
// capacity), and each request spends tokens if enough are available.
type TokenBucket struct {
	mu         sync.Mutex
	tokens     float64   // tokens currently available
	maxTokens  float64   // bucket capacity (maximum burst)
	refillRate float64   // tokens credited per second
	lastRefill time.Time // last time tokens were credited
}

// NewTokenBucket returns a bucket that starts full and refills at
// refillRate tokens per second up to maxTokens.
func NewTokenBucket(maxTokens, refillRate float64) *TokenBucket {
	return &TokenBucket{
		tokens:     maxTokens,
		maxTokens:  maxTokens,
		refillRate: refillRate,
		lastRefill: time.Now(),
	}
}

// refill credits tokens for the wall-clock time elapsed since the last
// refill, capping at the bucket capacity. Caller must hold tb.mu.
func (tb *TokenBucket) refill() {
	now := time.Now()
	tb.tokens += now.Sub(tb.lastRefill).Seconds() * tb.refillRate
	if tb.tokens > tb.maxTokens {
		tb.tokens = tb.maxTokens
	}
	tb.lastRefill = now
}

// Allow spends tokens from the bucket, reporting whether the request
// fits within the currently available tokens. It never blocks.
func (tb *TokenBucket) Allow(tokens float64) bool {
	tb.mu.Lock()
	defer tb.mu.Unlock()
	tb.refill()
	if tb.tokens < tokens {
		return false
	}
	tb.tokens -= tokens
	return true
}
// min returns the smaller of two float64 values.
func min(a, b float64) float64 {
	if b < a {
		return b
	}
	return a
}
// main fires 20 requests at a bucket holding 10 tokens, pausing once so
// the refill can be observed: the first 10 pass, then the pause restores
// tokens for the rest.
func main() {
	limiter := NewTokenBucket(10, 10) // capacity 10, refilled at 10 tokens/second

	for i := 0; i < 20; i++ {
		switch {
		case limiter.Allow(1):
			fmt.Printf("Request %d allowed\n", i)
		default:
			fmt.Printf("Request %d denied\n", i)
		}
		// Pause halfway through so the bucket refills completely.
		if i == 9 {
			time.Sleep(1 * time.Second)
		}
	}
}
Leaky Bucket Rate Limiter
Alternative rate limiting algorithm.
Good: Leaky Bucket
package main
import (
"fmt"
"sync"
"time"
)
type LeakyBucket struct {
mu sync.Mutex
queue chan struct{}
capacity int
}
func NewLeakyBucket(capacity int, leakRate time.Duration) *LeakyBucket {
lb := &LeakyBucket{
queue: make(chan struct{}, capacity),
capacity: capacity,
}
// Leak goroutine
go func() {
ticker := time.NewTicker(leakRate)
defer ticker.Stop()
for range ticker.C {
select {
case <-lb.queue:
// Leak one item
default:
}
}
}()
return lb
}
func (lb *LeakyBucket) Allow() bool {
select {
case lb.queue <- struct{}{}:
return true
default:
return false
}
}
// main sends requests at twice the leak rate (every 50ms against a
// 100ms leak), so after the initial burst fills the bucket, roughly
// every other request is denied.
func main() {
	// Capacity 5, leak 1 item per 100ms
	limiter := NewLeakyBucket(5, 100*time.Millisecond)

	for i := 0; i < 20; i++ {
		if !limiter.Allow() {
			fmt.Printf("Request %d denied\n", i)
		} else {
			fmt.Printf("Request %d allowed\n", i)
		}
		time.Sleep(50 * time.Millisecond)
	}
}
Sliding Window Rate Limiter
Time-window based rate limiting.
Good: Sliding Window
package main
import (
"fmt"
"sync"
"time"
)
type SlidingWindow struct {
mu sync.Mutex
requests []time.Time
maxReqs int
window time.Duration
}
func NewSlidingWindow(maxReqs int, window time.Duration) *SlidingWindow {
return &SlidingWindow{
requests: make([]time.Time, 0),
maxReqs: maxReqs,
window: window,
}
}
func (sw *SlidingWindow) Allow() bool {
sw.mu.Lock()
defer sw.mu.Unlock()
now := time.Now()
cutoff := now.Add(-sw.window)
// Remove old requests
i := 0
for i < len(sw.requests) && sw.requests[i].Before(cutoff) {
i++
}
sw.requests = sw.requests[i:]
// Check if we can allow
if len(sw.requests) < sw.maxReqs {
sw.requests = append(sw.requests, now)
return true
}
return false
}
// main issues requests every 100ms against a 5-per-second window, so
// after the first burst the limiter settles into its steady-state rate.
func main() {
	// 5 requests per second
	limiter := NewSlidingWindow(5, time.Second)

	for i := 0; i < 20; i++ {
		if !limiter.Allow() {
			fmt.Printf("Request %d denied\n", i)
		} else {
			fmt.Printf("Request %d allowed\n", i)
		}
		time.Sleep(100 * time.Millisecond)
	}
}
Distributed Rate Limiting
A fixed-window quota pattern whose check-and-decrement step maps onto shared storage (e.g. Redis) for limiting across multiple instances; shown here in-process for clarity.
Good: Distributed Limiter Pattern
package main
import (
"fmt"
"sync"
"time"
)
type DistributedLimiter struct {
mu sync.Mutex
quota int
refillAt time.Time
interval time.Duration
}
func NewDistributedLimiter(quota int, interval time.Duration) *DistributedLimiter {
return &DistributedLimiter{
quota: quota,
refillAt: time.Now().Add(interval),
interval: interval,
}
}
func (dl *DistributedLimiter) Allow(tokens int) bool {
dl.mu.Lock()
defer dl.mu.Unlock()
now := time.Now()
// Refill if window expired
if now.After(dl.refillAt) {
dl.quota = tokens
dl.refillAt = now.Add(dl.interval)
return true
}
// Check quota
if dl.quota >= tokens {
dl.quota -= tokens
return true
}
return false
}
// main spends a 10-token quota, then sleeps past the window boundary so
// the refill path is exercised by the second half of the requests.
func main() {
	// 10 tokens per second
	limiter := NewDistributedLimiter(10, time.Second)

	for i := 0; i < 20; i++ {
		if !limiter.Allow(1) {
			fmt.Printf("Request %d denied\n", i)
		} else {
			fmt.Printf("Request %d allowed\n", i)
		}
		// Let the window expire once the first quota is spent.
		if i == 9 {
			time.Sleep(1 * time.Second)
		}
	}
}
Adaptive Rate Limiting
Adjust rate based on system load.
Good: Adaptive Limiter
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
// AdaptiveLimiter adjusts its permitted request rate between minRate
// and maxRate based on system load (goroutine count) and enforces that
// rate over a one-second sliding window.
type AdaptiveLimiter struct {
	mu            sync.Mutex
	maxGoroutines int // load threshold above which the rate is reduced
	currentRate   int // requests currently permitted per second
	minRate       int
	maxRate       int
	admitted      []time.Time // admission times within the trailing second
}

// NewAdaptiveLimiter returns a limiter that starts at maxRate and
// adapts between minRate and maxRate as load changes.
func NewAdaptiveLimiter(minRate, maxRate int) *AdaptiveLimiter {
	return &AdaptiveLimiter{
		currentRate:   maxRate,
		minRate:       minRate,
		maxRate:       maxRate,
		maxGoroutines: runtime.NumCPU() * 100,
	}
}

// Allow adapts the rate to current load and reports whether the request
// fits within the rate for the trailing second.
//
// BUG FIX: the original computed currentRate but returned true
// unconditionally, so it never limited anything. This version enforces
// currentRate over a one-second sliding window of admission times.
func (al *AdaptiveLimiter) Allow() bool {
	al.mu.Lock()
	defer al.mu.Unlock()

	// Adapt: shrink the rate under goroutine pressure, grow it back
	// when the system sits comfortably below half the threshold.
	numGoroutines := runtime.NumGoroutine()
	if numGoroutines > al.maxGoroutines {
		al.currentRate--
		if al.currentRate < al.minRate {
			al.currentRate = al.minRate
		}
	} else if numGoroutines < al.maxGoroutines/2 {
		al.currentRate++
		if al.currentRate > al.maxRate {
			al.currentRate = al.maxRate
		}
	}

	// Enforce the current rate over the trailing second.
	now := time.Now()
	cutoff := now.Add(-time.Second)
	for len(al.admitted) > 0 && al.admitted[0].Before(cutoff) {
		al.admitted = al.admitted[1:]
	}
	if len(al.admitted) >= al.currentRate {
		return false
	}
	al.admitted = append(al.admitted, now)
	return true
}
// min returns the smaller of two ints.
func min(a, b int) int {
	if b < a {
		return b
	}
	return a
}
// max returns the larger of two ints.
func max(a, b int) int {
	if b > a {
		return b
	}
	return a
}
// main exercises the adaptive limiter with a steady trickle of
// requests, printing only the ones the limiter admits.
func main() {
	limiter := NewAdaptiveLimiter(5, 100)
	for req := 0; req < 20; req++ {
		if limiter.Allow() {
			fmt.Printf("Request %d allowed\n", req)
		}
		time.Sleep(50 * time.Millisecond)
	}
}
Best Practices
- Choose Right Algorithm: Token bucket for bursty traffic, leaky bucket for a smooth, constant outflow
- Set Appropriate Limits: Based on system capacity
- Monitor Metrics: Track rejection rates and latency
- Graceful Degradation: Handle rate limit gracefully
- Communicate Limits: Inform clients of rate limits
- Test Under Load: Verify behavior under stress
- Adjust Dynamically: Adapt to system conditions
- Document Limits: Make limits clear to users
Common Pitfalls
- Too Strict Limits: Rejecting valid requests
- Too Loose Limits: Not protecting system
- No Backoff: Clients retry immediately
- Unfair Distribution: Some clients get more quota
- No Monitoring: Can’t detect issues
Resources
Summary
Semaphores and rate limiting are essential for controlling concurrent access and throughput. Use token bucket for bursty traffic, leaky bucket for smooth flow. Implement adaptive rate limiting to adjust to system conditions. Monitor metrics and test under load to ensure effectiveness.
Comments