Advanced Channel Patterns
Channels are Go’s primary concurrency primitive. Advanced patterns enable building sophisticated concurrent systems.
Fan-Out/Fan-In Pattern
Distribute work across multiple workers and collect results.
Good: Fan-Out/Fan-In
package main
import (
"fmt"
"sync"
)
func fanOut(input <-chan int, numWorkers int) []<-chan int {
channels := make([]<-chan int, numWorkers)
for i := 0; i < numWorkers; i++ {
ch := make(chan int)
channels[i] = ch
go func(out chan<- int) {
defer close(out)
for val := range input {
out <- val * 2
}
}(ch)
}
return channels
}
func fanIn(channels ...<-chan int) <-chan int {
out := make(chan int)
var wg sync.WaitGroup
for _, ch := range channels {
wg.Add(1)
go func(c <-chan int) {
defer wg.Done()
for val := range c {
out <- val
}
}(ch)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
func main() {
// Create input
input := make(chan int)
go func() {
for i := 1; i <= 10; i++ {
input <- i
}
close(input)
}()
// Fan out to 3 workers
workers := fanOut(input, 3)
// Fan in results
results := fanIn(workers...)
// Collect results
for result := range results {
fmt.Println(result)
}
}
Pipeline Pattern
Chain operations through channels.
Good: Pipeline Pattern
package main
import (
"fmt"
)
// Stage 1: Generate numbers
func generate(max int) <-chan int {
out := make(chan int)
go func() {
for i := 1; i <= max; i++ {
out <- i
}
close(out)
}()
return out
}
// Stage 2: Square numbers
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for val := range in {
out <- val * val
}
close(out)
}()
return out
}
// Stage 3: Filter even numbers
func filterEven(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for val := range in {
if val%2 == 0 {
out <- val
}
}
close(out)
}()
return out
}
func main() {
// Build pipeline
numbers := generate(10)
squared := square(numbers)
evens := filterEven(squared)
// Consume results
for result := range evens {
fmt.Println(result)
}
}
Multiplexing Pattern
Combine multiple channels into one.
Good: Multiplexing
package main
import (
"fmt"
"time"
)
func multiplex(channels ...<-chan string) <-chan string {
out := make(chan string)
go func() {
for _, ch := range channels {
go func(c <-chan string) {
for val := range c {
out <- val
}
}(ch)
}
}()
return out
}
func main() {
// Create multiple channels
ch1 := make(chan string)
ch2 := make(chan string)
ch3 := make(chan string)
// Send data
go func() {
for i := 0; i < 3; i++ {
ch1 <- fmt.Sprintf("ch1: %d", i)
time.Sleep(100 * time.Millisecond)
}
close(ch1)
}()
go func() {
for i := 0; i < 3; i++ {
ch2 <- fmt.Sprintf("ch2: %d", i)
time.Sleep(150 * time.Millisecond)
}
close(ch2)
}()
go func() {
for i := 0; i < 3; i++ {
ch3 <- fmt.Sprintf("ch3: %d", i)
time.Sleep(200 * time.Millisecond)
}
close(ch3)
}()
// Multiplex channels
mux := multiplex(ch1, ch2, ch3)
// Consume results
for result := range mux {
fmt.Println(result)
}
}
Demultiplexing Pattern
Split one channel into multiple.
Good: Demultiplexing
package main
import (
"fmt"
)
func demultiplex(in <-chan int, numChannels int) []<-chan int {
channels := make([]<-chan int, numChannels)
for i := 0; i < numChannels; i++ {
ch := make(chan int)
channels[i] = ch
go func(out chan<- int, index int) {
defer close(out)
for val := range in {
if val%numChannels == index {
out <- val
}
}
}(ch, i)
}
return channels
}
func main() {
// Create input
input := make(chan int)
go func() {
for i := 1; i <= 10; i++ {
input <- i
}
close(input)
}()
// Demultiplex into 3 channels
channels := demultiplex(input, 3)
// Consume from each channel
for i, ch := range channels {
fmt.Printf("Channel %d: ", i)
for val := range ch {
fmt.Printf("%d ", val)
}
fmt.Println()
}
}
Timeout Pattern
Add timeout to channel operations.
Good: Timeout Pattern
package main
import (
"fmt"
"time"
)
func withTimeout(ch <-chan string, timeout time.Duration) <-chan string {
out := make(chan string)
go func() {
defer close(out)
select {
case val := <-ch:
out <- val
case <-time.After(timeout):
out <- "TIMEOUT"
}
}()
return out
}
func main() {
// Slow channel
slow := make(chan string)
go func() {
time.Sleep(2 * time.Second)
slow <- "data"
}()
// Add timeout
result := withTimeout(slow, 1*time.Second)
fmt.Println(<-result)
}
Retry Pattern
Retry operations with exponential backoff.
Good: Retry Pattern
package main
import (
"fmt"
"math/rand"
"time"
)
func retry(operation func() error, maxRetries int) error {
var err error
backoff := 100 * time.Millisecond
for i := 0; i < maxRetries; i++ {
err = operation()
if err == nil {
return nil
}
if i < maxRetries-1 {
fmt.Printf("Retry %d after %v\n", i+1, backoff)
time.Sleep(backoff)
backoff *= 2
}
}
return err
}
func main() {
attempts := 0
err := retry(func() error {
attempts++
fmt.Printf("Attempt %d\n", attempts)
if attempts < 3 {
return fmt.Errorf("temporary error")
}
return nil
}, 5)
if err != nil {
fmt.Println("Failed:", err)
} else {
fmt.Println("Success!")
}
}
Rate Limiting Pattern
Limit throughput using channels.
Good: Rate Limiting
package main
import (
"fmt"
"time"
)
func rateLimiter(rate int, duration time.Duration) <-chan struct{} {
ticker := time.NewTicker(duration / time.Duration(rate))
ch := make(chan struct{})
go func() {
defer ticker.Stop()
for range ticker.C {
ch <- struct{}{}
}
}()
return ch
}
func main() {
// Allow 5 operations per second
limiter := rateLimiter(5, time.Second)
for i := 0; i < 10; i++ {
<-limiter
fmt.Printf("Operation %d at %v\n", i+1, time.Now().Format("15:04:05.000"))
}
}
Merge Pattern
Merge multiple channels into one.
Good: Merge Pattern
package main
import (
"fmt"
"sync"
)
func merge(channels ...<-chan int) <-chan int {
out := make(chan int)
var wg sync.WaitGroup
for _, ch := range channels {
wg.Add(1)
go func(c <-chan int) {
defer wg.Done()
for val := range c {
out <- val
}
}(ch)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
func main() {
// Create channels
ch1 := make(chan int)
ch2 := make(chan int)
ch3 := make(chan int)
// Send data
go func() {
for i := 1; i <= 3; i++ {
ch1 <- i
}
close(ch1)
}()
go func() {
for i := 10; i <= 12; i++ {
ch2 <- i
}
close(ch2)
}()
go func() {
for i := 100; i <= 102; i++ {
ch3 <- i
}
close(ch3)
}()
// Merge channels
merged := merge(ch1, ch2, ch3)
// Consume results
for val := range merged {
fmt.Println(val)
}
}
Tee Pattern
Duplicate channel output.
Good: Tee Pattern
package main
import (
"fmt"
"sync"
)
func tee(in <-chan int) (<-chan int, <-chan int) {
out1 := make(chan int)
out2 := make(chan int)
go func() {
var wg sync.WaitGroup
for val := range in {
wg.Add(2)
go func(v int) {
defer wg.Done()
out1 <- v
}(val)
go func(v int) {
defer wg.Done()
out2 <- v
}(val)
}
wg.Wait()
close(out1)
close(out2)
}()
return out1, out2
}
func main() {
// Create input
input := make(chan int)
go func() {
for i := 1; i <= 5; i++ {
input <- i
}
close(input)
}()
// Tee the channel
ch1, ch2 := tee(input)
// Consume from both
for i := 0; i < 5; i++ {
fmt.Printf("ch1: %d, ch2: %d\n", <-ch1, <-ch2)
}
}
Best Practices
- Close Channels Properly: Only sender should close
- Use Buffering Wisely: Buffer size affects performance
- Avoid Deadlocks: Ensure all goroutines can complete
- Handle Cancellation: Use context for cancellation
- Document Ownership: Make clear who owns channels
- Test Patterns: Thoroughly test concurrent code
- Monitor Goroutines: Watch for goroutine leaks
- Use Select Carefully: Avoid select with too many cases
Common Pitfalls
- Closing Receiver Channel: Only sender should close
- Sending on Closed Channel: Causes panic
- Deadlocks: Goroutines waiting forever
- Goroutine Leaks: Goroutines that never exit
- Race Conditions: Unsynchronized access to shared data
Resources
Summary
Advanced channel patterns enable building sophisticated concurrent systems. Master fan-out/fan-in, pipelines, multiplexing, and other patterns. Always close channels properly, handle cancellation gracefully, and test concurrent code thoroughly. Use these patterns to build robust, scalable Go applications.
Comments