Flowex
Flow-Based Programming framework for Elixir
Install / Use
/learn @antonmi/FlowexREADME
Flowex
Railway Flow-Based Programming.
The library is not supported anymore, see the ALF project.
Flowex is a set of abstractions built on top Elixir GenStage which allows writing program with Flow-Based Programming paradigm.
I would say it is a mix of FBP and so-called Railway Oriented Programming (ROP) approach.
Flowex DSL allows you to easily create "pipelines" of Elixir GenStages.
Dedicated to my lovely girlfriend Chryścina.
Resources
- Railway Flow-Based Programming with Flowex - post
- Flowex: Flow-Based Programming with Elixir GenStage - presentation
- Flow-based programming with Elixir - presentation
- Flow-Based REST API with Flowex and Plug - post
- Multi language FBP with Flowex - presentation
- Multi-language Flowex components - post
- Flow-Based REST API with Flowex and Plug - post
Contents
- Installation
- A simple example to get the idea
- More complex example for understanding interface
- Flowex magic!
- Run the pipeline
- How it works
- Error handling
- Pipeline and pipe options
- Synchronous and asynchronous calls
- Bottlenecks
- Module pipes
- Data available in pipes
- Starting strategies
- Debugging with Flowex.Sync.Pipeline
- Contributing
Installation
Just add flowex as dependency to the mix.exs file.
A simple example to get the idea
Let's consider a simple program which receives a number as an input, then adds one, then multiplies the result by two and finally subtracts 3.
defmodule Functions do
def add_one(number), do: number + 1
def mult_by_two(number), do: number * 2
def minus_three(number), do: number - 3
end
defmodule MainModule do
def run(number) do
number
|> Functions.add_one
|> Functions.mult_by_two
|> Functions.minus_three
end
end
So the program is a pipeline of functions with the same interface. The functions are very simple in the example.
In the real world they can be something like validate_http_request, get_user_from_db, update_db_from_request and render_response.
Furthermore, each of the function can potentially fail. But for getting the idea let's stick the simplest example.
FBP defines applications as networks of "black box" processes, which exchange data across predefined connections by message passing.
To satisfy the FBP approach we need to place each of the function into a separate process. So the number will be passed from 'add_one' process to 'mult_by_two' and then 'minus_three' process which returns the final result.
That, in short, is the idea of Flowex!
More complex example for understanding interface
Let's define a more strict interface for our function. So each of the function will receive a predefined struct as a first argument and will return a map:
def add_one(%{number: number}, opts) do
%{number: number + 1, a: opts.a}
end
The function receives a structure with number field and the options map with field a and returns map with new number.
The second argument is a set of options and will be described later.
Let's rewrite the whole Functions module in the following way:
defmodule Functions do
defstruct number: nil, a: nil, b: nil, c: nil
def add_one(%{number: number}, %{a: a}) do
%{number: number + 1, a: a}
end
def mult_by_two(%{number: number}, %{b: b}) do
%{number: number * 2, b: b}
end
def minus_three(%{number: number}, %{c: c}) do
%{number: number - 3, c: c}
end
end
The module defines three functions with the similar interface.
We also defined as struct %Functions{} which defines a data-structure being passed to the functions.
The main module may look like:
defmodule MainModule do
def run(number) do
opts = %{a: 1, b: 2, c: 3}
%Functions{number: number}
|> Functions.add_one(opts)
|> Functions.mult_by_two(opts)
|> Functions.minus_three(opts)
end
end
Flowex magic!
Let's add a few lines at the beginning.
defmodule FunPipeline do
use Flowex.Pipeline
pipe :add_one
pipe :mult_by_two
pipe :minus_three
defstruct number: nil, a: nil, b: nil, c: nil
def add_one(%{number: number}, %{a: a}) do
%{number: number + 1, a: a}
end
# mult_by_two and minus_three definitions skipped
end
We also renamed the module to FunPipeline because we are going to create "Flowex pipeline".
Flowex.Pipeline extend our module, so we have:
pipemacro to define which function evaluation should be placed into separate GenStage;error_pipemacro to define function which will be called if error occurs;start,supervised_startandstopfunctions to create and destroy pipelines;callfunction to run pipeline computations synchronously.castfunction to run pipeline computations asynchronously.- overridable
initfunction which, by default, acceptsoptsand return them
Let's start a pipeline:
opts = %{a: 1, b: 2, c: 3}
pipeline = FunPipeline.start(opts)
#returns
%Flowex.Pipeline{in_name: :"Flowex.Producer_#Reference<0.0.7.504>",
module: FunPipeline, out_name: :"Flowex.Consumer_#Reference<0.0.7.521>",
sup_pid: #PID<0.136.0>}
What happened:
- Three GenStages have been started - one for each of the function in pipeline. Each of GenStages is
:producer_consumer; - One additional GenStage for error processing has been started (it is also
:producer_consumer); - 'producer' and 'consumer' GenStages for input and output have been added;
- All the components have been placed under Supervisor.
The next picture shows what the 'pipeline' is.

The start function returns a %Flowex.Pipeline{} struct with the following fields:
- module - the name of the module
- in_name - unique name of 'producer';
- out_name - unique name of 'consumer';
- sup_name - unique name of the pipeline supervisor
Note, we have passed options to start function. This options will be passed to each function of the pipeline as a second argument.
There is supervised_start function which allows to place pipeline's under external supervisor.
See details in Starting strategies section.
Run the pipeline
One can run calculations in pipeline synchronously and asynchronously:
callfunction to run pipeline computations synchronously.castfunction to run pipeline computations asynchronously.
FunPipeline.call/2 function receive a %Flowex.Pipeline{} struct as a first argument and must receive a %FunPipeline{} struct as a second one.
The call function returns a %FunPipeline{} struct.
FunPipeline.call(pipeline, %FunPipeline{number: 2})
# returns
%FunPipeline{a: 1, b: 2, c: 3, number: 3}
As expected, pipeline returned %FunPipeline{} struct with number: 3. a, b and c were set from options.
If you don't care about the result, you should use cast/2 function to run and forget.
FunPipeline.cast(pipeline, %FunPipeline{number: 2})
# returns
:ok
Run via client
Another way is using Flowex.Client module which implements GenServer behavior.
The Flowex.Client.start\1 function receives pipeline struct as an argument.
Then you can use call/2 function or cast/2. See example below:
{:ok, client_pid} = Flowex.Client.start(pipeline)
Flowex.Client.call(client_pid, %FunPipeline{number: 2})
# returns
%FunPipeline{a: 1, b: 2, c: 3, number: 3}
#or
Flowex.Client.cast(client_pid, %FunPipeline{number: 2})
# returns
:ok
How it works
The following figure demonstrates the way data follows:
Note: error_pipe is not on the picture in order to save place.
The things happen when you call Flowex.Client.call (synchronous):
selfprocess makes synchronous call to the client gen_server with%FunPipeline{number: 2}struct;- the client makes synchronous call 'FunPipeline.call(pipeline, %FunPipeline{number: 2})';
- the struct is wrapped into
%Flowex.IP{}struct and begins its asynchronous journey from one GenStage to another; - when the consumer receives the Information Packet (IP), it sends it back to the client which sends it back to the caller process.
The things happen when you cast pipeline (asynchronous):
selfprocess makescastcall to the client and immediately receives:ok- the client makes
castto pipeline; - the struct is wrapped into
%Flowex.IP{}struct and begins its asynchronous journey from one GenStage to another; - consumer does not send data back, because this is
cast
Error handling
What happens
