RPC over AMQP

I am currently in the process of moving a single endpoint out of a large, monolithic Ruby on Rails app. That endpoint does the following, in order.

  1. Accepts an HTTP POST request.
  2. Parses the YAML content sent in the body by the client.
  3. Updates some database entries based on the contents of that YAML.
  4. Generates a hash that is a smorgasbord of different pieces of data.
  5. Marshals that data to YAML.
  6. Sends the YAML as the response.

The current implementation does all of the work on the request thread. Moving the database writes off the request thread would help only slightly. The real issue is that most of the time is spent marshalling all the data into YAML, which consumes an enormous amount of CPU time. While that happens, a connection is held open on a server that is limited to a fixed number of connections at any one time.

The upside to this is that the actual communication performed is machine-to-machine. The individual latency of any connection is not particularly relevant, but the required overall throughput is high. Even at peak-load hours, 15% of the server's time is spent processing requests for just this endpoint.

I began rewriting this single endpoint as a separate piece of software in Go. The current architecture already uses a load balancer, so it can be configured to send requests for this endpoint to a separate set of HTTP servers. In the process of rewriting this I discovered a large amount of code that predated me. This code is difficult to port because the people who wrote it have long since moved on. Additionally, I am not enthused about creating an implementation to write to an ActiveRecord database from a Go application.

In the end I made the decision to keep a large portion of the existing implementation. This of course meant that I needed to come up with a way to call the Ruby code from Go. I looked at some existing RPC solutions, but ultimately decided that it would be easier to write a Request-Reply system on top of AMQP.

I chose this solution because we already have a large AMQP cluster in our production environment that is under-utilized. We also have several servers that do not serve HTTP requests, but instead are reserved for all other work such as periodic tasks. By running the Ruby portion of my new hybrid-solution on these servers, the servers that process HTTP requests are completely unburdened by this service.

Along the way of implementing this, I learned a great deal about AMQP and RPC in general. My findings are presented here.

What is RPC?

RPC stands for Remote Procedure Call. The basic concept of RPC is that some information is sent and later some information is received. Under normal circumstances there is a one-to-one correlation between those two categories of information. This mirrors the basic paradigm of imperative programming: the function call. The difference is that RPC is typically used to exchange information between two systems separated by a network connection or another communication medium. The two systems may not even be written in the same programming language.

Broker mediated messages

Broker mediated messages allow a single producer to deliver messages to zero or more consumers. This is distinct from using a communication protocol like TCP to exchange messages because the broker can make certain guarantees about how each message is handled. Each client of a message broker usually acts as either a publisher or a subscriber of messages. Sometimes the terms producer and consumer are substituted. A publisher is responsible for generating messages and a subscriber is responsible for taking the correct action on each message.

Common functions of a message broker include

  • Persistence - Messages are stored when produced and consumed later
  • Expiration - Messages are stored for a configured amount of time
  • One to many - Each message is delivered to all available consumers

This should give you a good idea of what a message broker does. The exact functions available depend on the specific implementation. There are many standards for message brokers. The only one I'll be discussing in this article is AMQP. The lessons learned here do apply to brokers based on other standards.

Most message brokers only define a way to exchange information between a publisher and a subscriber. It's entirely up to the programmer to make sure both ends exchange data in a format the other can understand.

All brokers implementing the AMQP standard support the ability to define queues. Queues give you the ability to segregate messages into distinct groups. By publishing only messages of one type to a specific queue, the subscribers to that queue can be guaranteed to receive messages that they always understand.

RPC and message brokers

At this point it should be no surprise that a message broker can be used to implement an RPC mechanism. The difference between a typical use of a message broker and implementing RPC is that with RPC each end must act as both a publisher and a subscriber.

Conceptually I separate the roles in an RPC implementation into that of a client and a servant. A client makes a request. A servant processes the request and sends a response to the client. Message queues are used to exchange information between the client and servant. Each client publishes a message to a queue each time it wants to call on a servant. The servant subscribes to that queue. It picks up the message, does whatever processing is necessary, and then publishes a response message to another queue. The client subscribes to that second queue and picks up the response.

This implementation is simple to describe, but more challenging to implement.

Use Cases

From here on out I'll introduce each use case for RPC over AMQP. We'll start with the use case that is simplest to implement and move up in complexity from there.

Single-client & Single-servant

This is the simplest use case by a huge margin. At the broker two predefined queues exist: one for requests and one for replies.

In a hypothetical RPC use case, the client must increment an integer value. The servant is capable of receiving an integer value, incrementing it, and sending the result.

In this case two queues are defined at the message broker named increment.requests and increment.responses.

The client has the integer value 5 and publishes a message to increment.requests with the value 5 as a body. The servant is subscribed to increment.requests and receives the value 5. After incrementing it, the servant now has the value 6. The servant publishes the value 6 to increment.responses. The client is subscribed to increment.responses and receives the value 6. This completes the RPC.
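The exchange above can be sketched in a few lines of Go. This is only an illustration under a loud assumption: two Go channels stand in for the increment.requests and increment.responses queues, so no broker or AMQP library is involved. The function names servant, call, and incrementViaRPC are hypothetical.

```go
package main

// Two Go channels stand in for the broker queues increment.requests and
// increment.responses. A real deployment would use AMQP queues instead.

// servant consumes requests, increments each value, and publishes the result.
func servant(requests <-chan int, responses chan<- int) {
	for v := range requests {
		responses <- v + 1
	}
	close(responses)
}

// call performs one RPC: publish the request, then wait for the reply.
func call(requests chan<- int, responses <-chan int, v int) int {
	requests <- v
	return <-responses
}

// incrementViaRPC wires up one client and one servant and makes one call.
func incrementViaRPC(v int) int {
	requests := make(chan int, 1)
	responses := make(chan int, 1)
	go servant(requests, responses)
	defer close(requests) // lets the servant goroutine exit
	return call(requests, responses, v)
}
```

Calling incrementViaRPC(5) follows the same path as the description: the value 5 goes out on the request channel and the value 6 comes back on the response channel.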

This use-case is incredibly easy to implement. The client and servant always know the correct destination for each message they must send without any other information. There are numerous drawbacks.

By allowing for only one client, this eliminates the ability to use the AMQP broker as a meet-me point in a service oriented architecture. If for whatever reason the client is stopped and a new one is connected to the system to replace it, there is the possibility of that client receiving responses from the servant which were intended for the old one. This can be mitigated by purging the queues and restarting the servant. Otherwise the client must be capable of ignoring such messages.

By allowing for only one servant, the process cannot be scaled out. For some operations a single servant may not be able to keep up with the number of requests the client is making. Even if a single servant can deliver the required throughput, you are likely to want at least one additional servant. By having two servants running on two physically separate pieces of hardware, you can handle failure of one of them with no intervention whatsoever.

Single-client & Multiple-servant

A multiple-servant implementation is not terrifically more complex than a single-servant implementation. The first assumption made is that all servants are equal. This means that each servant generates the same side effects and returns the same value given the same input. With this assumption, it does not matter which servant processes a given request. All servants subscribe to the same request queue. A request message is delivered by the broker to exactly one servant. Each servant still publishes to the same response queue.
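A rough sketch of this arrangement, again assuming Go channels in place of broker queues: several identical servants receive from one shared request channel, and a channel receive hands each value to exactly one of them, which mimics the delivery behaviour described above. The name runServants is hypothetical.

```go
package main

import "sync"

// runServants starts n identical servants that all consume from one shared
// request channel (standing in for the request queue) and publish to one
// shared response channel. Each request is received by exactly one servant.
func runServants(n int, inputs []int) []int {
	requests := make(chan int)
	responses := make(chan int, len(inputs))

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range requests {
				responses <- v + 1 // same result no matter which servant runs
			}
		}()
	}

	for _, v := range inputs {
		requests <- v
	}
	close(requests)
	wg.Wait()
	close(responses)

	var results []int
	for r := range responses {
		results = append(results, r)
	}
	return results
}
```

The responses may arrive in any order, which is one reason matching responses to requests needs separate attention.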

Multiple-client & Single-servant

An advantage to implementing RPC over AMQP is that it allows the broker to function as a meet-me point for services to communicate with each other. This allows simpler systems to avoid the need for a service discovery service. Each client connects to the broker and sends messages to the queue that has servants listening. For this to work, the RPC implementation needs to support multiple clients.

In the single-client implementation, a single response queue is used. If a multiple client implementation attempted to use a single response queue, each client would get responses to requests that it did not issue. The client that originally issued those requests would never actually receive those responses.

In order to facilitate multiple clients, each client must be capable of subscribing in such a way that it only receives the responses for requests it generates. The solution to this problem is to create a queue with a random name for each client. Expanding on the previous example, the response queue for an individual client might be named increment.responses.client-6075fa53064bb86d5a373cb5c370cea6. The final portion of the queue name is randomly generated. With a large enough name space, the odds of a collision are low enough not to be a problem. The actual choice of name is irrelevant; I chose this format to be consistent with the other queue names.
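One way to produce a name in that shape is sketched below. It assumes 128 random bits from crypto/rand rendered as 32 hexadecimal digits, which matches the length of the example suffix. The helper name responseQueueName is hypothetical.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
)

// responseQueueName returns a name such as
// increment.responses.client-6075fa53064bb86d5a373cb5c370cea6.
func responseQueueName(prefix string) (string, error) {
	suffix := make([]byte, 16) // 128 random bits, rendered as 32 hex digits
	if _, err := rand.Read(suffix); err != nil {
		return "", err
	}
	return prefix + ".client-" + hex.EncodeToString(suffix), nil
}
```

A client would call responseQueueName("increment.responses") once at startup and declare a queue with the returned name.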

Each client creates a uniquely named queue at startup time and subscribes to it such that it may receive responses. With an AMQP broker, the queue should be specified as auto-delete once there are no more subscribers. This eliminates the problem of queues that are no longer in use populating the broker.

The other piece to this solution is having the servant publish responses to the correct queue for each request it processes. The AMQP standard specifies the existence of the "reply to" header but does not specify what an application should use it for. The logical use of such a header is to place the name of the queue that the servant should publish the response to. The servant treats the value of this header as an opaque destination and doesn't care about the specifics of it. The client sets this header on each request it sends.

If a servant receives a message without a "reply to" header, it can reject the message. It wouldn't have the information it needs to send the response anyways.

While a servant processes a message the client may disconnect from the broker. This causes the response queue to be deleted. When the servant publishes a message to that queue the broker simply drops it.
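The servant side of this can be sketched with a toy in-memory broker. Everything here is an assumption made for illustration: the message and broker types are stand-ins rather than any AMQP library's API, and the handler simply increments the body. The sketch shows the three behaviours described above: routing by the "reply to" value, rejecting a request that lacks one, and a dropped response when the destination queue no longer exists.

```go
package main

import "errors"

// message is a simplified stand-in for an AMQP message.
type message struct {
	ReplyTo string // name of the queue the response should go to
	Body    int
}

// broker is a toy in-memory broker: queue name -> buffered messages.
type broker struct {
	queues map[string][]message
}

// publish appends to the named queue. If the queue does not exist, the
// message is dropped and publish reports false.
func (b *broker) publish(queue string, m message) bool {
	if _, ok := b.queues[queue]; !ok {
		return false
	}
	b.queues[queue] = append(b.queues[queue], m)
	return true
}

var errNoReplyTo = errors.New("request has no reply-to header")

// handle processes one request and publishes the response to whatever
// destination the client named. The servant never inspects the name.
func handle(b *broker, req message) error {
	if req.ReplyTo == "" {
		return errNoReplyTo // reject: there is nowhere to send a response
	}
	b.publish(req.ReplyTo, message{Body: req.Body + 1})
	return nil
}
```

Note that handle deliberately ignores the result of publish: from the servant's point of view, a response to a vanished client is not an error.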

This solution requires only a small amount of application level logic to implement. It allows an arbitrary number of clients to make RPC calls against the servant without any per-client configuration needed at the broker.

As in the other case, implementing a multiple-client & multiple-servant RPC system is no more challenging than that of the multiple-client & single-servant one. You only need to have multiple servants all subscribed to the same request queue. The case of multiple-client & multiple-servant does not get its own section here.

Matching each response to each request

Having solved the problem of a multiple-client & multiple-servant RPC system, it is now time to analyze a less obvious problem.

When a client receives a response, how does it know what request it is for?

All of the solutions presented here are done so with the multiple-client & multiple-servant RPC system in mind.

Single outstanding request client

The most obvious solution to such a problem is for each client to never have more than a single outstanding request. When a response is received, there is only one possible request to match it with.

If the application only makes a single RPC call in its lifetime this approach can work just fine.

At first it might seem that if a client only makes RPC calls infrequently it might also work. The problem with this is the following scenario:

  1. The client issues a request A.
  2. No servants are running due to a service outage.
  3. The client times out waiting for a response to request A.
  4. The client issues request B.
  5. Servants come online and process request A.
  6. The servant sends response A.
  7. The client receives response A.
  8. The client mistakenly pairs response A with request B.

Such a mismatch between requests and responses is unacceptable for almost all systems. The only possible way to avoid such a problem is to terminate the client when it times out waiting for the response to request A.

There are many other sequences of events that could lead to mismatch between a request and response. They are not presented here.

The single outstanding request client is also unacceptable for any system requiring high throughput. Waiting for each response before issuing another request creates a bottleneck at each client where there otherwise would not be one.

Single response queue per request

To correctly match a response to any number of outstanding requests, each client can create a separate response queue for each request. Before making each request, the client must populate a lookup table that allows it to match the name of the response queue to the request which resulted in its creation. When a response is received, the same lookup table is consulted.

This approach can work but there are two things to consider. Each request now involves the insertion into some form of a lookup table. The average complexity of this operation is \(O(log(n))\). Insertion could also involve memory allocation as a step. The complexity of the lookup into the data structure when a response is received is possibly \(O(1)\) if a hash table is used. Otherwise the lookup would also be \(O(log(n))\). During a period of high volume of requests and high response latency, this data structure could grow to be very large.

The creation of response queues on the broker side is not free. While clients and servants can often be scaled horizontally, scaling the broker horizontally is not as easy. To mitigate this, a client might reuse response queues. In order to avoid the problem of the mismatched response, the client would need to dispose of any queue if it timed out waiting for a response on it.

The slotted client

The solution I chose I refer to as a 'slotted client'. At startup time, the client creates a single response queue which is used to receive responses for all requests. It also creates a slot data structure. The slot data structure is an array where each element stores the following information.

  • In Use - A boolean indicating if the slot is in use
  • Request Sequence Number - An integer value
  • Request Time - The time at which the request was made

There is a fourth piece of information that needs to be stored in each slot. That is the destination of the response if it is received. In Go, I always use a channel. The actual type is implementation specific.

When a client is ready to make a request, it locates a free slot. The client marks the slot as in use and stores the destination for the response in the slot. For each and every request, the client generates a sequence number. I use a 32-bit unsigned integer. The client stores this sequence number in the slot.
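A minimal sketch of such a slot structure follows. The field and function names are hypothetical, the response destination is assumed to be a buffered channel of bytes, and the code is not safe for concurrent use; a real client would need a mutex or a single owning goroutine around acquire.

```go
package main

import "time"

// slot tracks one outstanding request.
type slot struct {
	inUse    bool
	sequence uint32      // request sequence number
	sentAt   time.Time   // request time
	reply    chan []byte // destination for the response body
}

// client holds a fixed number of slots; len(slots) is its concurrency.
type client struct {
	slots   []slot
	nextSeq uint32
}

func newClient(concurrency int) *client {
	return &client{slots: make([]slot, concurrency)}
}

// acquire finds the first free slot with a linear search, marks it in use,
// and returns its index, the new sequence number, and the reply channel.
// ok is false when every slot is occupied.
func (c *client) acquire() (index int, seq uint32, reply chan []byte, ok bool) {
	for i := range c.slots {
		if c.slots[i].inUse {
			continue
		}
		c.nextSeq++ // unsigned arithmetic wraps around on overflow
		reply = make(chan []byte, 1)
		c.slots[i] = slot{inUse: true, sequence: c.nextSeq, sentAt: time.Now(), reply: reply}
		return i, c.nextSeq, reply, true
	}
	return 0, 0, nil, false
}
```

The caller publishes the request after a successful acquire and then waits on the returned channel.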

The client needs a way to identify a response as belonging to a specific slot. Since a single response queue is used there is no easy way to discriminate amongst responses. The AMQP standard defines the "correlation id" header, but leaves the usage up to the application. In my implementation, each servant simply copies the correlation id from the request into the correlation id of the response. The servant treats this as an opaque value. This guarantees that the client can use the "correlation id" as an identifier when processing responses.

For each request, the client marshals the slot number and sequence number into the correlation id header. When a response is received, it unmarshals the slot number and sequence number. By comparing the sequence number in the received message with the sequence number in the slot, the client can avoid mismatching requests and responses. Collisions due to rollover are avoided by choosing a suitably wide type for the sequence number. With a 32-bit unsigned type, reusing the same sequence number within a 24 hour period would require a request rate in excess of 49710 requests per second.
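The wire format of the correlation id is not specified here, so the following is only one possible encoding, chosen for illustration: the slot number and sequence number are each written as a big-endian 32-bit value and the eight bytes are rendered as hex. The function names are hypothetical. The last helper reproduces the rollover arithmetic, \(2^{32} / 86400 \approx 49710\).

```go
package main

import (
	"encoding/binary"
	"encoding/hex"
	"errors"
)

// encodeCorrelationID packs the slot number and sequence number into a
// 16-character hex string suitable for a string-valued header.
func encodeCorrelationID(slot, seq uint32) string {
	var raw [8]byte
	binary.BigEndian.PutUint32(raw[0:4], slot)
	binary.BigEndian.PutUint32(raw[4:8], seq)
	return hex.EncodeToString(raw[:])
}

// decodeCorrelationID reverses encodeCorrelationID.
func decodeCorrelationID(id string) (slot, seq uint32, err error) {
	raw, err := hex.DecodeString(id)
	if err != nil {
		return 0, 0, err
	}
	if len(raw) != 8 {
		return 0, 0, errors.New("correlation id has the wrong length")
	}
	return binary.BigEndian.Uint32(raw[0:4]), binary.BigEndian.Uint32(raw[4:8]), nil
}

// requestsPerSecondToWrap returns the sustained request rate needed to
// use up a 32-bit sequence space within the given number of seconds.
func requestsPerSecondToWrap(seconds uint64) uint64 {
	return (1 << 32) / seconds
}
```

Because the servant copies the value back untouched, the client is free to change this encoding without touching any servant.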

The slot data structure in my implementation is statically sized. The data structure could be sized dynamically. I am leery of any data structure which might grow unbounded. As a result, I chose to put a hard cap on the size of the data structure. I call this value the concurrency of the client. This means the client is limited to no more than that number of concurrent outstanding requests.

Request-Response Complexity

To issue a request, the client must find a free slot. I use a linear search across the slot data structure to find the first available free slot. This has an average complexity of \(O(n)\). This means that as the concurrency of the client is increased, it spends more and more time searching for free slots. The only advantage to this implementation is that an overly large slot data structure does not guarantee an average complexity of \(O(n)\). Since the search always begins at the start of the array, the client prefers to use the slots at the start of the array. The actual complexity approximates \(O(c)\) where \(c\) is the average number of outstanding requests. When the average number of outstanding requests approaches the concurrency of the client, the complexity is then \(O(n)\).

To match a response with a request, the client only needs to check the slot. It already knows the exact slot number from the correlation id. By using an array this process is always \(O(1)\), meaning it takes constant time. A hash table of course has the same lookup complexity, but the actual cost of a lookup into an array is much lower. A hash table by definition requires hashing the key before checking for the existence of a value for that key. Lookup into an array in a language such as Go is just the cost of some pointer arithmetic.
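The constant-time match can be sketched as a single indexed access followed by a sequence check. The types below are simplified assumptions (a trimmed-down slot and a reply channel buffered with capacity one so that deliver never blocks), and deliver is a hypothetical name.

```go
package main

// slot is a trimmed-down version of the slot record for this sketch.
type slot struct {
	inUse    bool
	sequence uint32
	reply    chan []byte // assumed to have capacity 1
}

// deliver hands a response body to the request waiting in slots[index].
// It reports false for out-of-range indexes, free slots, and stale
// responses whose sequence number no longer matches the slot.
func deliver(slots []slot, index int, seq uint32, body []byte) bool {
	if index < 0 || index >= len(slots) {
		return false
	}
	s := &slots[index] // direct array indexing: constant time
	if !s.inUse || s.sequence != seq {
		return false
	}
	s.reply <- body
	s.inUse = false // the slot is free for the next request
	return true
}
```

A late response for a request that has already timed out carries an old sequence number, so it fails the comparison and is discarded instead of being paired with a newer request.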

The cumulative complexity of the request-response process with the slotted client is

\(O(n) + O(1)\)

Limits on throughput

Limiting the concurrency of the client does not by itself determine the throughput of the client. To calculate the limit on the peak throughput of the client, you also need to know the maximum latency of the response.

The throughput of any process is defined as the average number of events handled over some time period. Using the definition of the average, you can see that the throughput \(H\) of the process is defined by the values

  • \(E\) - the number of events

  • \(T\) - the time period required to process all events

\(H = E/T\)

Using this definition we can find the peak throughput of the client.

  • \(C\) - concurrency of the client

  • \(L_m\) - maximum latency of the response, in seconds

The peak throughput of the client is \(P\).

\( P = C / L_m\)

This analysis is actually incredibly pessimistic. It assumes two things.

  • The client makes \(C\) requests immediately, all at once. This fills all the slots.
  • The servants take the maximum amount of time possible to process each request and send the response.

Such a scenario is extremely unlikely to happen. In other words, this is the worst-case peak throughput of the client. The actual limit on the throughput is defined in a more sane manner by substituting \(L_a\), the average response latency, for \(L_m\). This gives the limit of the client's average throughput.

Whenever \(C\) is one, we see that

\( P = 1 / L_a\)

This means that in the case of the single outstanding request client presented earlier, the peak throughput is the inverse of the average response time. This explains why the single outstanding request client is not a desirable implementation.
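To make the formula concrete, here is a small helper with made-up numbers. The concurrency of 64 and the latency of 0.25 seconds are assumptions chosen only because they divide cleanly; they are not measurements from my system.

```go
package main

// peakThroughput returns C / L in requests per second, where concurrency
// is the number of slots and latencySeconds is the response latency.
func peakThroughput(concurrency int, latencySeconds float64) float64 {
	return float64(concurrency) / latencySeconds
}
```

With those assumed values, peakThroughput(64, 0.25) gives 256 requests per second, while the single outstanding request client, peakThroughput(1, 0.25), tops out at 4 requests per second.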

All of this analysis is dependent on the presumption that the servants can be scaled horizontally as well. If the servants do not scale horizontally, then the average response time becomes higher when the number of clients or the concurrency of a client is increased.

Handling all slots being occupied

When all slots are occupied, the client cannot make any more requests. In my software I must always handle the possibility of the response never being received. In the case of all slots being occupied, the software proceeds down the exact same execution path as a timeout waiting for a response.

By introducing the concept of a response timeout to the RPC client itself, the client can possibly free up slots. By configuring a response timeout on the RPC client, a caller of the client is stating "I am never going to wait more than this amount of time for a response. Thus, it is acceptable if the RPC client does not either."

The client records the time at which each request is sent. When all slots are full, the client can scan all the slots. Any slot that had a request sent longer ago than the response timeout can be freed up for reuse. This isn't terribly useful, but it avoids a scenario where a slot could become permanently in use.

Consider the following scenario

  1. The client sends request A.
  2. The servant receives request A.
  3. During processing, the servant experiences a catastrophic failure such as a power outage.

In this case, response A is never sent. If the client does not eventually reuse the slot for response A the slot would remain marked as in use forever. If this scenario repeats itself enough times the client becomes starved for slots. It could only be rectified by restarting the software. This is unacceptable.

The advantage to this implementation is that the logic to free slots is only triggered after an out-of-slots condition is reached. No CPU cycles are wasted if this condition is not reached.
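The scan can be sketched as follows. The slot type is reduced to the two fields the scan needs, and reclaim is a hypothetical name. The sketch assumes the caller that was waiting on a reclaimed slot has already given up, as the response timeout implies.

```go
package main

import "time"

// slot is reduced to the fields needed for reclaiming.
type slot struct {
	inUse  bool
	sentAt time.Time
}

// reclaim frees every in-use slot whose request was sent more than timeout
// before now, and returns how many slots were freed. It is meant to be
// called only after a search for a free slot has failed.
func reclaim(slots []slot, now time.Time, timeout time.Duration) int {
	freed := 0
	for i := range slots {
		if slots[i].inUse && now.Sub(slots[i].sentAt) > timeout {
			slots[i] = slot{}
			freed++
		}
	}
	return freed
}
```

If reclaim returns zero, every slot holds a request that is still within its timeout, and the out-of-slots condition is reported to the caller.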

Optimization - The last free slot

One easy optimization is to record the index of the slot that was most recently marked as not in use when a response was received. When making a request, the client does not have to search for a slot if this value is available. This makes the complexity of the slot search \(O(1)\) a portion of the time. The actual complexity winds up being somewhere between \(O(1)\) and \(O(n)\).

At this point you're probably thinking "Just maintain a free list!". The reality is, pushing and popping values from the free list is not zero cost. Using a stack that is pre-allocated to the same size as the slot data structure would incur the least cost. This also avoids searching through a full data structure whenever all slots are occupied, because an empty stack signals that condition immediately.
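For comparison, a pre-allocated stack of free slot indexes might look like the sketch below. This is not what my implementation does; the type and method names are hypothetical, and the code assumes the caller never pushes an index that is already on the stack.

```go
package main

// freeList is a stack of free slot indexes, preallocated to the
// concurrency of the client so push and pop never allocate.
type freeList struct {
	indexes []int
}

func newFreeList(concurrency int) *freeList {
	f := &freeList{indexes: make([]int, 0, concurrency)}
	for i := concurrency - 1; i >= 0; i-- {
		f.indexes = append(f.indexes, i) // slot 0 ends up on top
	}
	return f
}

// pop returns a free slot index, or false when every slot is occupied.
func (f *freeList) pop() (int, bool) {
	if len(f.indexes) == 0 {
		return 0, false
	}
	i := f.indexes[len(f.indexes)-1]
	f.indexes = f.indexes[:len(f.indexes)-1]
	return i, true
}

// push returns a slot index to the free list.
func (f *freeList) push(i int) {
	f.indexes = append(f.indexes, i)
}
```

Both operations are constant time, at the price of one extra integer per slot and a write on every request and every response.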

Optimization - Free slot search inversion

The actual complexity of the slot search approximates \(O(c)\) where \(c\) is the average number of outstanding requests. This is because the slot search always begins at the start of the array. By alternating the starting point of each search between opposite ends of the same array, the slot search would instead approximate \(O(c/2)\). This is because, in the best case, a search is twice as likely to find a free slot near the end it starts from. In the worst case, it is still \(O(c)\). The only other added cost is flipping a boolean value after each search. Again, the actual cost ends up being between \(O(c/2)\) and \(O(c)\).
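The alternating search can be sketched with a single boolean that flips after every search. The slot state is reduced to a slice of in-use flags for brevity, and the names searcher and find are hypothetical.

```go
package main

// searcher alternates the direction of each free-slot search.
type searcher struct {
	fromEnd bool
}

// find returns the index of a free slot (inUse[i] == false), or -1 when
// every slot is occupied. Successive calls start from opposite ends.
func (s *searcher) find(inUse []bool) int {
	n := len(inUse)
	fromEnd := s.fromEnd
	s.fromEnd = !s.fromEnd // flip the direction for the next search
	for k := 0; k < n; k++ {
		i := k
		if fromEnd {
			i = n - 1 - k
		}
		if !inUse[i] {
			return i
		}
	}
	return -1
}
```

With four free slots, successive calls that mark each result as in use hand out the indexes 0, 3, 1, 2, so occupied slots cluster at both ends of the array instead of only at the start.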

Optimization - Negative responses

If a servant receives a request and cannot process it, it may choose to simply discard the request. A request for division by zero obviously cannot be processed. If a servant encounters a fatal condition like an exception, it also cannot send a sane response. This means that when clients make such a request, the client times out waiting for a response. During this time the slot for the request is occupied continuously. This lowers the overall throughput of the system. By having the servant send a negative response indicating that the request can never be processed, the slot can be freed immediately. Whatever process is waiting on the result of the RPC can either proceed or fail immediately in that case, which may be important. It is in mine, because waiting for a timeout holds an HTTP connection open that otherwise would not need to be.
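A negative response can be as simple as a flag and a reason carried in the response body. The response layout below is an assumption for illustration, as are the names divide and result. It uses the division example from above.

```go
package main

import "errors"

// response is a hypothetical response body with room for a failure reason.
type response struct {
	OK    bool   // false marks a negative response
	Value int    // result, valid only when OK is true
	Error string // reason, set only when OK is false
}

// divide is a hypothetical servant handler. Instead of discarding a
// request it cannot process, it returns a negative response.
func divide(a, b int) response {
	if b == 0 {
		return response{OK: false, Error: "division by zero"}
	}
	return response{OK: true, Value: a / b}
}

// result converts a response into a value and error pair on the client
// side, so the caller can fail immediately instead of waiting to time out.
func result(r response) (int, error) {
	if !r.OK {
		return 0, errors.New(r.Error)
	}
	return r.Value, nil
}
```

The servant publishes the negative response exactly like a normal one, with the same correlation id, so the client frees the slot through its usual response path.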

Conclusion

I believe I have presented the merits of using a message broker as a communications medium for RPC. I hope you find this useful.


© Eric Urban 2014