Suppose we have a pipeline that consists of two tasks, i.e. two big steps, and both of them can be parallelized. Then we can use one worker pool to process task 1 and another worker pool to process task 2. A worker pool is a fixed number of goroutines that pull work from a shared channel.
The following code computes hashes concurrently. First, it sends all the jobs to the `jobs` channel. The hash workers then fetch the jobs from the `jobs` channel and send their output to the `results` channel. Finally, the data workers process the results from the `results` channel.
This code is only meant to illustrate the idea.
// ConcurrentHash hashes every tree in trees with a two-stage pipeline and
// groups the tree indices by hash value.
//
// Stage 1 runs nHashWorkers goroutines that call Hash on each tree and emit
// (index, hash) pairs. Stage 2 runs nDataWorkers goroutines that consume those
// pairs and record them in the returned map. Both stages run at the same time.
// A worker count below 1 is treated as 1, so every tree is always processed.
//
// The returned map sends each hash value to the indices of the trees that
// produced it. Because stage 2 is concurrent, the order of the indices within
// each slice is not deterministic.
func ConcurrentHash(nHashWorkers int, nDataWorkers int, trees []Tree) (HashValueToIdxMap map[HashValue][]TreeIndex) {
	if nHashWorkers < 1 {
		nHashWorkers = 1
	}
	if nDataWorkers < 1 {
		nDataWorkers = 1
	}
	HashValueToIdxMap = make(map[HashValue][]TreeIndex)

	// Both channels can hold one entry per tree, so enqueueing every job up
	// front never blocks and no separate producer goroutine is needed.
	jobs := make(chan int, len(trees))
	results := make(chan HashIdPair, len(trees))
	for i := range trees {
		jobs <- i
	}
	close(jobs) // lets the hash workers' range loops terminate

	// Stage 1: hash workers.
	var wgHashWorkers sync.WaitGroup
	for i := 0; i < nHashWorkers; i++ {
		wgHashWorkers.Add(1)
		go func() {
			defer wgHashWorkers.Done()
			for treeIdx := range jobs {
				results <- HashIdPair{Id: treeIdx, Hash: trees[treeIdx].Hash()}
			}
		}()
	}
	// Close results only after every hash worker is done, so the data workers
	// stop exactly when all pairs have been delivered. Waiting in a separate
	// goroutine lets stage 2 start consuming while stage 1 is still running.
	go func() {
		wgHashWorkers.Wait()
		close(results)
	}()

	// Stage 2: data workers. The map is shared between them, so every write
	// is guarded by mu.
	var (
		mu            sync.Mutex
		wgDataWorkers sync.WaitGroup
	)
	for j := 0; j < nDataWorkers; j++ {
		wgDataWorkers.Add(1)
		go func() {
			defer wgDataWorkers.Done()
			for pair := range results {
				hash := HashValue(pair.Hash)
				treeIdx := TreeIndex(pair.Id)
				mu.Lock()
				HashValueToIdxMap[hash] = append(HashValueToIdxMap[hash], treeIdx)
				mu.Unlock()
			}
		}()
	}
	wgDataWorkers.Wait()
	return HashValueToIdxMap
}
Here is another example.
package main
import (
"fmt"
"sync"
"time"
)
// Job represents a single unit of work submitted to stage 1 of the pipeline.
type Job struct {
	ID    int
	Value int
}

// Result holds the outcome of a job after both pipeline stages.
type Result struct {
	JobID  int
	Output string
}

// squared is the value handed from stage 1 to stage 2. It carries the ID of the
// job that produced the value. A bare int would lose that association: several
// workers complete jobs in arbitrary order, so stage 2 could not recover which
// job a value came from.
type squared struct {
	JobID int
	Value int
}

// worker is a stage-1 worker in the squaring pool. It reads jobs until the jobs
// channel is closed, squares each job's Value, and sends the result, tagged with
// the job's ID, to results. work is the simulated processing time per job. The
// worker calls wg.Done when it exits.
func worker(id int, wg *sync.WaitGroup, jobs <-chan Job, results chan<- squared, work time.Duration) {
	defer wg.Done()
	for j := range jobs {
		fmt.Printf("Worker %d started job %d\n", id, j.ID)
		time.Sleep(work) // simulate work
		results <- squared{JobID: j.ID, Value: j.Value * j.Value}
		fmt.Printf("Worker %d finished job %d\n", id, j.ID)
	}
}

// formatterWorker is a stage-2 worker in the formatting pool. It reads squared
// values until squaredValues is closed and sends a formatted Result for each
// one to finalResults. It keeps the job ID that stage 1 attached to the value.
// work is the simulated processing time per value. The worker calls wg.Done
// when it exits.
func formatterWorker(id int, wg *sync.WaitGroup, squaredValues <-chan squared, finalResults chan<- Result, work time.Duration) {
	defer wg.Done()
	for v := range squaredValues {
		fmt.Printf("Formatter %d started formatting value %d\n", id, v.Value)
		time.Sleep(work) // simulate formatting work
		finalResults <- Result{
			JobID:  v.JobID,
			Output: fmt.Sprintf("Result of job %d is %d", v.JobID, v.Value),
		}
	}
}

// main runs a two-stage pipeline. A pool of workers squares job values, and a
// second pool formats the squared values. The program then prints the results
// in job order.
func main() {
	const (
		numJobs          = 10
		numWorkersStage1 = 3
		numWorkersStage2 = 2
		stage1Work       = time.Second
		stage2Work       = 500 * time.Millisecond
	)
	// Every channel can hold all jobs, so no send in the pipeline ever blocks.
	jobs := make(chan Job, numJobs)
	squaredValues := make(chan squared, numJobs)
	finalResults := make(chan Result, numJobs)
	var wgStage1, wgStage2 sync.WaitGroup

	// Start both pools up front so stage 2 can format values while stage 1 is
	// still squaring.
	for w := 1; w <= numWorkersStage1; w++ {
		wgStage1.Add(1)
		go worker(w, &wgStage1, jobs, squaredValues, stage1Work)
	}
	for w := 1; w <= numWorkersStage2; w++ {
		wgStage2.Add(1)
		go formatterWorker(w, &wgStage2, squaredValues, finalResults, stage2Work)
	}

	for j := 1; j <= numJobs; j++ {
		jobs <- Job{ID: j, Value: j}
	}
	close(jobs) // no more work for stage 1

	// Each channel is closed only once all of its senders have finished.
	wgStage1.Wait()
	close(squaredValues)
	wgStage2.Wait()
	close(finalResults)

	// Results arrive in completion order. Job IDs are 1..numJobs, so index the
	// outputs by ID to print them in job order.
	outputs := make([]string, numJobs+1)
	for r := range finalResults {
		outputs[r.JobID] = r.Output
	}
	fmt.Println("\n--- Final Results ---")
	for _, out := range outputs[1:] {
		fmt.Println(out)
	}
}