Build Messaging Between Ruby/Rails Applications with ActiveMQ


Message brokers are rarely covered in the Ruby/Rails world (probably because most of them are written in Java). Most developers are familiar with background job processing, but message brokers offer a more flexible approach to asynchronous execution. For example, one application can create a message, hand it to the broker for another application to process, and continue executing without waiting for a response.

Some benefits that you get at the architectural level: fault tolerance, guaranteed delivery, asynchronous communication (through the Publish/Subscribe pattern), loose coupling, etc.

One of the message brokers that I have used is ActiveMQ. It provides most of these features, so I will use it as the example for building communication between applications.

ActiveMQ

Open source multi-protocol Messaging Broker.

Advantages:

Installing

ActiveMQ requires Java to run and to build; the 5.15.x releases used in this post need Java 8 or newer.

Brew (on MacOS)

The easiest way to install:

brew install activemq
activemq start

Unix Binary Installation

Download the latest version here and follow the documentation.

Docker

Unfortunately, ActiveMQ doesn't have an official Docker image. One that I have checked and can recommend is https://hub.docker.com/r/rmohr/activemq

docker pull rmohr/activemq
docker run -p 61616:61616 -p 61613:61613 -p 8161:8161 rmohr/activemq

CLI commands

Here are the three most useful commands to begin with:

activemq start — Creates and starts a broker using a configuration file.

activemq stop — Stops a running broker.

activemq restart — Restarts a running broker.

To see all the commands, just run activemq in a terminal.

Start ActiveMQ

activemq
INFO: Loading '/usr/local/Cellar/activemq/5.15.9/libexec//bin/env'
INFO: Using java '/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/bin/java'
INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details
INFO: pidfile created : '/usr/local/Cellar/activemq/5.15.9/libexec//data/activemq.pid' (pid '61388')

Monitoring ActiveMQ

You can monitor ActiveMQ using the Web Console by pointing your browser at http://localhost:8161/admin. Default credentials: login: admin, pass: admin

Messaging Patterns

Using a message broker is simple and rests on only two concepts: Topics and Queues. Modern tools also combine these approaches and add features such as implementations of the Publish/Subscribe pattern, failover, etc. But first, let's get to know the main concepts.

Queue
Queues are the base messaging pattern. They provide direct communication between a publisher and a subscriber. The publisher creates messages, while the consumer reads them one after another. After a message has been read, it is gone from the queue. If the queue has multiple subscribers, only one of them will get each message.

Topic
A topic implements one-to-many communication. Unlike a queue, every subscriber receives each message sent by the publisher. The main drawback is that a message cannot be recovered for a single listener that missed it (for example, a service that was disconnected from the topic when the message was published).
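The difference between the two patterns can be sketched in plain Ruby, without a broker. The classes below (SketchQueue, SketchTopic) are hypothetical and exist only for illustration; the round-robin choice of consumer is a simplifying assumption, not a description of how ActiveMQ dispatches.

```ruby
# In-memory illustration only: these classes are hypothetical and are not
# part of ActiveMQ or of any gem.
class SketchQueue
  def initialize
    @consumers = []
    @pending = []
    @cursor = 0
  end

  def subscribe(&handler)
    @consumers << handler
    # Messages that arrived before any consumer existed are delivered now.
    publish(@pending.shift) until @pending.empty?
  end

  # Each message goes to exactly one consumer (round-robin in this sketch).
  def publish(message)
    return @pending << message if @consumers.empty?

    @consumers[@cursor % @consumers.size].call(message)
    @cursor += 1
  end
end

class SketchTopic
  def initialize
    @subscribers = []
  end

  def subscribe(&handler)
    @subscribers << handler
  end

  # Every subscriber connected right now gets its own copy; nothing is stored.
  def publish(message)
    @subscribers.each { |handler| handler.call(message) }
  end
end

queue_log = Hash.new { |hash, key| hash[key] = [] }
queue = SketchQueue.new
queue.subscribe { |m| queue_log[:a] << m }
queue.subscribe { |m| queue_log[:b] << m }
%w[m1 m2 m3].each { |m| queue.publish(m) }

topic_log = Hash.new { |hash, key| hash[key] = [] }
topic = SketchTopic.new
topic.subscribe { |m| topic_log[:a] << m }
topic.subscribe { |m| topic_log[:b] << m }
%w[m1 m2 m3].each { |m| topic.publish(m) }

# A subscriber that joins late has missed everything published so far.
late = []
topic.subscribe { |m| late << m }
```

In the queue case the three messages are split between the two consumers, while in the topic case both subscribers see all three and the late subscriber sees none.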

Virtual Topics
Virtual topics combine both approaches. While the publisher sends messages to a topic, subscribers will receive a copy of the message on their own related queue.
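As a small sketch of how this looks from a client, the helpers below build STOMP destination names. They assume ActiveMQ's default virtual topic naming convention (topics named VirtualTopic.<name>, consumer queues named Consumer.<consumer>.VirtualTopic.<name>); the helper names and the 'Orders' and 'Billing' values are hypothetical.

```ruby
# Assumption: the broker uses ActiveMQ's default virtual topic naming.
# The helper names below are hypothetical.
VIRTUAL_TOPIC_PREFIX = 'VirtualTopic.'

# Destination the publisher sends to.
def virtual_topic_destination(name)
  "/topic/#{VIRTUAL_TOPIC_PREFIX}#{name}"
end

# Queue a given consumer reads its own copies from.
def virtual_topic_consumer_queue(consumer, name)
  "/queue/Consumer.#{consumer}.#{VIRTUAL_TOPIC_PREFIX}#{name}"
end

publisher_destination = virtual_topic_destination('Orders')
billing_queue = virtual_topic_consumer_queue('Billing', 'Orders')
shipping_queue = virtual_topic_consumer_queue('Shipping', 'Orders')
```

Because each consumer name maps to its own queue, a service that was offline can pick up its backlog when it reconnects, which a plain topic does not allow.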

Protocols

ActiveMQ supports many communication protocols, such as MQTT, OpenWire, REST, RSS and Atom, STOMP, WSIF, WebSocket, and XMPP.

Getting Started

The easiest way to start exploring the features is with a familiar protocol: HTTP.

REST

ActiveMQ implements a RESTful API to messaging which allows any web capable device to publish messages using a regular HTTP POST or GET.

Publish to Queue

curl -u admin:admin -d "body=order_id" "http://localhost:8161/api/message/shop?type=queue"

Publish to Topic

curl -u admin:admin -d "body=order_id" "http://localhost:8161/api/message/shop?type=topic"
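The same requests can be made from Ruby with the standard library. This is a minimal sketch that mirrors the curl calls above; the helper names build_publish_request and send_publish_request are hypothetical, and the host, port, and credentials are the defaults used in this post.

```ruby
require 'net/http'
require 'uri'

# Hypothetical helper: builds the POST request without sending it.
# type is 'queue' or 'topic', as in the curl examples.
def build_publish_request(destination, type:, body:, user: 'admin', password: 'admin')
  uri = URI("http://localhost:8161/api/message/#{destination}")
  uri.query = URI.encode_www_form(type: type)

  request = Net::HTTP::Post.new(uri)
  request.basic_auth(user, password)
  request.set_form_data('body' => body)
  request
end

# Hypothetical helper: sends the request to the broker's REST endpoint.
def send_publish_request(request)
  Net::HTTP.start(request.uri.hostname, request.uri.port) do |http|
    http.request(request)
  end
end

request = build_publish_request('shop', type: 'queue', body: 'order_id')
# With a broker running locally, the message would be sent with:
# send_publish_request(request)
```

Keeping request construction separate from sending makes the request easy to inspect without a running broker.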

Integration with Ruby

The protocol that I will use is STOMP (the Simple Text Oriented Messaging Protocol). STOMP provides an interoperable wire format so that STOMP clients can communicate with any STOMP message broker, which gives easy and widespread messaging interoperability among many languages, platforms, and brokers.

There is a great gem for sending and receiving messages through any STOMP-compliant message broker. It includes failover logic and SSL support.

gem 'stomp'

bundle install

Initialize Connection

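require 'stomp'
require 'json' # used by the examples below for to_json / JSON.parse

# Connection settings for a local broker with the default credentials
def config_hash
  {
    hosts: [
      { login: 'admin', passcode: 'admin', host: '0.0.0.0', port: 61613, ssl: false }
    ]
  }
end

client = Stomp::Client.new(config_hash)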
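Since the gem includes failover logic, the same hash can list several brokers. The sketch below is an assumption-heavy illustration: the helper name failover_config and the host names are hypothetical, and the option names (reliable, max_reconnect_attempts, initial_reconnect_delay, randomize) are the ones I recall from the stomp gem's README, so check them against the version you install.

```ruby
# Hypothetical helper: builds a connection hash with more than one broker.
# Option names are taken from the stomp gem's README as I recall it;
# verify them against your installed version.
def failover_config(hostnames, login:, passcode:, port: 61613)
  {
    hosts: hostnames.map do |hostname|
      { login: login, passcode: passcode, host: hostname, port: port, ssl: false }
    end,
    reliable: true,               # reconnect instead of raising on a lost connection
    max_reconnect_attempts: 5,    # give up after this many attempts
    initial_reconnect_delay: 0.01,
    randomize: false              # try hosts in the order listed
  }
end

# 'mq1.example.com' and 'mq2.example.com' are placeholder host names.
config = failover_config(%w[mq1.example.com mq2.example.com],
                         login: 'admin', passcode: 'admin')
# With brokers reachable at those hosts:
# client = Stomp::Client.new(config)
```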

Queues

The interface to queues with STOMP is pretty simple. Just initialize the connection with the configuration hash and close it after publishing the message.

Send a Message to Queue

client = Stomp::Client.new(config_hash)
data = { order_id: 1, command: :paid }
client.publish('/queue/user-notifications', data.to_json)
client.close

Receive a Message from Queue

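client = Stomp::Client.new(config_hash)

# Subscribe in a separate thread so the main thread is not blocked
Thread.new do
  client.subscribe('/queue/user-notifications') do |msg|
    begin
      payload = JSON.parse(msg.body)
      # message processing...
    rescue StandardError => e
      # Report the failure instead of letting it go unnoticed
      Raven.capture_exception(e)
    end
  end
end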

Note: Handle exceptions inside the subscriber so that you learn about failures in time. For example, here I use Raven, the Sentry client for Ruby.
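One way to keep that error handling testable is to move it out of the subscribe block. The sketch below is only an illustration: process_message and its on_error parameter are hypothetical names, and the reporter is a plain lambda standing in for a call such as Raven.capture_exception.

```ruby
require 'json'

# Hypothetical helper: parses the message body, yields the payload to the
# caller, and routes any failure to the on_error callback.
# Returns true when processing succeeded and false otherwise.
def process_message(body, on_error:)
  payload = JSON.parse(body)
  yield payload
  true
rescue StandardError => e
  on_error.call(e)
  false
end

errors = []
report = ->(error) { errors << error.class.name } # stand-in for an error tracker

paid_orders = []
ok = process_message('{"order_id": 1, "command": "paid"}', on_error: report) do |payload|
  paid_orders << payload['order_id']
end

failed = process_message('not json', on_error: report) { |payload| paid_orders << payload }
```

Inside a subscriber, the block body would then shrink to a single call that passes msg.body and a callback that forwards the exception to your error tracker.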

Topics

Topics have a similar interface to Queues. Some examples below.

Send a Message to Topic

client = Stomp::Client.new(config_hash)
data = { order_id: 1, command: :paid }
client.publish('/topic/user-notifications', data.to_json)
client.close

Receive a Message from Topic

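client = Stomp::Client.new(config_hash)

# Subscribe in a separate thread so the main thread is not blocked
Thread.new do
  client.subscribe('/topic/user-notifications') do |msg|
    begin
      payload = JSON.parse(msg.body)
      # message processing...
    rescue StandardError => e
      # Report the failure instead of letting it go unnoticed
      Raven.capture_exception(e)
    end
  end
end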

Integration with Rails

ActiveMessaging is an attempt to bring the simplicity and elegance of Rails development to the world of messaging.

Add the gem to your Gemfile (this branch targets Rails 5+):

gem 'activemessaging', github: 'kookster/activemessaging', branch: 'feat/rails5'

And then execute:

bundle install

Initializing

After adding ActiveMessaging, run the following command to generate a base class for defining listeners and the polling server:

rails g active_messaging:install
  create  app/processors/application_processor.rb
  create  script/poller
  chmod  script/poller
  create  script/threaded_poller
  chmod  script/threaded_poller
  create  lib/poller.rb
  create  config/broker.yml
  gemfile  daemons

Generate a listener

rails g active_messaging:processor RailsQueue
  create  app/processors/rails_queue_processor.rb
  create  config/messaging.rb
  invoke  rspec
  create    spec/functional/rails_queue_processor_spec.rb

Processor
Here you specify which destination to listen to with subscribes_to. When a message is published to RailsQueue, on_message is called with the message body as its first argument.

class RailsQueueProcessor < ApplicationProcessor
  subscribes_to :rails_queue

  def on_message(message)
    logger.debug 'RailsQueueProcessor received: ' + message
  end
end

Destination config
In the initializer (the generated config/messaging.rb), we describe the queue's destination.

ActiveMessaging::Gateway.define do |s|
  s.destination :rails_queue, '/queue/RailsQueue'
end

Run Application

Now you can publish to RailsQueue and the Rails instance will receive the messages.

Production

First of all, you should think about deployment and maintenance. For small teams (without a DevOps, SRE, or system administrator role), I suggest looking into cloud solutions.

Amazon MQ

https://aws.amazon.com/amazon-mq

Some features:

  • Uses Apache KahaDB as its data store. Other data stores, such as JDBC and LevelDB, aren't supported
  • Offers low latency messaging, often as low as single digit milliseconds
  • Persistence out of the box
  • Backups

There is no need for an extra API: just use STOMP (or any other supported protocol) against the endpoint that Amazon provides.
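As a rough sketch, the endpoint shown in the Amazon MQ console can be turned into the same connection hash used earlier. Everything here is an assumption to adapt: the helper name stomp_config_from_endpoint is hypothetical, the host name and credentials are placeholders, and the stomp+ssl scheme with port 61614 is what I expect Amazon MQ to show for STOMP.

```ruby
require 'uri'

# Hypothetical helper: converts a broker endpoint URL into a stomp gem
# connection hash. SSL is enabled when the scheme ends with "+ssl".
def stomp_config_from_endpoint(endpoint, login:, passcode:)
  uri = URI.parse(endpoint)
  {
    hosts: [
      {
        login: login,
        passcode: passcode,
        host: uri.host,
        port: uri.port,
        ssl: uri.scheme.end_with?('+ssl')
      }
    ]
  }
end

# 'broker.example.com', 'app' and 'secret' are placeholders, not real values.
config = stomp_config_from_endpoint('stomp+ssl://broker.example.com:61614',
                                    login: 'app', passcode: 'secret')
# With a reachable broker:
# client = Stomp::Client.new(config)
```

The rest of the publishing and subscribing code stays the same as with a local broker.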

FIFO (First In, First Out)

To preserve the order in which messages are delivered, you need to enable total ordering in the broker configuration.

<destinationPolicy>
  <policyMap>
    <policyEntries>
      <policyEntry topic=">">
        <!-- The constantPendingMessageLimitStrategy is used to prevent
             slow topic consumers to block producers and affect other consumers
             by limiting the number of messages that are retained
             For more information, see: -->
        <dispatchPolicy>
          <strictOrderDispatchPolicy/>
        </dispatchPolicy>
      </policyEntry>
    </policyEntries>
  </policyMap>
</destinationPolicy>

Reference

This post is related to my last talk at Ruby Wine #1 about Event-Driven Architecture and Messaging Patterns for Ruby Microservices.

Slides available on Speaker Deck.

For simple projects, the use of queues may not be justified, since adding an architectural layer responsible for queuing messages is not an easy task. Before adopting one, you must weigh the pros and cons (for example, whether you are ready to spend time on support). Queuing allows you to scale the application more flexibly and to solve most performance problems that stem from architecture rather than from the language.