channel 与 Select
make 操作返回 chan 引用,实际的结构体被分配在堆中
缓冲 chan
数据结构:
- 基于数组的循环队列 ring buffer 用来暂存数据
- 基于链表的单向队列sudog,用于保存阻塞在此 channel 上的 goroutine
理想情况下的收发过程:
发送方拿到互斥锁
将数据拷贝并入队
释放锁
接收方拿到锁
出队将数据取出
释放锁
如果发送方的内容占满了缓冲区 发送方会被阻塞直到接收方接收,发送方会在满的时候通过 gopark() 告知运行时将它休眠,新的元素和发送方的指针会保存在 channel 的结构体内部,当接收方出队取走元素的时候就可以自行将下一个元素入队(避免恢复运行后再多抢一次锁),并在此时通过 goready() 通知运行时发送方可以从阻塞队列放入就绪队列了,然后运行时再进行调度
如果接收方尝试从空的 channel 中接收 同样,会转变成阻塞状态,但不同的是,接收方即将接收元素的内存指针也被存入了channel,当发送方发送的时候,它可以无锁地往接收方的栈内存中直接写入元素
试图从未被初始化(值为nil)的通道接收元素值会造成永久阻塞!
无缓冲 channel
接收者优先,发送者总是可以无锁将元素直接写入接收者的栈空间中 发送者优先,发送内容和休眠goroutine指针总是会存放在channel结构体中
值得注意的是:发送方向通道发送的值会被复制,接收方接收的总是该值得副本,而不是该值本身。传递的值至少会被复制一次,至多会被复制两次
Select 实现
根据代码编译信息创建 select 对象,预分配好存放case的内存,运行时存放所有的 channel 到 scase 集合中,然后获得所有case 中 channel 的锁
// 打乱遍历的顺序
for i := 1; i < ncases; i++ {
j := fastrandn(uint32(i + 1))
pollorder[i] = pollorder[j]
pollorder[j] = uint16(i)
}// 对 ncases 中的 hchan 进行堆排序
for i := 0; i < ncases; i++ {
j := i
// Start with the pollorder to permute cases on the same channel.
c := scases[pollorder[i]].c
for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() {
k := (j - 1) / 2
lockorder[j] = lockorder[k]
j = k
}
lockorder[j] = pollorder[i]
}
...随机遍历 channel ,查看是否有 case 已经准备好接收或者发送,如果都没有,则尝试运行 default 分支,如果没有 default 分支,则将自身G封装好并分别挂入每个 channel 的读写者队列中,G进入阻塞队列,等待被 channel 中的任意一个唤醒,被唤醒后找到对应的 case (CAS 操作确保只选择其中一个case),在执行之前放弃在其他 channel 中的等待,最后进入某个分支运行,然后 select 对象就可以等待释放了
实际使用中,常常将 select 放在单独的 G 中运行防止阻塞程序
定时器与 chan
对接收操作进行超时设定
timeout := time.Millisecond * 500
var timer *time.Timer
for {
// 首次使用进行初始化,否则使用 reset 对定时器对象进行复用
if timer == nil {
timer = time.NewTimer(timeout)
} else {
timer.Reset(timeout)
}
select {
case e, ok := <-intChan:
if !ok {
return
}
case <-timer.C:
fmt.Println("Timeout!")
}
}如果不想使用通道来查看到期,可以使用 time.AfterFunc 传入相对到期时间以及到期时需要执行的函数
断续器与 chan
通知方式与定时器类似,但是会在到期之后立即进入下一个周期等待再次到期直到被停止,常被作为定时任务的触发器来使用
如果不想停止,可以使用 time.Tick(..) 直接通过时间间隔获取只读到期信号通道