Skip to main content
⚡ Calmops

Advanced Channel Patterns

Advanced Channel Patterns

Channels are Go’s primary concurrency primitive. Advanced patterns enable building sophisticated concurrent systems.

Fan-Out/Fan-In Pattern

Distribute work across multiple workers and collect results.

Good: Fan-Out/Fan-In

package main

import (
	"fmt"
	"sync"
)

// fanOut starts numWorkers goroutines that all receive from the shared input
// channel. Each worker doubles every value it receives and sends the result
// on its own output channel, closing that channel once input has been closed
// and drained. The returned slice holds the receive side of each worker's
// channel, in worker order.
func fanOut(input <-chan int, numWorkers int) []<-chan int {
	// worker processes values from input until it is closed.
	worker := func(results chan<- int) {
		defer close(results)
		for n := range input {
			results <- n * 2
		}
	}

	outputs := make([]<-chan int, numWorkers)
	for idx := range outputs {
		results := make(chan int)
		outputs[idx] = results
		go worker(results)
	}

	return outputs
}

// fanIn forwards every value received from the given channels onto a single
// output channel. One goroutine is started per input; the output is closed
// after all inputs have been closed and drained. Values from different inputs
// may interleave in any order.
func fanIn(channels ...<-chan int) <-chan int {
	merged := make(chan int)

	var pending sync.WaitGroup
	pending.Add(len(channels))

	// forward copies one input onto merged until that input is closed.
	forward := func(src <-chan int) {
		defer pending.Done()
		for v := range src {
			merged <- v
		}
	}
	for _, src := range channels {
		go forward(src)
	}

	// Close merged only after every forwarder has finished sending.
	go func() {
		pending.Wait()
		close(merged)
	}()

	return merged
}

func main() {
	// Create input
	input := make(chan int)
	go func() {
		for i := 1; i <= 10; i++ {
			input <- i
		}
		close(input)
	}()
	
	// Fan out to 3 workers
	workers := fanOut(input, 3)
	
	// Fan in results
	results := fanIn(workers...)
	
	// Collect results
	for result := range results {
		fmt.Println(result)
	}
}

Pipeline Pattern

Chain operations through channels.

Good: Pipeline Pattern

package main

import (
	"fmt"
)

// generate is pipeline stage 1. It returns a channel that yields the integers
// 1 through max in ascending order and is then closed. Nothing is sent when
// max is less than 1.
func generate(max int) <-chan int {
	stream := make(chan int)
	go func() {
		defer close(stream)
		for n := 1; n <= max; n++ {
			stream <- n
		}
	}()
	return stream
}

// square is pipeline stage 2. It returns a channel carrying the square of
// each value received from in, in the same order, and closes that channel
// once in has been closed and drained.
func square(in <-chan int) <-chan int {
	squares := make(chan int)
	go func() {
		defer close(squares)
		for n := range in {
			squares <- n * n
		}
	}()
	return squares
}

// filterEven is pipeline stage 3. It returns a channel that passes through
// only the even values received from in, preserving their order, and closes
// that channel once in has been closed and drained.
func filterEven(in <-chan int) <-chan int {
	evens := make(chan int)
	go func() {
		defer close(evens)
		for n := range in {
			if n%2 != 0 {
				continue // odd: drop it
			}
			evens <- n
		}
	}()
	return evens
}

func main() {
	// Build pipeline
	numbers := generate(10)
	squared := square(numbers)
	evens := filterEven(squared)
	
	// Consume results
	for result := range evens {
		fmt.Println(result)
	}
}

Multiplexing Pattern

Combine multiple channels into one.

Good: Multiplexing

package main

import (
	"fmt"
	"time"
)

// multiplex combines any number of input channels into a single output
// channel. One goroutine per input forwards its values to the output; values
// from different inputs may interleave in any order.
//
// The output is closed once every input has been closed and drained, so
// callers can safely range over it. With no inputs the returned channel is
// closed immediately.
func multiplex(channels ...<-chan string) <-chan string {
	out := make(chan string)
	// finished receives one signal per forwarder when its input is exhausted.
	finished := make(chan struct{})

	for _, ch := range channels {
		go func(c <-chan string) {
			for val := range c {
				out <- val
			}
			finished <- struct{}{}
		}(ch)
	}

	// Close out only after every forwarder has reported completion; closing
	// earlier would make a still-running forwarder panic on send.
	go func() {
		for range channels {
			<-finished
		}
		close(out)
	}()

	return out
}

