Golang: Spawn a Fixed Number of Goroutines

Suppose we have a pipeline that consists of two tasks, i.e. two big steps, and both of them can be parallelized. Then we can use one worker pool to process task1 and another worker pool to process task2. A worker pool is a fixed number of goroutines that repeatedly take work items from a shared channel.

The following code computes hashes concurrently. First, it sends all the jobs to the jobs channel. Then the hash workers fetch the jobs from the jobs channel and compute the hashes. Finally, the data workers process the results.

This code is used just to illustrate the idea.


// ConcurrentHash hashes every tree in trees with a two-stage worker pool and
// groups the tree indices by hash value.
//
// Stage 1 runs nHashWorkers goroutines that call trees[i].Hash(). Stage 2
// runs nDataWorkers goroutines that consume the resulting (hash, index) pairs
// and record them in the returned map. The stages overlap: data workers start
// consuming as soon as the first hash is produced.
//
// Worker counts below 1 are treated as 1 so that every tree is always
// processed. Indices within each map value appear in completion order, which
// is not deterministic.
//
// NOTE(review): assumes Tree.Hash is safe to call concurrently on distinct
// trees — confirm.
func ConcurrentHash(nHashWorkers int, nDataWorkers int, trees []Tree) (HashValueToIdxMap map[HashValue][]TreeIndex) {
    // With zero workers in either stage no tree would ever be processed, and
    // the function would silently return an empty map.
    if nHashWorkers < 1 {
        nHashWorkers = 1
    }
    if nDataWorkers < 1 {
        nDataWorkers = 1
    }

    HashValueToIdxMap = make(map[HashValue][]TreeIndex)

    // Both channels are buffered to len(trees), so every job and every
    // result fits without waiting for a receiver.
    jobs := make(chan int, len(trees))
    results := make(chan HashIdPair, len(trees))

    // Enqueue all jobs up front (the buffer holds them all), then close the
    // channel so the hash workers' range loops terminate.
    for i := range trees {
        jobs <- i
    }
    close(jobs)

    // Stage 1: compute the hashes concurrently.
    var wgHashWorkers sync.WaitGroup
    for i := 0; i < nHashWorkers; i++ {
        wgHashWorkers.Add(1)
        go func() {
            defer wgHashWorkers.Done()
            for treeIdx := range jobs {
                results <- HashIdPair{Id: treeIdx, Hash: trees[treeIdx].Hash()}
            }
        }()
    }

    // Close results only after every stage-1 sender has returned. Doing this
    // in a goroutine lets stage 2 run while hashing is still in progress.
    go func() {
        wgHashWorkers.Wait()
        close(results)
    }()

    // Stage 2: record each (hash, index) pair. All data workers write to the
    // same map, so each write is guarded by mu.
    var (
        mu            sync.Mutex
        wgDataWorkers sync.WaitGroup
    )
    for j := 0; j < nDataWorkers; j++ {
        wgDataWorkers.Add(1)
        go func() {
            defer wgDataWorkers.Done()
            for pair := range results {
                hash := HashValue(pair.Hash)
                treeIdx := TreeIndex(pair.Id)

                mu.Lock()
                HashValueToIdxMap[hash] = append(HashValueToIdxMap[hash], treeIdx)
                mu.Unlock()
            }
        }()
    }

    wgDataWorkers.Wait()

    return HashValueToIdxMap
}

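Here is another example: a two-stage pipeline in which one worker pool squares numbers and a second worker pool formats the squared values.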

package main

import (
    "fmt"
    "sync"
    "time"
)

// Job represents a single unit of work.
type Job struct {
    ID    int
    Value int
}

// Result holds the outcome of a processed job.
type Result struct {
    JobID  int
    Output string
}

// squaredValue is the stage-1 output. It carries the squared value together
// with the ID of the job that produced it, so stage 2 can report which job a
// value belongs to.
type squaredValue struct {
    JobID int
    Value int
}

// Simulated per-item processing times for the two pipeline stages. They are
// variables rather than constants so tests can shorten them.
var (
    squareDelay = time.Second
    formatDelay = 500 * time.Millisecond
)

// worker is a member of the stage-1 (squaring) pool. It reads jobs until the
// jobs channel is closed, squares each job's Value, and sends the result,
// tagged with the job's ID, to results. It calls wg.Done when it returns.
func worker(id int, wg *sync.WaitGroup, jobs <-chan Job, results chan<- squaredValue) {
    defer wg.Done()
    for j := range jobs {
        fmt.Printf("Worker %d started job %d\n", id, j.ID)
        time.Sleep(squareDelay) // simulate work
        results <- squaredValue{JobID: j.ID, Value: j.Value * j.Value}
        fmt.Printf("Worker %d finished job %d\n", id, j.ID)
    }
}

// formatterWorker is a member of the stage-2 (formatting) pool. It reads
// squared values until the channel is closed and sends a Result for each one
// to finalResults. The result is labelled with the ID of the job that produced
// the value. It calls wg.Done when it returns.
func formatterWorker(id int, wg *sync.WaitGroup, squaredValues <-chan squaredValue, finalResults chan<- Result) {
    defer wg.Done()
    for sv := range squaredValues {
        fmt.Printf("Formatter %d started formatting value %d\n", id, sv.Value)
        time.Sleep(formatDelay) // simulate formatting work
        output := fmt.Sprintf("Result of job %d is %d", sv.JobID, sv.Value)
        finalResults <- Result{JobID: sv.JobID, Output: output}
    }
}

// main runs the two-stage pipeline. Stage-1 workers square each job's Value,
// stage-2 formatters turn each squared value into a Result, and main prints
// every Result once the pipeline has drained.
func main() {
    const (
        numJobs          = 10
        numWorkersStage1 = 3
        numWorkersStage2 = 2
    )

    jobs := make(chan Job, numJobs)
    squaredValues := make(chan squaredValue, numJobs)
    finalResults := make(chan Result, numJobs)

    var wgStage1, wgStage2 sync.WaitGroup

    // --- Start Stage 1 Workers (Squaring) ---
    for w := 1; w <= numWorkersStage1; w++ {
        wgStage1.Add(1)
        go worker(w, &wgStage1, jobs, squaredValues)
    }

    // --- Start Stage 2 Workers (Formatting) ---
    for w := 1; w <= numWorkersStage2; w++ {
        wgStage2.Add(1)
        go formatterWorker(w, &wgStage2, squaredValues, finalResults)
    }

    // --- Send Jobs to Stage 1 ---
    // Main sends jobs from a goroutine and closes each downstream channel from
    // its own goroutine, so it can drain finalResults concurrently. The
    // pipeline therefore cannot deadlock even if the buffers are made smaller
    // than numJobs.
    go func() {
        defer close(jobs) // signal Stage 1 that there is no more work
        for j := 1; j <= numJobs; j++ {
            jobs <- Job{ID: j, Value: j}
        }
    }()

    // Stage 1 workers are the only senders on squaredValues; close it once
    // they have all returned so Stage 2's range loops end.
    go func() {
        wgStage1.Wait()
        close(squaredValues)
    }()

    // Likewise, close finalResults once every Stage 2 worker has returned.
    go func() {
        wgStage2.Wait()
        close(finalResults)
    }()

    // --- Collect and print final results ---
    // Results are gathered before printing so the summary appears after all
    // worker log lines. They are in completion order, which is not
    // deterministic; each one carries its true JobID.
    collected := make([]Result, 0, numJobs)
    for result := range finalResults {
        collected = append(collected, result)
    }

    fmt.Println("\n--- Final Results ---")
    for _, result := range collected {
        fmt.Println(result.Output)
    }
}