Authors: Bandish Chheda and Yiming Li
Today, we're excited to discuss Qmessage, our distributed, asynchronous task queue. We use Qmessage to handle tasks where a response is not immediately needed and would require too much time to execute during a web or API request, such as delivering answers, sending notifications, and generating digest emails. Our main queue currently processes 15,000 tasks per second for more than 500 types of tasks. The average latency is less than 2 minutes, and the queue reliability (the percentage of tasks that have been processed without loss) is over 99.98%. In this post, we'll discuss the architecture of the system that powers this execution engine.
Initial in-memory queue
The initial version of this system, built soon after Quora was founded, was an in-memory queue written in C++ with a Thrift interface. We deployed the system to several EC2 memory-optimized instances. In Python, developers could defer a function's execution into the task queue (from a web request, for example) by simply applying a decorator:
@execute_after_request(queue_name='work') def send_notification(notification): # generate the notification # send it out and do some logging
This decorator was responsible for serializing the function call (i.e., the function name and arguments) and adding this data to the corresponding queue. A simplified implementation might look like this:
def execute_after_request(queue_name='work'): def f(inner_function): # string_of_function returns the fully-qualified name of a Python function function_string = string_of_function(inner_function) def new_function(*args, **kwargs): message_object = WorkerTaskMessage( function_name=function_string, args_pickled=pickle.dumps(args), kwargs_pickled=pickle.dumps(kwargs), enqueue_time=time.time(), queue_name=queue_name ) cpp_queue_client.enqueue( queue_name, message=thrift_wrap.serialize_object(message_object) ) return new_function return f
Meanwhile, the task worker dequeue function polled messages from the queue service, deserialized the data, and executed the deferred function. One significant issue with this architecture was that the queue service was a single point of failure, which led to recovery difficulties—if an incident occurred, manual engineering effort was required to replay the lost tasks. Our workaround was to double-write each function call message to our internal logging system, so we could recover from failures as needed.
As Quora scaled, we eventually needed to rearchitect the system significantly in order to make it more reliable without sacrificing performance.
Redesigning the pipeline with Kafka
There are several popular queueing systems available today. For example, Redis can be used as a simple queue. However, because Redis is mostly in-memory, we would suffer from recovery issues similar to our original queue service. Amazon SQS is more stable and has very little operational burden, but is significantly more expensive. Apache Kafka scales very well, provides good performance, and doesn’t cost much, so we choose it as the foundation of our new task queue architecture. Scalability can be achieved by horizontally adding more machines, and replicas would improve reliability by avoiding a single point of failure. Recoverability would become a matter of resetting the offset to replay any lost tasks. In order to bring Kafka into Quora's architecture, we needed to implement several components: the Kafka relay, a Kafka consumer service, and Qmessage.
- Kafka Relay: Each machine that enqueues tasks runs a relay service which acts as a proxy, batching request data and delivering it to Kafka. Since our web code is written mostly in Python, the relay helps build a bridge between the Python client and Java Kafka service. On the client side, we only need to change
cpp_queue_client.enqueuefrom above to
- Kafka Consumer Service (KCS): The consumer service operates as a proxy between the Kafka service and the workers. It dequeues the batch of records from Kafka and stores them in the memory, then commits back the record to Kafka after a worker completes the task. So, KCS is stateless.
- Qmessage: Finally, Qmessage is used to dequeue a record from KCS, deserialize the record, and execute the task. We can configure Qmessage for two scenarios:
- At most once: Once the record is dequeued from KCS, we then commit it immediately and start to execute the record.
- At least once: We will commit it only if it successfully executes the record.
The below diagram illustrates the lifecycle of a Qmessage task:
Migration to Qmessage
As we began to test the deployment of the new system, we encountered three types of issues: (1) the dequeue rate was much lower than the enqueue rate, so we couldn’t drain the task queue, (2) some tasks were triggering out-of-memory errors or running for excessive periods of time, and (3) process failures were difficult to debug because of the huge number of tasks per second.
Our task machines are in an AWS autoscaling group. Based on traffic patterns and a set of defined rules, we can scale up or down the number of EC2 instances executing tasks in order to maintain the desired dequeue rate while minimizing cost. Initially, we combined three scaling rules: CPU usage, Kafka queue length, and Kafka growth rate (
enqueue rate - dequeue rate). After a few weeks, we discovered occurrences of unnecessary scaling, which we were able to trace back to noise in the Kafka growth rate. Another issue we uncovered was that CPU usage did not accurately reflect the status of task workers. For example, we couldn't scale down the number of machines if CPU usage was slow due to a traffic jam in the backend storage (e.g., tasks are stuck on the network calls). So, we instead decided on two rules: Kafka queue length and busy time, defined as the percentage of time the Qmessage process is executing the job, relative to the time spent idle. To support autoscaling, we have a cron job which aggregates the necessary metrics as each process reports data to the database.
In order to avoid out-of-memory issues, we applied Linux cgroup which limited each Qmessage process to 1.5GB by default and is adjustable via a YAML configuration file. We also set a hard limit of 5 minutes for time-consuming tasks, and if a task fails to run three times, we throw an exception in the KCS and trigger an alert.
Any process failure is sent to the exception log in our ELK framework so we can track and debug issues. We also log the process dequeue time, commit rate, commit time, and job status in our in-house monitoring framework, which uses Graphite to store metrics data. At the task level, we log memory usage, count, consuming time, and latency (defined as the time difference between enqueue and dequeue) to Graphite for debugging purposes. A cron job runs every 15 minutes to proactively identify bad tasks by checking average time usage and total count. That information is used to help us quickly optimize our tasks.
We sample from the pipeline to proactively understand and improve our loss rate of tasks. When a task is enqueued in our pipeline, the record object may be marked for inclusion in the sample. The data is then inserted at the end of the of a sorted list in Redis with
_ENQ_TYPE based on one global key. Once the sampled record is finished executing, we insert the data at the end of the sorted list in Redis with
_DEQ_TYPE. We have a minutely cron job that checks Redis and verifies all enqueued data are appropriately dequeued. If any enqueued data is not dequeued within our SLA of 6 hours, we assume the data is lost.
The introduction of Kafka and Qmessage was a huge improvement over the initial in-memory queue. We’ve found ways to prevent memory exhaustion and improved our ability to monitor and debug the system. There remain a few lingering concerns that we're actively working on. For instance, currently, the task queue has two priorities: high and low. The volume of high priority tasks is assumed to be much lower than the low priority tasks. If the number of high priority tasks increases enough that the high priority queue is always full, starvation will happen, which means that low priority tasks will be not be scheduled. By rolling out Qmessage, we've significantly increased the scalability of our offline task infrastructure, and we'll continue to scale systems like Qmessage as we grow.