Channels and Select

Channels and Select

The make operation returns a chan reference. The actual struct lives on the heap.

Buffered Chans

Data structures:

  • Ring buffer based on array to temporarily store data
  • Singly linked list of sudog queues to hold goroutines blocked on this channel

The ideal send/receive process:

  1. Sender grabs mutex

  2. Copy data and enqueue

  3. Release lock

  4. Receiver grabs lock

  5. Dequeue and take data

  6. Release lock

  7. If sender fills the buffer The sender blocks until receiver takes data. The sender calls gopark() to tell the runtime to sleep it. The new element and sender pointer get stored inside the channel struct. When the receiver dequeues an element, it can enqueue the next element itself (avoiding another lock grab after waking up). It then calls goready() to tell the runtime that the sender can move from the blocked queue to the ready queue for scheduling.

  8. If receiver tries to read from empty channel It becomes blocked too, but the receiver’s memory pointer also gets stored in the channel. When the sender sends, it can write directly to the receiver’s stack memory without locks.

Trying to receive from uninitialized (nil) channels causes permanent blocking!

Unbuffered Channels

Receivers go first. Senders always write directly to receiver stack space without locks.

Senders go first. Sent content and sleeping goroutine pointers always live in the channel struct.

Note: Values sent to channels get copied. Receivers always get copies, not the original values. Values get copied at least once, at most twice.

Select Implementation

The compiler creates select objects with pre-allocated memory for cases. At runtime, all channels go into scase collections. Then we get locks for all channels in the cases.

// Shuffle traversal order
for i := 1; i < ncases; i++ {
	j := fastrandn(uint32(i + 1))
	pollorder[i] = pollorder[j]
	pollorder[j] = uint16(i)
}
// Heap sort on hchan in ncases
for i := 0; i < ncases; i++ {
	j := i
	// Start with pollorder to permute cases on same channel
	c := scases[pollorder[i]].c
	for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() {
		k := (j - 1) / 2
		lockorder[j] = lockorder[k]
		j = k
	}
	lockorder[j] = pollorder[i]
}
...

We randomly traverse channels to see if any case is ready for send or receive. If none are ready, we try running the default branch. If no default exists, we wrap our G and hang it in each channel’s reader/writer queues. The G enters the blocked queue waiting for any channel to wake it up. After waking, we find the corresponding case (CAS operations ensure only one case gets chosen), abandon waits on other channels before execution, then run in some branch. Finally the select object waits for release.

In practice, people often run select in separate Gs to prevent blocking the program.

Timers and Chans

Set timeout for receive operations.

timeout := time.Millisecond * 500
var timer *time.Timer
for {
	// Initialize first time, otherwise reuse timer with Reset
	if timer == nil {
		timer = time.NewTimer(timeout)
	} else {
		timer.Reset(timeout)
	}

	select {
	case e, ok := <-intChan:
		if !ok {
			return
		}
	case <-timer.C:
		fmt.Println("Timeout!")
	}
}

If you don’t want to use channels to check expiration, use time.AfterFunc with relative expiration time and function to execute when expired.

Tickers and Chans

Notifications work like timers, but tickers immediately enter the next cycle after expiration until stopped. They’re commonly used as triggers for scheduled tasks.

If you don’t want to stop, use time.Tick(..) to get a read-only expiration signal channel directly from the interval.