Event-Driven Microservices with RabbitMQ and Ruby

In my last post we discussed how event sourcing with Apache Kafka can be used to create robust reactive microservices. While event sourcing is a useful architectural pattern to understand, it is not commonly used when starting a new project or service, which generally starts life as a simple CRUD application. Event sourcing works best when chosen as the top-level architecture for a new service from the start. As such, it is not always a great fit when migrating an existing monolithic application to an event-driven microservices architecture; event sourcing is not easy to “bolt on” to an existing application. In cases like this, there are still many great options and patterns available for moving towards a reactive event-driven microservices architecture.

Today I am going to describe a pattern that is very similar to what we use here at Kontena for our Kontena Cloud product. Kontena Cloud is composed of microservices and APIs coded in Ruby (well, most of them), and a front-end application written in TypeScript and Node.js. We use a mix of PostgreSQL, MongoDB and Redis for data storage, with RabbitMQ to tie it all together.

While reading through this blog post, please also check out the accompanying GitHub repository.

## The Architecture


RabbitMQ is a very popular and stable message broker used by countless companies to implement various types of asynchronous workflows. We chose RabbitMQ here at Kontena because we needed a stable, manageable and highly available solution for messaging. We also had more institutional knowledge of managing RabbitMQ at scale than of alternatives such as Kafka and ZooKeeper.

The two most common use cases often seen for RabbitMQ are the producer/consumer (work queues) pattern and the publish/subscribe pattern. We make heavy use of both here at Kontena. For the purpose of this article, I am going to focus mainly on the publish/subscribe pattern, specifically using topic exchanges. If you are not already familiar with RabbitMQ and its terminology, I recommend you read through their fantastic tutorials to get a feel for how RabbitMQ works.

When we discuss event-driven systems, what we are referring to is a system of separate applications and services that communicate with each other asynchronously and are as loosely coupled as possible. When designing such a system, there are two factors that seem to be at odds with each other: our event publishers should not care who consumes messages, while our event consumers should never lose messages, even when all instances of the consuming service are down. RabbitMQ solves this problem using exchanges, topics and queues. Our publisher application writes an event to an exchange, with a topic routing key describing the type of event. Our consumers, on the other hand, create a durable queue that is bound to the exchange with a topic routing pattern (for example, if our publisher uses the topics product.created and product.updated, our subscriber can bind its queue to product.# to receive ALL product events).
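To get a feel for how a binding pattern such as product.# selects events, here is a minimal pure-Ruby sketch of the topic matching rules (`*` matches exactly one word, `#` matches zero or more words). This is only an illustration that runs without a broker, not RabbitMQ's own implementation; the `topic_match?` helper and the queue names are made up for this example.

```ruby
# Illustration of RabbitMQ topic-exchange matching rules in plain Ruby.
# "*" matches exactly one dot-separated word, "#" matches zero or more words.
def topic_match?(pattern, routing_key)
  match_words(pattern.split('.'), routing_key.split('.'))
end

def match_words(pattern, key)
  # An exhausted pattern only matches an exhausted routing key.
  return key.empty? if pattern.empty?

  head, *rest = pattern
  case head
  when '#'
    # "#" either absorbs nothing, or absorbs one word and stays in play.
    match_words(rest, key) || (!key.empty? && match_words(pattern, key.drop(1)))
  when '*'
    !key.empty? && match_words(rest, key.drop(1))
  else
    key.first == head && match_words(rest, key.drop(1))
  end
end

# Hypothetical queues and the patterns they are bound with.
bindings = {
  'all-product-events' => 'product.#',
  'created-only'       => '*.created'
}

%w[product.created product.updated order.created].each do |routing_key|
  queues = bindings.select { |_queue, pattern| topic_match?(pattern, routing_key) }.keys
  puts "#{routing_key} -> #{queues.inspect}"
end
```

The publisher never sees the `bindings` table: it only picks a routing key, and each consumer decides independently which patterns it cares about.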

RabbitMQ Topic Exchange Example

Unlike Kafka, one feature RabbitMQ does NOT offer is an event history and storage mechanism. In RabbitMQ, events published to an exchange are ephemeral: unless a queue is bound to the exchange with a matching routing pattern, they are lost forever. And even if a message ends up in a queue, it is gone as soon as a consumer finishes handling it. This poses a problem for an event-driven microservices architecture, as these events are essentially the source of truth for business transactions that occur between multiple services. Having such a history allows us to leverage a lot of powerful features when adding new services, such as replaying all previous events and inspecting past events to help troubleshoot issues, not to mention potential end-user-facing features like audit history. Therefore, we need to build another layer onto our system.

#### State Machines

In event sourcing, we said that the source of truth for our service was the event stream, usually stored in some log-structured data storage or append-only database table. On the other hand, most applications store their data in an RDBMS or NoSQL data store, and the data is mutable. By mutable, we mean that every change updates an existing row or document. We could conceivably listen for changes to this entity in our code (maybe using Active Record model callbacks, for example) and publish that data over RabbitMQ, but unless we also store these events we lose all the real benefits that an event-driven architecture should offer (history, replayability and being responsive to future changes).

A nice way to solve this is to embed a state machine engine into each of our primary entities, and build the event storage as a generic extension of the state machine engine. Unlike event sourcing, our history table is not the source of truth for our application, but instead an artifact of any changes in the system. To outside systems consuming our events, however, these events will in fact be the source of truth for our service’s external interface to the rest of the system.

In the example application we are using a lightweight Ruby state machine library, aasm. Besides being lightweight, aasm has an easy-to-manage DSL for describing an entity’s states and events, with built-in support for storing and retrieving the current state via most popular Ruby ORMs (Active Record, Sequel, Mongoid, etc.).

## The Code

#### The Products Service

Let’s imagine a Products service, which stores a catalog of various products for some eCommerce site. This will be the simplest products catalog ever, having just a single Product model type with name, price, inventory and state fields.

When adding a state machine to our model, we are immediately confronted with the question: what are the possible states our model can be in, and what are the events that get us there? One thing that becomes clear after working with state machines is that, even in architectures that don’t use explicit state machines, the states are still there implicitly. Your Product can be in stock, out of stock or discontinued even if there is no explicit state column in your data model saying so; you just derive that information from the other attributes of your model (maybe an inventory integer field, as well as an is_discontinued boolean field). The explicit state machine just makes this easier to understand at a glance, and it enforces business rules when transitioning between states. Being able to define business rules in an easily readable declarative manner is also a huge help for new developers trying to understand existing code they did not write (not to mention your own code 6 months later!).
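To make the idea of an implicit state concrete, here is a small sketch of a model with no state column at all, where the state has to be derived from the other attributes. The `ImplicitProduct` struct and its `derived_state` method are hypothetical and exist only for this illustration.

```ruby
# Hypothetical model without a state column: the state is implied by other fields.
ImplicitProduct = Struct.new(:inventory, :is_discontinued, keyword_init: true) do
  # Every caller that needs the state must know (and agree on) these rules.
  def derived_state
    return :discontinued if is_discontinued

    inventory.positive? ? :in_stock : :out_of_stock
  end
end

puts ImplicitProduct.new(inventory: 3, is_discontinued: false).derived_state
puts ImplicitProduct.new(inventory: 0, is_discontinued: false).derived_state
puts ImplicitProduct.new(inventory: 3, is_discontinued: true).derived_state
```

The rules are all there, but they are hidden inside a conditional, and nothing stops code elsewhere from moving a discontinued product back into stock. That is the gap an explicit state machine closes.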

Below is an example of what our Product model might look like in Ruby, stored in MongoDB:

```ruby
class Product
  include Mongoid::Document
  include Mongoid::Timestamps
  include AASM

  field :name, type: String
  field :state, type: String
  field :price, type: Integer
  field :inventory, type: Integer, default: 0

  index({ name: 1 }, { unique: true })

  # StateTracking is our aasm extension; it provides enable_tracking!
  aasm(column: :state, with_klass: StateTracking) do
    enable_tracking!

    state :initialized, initial: true
    state :created
    state :in_stock
    state :out_of_stock
    state :discountinued

    event :create do
      transitions :from => :initialized, :to => :created
    end

    event :add_inventory do
      transitions :from => [:created, :in_stock, :out_of_stock], :to => :in_stock
    end

    event :reduce_inventory do
      # Stay in stock while inventory remains, otherwise fall through.
      transitions :from => :in_stock, :to => :in_stock, :guard => :has_inventory?
      transitions :from => :in_stock, :to => :out_of_stock
    end

    event :discontinue do
      transitions :from => [:created, :in_stock, :out_of_stock], :to => :discountinued
    end
  end

  # Guard used by the reduce_inventory event.
  def has_inventory?
    inventory > 0
  end
end
```

As you can see, the various states our model can be in, as well as the events that transition between these states, are now explicitly defined. We even have some additional business logic defined in the form of guard statements.
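The reduce_inventory event above lists two transitions from the same state, which relies on the transitions being tried in order, with the first permitted one winning. The following dependency-free sketch imitates that idea so you can run it without aasm or MongoDB. It is not aasm's implementation; `TinyMachine` and `Transition` are hypothetical names, and the product is just a Hash.

```ruby
# Dependency-free sketch of "first permitted transition wins". NOT aasm itself.
Transition = Struct.new(:from, :to, :guard)

class TinyMachine
  InvalidTransition = Class.new(StandardError)

  attr_reader :state

  def initialize(initial_state, events)
    @state = initial_state
    @events = events # { event_name => [Transition, ...] }
  end

  # Fire an event; guards receive the context object (our "model").
  def fire!(event, context)
    transition = @events.fetch(event).find do |t|
      Array(t.from).include?(@state) && (t.guard.nil? || t.guard.call(context))
    end
    raise InvalidTransition, "#{event} not allowed from #{@state}" unless transition

    @state = transition.to
  end
end

events = {
  add_inventory: [Transition.new(%i[created in_stock out_of_stock], :in_stock, nil)],
  reduce_inventory: [
    Transition.new(:in_stock, :in_stock, ->(product) { product[:inventory] > 0 }),
    Transition.new(:in_stock, :out_of_stock, nil)
  ]
}

product = { inventory: 1 }
machine = TinyMachine.new(:created, events)

machine.fire!(:add_inventory, product)
product[:inventory] -= 1
# Inventory is now zero, so the guarded transition is skipped
# and the unguarded fallback applies.
machine.fire!(:reduce_inventory, product)
puts machine.state
```

Note that the order of the two reduce_inventory transitions matters: if the unguarded one came first, the product would go out of stock on every reduction.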

#### Modifying A Product

Imagine we have an existing Product and we want to update its inventory, potentially changing its state. One nice way to encapsulate this business logic is via the Ruby Mutations library (we love our Mutations here at Kontena):

```ruby
module Products
  class Update < Mutations::Command
    required do
      model :product
      integer :adjust_inventory
    end

    # Business rules that must hold before we touch the product.
    def validate
      if adjust_inventory == 0
        return add_error(:adjust_inventory, :invalid, 'Inventory adjustment factor cannot be zero')
      end

      if adjust_inventory < 0 && (product.inventory + adjust_inventory) < 0
        return add_error(:adjust_inventory, :invalid, 'Cannot set inventory below zero')
      end

      if product.discountinued?
        return add_error(:product, :discountinued, 'Product is discontinued')
      end
    end

    def execute
      product.inventory += adjust_inventory
      product.save

      # Fire the matching state machine event for the adjustment.
      if adjust_inventory > 0
        product.add_inventory!
      else
        product.reduce_inventory!
      end

      product
    end
  end
end
```

#### Storing Events

In our example code above we have two potential events happening on our Product: add_inventory! or reduce_inventory!. In either case, we would like our framework to be able to store and publish out our events to the rest of our services. The first step to implement this is to create a new model that represents our event data:

```ruby
class StateTransitionEvent
  include Mongoid::Document
  include Mongoid::Timestamps::Created

  # The entity the event happened to, with a snapshot of its attributes.
  field :entity_type, type: String
  field :entity_id, type: BSON::ObjectId
  field :entity_attributes, type: Hash

  # The aasm event and the states it moved between.
  field :event, type: String
  field :from, type: String
  field :to, type: String
end
```

The first three fields represent the entity being stored (in our case, the Product), and the last three represent the aasm event, the previous state and the current state. The key to storing this event data in a generic way is to use the aasm extensibility hooks via the AASM::Base base class. The syntax for achieving this is somewhat complicated, but the concept is simple: when we save our model’s state to the database, we record the event along with our model’s attributes, its previous state and its current state, and store all of this in the database as a new immutable record. Our aasm extension injects a class-level method, enable_tracking!, which enables this hook for a model. Take a look at the accompanying GitHub repository for the gritty details.
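If you just want the concept without the aasm plumbing, the sketch below shows the same idea in plain Ruby: every transition becomes a frozen record that is appended to a log and handed to a publisher. The real hook lives in the aasm extension in the repository; `TransitionLog`, `TransitionRecord` and the in-memory array are assumptions made purely for illustration.

```ruby
# Dependency-free sketch of the tracking idea, using an in-memory log
# instead of MongoDB. All names here are hypothetical.
TransitionRecord = Struct.new(:entity_type, :entity_id, :entity_attributes,
                              :event, :from, :to, :created_at, keyword_init: true)

class TransitionLog
  attr_reader :records

  def initialize(publisher)
    @records = []
    @publisher = publisher # anything that responds to #call(record)
  end

  # Meant to be called right after a state change has been persisted.
  def track(entity_type:, entity_id:, attributes:, event:, from:, to:)
    record = TransitionRecord.new(
      entity_type: entity_type,
      entity_id: entity_id,
      entity_attributes: attributes.dup.freeze, # snapshot, not a live reference
      event: event.to_s,
      from: from.to_s,
      to: to.to_s,
      created_at: Time.now.utc
    ).freeze

    @records << record      # append-only history
    @publisher.call(record) # hand off to whatever publishes events
    record
  end
end

published = []
log = TransitionLog.new(->(record) { published << record })

log.track(entity_type: 'Product', entity_id: 'abc123',
          attributes: { name: 'candy', inventory: 2 },
          event: :add_inventory, from: :created, to: :in_stock)

puts log.records.size
puts published.first.to
```

Snapshotting the attributes at the time of the transition is what makes the history useful later: a consumer replaying events sees the entity as it was, not as it is now.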

#### Publishing Our Events

If you looked at our aasm extension, you will have seen that we are not only storing StateTransitionEvent records but also passing them to a class method, EventBus.publish_event. This is the code responsible for serializing our event data and publishing it to our RabbitMQ topic exchange.

You can view the full source for this class here, but for now we just want to look at the publish_event method:

```ruby
def publish_event(event, prefix: '')
  routing_key = build_routing_key(event, prefix)
  payload = build_event_payload(event)

  # Borrow a channel from the pool and publish to the durable topic exchange.
  channel_pool.with do |channel|
    channel
      .topic(eventbus_exchange, durable: true)
      .publish(payload, routing_key: routing_key)
  end
end
```

At a high level we can see we are building a routing key, serializing our event into a JSON payload, fetching a RabbitMQ `channel` from our connection pool and publishing our event on a topic exchange. The one thing that might appear strange here is the `prefix` parameter. This is an arbitrary string that, when present, is prepended to our RabbitMQ topic routing key. Topic routing keys work much like dot-separated namespaces in many programming languages, so adding this prefix is similar to adding a top-level namespace to our routing key. This is useful later when we want to replay messages.

#### Replaying Messages
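Since the prefix is what makes targeted replay possible, here is a minimal sketch of how a routing key and payload might be assembled. The real `build_routing_key` and `build_event_payload` live in the repository and may well differ; the versions below are stand-ins that assume the key has the shape `<entity type>.<event>` and that the event is a plain Hash rather than a StateTransitionEvent document.

```ruby
require 'json'

# Hypothetical stand-in: assumes routing keys look like "<entity type>.<event>".
def build_routing_key(event, prefix = '')
  parts = [event[:entity_type].downcase, event[:event]]
  # A non-empty prefix acts like a top-level namespace on the key.
  parts.unshift(prefix) unless prefix.to_s.empty?
  parts.join('.')
end

# Hypothetical stand-in: serialize the whole event as JSON.
def build_event_payload(event)
  JSON.generate(event)
end

event = {
  entity_type: 'Product',
  entity_id: 'abc123',
  event: 'add_inventory',
  from: 'created',
  to: 'in_stock'
}

puts build_routing_key(event)
puts build_routing_key(event, 'my-consumer')
puts build_event_payload(event)
```

With a key shaped like this, a queue bound to `product.#` would see only the unprefixed event, while the prefixed one would reach only queues bound under the same prefix.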
Why would we ever want to replay messages? One idea is, what if we want to add a new microservice to our system that intends to store its own copy of our event data? Or, as in [our last blog post](http://blog.kontena.io/event-sourcing-microservices-with-kafka/), what if we want to always read all events at startup and store the data in-memory? In both of these cases, being able to request a replay of all events is necessary.

Of course, replaying all events can also be dangerous. What if one of the other consumers is not properly coded to handle receiving very old messages? Not to mention that our services would need to handle lots of replayed messages they are not actually interested in.

One solution to this problem is for each consumer to bind two separate queues. The first queue is bound to the entity’s standard topic routing pattern, such as `product.#` for all product events. The second queue is bound to another routing pattern with a prefix unique to the consumer, something like `my-consumer.product.#`. That way, when our service asks the products service to replay all product events, we can pass in `my-consumer` as the topic prefix. Every event in history is replayed, but only consumers bound to the `my-consumer` topic prefix receive the messages. Our other existing services do not have this binding and thus do not need to deal with them.

## The Demo

#### Installing
All of this is much more fun when you can run some code and watch what it’s doing. For our demo, we will run our [example application](https://github.com/kontena/state-machine-events-example/blob/master/api/app/services/event_bus.rb) in Docker containers on top of the Kontena Platform. The easiest way to get started is to create a new account at [https://cloud.kontena.io](https://cloud.kontena.io/) and follow the steps to create a new test Kontena Platform. Don’t worry, you can run this without spending a dime using our free credits when signing up.

In Kontena Cloud, make sure you have a Platform created along with one or more compute nodes attached. Once this is done, we are going to set up a RabbitMQ cluster. This is actually much simpler than it sounds thanks to our ready-built [RabbitMQ stack](https://github.com/kontena/kontena-stacks/tree/master/rabbitmq) (note that this is basically the same stack running in production right now, connecting all of Kontena Cloud’s backend services).

From the integrated web terminal, run the following commands to create the necessary data volumes and install the stack:

volume create --scope instance --driver local harbur-rabbitmq-cluster-seed-data

volume create --scope instance --driver local harbur-rabbitmq-cluster-node-data

stack install kontena/harbur-rabbitmq-cluster

When prompted for the number of nodes, set it to the number of compute nodes you created for your Platform. Next, let’s install our demo application. This is also available as a public Kontena Stack:

stack install kontena/state-machine-events

Just as with RabbitMQ, make sure to enter the correct number of initial nodes you want to deploy to. Note that this can be fewer than the number of nodes attached to your Platform, but not more.

#### Testing
Let’s start with our basic little UI application. This is just some simple JavaScript hosted by a simple Node.js server, with [socket.io](https://socket.io/) and [express](https://expressjs.com/) for handling WebSockets and HTTP, respectively. To view the UI, first go to the “nodes” tab in the Kontena Cloud UI, then in the “Config” section scroll down to get the public IP of one of the available compute nodes. Enter this IP into the browser. You should be greeted with a very sparse UI (basically it just says “Products” at the top).

To get some data in, we need to make an HTTP request to our API, which is running on the same IP as our UI. I will assume you have access to the `curl` utility, but any HTTP tool (such as [Postman](https://www.getpostman.com/) or [Windows PowerShell](https://docs.microsoft.com/en-us/powershell/module/microsoft.powershell.utility/invoke-webrequest?view=powershell-5.1)) will work. Enter the following command, replacing $IP with the public IP from above:

curl http://$IP/api/products

You should get back an empty JSON array. Let’s create some data: 

curl -XPOST -H "Content-Type: application/json" -d '{"name":"candy","price":100}' http://$IP/api/products

After running this, not only should you see a response from `curl` with your new product, but you should immediately see the new product in the UI, thanks to RabbitMQ and WebSockets. Stuff like this is where a reactive event-driven backend really starts to shine! Next, let’s try updating and deleting our product, watching the UI between commands (replace $ID with the product’s ID from the UI):

curl -XPUT -H "Content-Type: application/json" -d '{ "adjust_inventory": 2 }' http://$IP/api/products/$ID

curl -XPUT -H "Content-Type: application/json" -d '{ "adjust_inventory": -2 }' http://$IP/api/products/$ID

curl -XDELETE http://$IP/api/products/$ID

Finally, try closing the browser tab and opening a new one with the UI URL. The UI’s backend code requests an event replay as discussed earlier, ensuring the new page has all the existing event data. Take a look at the [UI code](https://github.com/kontena/state-machine-events-example/tree/master/ui) to see how this works.

## Why use Kontena Cloud for your microservices?

Setting up and maintaining a complex container platform like Kubernetes (or any of its derivatives) is not where the race is won. Building stuff on top of the platform is the only thing that matters, and the best platform is the one you don’t even notice being there. Proprietary platforms like Amazon ECS might be easy but come with serious restrictions. If you are looking for a complete, developer-friendly solution that is open source, extremely easy to use and works for everybody on any infrastructure, Kontena Cloud is for you. So, that’s why!

I hope you enjoyed this article. Looking forward to hearing your feedback and comments!

Disclaimer: In case you didn’t figure it out already, I work for Kontena :) And [we are hiring](https://www.kontena.io/jobs)! [www.kontena.io](https://kontena.io).

Image Credits: Blue cube design modern cube shape by [PIRO4D](https://pixabay.com/en/users/PIRO4D-2707530/).