SQS Benchmark (with large messages)

SQS Benchmark (with large messages)

Amazon Simple Queue Service (Amazon SQS) is a scalable and fully managed message queuing service that allows users to transmit any amount of data through the web without administrative responsibility.

Recently, I tried to evaluate whenever the SQS service will fit my needs for a design I’m working on. My interest was getting information regarding it’s throughput and latencies with large messages and different concurrencies.

After trying to find the information on the internet, I found some nice benchmarks for SQS (for example an article from SoftwareMills regarding “Amazon’s SQS performance and latency“) but all of them used 100 bytes messages.

Note: If you don’t want to read the whole technical part, the benchmark results can be found here.

I gave up pretty fast and went to implementing my own benchmark tool.
AWS has a nice C++ SDK that is pretty simple to use.

The benchmark tool sends messages to an SQS queue and has 3 arguments:

  • Queue Depth – The concurrency we want to use when sending messages.
  • Message size in KB
  • Number of messages

The AWS credentials can be supplied in many ways that are described here.

In the beginning, we’ll create a random buffer with the given size and will use it for all the messages (I think it is safe to assume that there is no client side de-duplication implemented in the SDK). In this case performance is not an issue so I just picked an example from the internet and used it (can be found here).

In order to make things easier, I’ve created class called “MessageSender” that is responsible for all the activity related to SQS (sending messages, aggregating results, measure latencies and etc.).

Measuring latency is simple and can be done by using std::chrono::high_resolution_clock and the following pattern:

auto start = std::chrono::high_resolution_clock::now();
startAsyncOperation([start]()
	{
		auto finish = std::chrono::high_resolution_clock::now();
		auto latency = std::chrono::duration_cast<chrono::milliseconds>(finish - start).count();
	});

All the communication with SQS is done using the SQS client provided by the SDK. There are some constructors that can be used and the default constructor usually gets the job done. In our example, we’ll pass some client options that we want to specify.

void MessageSender::createSQSClient()
{
	Aws::Client::ClientConfiguration conf;

	conf.region = Aws::Region::EU_WEST_1;
	conf.scheme = Aws::Http::Scheme::HTTPS;
	conf.verifySSL = false;

	m_client = std::unique_ptr<Aws::SQS::SQSClient>(new Aws::SQS::SQSClient(conf));
}

Sending message will be done asynchronously so we won’t need to build a threading model and we can leave this job to the SDK. SQS messages cannot contain binary data so I’ve used the message attributes to put the actual data I want to send (the generated random buffer we’ve created earlier):

void MessageSender::sendMessage()
{
	Aws::SQS::Model::SendMessageRequest request;
	request.SetQueueUrl(m_url);
	request.SetMessageBody(".");

	Aws::SQS::Model::MessageAttributeValue data;
	data.SetDataType("Binary");
	data.SetBinaryValue(m_message);
	request.AddMessageAttributes("data", data);

	auto start = std::chrono::high_resolution_clock::now();
	m_client->SendMessageAsync(request, [this, start](const Aws::SQS::SQSClient*,
	                                                  const Aws::SQS::Model::SendMessageRequest&,
	                                                  const Aws::SQS::Model::SendMessageOutcome& response,
	                                                  const std::shared_ptr<const Aws::Client::AsyncCallerContext>&)
		{
			onMessageSent(start, response);
		});
}

When MessageSender finishes it’s work, it will call a callback and pass the results in a struct that contains the following members:

struct SendResults
{
	uint32_t concurrency;
	uint32_t messageSizeKB;

	uint64_t failures;
	uint64_t messages;
	double totalSizeMB;
	double throughput;
	Counter latency;

	uint64_t testDurationMs;
};

Please note that the type of the latency field is Counter, it is another class that I’ve wrote in order to maintain extra information regarding the latency (min, max, mean, stddev).
Each access to this struct should be safe because I have a mutex in MessageSender that protects us. For thread safety, an internal mutex can be introduced.

In order to implemented the requested queue depth (concurrency), we’ll maintain two class members: m_inflight and m_left. m_left will hold the number of messages that are not sent yet (or are still inflight) and m_inflight will hold the number of messages that are currently inflight (should be equal to the concurrency most of the time).
m_left is initialized to hold the number of requested messages and each time we get a response we’ll decrement it. Once m_left reaches zero, we don’t have any more messages to send so we start decrementing the m_inflight member until we reach the response for the last message (m_left = 0 && m_inflight = 1). Now we know that everything is done and we can calculate the results and finish the benchmark run.

