Rukawa
Hyper simple workflow engine by concurrent-ruby
Install / Use
/learn @joker1007/RukawaREADME
Rukawa
Rukawa = (流川)
This gem is workflow engine and this is hyper simple. Job is defined by Ruby class. Dependency of each jobs is defined by Hash.
Installation
Add this line to your application's Gemfile:
gem 'rukawa'
And then execute:
$ bundle
Or install it yourself as:
$ gem install rukawa
Usage
Job Definition
See sample/jobs/sample_job.rb.
JobNet Definition
See sample/job_nets/sample_job_net.rb.
JobGraph

Execution
% cd rukawa/sample
# load ./jobs/**/*.rb, ./job_nets/**/*.rb automatically
% bundle exec rukawa run SampleJobNet -r 10 -c 10
+----------------+----------+
| Job | Status |
+----------------+----------+
| Job1 | finished |
| Job2 | waiting |
| Job3 | waiting |
| Job4 | waiting |
| InnerJobNet | waiting |
| InnerJob3 | waiting |
| InnerJob1 | waiting |
| InnerJob2 | waiting |
| Job8 | waiting |
| Job5 | waiting |
| Job6 | waiting |
| Job7 | waiting |
| InnerJobNet2 | waiting |
| InnerJob4 | waiting |
| InnerJob5 | waiting |
| InnerJob6 | waiting |
| InnerJobNet3 | waiting |
| InnerJob7 | waiting |
| InnerJob8 | waiting |
| InnerJob9 | waiting |
| InnerJob10 | waiting |
| InnerJobNet4 | waiting |
| InnerJob11 | waiting |
| InnerJob12 | waiting |
| InnerJob13 | waiting |
| NestedJobNet | waiting |
| NestedJob1 | waiting |
| NestedJob2 | waiting |
+----------------+----------+
+----------------+----------+
| Job | Status |
+----------------+----------+
| Job1 | finished |
| Job2 | finished |
| Job3 | finished |
| Job4 | finished |
| InnerJobNet | error |
| InnerJob3 | finished |
| InnerJob1 | finished |
| InnerJob2 | error |
| Job8 | aborted |
| Job5 | error |
| Job6 | aborted |
| Job7 | aborted |
| InnerJobNet2 | running |
| InnerJob4 | running |
| InnerJob5 | waiting |
| InnerJob6 | waiting |
| InnerJobNet3 | aborted |
| InnerJob7 | aborted |
| InnerJob8 | aborted |
| InnerJob9 | aborted |
| InnerJob10 | aborted |
| InnerJobNet4 | aborted |
| InnerJob11 | aborted |
| InnerJob12 | aborted |
| InnerJob13 | aborted |
| NestedJobNet | aborted |
| NestedJob1 | aborted |
| NestedJob2 | aborted |
+----------------+----------+
# generate result graph image
% dot -Tpng -o result.png result.dot

