Go常见并发模式

Go常见并发模式

2019年9月22日

Go语言的并发模式是其最具特色的编程范式之一。本文总结了在Go开发中最常见的几种并发模式,包括请求-接收模式、单一状态者模式、生产-消费模式、Pipeline流水线模式等。这些模式能够帮助我们更好地组织和管理goroutine,使并发程序更加健壮和优雅。通过这些模式的学习,你将能够更好地掌握Go语言的并发特性,写出更高质量的并发代码。

极简的请求-接收模式

// 请求函数根据参数返回接收器
func Glob(pattern string) <-chan Item {
	c := make(chan Item)	// 1.初始化接收器

	// 2.匿名 G 闭包执行程序
	go func() {
		defer close(c)
		for name, item := range items {
			if ok, _ := filepath.Match(pattern, name); !ok {
				continue
			}
			c <- item
		}
	}()

	// 3. 快速返回接收器以便后面的程序从chan中读
	return c
}

func main() {
	for item := range Glob("[ab]*") {
		fmt.Println(item)
	}
}

单一状态者

一个 G 持有一个全局的状态,其他的 G 可以通过 chan 访问到 G 持有的状态

reads := make(chan *readOp)
writes := make(chan *writeOp)

// 状态持有者
go func() {
    var state = make(map[int]int)
    for {
        select {
        case read := <-reads:
			// 返回结果
            read.resp <- state[read.key]
        case write := <-writes:
            state[write.key] = write.val
			// 返回结果
            write.resp <- true
        }
    }
}()

// 请求者
go func() {
    for {
        read := &readOp{
            key:  rand.Intn(5),
            resp: make(chan int)}
        reads <- read
        <-read.resp
     }
}()

go func() {
    for {
        write := &writeOp{
            key:  rand.Intn(5),
            val:  rand.Intn(100),
            resp: make(chan bool)}
        writes <- write
        <-write.resp
     }
}()

先生产,后消费

// 先生产,已知生产个数
chanOwner := func() <-chan int {
    results := make(chan int, 5)
    go func() {
        defer close(results)
        for i := 0; i <= 5; i++ {
            results <- i
        }
    }()
    return results
}

consumer := func(results <-chan int) {
    for result := range results {
        fmt.Printf("Received: %d\n", result)
    }
    fmt.Println("Done receiving!")
}

results := chanOwner()
// 消费
consumer(results)

消费者通知生产者

newRandStream := func(done <-chan interface{}) <-chan int {
    randStream := make(chan int)
    go func() {
        defer fmt.Println("newRandStream closure exited.")
        defer close(randStream)
        for {
            select {
            case randStream <- rand.Int():
            case <-done:
				// 停止生产,退出
                return
            }
        }
    }()

    return randStream
}

done := make(chan interface{})
randStream := newRandStream(done)
fmt.Println("3 random ints:")
for i := 1; i <= 3; i++ {
    fmt.Printf("%d: %d\n", i, <-randStream)
}
// 消费者主动要求生产停止
close(done)

多 channel 合并使得安全退出(OR channel)

var or func(channels ...<-chan interface{}) <-chan interface{}
or = func(channels ...<-chan interface{}) <-chan interface{} { 1
    switch len(channels) {
    case 0: 2
        return nil
    case 1: 3
        return channels[0]
    }

    orDone := make(chan interface{})
    go func() {
        defer close(orDone)

        switch len(channels) {
        case 2:
            select {
            case <-channels[0]:
            case <-channels[1]:
            }
        default:
            select {
            case <-channels[0]:
            case <-channels[1]:
            case <-channels[2]:
            case <-or(append(channels[3:], orDone)...):
            }
        }
    }()
    return orDone
}
sig := func(after time.Duration) <-chan interface{}{
    c := make(chan interface{})
    go func() {
        defer close(c)
        time.Sleep(after)
    }()
    return c
}

start := time.Now()
<-or(
    sig(2*time.Hour),
    sig(5*time.Minute),
    sig(1*time.Second),
    sig(1*time.Hour),
    sig(1*time.Minute),
)
fmt.Printf("done after %v", time.Since(start))

只要有其中一个 chan 退出,其他的 chan 都退出

当需要将某一函数并发执行时,可以一律将返回值打包成结构体使用 chan 传递

例如:并发 http.Get

// 把返回值统一成 result 结构体
type Result struct {
    Error error
    Response *http.Response
}

checkStatus := func(done <-chan interface{}, urls ...string) <-chan Result {
    results := make(chan Result)
    go func() {
        defer close(results)

        for _, url := range urls {
            var result Result
            resp, err := http.Get(url)
            result = Result{Error: err, Response: resp}
            select {
            case <-done:
                return
            case results <- result:
            }
        }
    }()
    return results
} 
done := make(chan interface{})
defer close(done)

urls := []string{"https://www.google.com", "https://badhost"}
for result := range checkStatus(done, urls...) {
    if result.Error != nil {
        fmt.Printf("error: %v", result.Error)
        continue
    }
    fmt.Printf("Response: %v\n", result.Response.Status)
}

同时,这个模式也启发我们在容错方面可以引入一个程序状态的观察者,它能够同时控制程序的走向,在上述例子中,我们默认那个观察者是 main

错误控制

done := make(chan interface{})
defer close(done)

errCount := 0
urls := []string{"a", "https://www.google.com", "b", "c", "d"}
for result := range checkStatus(done, urls...) {
    if result.Error != nil {
        fmt.Printf("error: %v\n", result.Error)
        errCount++
        if errCount >= 3 {
			// 当错误过多的时候,跳出并终止请求 G
            fmt.Println("Too many errors, breaking!")
            break
        }
        continue
    }
    fmt.Printf("Response: %v\n", result.Response.Status)
}