The whole sequence implementation in MessageSender looks like this:

void MessageSender::start()
{
	m_startTime = std::chrono::high_resolution_clock::now();

	uint32_t jobs = m_concurrency;
	for(uint32_t i = 0; i < jobs; ++i) { 		sendMessage(); 	} } void MessageSender::sendMessage() { 	Aws::SQS::Model::SendMessageRequest request; 	request.SetQueueUrl(m_url); 	request.SetMessageBody("."); 	Aws::SQS::Model::MessageAttributeValue data; 	data.SetDataType("Binary"); 	data.SetBinaryValue(m_message); 	request.AddMessageAttributes("data", data); 	auto start = std::chrono::high_resolution_clock::now(); 	m_client->SendMessageAsync(request, [this, start](const Aws::SQS::SQSClient*,
	                                                  const Aws::SQS::Model::SendMessageRequest&,
	                                                  const Aws::SQS::Model::SendMessageOutcome& response,
	                                                  const std::shared_ptr<const Aws::Client::AsyncCallerContext>&)
		{
			onMessageSent(start, response);
		});
}

void MessageSender::onMessageSent(const std::chrono::time_point<std::chrono::high_resolution_clock>& start, const Aws::SQS::Model::SendMessageOutcome& response)
{
	auto finish = std::chrono::high_resolution_clock::now();

	unique_lock<mutex> guard(m_lock);
	bool result = response.IsSuccess();
	if (__glibc_likely(result)) {
		++(m_results.messages);

		auto latency = std::chrono::duration_cast<chrono::milliseconds>(finish - start).count();
		m_results.latency.submit(latency);
	}
	else {
		std::cerr << "Error sending message to " << m_url << ": " << response.GetError().GetMessage() << std::endl;
		++(m_results.failures);
	}

	bool done = (m_left == 0) && (m_inflight == 1);
	if (done) {
		m_finishTime = finish;
		guard.unlock();

		complete();
		return;
	}

	// Nothing to do but there are still inflight requests
	if (m_left == 0) {
		--m_inflight;
		return;
	}

	--m_left;
	guard.unlock();
	sendMessage();
}

void MessageSender::complete()
{
	unique_ptr<MessageSender> kill(this);
	m_results.totalSizeMB = m_results.messages * m_message.GetLength() / (1024.0 * 1024);

	m_results.testDurationMs = std::chrono::duration_cast<chrono::milliseconds>(m_finishTime - m_startTime).count();
	m_results.throughput = m_results.totalSizeMB / (m_results.testDurationMs / 1000.0);

	m_callback(m_results);
}

And the usage is simple as well:

void runBenchmark(const std::string& queueUrl, uint32_t concurrency, uint32_t sizeKB, uint64_t messages)
{
	Synchronizer sync;
	MessageSender *sender = new MessageSender(queueUrl, concurrency, sizeKB, messages, [&sync](const SendResults& results)
	{
		showResults(results);
		sync.notify();
	});

	sender->start();
	sync.wait();
}

Synchronizer is another utility class that makes the thread sleep until it is notified. This way we don’t finish the runBenchmark function until the lambda function is called.

Running the benchmark tool requires some arguments:

[22:00 alexander ~/Projects/sqs-benchmark-tool ]$ ./benchmark
Usage: ./benchmark <queueUrl> <QD> <sizeKB> <messages>

For example, running the tool with 20,000 messages of 64KB and QD=64:

[22:00 alexander ~/Projects/sqs-benchmark-tool ]$ ./benchmark <put-url-here> 64 64 20000

Generating 64KB buffer
Initializing MessageSender: QD=64, messageSize=64 KB, numOfMessages=20000
--------------------------------------------------------
Benchmark results for 64 KB messages with QD=64
Duration: 32.354 sec
Transferred: 1250 MB
Messages: 20000
Failures: 0
Latency [ms]: min=33, max=856, mean=102.54, stdev=40.0585
Throughput: 312.5 MBit/sec

The full source code can be found here. Feel free to modify and use it.
Building the project is done using CMake:

cmake -Daws-sdk-cpp_DIR=/path/to/aws-sdk-cpp
make -j8

In order to get some numbers, I’ve created an SQS queue and a c4.2xlarge instance and ran the benchmark tool in various message sizes and concurrences.
The results I got can be found here.

Alexander.

Advertisements

One thought on “SQS Benchmark (with large messages)

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s