Using AWS Kinesis with the Kinesis Producer Library

I've had a server I wrote up and running for a year now that uses the Kinesis Producer Library (called the KPL going forward) to push customer data into AWS Kinesis as it is sent to an HTTP server. This article isn't really about how to use the KPL; there is plenty of material on that from Amazon. Here are some links if that is what you are interested in:

  1. Developing Producers Using the Amazon Kinesis Producer Library
  2. Kinesis Producer Library on GitHub

Why does this exist?

The first thing I had to ask about this library was why does it exist? All of the standard AWS SDKs include ways to talk to Kinesis without involving yet another library. The standard putRecords operation will allow you to dump records into a Kinesis stream. It even handles writing data to the correct shard if you need to partition your data amongst multiple shards in a consistent manner.

But there are some pretty obvious limitations. You pay for an Amazon Kinesis shard by the hour and it can support writes up to 1,000 records per second, up to a maximum data rate of 1 MB per second. One way to reach this limit is to write records that are approximately 1,049 bytes (1 MB divided by 1,000) into Kinesis at a rate of 1,000 per second. But this is unrealistic as most systems don't have records that large. Consider a system that tracks failed login attempts by sending each one to Kinesis. You could have a huge number of events coming in, but you'd probably just have a record that contains a URL, an IP address, and the username. Each record would be a few hundred bytes at most.

If you don't fully utilize a shard you're basically gifting Amazon money because you are paying for throughput you won't use. In the worst case you'd write single byte records into a shard, paying the full price for a mere 1000 bytes per second utilization.
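To put rough numbers on this, here is a small sketch of the arithmetic. It assumes one user record per Kinesis record, writes at the full 1,000 records per second, and takes 1 MB to mean 2^20 bytes. The class and method names are made up for illustration.

```java
public class ShardUtilization {
    // Per-shard write limits quoted in the text above.
    static final long MAX_RECORDS_PER_SECOND = 1_000;
    static final long MAX_BYTES_PER_SECOND = 1_048_576; // assumption: 1 MB = 2^20 bytes

    /**
     * Fraction of a shard's byte throughput that is used when records of the
     * given size are written at the maximum record rate, one per Kinesis record.
     */
    public static double utilization(long recordSizeBytes) {
        long bytesPerSecond = Math.min(recordSizeBytes * MAX_RECORDS_PER_SECOND, MAX_BYTES_PER_SECOND);
        return (double) bytesPerSecond / MAX_BYTES_PER_SECOND;
    }

    public static void main(String[] args) {
        for (long size : new long[] {1, 200, 1_049}) {
            System.out.printf("%d-byte records: %.2f%% of shard throughput%n",
                    size, 100 * utilization(size));
        }
    }
}
```

Under these assumptions a 200-byte record leaves roughly four fifths of the byte throughput you are paying for unused.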

The obvious solution to this is to pack several of your records into a single record in Kinesis. Kinesis won't know the difference and Amazon won't care. It seems that rather than letting a multitude of open source projects spring up for this, AWS just created one. The KPL is licensed under the Amazon Software License which grants a "perpetual, worldwide, non-exclusive, royalty-free, copyright license". Using this is a good idea as it is a solution that is mostly supported by Amazon. Even if you're writing very small records into a Kinesis stream you can get the most for your money using the KPL.

How it is implemented

I'm relatively glad to see that Amazon went about building this in the simplest way possible. From the perspective of the KPL there are user records and kinesis records. User records are whatever byte-strings your application wants to store in Kinesis. Kinesis records are the actual records written into a kinesis shard. The KPL is able to aggregate multiple user records into a single kinesis record.

The format of a kinesis record is as follows

  1. A four byte magic string that is always 0xF3, 0x89, 0x9A, 0xC2
  2. A Google Protocol buffers message
  3. An MD5 of the protocol buffers message

This is a basic container format that allows a single shard to have a mixture of "raw" messages written to it by an application as well as aggregated messages from the KPL. One thing to realize is that if you call getRecords and find yourself receiving messages that are unusually large and have an unexpected encoding, you're probably dealing with a client using the KPL to write into the Kinesis stream. You can verify this by checking for the four magic bytes listed above. If you want to unpack these messages, you'll need to use the Kinesis Client Library.

The included MD5 doesn't really seem necessary to me, but can be used to verify message integrity.
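Checking a payload against this layout takes only a few lines. The sketch below assumes the MD5 is the final 16 bytes of the record and covers only the protocol buffers bytes between the magic string and the digest, which is how I read the list above. The class, the method names, and the wrap helper are hypothetical and are not part of any AWS library.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

public class AggregatedRecordCheck {
    // Magic prefix from the format description above.
    static final byte[] MAGIC = {(byte) 0xF3, (byte) 0x89, (byte) 0x9A, (byte) 0xC2};
    static final int MD5_LENGTH = 16; // an MD5 digest is always 16 bytes

    /** True when the payload starts with the four magic bytes. */
    public static boolean hasMagic(byte[] payload) {
        return payload.length >= MAGIC.length
                && Arrays.equals(Arrays.copyOfRange(payload, 0, MAGIC.length), MAGIC);
    }

    /** True when the magic prefix is present and the trailing MD5 matches the bytes in between. */
    public static boolean isIntact(byte[] payload) {
        if (!hasMagic(payload) || payload.length < MAGIC.length + MD5_LENGTH) {
            return false;
        }
        byte[] message = Arrays.copyOfRange(payload, MAGIC.length, payload.length - MD5_LENGTH);
        byte[] trailer = Arrays.copyOfRange(payload, payload.length - MD5_LENGTH, payload.length);
        return Arrays.equals(md5(message), trailer);
    }

    /** Hypothetical helper for experiments: wraps arbitrary bytes in the container layout. */
    public static byte[] wrap(byte[] message) {
        byte[] digest = md5(message);
        byte[] out = new byte[MAGIC.length + message.length + MD5_LENGTH];
        System.arraycopy(MAGIC, 0, out, 0, MAGIC.length);
        System.arraycopy(message, 0, out, MAGIC.length, message.length);
        System.arraycopy(digest, 0, out, MAGIC.length + message.length, MD5_LENGTH);
        return out;
    }

    private static byte[] md5(byte[] data) {
        try {
            return MessageDigest.getInstance("MD5").digest(data);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is required on every Java platform", e);
        }
    }
}
```

A consumer could call hasMagic on each record returned by getRecords to tell raw records apart from aggregated ones before deciding how to decode them.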

It's not obvious here, but the implementation preserves the same distribution of records that is used normally according to the partition key set on each record. When user records are aggregated into the above format, records are grouped together based off the Kinesis shard that they would normally be assigned to.
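The grouping step can be sketched as follows. Kinesis maps a partition key to a 128-bit hash key by taking the MD5 of the key, and each shard owns a range of hash keys. For simplicity this sketch assumes the stream's shards split the hash space evenly, which is an assumption for illustration and not something the KPL relies on. The names are hypothetical.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ShardGrouping {
    /** The MD5 of the partition key, read as an unsigned 128-bit integer. */
    public static BigInteger hashKey(String partitionKey) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(1, digest);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Index of the shard owning this key, assuming shardCount equally sized hash ranges. */
    public static int shardIndex(String partitionKey, int shardCount) {
        // floor(hash * shardCount / 2^128) is always in [0, shardCount)
        return hashKey(partitionKey)
                .multiply(BigInteger.valueOf(shardCount))
                .shiftRight(128)
                .intValue();
    }

    /** Buckets partition keys by predicted shard, so each bucket could become one aggregated record. */
    public static Map<Integer, List<String>> groupByShard(List<String> partitionKeys, int shardCount) {
        Map<Integer, List<String>> groups = new TreeMap<>();
        for (String key : partitionKeys) {
            groups.computeIfAbsent(shardIndex(key, shardCount), i -> new ArrayList<>()).add(key);
        }
        return groups;
    }
}
```

A real producer would look the hash key up in the stream's actual shard ranges instead of assuming an even split.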

The actual Google protocol buffers message contains all the user records. That message type is defined here and I've copied the relevant section over

message AggregatedRecord {
  repeated string partition_key_table     = 1;
  repeated string explicit_hash_key_table = 2;
  repeated Record records                 = 3;
}

message Record {
  required uint64 partition_key_index     = 1;
  optional uint64 explicit_hash_key_index = 2;
  required bytes  data                    = 3;
  repeated Tag    tags                    = 4;
}

message Tag {
  required string key   = 1;
  optional string value = 2;
}

I won't go into how this is actually encoded because Google protocol buffers is well documented here. Amazon doesn't do anything special. If you have a compiler that can generate code for your language of choice, you can read and write these messages.

So the KPL basically takes your user records, bundles them up and then sends them off into Kinesis right? Well there is a lot more to it than that.

It's not really a library

The first thing to grasp is that the KPL is a library, but it also comes with a precompiled daemon for OSX, Linux, and Windows. When you use the library it will start this daemon up in the background for you. This daemon actually handles the aggregation of records and sending them to Kinesis. One thing about the daemon is that it is written in C++. The KPL is touted as being fast. If your application is already well written in Java you aren't likely to see much speedup here in my opinion. Java is the officially supported language of the library. If you are smart and don't just allocate large byte buffers every time you need to send a record, the JVM is probably optimizing your application into native code. After all, it mostly just consists of packing the data into the format that Kinesis expects in an HTTP request.

Here's a quick sketch of how this works on an OSX or Linux system.

The C++ component runs in its own separate process with its own threadpool for execution and its own HTTPS connections to Kinesis. More on that later. The messages sent across to the daemon are Google protocol buffer messages as well. This is probably pragmatism, given the library is already present for encoding the records before they are sent to Kinesis. Interestingly this is done over Unix named FIFOs on the filesystem. I would think a Unix socket would be more typical for this type of integration, but perhaps there is some advantage to the FIFO that I am not aware of.

The daemon doesn't refresh its own credentials with AWS

One curious thing I noticed immediately is that the parent process takes whatever AWS credentials it is running with and passes them into the KPL daemon for usage. When the parent process gets updated credentials, it will pass the updated ones to the KPL as well. This happens if you're running on an AWS EC2 node for example. This is all transparent to the user. So whatever credentials your application has access to, those are the credentials the KPL utilizes.

Firehose automatically unpacks the records

From the perspective of an AWS Kinesis shard, it just starts getting records that have a larger byte size and are less frequent. But AWS also made Kinesis Firehose smart enough to unpack the format written by the KPL. So if you have Kinesis Firehose configured to save your records to S3 for long-term storage you won't have to worry about the format changing when you start using the KPL.

There is lots of tunability

The KPL has lots of tunability about it. These are all encapsulated in the class KinesisProducerConfiguration. Here are the more important ones

  1. setMaxConnections - this lets you set the maximum number of TCP connections the KPL will open to Kinesis
  2. setTempDirectory - the temporary directory into which to extract the native binaries
  3. setConnectTimeout - the timeout for establishing connections to Kinesis, 6000 milliseconds by default
  4. setRequestTimeout - the timeout of requests to Kinesis, 6000 milliseconds by default
  5. setThreadingModel - the way threads are used in the library, default to thread per request
  6. setThreadPoolSize - sets the maximum number of threads in the thread pool
  7. setRecordMaxBufferedTime - the maximum time a user record can be buffered, default of 100 milliseconds

I find the default timeouts a little short, so I raised them to 20 seconds in production.

The threading model is incredibly important to consider. There are two options: PER_REQUEST and POOLED. The pooled option is exactly what it sounds like. A threadpool of finite size is created and reused. The per request option creates a new thread for each request which is impractical. Creating new threads is really slow compared to reusing existing ones. You should in my opinion always use the pooled option.

The final thing is the max buffered time. In an effort to aggregate more user records into a single kinesis record, the KPL buffers messages in memory. This has a time limit associated with it. I have no need to try and get real-time behavior from Kinesis so I raised this value to 2 seconds. This gives the KPL the opportunity to aggregate records into larger messages.
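To make the trade-off concrete, here is a toy model of time-bounded buffering. This is not the KPL's implementation; it only illustrates the idea that a longer maximum buffered time lets more user records pile up before a flush. The clock is passed in as a parameter so the behavior is easy to follow, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class TimedBuffer {
    private final long maxBufferedMillis;
    private final List<String> pending = new ArrayList<>();
    private long oldestArrivalMillis;

    public TimedBuffer(long maxBufferedMillis) {
        this.maxBufferedMillis = maxBufferedMillis;
    }

    /** Buffers a record, then returns a flushed batch if the oldest pending record is due. */
    public List<String> add(String record, long nowMillis) {
        if (pending.isEmpty()) {
            oldestArrivalMillis = nowMillis; // the buffering clock starts with the first record
        }
        pending.add(record);
        return flushIfDue(nowMillis);
    }

    /** Returns and clears the pending records once the oldest has waited long enough. */
    public List<String> flushIfDue(long nowMillis) {
        if (pending.isEmpty() || nowMillis - oldestArrivalMillis < maxBufferedMillis) {
            return Collections.emptyList();
        }
        List<String> batch = new ArrayList<>(pending);
        pending.clear();
        return batch;
    }
}
```

With a 100 millisecond limit, three records arriving 50 milliseconds apart flush as one batch when the third arrives. With a 2 second limit the same buffer keeps collecting for another 1.9 seconds. The real library also has to respect record size limits, which this toy ignores.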

Drawbacks

It reorders messages in Kinesis

Internally the daemon must receive messages on a Unix FIFO and deserialize them one by one for aggregation. This is really simple to describe, but the actual implementation uses a threadpool. This threadpool is separate from the threadpool that is configurable. It's defined and created programmatically here. I've copied the implementation below for reference

std::shared_ptr<aws::utils::Executor> get_executor() {
  int cores = aws::thread::hardware_concurrency();
  int workers = std::min(8, std::max(1, cores - 2));
  return std::make_shared<aws::utils::IoServiceExecutor>(workers);
}

What this means is you'll wind up with an executor that has at most 8 threads. Dual and three core systems will wind up with a single thread. The usage of this threadpool is as follows
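The same formula, transcribed into Java so the results for different core counts are easy to check. Only the arithmetic comes from the C++ above; the class name is made up.

```java
public class IpcWorkerCount {
    /** Mirrors std::min(8, std::max(1, cores - 2)) from the daemon source quoted above. */
    public static int workers(int cores) {
        return Math.min(8, Math.max(1, cores - 2));
    }

    public static void main(String[] args) {
        for (int cores : new int[] {1, 2, 3, 4, 8, 10, 16, 64}) {
            System.out.println(cores + " cores -> " + workers(cores) + " worker thread(s)");
        }
    }
}
```

The count only reaches the cap of 8 once the machine has 10 or more cores.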

executor_->submit([batch = std::move(batch), this]() mutable {
  for (auto& s : batch) {
    this->on_ipc_message(std::move(s));
  }
});

This submits a "batch" for deserialization and subsequent processing. A batch is just an arbitrary group of user records that come across the Unix FIFO. There is no guarantee as to the order in which these threads execute. Internally this is implemented using a boost::asio::io_service from the popular Boost C++ library. However, the author just has all the threads running whatever is available, so this is just a basic threadpool in this case.

What this means is that you could send hypothetical records A, B, C, D to the KPL. A & B could wind up in the first batch with C & D in the second batch. If two threads are in the thread pool, thread 1 would get batch 1 and thread 2 would get batch 2. But from here forward, there is no guarantee as to what order things are executed in. As a result, user records can get aggregated and sent to Kinesis in any order.

The above scenario is easy to understand if you have 2 threads running. Crucially, even when the threadpool size is 1 there is no guarantee of in-order processing. This is because threadpools don't actually guarantee in-order processing. That just isn't a design criterion of a threadpool. A threadpool with 1 thread is best described as probably in-order. That isn't something I would like to rely on. It's exceptionally rare, but the KPL does in fact process these "batches" out of order and send records into Kinesis in an order different from the one you intended.
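The two-thread case can be reproduced with a plain Java thread pool. This is not KPL code. The sketch uses a latch to force the unlucky schedule in which the thread holding batch 1 is delayed until batch 2 has finished, so that the outcome is repeatable. In the daemon the same interleaving would happen only by chance.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class BatchReorderDemo {
    /** Submits batch 1 (A, B) before batch 2 (C, D) and returns the order they were processed in. */
    public static List<String> run() {
        List<String> processed = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch secondBatchDone = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // Batch 1 is submitted first, but its thread stalls (simulated with the latch).
            Future<?> first = pool.submit(() -> {
                secondBatchDone.await();
                processed.add("A");
                processed.add("B");
                return null;
            });
            // Batch 2 is submitted second and runs to completion on the other thread.
            Future<?> second = pool.submit(() -> {
                processed.add("C");
                processed.add("D");
                secondBatchDone.countDown();
            });
            first.get();
            second.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return new ArrayList<>(processed);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Submission order was A, B, C, D but the processing order comes out as C, D, A, B. Nothing in the pool's contract prevents this.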

So if you care about the ordering of your records, you have two options:

  1. Live with the fact that sometimes records do in fact get re-ordered.
  2. Don't use the KPL.

It signals errors after retries are exhausted

One of the nice things about the KPL is that when a putRecords call to AWS Kinesis returns a failure, the failed messages are retried appropriately. This means that messages sent via the KPL should always show up in Kinesis at least once. There is a complete error handling mechanism to make retries at the appropriate time.

The KinesisProducer class in the Java library has a method getMetrics that returns a bunch of metrics. Some of these metrics are error metrics. When the metric UserRecordExpired starts to increase, the retries for those records have already been exhausted. Most server applications will want to implement some sort of health check and monitor this metric. Since the retries have already been exhausted, a non-zero value on this metric is the point at which the health check should report that the service has failed.

Changing the number of shards results in message duplication

As mentioned above in this article, each kinesis record only contains user records that are intended for the same shard. The KPL keeps a shard map in memory to make this determination. The shard map just explains what partition keys are assigned to what shard. But if you dynamically resize your Kinesis stream the new shards won't be in this copy of the shard map. The same goes for removing shards.

The KPL handles this by detecting what shard the record went to in the putRecords result and comparing it with the predicted shard. If a mismatch is detected it just resends the message to the correct shard along with refreshing the shard map. This is an "at least once" behavior of the messages. If you were to analyze the records in the Kinesis shards at this time you'd find some duplication. The Kinesis Client Library has a complementary behavior where it ignores records that it finds in the "wrong" shard. So a client using the Kinesis Client Library should really only see each user record once.
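The consumer side of this can be sketched as a range check. The assumption here is that "wrong" means the user record's hash key (the MD5 of its partition key, read as a 128-bit integer) falls outside the hash key range of the shard it was read from. This is an illustration of the idea, not the Kinesis Client Library's actual code, and the names are hypothetical.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;

public class WrongShardFilter {
    /** The MD5 of the partition key, read as an unsigned 128-bit integer. */
    static BigInteger hashKey(String partitionKey) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(1, digest);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    /** True when the key hashes into the shard's inclusive range [start, end]. */
    public static boolean belongsTo(String partitionKey, BigInteger start, BigInteger end) {
        BigInteger hash = hashKey(partitionKey);
        return hash.compareTo(start) >= 0 && hash.compareTo(end) <= 0;
    }

    /** Keeps only the user records (represented here by their partition keys) that belong to the shard. */
    public static List<String> keep(List<String> partitionKeys, BigInteger start, BigInteger end) {
        List<String> kept = new ArrayList<>();
        for (String key : partitionKeys) {
            if (belongsTo(key, start, end)) {
                kept.add(key);
            }
        }
        return kept;
    }
}
```

A copy of a user record that landed in a shard it does not hash to is dropped, and the copy the KPL resent to the correct shard is the one that gets through.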

This is the best way to handle this, but if you see some retries or abnormal message rates as you resize your Kinesis stream don't be alarmed. It's just this duplication going on.

It's buggy

The daemon is launched like any other process, complete with command line style switches. This is managed by a class called Daemon in the Java library. One "option" I discovered while testing things is one called --enable-stack-trace or -t. This has a curious side effect of calling sigaction(SIGPIPE, &pipe_action, NULL); on startup which registers a signal handler for the SIGPIPE signal. Apparently, the curl library fires this quite often in production and it can be safely ignored. If it is not ignored, the application crashes due to an unhandled signal being delivered.

The Java library appears to always launch the daemon with this option set.

In my case I was actually launching the daemon manually and encountered this problem. Once I added the command line switch it went away. I opened an issue about this but have not heard anything back.

Conclusion

My conclusion is the KPL is an interesting attempt by AWS to give developers a way to optimize what they pay for with Kinesis. It's probably the right way, but it has some drawbacks. My chosen solution in the long term is to keep the aggregation strategy used by the KPL but implement it in a language of my choice. This strategy eliminates the need for launching an external daemon and allows us to make stronger guarantees about the ordering of things sent to Kinesis in each putRecords request. This also keeps compatibility with the Kinesis Client Library, which we are already using.


Copyright Eric Urban 2019, or the respective entity where indicated