Concurrency is a model for software construction.
Concurrency is the key to designing high performance network services. Go’s concurrency primitives (goroutines and channels) provide a simple and efficient means of expressing concurrent execution. In this talk we see how tricky concurrency problems can be solved gracefully with simple Go code.
Go’s concurrency model originates from CSP (Communicating Sequential Processes).
The Go approach
Don’t communicate by sharing memory, share memory by communicating.
In other words, rather than keeping memory shared and guarding it against concurrent access with locks and mutexes, you pass the data back and forth between goroutines over a channel.
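As a minimal sketch of that idea (the function name `sumSquares` is an assumption made for illustration, not something from the talk), each goroutine below hands its result over a channel, so no variable is ever shared between goroutines:

```go
package main

import "fmt"

// sumSquares starts one goroutine per input. Each goroutine sends its
// result on a channel instead of writing to a shared, locked variable.
func sumSquares(nums []int) int {
	results := make(chan int)
	for _, n := range nums {
		go func(n int) {
			results <- n * n // hand the value over; no shared state is touched
		}(n)
	}
	total := 0
	for range nums {
		total += <-results // only this goroutine ever reads or writes total
	}
	return total
}

func main() {
	fmt.Println(sumSquares([]int{1, 2, 3, 4})) // 30
}
```

The results arrive in arbitrary order, which does not matter here because addition is commutative.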
Go’s concurrency toolset
- goroutines
- channels
- select
- sync package (classic mutexes)
Deadlock
A deadlocked program is one in which all concurrent processes are waiting on one another. In this state, the program will never recover without outside intervention.
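One common way to rule out this kind of mutual waiting when two locks are involved is to always acquire them in the same order. The sketch below is only an illustration under assumptions: the `Account` type, its `id` field, and the `transfer` and `runTransfers` functions are hypothetical, and `transfer` assumes the two accounts are distinct.

```go
package main

import (
	"fmt"
	"sync"
)

// Account is a hypothetical type used only for this illustration.
type Account struct {
	id      int
	mu      sync.Mutex
	balance int
}

// transfer locks the account with the smaller id first, whatever the
// direction of the transfer. Two opposite transfers therefore can never
// each hold one lock while waiting for the other.
// It assumes from and to are different accounts.
func transfer(from, to *Account, amount int) {
	first, second := from, to
	if second.id < first.id {
		first, second = second, first
	}
	first.mu.Lock()
	defer first.mu.Unlock()
	second.mu.Lock()
	defer second.mu.Unlock()
	from.balance -= amount
	to.balance += amount
}

// runTransfers moves money in both directions concurrently and returns
// the final balances.
func runTransfers() (int, int) {
	a := &Account{id: 1, balance: 100}
	b := &Account{id: 2, balance: 100}
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(2)
		go func() { defer wg.Done(); transfer(a, b, 1) }()
		go func() { defer wg.Done(); transfer(b, a, 1) }()
	}
	wg.Wait()
	return a.balance, b.balance
}

func main() {
	fmt.Println(runTransfers()) // 100 100
}
```

If `transfer` instead locked `from` before `to` every time, the two opposite directions could each grab one mutex and wait forever for the other.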
Race Conditions
A race condition occurs when two or more operations must execute in the correct order, but the program has not been written so that this order is guaranteed to be maintained. Most of the time, this shows up in what’s called a data race, where one concurrent operation attempts to read a variable while at some undetermined time another concurrent operation is attempting to write to the same variable.
Mutex locks
type Counter struct {
mu sync.Mutex
value int
}
func (c *Counter) Increment() {
c.mu.Lock()
defer c.mu.Unlock()
c.value++
}
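To see the mutex doing its job, here is a small sketch that calls `Increment` from many goroutines at once. The `Value` accessor and the `countTo` helper are additions for illustration and are not part of the original snippet.

```go
package main

import (
	"fmt"
	"sync"
)

type Counter struct {
	mu    sync.Mutex
	value int
}

func (c *Counter) Increment() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.value++
}

// Value is a hypothetical accessor; it takes the same lock so that the
// read cannot race with a concurrent Increment.
func (c *Counter) Value() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.value
}

// countTo increments one Counter from n goroutines and returns the result.
func countTo(n int) int {
	var c Counter
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.Increment()
		}()
	}
	wg.Wait() // every increment has finished before we read
	return c.Value()
}

func main() {
	fmt.Println(countTo(1000)) // 1000
}
```

Without the mutex, `c.value++` would be a data race and the final count could come out lower than 1000.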
Start goroutines
Common function
func main() {
go sayHello() // continue doing other things
}
func sayHello() {
fmt.Println("hello")
}
// Usually no output: main returns, and the program exits, before the goroutine gets a chance to run.
Anonymous function
go func() {
fmt.Println("hello")
}()
Anonymous function with parameter
go func(URL string) {
fmt.Println("hello", URL)
}("http://bcd.com")
Fork and join model
func main() {
var wg sync.WaitGroup
sayHello := func() {
defer wg.Done()
fmt.Println("hello")
}
wg.Add(1)
go sayHello()
wg.Wait()
}
Channels
- Think of a bucket chain
- 3 components: sender, buffer, receiver
- The buffer is optional
A channel in Go provides a connection between two goroutines, allowing them to communicate. A channel with no buffer is a blocking (synchronous) channel: every operation on it both communicates and synchronizes.
A buffered channel does not synchronize sender and receiver: a send blocks only when the buffer is full, and a receive blocks only when the buffer is empty.
Declaring and initializing
var c chan int
c = make(chan int)
// or
c := make(chan int)
Sending on a channel
c <- 1
Receiving from a channel, the “arrow” indicates the direction of the data flow.
value = <-c
Blocking channels
blocked
unbuffered := make(chan int)
// blocks
unbuffered <- 1
// blocks
a := <- unbuffered
not blocked
go func() {<- unbuffered }()
unbuffered <- 1
buffered := make(chan int, 1)
// still blocks
a := <- buffered
// fine
buffered <- 1
// blocks (buffer full)
buffered <- 2
Blocking breaks concurrency
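The blocking rules above can be observed without hanging the program by attempting each send inside a `select` with a `default` case. This is a sketch; the helper name `trySend` is an assumption made for illustration.

```go
package main

import "fmt"

// trySend reports whether v could be sent on c without blocking.
func trySend(c chan int, v int) bool {
	select {
	case c <- v:
		return true
	default:
		return false // the send would have blocked
	}
}

func main() {
	unbuffered := make(chan int)
	buffered := make(chan int, 1)

	fmt.Println(trySend(unbuffered, 1)) // false: no receiver is waiting
	fmt.Println(trySend(buffered, 1))   // true: the buffer has room
	fmt.Println(trySend(buffered, 2))   // false: the buffer is full
}
```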
Closing channels
- if closed, nothing to do
- closing a channel twice will panic
c := make(chan int)
close(c)
v, ok := <-c // receive from the closed channel
fmt.Println(v, ok)
// What is printed?
// 0 false
// - the two-value receive form returns the value and an ok flag
// - 0 as it is the zero value of int
// - false "because no more data" or "returned value is not valid"
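A short sketch of how a closed channel behaves when it still holds buffered values (the helper name `drain` is an assumption): values sent before the close are still delivered, `range` stops once the channel is closed and empty, and later receives return the zero value.

```go
package main

import "fmt"

// drain receives from c until it is closed and empty.
func drain(c <-chan int) []int {
	var got []int
	for v := range c { // the loop ends when c is closed and drained
		got = append(got, v)
	}
	return got
}

func main() {
	c := make(chan int, 3)
	c <- 1
	c <- 2
	close(c) // buffered values remain readable after close

	fmt.Println(drain(c)) // [1 2]

	v, ok := <-c
	fmt.Println(v, ok) // 0 false
}
```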
makeThumbnails
from:gopl.io
// makeThumbnails5 makes thumbnails for the specified files in parallel.
// It returns the generated file names in an arbitrary order,
// or an error if any step failed.
func makeThumbnails5(filenames []string) (thumbfiles []string, err error) {
type item struct {
thumbfile string
err error
}
ch := make(chan item, len(filenames))
for _, f := range filenames {
go func(f string) {
var it item
it.thumbfile, it.err = thumbnail.ImageFile(f)
ch <- it
}(f)
}
for range filenames {
it := <-ch
if it.err != nil {
return nil, it.err
}
thumbfiles = append(thumbfiles, it.thumbfile)
}
return thumbfiles, nil
}
// makeThumbnails6 makes thumbnails for each file received from the channel.
// It returns the number of bytes occupied by the files it creates.
func makeThumbnails6(filenames <-chan string) int64 {
sizes := make(chan int64)
var wg sync.WaitGroup // number of working goroutines
for f := range filenames {
wg.Add(1)
// worker
go func(f string) {
defer wg.Done()
thumb, err := thumbnail.ImageFile(f)
if err != nil {
log.Println(err)
return
}
info, _ := os.Stat(thumb) // OK to ignore error
sizes <- info.Size()
}(f)
}
// closer
go func() {
wg.Wait()
close(sizes)
}()
var total int64
for size := range sizes {
total += size
}
return total
}
Shape your data flow
- Channels are streams of data
- Dealing with multiple streams is the true power of select
- 1 in n out (Fan-out)
- n in 1 out (Funnel)
- n in n out (Turnout)
Fan-out
func Fanout(In <-chan int, OutA, OutB chan int) {
for data := range In { // Receive until closed
select { // Send to first non-blocking channel
case OutA <- data:
case OutB <- data:
}
}
}
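A possible way to drive `Fanout` end to end is sketched below. Two things are assumptions added for the example: this variant closes both outputs once the input is drained (the version above leaves that to the caller), and the `distribute` helper with its two summing consumers is hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// Fanout forwards each value from In to whichever output is ready first.
// Unlike the version above, it closes both outputs when In is closed.
func Fanout(In <-chan int, OutA, OutB chan<- int) {
	for data := range In {
		select {
		case OutA <- data:
		case OutB <- data:
		}
	}
	close(OutA)
	close(OutB)
}

// distribute feeds 1..n through Fanout and returns the sum of everything
// the two consumers received.
func distribute(n int) int {
	in := make(chan int)
	outA, outB := make(chan int), make(chan int)
	go Fanout(in, outA, outB)
	go func() {
		for i := 1; i <= n; i++ {
			in <- i
		}
		close(in)
	}()

	sums := make([]int, 2) // one slot per consumer, so no locking is needed
	var wg sync.WaitGroup
	for i, out := range []chan int{outA, outB} {
		wg.Add(1)
		go func(i int, out <-chan int) {
			defer wg.Done()
			for v := range out {
				sums[i] += v
			}
		}(i, out)
	}
	wg.Wait()
	return sums[0] + sums[1]
}

func main() {
	fmt.Println(distribute(10)) // 55
}
```

How the values split between the two consumers varies from run to run; only the combined total is deterministic.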
Turnout
Brilliant!
func Turnout(InA, InB <-chan int, OutA, OutB chan int) {
var data int
var more bool
for {
select {// Receive from first non-blocking
case data, more = <-InA:
case data, more = <-InB:
}
if !more {
// ...?
return
}
select {// Send to first non-blocking
case OutA <- data:
case OutB <- data:
}
}
}
Quit channel
Mutexes are not an optimal solution
- Mutexes are like toilets.
- The longer you occupy them, the longer the queue gets
- Read/write mutexes can only reduce the problem
- Using multiple mutexes will cause deadlocks sooner or later
- All-in-all not the solution we’re looking for
Three shades of code
- Blocking = Your program may get locked up (for undefined time)
- Lock free = At least one part of your program is always making progress
- Wait free = All parts of your program are always making progress
Concurrency in practice
- Avoid blocking, avoid race conditions
- Use channels to avoid shared state. Use select to manage channels.
Where channels don’t work:
- Try to use tools from the sync package first
- In simple cases or when really needed: try lockless code
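For the "simple cases" mentioned above, the `sync/atomic` package is often enough. The sketch below (the function name `atomicCount` is an assumption) counts with atomic operations instead of a mutex or a channel:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// atomicCount increments a shared counter from n goroutines without a lock.
func atomicCount(n int) int64 {
	var total int64
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			atomic.AddInt64(&total, 1) // a single indivisible read-modify-write
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&total)
}

func main() {
	fmt.Println(atomicCount(1000)) // 1000
}
```

This only works because the shared state is a single integer; anything that must update several fields together still needs a mutex or a channel.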
Using sync.WaitGroup
func main() {
var wg sync.WaitGroup
wg.Add(1)
go func() {
count("sheep")
wg.Done()
}()
wg.Wait()
}
func count(thing string) {
for i := 1; i <= 5; i++ {
fmt.Println(i, thing)
time.Sleep(time.Millisecond * 500)
}
}
// output:
// 1 sheep
// 2 sheep
// 3 sheep
// 4 sheep
// 5 sheep
Channel Example
func main() {
c := make(chan string)
go count("sheep", c)
// for {
// msg, open := <- c
// if !open {
// break
// }
// fmt.Println(msg)
// }
for msg := range c {
fmt.Println(msg)
}
}
func count(thing string, c chan string) {
for i := 1; i <= 5; i++ {
c <- fmt.Sprintf("%d %s", i, thing) // send on the channel so main can receive and print it
time.Sleep(time.Millisecond * 500)
}
// sender should close the channel
close(c)
}
// output:
// 1 sheep
// 2 sheep
// 3 sheep
// 4 sheep
// 5 sheep
Non-blocking channel (buffered)
func main() {
// won't block
c := make(chan string, 2)
c <- "hello"
msg := <-c
fmt.Println(msg)
}
select statement
Select statements allow you to efficiently wait for events, select a message from competing channels in a uniform random way, continue on if there are no messages waiting, and more.
func main() {
c1 := make(chan string)
c2 := make(chan string)
go func() {
for {
c1 <- "Every 500ms"
time.Sleep(time.Millisecond * 500)
}
}()
go func() {
for {
c2 <- "Every two seconds"
time.Sleep(time.Second * 2)
}
}()
for {
select {
case msg1 := <-c1:
fmt.Println(msg1)
case msg2 := <-c2:
fmt.Println(msg2)
}
}
}
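The "continue on if there are no messages waiting" part is done with a `default` case. A minimal sketch, with `poll` as an assumed helper name:

```go
package main

import "fmt"

// poll returns the next message if one is ready and reports false
// otherwise; it never blocks.
func poll(c <-chan string) (string, bool) {
	select {
	case msg := <-c:
		return msg, true
	default:
		return "", false // nothing waiting, carry on
	}
}

func main() {
	c := make(chan string, 1)

	if _, ok := poll(c); !ok {
		fmt.Println("nothing waiting")
	}

	c <- "hello"
	if msg, ok := poll(c); ok {
		fmt.Println(msg)
	}
}
```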
Worker Pools
A worker pool fits when you have a queue of work to be done and multiple concurrent workers pulling items off that queue.
func main() {
jobs := make(chan int, 100)
results := make(chan int, 100)
count := 100 // note: the naive recursive fib below becomes very slow as n grows
go worker(jobs, results)
go worker(jobs, results)
go worker(jobs, results)
for i := 0; i < count; i++ {
jobs <- i
}
close(jobs)
for i := 0; i < count; i++ {
fmt.Println(<-results)
}
}
func worker(jobs <-chan int, results chan<- int) {
for n := range jobs {
results <- fib(n)
}
}
func fib(n int) int {
if n <= 1 {
return n
}
return fib(n-1) + fib(n-2)
}
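The pool above relies on buffers large enough to hold every job and every result. A variant that does not, sketched under assumptions (the names `runPool` and `square` are hypothetical, and squaring stands in for real work), uses a `sync.WaitGroup` to close the results channel once all workers have finished:

```go
package main

import (
	"fmt"
	"sync"
)

// square stands in for whatever work a job requires.
func square(n int) int { return n * n }

// runPool processes inputs with the given number of workers and returns
// the sum of the results.
func runPool(workers int, inputs []int) int {
	jobs := make(chan int)
	results := make(chan int)

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for n := range jobs { // exits when jobs is closed and drained
				results <- square(n)
			}
		}()
	}

	// Feed the jobs, then signal that no more are coming.
	go func() {
		for _, n := range inputs {
			jobs <- n
		}
		close(jobs)
	}()

	// Close results once every worker has returned.
	go func() {
		wg.Wait()
		close(results)
	}()

	total := 0
	for r := range results {
		total += r
	}
	return total
}

func main() {
	fmt.Println(runPool(3, []int{1, 2, 3, 4, 5})) // 55
}
```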
Generator: function that returns a channel
func main() {
c := boring("boring!") // function returning a channel.
// this is for testing
fmt.Println(reflect.TypeOf(c))
for i := 0; i < 5; i++ {
fmt.Printf("You say: %q\n", <-c)
}
fmt.Println("You are boring, I'm leaving.")
}
func boring(msg string) <-chan string { // Returns receive-only channel of strings.
c := make(chan string)
go func() { // We launch the goroutine from inside of the function.
for i := 0; ; i++ {
c <- fmt.Sprintf("%s %d", msg, i)
time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
}
}()
return c // Return channel to the caller.
}
output:
<-chan string
You say: "boring! 0"
You say: "boring! 1"
You say: "boring! 2"
You say: "boring! 3"
You say: "boring! 4"
You are boring, I'm leaving.
Channels as a handle on a service
// Our boring function returns a channel that lets us communicate
// with the boring service it provides.
//
//
package main
import (
"fmt"
"math/rand"
"reflect"
"time"
)
func main() {
ane := boring("ane boring!") // function returning a channel.
joe := boring("joe boring!") // function returning a channel.
fmt.Println("joe is:", reflect.TypeOf(joe))
for i := 0; i < 5; i++ {
fmt.Printf("You say: %q\n", <-ane)
fmt.Printf("You say: %q\n", <-joe)
}
fmt.Println("You are boring, I'm leaving.")
}
func boring(msg string) <-chan string { // Returns receive-only channel of strings.
c := make(chan string)
go func() { // We launch the goroutine from inside of the function.
for i := 0; ; i++ {
c <- fmt.Sprintf("%s %d", msg, i)
time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
}
}()
return c // Return channel to the caller.
}
Output:
joe is: <-chan string
You say: "ane boring! 0"
You say: "joe boring! 0"
You say: "ane boring! 1"
You say: "joe boring! 1"
You say: "ane boring! 2"
You say: "joe boring! 2"
You say: "ane boring! 3"
You say: "joe boring! 3"
You say: "ane boring! 4"
You say: "joe boring! 4"
You are boring, I'm leaving.
Multiplexing
func main() {
c := fanIn(boring("Joe"), boring("Ann"))
for i := 0; i < 10; i++ {
fmt.Println(<-c)
}
fmt.Println("You are boring, I'm leaving.")
}
func fanIn(input1, input2 <-chan string) <-chan string {
c := make(chan string)
go func() {
for {
c <- <-input1
}
}()
go func() {
for {
c <- <-input2
}
}()
return c
}
func boring(msg string) <-chan string { // Returns receive-only channel of strings.
c := make(chan string)
go func() { // We launch the goroutine from inside of the function.
for i := 0; ; i++ {
c <- fmt.Sprintf("%s %d", msg, i)
time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
}
}()
return c // Return channel to the caller.
}
Restoring sequencing
type Message struct {
str string
wait chan bool
}
func boring(msg string) <-chan Message {
c := make(chan Message)
waitForIt := make(chan bool)
go func() {
for i := 0; ; i++ {
c <- Message{fmt.Sprintf("%s, %d", msg, i), waitForIt}
time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
<-waitForIt // each boring goroutine blocks here until main sends on the Message's wait channel
}
}()
return c // return the channel to the caller
}
func fanIn(input1, input2 <-chan Message) <-chan Message {
c := make(chan Message)
go func() {
for {
c <- <-input1
}
}()
go func() {
for {
c <- <-input2
}
}()
return c
}
func main() {
c := fanIn(boring("Joe"), boring("Ann"))
for i := 0; i < 5; i++ {
msg1 := <-c
fmt.Println(msg1.str)
msg2 := <-c
fmt.Println(msg2.str)
msg1.wait <- true // Sending on wait unblocks `boring` so it can go around its `for` loop again
msg2.wait <- true
}
fmt.Println("You're both boring; I'm leaving")
}
select
- Like a switch statement on channel operations
- The order of cases doesn’t matter at all
- There is a default case, too
- A case that can proceed without blocking is chosen (send or receive); if several are ready, one is picked at random
select {
case v1 := <- c1:
fmt.Printf("Received %v from c1\n", v1)
case v2 := <- c2:
fmt.Printf("Received %v from c2\n", v2)
case c3 <- 23:
fmt.Printf("Sent %v to c3\n", 23)
default:
fmt.Printf("No one ready\n")
}
Fan-in using select
refactor fanIn()
func fanIn(input1, input2 <-chan string) <-chan string {
c := make(chan string)
go func() { for { c <- <-input1 } }() // two operations: <-input1 receives a value, then c <- sends it on
go func() { for { c <- <-input2 } }()
return c
}
to:
func fanIn(input1, input2 <-chan string) <-chan string {
c := make(chan string)
go func() {
for {
select {
case s:= <- input1: c <- s
case s:= <- input2: c <- s
}
}
}()
return c
}
Timeout Select
func main() {
c := boring("Joe")
timeout := time.After(5 * time.Second) // We kill the loop after 5sec total
for {
select {
case s := <-c:
fmt.Println(s)
case <-timeout:
fmt.Println("You talk too much")
return
}
}
}
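The loop above uses one timer for the whole conversation. A variant that gives every single message its own deadline creates a fresh `time.After` on each receive. This is a sketch; the helper name `receiveWithTimeout` is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// receiveWithTimeout waits at most d for the next message on c.
func receiveWithTimeout(c <-chan string, d time.Duration) (string, bool) {
	select {
	case s := <-c:
		return s, true
	case <-time.After(d): // a new timer for every call
		return "", false
	}
}

func main() {
	c := make(chan string, 1)
	c <- "hello"

	if s, ok := receiveWithTimeout(c, 50*time.Millisecond); ok {
		fmt.Println(s)
	}
	if _, ok := receiveWithTimeout(c, 50*time.Millisecond); !ok {
		fmt.Println("You're too slow.")
	}
}
```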
Select: Quit channel
func boring(msg string, quit chan bool) <-chan string {
c := make(chan string)
go func() { // we launch the goroutine from inside the function
for i := 0; ; i++ {
select {
case c <- fmt.Sprintf("%s, %d", msg, i):
// Do nothing
case <-quit:
return // Parent routine tells us to finish, so we return from the goroutine
}
}
}()
return c // return the channel to the caller
}
func main() {
quit := make(chan bool)
c := boring("Joe", quit)
for i := rand.Intn(10); i >= 0; i-- { fmt.Println(<-c) }
quit <- true // Tell the routine to finish
}
Quit channel: two-way ‘wrap it up’ communication
The example above ends the goroutine rather abruptly the second we pass a value to the quit channel. What if we wanted to allow the routine to wrap things up? Well, the channel offers two-way communication, so we can wait for confirmation from the routine like so.
func boring(msg string, quit chan string) <-chan string {
c := make(chan string)
go func() { // we launch the goroutine from inside the function
for i := 0; ; i++ {
select {
case c <- fmt.Sprintf("%s, %d", msg, i):
// Do nothing
case <- quit:
cleanup()
quit <- "See you!"
return
}
}
}()
return c // return the channel to the caller
}
func main() {
quit := make(chan string)
c := boring("Joe", quit)
for i := rand.Intn(10); i >= 0; i-- { fmt.Println(<-c) }
quit <- "Bye!" // Tell the routine to finish (quit is a chan string here)
fmt.Printf("Joe says: %q\n", <-quit) // This blocks main until the goroutine confirms it's done
}
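Sending on a quit channel stops exactly one goroutine per send. When several goroutines should stop together, a commonly used alternative is to close the channel, because a receive on a closed channel returns immediately for every receiver. A sketch under assumptions (the name `stopAll` and the `done` and `stopped` channels are hypothetical):

```go
package main

import (
	"fmt"
	"sync"
)

// stopAll starts n goroutines that wait on a shared done channel, stops
// them all with one close, and returns how many confirmed they stopped.
func stopAll(n int) int {
	done := make(chan struct{})
	stopped := make(chan int, n) // buffered so confirmations never block
	var wg sync.WaitGroup

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			<-done // blocks until done is closed
			stopped <- id
		}(i)
	}

	close(done) // one close releases every waiting goroutine
	wg.Wait()
	return len(stopped)
}

func main() {
	fmt.Println(stopAll(5)) // 5
}
```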
Daisy-chain
Reference: Daisy chain (electrical engineering)
package main
// chinese whispers
import "fmt"
func f(left, right chan int) {
left <- 1 + <-right
}
func main() {
const n = 100000
leftmost := make(chan int)
right := leftmost
left := leftmost
// chinese whispers ( w <- 1+ w<- 1+ w<- ... )
// all blocked
for i := 0; i < n; i++ {
right = make(chan int)
go f(left, right)
left = right
}
go func(c chan int) { c <- 1 }(right)
fmt.Println(<-leftmost)
}
Example: Google Search
Q: What does Google search do?
A: Given a query, return a page of search results (and some ads).
Q: How do we get the search results?
A: Send the query to Web search, Image search, YouTube, Maps, News, etc., then mix the results.
How do we implement this?
Google Search 1.0
package main
import (
"fmt"
"math/rand"
"time"
)
var (
Web = fakeSearch("web")
Image = fakeSearch("image")
Video = fakeSearch("video")
)
type Result string
type Search func(query string) Result
func fakeSearch(kind string) Search {
return func(query string) Result {
time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond)
return Result(fmt.Sprintf("%s result for %q\n", kind, query))
}
}
func Google(query string) (results []Result) {
results = append(results, Web(query)) // Currently, we block in each of these search queries
// waiting
results = append(results, Image(query))
// waiting
results = append(results, Video(query))
// waiting
return
}
func main() {
rand.Seed(time.Now().UnixNano())
start := time.Now()
results := Google("golang")
elapsed := time.Since(start)
fmt.Println(results)
fmt.Println(elapsed)
}
Google Search 2.0
Alternatively, we could refactor the Google function to run in a goroutine and run web, image and video searches concurrently, then wait on all results.
func Google(query string) (results []Result) {
c := make(chan Result)
// Spin each of the searches off into their own goroutine and pipe all results back to the channel created above
go func() { c <- Web(query) }()
go func() { c <- Image(query) }()
go func() { c <- Video(query) }()
// Pull each result out of the channel as it becomes available and append it to the results slice returned
for i := 0; i < 3; i++ {
result := <- c
results = append(results, result)
}
return
}
Google Search 2.1
Don’t wait for slow servers. No locks. No condition variables. No callbacks.
func Google(query string) (results []Result) {
c := make(chan Result)
go func() { c <- Web(query) }()
go func() { c <- Image(query) }()
go func() { c <- Video(query) }()
timeout := time.After(80 * time.Millisecond)
for i := 0; i < 3; i++ {
select {
case result := <-c:
results = append(results, result)
case <-timeout:
fmt.Println("timed out")
return
}
}
return
}
Avoid timeout
Q: How do we avoid discarding results from slow servers?
A: Replicate the servers. Send requests to multiple replicas, and use the first response.
func First(query string, replicas ...Search) Result {
c := make(chan Result)
searchReplica := func(i int) { c <- replicas[i](query) }
for i := range replicas {
go searchReplica(i)
}
return <-c
}
// Using the First function
func main() {
rand.Seed(time.Now().UnixNano())
start := time.Now()
result := First("golang",
fakeSearch("replica 1"),
fakeSearch("replica 2"))
elapsed := time.Since(start)
fmt.Println(result)
fmt.Println(elapsed)
}
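One detail worth noting about `First`: with an unbuffered channel, every replica that loses the race stays blocked on its send because nobody receives a second value. A possible variation, sketched here with assumed names (`FirstBuffered` and the `delayed` fake replica are not from the talk), gives the channel one buffer slot per replica so the slower goroutines can finish and exit:

```go
package main

import (
	"fmt"
	"time"
)

type Result string
type Search func(query string) Result

// FirstBuffered returns the first response from any replica. The channel
// has room for every replica, so late responders do not block forever.
func FirstBuffered(query string, replicas ...Search) Result {
	c := make(chan Result, len(replicas))
	for _, replica := range replicas {
		go func(s Search) { c <- s(query) }(replica)
	}
	return <-c
}

// delayed builds a fake replica that answers after a fixed delay.
func delayed(name string, d time.Duration) Search {
	return func(query string) Result {
		time.Sleep(d)
		return Result(name + " result for " + query)
	}
}

func main() {
	r := FirstBuffered("golang",
		delayed("fast", 10*time.Millisecond),
		delayed("slow", 200*time.Millisecond))
	fmt.Println(r) // fast result for golang
}
```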
Google Search 3.0
Reduce tail latency using replicated search servers.
package main
import (
"fmt"
"math/rand"
"time"
)
var (
Web1 = fakeSearch("web1")
Web2 = fakeSearch("web2")
Image1 = fakeSearch("image1")
Image2 = fakeSearch("image2")
Video1 = fakeSearch("video1")
Video2 = fakeSearch("video2")
)
type Result string
type Search func(query string) Result
func fakeSearch(kind string) Search {
return func(query string) Result {
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
return Result(fmt.Sprintf("%s result for %q\n", kind, query))
}
}
func Google(query string) (results []Result) {
c := make(chan Result)
go func() { c <- First(query, Web1, Web2) }()
go func() { c <- First(query, Image1, Image2) }()
go func() { c <- First(query, Video1, Video2) }()
timeout := time.After(80 * time.Millisecond)
for i := 0; i < 3; i++ {
select {
case result := <-c:
results = append(results, result)
case <-timeout:
fmt.Println("Timed out")
return
}
}
return
}
func First(query string, replicas ...Search) Result {
c := make(chan Result)
searchReplica := func(i int) { c <- replicas[i](query) }
for i := range replicas {
go searchReplica(i)
}
return <-c
}
func main() {
rand.Seed(time.Now().UnixNano())
start := time.Now()
result := Google("golang")
elapsed := time.Since(start)
fmt.Println(result)
fmt.Println(elapsed)
}
And still …
No locks. No condition variables. No callbacks.
Summary
In just a few simple transformations we used Go’s concurrency primitives to convert a
- slow
- sequential
- failure-sensitive
program into one that is
- fast
- concurrent
- replicated
- robust.
Conclusions
Goroutines and channels make it easy to express complex operations dealing with
- multiple inputs
- multiple outputs
- timeouts
- failure
And they’re fun to use.
Don’t overdo it
As always, you should use the right tool for the job.