Understanding message queues with mbq.pubsub

This is a crosspost from my employer Managed by Q's blog Do Not Erase

If you develop software, there's a good chance you're familiar with test-driven development (whether you practice it or not): write your tests first, confirm that they fail, write your initial code, and iterate until your tests no longer fail.

While TDD is common, test coverage only guarantees so much. When a system is actually running in production--especially over a distributed network--there are countless ways it can fail unexpectedly.

Much like writing tests, we can try to pre-empt and understand those failures, as well as other types of behavior we can't predict, with monitoring and other runtime tools.

However, you probably wouldn't write a unit test that, for example, asserts that a function gives you any output at all: you test that specific inputs produce specific outputs. Similarly, there is no one-size-fits-all approach to understanding running systems: you write tools tailored to the specific problem you intend your system to solve. Again, in the spirit of TDD, this is often easier to do you if design these tools alongside or prior to your code.

At Managed by Q, we built a Python library, mbq.pubsub, to solve particular problems for our engineers, and the way we understand the health and working state of mbq.pubsub is closely tied to its purpose and how that purpose supplements our other systems.

Problems

At Managed by Q, our backends are comprised of several discrete services that communicate with each other asynchronously using a publisher/subscriber pattern via AWS SNS (Simple Notification Service) and SQS (Simple Queue Service). In this pattern, publisher services publish messages to an SNS topic. All SQS queues which are subscribed to an SNS topic receive the topic's messages, and the subscriber service associated with each SQS queue can then read those messages.

While SNS and SQS meet our needs--and are quite reliable--their APIs (which we request via the Python boto3 package) leave a number of implementation details up to us. We can easily receive messages from an SQS queue, but how do we design a consumer process to receive and process those messages? After a certain number of failed delivery attempts, SQS moves messages to a dead letter queue, but what do we do with the messages then?

To answer these and other questions, we organically developed light publisher/subscriber modules in each of our services, but without a coordinated plan. We ended up with duplicated code with service-specific adjustments. If you were spinning up a new service, the repeated advice was to copy queue-related code out of the repository with the most recent changes--not so ideal.

This accidental design was, in some ways, helpful. We could safely and quickly make changes to each service's publisher or consumer implementation without site-wide ramifications. If you were reading a service's code for the first time, understanding how queues integrated into the service was straightforward.

Critically, though, we couldn't easily understand how our publishers/consumers were operating across all of our systems. We couldn't consistently detect or address failure: some services exposed metrics that we could monitor, and others didn't. Rolling out changes to address these deficiencies was tedious and time-consuming.

mbq.pubsub

We designed mbq.pubsub as a general but opinionated set of SNS/SQS tools Managed by Q engineers can install with minimal effort in services they maintain, and encouraged engineers to replace all old, fragmented queue code with it.

This makes mbq.pubsub more of a black box to most of our engineers than previous service-specific publisher/subscriber implementations--using it in a Django-based service doesn't require much beyond declaring a function handler per message type. Even if they're not spending much time in mbq.pubsub's internals, our engineers still need to know that their service's consumers and publishers are healthy, and get insights into queue operations when necessary. We designed mbq.pubsub with these needs in mind from the start.

Dead Letter Queues

A common issue our engineers encountered involved SQS messages that failed to deliver to their service, either because of an error in the handler code processing the message, or something more systemic. SQS does some of the work for us by pushing undeliverable messages onto an associated dead letter queue (DLQ). But it was still frustratingly difficult to understand if a problem causing messages to end up in that DLQ was transient, allowing us to safely replay the messages, or a signal of something broader or more urgent.

We addressed this with a custom DLQ implementation that layers on top of, but still relies on, AWS's DLQs:

Diagram representing mbq.pubsub's DLQ infrastructure

In services that use it, mbq.pubsub provisions an "undeliverable message" table in the service's database. It then runs a continuous consumer process that receives messages from the service's dead letter queue and inserts the complete message payload, along with relevant metadata, into the "undeliverable message" table.

