One simple trick to make Event Sourcing click

By PaweĊ‚ Pacana

Event Sourcing is like having two methods when previously there was one. There — I’ve said it.

But it isn’t my idea at all.

It was Greg that used it first, in a bit different context. When explaining CQRS he used this exact words:

Starting with CQRS, CQRS is simply the creation of two objects where there was previously only one. The separation occurs based upon whether the methods are a command or a query (the same definition that is used by Meyer in Command and Query Separation, a command is any method that mutates state and a query is any method that returns a value).

You can have quite a similar statement on event-sourced aggregate root. The separation occurs based upon whether the method:

  • corresponds to an action we want to take on an aggregate — protects business rules and tells what domain event happened if those were met
  • maps consequences of the domain event that happened to internal state representation (against which business rules are executed)

Not convinced yet? Let the examples speak.

Stereotypical aggregate without Event Sourcing

Below is a typical aggregate root. In the scope of the example there are only two actions you can take — via public register and supply methods.

class Product CannotSupply = Class.new(StandardError) AlreadyRegistered = Class.new(StandardError) def initialize(store_id: nil, sku: nil, quantity_available: 0) @store_id = store_id @sku = sku @quantity_available = quantity_available end def register(store_id:, sku:, event_store:) raise AlreadyRegistered if @store_id @store_id = store_id @sku = sku event_store.publish_event(ProductRegistered.new(data: { store_id: @store_id, sku: @sku, })) end def supply(quantity, event_store:) raise CannotSupply unless @store_id && @sku @quantity_available += quantity event_store.publish_event(ProductSupplied.new(data: { store_id: @store_id, sku: @sku, quantity: quantity, })) end
end

Aggregate with Event Sourcing

In event sourcing it is the domain events that are our source of truth. They state what happened. What we need to do is to make them a bit more useful and convenient for decision making. This is the sourcing part.

class Product CannotSupply = Class.new(StandardError) AlreadyRegistered = Class.new(StandardError) def initialize(store_id: nil, sku: nil, quantity_available: 0) @store_id = store_id @sku = sku @quantity_available = quantity_available end def register(store_id:, sku:, event_store:) raise AlreadyRegistered if @store_id event = ProductRegistered.new(data: { store_id: store_id, sku: sku, }) event_store.publish_event(event) registered(event) end def supply(quantity, event_store:) raise CannotSupply unless @store_id && @sku event = ProductSupplied.new(data: { store_id: @store_id, sku: @sku, quantity: quantity, }) event_store.publish_event(event) supplied(event) end private def supplied(event) @quantity_available += event.data.fetch(:quantity) end def registered(event) @sku = event.data.fetch(:sku) @store_id = event.data.fetch(:store_id) end
end

In this step we’ve drawn the line between making a statement that something happened (being possible to happen first) and what side effects does it have. Notice private registered and supplied methods.

Why make such effort and introduce indirection? The reason is simple — if the events are source of truth, we could not only shape internal state for current actions we take but also for the ones that happened in the past.

Instead of loading current state stored in a database, we can take collection of events that happened in scope of this aggregate — in its stream.

class Product CannotSupply = Class.new(StandardError) AlreadyRegistered = Class.new(StandardError) def initialize(store_id: nil, sku: nil, event_store:) stream_name = "Product$#{store_id}-#{sku}" events = event_store.read_all_events_forward(stream_name) events.each do |event| case event when ProductRegistered then registered(event) when ProductSupplied then supplied(event) end end end def register(store_id:, sku:, event_store:) raise AlreadyRegistered if @store_id event = ProductRegistered.new(data: { store_id: store_id, sku: sku, }) event_store.publish_event(event) registered(event) end def supply(quantity, event_store:) raise CannotSupply unless @store_id && @sku event = ProductSupplied.new(data: { store_id: @store_id, sku: @sku, quantity: quantity, }) event_store.publish_event(event) supplied(event) end private def supplied(event) @quantity_available += event.data.fetch(:quantity) end def registered(event) @sku = event.data.fetch(:sku) @store_id = event.data.fetch(:store_id) end
end

At this point you may have figured out that event_store dependency that we constantly pass as an argument belongs more to the infrastructure layer than to a domain and business.

What if something above passed a list of events first so we could rebuild the state? After an aggregate action happened we could provide a list of domain events to be published (unpublished_events):

class Product CannotSupply = Class.new(StandardError) AlreadyRegistered = Class.new(StandardError) attr_reader :unpublished_events def initialize(events) @unpublished_events = [] events.each { |event| dispatch(event) } end def register(store_id:, sku:) raise AlreadyRegistered if @store_id apply(ProductRegistered.new(data: { store_id: store_id, sku: sku, })) end def supply(quantity) raise CannotSupply unless @store_id && @sku apply(ProductSupplied.new(data: { store_id: @store_id, sku: @sku, quantity: quantity, })) end private def apply(event) dispatch(event) @unpublished_events << event end def dispatch(event) case event when ProductRegistered then registered(event) when ProductSupplied then supplied(event) end end def supplied(event) @quantity_available += event.data.fetch(:quantity) end def registered(event) @sku = event.data.fetch(:sku) @store_id = event.data.fetch(:store_id) end
end

More or less this reminds the aggregate_root gem that is aimed to assist you with event sourced aggregates.

The rule of having two methods when there was previously one however still holds.

  • The public method (such as supply) corresponds to an action we want to take on an aggregate — protects business rules and tells what domain event happened if those rules were met.
  • The private method (such as supplied) maps consequences of the domain event that happened to the internal state representation.

There are more code samples and The Why of Event Sourcing in Domain-Driven Rails which I fully recommend. With HIGH5EVENTSOURCING you can get it now with 25% off.

Have a great day!