Scaling Klaviyo’s Event Processing Pipeline with Stream Processing

By Seed Zeng

Klaviyo has been growing at an incredible rate for the last few years — doubling our customer base and ingesting an order of magnitude more data every year. To handle the increasing event volume, we developed a system named Abacus to replace the initial version of Klaviyo’s real-time event aggregation system. Utilizing Apache Flink as its underlying framework and Kafka as its message broker, Abacus is a highly performant stateful stream processing application. In this article, I will cover the challenges of the initial version of Klaviyo’s event aggregation system, the rationale behind choosing Flink as the streaming framework, and how we built and shipped Abacus.

The initial version of Klaviyo’s event processing pipeline was a complex series of Celery tasks with RabbitMQ acting as a message bus. Historically, Klaviyo has heavily utilized the Celery task processing framework with RabbitMQ, which is well proven for Python data processing workloads. Our overall event processing system consisted of the following components:

Initial Version Event Processing Pipeline

Amongst these components, Klaviyo’s real-time event aggregation system had the most trouble handling the ever-increasing traffic. Although this system served Klaviyo well for many years, it had some limitations, which we’ll cover below. But first, to fully grasp the challenges the aggregation system posed, we need to understand what Klaviyo aggregates, or counts, to empower our customers.

This blog post detailed the motivation and application of counting at Klaviyo. Essentially, Klaviyo’s counting is an aggregation of events in real time. Here, I will illustrate this aggregation process using the example of ingesting an email event. Suppose we have an event payload coming in like this:

{
  "email": "john@example.com",
  "message_id": "MSG123",
  "timestamp": 1544153562,
  "ip": "127.0.0.1",
  "browser": "Safari 12.0.1",
  "type": "open"
}

After validation and hydration, the event payload looks something like this:

{
  "email": "john@example.com",
  "customer_id": "CUS123",
  "message_id": "MSG123",
  "campaign_name": "holiday sales",
  "timestamp": 1544153562,
  "ip": "127.0.0.1",
  "browser": "Safari 12.0.1",
  "statistic_id": "STA123",
  "company_id": "COM123"
}

Each key/value pair is a facet of what our customers want to track, so we count their occurrences at both an account (a Klaviyo customer) and an individual customer (our customers’ customers) level. For example, at the account level, we increment the count by 1 for company COM123’s statistic STA123 (email open) for December 7, 2018. At the customer level, we increment the count by 1 for customer CUS123’s statistic STA123 (email open) for December 7, 2018.

We record counts for the same action at different time resolutions; in addition to December 7, 2018, we record the same action for the hourly interval it falls into (10 PM to 11 PM for timestamp 1544153562). Similarly, we store a record for the monthly interval.

To make this even more challenging, we also record unique counts at the account level to answer questions like “How many unique individuals opened the campaign ‘Holiday Sales’?” To determine the uniqueness of a particular action, we have to track every action any customer performed in a certain timeframe. Essentially, we keep a set of actions each customer has performed that looks something like this:

Uniqueness Set

As you can see, a single event can fan out to lots of increments for our system to keep track of, which results in many writes to our counter databases.
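To make the fan-out concrete, here is a small hypothetical sketch (not Abacus code; the key format and the UTC bucketing are simplifications) of how the hydrated event above expands into the counters we increment:

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the fan-out: one event becomes many counter increments.
public class IncrementFanOut {

    static List<String> fanOut(String companyId, String customerId,
                               String statisticId, long epochSeconds) {
        ZonedDateTime ts = Instant.ofEpochSecond(epochSeconds).atZone(ZoneOffset.UTC);
        String month = String.format("%d-%02d", ts.getYear(), ts.getMonthValue());
        String day = String.format("%s-%02d", month, ts.getDayOfMonth());
        String hour = String.format("%sT%02d:00", day, ts.getHour());

        List<String> increments = new ArrayList<>();
        for (String bucket : new String[]{month, day, hour}) {
            // Account-level counter for the Klaviyo customer (company).
            increments.add("account|" + companyId + "|" + statisticId + "|" + bucket);
            // Customer-level counter for the individual profile.
            increments.add("customer|" + customerId + "|" + statisticId + "|" + bucket);
        }
        return increments;   // six +1 increments, before unique counts are even considered
    }

    public static void main(String[] args) {
        // The hydrated example event from above.
        fanOut("COM123", "CUS123", "STA123", 1544153562L).forEach(System.out::println);
    }
}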

The initial version of Klaviyo’s real-time event aggregation system was prone to data accuracy issues because the ingestion was not idempotent. The aggregation system relied upon two Klaviyo proprietary subsystems — Big Bertha and Check Uniques — to process events and write to Cassandra. When the aggregation system ingested an event, the Check Uniques system queried Redis clusters to identify the uniqueness of the associated actions and utilized Big Bertha (a microservice built upon Redis and SupervisorD) to pre-aggregate account-level counts to reduce IO on our Cassandra clusters.

Account-level Aggregation Subsystems

As a result, the system’s dependency on Big Bertha and Check Uniques made the ingestion of an event no longer idempotent, which meant any failure in these subsystems or their storage tiers could cause data accuracy issues. To audit, debug and heal these ongoing data integrity issues, we built a number of automated processes and even a standalone trace logging system called Athena Logging (discussed in my previous blog post).

Additionally, the system offered no isolation between account-level and customer-level aggregations, even though those workloads impacted different areas of Klaviyo: customer-level aggregates were used for customer-level analytics, flow triggering and segmentation, while account-level aggregates were used by campaign sending and analytics reporting. The lack of isolation meant that an operational incident in either system could negatively impact the other.

The aggregation system was also slow. While the average execution time of the entire event processing pipeline hovered around 450 milliseconds, the real-time event aggregation system alone took up to 380 ms. To make things worse, since the system aggregated increments of counts (deltas instead of final counts), we used Cassandra’s counter data type to perform the final aggregation. For the counter data type, Cassandra demanded a blocking read-before-write operation, which made writes to the counter data type less performant than writes to other Cassandra data types. You can find more details about the design and limitations of Cassandra’s counter data type in this blog post.

We had the option to build a streaming framework from scratch. However, leveraging an existing, well-tested and mature framework was more judicious. For our specific workload, the chosen framework needed these essential attributes:

  1. Stateful — The framework’s ability to efficiently manage state internally is essential for our success because external state management is the leading cause of non-idempotency for our workload.
  2. Highly available — If the aggregation system is unavailable, it impacts all of our products and customers. The streaming framework has to be resilient to failures.
  3. Easy to scale — Klaviyo’s ingestion workload varies wildly during the course of normal daily operations as well as during large events. For instance, during Black Friday, overall event ingestion more than triples and we see spikes of over 10x previous highs. It is crucial that the framework allows us to scale easily. For us, scalability has two primary aspects. One is throughput: the system’s throughput should be readily increased by simply adding more nodes. The other, equally important, aspect is data storage: it should be trivial to grow the amount of data the system can store.
  4. Real-time — Downstream tasks rely on events being ingested within one second to perform accurate business logic actions, such as segmentation and flow sending.

We looked at the popular stream processing technologies that were relatively mature. The following candidates made our list:

  1. Apache Spark
  2. Apache Storm
  3. Apache Flink
  4. In-memory DB, e.g., VoltDB

Streaming Frameworks & Attributes

In-memory DB options like VoltDB, though they had great throughput scalability, did not have great data storage scalability because data was retained in memory instead of on disk. Flink stood out to us because it alone possessed all the attributes we wanted in a framework. Even better, the Flink community was growing fast and the documentation was reasonably detailed. After some prototyping and benchmarking, we selected Flink as our streaming framework.

To truly overcome the challenges of the initial version of our real-time event aggregation system, we decided to transform it into a modern streaming application built on Apache Flink. However, addressing all the problems at once would have made the project too formidable. To scope the project better, we started by understanding which optimizations were essential for us to smoothly handle Black Friday 2018 traffic.

Combining all the previously outlined subsystems into one to make the ingestion of events idempotent was essential. Making the new aggregation system fast and isolating account-level from customer-level aggregation would also be crucial for us to triumph on Black Friday. Moving off Cassandra’s counter data type would require us to redesign the API layer and reaggregate all historical data. This effort, though ideal for our long-term scalability, was not essential to accomplish before Black Friday. Also, achieving counter-less storage would become a more tractable goal after consolidating all the subsystems. Therefore, we defined two milestones for the Abacus project.

The first milestone, which needed to be achieved before Black Friday, was to release the initial version of Abacus, which aggregates events and writes deltas to the database. This version of Abacus continues to use Cassandra’s counter data type to leverage the existing data storage and API, so no data migration or new API layer is needed. The goals of this milestone are isolating account-level and customer-level aggregation, consolidating all subsystems to make the ingestion of events idempotent, and improving the efficiency of the aggregation system.

The second milestone will be achieved after Black Friday. It encompasses the release of the second version of Abacus, in which we aggregate final counts in place and persist the results to Cassandra (or any other distributed database). The second milestone also includes redesigning the API layer that queries the data and migrating all historical data. After the second milestone, we can finally move off Cassandra’s counter data type.

In order to solve the isolation problem, we utilize Flink’s native Kafka connector in combination with Kafka’s consumer groups. Kafka’s consumer group concept combines the advantages of both the message queuing and publish-subscribe models. Kafka consumers belonging to the same consumer group share a group ID. These consumers divide the topic’s partitions as fairly amongst themselves as possible, and Kafka guarantees that each partition is consumed by only a single consumer from the group.

Kafka Consumer Group

Flink’s native Kafka connector creates a consumer group for each job. Therefore, by simply running two Flink jobs, we can separate customer-level and account-level aggregations at the source. We define these two Flink jobs, along with their respective sink databases, as Abacus Customer and Abacus Statistic. Logically, Abacus Customer and Abacus Statistic are nearly identical; the only difference is that Abacus Customer does not have the step that checks the uniqueness of customer actions, which results in a simpler workload. Therefore, I will focus on explaining the design and implementation of the more complicated pipeline, Abacus Statistic, whose architecture is drawn below.

Milestone One Implementation of Abacus Statistic
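As a rough sketch of that source-level separation (the broker address, topic name, group IDs and string schema below are illustrative, not our production configuration), each job attaches its own consumer group to the same events topic:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

// Illustrative only: two separate Flink jobs read the same topic,
// each with its own Kafka consumer group, so the pipelines are isolated at the source.
public class AbacusStatisticJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");   // placeholder broker address
        // The Abacus Customer job would set a different group.id (e.g. "abacus-customer"),
        // so each job receives its own full copy of the event stream.
        props.setProperty("group.id", "abacus-statistic");

        env.addSource(new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), props))
           .name("kafka-source")
           // ... deserialize, check uniqueness, window, and write increments to Cassandra ...
           .print();   // stand-in sink for this sketch

        env.execute("abacus-statistic");
    }
}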

Check Uniqueness is a Flink RichMapFunction we maintain to track the uniqueness of actions. We create a log record only the first time a customer performs a certain action. This record signals that the action has already been counted and that subsequent occurrences of the same action by the same customer are not unique. Events are then windowed using processing time instead of event time. When the window closes, the increments — instead of the final counts — are written to Cassandra.
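Our actual operator is more involved, but a minimal sketch of the idea, assuming a stream keyed by the (customer, action) pair and a hypothetical AggregateEvent type with a setUnique method, might look like this:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;

// Sketch of a uniqueness check: keyed state records whether this (customer, action) key
// has been seen before; only the first occurrence is marked unique.
public class CheckUniqueness extends RichMapFunction<AggregateEvent, AggregateEvent> {

    private transient ValueState<Boolean> seen;

    @Override
    public void open(Configuration parameters) {
        seen = getRuntimeContext().getState(
                new ValueStateDescriptor<>("seen", Boolean.class));
    }

    @Override
    public AggregateEvent map(AggregateEvent event) throws Exception {
        boolean isFirst = (seen.value() == null);
        if (isFirst) {
            seen.update(true);   // remember that this key has now been counted
        }
        event.setUnique(isFirst);
        return event;
    }
}

With a RocksDB state backend, this keeps the uniqueness set in Flink-managed local state rather than in an external Redis cluster, which is what makes the ingestion path idempotent.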

Flink supports both processing time and event time in streaming programs. When a streaming program runs on processing time, all time-based operations (like time windows) use the system clock of the machines that run the respective operator. When running on event time, a streaming application is clocked on the time each individual event occurred on its producing device, so the progress of time depends on the data. Event time programs must specify how to generate event time watermarks, the mechanism that signals progress in event time. You can read more details on how Flink watermarks work in this post. By default, late elements are dropped once the watermark is past the end of their window; however, Flink allows specifying a maximum allowed lateness for window operators, which controls by how much time elements can be late before they are dropped.
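For reference only (as explained next, Abacus does not use this mode), an event-time window with a watermark generator and allowed lateness might look like the following, with timestamps pulled from the same hypothetical AggregateEvent type:

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// Event-time illustration: watermarks trail the largest seen timestamp by 30 seconds,
// and elements up to 5 minutes late still update their windows.
public class EventTimeExample {

    static DataStream<AggregateEvent> windowByEventTime(StreamExecutionEnvironment env,
                                                        DataStream<AggregateEvent> events) {
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        return events
                .assignTimestampsAndWatermarks(
                        new BoundedOutOfOrdernessTimestampExtractor<AggregateEvent>(Time.seconds(30)) {
                            @Override
                            public long extractTimestamp(AggregateEvent event) {
                                return event.getTimestampMillis();   // event time from the payload
                            }
                        })
                .keyBy(AggregateEvent::getCounterKey)
                .window(TumblingEventTimeWindows.of(Time.hours(1)))
                .allowedLateness(Time.minutes(5))    // late elements tolerated for 5 minutes
                .reduce((a, b) -> a.merge(b));
    }
}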

For our workload, incoming events can be heavily delayed by upstream data sources or by historical back population during customer onboarding. Therefore, the application uses processing time for windowing and aggregating events before writing increments to Cassandra. Compared to relying on event time, using processing time also has the benefit of maintaining much smaller window state.
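A sketch of that processing-time path (the window size here is illustrative and the Cassandra sink is elided) could look like this:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// Sketch: pre-aggregate increments per counter key in short processing-time windows,
// then hand the summed deltas to a Cassandra sink (not shown).
public class ProcessingTimeAggregation {

    static DataStream<AggregateEvent> aggregateIncrements(DataStream<AggregateEvent> uniqueChecked) {
        return uniqueChecked
                .keyBy(AggregateEvent::getCounterKey)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .reduce((a, b) -> a.merge(b));   // sum the deltas for the same counter key
        // No watermarks or lateness handling are needed, because windows are driven
        // purely by the wall clock of the machines running the operator.
    }
}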

We implemented the design above and delivered it as part of milestone one. The current version of Abacus has consolidated the subsystems of the initial event aggregation system — Check Uniques and Big Bertha — and guarantees idempotency of event ingestion before writing to Cassandra. Also, replacing the old aggregation system with Abacus has reduced the ingestion time of the whole event processing pipeline from 450 milliseconds to 80 milliseconds. Additionally, account-level and customer-level aggregations have been separated, which isolates the impact of any potential failure in either system.

Relying on Abacus, Klaviyo was able to break all data processing records on Black Friday and ensure success for our customers. Our new aggregation system not only smoothly processed more than a billion events on Cyber Monday alone but also did so using only one-sixth of the original resources.

Immediately after last Black Friday, we started development of the milestone two Abacus, where we aggregate final counts instead of increments in the stream. The prudent design of the milestone one Abacus has enabled us to reuse all of the code from the first version; we only need to add one additional step — Value Reader — to our stream. The implementation looks like this:

Milestone Two Implementation of Abacus Statistic

Value Reader consists of two Flink operators — WindowManager and CassandraHydrator. WindowManager is a Flink RichMapFunction that maintains counts for different timeframes. CassandraHydrator is a Flink AsyncFunction that enables us to read specific counts from the account-level database. When an aggregate of a specific customer action flows through the pipeline, WindowManager queries local state in RocksDB to see whether it already has the count for that customer action. If it does, the application initializes the current aggregate in the stream with the count. If the count is not available locally, CassandraHydrator is invoked to initialize the aggregate by reading the count from the database asynchronously. This initialized aggregate is then reduced to final counts after being windowed using processing time. Leveraging this design, the account-level Cassandra cluster can finally be freed from Cassandra’s infamous counter data type, because we will always write final counts instead of deltas to Cassandra at the end of the stream.
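The operator names above are from our pipeline, but the sketch below is only an approximation of the idea, using the same hypothetical AggregateEvent type and a hypothetical asynchronous Cassandra client (AsyncCountClient) rather than our production code:

import java.util.Collections;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

// Sketch of the Value Reader step: seed each aggregate with the current count from
// local state when possible; otherwise fall back to an asynchronous Cassandra read.
public class ValueReader {

    // WindowManager: seeds the aggregate from keyed (RocksDB-backed) state when possible.
    public static class WindowManager extends RichMapFunction<AggregateEvent, AggregateEvent> {
        private transient ValueState<Long> currentCount;

        @Override
        public void open(Configuration parameters) {
            currentCount = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("current-count", Long.class));
        }

        @Override
        public AggregateEvent map(AggregateEvent event) throws Exception {
            Long count = currentCount.value();
            if (count != null) {
                event.initializeWith(count);   // local state already has the running total
            }                                   // otherwise leave it for the hydrator below
            return event;
        }
    }

    // CassandraHydrator: asynchronously reads the stored count for aggregates
    // that could not be initialized from local state.
    public static class CassandraHydrator extends RichAsyncFunction<AggregateEvent, AggregateEvent> {
        private transient AsyncCountClient client;   // hypothetical async Cassandra client

        @Override
        public void open(Configuration parameters) {
            client = AsyncCountClient.connect("abacus_statistic");   // placeholder keyspace
        }

        @Override
        public void asyncInvoke(AggregateEvent event, ResultFuture<AggregateEvent> resultFuture) {
            if (event.isInitialized()) {                    // nothing to do, pass through
                resultFuture.complete(Collections.singleton(event));
                return;
            }
            client.getCount(event.getCounterKey())          // CompletableFuture<Long>
                  .thenAccept(count -> {
                      event.initializeWith(count == null ? 0L : count);
                      resultFuture.complete(Collections.singleton(event));
                  });
        }
    }

    // Wiring: hydrate only when needed, with a timeout so slow reads cannot stall the stream.
    static DataStream<AggregateEvent> hydrate(DataStream<AggregateEvent> keyedAggregates) {
        DataStream<AggregateEvent> managed = keyedAggregates.map(new WindowManager());
        return AsyncDataStream.unorderedWait(
                managed, new CassandraHydrator(), 500, TimeUnit.MILLISECONDS, 100);
    }
}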

Currently, in parallel with developing the next version of Abacus, we are rewriting our API layer and planning the migration of historical data to the new schema. After milestone two, we will gain an appreciable improvement over the already performant real-time event aggregation system.

In this article, I have illustrated how we transformed the initial version of Klaviyo’s real-time event aggregation system using the power of stream processing. In upcoming blog posts, we will discuss the details of optimizing Abacus for our workload and of deploying Flink clusters. Stay tuned and keep going fast!

Special thanks to Ally Hangartner on Klaviyo’s design team for designing all the beautiful diagrams. Thanks to my colleagues for all the suggestions.