Sage
A dependency-free tool to run distributed transactions in Elixir, inspired by Sagas pattern.
Sage is a dependency-free implementation of the Sagas pattern in pure Elixir and provides a set of additional built-in features.
It is a go-to way of dealing with distributed transactions, especially ones that need error recovery or cleanup. Sage does its best to guarantee that either all of the transactions in a saga complete successfully, or compensating transactions are run to amend a partial execution.
"It’s like Ecto.Multi but across business logic and third-party APIs." -- @jayjun
This is done by defining a two-way flow with transaction and compensation functions. When one of the transactions fails, Sage will ensure that the compensations of that transaction and of all its predecessors are executed. However, it's important to note that Sage cannot protect you from a failure of the node that executes a given saga.
To visualize it, let's imagine we have a 4-step transaction. Successful execution flow would look like:
[T1] -> [T2] -> [T3] -> [T4]
and if we get a failure on the third step, Sage would clean up side effects by running compensation functions:
[T1] -> [T2] -> [T3 has an error]
                ↓
[C1] <- [C2] <- [C3]
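The flow in the diagram can be sketched in a few lines of plain Elixir. This is a toy illustration only, not how Sage is implemented: the `MiniSaga` module, its `{name, transaction, compensation}` step tuples and the choice to pass `nil` as the effect of the failed step are all assumptions made for the example.

```elixir
defmodule MiniSaga do
  @moduledoc "Toy saga runner for illustration; not Sage's implementation."

  # Runs steps in order. On the first {:error, reason} it compensates the
  # failed step and every step completed before it, newest first.
  def execute(steps, attrs), do: run(steps, attrs, %{}, [])

  # All transactions succeeded.
  defp run([], _attrs, effects, _done), do: {:ok, effects}

  defp run([{name, transaction, compensation} | rest], attrs, effects, done) do
    case transaction.(effects, attrs) do
      {:ok, effect} ->
        run(rest, attrs, Map.put(effects, name, effect), [{name, compensation} | done])

      {:error, reason} ->
        # The failed step is compensated too; its effect is unknown, so the
        # compensation receives nil for it (a simplification of this sketch).
        compensate([{name, compensation} | done], effects, attrs)
        {:error, reason}
    end
  end

  defp compensate(stack, effects, attrs) do
    Enum.each(stack, fn {name, compensation} ->
      compensation.(Map.get(effects, name), effects, attrs)
    end)
  end
end

# Usage: T3 fails, so C3, C2 and C1 run in that order.
parent = self()

step = fn name, result ->
  transaction = fn _effects, _attrs -> result end

  compensation = fn _effect, _effects, _attrs ->
    send(parent, {:compensated, name})
    :ok
  end

  {name, transaction, compensation}
end

steps = [step.(:t1, {:ok, 1}), step.(:t2, {:ok, 2}), step.(:t3, {:error, :boom})]
result = MiniSaga.execute(steps, %{})
```

Here `result` is `{:error, :boom}`, and the calling process receives `{:compensated, :t3}`, `{:compensated, :t2}` and `{:compensated, :t1}` in that order. Retries, async steps and error handling for the compensations themselves are deliberately left out.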
Additional Features
Along with that simple idea, you will get much more out of the box with Sage:
- Transaction retries;
- Asynchronous transactions with timeout;
- Retries with exponential backoff and jitter;
- Easy-to-write circuit breakers;
- Code that is clean and easy to test;
- Low cost of integration in existing code base and low performance overhead;
- Ability to avoid locking the database with long-running transactions;
- Extensibility - write your own handler for critical errors or metric collector to measure how much time each step took.
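To make "exponential backoff and jitter" concrete, here is a minimal sketch of one common way to compute retry delays. It is a generic formula and not necessarily the one Sage uses internally; the `BackoffSketch` module is hypothetical, and the parameter names are only borrowed from the retry options shown in the example further down.

```elixir
defmodule BackoffSketch do
  @moduledoc "Generic exponential backoff; an illustration, not Sage's internal code."

  # Delay in milliseconds before the given retry attempt (1-based):
  # the base delay doubles on every attempt and is capped at max_backoff.
  def delay(attempt, base_backoff, max_backoff) when attempt >= 1 do
    min(max_backoff, base_backoff * Integer.pow(2, attempt - 1))
  end

  # "Full jitter": pick a random delay between 0 and the capped value so that
  # many clients retrying at once do not hit the service in lockstep.
  def delay_with_jitter(attempt, base_backoff, max_backoff) do
    Enum.random(0..delay(attempt, base_backoff, max_backoff))
  end
end

delays = for attempt <- 1..5, do: BackoffSketch.delay(attempt, 10, 30_000)
# delays == [10, 20, 40, 80, 160]
```

With a base of 10 ms the delay reaches the 30 000 ms cap on the thirteenth attempt, which is why a retry limit is usually set alongside the backoff.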
Goals
- Become a de facto tool to run distributed transactions in the Elixir world;
- Stay simple to use and small to maintain: less code, fewer bugs;
- Educate people on how to run distributed transactions pragmatically.
Rationale (use cases)
Lots of applications I've seen face a common task: interacting with third-party APIs to offload some of the work to SaaS products or microservices, committing to more than one database, or handling any other case where you don't have transaction isolation between business logic steps (the kind we all got used to thanks to RDBMSs).
When dealing with those, it is a common desire to handle all sorts of errors when application code has failed in the middle of a transaction so that you won't leave databases in an inconsistent state.
Using with (the old way)
One solution is to write business logic using with syntax. But when the number of transaction steps grows, the code becomes hard to maintain and test, and it even looks ugly. Consider the following pseudo-code (don't do this):
defmodule WithExample do
  def create_and_subscribe_user(attrs) do
    Repo.transaction(fn ->
      with {:ok, user} <- create_user(attrs),
           {:ok, plans} <- fetch_subscription_plans(attrs),
           {:ok, charge} <- charge_card(user, subscription),
           {:ok, subscription} <- create_subscription(user, plan, attrs),
           {:ok, _delivery} <- schedule_delivery(user, subscription, attrs),
           {:ok, _receipt} <- send_email_receipt(user, subscription, attrs),
           {:ok, user} <- update_user(user, %{subscription: subscription}) do
        acknowledge_job(opts)
      else
        {:error, {:charge_failed, _reason}} ->
          # First problem: charge is not available here
          :ok = refund(charge)
          reject_job(opts)

        {:error, {:create_subscription, _reason}} ->
          # Second problem: growing list of compensations
          :ok = refund(charge)
          :ok = delete_subscription(subscription)
          reject_job(opts)

        # Third problem: how to decide when we should be sending another email or
        # at which stage we've failed?

        other ->
          # Will rollback transaction on all other errors
          :ok = ensure_deleted(fn -> refund(charge) end)
          :ok = ensure_deleted(fn -> delete_subscription(subscription) end)
          :ok = ensure_deleted(fn -> delete_delivery_from_schedule(delivery) end)
          reject_job(opts)

          other
      end
    end)
  end

  defp ensure_deleted(cb) do
    case cb.() do
      :ok -> :ok
      {:error, :not_found} -> :ok
    end
  end
end
Along with the issues highlighted in the code itself, there are a few more:
- To know at which stage we failed we need to keep an eye on the special returns from the functions we're using here;
- Hard to make sure that there is a compensation for every possible error case;
- Related code has to be kept close together, because bare expressions in with do not leak to the else block;
- Hard to test;
- Hard to improve, e.g. it is hard to add retries, async operations or a circuit breaker without making it even worse.
For some time you might get away with splitting create_and_subscribe_user/1 into smaller functions, but that only works while the number of transactions is very small.
Using Sagas
Instead, let's see how that pipeline would look with Sage:
defmodule SageExample do
  import Sage
  require Logger

  @spec create_and_subscribe_user(attrs :: map()) :: {:ok, last_effect :: any(), all_effects :: map()} | {:error, reason :: any()}
  def create_and_subscribe_user(attrs) do
    new()
    |> run(:user, &create_user/2)
    |> run(:plans, &fetch_subscription_plans/2, &subscription_plans_circuit_breaker/3)
    |> run(:subscription, &create_subscription/2, &delete_subscription/3)
    |> run_async(:delivery, &schedule_delivery/2, &delete_delivery_from_schedule/3)
    |> run_async(:receipt, &send_email_receipt/2, &send_excuse_for_email_receipt/3)
    |> run(:update_user, &set_plan_for_a_user/2)
    |> finally(&acknowledge_job/2)
    |> transaction(SageExample.Repo, attrs)
  end

  # Transaction behaviour:
  # @callback transaction(
  #   effects_so_far :: map(),
  #   attrs :: any()
  # ) :: {:ok, effect :: any()} | {:error, reason :: any()} | {:abort, reason :: any()}

  # Compensation behaviour:
  # @callback compensation(
  #   effect_to_compensate :: any(),
  #   effects_so_far :: map(),
  #   attrs :: any()
  # ) :: :ok | :abort | {:retry, retry_opts :: Sage.retry_opts()} | {:continue, any()}

  def create_user(_effects_so_far, %{"user" => user_attrs}) do
    %SageExample.User{}
    |> SageExample.User.changeset(user_attrs)
    |> SageExample.Repo.insert()
  end

  def fetch_subscription_plans(_effects_so_far, _attrs) do
    {:ok, _plans} = SageExample.Billing.APIClient.list_plans()
  end

  # If we failed to fetch plans, let's continue with cached ones
  def subscription_plans_circuit_breaker(_effect_to_compensate, _effects_so_far, _attrs) do
    {:continue, [%{"id" => "free", "total" => 0}, %{"id" => "standard", "total" => 4.99}]}
  end

  def create_subscription(%{user: user}, %{"subscription" => subscription}) do
    {:ok, subscription} = SageExample.Billing.APIClient.subscribe_user(user, subscription["plan"])
  end

  def delete_subscription(_effect_to_compensate, %{user: user}, _attrs) do
    :ok = SageExample.Billing.APIClient.delete_all_subscriptions_for_user(user)
    # We want to apply forward compensation from the :subscription stage up to 5 times
    {:retry, retry_limit: 5, base_backoff: 10, max_backoff: 30_000, enable_jitter: true}
  end

  # .. other transaction and compensation callbacks

  def acknowledge_job(:ok, attrs) do
    Logger.info("Successfully created user #{attrs["user"]["email"]}")
  end

  def acknowledge_job(_error, attrs) do
    # Logger.warn/1 is deprecated in recent Elixir versions
    Logger.warning("Failed to create user #{attrs["user"]["email"]}")
  end
end
Along with readable code, you are getting:
- Reasonable guarantees that all transaction steps are completed or all failed steps are compensated;
- Code that is much simpler and easier to test;
- Retries, circuit breaking and asynchronous requests out of the box;
- Declarative way to define your transactions and run them.
Testing is easier because, instead of one monstrous function, you will have many small callbacks which are easy to cover with unit tests. You only need to test the business logic in transactions and that compensations are able to clean up their effects. Sage itself has 100% test coverage.
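As a rough idea of how small such a test can be, here is a sketch of an ExUnit test for a compensation callback shaped like subscription_plans_circuit_breaker/3. The `PlansFallback` module name, the `:timeout` effect and the cached plan list are hypothetical stand-ins so that the snippet runs on its own as a script.

```elixir
ExUnit.start()

defmodule PlansFallback do
  # Stand-in for a circuit-breaker compensation: instead of failing the saga,
  # it tells the caller to continue with a cached list of plans.
  def subscription_plans_circuit_breaker(_effect_to_compensate, _effects_so_far, _attrs) do
    {:continue, [%{"id" => "free", "total" => 0}]}
  end
end

defmodule PlansFallbackTest do
  use ExUnit.Case, async: true

  test "continues with cached plans when fetching plans fails" do
    assert {:continue, plans} =
             PlansFallback.subscription_plans_circuit_breaker(:timeout, %{}, %{})

    assert Enum.any?(plans, &(&1["id"] == "free"))
  end
end
```

Because each callback is a plain function of its arguments, no saga has to be built or executed to cover it.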
Even more, it is possible to apply a new kind of architecture in an Elixir project, where Phoenix contexts (or just application domains) provide helper functions for building sagas to a controller, which then uses one or more of them to make sure that each request is side-effect free. Simplified example:
defmodule SageExample.UserController do
  use SageExample.Web, :controller

  action_fallback SageExample.FallbackController

  def signup_and_accept_team_invitation(conn, attrs) do
    Sage.new()
    |> SageExample.Users.Sagas.create_user()
    |> SageExample.Teams.Sagas.accept_invitation()
    |> SageExample.Billing.Sagas.prorate_team_size()
    |> Sage.execute(attrs)
  end
end
If you want to have more examples, check out this blog post on Sage.
