Concurrency in Go

Concurrency is a model for software construction.

Concurrency is the key to designing high performance network services. Go’s concurrency primitives (goroutines and channels) provide a simple and efficient means of expressing concurrent execution. In this talk we see how tricky concurrency problems can be solved gracefully with simple Go code.

Go’s concurrency model derives from CSP (Communicating Sequential Processes).

The Go approach

Don’t communicate by sharing memory, share memory by communicating.

In other words, instead of sharing memory and guarding it against parallel access with locks and mutexes, you use channels to pass data back and forth between goroutines.
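A minimal sketch of that idea, assuming a made-up task (summing squares) and a hypothetical helper named sumSquares: each goroutine sends its result over a channel, and only the receiving goroutine ever touches the running total, so no lock is needed.

```go
package main

import "fmt"

// sumSquares hands each number to its own goroutine and collects the
// results over a channel, so no variable is written by two goroutines.
func sumSquares(nums []int) int {
	results := make(chan int)
	for _, n := range nums {
		go func(n int) { results <- n * n }(n)
	}
	total := 0
	for range nums {
		total += <-results // only this goroutine touches total
	}
	return total
}

func main() {
	fmt.Println(sumSquares([]int{1, 2, 3, 4})) // prints 30
}
```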

Go’s concurrency toolset

  • goroutines
  • channels
  • select
  • sync package (classic mutexes)

Deadlock

A deadlocked program is one in which all concurrent processes are waiting on one another. In this state, the program will never recover without outside intervention.
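One way to picture this is a sketch in which two goroutines each wait for the other to send first. The function name stuck and the timeout used to observe the situation from outside are assumptions made for illustration. Without the timer, the Go runtime would typically abort such a program with "all goroutines are asleep".

```go
package main

import (
	"fmt"
	"time"
)

// stuck starts two goroutines that each wait for the other to send first.
// Neither ever sends, so the done signal never arrives and the timeout fires.
func stuck(wait time.Duration) bool {
	a, b := make(chan int), make(chan int)
	done := make(chan struct{}, 2)

	go func() { v := <-a; b <- v; done <- struct{}{} }() // waits on a, which only the other goroutine would fill
	go func() { v := <-b; a <- v; done <- struct{}{} }() // waits on b, which only the first goroutine would fill

	select {
	case <-done:
		return false
	case <-time.After(wait):
		return true // both goroutines are still blocked
	}
}

func main() {
	fmt.Println("deadlocked:", stuck(50*time.Millisecond)) // prints "deadlocked: true"
}
```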

Race Conditions

A race condition occurs when two or more operations must execute in the correct order, but the program has not been written so that this order is guaranteed to be maintained. Most of the time, this shows up in what’s called a data race, where one concurrent operation attempts to read a variable while at some undetermined time another concurrent operation is attempting to write to the same variable.

Mutex locks

type Counter struct { 
	mu sync.Mutex 
	value int 
}

func (c *Counter) Increment() {
	c.mu.Lock()         // only one goroutine at a time gets past this point
	defer c.mu.Unlock() // released when Increment returns
	c.value++
}
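To see the mutex doing its job, here is a small sketch that runs many goroutines against the Counter above. The Value method and the countTo helper are not part of the original snippet; they are added here only so the result can be read back safely.

```go
package main

import (
	"fmt"
	"sync"
)

type Counter struct {
	mu    sync.Mutex
	value int
}

func (c *Counter) Increment() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.value++
}

// Value reads the counter under the same lock (hypothetical helper).
func (c *Counter) Value() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.value
}

// countTo runs n goroutines that each increment the counter once.
func countTo(n int) int {
	var c Counter
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.Increment()
		}()
	}
	wg.Wait()
	return c.Value()
}

func main() {
	fmt.Println(countTo(1000)) // prints 1000
}
```

Without the lock, the increments could interleave and the final value might come out lower than 1000.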

Start goroutines

Common function

func main() {
	go sayHello() // continue doing other things
}

func sayHello() {
	fmt.Println("hello")
}
// usually no output: main returns, and the program exits, before the goroutine gets to run

Anonymous function

go func() { 
	fmt.Println("hello") 
}()

Anonymous function with parameter

go func(URL string) { 
	fmt.Println("hello", URL) 
}("http://bcd.com")

Fork and join model

func main() {
	var wg sync.WaitGroup
	sayHello := func() { 
		defer wg.Done() 
		fmt.Println("hello") 
	} 
	wg.Add(1) 
	go sayHello() 
	wg.Wait()
}

Channels

  • Think of a bucket chain
  • 3 components: sender, buffer, receiver
  • The buffer is optional

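A channel in Go provides a connection between two goroutines, allowing them to communicate. A channel with no buffer is a blocking (synchronized) channel: a send completes only when a receiver is ready. A channel operation therefore both communicates and synchronizes.

A buffered channel is not synchronized in this way: a send blocks only when the buffer is full, and a receive blocks only when it is empty.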

Declaring and initializing

var c chan int
c = make(chan int)

// or
c := make(chan int)

Sending on a channel

c <- 1

Receiving from a channel. The “arrow” indicates the direction of the data flow.

value = <-c

Blocking channels

blocked

unbuffered := make(chan int)

// blocks
unbuffered <- 1
// blocks
a := <- unbuffered 

not blocked

go func() {<- unbuffered }()
unbuffered <- 1

buffered

buffered := make(chan int, 1)

// still blocks
a := <- buffered

// fine
buffered <- 1

// blocks (buffer full)
buffered <- 2
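The blocking rules above can be checked without hanging the program. The sketch below assumes a hypothetical helper, trySend, which uses select with a default case to report whether a send would have gone through immediately.

```go
package main

import "fmt"

// trySend attempts a send without blocking and reports whether it went through.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false // a plain `ch <- v` would block here
	}
}

func main() {
	unbuffered := make(chan int)
	buffered := make(chan int, 1)

	fmt.Println(trySend(unbuffered, 1)) // false: no receiver is waiting
	fmt.Println(trySend(buffered, 1))   // true: the buffer has room
	fmt.Println(trySend(buffered, 2))   // false: the buffer is full
}
```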

Blocking breaks concurrency

Closing channels

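  • a receive on a closed channel never blocks; once the channel is empty it yields the zero value
  • closing a channel twice will panic
c := make(chan int)
close(c)
v, ok := <-c // receive from the closed channel
fmt.Println(v, ok)
// What is printed?
// 0 false
// - a receive can return a second value that reports whether the first is valid
// - 0 as it is the zero value of int
// - false "because no more data" or "returned value is not valid"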
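A short sketch of how the two-value receive is typically used, assuming a hypothetical drain helper: values that were buffered before close are still delivered, and only then does the second result turn false.

```go
package main

import "fmt"

// drain reads until the channel is closed and empty, using the two-value receive.
func drain(ch <-chan int) []int {
	var got []int
	for {
		v, ok := <-ch
		if !ok {
			return got // closed and empty
		}
		got = append(got, v)
	}
}

func main() {
	ch := make(chan int, 3)
	ch <- 1
	ch <- 2
	close(ch) // buffered values stay readable after close

	fmt.Println(drain(ch)) // prints [1 2]

	v, ok := <-ch
	fmt.Println(v, ok) // prints 0 false
}
```

A `for v := range ch` loop performs the same check implicitly and stops when the channel is closed and drained.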

makeThumbnails

from:gopl.io

//!+5
// makeThumbnails5 makes thumbnails for the specified files in parallel.
// It returns the generated file names in an arbitrary order,
// or an error if any step failed.
func makeThumbnails5(filenames []string) (thumbfiles []string, err error) {
	type item struct {
		thumbfile string
		err       error
	}

	ch := make(chan item, len(filenames))
	for _, f := range filenames {
		go func(f string) {
			var it item
			it.thumbfile, it.err = thumbnail.ImageFile(f)
			ch <- it
		}(f)
	}

	for range filenames {
		it := <-ch
		if it.err != nil {
			return nil, it.err
		}
		thumbfiles = append(thumbfiles, it.thumbfile)
	}

	return thumbfiles, nil
}

//!-5

//!+6
// makeThumbnails6 makes thumbnails for each file received from the channel.
// It returns the number of bytes occupied by the files it creates.
func makeThumbnails6(filenames <-chan string) int64 {
	sizes := make(chan int64)
	var wg sync.WaitGroup // number of working goroutines
	for f := range filenames {
		wg.Add(1)
		// worker
		go func(f string) {
			defer wg.Done()
			thumb, err := thumbnail.ImageFile(f)
			if err != nil {
				log.Println(err)
				return
			}
			info, _ := os.Stat(thumb) // OK to ignore error
			sizes <- info.Size()
		}(f)
	}

	// closer
	go func() {
		wg.Wait()
		close(sizes)
	}()

	var total int64
	for size := range sizes {
		total += size
	}
	return total
}

//!-6

Shape your data flow

  • Channels are streams of data

  • Dealing with multiple streams is the true power of select

  • 1 in n out (Fan-out)

  • n in 1 out (Funnel)

  • n in n out (Turnout)

Fan-out

func Fanout(In <-chan int, OutA, OutB chan int) { 
    for data := range In { // Receive until closed 
        select {           // Send to first non-blocking channel
        case OutA <- data: 
        case OutB <- data: 
        } 
    } 
}

Turnout

Brilliant!

func Turnout(InA, InB <-chan int, OutA, OutB chan int) {
    var data int  // last value received
    var more bool // false once the input that fired has been closed
    for {
        select { // Receive from first non-blocking
        case data, more = <-InA:
        case data, more = <-InB:
        }
        if !more {
            // ...?
            return
        }
        select { // Send to first non-blocking
        case OutA <- data:
        case OutB <- data:
        }
    }
}
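One possible answer to the open "...?" above, offered as a sketch rather than the talk's own solution: set a closed input to nil. A receive from a nil channel blocks forever, so select simply stops considering that case, and the loop can keep serving the other input until both are closed. The names turnout and runDemo are illustrative.

```go
package main

import "fmt"

// turnout forwards values from inA and inB to whichever of outA or outB is
// ready. A closed input is set to nil, which disables its case in select;
// the function returns once both inputs are closed.
func turnout(inA, inB <-chan int, outA, outB chan<- int) {
	for inA != nil || inB != nil {
		var data int
		var more bool
		select {
		case data, more = <-inA:
			if !more {
				inA = nil
				continue
			}
		case data, more = <-inB:
			if !more {
				inB = nil
				continue
			}
		}
		select {
		case outA <- data:
		case outB <- data:
		}
	}
}

// runDemo feeds two inputs and sums everything that comes out.
func runDemo() int {
	inA, inB := make(chan int), make(chan int)
	out := make(chan int, 4) // one buffered channel serves as both outputs to keep the demo short

	go func() { inA <- 1; inA <- 2; close(inA) }()
	go func() { inB <- 10; inB <- 20; close(inB) }()

	turnout(inA, inB, out, out)
	close(out)

	sum := 0
	for v := range out {
		sum += v
	}
	return sum
}

func main() {
	fmt.Println(runDemo()) // prints 33
}
```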

Quit channel

Mutexes are not an optimal solution

  • Mutexes are like toilets.
  • The longer you occupy them, the longer the queue gets
  • Read/write mutexes can only reduce the problem
  • Using multiple mutexes will cause deadlocks sooner or later
  • All-in-all not the solution we’re looking for

Three shades of code

  • Blocking = Your program may get locked up (for undefined time)
  • Lock free = At least one part of your program is always making progress
  • Wait free = All parts of your program are always making progress

Concurrency in practice

  • Avoid blocking, avoid race conditions
  • Use channels to avoid shared state. Use select to manage channels.

Where channels don’t work:

  • Try to use tools from the sync package first
  • In simple cases or when really needed: try lockless code
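As an illustration of the lockless option, here is a sketch of a counter built on a compare-and-swap loop from sync/atomic. The helper names are made up for the example. In real code atomic.AddInt64 does the same job in one call; the explicit loop is shown only to make the retry visible.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// addLockFree increments *v with a compare-and-swap loop: if another goroutine
// changed the value in between, the swap fails and the loop retries.
func addLockFree(v *int64) {
	for {
		old := atomic.LoadInt64(v)
		if atomic.CompareAndSwapInt64(v, old, old+1) {
			return
		}
	}
}

// countLockFree runs n goroutines that each increment a shared counter once.
func countLockFree(n int) int64 {
	var v int64
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			addLockFree(&v)
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&v)
}

func main() {
	fmt.Println(countLockFree(1000)) // prints 1000
}
```

This matches the "lock free" shade above: a failed swap means some other goroutine succeeded, so the program as a whole keeps making progress.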

Using sync.WaitGroup


func main() {
	var wg sync.WaitGroup
	wg.Add(1)

	go func() {
		count("sheep")
		wg.Done()
	}()

	wg.Wait()
}

func count(thing string) {
	for i := 1; i <= 5; i++ {
		fmt.Println(i, thing)
		time.Sleep(time.Millisecond * 500)
	}
}

// output:
// 1 sheep
// 2 sheep
// 3 sheep
// 4 sheep
// 5 sheep

Channel Example


func main() {
	c := make(chan string)
	go count("sheep", c)

	// for {
	// 	msg, open := <- c
	// 	if !open {
	// 		break
	// 	}
	// 	fmt.Println(msg)
	// }

	for msg := range c {
		fmt.Println(msg)
	}
}

func count(thing string, c chan string) {
	for i := 1; i <= 5; i++ {
		c <- fmt.Sprintf("%d %s", i, thing) // send on the channel; main does the printing
		time.Sleep(time.Millisecond * 500)
	}
	// sender should close the channel
	close(c)

}
// output:
// 1 sheep
// 2 sheep
// 3 sheep
// 4 sheep
// 5 sheep

Non-blocking channel (buffered)

func main() {
	// won't block
	c := make(chan string, 2)
	c <- "hello"

	msg := <-c
	fmt.Println(msg)
}
  

select statement

Select statements allow you to efficiently wait for events, select a message from competing channels in a uniform random way, continue on if there are no messages waiting, and more.

func main() {
	c1 := make(chan string)
	c2 := make(chan string)

	go func() {
		for {
			c1 <- "Every 500ms"
			time.Sleep(time.Millisecond * 500)
		}
	}()

	go func() {
		for {
			c2 <- "Every two seconds"
			time.Sleep(time.Second * 2)
		}
	}()

	for {
		select {
		case msg1 := <-c1:
			fmt.Println(msg1)
		case msg2 := <-c2:
			fmt.Println(msg2)
		}
	}
}

Worker Pools

A worker pool fits when you have a queue of work to be done and multiple concurrent workers pulling items off the queue.


func main() {
	jobs := make(chan int, 100)
	results := make(chan int, 100)
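	count := 40 // kept small: the naive recursive fib below takes exponential time, so large inputs would not finish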

	go worker(jobs, results)
	go worker(jobs, results)
	go worker(jobs, results)

	for i := 0; i < count; i++ {
		jobs <- i
	}
	close(jobs)

	for i := 0; i < count; i++ {
		fmt.Println(<-results)
	}
}

func worker(jobs <-chan int, results chan<- int) {
	for n := range jobs {
		results <- fib(n)
	}
}

func fib(n int) int {
	if n <= 1 {
		return n
	}

	return fib(n-1) + fib(n-2)
}
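A variation on the pool above, sketched under a few assumptions: the work is a cheap placeholder (square), the channels are unbuffered, and a sync.WaitGroup closes the results channel once every worker has returned, so the caller can range over the results without knowing the job count in advance. The name runPool is hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// square stands in for real work; it is a placeholder chosen to keep the demo fast.
func square(n int) int { return n * n }

// runPool fans the inputs out to `workers` goroutines and sums their results.
func runPool(workers int, inputs []int) int {
	jobs := make(chan int)
	results := make(chan int)

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for n := range jobs {
				results <- square(n)
			}
		}()
	}

	// Feed the jobs, then close the channel so the workers' range loops end.
	go func() {
		for _, n := range inputs {
			jobs <- n
		}
		close(jobs)
	}()

	// Close results once every worker has returned.
	go func() {
		wg.Wait()
		close(results)
	}()

	sum := 0
	for r := range results {
		sum += r
	}
	return sum
}

func main() {
	fmt.Println(runPool(3, []int{1, 2, 3, 4, 5})) // prints 55
}
```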

Generator: function that returns a channel

func main() {
	c := boring("boring!") // function returning a channel.

	// this is for testing
	fmt.Println(reflect.TypeOf(c))

	for i := 0; i < 5; i++ {
		fmt.Printf("You say: %q\n", <-c)
	}
	fmt.Println("You are boring, I'm leaving.")

}

func boring(msg string) <-chan string { // Returns receive-only channel of strings.
	c := make(chan string)
	go func() { // We launch the goroutine from inside of the function.
		for i := 0; ; i++ {
			c <- fmt.Sprintf("%s %d", msg, i)
			time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
		}
	}()
	return c // Return channel to the caller.
}

output:

<-chan string
You say: "boring! 0"
You say: "boring! 1"
You say: "boring! 2"
You say: "boring! 3"
You say: "boring! 4"
You are boring, I'm leaving.

Channels as a handle on a service

// Our boring function returns a channel that lets us communicate
//  with the boring service it provides.
//
//
package main

import (
	"fmt"
	"math/rand"
	"reflect"
	"time"
)

func main() {
	ane := boring("ane boring!") // function returning a channel.
	joe := boring("joe boring!") // function returning a channel.

	fmt.Println("joe is:", reflect.TypeOf(joe))

	for i := 0; i < 5; i++ {
		fmt.Printf("You say: %q\n", <-ane)
		fmt.Printf("You say: %q\n", <-joe)
	}
	fmt.Println("You are boring, I'm leaving.")

}

func boring(msg string) <-chan string { // Returns receive-only channel of strings.
	c := make(chan string)
	go func() { // We launch the goroutine from inside of the function.
		for i := 0; ; i++ {
			c <- fmt.Sprintf("%s %d", msg, i)
			time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
		}
	}()
	return c // Return channel to the caller.
}

Output:

joe is: <-chan string
You say: "ane boring! 0"
You say: "joe boring! 0"
You say: "ane boring! 1"
You say: "joe boring! 1"
You say: "ane boring! 2"
You say: "joe boring! 2"
You say: "ane boring! 3"
You say: "joe boring! 3"
You say: "ane boring! 4"
You say: "joe boring! 4"
You are boring, I'm leaving.

Multiplexing


func main() {
	c := fanIn(boring("Joe"), boring("Ann"))

	for i := 0; i < 10; i++ {
		fmt.Println(<-c)
	}
	fmt.Println("You are boring, I'm leaving.")

}

func fanIn(input1, input2 <-chan string) <-chan string {
	c := make(chan string)
	go func() {
		for {
			c <- <-input1
		}
	}()
	go func() {
		for {
			c <- <-input2
		}
	}()
	return c
}

func boring(msg string) <-chan string { // Returns receive-only channel of strings.
	c := make(chan string)
	go func() { // We launch the goroutine from inside of the function.
		for i := 0; ; i++ {
			c <- fmt.Sprintf("%s %d", msg, i)
			time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
		}
	}()
	return c // Return channel to the caller.
}
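The two-input fanIn above never closes its output. The sketch below shows one way to generalize it to any number of inputs and close the merged channel when all of them are done. It assumes finite inputs, so it uses a hypothetical emit generator that closes its channel instead of the endless boring; fanInAll and countMerged are likewise illustrative names.

```go
package main

import (
	"fmt"
	"sync"
)

// emit is a hypothetical finite generator: it sends n messages and closes.
func emit(name string, n int) <-chan string {
	c := make(chan string)
	go func() {
		defer close(c)
		for i := 0; i < n; i++ {
			c <- fmt.Sprintf("%s %d", name, i)
		}
	}()
	return c
}

// fanInAll merges any number of inputs and closes the output when all are done.
func fanInAll(inputs ...<-chan string) <-chan string {
	out := make(chan string)
	var wg sync.WaitGroup
	for _, in := range inputs {
		wg.Add(1)
		go func(in <-chan string) {
			defer wg.Done()
			for s := range in {
				out <- s
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// countMerged counts the messages that arrive on the merged channel.
func countMerged() int {
	n := 0
	for range fanInAll(emit("Joe", 3), emit("Ann", 2)) {
		n++
	}
	return n
}

func main() {
	fmt.Println(countMerged()) // prints 5
}
```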

Restoring sequencing


type Message struct {
	str  string
	wait chan bool
}

func boring(msg string) <-chan Message {
	c := make(chan Message)
	waitForIt := make(chan bool)

	go func() {
		for i := 0; ; i++ {
			c <- Message{fmt.Sprintf("%s, %d", msg, i), waitForIt}
			time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
			<-waitForIt // block here until main sends on this Message's wait channel
		}
	}()

	return c // return the channel to the caller
}

func fanIn(input1, input2 <-chan Message) <-chan Message {
	c := make(chan Message)

	go func() {
		for {
			c <- <-input1
		}
	}()
	go func() {
		for {
			c <- <-input2
		}
	}()

	return c
}

func main() {
	c := fanIn(boring("Joe"), boring("Ann"))

	for i := 0; i < 5; i++ {
		msg1 := <-c
		fmt.Println(msg1.str)
		msg2 := <-c
		fmt.Println(msg2.str)

		msg1.wait <- true // sending here unblocks that `boring` goroutine so it runs its `for` loop again
		msg2.wait <- true
	}

	fmt.Println("You're both boring; I'm leaving")
}

select

  • Like a switch statement on channel operations
  • The order of cases doesn’t matter at all
  • There is a default case, too
  • A case that can proceed without blocking is chosen (send and/or receive); if several can, one is picked at random
select {
    case v1 := <- c1:
        fmt.Printf("Received %v from c1\n", v1)
    case v2 := <- c2:
        fmt.Printf("Received %v from c2\n", v2)
    case c3 <- 23:
        fmt.Printf("Sent %v to c3\n", 23)
    default:
        fmt.Printf("No one ready\n")
}

Fan-in using select

Refactor fanIn() from:

func fanIn(input1, input2 <-chan string) <-chan string {
    c := make(chan string)

    go func() { for { c <- <-input1 } }()  // <-input1 receives a value; c <- then sends it on
    go func() { for { c <- <-input2 } }()

    return c
}

to:

func fanIn(input1, input2 <-chan string) <-chan string {
    c := make(chan string)

    go func() {
        for {
            select {
            case s := <-input1:
                c <- s
            case s := <-input2:
                c <- s
            }
        }
    }()

    return c
}

Timeout Select

func main() {
    c := boring("Joe")
    timeout := time.After(5 * time.Second) // We kill the loop after 5sec total

    for {
        select {
        case s := <-c:
            fmt.Println(s)

        case <-timeout:
            fmt.Println("You talk too much")
            return
        }
    }
}
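The example above puts one deadline on the whole conversation. A per-message deadline is a small variation: create a fresh timer inside each select. The sketch below assumes a hypothetical helper, receiveWithin, and two stand-in channels.

```go
package main

import (
	"fmt"
	"time"
)

// receiveWithin waits at most d for the next message on c.
func receiveWithin(c <-chan string, d time.Duration) (string, bool) {
	select {
	case s := <-c:
		return s, true
	case <-time.After(d): // a fresh timer for this one receive
		return "", false
	}
}

func main() {
	fast := make(chan string, 1)
	fast <- "hello"
	slow := make(chan string) // nobody ever sends here

	msg, ok := receiveWithin(fast, 50*time.Millisecond)
	fmt.Println(msg, ok) // prints "hello true"

	_, ok = receiveWithin(slow, 50*time.Millisecond)
	fmt.Println(ok) // prints "false"
}
```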

Select: Quit channel

func boring(msg string, quit chan bool) <-chan string {
    c := make(chan string)

    go func() {  // we launch the goroutine from inside the function
        for i := 0; ; i++ {
            select {
            case c <- fmt.Sprintf("%s, %d", msg, i):
                // Do nothing
            case <-quit:
                return  // Parent routine tells us to finish, so we return from the goroutine
            }
        }
    }()

    return c  // return the channel to the caller
}

func main() {
    quit := make(chan bool)
    c := boring("Joe", quit)
    for i := rand.Intn(10); i >= 0; i-- { fmt.Println(<-c) }
    quit <- true  // Tell the routine to finish
}

Quit channel: two-way ‘wrap it up’ communication

The example above ends the goroutine abruptly the moment we pass a value to the quit channel. What if we wanted to let the goroutine wrap things up first? A channel can carry data in both directions, so we can wait for a confirmation from the goroutine like so.

func boring(msg string, quit chan string) <-chan string {
    c := make(chan string)

    go func() {  // we launch the goroutine from inside the function
        for i := 0; ; i++ {
            select {
            case c <- fmt.Sprintf("%s, %d", msg, i):
                // Do nothing
            case <-quit:
                cleanup() // placeholder for whatever teardown the goroutine needs
                quit <- "See you!"
                return
            }
        }
    }()

    return c  // return the channel to the caller
}

func main() {
    quit := make(chan string)
    c := boring("Joe", quit)
    for i := rand.Intn(10); i >= 0; i-- { fmt.Println(<-c) }
    quit <- "Bye!"  // Tell the routine to finish
    fmt.Printf("Joe says: %q\n", <-quit) // This blocks main until the goroutine confirms it's done
}
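Sending on a quit channel stops exactly one goroutine per send. When several goroutines should stop together, a common alternative is to close the channel instead, because every receiver sees a close. The sketch below assumes a hypothetical stopAll helper and uses a sync.WaitGroup in place of the "See you!" reply to confirm that every worker has finished.

```go
package main

import (
	"fmt"
	"sync"
)

// stopAll starts n workers that all watch the same quit channel, then closes
// it. A close is seen by every receiver, so one call stops all workers.
func stopAll(n int) int {
	quit := make(chan struct{})
	stopped := make(chan int, n)

	var wg sync.WaitGroup
	for id := 0; id < n; id++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			<-quit // blocks until quit is closed
			stopped <- id
		}(id)
	}

	close(quit) // broadcast: every pending and future receive returns
	wg.Wait()
	return len(stopped)
}

func main() {
	fmt.Println(stopAll(5)) // prints 5
}
```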

Daisy-chain

See also: daisy chain (electrical engineering).

package main

// chinese whispers
import "fmt"

func f(left, right chan int) {
	left <- 1 + <-right
}

func main() {
	const n = 100000
	leftmost := make(chan int)
	right := leftmost
	left := leftmost

	// chinese whispers ( w <- 1+ w<- 1+ w<- ... )
	// all blocked
	for i := 0; i < n; i++ {
		right = make(chan int)
		go f(left, right)
		left = right
	}

	go func(c chan int) { c <- 1 }(right)
	fmt.Println(<-leftmost)
}
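The same chain can be wrapped in a function so that its result is easy to check for different lengths. This is a sketch with illustrative names (link, daisyChain); the logic mirrors the program above.

```go
package main

import "fmt"

// link adds one to whatever arrives from the right and passes it left.
func link(left chan<- int, right <-chan int) {
	left <- 1 + <-right
}

// daisyChain builds n links, injects 1 at the right end, and returns the
// value that comes out on the left.
func daisyChain(n int) int {
	leftmost := make(chan int)
	left := leftmost
	right := leftmost
	for i := 0; i < n; i++ {
		right = make(chan int)
		go link(left, right)
		left = right
	}
	go func() { right <- 1 }()
	return <-leftmost
}

func main() {
	fmt.Println(daisyChain(1000)) // prints 1001
}
```

Each of the n goroutines adds one to the injected 1, so the result is n + 1.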

Q: What does Google search do?
A: Given a query, return a page of search results (and some ads)

Q: How do we get the search results?
A: Send the query to Web search, Image search, YouTube, Maps, News, etc., then mix the results.

How do we implement this?

Google Search 1.0

package main

import (
	"fmt"
	"math/rand"
	"time"
)

var (
	Web   = fakeSearch("web")
	Image = fakeSearch("image")
	Video = fakeSearch("video")
)

type Result string

type Search func(query string) Result

func fakeSearch(kind string) Search {
	return func(query string) Result {
		time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond)
		return Result(fmt.Sprintf("%s result for %q\n", kind, query))
	}
}

func Google(query string) (results []Result) {
	results = append(results, Web(query)) // Currently, we block in each of these search queries
	// waiting
	results = append(results, Image(query))
	// waiting
	results = append(results, Video(query))
	// waiting
	return
}

func main() {
	rand.Seed(time.Now().UnixNano())
	start := time.Now()
	results := Google("golang")
	elapsed := time.Since(start)
	fmt.Println(results)
	fmt.Println(elapsed)
}

Google Search 2.0

Alternatively, we could refactor the Google function to run the Web, Image and Video searches concurrently, each in its own goroutine, and then wait for all the results.

func Google(query string) (results []Result) {
    c := make(chan Result)

    // Spin each of the searches off into its own goroutine and pipe all results back to the channel created above
    go func() { c <- Web(query) }()
    go func() { c <- Image(query) }()
    go func() { c <- Video(query) }()

    // Pull each result out of the channel as it becomes available and append it to the results slice returned
    for i := 0; i < 3; i++ {
        result := <- c
        results = append(results, result)
    }

    return
}

Google Search 2.1

Don’t wait for slow servers. No locks. No condition variables. No callbacks.

func Google(query string) (results []Result) {
    c := make(chan Result)
    go func() { c <- Web(query) } ()
    go func() { c <- Image(query) } ()
    go func() { c <- Video(query) } ()

    timeout := time.After(80 * time.Millisecond)
    for i := 0; i < 3; i++ {
        select {
        case result := <-c:
            results = append(results, result)
        case <-timeout:
            fmt.Println("timed out")
            return
        }
    }
    return
}

Avoid timeout

Q: How do we avoid discarding results from slow servers?

A: Replicate the servers. Send requests to multiple replicas, and use the first response.

func First(query string, replicas ...Search) Result {
    c := make(chan Result)
    searchReplica := func(i int) { c <- replicas[i](query) }
    for i := range replicas {
        go searchReplica(i)
    }
    return <-c
}

// Using the First function
func main() {
    rand.Seed(time.Now().UnixNano())
    start := time.Now()
    result := First("golang",
        fakeSearch("replica 1"),
        fakeSearch("replica 2"))
    elapsed := time.Since(start)
    fmt.Println(result)
    fmt.Println(elapsed)
}
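One detail worth noting about First as written: the channel is unbuffered, so every replica that loses the race stays blocked on its send. A possible refinement, sketched here under stated assumptions, is to give the channel one slot per replica so the losers can still deliver and exit. The replica helper with a fixed delay is a hypothetical stand-in for fakeSearch, used only to make the outcome predictable.

```go
package main

import (
	"fmt"
	"time"
)

type Result string

type Search func(query string) Result

// replica returns a fake search that answers after the given delay
// (a hypothetical stand-in for fakeSearch with a fixed latency).
func replica(name string, delay time.Duration) Search {
	return func(query string) Result {
		time.Sleep(delay)
		return Result(name + " result for " + query)
	}
}

// First returns the fastest reply. The channel has room for every replica,
// so the slower goroutines can still send and exit instead of blocking forever.
func First(query string, replicas ...Search) Result {
	c := make(chan Result, len(replicas))
	for _, r := range replicas {
		go func(r Search) { c <- r(query) }(r)
	}
	return <-c
}

func main() {
	fmt.Println(First("golang",
		replica("slow", 50*time.Millisecond),
		replica("fast", 5*time.Millisecond))) // prints "fast result for golang"
}
```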

Google Search 3.0

Reduce tail latency using replicated search servers.

package main

import (
	"fmt"
	"math/rand"
	"time"
)

var (
	Web1 = fakeSearch("web1")
	Web2 = fakeSearch("web2")

	Image1 = fakeSearch("image1")
	Image2 = fakeSearch("image2")

	Video1 = fakeSearch("video1")
	Video2 = fakeSearch("video2")
)  

type Result string

type Search func(query string) Result

func fakeSearch(kind string) Search {
	return func(query string) Result {
		time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
		return Result(fmt.Sprintf("%s result for %q\n", kind, query))
	}
}
func Google(query string) (results []Result) {
	c := make(chan Result)

	go func() { c <- First(query, Web1, Web2) }()
	go func() { c <- First(query, Image1, Image2) }()
	go func() { c <- First(query, Video1, Video2) }()

	timeout := time.After(80 * time.Millisecond)
	for i := 0; i < 3; i++ {
		select {
		case result := <-c:
			results = append(results, result)
		case <-timeout:
			fmt.Println("Timed out")
			return
		}
	}

	return
}
func First(query string, replicas ...Search) Result {
	c := make(chan Result)
	searchReplica := func(i int) { c <- replicas[i](query) }
	for i := range replicas {
		go searchReplica(i)
	}
	return <-c
}

func main() {
	rand.Seed(time.Now().UnixNano())
	start := time.Now()
	result := Google("golang")
	elapsed := time.Since(start)
	fmt.Println(result)
	fmt.Println(elapsed)
}

And still …

No locks. No condition variables. No callbacks.

Summary

In just a few simple transformations we used Go’s concurrency primitives to convert a

  • slow
  • sequential
  • failure-sensitive

program into one that is

  • fast
  • concurrent
  • replicated
  • robust.

Conclusions

Goroutines and channels make it easy to express complex operations dealing with

  • multiple inputs
  • multiple outputs
  • timeouts
  • failure

And they’re fun to use.

Don’t overdo it

As always, you should use the right tool for the job.

References