流水线 Pipeline 模式

将生产过程进行拆分,解耦,多个 G 之间互相协同,但每个 G 之间又相互独立 能进行拆分的前提:不同的阶段处理的都是同一类型的数据

generator := func(done <-chan interface{}, integers ...int) <-chan int {
    intStream := make(chan int)
    go func() {
        defer close(intStream)
        for _, i := range integers {
            select {
            case <-done:
                return
            case intStream <- i:
            }
        }
    }()
    return intStream
}

multiply := func(
  done <-chan interface{},
  intStream <-chan int,
  multiplier int,
) <-chan int {
    multipliedStream := make(chan int)
    go func() {
        defer close(multipliedStream)
        for i := range intStream {
            select {
            case <-done:
                return
            case multipliedStream <- i*multiplier:
            }
        }
    }()
    return multipliedStream
}

add := func(
  done <-chan interface{},
  intStream <-chan int,
  additive int,
) <-chan int {
    addedStream := make(chan int)
    go func() {
        defer close(addedStream)
        for i := range intStream {
            select {
            case <-done:
                return
            case addedStream <- i+additive:
            }
        }
    }()
    return addedStream
}

// 每个阶段都可以使用 done channel 安全退出 goroutine
done := make(chan interface{})
defer close(done)

// 每个阶段都返回要处理的数据类型的 channel
intStream := generator(done, 1, 2, 3, 4)
pipeline := multiply(done, add(done, multiply(done, intStream, 2), 1), 2)

for v := range pipeline {
    fmt.Println(v)
}

多个 goroutine 通过多个同类型的 channel 和共享一个 done channel 联系在一起

生成器模式

无限重复生成

repeat := func(
    done <-chan interface{},
    values ...interface{},
) <-chan interface{} {
    valueStream := make(chan interface{})
    go func() {
        defer close(valueStream)
		// 死循环不断重复发送 slice 的内容直到发送信号停止
        for {
            for _, v := range values {
                select {
                case <-done:
                    return
                case valueStream <- v:
                }
            }
        }
    }()
    return valueStream
}

上述模式可以产生一个生成流,下面我们来从这个流中读取

从流中按需读取

take := func(
    done <-chan interface{},
    valueStream <-chan interface{},
    num int,
) <-chan interface{} {
    takeStream := make(chan interface{})
    go func() {
        defer close(takeStream)
        for i := 0; i < num; i++ {
            select {
            case <-done:
                return
            case takeStream <- <- valueStream:
            }
        }
    }()
    return takeStream
}

done := make(chan interface{})
defer close(done)

// 从流中截取前 10 个元素然后退出
for num := range take(done, repeat(done, 1), 10) {
    fmt.Printf("%v ", num)
}

worker pool 协程池模式

worker := 8
c := make(chan int, l)

var wg sync.WaitGroup
wg.Add(worker)

for i:=0;i<worker;i++ {
	go func(){
		for row := range c {
			for col := range m.m[row] {
				fillPixel(m, row, col)
			}
		}
	}()
}

for row := range m.m {
	c <- row
}
close(c)

pub-sub 发布订阅模式

每一个订阅者都会持有一个 chan 每个订阅者会有一个过滤器来进行内容的过滤 过滤器通常是一个 bool 函数值类型 发布者使用 map 存储订阅者 chan 到过滤函数的映射 每次发布的时候都遍历 map 然后使用对应的过滤器进行过滤后投递即可

事件驱动模式

常用于在一些操作执行的过程中触发其他操作例如通知 这些事件的处理操作需要事先注册,然后在合适的时间触发

package events

import (
    "time"
)

var UserCreated userCreated

// 定义上下文荷载
type UserCreatedPayload struct {
    Email string
    Time  time.Time
}

// 带荷载上下文的函数链
type userCreated struct {
    handlers []interface{ Handle(UserCreatedPayload) }
}

// Register 将时间的处理函数添加进切片中
func (u *userCreated) Register(handler interface{ Handle(UserCreatedPayload) }) {
    u.handlers = append(u.handlers, handler)
}

// Trigger 依次触发 goroutine 来执行带上下文的处理函数
func (u userCreated) Trigger(payload UserCreatedPayload) {
    for _, handler := range u.handlers {
        go handler.Handle(payload)
    }
}
package main

import (
    "time"

    "github.com/stephenafamo/demo/events"
)

func init() {
    createNotifier := userCreatedNotifier{
        adminEmail: "[email protected]",
        slackHook: "https://hooks.slack.com/services/...",
    }

	// 注册处理回调
    events.UserCreated.Register(createNotifier)
}

type userCreatedNotifier struct{
    adminEmail string
    slackHook string
}

func (u userCreatedNotifier) notifyAdmin(email string, time time.Time) {
    // ...
}

func (u userCreatedNotifier) sendToSlack(email string, time time.Time) {
    // ...
}

// 只要符合处理函数的签名就可以自定义处理函数的内部实现
func (u userCreatedNotifier) Handle(payload events.UserCreatedPayload) {
    // Do something with our payload
    u.notifyAdmin(payload.Email, payload.Time)
    u.sendToSlack(payload.Email, payload.Time)
}

触发事件处理函数

package auth

import (
    "time"

    "github.com/stephenafamo/demo/events"
    // Other imported packages
)

func CreateUser() {
    // ...
	// 解耦之后只需要传入需要的载荷即可,不用关心具体的处理逻辑
    events.UserCreated.Trigger(events.UserCreatedPayload{
        Email: "[email protected]",
        Time: time.Now(),
    })
    // ...
}