Sourced
Event Sourcing / CQRS toolkit for Ruby. Eventual consistency and concurrency built-in.
sourced
WORK IN PROGRESS
Event Sourcing / CQRS library for Ruby. There are many ES gems available already. The objectives here are:
- Cohesive and toy-like DX.
- Eventual consistency by default. Actor-like execution model.
- Low-level APIs for durable messaging.
- Supports the Decide, Evolve, React pattern.
- Control concurrency by modeling.
- Simple to operate: it should be as simple to run as most Ruby queuing systems.
- Explore ES as a programming model for Ruby apps.
A small demo app here.
The programming model
If you're unfamiliar with Event Sourcing, you can read this first: Event Sourcing from the ground up, with Ruby examples. For a high-level overview of the mental model, read this. Or the video version, here.
The entire behaviour of an event-sourced app is described via commands, events and reactions.
<img width="1024" height="469" alt="sourced-arch-diagram" src="https://github.com/user-attachments/assets/ed916471-525f-4743-bc9a-10a2b6d9f8e9" />
- Commands are intents to effect some change in the state of the system. Ex. Add cart item, Place order, Update email, etc.
- Events are produced after handling a command and they describe facts or state changes in the system. Ex. Item added to cart, order placed, email updated. Events are stored and you can use them to build views ("projections"), caches and reports to support UIs, or other artifacts.
- Reactions are blocks of code that run after an event has been processed and can dispatch new commands in a workflow or automation.
- State is whatever object you need to hold the current state of a part of the system. It's usually derived from past events, and it's just enough to interrogate the state of the system and make the next decision.
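The four concepts above can be sketched in plain Ruby, without the library. This is only an illustration of the Decide, Evolve, React shape: the `Command` and `Event` structs and the `decide`, `evolve` and `react` methods are hypothetical names made up for this sketch, not part of Sourced's API.

```ruby
# Plain-Ruby sketch of Decide, Evolve, React.
# All names here are hypothetical; none of them come from Sourced.
Command = Struct.new(:type, :payload)
Event   = Struct.new(:type, :payload)

# Decide: check business rules against the current state
# and return the new events (facts) that result from the command.
def decide(state, command)
  case command.type
  when :add_item
    raise ArgumentError, 'cart is not open' unless state[:status] == :open

    [Event.new(:item_added, command.payload)]
  else
    []
  end
end

# Evolve: apply a single event to the state. No validation, no side effects.
def evolve(state, event)
  case event.type
  when :item_added
    state.merge(items: state[:items] + [event.payload])
  else
    state
  end
end

# React: inspect an event and return any follow-up commands.
def react(event)
  case event.type
  when :item_added
    [Command.new(:send_admin_email, { product_id: event.payload[:product_id] })]
  else
    []
  end
end

state = { status: :open, items: [] }
cmd = Command.new(:add_item, { product_id: 'p123', price: 1000, quantity: 2 })

events = decide(state, cmd)
state = events.reduce(state) { |current, event| evolve(current, event) }
next_commands = events.flat_map { |event| react(event) }
```

After running this, `state` holds one item and `next_commands` holds a single `:send_admin_email` command, which a real system would route back into a `decide` step.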
Actors
Overview
Actors are classes that encapsulate the full life-cycle of a concept in your domain, backed by an event stream. This includes loading state from past events and handling commands for a part of your system. They can also define reactions to their own events, or events emitted by other actors. This is a simple shopping cart actor.
class Cart < Sourced::Actor
  # Define what cart state looks like.
  # This is the initial state which will be updated by applying events.
  # The state holds whatever data is relevant to decide how to handle a command.
  # It can be any object you need. A custom class instance, a Hash, an Array, etc.
  CartState = Struct.new(:id, :status, :items) do
    def total = items.sum { |it| it.price * it.quantity }
  end

  CartItem = Struct.new(:product_id, :price, :quantity)

  # This factory is called to initialise a blank cart.
  state do |id|
    CartState.new(id:, status: 'open', items: [])
  end

  # Define a command and its handling logic.
  # The command handler will be passed the current state of the cart,
  # and the command instance itself.
  # Its main job is to validate business rules and decide whether new events
  # can be emitted to update the state
  command :add_item, product_id: String, price: Integer, quantity: Integer do |cart, cmd|
    # Validate that this command can run
    raise "cart is not open!" unless cart.status == 'open'
    # Produce a new event with the same attributes as the command
    event :item_added, cmd.payload
  end

  # Define an event handler that will "evolve" the state of the cart by adding an item to it.
  # These handlers are also used to "hydrate" the initial state from Sourced's storage backend
  # when first handling a command
  event :item_added, product_id: String, price: Integer, quantity: Integer do |cart, event|
    cart.items << CartItem.new(**event.payload.to_h)
  end

  # Optionally, define how this actor reacts to the event above.
  # .reaction blocks can dispatch new commands that will be routed to their handlers.
  # This allows you to build workflows.
  reaction :item_added do |event|
    # Evaluate whether we should dispatch the next command.
    # Here we could fetch some external data or query that might be needed
    # to populate the new commands.
    # Here we dispatch a command to the same stream_id present in the event
    dispatch(:send_admin_email, product_id: event.payload.product_id)
  end

  # Handle the :send_admin_email dispatched by the reaction above
  command :send_admin_email, product_id: String do |cart, cmd|
    # maybe produce new events
  end
end
Using the Cart actor in an IRB console. This will use Sourced's in-memory backend by default.
cart = Cart.new(id: 'test-cart')
cart.state.total # => 0
# Instantiate a command and handle it
cmd = Cart::AddItem.build('test-cart', product_id: 'p123', price: 1000, quantity: 2)
events = cart.decide(cmd)
# => [Cart::ItemAdded.new(...)]
cmd.valid? # true
# Inspect state
cart.state.total # 2000
cart.state.items.size # 1
# Inspect that events were stored
cart.seq # 2 the sequence number or "version" in storage. Ie. how many commands / events exist for this cart
# Append new messages to the backend
Sourced.config.backend.append_to_stream('test-cart', events)
# Load events for cart
events = Sourced.history_for(cart)
# => an array with instances of [Cart::AddItem, Cart::ItemAdded]
events.map(&:type) # ['cart.add_item', 'cart.item_added']
Try loading a new cart instance from recorded events
cart2, events = Sourced.load(Cart, 'test-cart')
cart2.seq # 2
cart2.state.total # 2000
cart2.state.items.size # 1
Registering actors
Invoking commands directly on an actor instance works in an IRB console or a synchronous-only web handler, but for actors to be available to background workers, and to react to other actors' events, you need to register them.
Sourced.register(Cart)
This achieves two things:
- Messages can be routed to this actor by background processes, using Sourced.dispatch(message).
- The actor can react to other events in the system (more on event choreography later), via its low-level .handle(event) Reactor Interface.
These two properties are what enable asynchronous, eventually-consistent systems in Sourced.
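To make the idea of registration and asynchronous routing concrete, here is a toy in-memory router in plain Ruby. It is not how Sourced is implemented: `ToyRouter`, `CartHandler`, `handled_types` and `drain` are hypothetical names invented for this sketch. It only shows the general shape, where dispatching enqueues a message and a separate worker loop later delivers it to whichever registered handlers declared an interest in its type.

```ruby
# Toy router, for illustration only. Hypothetical names; not Sourced's internals.
class ToyRouter
  def initialize
    @handlers = Hash.new { |hash, type| hash[type] = [] }
    @queue = []
  end

  # Registering makes a handler reachable for the message types it declares.
  def register(handler)
    handler.handled_types.each { |type| @handlers[type] << handler }
  end

  # Dispatching only enqueues the message. Nothing is handled yet.
  def dispatch(message)
    @queue << message
  end

  # A background worker would run a loop like this one.
  # Handlers return follow-up messages, which are enqueued in turn.
  def drain
    until @queue.empty?
      message = @queue.shift
      @handlers[message[:type]].each do |handler|
        @queue.concat(handler.handle(message))
      end
    end
  end
end

class CartHandler
  attr_reader :seen

  def initialize
    @seen = []
  end

  def handled_types
    %i[add_item item_added]
  end

  # Record the message, and turn an :add_item command into an :item_added event.
  def handle(message)
    @seen << message[:type]
    message[:type] == :add_item ? [{ type: :item_added }] : []
  end
end

router = ToyRouter.new
cart = CartHandler.new
router.register(cart)

router.dispatch({ type: :add_item })
seen_before_drain = cart.seen.dup # still empty: the work has not happened yet

router.drain
seen_after_drain = cart.seen.dup
```

The gap between `dispatch` and `drain` is the eventual consistency: the caller gets control back before the message has been handled.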
Expanded message syntax
Commands and event structs can also be defined separately as Sourced::Command and Sourced::Event sub-classes.
These definitions include a message type (for storage) and payload attributes schema, if any.
module Carts
  # A command to add an item to the cart
  # Commands may come from HTML forms, so we use Types::Lax to coerce attributes
  AddItem = Sourced::Command.define('carts.add_item') do
    attribute :product_id, Types::Lax::Integer
    attribute :quantity, Types::Lax::Integer.default(1)
    attribute :price, Types::Lax::Integer.default(0)
  end

  # An event to track items added to the cart
  # Events are only produced by valid commands, so we don't
  # need validations or coercions
  ItemAdded = Sourced::Event.define('carts.item_added') do
    attribute :product_id, Integer
    attribute :quantity, Integer
    attribute :price, Integer
  end

  # Now define command and event handlers in an Actor
  class Cart < Sourced::Actor
    # Initial state, etc...

    command AddItem do |cart, cmd|
      # logic here
      event ItemAdded, cmd.payload
    end

    event ItemAdded do |cart, event|
      cart.items << CartItem.new(**event.payload.to_h)
    end
  end
end
.command block
The class-level .command block defines a command handler. Its job is to take a command (from a user, an automation, etc), validate it, and apply state changes by publishing new events.
command AddItem do |cart, cmd|
  # logic here...
  # apply and publish one or more new events
  # using instance-level #event(event_type, **payload)
  event ItemAdded, product_id: cmd.payload.product_id
end
.event block
The class-level .event block registers an event handler used to evolve the actor's internal state.
These blocks are used both to load the initial state when handling a command, and to apply new events to the state in command handlers.
<img width="573" height="146" alt="sourced-evolve-handler" src="https://github.com/user-attachments/assets/174fb8d0-e2ef-41f3-8f43-c94b766529ec" />
event ItemAdded do |cart, event|
  cart.items << CartItem.new(**event.payload.to_h)
end
These handlers are pure: given the same state and event, they should always update the state in the same exact way. They should never reach out to the outside (API calls, current time, etc), and they should never run validations. They work on events already committed to history, which by definition are assumed to be valid.
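The purity requirement is what makes replaying history safe. A minimal plain-Ruby sketch of the idea follows, using a hypothetical `Evt` struct and `evolve` lambda rather than Sourced's own message classes: because the function depends only on its arguments, folding the same events over the same initial state rebuilds the same result every time.

```ruby
# Hypothetical event shape for illustration; not Sourced's message classes.
Evt = Struct.new(:type, :payload)

# A pure evolve function: the new state is computed only from its arguments.
# It does not read the clock, call external services, or validate anything.
evolve = lambda do |state, event|
  case event.type
  when :item_added
    state.merge(items: state[:items] + [event.payload])
  when :cart_placed
    state.merge(status: :placed)
  else
    state
  end
end

history = [
  Evt.new(:item_added, { product_id: 'p1', price: 500, quantity: 1 }),
  Evt.new(:item_added, { product_id: 'p2', price: 250, quantity: 2 }),
  Evt.new(:cart_placed, {})
].freeze

initial = { status: :open, items: [] }.freeze

# Replaying the same history twice rebuilds the same state,
# and leaves the initial state untouched.
first_replay  = history.reduce(initial, &evolve)
second_replay = history.reduce(initial, &evolve)
```

If the lambda used something like `Time.now` instead of data carried by the event, the two replays could differ, which is why values such as timestamps belong in the event itself.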
.before_evolve block
The class-level .before_evolve block registers a callback that runs before each registered event handler during state evolution. This is useful for common logic that should run before all event handlers, such as updating timestamps or recording metadata.
class CartListings < Sourced::Projector::StateStored
  state do |id|
    { id:, items: [], updated_at: nil, seq: 0 }
  end

  # This block runs before any .event handler
  before_evolve do |state, event|
    state[:updated_at] = event.created_at
    state[:seq] = event.seq
  end

  event Cart::ItemAdded do |state, event|
    state[:items] << event.payload.to_h
  end

  event Cart::Placed do |state, event|
    state[:status] = :placed
  end
end
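The callback ordering can be sketched in plain Ruby with a toy evolver. `ToyEvolver` and its `on` method are hypothetical and written only for this illustration; Sourced's actual implementation may differ. In this sketch the callback runs immediately before each matching handler and is skipped for events that have no handler.

```ruby
# Toy evolver, for illustration only. Hypothetical names; not Sourced's internals.
class ToyEvolver
  def initialize
    @handlers = {}
    @before = nil
  end

  # Register a callback to run before every handled event.
  def before_evolve(&block)
    @before = block
  end

  # Register a handler for one event type.
  def on(type, &block)
    @handlers[type] = block
  end

  # Apply events in order, mutating and returning the state.
  def evolve(state, events)
    events.each do |event|
      handler = @handlers[event[:type]]
      next unless handler # events without a handler skip the callback too

      @before&.call(state, event)
      handler.call(state, event)
    end
    state
  end
end

evolver = ToyEvolver.new
evolver.before_evolve { |state, event| state[:seq] = event[:seq] }
evolver.on(:item_added) { |state, event| state[:items] << event[:product_id] }

state = evolver.evolve(
  { items: [], seq: 0 },
  [
    { type: :item_added, seq: 1, product_id: 'p1' },
    { type: :coupon_applied, seq: 2 } # no handler registered for this type
  ]
)
```

Here `state[:seq]` ends up as 1, not 2, because the second event has no handler and so never triggers the callback.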
The before_evolve callback only runs for events that have a registered handler via the .event macro. If an event is not handled by this class, the
