Stopping it all in Go

Goroutines in Go are great. They are so great that the language has a keyword dedicated to creating them. The following snippet starts awesomeWorkerFunction as a separate goroutine.

go awesomeWorkerFunction()

Goroutines aren't threads, but they are separate paths of execution. They have a stack just like a real thread.

Starting a goroutine is easy, but there is no way in the language to determine when a goroutine stops running. Also, if the function returns a value, that value is discarded when the function is started as a goroutine.

This is intentional. This is not a problem for everyone, nor is the solution the same for everyone.
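For the return value case, one common workaround is to have the goroutine send its result over a channel. The sketch below assumes a hypothetical sumTo function standing in for real work. The names sumTo and sumAsync are illustrative only.

```go
package main

import "fmt"

// sumTo is a hypothetical stand-in for real work: it adds the integers 1..n.
func sumTo(n int) int {
	total := 0
	for i := 1; i <= n; i++ {
		total += i
	}
	return total
}

// sumAsync runs sumTo in its own goroutine and returns a channel that
// carries the single result. The buffer of 1 lets the goroutine finish
// even if nobody ever receives.
func sumAsync(n int) <-chan int {
	result := make(chan int, 1)
	go func() {
		result <- sumTo(n)
	}()
	return result
}

func main() {
	result := sumAsync(10)
	// Receiving blocks until the goroutine has sent its value.
	fmt.Println(<-result) // prints 55
}
```

The receive doubles as a completion signal: once the value arrives, the caller knows the work is finished.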

I commonly run into the same pain points around goroutines.

  1. Signaling a goroutine to shut down.
  2. Waiting for a goroutine to shut down.
  3. Flushing a communications channel before shutting a goroutine down.

Signaling to shutdown

Launching a function to receive and process from a channel is a common idiom. It winds up looking something like this.

func worker(messages <-chan int) {
    for {
        select {
        case message := <-messages:
            // Do something useful with message here
            _ = message // placeholder so the unused variable compiles
        }
    }
}

The goroutine receives from the channel forever. This is great unless you want to perform an orderly shutdown.

There are many ways to signal a goroutine. My solution is to add a second channel that never carries meaningful values.

func worker(messages <-chan int, shutdown <-chan int) {
    for {
        select {
        case message := <-messages:
            // Do something useful with message here
            _ = message // placeholder so the unused variable compiles
        case <-shutdown:
            // We're done!
            return
        }
    }
}

When the shutdown channel is closed, a receive on it returns immediately, so the goroutine enters that case and exits. The advantage is that if a number of goroutines all receive from that channel, closing it once stops every one of them. The element type of the channel does not matter since no values are ever sent over it.
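As a rough illustration of that broadcast behaviour, the sketch below starts several goroutines that block on one shutdown channel and then closes it once. The stopped channel and the names idleWorker and stopAll are assumptions added only so the example can observe that every goroutine exited.

```go
package main

import "fmt"

// idleWorker blocks until shutdown is closed, then reports its id on stopped.
func idleWorker(id int, shutdown <-chan int, stopped chan<- int) {
	<-shutdown // a receive on a closed channel returns immediately
	stopped <- id
}

// stopAll starts n workers, closes the shared shutdown channel once, and
// returns how many workers reported that they stopped.
func stopAll(n int) int {
	shutdown := make(chan int)
	stopped := make(chan int)
	for i := 0; i < n; i++ {
		go idleWorker(i, shutdown, stopped)
	}
	close(shutdown) // one close reaches every receiver
	count := 0
	for i := 0; i < n; i++ {
		<-stopped
		count++
	}
	return count
}

func main() {
	fmt.Println(stopAll(3)) // prints 3
}
```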

Waiting for shutdown

Being able to signal a goroutine to shut down is fairly useless if you can't wait for that to actually happen. If the main function returns before the remaining goroutines complete, they simply stop abruptly.

Solving this problem is relatively easy. The standard library package sync includes a WaitGroup type that acts as a counter. A goroutine can wait for this counter to return to zero before continuing.

func worker(messages <-chan int, shutdown <-chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        select {
        case message := <-messages:
            // Do something useful with message here
            _ = message // placeholder so the unused variable compiles
        case <-shutdown:
            // We're done!
            return
        }
    }
}

Calling the Done() method on the WaitGroup decrements the counter. By combining this with the defer keyword we can be sure that the counter is decremented when the worker function returns, no matter which path it returns by. Immediately before launching the goroutine, the caller calls Add(1) to increment the counter. It looks something like this.

wg := &sync.WaitGroup{}
wg.Add(1)
go worker(messages,shutdown,wg)
close(shutdown) //Signal to shutdown
wg.Wait() //Wait for the goroutine to shutdown

The WaitGroup should always be passed by pointer because copying it after first use is not safe.

This approach is excellent because if a goroutine needs to launch other goroutines, it can simply call wg.Add(1) for each one and pass them the same *WaitGroup and shutdown channel. In this way, closing the shutdown channel shuts down all the goroutines. The main goroutine doesn't even have to be aware of where each goroutine is started from for this to work.
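A minimal sketch of that nesting follows. The parent, child, and runTree names and the exited counter are hypothetical, added only so the example can confirm that every goroutine in the tree stopped.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// child waits for shutdown, then records that it exited.
func child(shutdown <-chan int, wg *sync.WaitGroup, exited *int32) {
	defer wg.Done()
	<-shutdown
	atomic.AddInt32(exited, 1)
}

// parent launches n children, handing each the same shutdown channel and
// WaitGroup it was given. Add(1) is called before each go statement, while
// the parent's own count is still outstanding, so the counter cannot reach
// zero early.
func parent(n int, shutdown <-chan int, wg *sync.WaitGroup, exited *int32) {
	defer wg.Done()
	for i := 0; i < n; i++ {
		wg.Add(1)
		go child(shutdown, wg, exited)
	}
	<-shutdown
	atomic.AddInt32(exited, 1)
}

// runTree starts one parent with n children, shuts everything down, and
// returns how many goroutines recorded an exit.
func runTree(n int) int32 {
	var exited int32
	shutdown := make(chan int)
	wg := &sync.WaitGroup{}
	wg.Add(1)
	go parent(n, shutdown, wg, &exited)
	close(shutdown) // signal the whole tree
	wg.Wait()       // returns only after the parent and all children are done
	return atomic.LoadInt32(&exited)
}

func main() {
	fmt.Println(runTree(4)) // prints 5: one parent plus four children
}
```

The caller only ever sees one Add(1) and one Wait(), yet it ends up waiting for goroutines it never started itself.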

Flushing channels

If you use buffered channels to communicate among goroutines, you likely want to flush those channels before the goroutines exit. Otherwise, potentially important data could be lost. This is easily accomplished by closing the channel and having the receiving goroutine exit once it sees that the channel is closed and empty. The problem is that sending on a closed channel causes the program to panic, so the channel must not be closed while any sender is still running.
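Both halves of that behaviour can be seen in a small sketch. The function names sendPanics and drainClosed are hypothetical, and recover is used here only to observe the panic, not as a recommended way to handle it.

```go
package main

import "fmt"

// sendPanics closes a channel, sends on it, and reports whether the
// send panicked.
func sendPanics() (panicked bool) {
	defer func() {
		if r := recover(); r != nil {
			panicked = true
		}
	}()
	messages := make(chan int, 1)
	close(messages)
	messages <- 1 // panics: send on closed channel
	return false
}

// drainClosed shows the receiving side: values buffered before the close
// are still delivered, and only then does ok become false.
func drainClosed() []int {
	messages := make(chan int, 2)
	messages <- 1
	messages <- 2
	close(messages)
	var got []int
	for {
		v, ok := <-messages
		if !ok {
			return got
		}
		got = append(got, v)
	}
}

func main() {
	fmt.Println(sendPanics())  // prints true
	fmt.Println(drainClosed()) // prints [1 2]
}
```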

The simplest program might have two goroutines communicating across a single channel.

func producer(messages chan<- int,
    shutdown <-chan int,
    wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        select {
        case <-shutdown:
            return
        case <-time.After(time.Second):
            // Once a second, send a message
            messages <- 0
        }
    }
}

func consumer(messages <-chan int,
    shutdown <-chan int,
    wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        select {
        case <-shutdown:
            return
        case msg, ok := <-messages:
            if !ok {
                return // Channel closed and drained, we're done
            }
            // Do something with msg
            _ = msg // placeholder so the unused variable compiles
        }
    }
}

Closing the shutdown channel causes both goroutines to exit. But if messages is buffered, values could be left in the channel and lost.

This problem can be solved by creating two separate shutdown channels and two WaitGroup instances. Each producer goroutine is passed producer_shutdown and producer_wg. Each consumer is passed consumer_shutdown and consumer_wg. To shut down without losing messages, close producer_shutdown, then call Wait() on producer_wg. Once that returns, every producer is guaranteed to have stopped, so closing the messages channel cannot cause a panic. Finally, call Wait() on consumer_wg. The consumers process the remaining messages and exit once they see that messages is closed and empty. Of course, you can still close consumer_shutdown instead if you want an immediate exit.

Here is the same idea laid out in code.

messages := make(chan int,16)

producer_wg := &sync.WaitGroup{}
producer_shutdown := make(chan int)

producer_wg.Add(1)
go producer(messages,producer_shutdown,producer_wg)

consumer_wg := &sync.WaitGroup{}
consumer_shutdown := make(chan int)

consumer_wg.Add(1)
go consumer(messages,consumer_shutdown,consumer_wg)

close(producer_shutdown) //Shutdown producers
producer_wg.Wait() //Wait for producers

close(messages) //Close messages without causing a panic

consumer_wg.Wait() //Wait for consumers to finish processing messages and exit
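To check that this ordering really loses nothing, here is a self-contained sketch that counts messages on both sides. It is an assumption-laden variant rather than the code above: the producer sends as fast as it can instead of once a second, the counters and the names fastProducer, countingConsumer, and runAndDrain are hypothetical, and the variables use Go's usual camelCase.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// fastProducer sends as quickly as it can until shutdown is closed,
// counting every send that completed.
func fastProducer(messages chan<- int, shutdown <-chan int, wg *sync.WaitGroup, sent *int64) {
	defer wg.Done()
	for {
		select {
		case <-shutdown:
			return
		case messages <- 0:
			atomic.AddInt64(sent, 1)
		}
	}
}

// countingConsumer counts messages until the channel is closed and empty,
// or until its own shutdown channel is closed.
func countingConsumer(messages <-chan int, shutdown <-chan int, wg *sync.WaitGroup, received *int64) {
	defer wg.Done()
	for {
		select {
		case <-shutdown:
			return
		case _, ok := <-messages:
			if !ok {
				return
			}
			atomic.AddInt64(received, 1)
		}
	}
}

// runAndDrain performs the two-phase shutdown and returns both counters.
func runAndDrain() (int64, int64) {
	var sent, received int64
	messages := make(chan int, 16)
	producerShutdown, consumerShutdown := make(chan int), make(chan int)
	producerWG, consumerWG := &sync.WaitGroup{}, &sync.WaitGroup{}

	producerWG.Add(1)
	go fastProducer(messages, producerShutdown, producerWG, &sent)
	consumerWG.Add(1)
	go countingConsumer(messages, consumerShutdown, consumerWG, &received)

	time.Sleep(10 * time.Millisecond) // let some traffic flow

	close(producerShutdown) // phase 1: stop producers
	producerWG.Wait()
	close(messages)   // safe: no senders remain
	consumerWG.Wait() // phase 2: consumers drain the buffer and exit
	return atomic.LoadInt64(&sent), atomic.LoadInt64(&received)
}

func main() {
	sent, received := runAndDrain()
	fmt.Println(sent == received) // prints true
}
```

The consumer's shutdown channel is never closed here, which is what forces it to drain the buffer before it exits.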

Conclusion

I hope this helps out others. This definitely doesn't cover all cases, but it lays down a good set of ideas that can be extended where needed.


Copyright Eric Urban 2014, or the respective entity where indicated