Sharing State By Communicating
- Wednesday June 04 2014
- golang concurrency
Now that I've worked with Golang for several months, I've seen several patterns arise with respect to concurrency.
Here is a quick introduction for those not familiar with Go's mechanisms for concurrency. Go does not expose operating system threads directly, such as those you can create using pthread_create. Instead, Go supports concurrent execution in the form of goroutines. Goroutines do not map directly to operating system threads, but may run concurrently depending on your environment. To communicate between goroutines, you can use channels. A channel is a communication primitive that acts as a typed, synchronized queue.
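As a minimal sketch of these two primitives working together, the following program starts one goroutine per input and collects the results over a channel. The function name squareAll is made up for this illustration.

```go
package main

import "fmt"

// squareAll is an illustrative helper: it starts one goroutine per
// input and sums the squares that come back over a channel.
func squareAll(inputs []int) int {
	results := make(chan int) // unbuffered: each send waits for a receive
	for _, n := range inputs {
		go func(n int) {
			results <- n * n
		}(n)
	}

	total := 0
	for range inputs {
		total += <-results // blocks until some goroutine sends
	}
	return total
}

func main() {
	fmt.Println(squareAll([]int{1, 2, 3})) // prints 14
}
```

The results may arrive in any order, but because the channel is synchronized no further locking is needed to collect them.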
Goroutines are a first-class concept in the language and are relatively lightweight compared to operating system threads. For server applications, it is typical to use a single goroutine for each client connection. Just like typical multithreaded programming, concurrent access to data structures from multiple goroutines is not necessarily safe. Once you've started a separate goroutine, it can service the needs of a specific client. Eventually, each client connection may want to communicate some data back to a single goroutine. In some cases, this may just be monitoring information, such as the number of bytes sent and received by the connection. In other cases, you may have global state that is shared amongst clients.
The zero'th level of sharing state
The following contrived example presents a server that allows clients to increment a global counter.
    package main

    import "net"
    import "strconv"
    import "bufio"
    import "sync"
    import "fmt"

    func main() {
        listener, err := net.Listen("tcp", "0.0.0.0:3333")
        if err != nil {
            return
        }
        defer listener.Close()

        // Shared counter, guarded by valueLock.
        var value int
        valueLock := &sync.Mutex{}

        connectionRoutine := func(conn net.Conn) {
            defer conn.Close()
            reader := bufio.NewReader(conn)
            line, err := reader.ReadString('\n')
            if err != nil {
                return
            }
            line = line[:len(line)-1] // strip the trailing newline
            incr, err := strconv.Atoi(line)
            if err != nil {
                fmt.Fprintf(conn, "The value %q is not a valid number\n", line)
                return
            }
            valueLock.Lock()
            defer valueLock.Unlock()
            value += incr
            fmt.Fprintf(conn, "After incrementing, new value is %d\n", value)
        }

        // One goroutine per accepted connection.
        for {
            conn, err := listener.Accept()
            if err != nil {
                break
            }
            go connectionRoutine(conn)
        }
    }
This example listens for TCP connections on port 3333. Each new connection is handled by an independent goroutine executing the function connectionRoutine. You can use the netcat command line utility to connect to it and send it a number. It will increment the counter and display the result to the user. Here's an example session.
    ericu@eric-phenom-linux:~/sharing_state_by_communicating/src/github.com/hydrogen18/ssbc$ nc localhost 3333
    1234
    After incrementing, new value is 1234
    ericu@eric-phenom-linux:~/sharing_state_by_communicating/src/github.com/hydrogen18/ssbc$
In order to make access to the value variable goroutine safe, a mutex is used. In this case it is an instance of sync.Mutex from the Go standard library. This works, but requires that each connection hold the mutex in order to add to the value. This sort of programming is common in programming languages where mutexes are the most commonly available synchronization primitive.
The first level of sharing
Go provides channels as a goroutine-safe method of message passing. Channels can be used to reimplement the previous example without requiring the mutex.
    package main

    import "net"
    import "strconv"
    import "bufio"
    import "fmt"

    // increment is a request to add amount to the counter. The new
    // value is sent back on response.
    type increment struct {
        amount   int
        response chan<- int
    }

    func main() {
        listener, err := net.Listen("tcp", "0.0.0.0:3333")
        if err != nil {
            return
        }
        defer listener.Close()

        var value int

        connectionRoutine := func(conn net.Conn, outgoing chan<- increment) {
            defer conn.Close()
            reader := bufio.NewReader(conn)
            line, err := reader.ReadString('\n')
            if err != nil {
                return
            }
            line = line[:len(line)-1]
            incr, err := strconv.Atoi(line)
            if err != nil {
                fmt.Fprintf(conn, "The value %q is not a valid number\n", line)
                return
            }
            incoming := make(chan int, 1)
            outgoing <- increment{incr, incoming}
            fmt.Fprintf(conn, "After incrementing, new value is %d\n", <-incoming)
        }

        increments := make(chan increment)

        go func() {
            for {
                conn, err := listener.Accept()
                if err != nil {
                    break
                }
                go connectionRoutine(conn, increments)
            }
        }()

        // Only the main goroutine ever touches value.
        for {
            incr := <-increments
            value += incr.amount
            incr.response <- value
        }
    }
As in the first example, the server listens for TCP connections on port 3333, and each connection is handled by an independent goroutine. But instead of using a mutex to access the variable value, each goroutine sends an increment struct into a single common channel. The struct holds the amount to increment value by and a channel that takes type int. The main goroutine waits for messages on the common channel. Each time a message is received, value is incremented and a copy of the new value is sent back on the response channel. In this way, the main goroutine is responsible for incrementing value on behalf of all the other goroutines. As an added bonus, the response channel is created using make with a buffer size of one. This means that the main goroutine does not block when sending the result, since the channel is guaranteed to have space for it.
This example demonstrates idiomatic shared state in Go.
Adding complexity
No matter how well planned a project is, complexity always piles up over a long enough time frame. Let's expand the previous example by adding a common requirement: logging each connection. This is straightforward in Go. By adding a channel that accepts type net.Conn, each new connection can be placed on the channel. The main goroutine waits for new connections and logs the remote address of each one to standard output.
Here is the previous example refactored to log each connection.
    package main

    import "net"
    import "strconv"
    import "bufio"
    import "fmt"

    type increment struct {
        amount   int
        response chan<- int
    }

    func main() {
        listener, err := net.Listen("tcp", "0.0.0.0:3333")
        if err != nil {
            return
        }
        defer listener.Close()

        var value int

        connectionRoutine := func(conn net.Conn, outgoing chan<- increment) {
            defer conn.Close()
            reader := bufio.NewReader(conn)
            line, err := reader.ReadString('\n')
            if err != nil {
                return
            }
            line = line[:len(line)-1]
            incr, err := strconv.Atoi(line)
            if err != nil {
                fmt.Fprintf(conn, "The value %q is not a valid number\n", line)
                return
            }
            incoming := make(chan int, 1)
            outgoing <- increment{incr, incoming}
            fmt.Fprintf(conn, "After incrementing, new value is %d\n", <-incoming)
        }

        increments := make(chan increment)
        conns := make(chan net.Conn, 16)

        go func() {
            for {
                conn, err := listener.Accept()
                if err != nil {
                    break
                }
                conns <- conn // report the connection for logging
                go connectionRoutine(conn, increments)
            }
        }()

        // Service whichever channel has a message ready.
        for {
            select {
            case incr := <-increments:
                value += incr.amount
                incr.response <- value
            case conn := <-conns:
                fmt.Printf("New connection from %v\n", conn.RemoteAddr())
            }
        }
    }
The additional channel is called conns. Before the goroutine is launched, the connection is placed on this channel. To mitigate a possible bottleneck here, a buffer size of sixteen is used on the channel. The main goroutine waits for new connections on this channel and uses fmt.Printf to log the remote address of each connection. Here is what the server output looks like in action.
    ericu@eric-phenom-linux:~/sharing_state_by_communicating$ $GOPATH/bin/firstB
    New connection from 127.0.0.1:46709
    New connection from 127.0.0.1:46715
    New connection from 127.0.0.1:46721
The main goroutine is able to both log new connections and handle increments by using the select keyword in Go. The select keyword allows a goroutine to wait for a message on one or more channels.
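Stripped of the networking code, the select loop can be sketched as follows. The function tally and its parameters are hypothetical names for this illustration; it services two channels from a single goroutine until a fixed number of messages has arrived.

```go
package main

import "fmt"

// tally is an illustrative helper: it receives exactly `messages`
// messages in total, from whichever channel is ready, and reports the
// running sum of increments and the number of log entries seen.
func tally(increments <-chan int, logs <-chan string, messages int) (total int, logged int) {
	for i := 0; i < messages; i++ {
		select {
		case amount := <-increments:
			total += amount
		case <-logs:
			logged++
		}
	}
	return total, logged
}

func main() {
	increments := make(chan int)
	logs := make(chan string)

	go func() {
		for _, amount := range []int{1, 2, 3} {
			increments <- amount
		}
	}()
	go func() {
		logs <- "first"
		logs <- "second"
	}()

	total, logged := tally(increments, logs, 5)
	fmt.Println(total, logged) // prints 6 2
}
```

The order in which the two channels are serviced varies between runs, but the totals do not, because only one goroutine ever updates them.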
This pattern is particularly elegant. The only information that needs to be shared is the net.Conn instance. Whenever an existing type does not contain sufficient information, a new type can be defined and passed across the channel. This is exactly the reason for the existence of the increment type in the example. This type would not need to exist if the values did not need to be passed across a channel. As more complexity is added, the number of types dedicated just to communication increases. The select statement in a goroutine grows in size as well. This actually turns out to be a boon to understanding code: you can start at a select statement and work out what the code is responsible for. The size of a select statement can be mitigated by refactoring into multiple goroutines where possible.
The inescapable fact is that each producer into a channel of structs must have the correct logic to initialize the struct. For the case of a single producer and a single consumer, creating the struct with exported fields is sufficient. The initialization of the struct is done in one place and it is used in another. One of the great advantages of channels in Go is that multiple producers and consumers can all access one channel simultaneously. This means that each producer winds up with its own copy of the logic to initialize the struct. If the logic is identical, it can be refactored into a single function. The main problem with this approach is that with N producers, you likely have N source files that are responsible for initializing each struct.
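As a rough sketch of refactoring the initialization into a single function, a hypothetical helper named newIncrement (not part of the examples above) could build the request and hand back only the receiving end of the response channel:

```go
package main

import "fmt"

type increment struct {
	amount   int
	response chan<- int
}

// newIncrement is a hypothetical helper, not part of the original
// examples. It builds a request and returns the receiving end of the
// response channel, so producers never construct the struct by hand.
func newIncrement(amount int) (increment, <-chan int) {
	// Buffer of one so the consumer never blocks when replying.
	response := make(chan int, 1)
	return increment{amount: amount, response: response}, response
}

func main() {
	increments := make(chan increment)

	// Consumer: owns value and applies each request in turn.
	go func() {
		value := 0
		for req := range increments {
			value += req.amount
			req.response <- value
		}
	}()

	// Every producer builds its request through the same helper.
	for _, amount := range []int{5, 10} {
		req, response := newIncrement(amount)
		increments <- req
		fmt.Println(<-response) // prints 5, then 15
	}
	close(increments)
}
```

Each producer still has to know about the channel and the helper, which is the limitation the rest of this section works toward removing.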
If a program has N goroutines that are all passing information over the same channel, it is likely that all the goroutines have exactly the same need. That is to say that the goroutine interpreting the data in the struct does not particularly care about the origin of the data. In that goroutine, the operations to be performed can be wrapped up into a function. The function could be protected with a mutex and shared amongst goroutines, but this is particularly inefficient. Accepting these facts leads me to the conclusion that the main reason for the struct passing pattern is that a single actor needs to maintain a coherent view of the rest of the actors in the program. In other words, the objective is to have a series of operations invoked in a predictable, linear fashion.
Go has no semantics for expressing the partial invocation of a function. The Python module functools implements something like this with functools.partial. But what Go does have is functions as a first class feature of the language. In this way, any invocation of a function can be expressed as a function with no arguments and no return value. Consider the following example.
    invocation := func() {
        fmt.Printf("foo %d", 5)
    }
    invocation()
Here, the invocation of fmt.Printf is wrapped in a function definition and is not actually called until invocation(). I call this pattern deferred invocation.
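One way to combine deferred invocation with channels, offered here only as a sketch and not taken from the examples in this post, is to send the wrapped calls over a channel of func() so that a single goroutine runs them one at a time. The names runAll and sumViaInvocations are made up for this illustration.

```go
package main

import "fmt"

// runAll executes queued invocations one at a time, in arrival order,
// until the channel is closed.
func runAll(invocations <-chan func()) {
	for invoke := range invocations {
		invoke()
	}
}

// sumViaInvocations is an illustrative helper: every addition is
// wrapped in a func() and executed by the single goroutine running
// runAll, so total is never modified concurrently.
func sumViaInvocations(amounts []int) int {
	invocations := make(chan func())
	done := make(chan struct{})
	go func() {
		runAll(invocations)
		close(done)
	}()

	total := 0
	for _, amount := range amounts {
		amount := amount // capture this iteration's value
		invocations <- func() { total += amount }
	}
	close(invocations)
	<-done // wait until every queued invocation has run
	return total
}

func main() {
	fmt.Println(sumViaInvocations([]int{1, 2, 3})) // prints 6
}
```

The arguments are captured when the closure is created, while the call itself happens later on whichever goroutine receives it.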
While Go does not explicitly support partial invocation, you can create bound functions against a type.

    type Example struct {
        SomeValue int
    }

    func (ex *Example) someFunction() int {
        return ex.SomeValue
    }
If you have an instance of the Example structure named foo, you can use the expression foo.someFunction to get a function that is bound to foo. It still has to be called with any applicable arguments other than the receiver. This means you don't need to pass around the pointer to foo yourself.
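A short runnable sketch of this, reusing the Example type; the helper call is a hypothetical name standing in for any code that only knows about a plain func() int:

```go
package main

import "fmt"

type Example struct {
	SomeValue int
}

func (ex *Example) someFunction() int {
	return ex.SomeValue
}

// call is an illustrative helper: it accepts any function with no
// arguments that returns an int, and knows nothing about Example.
func call(f func() int) int {
	return f()
}

func main() {
	foo := &Example{SomeValue: 42}

	// bound carries the receiver foo along with it.
	bound := foo.someFunction
	fmt.Println(call(bound)) // prints 42

	// The pointer receiver is what was bound, so later changes
	// to foo are visible through bound.
	foo.SomeValue = 7
	fmt.Println(bound()) // prints 7
}
```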
The second level of sharing
Using the pattern of deferred invocation, we can refactor the example server program. First we'll make a Server type that incorporates all the logic around the initialization and use of the increment type. Then, we can modify the connection routine to no longer need to know about channels.
    package main

    import "net"
    import "strconv"
    import "bufio"
    import "fmt"

    // connectionRoutine only knows about a function that takes an
    // amount and returns the new value. It has no knowledge of channels.
    func connectionRoutine(conn net.Conn, increment func(amount int) int) {
        defer conn.Close()
        reader := bufio.NewReader(conn)
        line, err := reader.ReadString('\n')
        if err != nil {
            return
        }
        line = line[:len(line)-1]
        incr, err := strconv.Atoi(line)
        if err != nil {
            fmt.Fprintf(conn, "The value %q is not a valid number\n", line)
            return
        }
        fmt.Fprintf(conn, "After incrementing, new value is %d\n", increment(incr))
    }

    type increment struct {
        amount   int
        response chan<- int
    }

    type Server struct {
        listener   net.Listener
        conns      chan net.Conn
        increments chan increment
    }

    func (s *Server) Accept() {
        for {
            conn, err := s.listener.Accept()
            if err != nil {
                break
            }
            s.conns <- conn
            go connectionRoutine(conn, s.Increment)
        }
    }

    // Increment hides the request and response channels behind an
    // ordinary blocking call.
    func (s *Server) Increment(amount int) int {
        response := make(chan int, 1)
        req := increment{}
        req.amount = amount
        req.response = response
        s.increments <- req
        return <-response
    }

    func main() {
        var err error
        s := Server{}
        s.listener, err = net.Listen("tcp", "0.0.0.0:3333")
        if err != nil {
            return
        }
        defer s.listener.Close()

        var value int
        s.increments = make(chan increment)
        s.conns = make(chan net.Conn, 16)

        go s.Accept()

        for {
            select {
            case incr := <-s.increments:
                value += incr.amount
                incr.response <- value
            case conn := <-s.conns:
                fmt.Printf("New connection from %v\n", conn.RemoteAddr())
            }
        }
    }
You should notice that this hasn't removed the need for a request channel, individual response channels, and the increment type. However, it does allow all of that logic to be placed in one location, with no loss of functionality.
From the point of view of connectionRoutine, it only needs to know about a function that takes an integer and returns an integer. The call appears as a regular blocking call in the flow of that goroutine.
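One consequence is that connectionRoutine can be exercised without a Server or any channels at all. The sketch below assumes an in-memory connection from net.Pipe and a stand-in increment function; the helper exchange is a hypothetical name introduced for this illustration.

```go
package main

import (
	"bufio"
	"fmt"
	"net"
	"strconv"
)

// connectionRoutine is the same function as in the server above.
func connectionRoutine(conn net.Conn, increment func(amount int) int) {
	defer conn.Close()
	reader := bufio.NewReader(conn)
	line, err := reader.ReadString('\n')
	if err != nil {
		return
	}
	line = line[:len(line)-1]
	incr, err := strconv.Atoi(line)
	if err != nil {
		fmt.Fprintf(conn, "The value %q is not a valid number\n", line)
		return
	}
	fmt.Fprintf(conn, "After incrementing, new value is %d\n", increment(incr))
}

// exchange is an illustrative helper: it sends one request line to
// connectionRoutine over an in-memory pipe and returns the reply line.
func exchange(request string, increment func(int) int) (string, error) {
	client, server := net.Pipe()
	defer client.Close()
	go connectionRoutine(server, increment)

	if _, err := fmt.Fprint(client, request); err != nil {
		return "", err
	}
	return bufio.NewReader(client).ReadString('\n')
}

func main() {
	// A stand-in for Server.Increment that involves no channels.
	double := func(amount int) int { return amount * 2 }

	reply, err := exchange("21\n", double)
	fmt.Printf("%q %v\n", reply, err)
	// prints "After incrementing, new value is 42\n" <nil>
}
```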
Conclusion
I feel that passing around functions is a decent approach to sharing state between goroutines. This obviously won't apply for all use cases. I view Go's channels as more of a means to an end than the final abstraction for all communication. For cases requiring a response from another goroutine, it may not make any sense to do anything but block until the result is available. This fits into my example. Even for cases where no response is expected, abstracting the actual placement onto the channel can make code easier to read and easier to test. Passing in channels whenever a goroutine is launched can of course achieve the same thing. But doing so quickly becomes cumbersome as it may not be immediately obvious what each channel is used for. It also leads to a large amount of boilerplate to wire together goroutines.
Source code
All the source code is available on GitHub.