// main starts three producers that emit messages at different paces and
// prints the messages as they arrive on the combined channel.
func main() {
	// emit sends three messages built from format on ch, sleeping for pause
	// after each send, and closes ch when done.
	emit := func(ch chan<- string, format string, pause time.Duration) {
		defer close(ch)
		for n := 0; n < 3; n++ {
			ch <- fmt.Sprintf(format, n)
			time.Sleep(pause)
		}
	}

	// Create multiple channels, each with its own producer.
	ch1 := make(chan string)
	ch2 := make(chan string)
	ch3 := make(chan string)
	go emit(ch1, "ch1: %d", 100*time.Millisecond)
	go emit(ch2, "ch2: %d", 150*time.Millisecond)
	go emit(ch3, "ch3: %d", 200*time.Millisecond)

	// Multiplex the channels and print whatever arrives.
	for msg := range multiplex(ch1, ch2, ch3) {
		fmt.Println(msg)
	}
}

Demultiplexing Pattern

Split one channel into multiple.

Good: Demultiplexing

package main

import (
	"fmt"
)

// demultiplex splits one input channel into numChannels output channels.
// Each value read from in is routed to the output whose index equals the
// value's non-negative remainder modulo numChannels, so every input value is
// delivered to exactly one output and per-output order matches input order.
//
// A single goroutine does the routing. The outputs are unbuffered, so the
// router blocks until the chosen output is received from: callers must drain
// all outputs concurrently. All outputs are closed once in has been closed
// and drained. When numChannels is 0 an empty slice is returned and in is
// not read.
func demultiplex(in <-chan int, numChannels int) []<-chan int {
	channels := make([]<-chan int, numChannels)
	if numChannels == 0 {
		return channels
	}

	// outs keeps the send side; channels exposes the receive side to callers.
	outs := make([]chan int, numChannels)
	for i := range outs {
		outs[i] = make(chan int)
		channels[i] = outs[i]
	}

	go func() {
		defer func() {
			for _, out := range outs {
				close(out)
			}
		}()
		for val := range in {
			idx := val % numChannels
			if idx < 0 {
				// Go's % keeps the sign of the dividend; shift into range.
				idx += numChannels
			}
			outs[idx] <- val
		}
	}()

	return channels
}

// main sends 1 through 10 into demultiplex with three outputs and prints the
// values each output received.
func main() {
	// Create input
	input := make(chan int)
	go func() {
		for i := 1; i <= 10; i++ {
			input <- i
		}
		close(input)
	}()

	// Demultiplex into 3 channels
	channels := demultiplex(input, 3)

	// Drain every output concurrently: the router cannot move on to the next
	// value until the current one is received, so reading the outputs one
	// after another would deadlock.
	collected := make([]chan []int, len(channels))
	for i, ch := range channels {
		collected[i] = make(chan []int, 1)
		go func(src <-chan int, dst chan<- []int) {
			var vals []int
			for v := range src {
				vals = append(vals, v)
			}
			dst <- vals
		}(ch, collected[i])
	}

	// Print the per-channel results in channel order.
	for i, c := range collected {
		fmt.Printf("Channel %d: ", i)
		for _, val := range <-c {
			fmt.Printf("%d ", val)
		}
		fmt.Println()
	}
}

Timeout Pattern

Add timeout to channel operations.

Good: Timeout Pattern

package main

import (
	"fmt"
	"time"
)

// withTimeout waits for a single value from ch for at most timeout.
//
// The returned channel yields exactly one string and is then closed: the
// value received from ch if one arrives in time, or the literal "TIMEOUT"
// otherwise. If ch is closed before the timeout, the zero value "" is
// delivered. Note that a real value equal to "TIMEOUT" cannot be told apart
// from an actual timeout.
func withTimeout(ch <-chan string, timeout time.Duration) <-chan string {
	// Capacity 1 lets the goroutine below hand over its single result and
	// exit even if the caller never receives it, so it cannot leak.
	out := make(chan string, 1)

	go func() {
		defer close(out)

		// Use an explicit timer so it can be released as soon as a value
		// arrives instead of lingering until it fires.
		timer := time.NewTimer(timeout)
		defer timer.Stop()

		select {
		case val := <-ch:
			out <- val
		case <-timer.C:
			out <- "TIMEOUT"
		}
	}()

	return out
}

