BUG Caused by Unbuffered Channels
Recently I was working on a toy MapReduce implementation that contains a lot of concurrent code. A slight mistake could make Map or Reduce operations block unexpectedly, preventing the system from processing all tasks as planned and writing the results to output files. After extensive debugging, I found that the root cause was my unfamiliarity with the basic behavior of unbuffered channels. When several channels have to cooperate, it is easy to fall into this kind of pitfall.
Let’s start with the key point:
Unbuffered channels require the sender and the receiver to be ready at the same time: a send blocks until another goroutine receives the value, and a receive blocks until another goroutine sends one.
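The rule above can be observed directly. The sketch below uses a `select` with a `default` case to attempt a send without blocking: on an unbuffered channel with no receiver waiting, the send cannot proceed, while a channel with a buffer of 1 accepts one value immediately. The helper `trySend` is hypothetical, written only for this illustration.

```go
package main

import "fmt"

// trySend attempts a send without blocking: the select falls through to
// default when the send cannot complete immediately.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	unbuffered := make(chan int)  // capacity 0: a send needs a receiver at the same moment
	buffered := make(chan int, 1) // capacity 1: one send completes with no receiver

	fmt.Println("unbuffered send without receiver:", trySend(unbuffered, 1))
	fmt.Println("buffered send without receiver:", trySend(buffered, 1))

	// Once a receiving goroutine exists, the blocking send below waits
	// until that goroutine receives, and then both sides proceed.
	done := make(chan int)
	go func() { done <- <-unbuffered }()
	unbuffered <- 42
	fmt.Println("received:", <-done)
}
```

Running it should print `false` for the unbuffered attempt, `true` for the buffered one, and then `received: 42`.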
Show me the code
During the fault-tolerance phase, we need to re-execute the tasks that failed. My approach is to record the outcome of each task in a boolean array (taskFailed), where the index identifies the task and the element indicates whether that task failed.
```go
for t, failed := range taskFailed {
	if failed {
		wg.Add(1)
		// Take an idle worker; blocks until one is available.
		addr := <-registerChan
		go func(i int, workerAddr string, p jobPhase) {
			res := call(workerAddr, "Worker.DoTask", DoTaskArgs{
				JobName:       jobName,
				File:          mapFiles[i],
				Phase:         p,
				TaskNumber:    i,
				NumOtherPhase: n_other,
			}, nil)
			if res {
				// Hand the worker back; blocks until someone receives.
				registerChan <- workerAddr
				wg.Done()
			}
		}(t, addr, phase)
	}
}
wg.Wait()
```

We traversed the task status array, identified the failed tasks, let idle workers execute them, and then simply waited on the WaitGroup.
However, look closer and recall the rule above: an operation on an unbuffered channel completes only when both sides are ready. If one side performs an operation and the other side never shows up, the goroutine performing it blocks forever.
Looking back at these two lines of code:
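```go
registerChan <- workerAddr
wg.Done()
```

Note that there is not necessarily a receiver on registerChan at this moment, because the loop only takes workers for failed tasks. Once the loop has made its last receive, nothing in this code reads from registerChan again, so a goroutine that tries to hand its worker back at that point blocks on the send and never reaches wg.Done(). If only one task fails, for example, the loop receives exactly once, the single goroutine's send has no receiver, and wg.Wait() blocks forever. So we should swap the order: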
```go
wg.Done()
registerChan <- workerAddr
```

This way, wg.Done() runs before the potentially blocking send, so wg.Wait() can return even if the send never completes.