Common Concurrency Patterns in Go

Common Concurrency Patterns in Go

Minimal Request-Response Pattern

// The function returns a receiver based on parameters
func Glob(pattern string) <-chan Item {
	c := make(chan Item)	// 1. Initialize receiver

	// 2. Anonymous goroutine closure executes the program
	go func() {
		defer close(c)
		for name, item := range items {
			if ok, _ := filepath.Match(pattern, name); !ok {
				continue
			}
			c <- item
		}
	}()

	// 3. Return receiver quickly so later code can read from the channel
	return c
}

func main() {
	for item := range Glob("[ab]*") {
		fmt.Println(item)
	}
}

Single State Holder

One goroutine holds global state. Other goroutines access this state through channels.

reads := make(chan *readOp)
writes := make(chan *writeOp)

// State holder
go func() {
    var state = make(map[int]int)
    for {
        select {
        case read := <-reads:
			// Return result
            read.resp <- state[read.key]
        case write := <-writes:
            state[write.key] = write.val
			// Return result
            write.resp <- true
        }
    }
}()

// Requester
go func() {
    for {
        read := &readOp{
            key:  rand.Intn(5),
            resp: make(chan int)}
        reads <- read
        <-read.resp
     }
}()

go func() {
    for {
        write := &writeOp{
            key:  rand.Intn(5),
            val:  rand.Intn(100),
            resp: make(chan bool)}
        writes <- write
        <-write.resp
     }
}()

Produce First, Consume Later

// Produce first, known number of items to produce
chanOwner := func() <-chan int {
    results := make(chan int, 5)
    go func() {
        defer close(results)
        for i := 0; i <= 5; i++ {
            results <- i
        }
    }()
    return results
}

consumer := func(results <-chan int) {
    for result := range results {
        fmt.Printf("Received: %d\n", result)
    }
    fmt.Println("Done receiving!")
}

results := chanOwner()
// Consume
consumer(results)

Consumer Tells Producer to Stop

newRandStream := func(done <-chan interface{}) <-chan int {
    randStream := make(chan int)
    go func() {
        defer fmt.Println("newRandStream closure exited.")
        defer close(randStream)
        for {
            select {
            case randStream <- rand.Int():
            case <-done:
				// Stop producing, exit
                return
            }
        }
    }()

    return randStream
}

done := make(chan interface{})
randStream := newRandStream(done)
fmt.Println("3 random ints:")
for i := 1; i <= 3; i++ {
    fmt.Printf("%d: %d\n", i, <-randStream)
}
// Consumer actively stops production
close(done)

Multiple Channel Merge for Safe Exit (OR Channel)

var or func(channels ...<-chan interface{}) <-chan interface{}
or = func(channels ...<-chan interface{}) <-chan interface{} {
    switch len(channels) {
    case 0:
        return nil
    case 1:
        return channels[0]
    }

    orDone := make(chan interface{})
    go func() {
        defer close(orDone)

        switch len(channels) {
        case 2:
            select {
            case <-channels[0]:
            case <-channels[1]:
            }
        default:
            select {
            case <-channels[0]:
            case <-channels[1]:
            case <-channels[2]:
            case <-or(append(channels[3:], orDone)...):
            }
        }
    }()
    return orDone
}
sig := func(after time.Duration) <-chan interface{}{
    c := make(chan interface{})
    go func() {
        defer close(c)
        time.Sleep(after)
    }()
    return c
}

start := time.Now()
<-or(
    sig(2*time.Hour),
    sig(5*time.Minute),
    sig(1*time.Second),
    sig(1*time.Hour),
    sig(1*time.Minute),
)
fmt.Printf("done after %v", time.Since(start))

When any channel exits, all other channels exit too.

Pack Return Values into Struct When Running Functions Concurrently

Example: Concurrent http.Get

// Unify return values into a result struct
type Result struct {
    Error error
    Response *http.Response
}

checkStatus := func(done <-chan interface{}, urls ...string) <-chan Result {
    results := make(chan Result)
    go func() {
        defer close(results)

        for _, url := range urls {
            var result Result
            resp, err := http.Get(url)
            result = Result{Error: err, Response: resp}
            select {
            case <-done:
                return
            case results <- result:
            }
        }
    }()
    return results
} 
done := make(chan interface{})
defer close(done)

urls := []string{"https://www.google.com", "https://badhost"}
for result := range checkStatus(done, urls...) {
    if result.Error != nil {
        fmt.Printf("error: %v", result.Error)
        continue
    }
    fmt.Printf("Response: %v\n", result.Response.Status)
}

This pattern also shows we can introduce an observer that controls program flow. In the example above, the observer defaults to main.

Error control:

done := make(chan interface{})
defer close(done)

errCount := 0
urls := []string{"a", "https://www.google.com", "b", "c", "d"}
for result := range checkStatus(done, urls...) {
    if result.Error != nil {
        fmt.Printf("error: %v\n", result.Error)
        errCount++
        if errCount >= 3 {
			// Break and terminate requesting goroutine when too many errors
            fmt.Println("Too many errors, breaking!")
            break
        }
        continue
    }
    fmt.Printf("Response: %v\n", result.Response.Status)
}

