Runic

Powerful DAG workflow composition tool for Elixir.

<!-- MDOC !-->

Runic is a tool for modeling programs as data-driven workflows that can be composed together at runtime.

Runic components connect together in a Runic.Workflow, which supports lazily evaluated, concurrent execution.

Runic Workflows are modeled as a decorated dataflow graph (a DAG, or "directed acyclic graph") compiled from components such as steps, rules, pipelines, state machines, and more. This allows coordinated interaction between otherwise disparate parts.

Installation

If available in Hex, the package can be installed by adding runic to your list of dependencies in mix.exs:

def deps do
  [
    {:runic, "~> 0.1.0-alpha"}
  ]
end

Documentation can be generated with ExDoc and published on HexDocs. Once published, the docs can be found at https://hexdocs.pm/runic.

Concepts

Data flow dependencies between Lambda expressions, common in ETL pipelines, can be built with %Step{} components.

A Lambda Step is a simple input -> output function.

require Runic

step = Runic.step(fn x -> x + 1 end)

Steps are composable in a workflow:

workflow = Runic.workflow(
  name: "example pipeline workflow",
  steps: [
    Runic.step(fn x -> x + 1 end), #A
    Runic.step(fn x -> x * 2 end), #B
    Runic.step(fn x -> x - 1 end) #C
  ]
)

This produces a workflow graph where R is the entrypoint or "root" of the tree:

graph TD;
    R-->A;
    R-->B;
    R-->C;

In Runic, inputs flow through a workflow as a %Fact{}. During workflow evaluation, steps are traversed and invoked, producing more Facts.

alias Runic.Workflow

workflow
|> Workflow.react_until_satisfied(2)
|> Workflow.raw_productions()

[3, 4, 1]

A core benefit of Runic workflows is modeling pipelines that aren't just linear. For example:

defmodule TextProcessing do
  def tokenize(text) do
    text
    |> String.downcase()
    |> String.split(~R/[^[:alnum:]\-]/u, trim: true)
  end

  def count_words(list_of_words) do
    list_of_words
    |> Enum.reduce(Map.new(), fn word, map ->
      Map.update(map, word, 1, &(&1 + 1))
    end)
  end

  def count_uniques(word_count) do
    Enum.count(word_count)
  end

  def first_word(list_of_words) do
    List.first(list_of_words)
  end

  def last_word(list_of_words) do
    List.last(list_of_words)
  end
end

Notice that three of these functions expect a list_of_words. In plain Elixir, if we wanted to evaluate each output we could chain the functions together with the pipe operator |>:

import TextProcessing

word_count = 
  "anybody want a peanut?"
  |> tokenize()
  |> count_words()

first_word = 
  "anybody want a peanut?"
  |> tokenize()
  |> first_word()

last_word = 
  "anybody want a peanut?"
  |> tokenize()
  |> last_word()

However, we're now evaluating linearly: the common tokenize/1 function runs 3 times for the same input text.

This could be problematic if tokenize/1 is expensive. We'd prefer to run tokenize/1 just once and then feed its output into the rest of our pipeline.

With Runic we can compose all of these steps into one workflow and evaluate them together.

text_processing_workflow = 
  Runic.workflow(
    name: "basic text processing example",
    steps: [
      {Runic.step(&tokenize/1),
        [
          {Runic.step(&count_words/1),
          [
            Runic.step(&count_uniques/1)
          ]},
          Runic.step(&first_word/1),
          Runic.step(&last_word/1)
        ]}
    ]
  )

Our text processing workflow graph now looks something like this:

graph TD;
    R-->tokenize;
    tokenize-->first_word;
    tokenize-->last_word;
    tokenize-->count_words;
    count_words-->count_uniques;

Now Runic can traverse the graph of dataflow connections, evaluating tokenize/1 only once for all three dependent steps.

alias Runic.Workflow

text_processing_workflow 
|> Workflow.react_until_satisfied("anybody want a peanut?") 
|> Workflow.raw_productions()

[
  ["anybody", "want", "a", "peanut"], 
  "anybody", 
  "peanut", 
  4,
  %{"a" => 1, "anybody" => 1, "peanut" => 1, "want" => 1}
]
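The "evaluate the parent once, feed every child" idea can be illustrated without Runic. The following is a minimal sketch in plain Elixir, not how Runic is implemented: the TreeSketch module and its run/2 function are hypothetical names introduced here, and the Agent counter exists only to show how often the shared step runs. It walks nested {step, children} tuples shaped like the steps: option above.

```elixir
defmodule TreeSketch do
  # Hypothetical helper, not part of Runic: walks a nested
  # {parent_fun, children} tree, calling each function exactly once
  # and passing the parent's output to every child.
  def run({fun, children}, input) when is_function(fun, 1) and is_list(children) do
    output = fun.(input)
    [output | Enum.flat_map(children, &run(&1, output))]
  end

  def run(fun, input) when is_function(fun, 1), do: [fun.(input)]
end

# Count how many times the shared parent step runs.
{:ok, counter} = Agent.start_link(fn -> 0 end)

tokenize = fn text ->
  Agent.update(counter, &(&1 + 1))
  text |> String.downcase() |> String.split(~r/[^[:alnum:]\-]/u, trim: true)
end

# Same shape as the workflow above: tokenize feeds three children,
# and the word count feeds a unique-word count.
tree =
  {tokenize,
   [
     {&Enum.frequencies/1, [&map_size/1]},
     &List.first/1,
     &List.last/1
   ]}

results = TreeSketch.run(tree, "anybody want a peanut?")
tokenize_calls = Agent.get(counter, & &1)
```

Here tokenize_calls is 1 even though three children consume the token list. Runic's graph adds what this sketch lacks: named components, facts, runtime composition, and concurrent evaluation.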

Beyond steps, Runic has support for Rules, Joins, State Machines, FSMs, Aggregates, Sagas, and ProcessManagers for more complex control flow and stateful evaluation.

The Runic.Workflow.Invokable protocol is the extension point for Runic's runtime: it lets you support nodes with different execution and evaluation properties.

The Runic.Component protocol is the extension point for modeling: it lets you define new components that can be added to Runic workflows and connected with other components.

Runtime Workflow Composition

Workflows can be composed dynamically at runtime:

require Runic
alias Runic.Workflow

# Using Workflow.add/3 for dynamic composition
workflow = Runic.workflow()
  |> Workflow.add(Runic.step(fn x -> x + 1 end, name: :add))
  |> Workflow.add(Runic.step(fn x -> x * 2 end, name: :double), to: :add)

# Merge two workflows together
workflow1 = Runic.workflow(steps: [Runic.step(fn x -> x + 1 end)])
workflow2 = Runic.workflow(steps: [Runic.step(fn x -> x * 2 end)])
combined = Workflow.merge(workflow1, workflow2)

# Join multiple parent nodes
workflow = workflow
  |> Workflow.add(Runic.step(fn a, b -> a + b end, name: :join), to: [:branch_a, :branch_b])

See Runic.Workflow module documentation for adding components to workflows and running them.

Three-Phase Execution Model

Runic's Invokable protocol enforces a three-phase execution model designed for parallel execution and external scheduling:

  1. Prepare - Extract minimal context from the workflow into %Runnable{} structs
  2. Execute - Run runnables containing work functions and their inputs in isolation (can be parallelized)
  3. Apply - Reduce results back into the workflow so next steps can be determined
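The three phases above can be sketched with a toy model in plain Elixir. This is an illustration under stated assumptions, not Runic's implementation: the ThreePhaseSketch module, its functions, and the map-based "workflow" are all hypothetical, and the runnables here are plain maps rather than %Runnable{} structs. The point is only that execution needs nothing but the runnable, so it can happen in any process.

```elixir
defmodule ThreePhaseSketch do
  # Toy model, not Runic's implementation: a "workflow" here is a map of
  # named functions, a list of pending {name, input} pairs, and results.

  # Phase 1: Prepare. Pull pending work out into self-contained runnables.
  def prepare(%{pending: pending, steps: steps} = workflow) do
    runnables =
      Enum.map(pending, fn {name, input} ->
        %{name: name, fun: Map.fetch!(steps, name), input: input}
      end)

    {%{workflow | pending: []}, runnables}
  end

  # Phase 2: Execute. Each runnable carries everything it needs, so it can
  # run in another process without touching the workflow.
  def execute(%{fun: fun, input: input} = runnable) do
    Map.put(runnable, :result, fun.(input))
  end

  # Phase 3: Apply. Fold one finished runnable back into the workflow.
  def apply_result(workflow, %{name: name, result: result}) do
    %{workflow | results: Map.put(workflow.results, name, result)}
  end

  # Runs all three phases; opts are passed to Task.async_stream/3.
  def run(workflow, opts \\ []) do
    {workflow, runnables} = prepare(workflow)

    runnables
    |> Task.async_stream(&execute/1, opts)
    |> Enum.reduce(workflow, fn {:ok, done}, acc -> apply_result(acc, done) end)
  end
end

workflow = %{
  steps: %{inc: &(&1 + 1), double: &(&1 * 2)},
  pending: [inc: 2, double: 2],
  results: %{}
}

finished = ThreePhaseSketch.run(workflow, max_concurrency: 2)
```

After the run, finished.results is %{inc: 3, double: 4}. Because only the Apply phase touches the workflow, the Execute phase could be swapped for a worker pool or a queue without changing the other two.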

Parallel Execution

For workflows where nodes can execute concurrently:

alias Runic.Workflow

# Execute runnables in parallel with configurable concurrency
workflow
|> Workflow.react_until_satisfied(input, async: true, max_concurrency: 8) # Task.async_stream options
|> Workflow.raw_productions()

External Scheduler Integration

For custom schedulers, worker pools, or distributed execution:

defmodule MyApp.WorkflowScheduler do
  use GenServer
  alias Runic.Workflow
  alias Runic.Workflow.Invokable

  # Phase 1: Prepare runnables for dispatch
  def handle_cast({:run, input}, %{workflow: workflow} = state) do
    workflow = 
      workflow
      |> Workflow.plan_eagerly(input)
      |> dispatch_tasks()

    {:noreply, %{state | workflow: workflow}}
  end

  # Phase 2: Execute (dispatch to async worker pool, queue, external service, etc.)
  defp dispatch_tasks(workflow) do
    {workflow, runnables} = Workflow.prepare_for_dispatch(workflow)

    Enum.map(runnables, fn runnable -> 
      Task.async(fn ->
        # consider logging, error handling, retries here
        Invokable.execute(runnable.node, runnable)
      end)
    end)

    workflow
  end

  # Phase 3: Apply results back to the workflow by handling the async task
  # reply, which carries the executed runnable
  def handle_info({ref, executed_runnable}, %{workflow: workflow} = state)
      when is_reference(ref) do
    # The task has replied, so stop monitoring it and flush its :DOWN message
    Process.demonitor(ref, [:flush])

    workflow = Workflow.apply_runnable(workflow, executed_runnable)

    # Dispatch the next batch if applying the result made more work runnable
    workflow =
      if Workflow.is_runnable?(workflow) do
        dispatch_tasks(workflow)
      else
        workflow
      end

    {:noreply, %{state | workflow: workflow}}
  end
end

Key APIs for external scheduling:

  • Workflow.prepare_for_dispatch/1 - Returns {workflow, [%Runnable{}]} for dispatch
  • Workflow.apply_runnable/2 - Applies a completed runnable back to the workflow
  • Invokable.execute/2 - Executes a runnable in isolation (no workflow access)

In summary, the Runic module provides high level functions and macros for building Runic Components such as Steps, Rules, Workflows, and Accumulators.

The Runic.Workflow module is for connecting components together and running them with inputs.

Runic was designed to be used with custom process topologies and libraries such as GenStage, Broadway, and Flow without coupling you to one runtime model or a limited set of adapters.

Runic has first class support for dynamic runtime composition of workflows.

Runic is useful for problems where a developer cannot know the logic or data flow upfront in compiled code, such as expert systems, user DSLs like Excel spreadsheets, low/no-code tools, or dynamic data pipelines.

If your use case doesn't require runtime modification of a workflow or complex parallel dataflow evaluation, you might not need Runic.

Runic Workflows are essentially a dataflow based virtual machine running within Elixir and will not be faster than compiled Elixir code. If you know the flow of the program upfront during development you might not need Runic.

Runtime Context

Components can declare dependencies on external runtime values using context/1:

# Steps can reference external values
step = Runic.step(fn _x -> context(:api_key) end, name: :call_llm)

# Rules can use context in conditions
rule = Runic.rule name: :gated do
  given(val: v)
  where(v > context(:threshold))
  then(fn %{val: v} -> {:ok, v} end)
end

# Accumulators, map, and reduce also support context/1
acc = Runic.accumulator(0, fn x, s -> s + x * context(:factor) end, name: :scaled)
map = Runic.map(fn x -> x * context(:multiplier) end, name: :mult_map)

# Provide defaults for optional context keys
step = Runic.step(fn _x -> context(:model, default: "gpt-4") end, name: :call_llm)

# Provide values at runtime
workflow
|> Workflow.put_run_context(%{
  call_llm: %{api_key: "sk-..."},
  _global: %{workspace_id: "ws1"}
})
|> Workflow.react_until_satisfied(input)
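One way to picture how a context/1 reference could be resolved against the map passed to put_run_context is sketched below in plain Elixir. This is not Runic's implementation: the ContextSketch module and resolve/4 are hypothetical, and the lookup order (component-scoped value, then _global, then the default) is an assumption made for illustration.

```elixir
defmodule ContextSketch do
  # Hypothetical lookup, not Runic's implementation. Assumed precedence:
  # component-scoped value, then :_global, then the caller's default.
  def resolve(run_context, component, key, opts \\ []) do
    scoped = Map.get(run_context, component, %{})
    global = Map.get(run_context, :_global, %{})

    cond do
      Map.has_key?(scoped, key) -> {:ok, Map.fetch!(scoped, key)}
      Map.has_key?(global, key) -> {:ok, Map.fetch!(global, key)}
      Keyword.has_key?(opts, :default) -> {:ok, Keyword.fetch!(opts, :default)}
      true -> :error
    end
  end
end

run_context = %{
  call_llm: %{api_key: "sk-..."},
  _global: %{workspace_id: "ws1"}
}

scoped = ContextSketch.resolve(run_context, :call_llm, :api_key)
global = ContextSketch.resolve(run_context, :call_llm, :workspace_id)
defaulted = ContextSketch.resolve(run_context, :call_llm, :model, default: "gpt-4")
missing = ContextSketch.resolve(run_context, :other_step, :api_key)
```

In this sketch a key scoped to :call_llm is invisible to :other_step, so missing is :error, while the _global entry is visible to every component.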

Context values are scoped by component name, not part of the workflow hash, and not serializ