// main shows withTimeout giving up on a producer that needs 2 seconds when
// only 1 second is allowed, and prints the outcome.
func main() {
	// A producer that delivers its value only after 2 seconds.
	slow := make(chan string)
	go func() {
		time.Sleep(2 * time.Second)
		slow <- "data"
	}()

	// Wait at most 1 second and print whatever comes back.
	fmt.Println(<-withTimeout(slow, time.Second))
}

Retry Pattern

Retry operations with exponential backoff.

Good: Retry Pattern

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// retry calls operation until it succeeds or the attempt budget is used up.
//
// maxRetries is the total number of attempts, including the first one.
// Values below 1 are treated as 1 so that operation always runs at least once
// and a nil result always means a real success.
//
// After each failed attempt except the last, retry prints a notice and sleeps.
// The base delay starts at 100ms and doubles each time; up to 50% random
// jitter is added so that concurrent callers do not retry in lockstep.
//
// It returns nil on the first success, otherwise the error from the final
// attempt.
func retry(operation func() error, maxRetries int) error {
	// maxBackoff stops the doubling long before backoff plus jitter could
	// overflow time.Duration.
	const maxBackoff = time.Duration(1) << 60

	if maxRetries < 1 {
		maxRetries = 1
	}

	var err error
	backoff := 100 * time.Millisecond

	for i := 0; i < maxRetries; i++ {
		if err = operation(); err == nil {
			return nil
		}

		// No point in sleeping after the final attempt.
		if i == maxRetries-1 {
			break
		}

		delay := backoff + time.Duration(rand.Int63n(int64(backoff)/2+1))
		fmt.Printf("Retry %d after %v\n", i+1, delay)
		time.Sleep(delay)

		if backoff < maxBackoff {
			backoff *= 2
		}
	}

	return err
}

// main runs retry against an operation that fails on its first two calls and
// reports whether the overall call succeeded.
func main() {
	var attempts int

	// flaky counts its invocations and succeeds from the third one on.
	flaky := func() error {
		attempts++
		fmt.Printf("Attempt %d\n", attempts)

		if attempts >= 3 {
			return nil
		}
		return fmt.Errorf("temporary error")
	}

	if err := retry(flaky, 5); err != nil {
		fmt.Println("Failed:", err)
		return
	}
	fmt.Println("Success!")
}

Rate Limiting Pattern

Limit throughput using channels.

Good: Rate Limiting

package main

import (
	"fmt"
	"time"
)

// rateLimiter returns a channel that delivers one token per tick, where the
// tick interval is duration divided by rate (so rate tokens per duration).
// Callers receive from the channel before each operation to pace themselves.
//
// The channel is unbuffered, so tokens are not accumulated while nobody is
// receiving. The feeding loop has no exit path, so the goroutine and its
// ticker keep running after the caller stops receiving.
func rateLimiter(rate int, duration time.Duration) <-chan struct{} {
	interval := duration / time.Duration(rate)
	ticker := time.NewTicker(interval)
	tokens := make(chan struct{})

	go func() {
		defer ticker.Stop()
		for {
			<-ticker.C
			tokens <- struct{}{}
		}
	}()

	return tokens
}

// main performs ten operations paced by a limiter configured for five
// operations per second, printing a timestamp for each.
func main() {
	const total = 10

	// Allow 5 operations per second.
	limiter := rateLimiter(5, time.Second)

	for op := 1; op <= total; op++ {
		<-limiter // wait for the next token
		fmt.Printf("Operation %d at %v\n", op, time.Now().Format("15:04:05.000"))
	}
}

Merge Pattern

Merge multiple channels into one.

Good: Merge Pattern

package main

import (
	"fmt"
	"sync"
)