We now have complete control over DLQ messages within the service. We can replicate and delete messages, replay them back to the primary SQS queue, and (most importantly) understand why they ended up in the DLQ in the first place.

To consolidate what mbq.pubsub knows about a given DLQ message into one place for our engineers, we built a browser-based interface using Django's admin API (a low-overhead way to build internal web tools, which we use frequently at Managed by Q):

mbq.pubsub's DLQ web interface

This interface gives engineers quick access to basic information about the message, like its full payload, what topic published it, and the queue that attempted to deliver it. For deeper visibility into the message, and to give engineers more context as they work to fix the problem, mbq.pubsub also attempts to generate URL queries that take engineers directly to the point of failure in our system logs, streamed in Papertrail, and the server-side exception, tracked in Rollbar (as SQS doesn't capture any exception or general failure information when moving messages to its DLQ).

Equipped with these insights, our engineers can decide what to do next.

Metrics

Our previous publisher/subscriber implementations tended to either collect little to no monitorable metrics about their operation, or so many that it became difficult to sift for actionable or interesting data. In mbq.pubsub, we tightened our metrics collection around two poles that give us visibility into mbq.pubsub's specific use in our systems: simple data about the message types we're publishing, and more in-depth data about the chronological lifecycle of messages as they travel through our systems.

For every message we publish with mbq.pubsub, we collect metrics that identify the type of the message: the service that created it, the data type of the payload, and the SNS topic to which we're publishing it. On the consumer side, we collect similar data, as well as whether the message was processed successfully. This gives us the flexibility to alert engineers on elevated message delivery failures or other unusual trends via our monitoring service, Datadog, as well as visualize the general status of our publishers/subscribers:

Dashboard of mbq.pubsub metrics in Datadog

From this dashboard, we can easily see unusual behavior across time in our systems. We also get insights that are not directly related to system failures, like whether we're sending a high volume of messages to our subscribers that they aren't configured to process and accordingly skip--suggesting that maybe we should be more thoughtful about the SNS topics to which we subscribe each SQS queue. If certain types of messages are being published or processed in frequency clusters that we don't expect from our implementation, we'll see that here, too.

We supplement these general data metrics with timing information collected from the start of the message's life through when a subscriber finishes processing it:

Diagram representing when in a message's lifecyle timing metrics are collected

Detecting latency in our service's use of queues becomes much easier and more granular when we track these metrics. We can determine whether a potential bottleneck is in a place where we'd typically expect it--like a message processing handler function that's taking too long--from the "processed.process_time_ms" metric, or in a more unusual place, like the gap between the time we create and prepare the message with our custom envelope type and the time SNS publishes the message to SQS, which we can derive from the difference between "read.from_envelope_creation_ms" and "read.from_message_creation_ms."

mbq.pubsub also attaches each message's identifying metadata to all metrics we collect, so we can combine different metric types for even more specific visibility into a given message type or service (though we're certainly still figuring out the most useful ways to do that).

What's next

I could apply my last thought--that we're still figuring out useful ways to use a particular tool--to the entirety of mbq.pubsub. While we built mbq.pubsub for Managed by Q's specific use cases for message queues, those use cases, and the ways we get visibility into their health and operations, are frequently changing.

Now that we have an easy-to-use tool for managing queues, we hope to move more of our service's synchronous and less fault-tolerant HTTP communications with each other to asynchronous messages, which might require more specific data on overall message throughput.

We recently added support for Protocol Buffers as a message data format in mbq.pubsub, and as we adopt it more widely, we suspect our engineers will need more insight into schema, type validation, and serialization issues.

Many of our services use Celery to schedule asynchronous tasks, so we're considering adding first-class support for Celery tasks as subscriber message handlers (currently, scheduling a Celery task from a message requires an intermediate function; we're repeating this pattern across our services, and we're not sure we like it).

We're open to suggestions!