aborted means that dependent job failure aborts following jobs.
Resume
Add JOB_NAME arguments to run command.
% bundle exec rukawa run SampleJobNet Job8 InnerJob4 -r 5 -c 10
+----------------+----------+
| Job | Status |
+----------------+----------+
| Job1 | bypassed |
| Job2 | bypassed |
| Job3 | bypassed |
| Job4 | bypassed |
| InnerJobNet | bypassed |
| InnerJob3 | bypassed |
| InnerJob1 | bypassed |
| InnerJob2 | bypassed |
| Job8 | waiting |
| Job5 | bypassed |
| Job6 | bypassed |
| Job7 | bypassed |
| InnerJobNet2 | waiting |
| InnerJob4 | waiting |
| InnerJob5 | waiting |
| InnerJob6 | waiting |
| InnerJobNet3 | waiting |
| InnerJob7 | waiting |
| InnerJob8 | waiting |
| InnerJob9 | waiting |
| InnerJob10 | waiting |
| InnerJobNet4 | waiting |
| InnerJob11 | waiting |
| InnerJob12 | waiting |
| InnerJob13 | waiting |
| NestedJobNet | waiting |
| NestedJob1 | waiting |
| NestedJob2 | waiting |
+----------------+----------+
+----------------+----------+
| Job | Status |
+----------------+----------+
| Job1 | bypassed |
| Job2 | bypassed |
| Job3 | bypassed |
| Job4 | bypassed |
| InnerJobNet | bypassed |
| InnerJob3 | bypassed |
| InnerJob1 | bypassed |
| InnerJob2 | bypassed |
| Job8 | finished |
| Job5 | bypassed |
| Job6 | bypassed |
| Job7 | bypassed |
| InnerJobNet2 | skipped |
| InnerJob4 | finished |
| InnerJob5 | skipped |
| InnerJob6 | skipped |
| InnerJobNet3 | running |
| InnerJob7 | finished |
| InnerJob8 | running |
| InnerJob9 | waiting |
| InnerJob10 | waiting |
| InnerJobNet4 | waiting |
| InnerJob11 | waiting |
| InnerJob12 | waiting |
| InnerJob13 | waiting |
| NestedJobNet | waiting |
| NestedJob1 | waiting |
| NestedJob2 | waiting |
+----------------+----------+
In above case, All jobs except Job8 and InnerJob4 and depending them are set bypassed.
bypassed means that job is not executed, and regarded as successful.
For example, a job depends two other jobs. Even if either job is bypassed, and another job is finished, it is executed.
Run Specific jobs
run_job command executes specified job forcely.
% be rukawa run_job Job8 InnerJob6 NestedJob1 -c 3 -r 5
+------------+---------+
| Job | Status |
+------------+---------+
| Job8 | waiting |
| InnerJob6 | waiting |
| NestedJob1 | running |
+------------+---------+
+------------+----------+
| Job | Status |
+------------+----------+
| Job8 | finished |
| InnerJob6 | finished |
| NestedJob1 | finished |
+------------+----------+
Main usage is manual reentering.
Output jobnet graph (dot file)
% bundle exec rukawa graph -o SampleJobNet.dot SampleJobNet
% dot -Tpng -o SampleJobNet.png SampleJobNet.dot
Use variables
class Job < Rukawa::Job
def run
# access via `variables` method
# return freezed Hash object
puts variables["var1"]
puts variables["var2"]
end
end
% bundle exec rukawa run SampleJobNet --var var1:value1 var2:value2
# or
% bundle exec rukawa run SampleJobNet --varfile variables.yml
# or
% bundle exec rukawa run SampleJobNet --varfile variables.json
# variables.yml
var1: value1
var2: value2
// variables.json
{
"var1": "value1",
"var2": "value2"
}
Semaphore
You can control concurrency consumption.
class Job < Rukawa::Job
set_resource_count 2
def run
# process
end
end
This job use 2 concurrency. (this does not means that job use 2 threads) If concurrency is less than jobs's resource count, resource count is set concurrency size.
You can set 0 to resource count. If a job is set 0 resource, concurrency of the job is unlimited.
Callback
- before_run
- after_run
- around_run
- after_fail
class Job < Rukawa::Job
before_run :wait_other_resource
after_run :notify_finish
around_run do |job, blk|
begin
setup_resource
blk.call
ensure
release_resource
end
end
def run
# process
end
def wait_other_resource
sleep(3)
end
def notify_finish
send_notification_to_slack
end
end
Config Example
# rukawa.rb
Rukawa.configure do |c|
c.logger = OtherLogger.new
c.concurrency = 4
c.graph.concentrate = true
c.graph.nodesep = 0.8
end
ActiveJob Integration
class SampleJobNet < Rukawa::JobNet
class << self
def dependencies
# Generate Wrapper class
wrapped1 = Rukawa::Wrapper::ActiveJob[ActiveJobSample1] # named Rukawa::Wrapper::ActiveJobSample1Wrapper
wrapped2 = Rukawa::Wrapper::ActiveJob[ActiveJobSample2] # named Rukawa::Wrapper::ActiveJobSample2Wrapper
{
Job1 => [],
wrapped1 => [Job1],
wrapped2 => [wrapped1],
}
end
end
end
And write config to define status store for tracking remote job status"
redis_host = ENV["REDIS_HOST"] || "localhost:6379"
Rukawa.configure do |c|
c.status_store = ActiveSupport::Cache::RedisStore.new(redis_host)
c.status_expire_duration = 72 * 60 * 60 # default is 24 hours
end
Caution: When rukawa runs wrapper job, base ActiveJob class includes hook modules automatically in order to track job running status.
help
% bundle exec rukawa help run
Usage:
rukawa run JOB_NET_NAME [JOB_NAME] [JOB_NAME] ...
Options:
[--config=CONFIG] # If this options is not set, try to load ./rukawa.rb
[--job-dirs=JOB_DIR1 JOB_DIR2] # Load job directories
-c, [--concurrency=N] # Default: cpu count
--var, [--variables=KEY:VALUE KEY:VALUE]
[--varfile=VARFILE] # variable definition file. ex (variables.yml, variables.json)
-b, [--batch], [--no-batch] # If batch mode, not display running status
-l, [--log=LOG] # Default: ./rukawa.log
[--stdout], [--no-stdout] # Output log to stdout
[--syslog], [--no-syslog] # Output log to syslog
-d, [--dot=DOT] # Output job status by dot format
-f, [--format=FORMAT] # Output job status format: png, svg, pdf, ... etc
-r, [--refresh-interval=N] # Refre