Pipeline Pattern

Split the production process into stages. Multiple goroutines coordinate while staying independent. Each stage handles the same data type.

generator := func(done <-chan interface{}, integers ...int) <-chan int {
    intStream := make(chan int)
    go func() {
        defer close(intStream)
        for _, i := range integers {
            select {
            case <-done:
                return
            case intStream <- i:
            }
        }
    }()
    return intStream
}

multiply := func(
  done <-chan interface{},
  intStream <-chan int,
  multiplier int,
) <-chan int {
    multipliedStream := make(chan int)
    go func() {
        defer close(multipliedStream)
        for i := range intStream {
            select {
            case <-done:
                return
            case multipliedStream <- i*multiplier:
            }
        }
    }()
    return multipliedStream
}

add := func(
  done <-chan interface{},
  intStream <-chan int,
  additive int,
) <-chan int {
    addedStream := make(chan int)
    go func() {
        defer close(addedStream)
        for i := range intStream {
            select {
            case <-done:
                return
            case addedStream <- i+additive:
            }
        }
    }()
    return addedStream
}

// Each stage can safely exit goroutine with done channel
done := make(chan interface{})
defer close(done)

// Each stage returns a channel of the data type it processes
intStream := generator(done, 1, 2, 3, 4)
pipeline := multiply(done, add(done, multiply(done, intStream, 2), 1), 2)

for v := range pipeline {
    fmt.Println(v)
}

Multiple goroutines connect through multiple channels of the same type and share one done channel.

Generator Pattern

Infinite repetition:

repeat := func(
    done <-chan interface{},
    values ...interface{},
) <-chan interface{} {
    valueStream := make(chan interface{})
    go func() {
        defer close(valueStream)
		// Loop forever repeating slice contents until stop signal
        for {
            for _, v := range values {
                select {
                case <-done:
                    return
                case valueStream <- v:
                }
            }
        }
    }()
    return valueStream
}

The pattern creates a stream. Now read from it:

Read on demand:

take := func(
    done <-chan interface{},
    valueStream <-chan interface{},
    num int,
) <-chan interface{} {
    takeStream := make(chan interface{})
    go func() {
        defer close(takeStream)
        for i := 0; i < num; i++ {
            select {
            case <-done:
                return
            case takeStream <- <- valueStream:
            }
        }
    }()
    return takeStream
}

done := make(chan interface{})
defer close(done)

// Take first 10 elements from stream then exit
for num := range take(done, repeat(done, 1), 10) {
    fmt.Printf("%v ", num)
}

Worker Pool Pattern

worker := 8
c := make(chan int, l)

var wg sync.WaitGroup
wg.Add(worker)

for i:=0;i<worker;i++ {
	go func(){
		for row := range c {
			for col := range m.m[row] {
				fillPixel(m, row, col)
			}
		}
	}()
}

for row := range m.m {
	c <- row
}
close(c)

Pub-Sub Pattern

Each subscriber holds a channel. Each subscriber has a filter for content filtering. Filters are typically boolean functions. Publishers use maps to store subscriber channels mapped to filter functions. On each publish, iterate the map and deliver filtered content.

Event-Driven Pattern

Often used to trigger other operations during execution, like notifications. Event handlers register beforehand, then trigger at the right time.

package events

import (
    "time"
)

var UserCreated userCreated

// Define context payload
type UserCreatedPayload struct {
    Email string
    Time  time.Time
}

// Function chain with payload context
type userCreated struct {
    handlers []interface{ Handle(UserCreatedPayload) }
}

// Register adds event handlers to the slice
func (u *userCreated) Register(handler interface{ Handle(UserCreatedPayload) }) {
    u.handlers = append(u.handlers, handler)
}

// Trigger starts goroutines to execute context-aware handlers
func (u userCreated) Trigger(payload UserCreatedPayload) {
    for _, handler := range u.handlers {
        go handler.Handle(payload)
    }
}
package main

import (
    "time"

    "github.com/stephenafamo/demo/events"
)

func init() {
    createNotifier := userCreatedNotifier{
        adminEmail: "[email protected]",
        slackHook: "https://hooks.slack.com/services/...",
    }

	// Register callback handlers
    events.UserCreated.Register(createNotifier)
}

type userCreatedNotifier struct{
    adminEmail string
    slackHook string
}

func (u userCreatedNotifier) notifyAdmin(email string, time time.Time) {
    // ...
}

func (u userCreatedNotifier) sendToSlack(email string, time time.Time) {
    // ...
}

// Custom handler implementation as long as signature matches
func (u userCreatedNotifier) Handle(payload events.UserCreatedPayload) {
    // Do something with our payload
    u.notifyAdmin(payload.Email, payload.Time)
    u.sendToSlack(payload.Email, payload.Time)
}

Trigger event handlers:

package auth

import (
    "time"

    "github.com/stephenafamo/demo/events"
    // Other imported packages
)

func CreateUser() {
    // ...
	// Decoupled - just pass needed payload, no need to know specific handling logic
    events.UserCreated.Trigger(events.UserCreatedPayload{
        Email: "[email protected]",
        Time: time.Now(),
    })
    // ...
}