Stopping it all in Go
- Tuesday July 15 2014
- golang goroutines channels
Goroutines in Go are great. They are so great, there is a language feature dedicated to creating them. The following snippet starts awesomeWorkerFunction as a separate goroutine.
go awesomeWorkerFunction()
Goroutines aren't threads, but they are separate paths of execution. They have a stack just like a real thread.
Starting a goroutine is easy, but the language gives you no built-in way to find out when a goroutine has stopped running. Also, you cannot get the return value of a goroutine: the go statement discards whatever the function returns.
This is intentional. This is not a problem for everyone, nor is the solution the same for everyone.
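The usual workaround for both points is a channel: the goroutine sends its result, and the receive doubles as the "it has finished" signal. A minimal sketch, where square and the result channel are hypothetical names used only for illustration:

```go
package main

import "fmt"

// square is a hypothetical worker. It sends its result on a channel
// because the go statement discards ordinary return values.
func square(n int, result chan<- int) {
	result <- n * n
}

func main() {
	result := make(chan int)
	go square(7, result)

	// The receive blocks until the goroutine has sent its value.
	fmt.Println(<-result) // prints 49
}
```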
I commonly run into the same pain points around goroutines.
- Signaling a goroutine to shut down.
- Waiting for a goroutine to shut down.
- Flushing a communications channel before shutting a goroutine down.
Signaling to shut down
Launching a function to receive and process from a channel is a common idiom. It winds up looking something like this.
    func worker(messages <-chan int) {
        for {
            select {
            case message := <-messages:
                // Do something useful with message here.
                _ = message // placeholder: Go rejects unused variables
            }
        }
    }
The goroutine receives from the channel forever. This is great unless you want to perform an orderly shutdown.
There are many ways to signal a goroutine, but my solution is to add a second channel that never carries any meaningful values.

    func worker(messages <-chan int, shutdown <-chan int) {
        for {
            select {
            case message := <-messages:
                // Do something useful with message here.
                _ = message // placeholder: Go rejects unused variables
            case <-shutdown:
                // We're done!
                return
            }
        }
    }
When the shutdown channel is closed, a receive from it succeeds immediately, so the goroutine enters that case and returns. The advantage is that if a number of goroutines are all receiving from that channel, closing it once causes them all to stop. The element type of the channel does not matter since no values are ever passed over it.
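To make the broadcast behavior concrete, here is a small sketch in which one close stops several goroutines. The names waiter, stopAll, and stopped are made up for this example; the stopped channel exists only so the demo can count how many goroutines exited.

```go
package main

import "fmt"

// waiter blocks until shutdown is closed, then reports its id.
func waiter(id int, shutdown <-chan int, stopped chan<- int) {
	<-shutdown // returns immediately once the channel is closed
	stopped <- id
}

// stopAll starts n waiters, closes the shared channel once,
// and returns how many goroutines reported that they stopped.
func stopAll(n int) int {
	shutdown := make(chan int)
	stopped := make(chan int, n)
	for i := 0; i < n; i++ {
		go waiter(i, shutdown, stopped)
	}

	close(shutdown) // a single close reaches every receiver

	count := 0
	for i := 0; i < n; i++ {
		<-stopped
		count++
	}
	return count
}

func main() {
	fmt.Println(stopAll(3)) // prints 3
}
```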
Waiting for shutdown
Being able to signal a goroutine to shut down is fairly useless if you can't wait for that to actually happen. If the main function exits before the remaining goroutines complete, they simply stop abruptly.
Solving this problem is relatively easy. The standard library package sync includes a WaitGroup type that acts as a counter. A goroutine can wait for this counter to return to zero before continuing.
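    import "sync"

    func worker(messages <-chan int, shutdown <-chan int, wg *sync.WaitGroup) {
        defer wg.Done() // decrement the counter when this function returns
        for {
            select {
            case message := <-messages:
                // Do something useful with message here.
                _ = message // placeholder: Go rejects unused variables
            case <-shutdown:
                // We're done!
                return
            }
        }
    }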
Calling the Done() method on the WaitGroup decrements the counter. By combining this with the defer keyword we can be sure that the counter is decremented when the goroutine's function returns, whichever path it takes. Immediately before launching the goroutine, the caller calls Add(1) to increment the counter. It looks something like this.
    wg := &sync.WaitGroup{}
    wg.Add(1)
    go worker(messages, shutdown, wg)
    close(shutdown) // Signal to shut down
    wg.Wait()       // Wait for the goroutine to shut down
The WaitGroup should always be passed by pointer because copying it is not safe.
This approach is excellent because if a goroutine needs to launch other goroutines, it can simply call wg.Add(1) and pass the *WaitGroup and shutdown channel to them as well. In this way, closing the shutdown channel shuts down all the goroutines. The main goroutine doesn't even have to be aware of where each goroutine is started from for this to work.
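A minimal sketch of that nesting, assuming a hypothetical parent that starts two child goroutines. The exited counter is not part of the pattern; it is there only so the example can show that all three goroutines returned before Wait() did.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// child waits for shutdown, records that it exited, then returns.
func child(shutdown <-chan int, wg *sync.WaitGroup, exited *int32) {
	defer wg.Done()
	<-shutdown
	atomic.AddInt32(exited, 1)
}

// parent launches two children with the same WaitGroup and channel.
func parent(shutdown <-chan int, wg *sync.WaitGroup, exited *int32) {
	defer wg.Done()
	for i := 0; i < 2; i++ {
		// Add before the go statement. parent has not called Done yet,
		// so the counter cannot reach zero in between.
		wg.Add(1)
		go child(shutdown, wg, exited)
	}
	<-shutdown
	atomic.AddInt32(exited, 1)
}

// run starts only the parent, yet Wait covers the children too.
func run() int32 {
	var exited int32
	shutdown := make(chan int)
	wg := &sync.WaitGroup{}

	wg.Add(1)
	go parent(shutdown, wg, &exited)

	close(shutdown)
	wg.Wait()
	return atomic.LoadInt32(&exited)
}

func main() {
	fmt.Println(run()) // prints 3: the parent and both children
}
```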
Flushing channels
If you use buffered channels to communicate among goroutines, you likely want to flush those channels before the goroutine exits. Otherwise, potentially important data could be lost. This is easily accomplished by closing the channel and having the receiving goroutine exit only once it sees that the channel is closed, since a closed channel still delivers its buffered values first. The problem with this is that sending on a closed channel causes the program to panic, so the channel must not be closed while any sender is still running.
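Both behaviors are easy to see in isolation. In this sketch, drain and sendPanics are hypothetical helpers written for the demonstration; recover() is used only to observe the panic, not as a recommended way to handle it.

```go
package main

import "fmt"

// drain receives until the channel is both closed and empty.
func drain(messages <-chan int) []int {
	var got []int
	for {
		msg, ok := <-messages
		if !ok {
			return got // closed and fully flushed
		}
		got = append(got, msg)
	}
}

// sendPanics reports whether a send on the given channel panics.
func sendPanics(messages chan<- int) (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	messages <- 1
	return false
}

func main() {
	messages := make(chan int, 4)
	messages <- 10
	messages <- 20
	close(messages)

	fmt.Println(drain(messages))      // prints [10 20]
	fmt.Println(sendPanics(messages)) // prints true
}
```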
The simplest program might have two goroutines communicating across a single channel.
    import (
        "sync"
        "time"
    )

    func producer(messages chan<- int, shutdown <-chan int, wg *sync.WaitGroup) {
        defer wg.Done()
        for {
            select {
            case <-shutdown:
                return
            case <-time.After(time.Second):
                messages <- 0
            }
        }
    }

    func consumer(messages <-chan int, shutdown <-chan int, wg *sync.WaitGroup) {
        defer wg.Done()
        for {
            select {
            case <-shutdown:
                return
            case msg, ok := <-messages:
                if !ok {
                    return // Channel closed, we're done
                }
                // Do something with message.
                _ = msg // placeholder: Go rejects unused variables
            }
        }
    }
Closing the shutdown channel would cause both goroutines to exit. But if messages is buffered, values could be left in the channel and lost.
This problem can be solved by creating two separate shutdown channels and two instances of *WaitGroup. A producer goroutine is passed producer_shutdown and producer_wg when it is launched. A consumer is passed consumer_shutdown and consumer_wg.
To shut down without message loss, close the producer_shutdown channel, then call Wait() on producer_wg. Then close the messages channel. Since all producers are guaranteed to have stopped, no panic occurs. Finally, call Wait() on consumer_wg. The consumers process all remaining messages and then exit since messages has been closed. Of course, you can still simply close consumer_shutdown if you want to perform an immediate exit.
Here is the same idea laid out in code.
    messages := make(chan int, 16)

    producer_wg := &sync.WaitGroup{}
    producer_shutdown := make(chan int)
    producer_wg.Add(1)
    go producer(messages, producer_shutdown, producer_wg)

    consumer_wg := &sync.WaitGroup{}
    consumer_shutdown := make(chan int)
    consumer_wg.Add(1)
    go consumer(messages, consumer_shutdown, consumer_wg)

    close(producer_shutdown) // Shut down producers
    producer_wg.Wait()       // Wait for producers
    close(messages)          // Close messages without causing a panic
    consumer_wg.Wait()       // Wait for consumers to finish processing messages and exit
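One way to convince yourself that nothing is lost is to count messages on both sides. The following is a simplified, self-contained variation rather than the exact code above: the producer sends as fast as it can, the consumer uses a range loop instead of a consumer shutdown channel, and lossless is a hypothetical helper name. The short sleep is only there to let some messages flow.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// lossless runs one producer and one consumer, shuts them down in
// order, and returns how many messages were sent and received.
func lossless() (sent, received int) {
	messages := make(chan int, 16)
	producerShutdown := make(chan int)
	var producerWG, consumerWG sync.WaitGroup

	producerWG.Add(1)
	go func() {
		defer producerWG.Done()
		for {
			select {
			case <-producerShutdown:
				return
			case messages <- sent:
				sent++ // only this goroutine touches sent until Wait returns
			}
		}
	}()

	consumerWG.Add(1)
	go func() {
		defer consumerWG.Done()
		// range keeps receiving until messages is closed and drained.
		for range messages {
			received++
		}
	}()

	time.Sleep(10 * time.Millisecond) // let some messages flow

	close(producerShutdown) // 1. stop the producer
	producerWG.Wait()       // 2. no sender is left running
	close(messages)         // 3. safe: nobody can send anymore
	consumerWG.Wait()       // 4. consumer has flushed the buffer
	return sent, received
}

func main() {
	sent, received := lossless()
	fmt.Println(sent == received) // prints true
}
```

Swapping steps 2 and 3 would reintroduce the risk of a send on a closed channel.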
Conclusion
I hope this helps out others. This definitely doesn't cover all cases, but it lays down a good set of ideas that can be extended where needed.