Go常见并发模式
Go常见并发模式
2019年9月22日
Go语言的并发模式是其最具特色的编程范式之一。本文总结了在Go开发中最常见的几种并发模式,包括请求-接收模式、单一状态者模式、生产-消费模式、Pipeline流水线模式等。这些模式能够帮助我们更好地组织和管理goroutine,使并发程序更加健壮和优雅。通过这些模式的学习,你将能够更好地掌握Go语言的并发特性,写出更高质量的并发代码。
极简的请求-接收模式
// 请求函数根据参数返回接收器
func Glob(pattern string) <-chan Item {
c := make(chan Item) // 1.初始化接收器
// 2.匿名 G 闭包执行程序
go func() {
defer close(c)
for name, item := range items {
if ok, _ := filepath.Match(pattern, name); !ok {
continue
}
c <- item
}
}()
// 3. 快速返回接收器以便后面的程序从chan中读
return c
}
func main() {
for item := range Glob("[ab]*") {
fmt.Println(item)
}
}单一状态者
一个 G 持有一个全局的状态,其他的 G 可以通过 chan 访问到 G 持有的状态
reads := make(chan *readOp)
writes := make(chan *writeOp)
// 状态持有者
go func() {
var state = make(map[int]int)
for {
select {
case read := <-reads:
// 返回结果
read.resp <- state[read.key]
case write := <-writes:
state[write.key] = write.val
// 返回结果
write.resp <- true
}
}
}()
// 请求者
go func() {
for {
read := &readOp{
key: rand.Intn(5),
resp: make(chan int)}
reads <- read
<-read.resp
}
}()
go func() {
for {
write := &writeOp{
key: rand.Intn(5),
val: rand.Intn(100),
resp: make(chan bool)}
writes <- write
<-write.resp
}
}()先生产,后消费
// 先生产,已知生产个数
chanOwner := func() <-chan int {
results := make(chan int, 5)
go func() {
defer close(results)
for i := 0; i <= 5; i++ {
results <- i
}
}()
return results
}
consumer := func(results <-chan int) {
for result := range results {
fmt.Printf("Received: %d\n", result)
}
fmt.Println("Done receiving!")
}
results := chanOwner()
// 消费
consumer(results)消费者通知生产者
newRandStream := func(done <-chan interface{}) <-chan int {
randStream := make(chan int)
go func() {
defer fmt.Println("newRandStream closure exited.")
defer close(randStream)
for {
select {
case randStream <- rand.Int():
case <-done:
// 停止生产,退出
return
}
}
}()
return randStream
}
done := make(chan interface{})
randStream := newRandStream(done)
fmt.Println("3 random ints:")
for i := 1; i <= 3; i++ {
fmt.Printf("%d: %d\n", i, <-randStream)
}
// 消费者主动要求生产停止
close(done)多 channel 合并使得安全退出(OR channel)
var or func(channels ...<-chan interface{}) <-chan interface{}
or = func(channels ...<-chan interface{}) <-chan interface{} { 1
switch len(channels) {
case 0: 2
return nil
case 1: 3
return channels[0]
}
orDone := make(chan interface{})
go func() {
defer close(orDone)
switch len(channels) {
case 2:
select {
case <-channels[0]:
case <-channels[1]:
}
default:
select {
case <-channels[0]:
case <-channels[1]:
case <-channels[2]:
case <-or(append(channels[3:], orDone)...):
}
}
}()
return orDone
}sig := func(after time.Duration) <-chan interface{}{
c := make(chan interface{})
go func() {
defer close(c)
time.Sleep(after)
}()
return c
}
start := time.Now()
<-or(
sig(2*time.Hour),
sig(5*time.Minute),
sig(1*time.Second),
sig(1*time.Hour),
sig(1*time.Minute),
)
fmt.Printf("done after %v", time.Since(start))只要有其中一个 chan 退出,其他的 chan 都退出
当需要将某一函数并发执行时,可以一律将返回值打包成结构体使用 chan 传递
例如:并发 http.Get
// 把返回值统一成 result 结构体
type Result struct {
Error error
Response *http.Response
}
checkStatus := func(done <-chan interface{}, urls ...string) <-chan Result {
results := make(chan Result)
go func() {
defer close(results)
for _, url := range urls {
var result Result
resp, err := http.Get(url)
result = Result{Error: err, Response: resp}
select {
case <-done:
return
case results <- result:
}
}
}()
return results
} done := make(chan interface{})
defer close(done)
urls := []string{"https://www.google.com", "https://badhost"}
for result := range checkStatus(done, urls...) {
if result.Error != nil {
fmt.Printf("error: %v", result.Error)
continue
}
fmt.Printf("Response: %v\n", result.Response.Status)
}同时,这个模式也启发我们在容错方面可以引入一个程序状态的观察者,它能够同时控制程序的走向,在上述例子中,我们默认那个观察者是 main
错误控制
done := make(chan interface{})
defer close(done)
errCount := 0
urls := []string{"a", "https://www.google.com", "b", "c", "d"}
for result := range checkStatus(done, urls...) {
if result.Error != nil {
fmt.Printf("error: %v\n", result.Error)
errCount++
if errCount >= 3 {
// 当错误过多的时候,跳出并终止请求 G
fmt.Println("Too many errors, breaking!")
break
}
continue
}
fmt.Printf("Response: %v\n", result.Response.Status)
}流水线 Pipeline 模式
将生产过程进行拆分,解耦,多个 G 之间互相协同,但每个 G 之间又相互独立 能进行拆分的前提:不同的阶段处理的都是同一类型的数据
generator := func(done <-chan interface{}, integers ...int) <-chan int {
intStream := make(chan int)
go func() {
defer close(intStream)
for _, i := range integers {
select {
case <-done:
return
case intStream <- i:
}
}
}()
return intStream
}
multiply := func(
done <-chan interface{},
intStream <-chan int,
multiplier int,
) <-chan int {
multipliedStream := make(chan int)
go func() {
defer close(multipliedStream)
for i := range intStream {
select {
case <-done:
return
case multipliedStream <- i*multiplier:
}
}
}()
return multipliedStream
}
add := func(
done <-chan interface{},
intStream <-chan int,
additive int,
) <-chan int {
addedStream := make(chan int)
go func() {
defer close(addedStream)
for i := range intStream {
select {
case <-done:
return
case addedStream <- i+additive:
}
}
}()
return addedStream
}
// 每个阶段都可以使用 done channel 安全退出 goroutine
done := make(chan interface{})
defer close(done)
// 每个阶段都返回要处理的数据类型的 channel
intStream := generator(done, 1, 2, 3, 4)
pipeline := multiply(done, add(done, multiply(done, intStream, 2), 1), 2)
for v := range pipeline {
fmt.Println(v)
}多个 goroutine 通过多个同类型的 channel 和共享一个 done channel 联系在一起
生成器模式
无限重复生成
repeat := func(
done <-chan interface{},
values ...interface{},
) <-chan interface{} {
valueStream := make(chan interface{})
go func() {
defer close(valueStream)
// 死循环不断重复发送 slice 的内容直到发送信号停止
for {
for _, v := range values {
select {
case <-done:
return
case valueStream <- v:
}
}
}
}()
return valueStream
}上述模式可以产生一个生成流,下面我们来从这个流中读取
从流中按需读取
take := func(
done <-chan interface{},
valueStream <-chan interface{},
num int,
) <-chan interface{} {
takeStream := make(chan interface{})
go func() {
defer close(takeStream)
for i := 0; i < num; i++ {
select {
case <-done:
return
case takeStream <- <- valueStream:
}
}
}()
return takeStream
}
done := make(chan interface{})
defer close(done)
// 从流中截取前 10 个元素然后退出
for num := range take(done, repeat(done, 1), 10) {
fmt.Printf("%v ", num)
}worker pool 协程池模式
worker := 8
c := make(chan int, l)
var wg sync.WaitGroup
wg.Add(worker)
for i:=0;i<worker;i++ {
go func(){
for row := range c {
for col := range m.m[row] {
fillPixel(m, row, col)
}
}
}()
}
for row := range m.m {
c <- row
}
close(c)pub-sub 发布订阅模式
每一个订阅者都会持有一个 chan 每个订阅者会有一个过滤器来进行内容的过滤 过滤器通常是一个 bool 函数值类型 发布者使用 map 存储订阅者 chan 到过滤函数的映射 每次发布的时候都遍历 map 然后使用对应的过滤器进行过滤后投递即可
事件驱动模式
常用于在一些操作执行的过程中触发其他操作例如通知 这些事件的处理操作需要事先注册,然后在合适的时间触发
package events
import (
"time"
)
var UserCreated userCreated
// 定义上下文荷载
type UserCreatedPayload struct {
Email string
Time time.Time
}
// 带荷载上下文的函数链
type userCreated struct {
handlers []interface{ Handle(UserCreatedPayload) }
}
// Register 将时间的处理函数添加进切片中
func (u *userCreated) Register(handler interface{ Handle(UserCreatedPayload) }) {
u.handlers = append(u.handlers, handler)
}
// Trigger 依次触发 goroutine 来执行带上下文的处理函数
func (u userCreated) Trigger(payload UserCreatedPayload) {
for _, handler := range u.handlers {
go handler.Handle(payload)
}
}package main
import (
"time"
"github.com/stephenafamo/demo/events"
)
func init() {
createNotifier := userCreatedNotifier{
adminEmail: "[email protected]",
slackHook: "https://hooks.slack.com/services/...",
}
// 注册处理回调
events.UserCreated.Register(createNotifier)
}
type userCreatedNotifier struct{
adminEmail string
slackHook string
}
func (u userCreatedNotifier) notifyAdmin(email string, time time.Time) {
// ...
}
func (u userCreatedNotifier) sendToSlack(email string, time time.Time) {
// ...
}
// 只要符合处理函数的签名就可以自定义处理函数的内部实现
func (u userCreatedNotifier) Handle(payload events.UserCreatedPayload) {
// Do something with our payload
u.notifyAdmin(payload.Email, payload.Time)
u.sendToSlack(payload.Email, payload.Time)
}触发事件处理函数
package auth
import (
"time"
"github.com/stephenafamo/demo/events"
// Other imported packages
)
func CreateUser() {
// ...
// 解耦之后只需要传入需要的载荷即可,不用关心具体的处理逻辑
events.UserCreated.Trigger(events.UserCreatedPayload{
Email: "[email protected]",
Time: time.Now(),
})
// ...
}