Sharing State By Communicating

Now that I've worked with Golang for several months, I've seen several patterns arise with respect to concurrency.

Here is a quick introduction for those not familiar with Go's mechanisms for concurrency. Go doesn't expose threads for concurrency, such as those you can create using pthread_create. Instead, Go supports concurrent execution in the form of goroutines. Goroutines do not map directly to operating system threads, but may run concurrently depending on your environment. To communicate between goroutines, you can use channels. A channel is a communication primitive that acts as a typed, synchronized queue.
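As a minimal sketch of those two primitives working together, the following program starts one goroutine that sends integers over a channel while the calling goroutine receives and sums them. The function name sumOverChannel is made up for this illustration.

```go
package main

import "fmt"

// sumOverChannel starts one goroutine that sends each number on a channel,
// then receives and adds the numbers up in the calling goroutine.
func sumOverChannel(numbers []int) int {
	ch := make(chan int) // unbuffered: each send waits for a matching receive

	go func() {
		for _, n := range numbers {
			ch <- n
		}
		close(ch) // tells the receiver that no more values are coming
	}()

	total := 0
	for n := range ch { // the loop ends once the channel is closed and drained
		total += n
	}
	return total
}

func main() {
	fmt.Println(sumOverChannel([]int{1, 2, 3})) // prints 6
}
```

The channel only carries int values, which is what makes it typed, and the unbuffered send and receive have to meet, which is what makes it synchronized.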

Goroutines are a first-class concept in the language and are relatively lightweight compared to operating system threads. For server applications, it is typical to use a single goroutine for each client connection. Just as in typical multithreaded programming, concurrent access to data structures from multiple goroutines is not necessarily safe. Once you've started a separate goroutine, it can service the needs of a specific client. Eventually, each client connection may want to communicate some data back to a single goroutine. In some cases, this may just be monitoring information, such as the number of bytes sent and received by the connection. In other cases, you may have global state that is shared amongst clients.

The zeroth level of sharing state

The following contrived example presents a server that allows clients to increment a global counter.

package main

import "net"
import "strconv"
import "bufio"
import "sync"
import "fmt"

func main() {
    listener, err := net.Listen("tcp", "0.0.0.0:3333")
    if err != nil {
        return
    }
    defer listener.Close()
    var value int
    valueLock := &sync.Mutex{}

    connectionRoutine := func(conn net.Conn) {
        defer conn.Close()
        reader := bufio.NewReader(conn)
        line, err := reader.ReadString('\n')
        if err != nil {
            return
        }
        line = line[:len(line)-1]
        incr, err := strconv.Atoi(line)
        if err != nil {
            fmt.Fprintf(conn, "The value %q is not a valid number\n", line)
            return
        }

        valueLock.Lock()
        defer valueLock.Unlock()
        value += incr
        fmt.Fprintf(conn, "After incrementing, new value is %d\n", value)

    }

    for {
        conn, err := listener.Accept()
        if err != nil {
            break
        }
        go connectionRoutine(conn)

    }
}

This example listens for TCP connections on port 3333. Each new connection is handled by an independent goroutine executing the function connectionRoutine. You can use the netcat command line utility to connect to it and send it a number. It will increment the counter and display the result to the user. Here's an example session.

ericu@eric-phenom-linux:~/sharing_state_by_communicating/src/github.com/hydrogen18/ssbc$ nc localhost 3333
1234
After incrementing, new value is 1234
ericu@eric-phenom-linux:~/sharing_state_by_communicating/src/github.com/hydrogen18/ssbc$ 

In order to make access to the value variable goroutine-safe, a mutex is used. In this case it is an instance of sync.Mutex from the Go standard library. This works, but requires that each connection hold the mutex in order to add to the value. This sort of programming is common in languages where mutexes are the most commonly available synchronization primitive.

The first level of sharing

Go provides channels as a goroutine-safe method of message passing. Channels can be used to reimplement the previous example without requiring the mutex.

package main

import "net"
import "strconv"
import "bufio"
import "fmt"

type increment struct {
    amount   int
    response chan<- int
}

func main() {
    listener, err := net.Listen("tcp", "0.0.0.0:3333")
    if err != nil {
        return
    }
    defer listener.Close()
    var value int

    connectionRoutine := func(conn net.Conn, outgoing chan<- increment) {
        defer conn.Close()
        reader := bufio.NewReader(conn)
        line, err := reader.ReadString('\n')
        if err != nil {
            return
        }
        line = line[:len(line)-1]
        incr, err := strconv.Atoi(line)
        if err != nil {
            fmt.Fprintf(conn, "The value %q is not a valid number\n", line)
            return
        }
        incoming := make(chan int, 1)
        outgoing <- increment{incr, incoming}

        fmt.Fprintf(conn, "After incrementing, new value is %d\n", <-incoming)

    }

    increments := make(chan increment)

    go func() {
        for {
            conn, err := listener.Accept()
            if err != nil {
                break
            }

            go connectionRoutine(conn, increments)
        }
    }()

    for {
        incr := <-increments
        value += incr.amount
        incr.response <- value
    }
}

As in the first example, the server listens for TCP connections on port 3333, and each connection is handled by an independent goroutine. But instead of using a mutex to access the variable value, each goroutine sends an increment struct into a single common channel. The struct holds the amount to increment value by and a channel that accepts values of type int. The main goroutine waits for messages on the increments channel. Each time a message is received, value is incremented and a copy of the new value is sent back on the response channel. In this way, the main goroutine is responsible for incrementing value on behalf of all the other goroutines. As an added bonus, when the response channel is created using make it has a buffer size of one. This means that the main goroutine does not block when sending the result, since the channel is guaranteed to have space for it.

This example demonstrates idiomatic shared state in Go.
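To see the ownership idea without any networking, here is a stripped-down sketch of the same request and response exchange. The names request, runCounter and addConcurrently are hypothetical and exist only for this illustration. One goroutine owns the counter and many others ask it to add to the counter.

```go
package main

import (
	"fmt"
	"sync"
)

// request plays the role of the increment type from the server example.
type request struct {
	amount   int
	response chan<- int
}

// runCounter owns the counter. No other goroutine ever touches value.
func runCounter(requests <-chan request) {
	value := 0
	for req := range requests {
		value += req.amount
		req.response <- value // does not block: the response buffer has room for one
	}
}

// addConcurrently sends n increments of 1 from n goroutines and returns
// the final counter value.
func addConcurrently(n int) int {
	requests := make(chan request)
	go runCounter(requests)

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			response := make(chan int, 1)
			requests <- request{1, response}
			<-response
		}()
	}
	wg.Wait()

	// Read the final value by asking the owner to add zero.
	response := make(chan int, 1)
	requests <- request{0, response}
	final := <-response
	close(requests)
	return final
}

func main() {
	fmt.Println(addConcurrently(100)) // prints 100
}
```

No mutex appears anywhere, yet every increment is applied exactly once because only runCounter ever reads or writes the counter.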

Adding complexity

No matter how well planned a project is, complexity always piles up when a long enough time frame is considered. Let's expand the previous example by adding a common requirement: logging each connection. This is straightforward in Go. By adding a channel that accepts type net.Conn, each new connection can be placed on the channel. The main goroutine waits for new connections and logs the remote address of each one to standard output.

Here is the previous example refactored to log each connection.

package main

import "net"
import "strconv"
import "bufio"
import "fmt"

type increment struct {
    amount   int
    response chan<- int
}

func main() {
    listener, err := net.Listen("tcp", "0.0.0.0:3333")
    if err != nil {
        return
    }
    defer listener.Close()
    var value int

    connectionRoutine := func(conn net.Conn, outgoing chan<- increment) {
        defer conn.Close()
        reader := bufio.NewReader(conn)
        line, err := reader.ReadString('\n')
        if err != nil {
            return
        }
        line = line[:len(line)-1]
        incr, err := strconv.Atoi(line)
        if err != nil {
            fmt.Fprintf(conn, "The value %q is not a valid number\n", line)
            return
        }
        incoming := make(chan int, 1)
        outgoing <- increment{incr, incoming}

        fmt.Fprintf(conn, "After incrementing, new value is %d\n", <-incoming)

    }

    increments := make(chan increment)
    conns := make(chan net.Conn, 16)

    go func() {
        for {
            conn, err := listener.Accept()
            if err != nil {
                break
            }
            conns <- conn
            go connectionRoutine(conn, increments)
        }
    }()

    for {
        select {
        case incr := <-increments:
            value += incr.amount
            incr.response <- value

        case conn := <-conns:
            fmt.Printf("New connection from %v\n", conn.RemoteAddr())
        }
    }
}

The additional channel is called conns. Before the goroutine is launched, the connection is placed on this channel. To mitigate a possible bottleneck here, a buffer size of sixteen is used on the channel. The main goroutine waits for new connections on this channel and uses fmt.Printf to log the remote connection address of each connection. Here is what the server output looks like in action.

ericu@eric-phenom-linux:~/sharing_state_by_communicating$ $GOPATH/bin/firstB
New connection from 127.0.0.1:46709
New connection from 127.0.0.1:46715
New connection from 127.0.0.1:46721

The main goroutine is able to both log new connections and handle incrementing by using Go's select statement. A select statement allows a goroutine to wait for a message on one or more channels and handle whichever one becomes ready.
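For readers new to select, here is a small sketch of one goroutine servicing two channels of different types. The function name drain is made up for this illustration. Unlike the server loop, which runs forever, this version stops once both channels have been closed.

```go
package main

import "fmt"

// drain receives from both channels until each one has been closed.
func drain(numbers <-chan int, names <-chan string) (sum int, seen []string) {
	for numbers != nil || names != nil {
		select {
		case n, ok := <-numbers:
			if !ok {
				numbers = nil // a nil channel is never ready, so this case is skipped from now on
				continue
			}
			sum += n
		case s, ok := <-names:
			if !ok {
				names = nil
				continue
			}
			seen = append(seen, s)
		}
	}
	return sum, seen
}

func main() {
	numbers := make(chan int)
	names := make(chan string)

	go func() {
		for i := 1; i <= 3; i++ {
			numbers <- i
		}
		close(numbers)
	}()
	go func() {
		names <- "a"
		names <- "b"
		close(names)
	}()

	sum, seen := drain(numbers, names)
	fmt.Println(sum, seen) // prints 6 [a b]
}
```

The order in which the two channels are serviced is not fixed, but each channel's own values still arrive in the order they were sent.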

This pattern of funneling messages to a single goroutine is particularly elegant. To log a connection, the only information that needs to be shared is the net.Conn instance. Whenever an existing type does not contain sufficient information, a new type can be defined and passed across the channel. This is exactly the reason for the existence of the increment type in the example. This type would not need to exist if the values did not need to be passed across a channel. As more complexity is added, the number of types dedicated just to communication increases. The select statement in a goroutine grows in size as well. This actually turns out to be a boon to understanding code: you can start at a select statement and work out what the code is responsible for. The size of a select statement can be mitigated by refactoring into multiple goroutines where possible.

The inescapable fact is that each producer into a channel of structs must have the correct logic to initialize the struct correctly. For the case of a single producer and a single consumer, creating the struct with exported fields is sufficient. The initialization of the struct is done in one place and it is used in another. One of the great advantages to channels in Go is that multiple producers and consumers can all access one channel simultaneously. This means that each producer winds up with their own copy of the logic to initialize the struct. If the logic is identical, it can be refactored into a single function. The main problem with this approach is that with N producers, you likely have N source files that are responsible for initializing each struct.

If a program has N goroutines that are all passing information over the same channel, it is likely that all the goroutines have exactly the same need. That is to say that the goroutine interpreting the data in the struct does not particularly care about the origin of the data. In that goroutine, the operations to be performed can be wrapped up into a function. The function could be protected with a mutex and shared amongst goroutines, but this is particularly inefficient. Accepting these facts leads me to the conclusion that the main reason for the struct passing pattern is that a single actor needs to maintain a coherent view of the rest of the actors in the program. In other words, the objective is to have a series of operations invoked in a predictable, linear fashion.

Go has no built-in syntax for expressing the partial invocation of a function. The Python module functools implements something like this with functools.partial. But what Go does have is functions as a first-class feature of the language. In this way, any invocation of a function can be expressed as a function with no arguments and no return value. Consider the following example.

invocation := func() {
    fmt.Printf("foo %d", 5)
}
}
invocation()

Here, the invocation of fmt.Printf is wrapped in a function definition and is not actually called until invocation(). I call this pattern deferred invocation.
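One place this could lead, shown only as a sketch and not the approach the server in this post takes, is sending deferred invocations across a channel so that a single goroutine runs them one at a time. The names runOwner and addAll are hypothetical.

```go
package main

import "fmt"

// runOwner executes every deferred invocation it receives, one at a time.
func runOwner(work <-chan func(), done chan<- struct{}) {
	for invocation := range work {
		invocation()
	}
	close(done)
}

// addAll adds each amount to a counter, with every addition performed
// by the owner goroutine rather than by the caller.
func addAll(amounts []int) int {
	work := make(chan func())
	done := make(chan struct{})
	value := 0 // only modified inside invocations run by runOwner

	go runOwner(work, done)

	for _, amount := range amounts {
		amount := amount // give each closure its own copy
		work <- func() { value += amount }
	}
	close(work)
	<-done // wait for the owner to finish before reading value
	return value
}

func main() {
	fmt.Println(addAll([]int{1, 2, 3})) // prints 6
}
```

The channel carries no dedicated struct type at all. Each producer decides what should happen, and the owner goroutine decides when it happens.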

While Go does not explicitly support partial invocation, you can create bound functions against a type:

type Example struct {
    SomeValue int
}

func (ex *Example) someFunction() int {
    return ex.SomeValue
}

If you have an instance of the Example structure named foo, you can use the expression foo.someFunction to get a function that is bound to foo. Go calls this a method value. It still has to be called with any applicable arguments other than the receiver. This means you don't need to pass around the pointer to foo yourself.
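Here is a short runnable sketch of a bound function in use. The helper callLater is made up for this illustration and stands in for any code that only knows about a plain func() int.

```go
package main

import "fmt"

type Example struct {
	SomeValue int
}

func (ex *Example) someFunction() int {
	return ex.SomeValue
}

// callLater knows nothing about Example. It only sees a func() int.
func callLater(f func() int) int {
	return f()
}

func main() {
	foo := &Example{SomeValue: 7}
	bound := foo.someFunction // the receiver foo is bound at this point

	foo.SomeValue = 9 // bound holds the pointer, so it sees this change

	fmt.Println(callLater(bound)) // prints 9
}
```

Because the receiver is a pointer, the bound function observes later changes to foo rather than a snapshot taken when it was bound.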

The second level of sharing

Using the pattern of deferred invocation, we can refactor the example server program. First we'll make a Server type that incorporates all the logic around the initialization and use of the increment type. Then, we can modify the connection routine to no longer need to know about channels.

package main

import "net"
import "strconv"
import "bufio"
import "fmt"

func connectionRoutine(conn net.Conn, increment func(amount int) int) {
    defer conn.Close()
    reader := bufio.NewReader(conn)
    line, err := reader.ReadString('\n')
    if err != nil {
        return
    }
    line = line[:len(line)-1]
    incr, err := strconv.Atoi(line)
    if err != nil {
        fmt.Fprintf(conn, "The value %q is not a valid number\n", line)
        return
    }

    fmt.Fprintf(conn, "After incrementing, new value is %d\n", increment(incr))
}

type increment struct {
    amount   int
    response chan<- int
}

type Server struct {
    listener   net.Listener
    conns      chan net.Conn
    increments chan increment
}

func (s *Server) Accept() {
    for {
        conn, err := s.listener.Accept()
        if err != nil {
            break
        }
        s.conns <- conn
        go connectionRoutine(conn, s.Increment)
    }
}

func (s *Server) Increment(amount int) int {
    response := make(chan int, 1)
    increment := increment{}
    increment.amount = amount
    increment.response = response
    s.increments <- increment
    return <-response
}

func main() {
    var err error
    s := Server{}
    s.listener, err = net.Listen("tcp", "0.0.0.0:3333")
    if err != nil {
        return
    }
    defer s.listener.Close()
    var value int

    s.increments = make(chan increment)
    s.conns = make(chan net.Conn, 16)

    go s.Accept()

    for {
        select {
        case incr := <-s.increments:
            value += incr.amount
            incr.response <- value

        case conn := <-s.conns:
            fmt.Printf("New connection from %v\n", conn.RemoteAddr())
        }
    }
}

You should notice that this hasn't removed the need for the use of a request channel, individual response channels, and the increment type. However, this does allow all that logic to be placed in one location. All this is achieved with no loss of functionality.

From the point of view of connectionRoutine, it only needs to know about a function that takes an integer and returns an integer. It appears as a regular blocking call in the flow of that goroutine.
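One consequence is that connectionRoutine can be exercised without a listener, a Server, or any channels. The sketch below uses net.Pipe from the standard library to create an in-memory connection and passes in a stand-in increment function. The helper name exchange and the stand-in function are assumptions made for this illustration.

```go
package main

import (
	"bufio"
	"fmt"
	"net"
	"strconv"
)

// connectionRoutine is copied from the server above.
func connectionRoutine(conn net.Conn, increment func(amount int) int) {
	defer conn.Close()
	reader := bufio.NewReader(conn)
	line, err := reader.ReadString('\n')
	if err != nil {
		return
	}
	line = line[:len(line)-1]
	incr, err := strconv.Atoi(line)
	if err != nil {
		fmt.Fprintf(conn, "The value %q is not a valid number\n", line)
		return
	}

	fmt.Fprintf(conn, "After incrementing, new value is %d\n", increment(incr))
}

// exchange sends one line to connectionRoutine over an in-memory
// connection and returns the single line it replies with.
func exchange(input string, increment func(int) int) (string, error) {
	client, server := net.Pipe()
	defer client.Close()
	go connectionRoutine(server, increment)

	if _, err := fmt.Fprint(client, input); err != nil {
		return "", err
	}
	return bufio.NewReader(client).ReadString('\n')
}

func main() {
	// Stand-in for Server.Increment that pretends the counter was at 100.
	fake := func(amount int) int { return 100 + amount }

	reply, err := exchange("5\n", fake)
	fmt.Printf("%q %v\n", reply, err)
}
```

Swapping the stand-in for s.Increment is the only change needed to go back to the real server behavior.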

Conclusion

I feel that passing around functions is a decent approach to sharing state between goroutines. This obviously won't apply for all use cases. I view Go's channels as more of a means to an end than the final abstraction for all communication. For cases requiring a response from another goroutine, it may not make any sense to do anything but block until the result is available. This fits into my example. Even for cases where no response is expected, abstracting the actual placement onto the channel can make code easier to read and easier to test. Passing in channels whenever a goroutine is launched can of course achieve the same thing. But doing so quickly becomes cumbersome as it may not be immediately obvious what each channel is used for. It also leads to a large amount of boilerplate to wire together goroutines.

Source code

All the source code is available on GitHub.


© Eric Urban 2014