// merge copies every value from the given input channels onto one output
// channel, using one goroutine per input. Values from different inputs may
// interleave in any order. The output is closed after all inputs have been
// closed and drained.
func merge(channels ...<-chan int) <-chan int {
	var active sync.WaitGroup
	combined := make(chan int)

	// pump drains a single input into combined.
	pump := func(src <-chan int) {
		defer active.Done()
		for v := range src {
			combined <- v
		}
	}

	active.Add(len(channels))
	for i := range channels {
		go pump(channels[i])
	}

	// Closer: runs once every pump has returned.
	go func() {
		defer close(combined)
		active.Wait()
	}()

	return combined
}

// main starts three producers with distinct value ranges and prints every
// value that arrives on the merged channel.
func main() {
	// feed sends the integers first..last in ascending order on ch and then
	// closes it.
	feed := func(ch chan<- int, first, last int) {
		defer close(ch)
		for n := first; n <= last; n++ {
			ch <- n
		}
	}

	// Create channels and start one producer per channel.
	ch1 := make(chan int)
	ch2 := make(chan int)
	ch3 := make(chan int)
	go feed(ch1, 1, 3)
	go feed(ch2, 10, 12)
	go feed(ch3, 100, 102)

	// Merge the channels and print until the merged channel is closed.
	for v := range merge(ch1, ch2, ch3) {
		fmt.Println(v)
	}
}

Tee Pattern

Duplicate channel output.

Good: Tee Pattern

package main

import (
	"fmt"
	"sync"
)

// tee duplicates the values received from in onto two output channels.
//
// Every value is delivered to both outputs before the next value is read
// from in, so each output sees the values in input order and at most two
// helper goroutines exist at any time. The outputs therefore advance in
// lockstep: both must be consumed, and a stalled consumer on one output
// stalls the other after at most one value. Both outputs are closed once in
// has been closed and drained.
func tee(in <-chan int) (<-chan int, <-chan int) {
	out1 := make(chan int)
	out2 := make(chan int)

	go func() {
		defer close(out1)
		defer close(out2)

		var wg sync.WaitGroup
		for val := range in {
			// Send to both outputs concurrently so that neither consumer has
			// to be ready first, then wait for both deliveries before moving
			// on; this is what preserves ordering.
			wg.Add(2)

			go func(v int) {
				defer wg.Done()
				out1 <- v
			}(val)

			go func(v int) {
				defer wg.Done()
				out2 <- v
			}(val)

			wg.Wait()
		}
	}()

	return out1, out2
}

// main sends 1 through 5 into tee and prints one value from each output per
// round, for five rounds.
func main() {
	// Create the input and its producer.
	source := make(chan int)
	go func() {
		defer close(source)
		for n := 1; n <= 5; n++ {
			source <- n
		}
	}()

	// Tee the channel.
	ch1, ch2 := tee(source)

	// Consume from both outputs, first ch1 and then ch2 in each round.
	for round := 0; round < 5; round++ {
		first := <-ch1
		second := <-ch2
		fmt.Printf("ch1: %d, ch2: %d\n", first, second)
	}
}

Best Practices

  1. Close Channels Properly: Only sender should close
  2. Use Buffering Wisely: Buffer size affects performance
  3. Avoid Deadlocks: Ensure all goroutines can complete
  4. Handle Cancellation: Use context for cancellation
  5. Document Ownership: Make clear who owns channels
  6. Test Patterns: Thoroughly test concurrent code
  7. Monitor Goroutines: Watch for goroutine leaks
  8. Use Select Carefully: Avoid select with too many cases

Common Pitfalls

  • Closing a Channel from the Receiver Side: Only the sender should close a channel
  • Sending on Closed Channel: Causes panic
  • Deadlocks: Goroutines waiting forever
  • Goroutine Leaks: Goroutines that never exit
  • Race Conditions: Unsynchronized access to shared data

Resources

Summary

Advanced channel patterns enable building sophisticated concurrent systems. Master fan-out/fan-in, pipelines, multiplexing, and other patterns. Always close channels properly, handle cancellation gracefully, and test concurrent code thoroughly. Use these patterns to build robust, scalable Go applications.

Comments