Gush
Fast and distributed workflow runner using ActiveJob and Redis
Gush is a parallel workflow runner using only Redis as storage and ActiveJob for scheduling and executing jobs.
Theory
Gush relies on directed acyclic graphs to store dependencies. See Parallelizing Operations With Dependencies by Stephen Toub to learn more about this method.
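To make the idea concrete, here is a minimal plain-Ruby sketch of how a dependency graph determines which jobs may run side by side. It does not use Gush and is not how Gush schedules jobs internally; the `DEPENDENCIES` hash and the `execution_waves` helper are hypothetical names, and the job names are borrowed from the sample workflow in this README.

```ruby
# Hypothetical dependency map: job name => names of the jobs it must wait for.
DEPENDENCIES = {
  "FetchJob1"   => [],
  "FetchJob2"   => [],
  "PersistJob1" => ["FetchJob1"],
  "PersistJob2" => ["FetchJob2"],
  "Normalize"   => ["PersistJob1", "PersistJob2"],
  "Index"       => ["Normalize"]
}.freeze

# Groups jobs into "waves": every job in a wave has all of its dependencies
# in earlier waves, so the jobs inside one wave could run in parallel.
def execution_waves(dependencies)
  remaining = dependencies.dup
  finished = []
  waves = []

  until remaining.empty?
    ready = remaining.select { |_job, deps| (deps - finished).empty? }.keys
    # If nothing is ready but jobs remain, the graph is not acyclic.
    raise ArgumentError, "dependency cycle detected" if ready.empty?

    waves << ready
    finished.concat(ready)
    ready.each { |job| remaining.delete(job) }
  end

  waves
end

execution_waves(DEPENDENCIES).each_with_index do |wave, index|
  puts "wave #{index + 1}: #{wave.join(', ')}"
end
```

Grouping into waves is a simplification chosen for readability: a real runner can start a job as soon as its own parents finish, without waiting for unrelated jobs in the same wave.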
WARNING - version notice
This README is about the latest master code, which might differ from what is released on RubyGems. See tags to browse previous READMEs.
Installation
1. Add gush to Gemfile
gem 'gush', '~> 5.0'
2. Create Gushfile
When using Gush and its CLI commands you need a Gushfile in the root directory.
The Gushfile should require all your workflows and jobs.
Ruby on Rails
For RoR it is enough to require the full environment:
require_relative './config/environment.rb'
and make sure your jobs and workflows are correctly loaded by adding their directories to autoload_paths, inside config/application.rb:
config.autoload_paths += ["#{Rails.root}/app/jobs", "#{Rails.root}/app/workflows"]
Ruby
Simply require any jobs and workflows manually in the Gushfile:
require_relative 'lib/workflows/example_workflow.rb'
require_relative 'lib/jobs/some_job.rb'
require_relative 'lib/jobs/some_other_job.rb'
Example
The DSL for defining jobs consists of a single run method.
Here is a complete example of a workflow you can create:
# app/workflows/sample_workflow.rb
class SampleWorkflow < Gush::Workflow
  def configure(url_to_fetch_from)
    run FetchJob1, params: { url: url_to_fetch_from }
    run FetchJob2, params: { some_flag: true, url: 'http://url.com' }
    run PersistJob1, after: FetchJob1
    run PersistJob2, after: FetchJob2
    run Normalize,
        after: [PersistJob1, PersistJob2],
        before: Index
    run Index
  end
end
and this is what the graph will look like:
graph TD
A{Start} --> B[FetchJob1]
A --> C[FetchJob2]
B --> D[PersistJob1]
C --> E[PersistJob2]
D --> F[Normalize]
E --> F
F --> G[Index]
G --> H{Finish}
Defining workflows
Let's start with the simplest workflow possible, consisting of a single job:
class SimpleWorkflow < Gush::Workflow
  def configure
    run DownloadJob
  end
end
Of course having a workflow with only a single job does not make sense, so it's time to define dependencies:
class SimpleWorkflow < Gush::Workflow
  def configure
    run DownloadJob
    run SaveJob, after: DownloadJob
  end
end
We just told Gush to execute SaveJob right after DownloadJob finishes successfully.
But what if your job must have multiple dependencies? Just provide an array to the after attribute:
class SimpleWorkflow < Gush::Workflow
  def configure
    run FirstDownloadJob
    run SecondDownloadJob
    run SaveJob, after: [FirstDownloadJob, SecondDownloadJob]
  end
end
Now SaveJob will only execute after both its parents finish without errors.
With this simple syntax you can build any complex workflows you can imagine!
Alternative way
The run method also accepts a before: attribute to define the opposite association. So we can write the same workflow as above like this:
class SimpleWorkflow < Gush::Workflow
  def configure
    run FirstDownloadJob, before: SaveJob
    run SecondDownloadJob, before: SaveJob
    run SaveJob
  end
end
You can use whatever way you find more readable or even both at once :)
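As an illustration of why the two forms are interchangeable, the sketch below models a workflow as a list of parent-to-child edges. `EdgeRecorder` is a hypothetical toy class written for this example, not part of Gush, and jobs are represented by symbols instead of classes.

```ruby
# Toy stand-in for a workflow: it only records [parent, child] edges.
# This is NOT Gush's implementation, just a model of the after:/before: semantics.
class EdgeRecorder
  attr_reader :edges

  def initialize
    @edges = []
  end

  # after:  jobs that must finish before `job` starts
  # before: jobs that must wait for `job` to finish
  def run(job, after: [], before: [])
    Array(after).each  { |parent| @edges << [parent, job] }
    Array(before).each { |child|  @edges << [job, child] }
    job
  end
end

with_after = EdgeRecorder.new
with_after.run :FirstDownloadJob
with_after.run :SecondDownloadJob
with_after.run :SaveJob, after: [:FirstDownloadJob, :SecondDownloadJob]

with_before = EdgeRecorder.new
with_before.run :FirstDownloadJob, before: :SaveJob
with_before.run :SecondDownloadJob, before: :SaveJob
with_before.run :SaveJob

# Both definitions produce the same set of edges.
puts with_after.edges.sort == with_before.edges.sort
```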
Passing arguments to workflows
Workflows can accept any primitive arguments in their constructor, which will then be available in your configure method.
Let's assume we are writing a book publishing workflow which needs to know where the PDF of the book is and under what ISBN it will be released:
class PublishBookWorkflow < Gush::Workflow
  def configure(url, isbn, publish: false)
    run FetchBook, params: { url: url }
    if publish
      run PublishBook, params: { book_isbn: isbn }, after: FetchBook
    end
  end
end
and then create your workflow with those arguments:
PublishBookWorkflow.create("http://url.com/book.pdf", "978-0470081204", publish: true)
That's basically it for defining workflows. See below for how to define jobs.
Defining jobs
The simplest job is a class inheriting from Gush::Job and responding to the perform method, much like any other ActiveJob class.
class FetchBook < Gush::Job
  def perform
    # do some fetching from remote APIs
  end
end
But what about those params we passed in the previous step?
Passing parameters into jobs
To do that, simply provide a params: attribute with a hash of parameters you'd like to have available inside the perform method of the job.
So, inside workflow:
(...)
run FetchBook, params: {url: "http://url.com/book.pdf"}
(...)
and within the job we can access them like this:
class FetchBook < Gush::Job
  def perform
    # you can access `params` method here, for example:
    params #=> {url: "http://url.com/book.pdf"}
  end
end
Executing workflows
Now that we have defined our workflow and its jobs, we can use it:
1. Start background worker process
Important: The command to start background workers depends on the backend you chose for ActiveJob. For example, with Sidekiq this would be:
bundle exec sidekiq -q gush
See the backends section of the official ActiveJob documentation for details on configuring backends.
Hint: Gush uses the gush queue name by default. Keep that in mind, because some backends (like Sidekiq) will only run jobs from explicitly stated queues.
2. Create the workflow instance
flow = PublishBookWorkflow.create("http://url.com/book.pdf", "978-0470081204")
3. Start the workflow
flow.start!
Now Gush will start processing jobs in the background using ActiveJob and your chosen backend.
4. Monitor its progress
flow.reload
flow.status
#=> :running|:finished|:failed
reload is needed to see the latest status, since workflows are updated asynchronously.
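Because the status changes asynchronously, a caller that needs to block until the workflow is done can poll it. The following is a minimal sketch under stated assumptions: `wait_for_workflow`, `TERMINAL_STATUSES` and `FakeFlow` are hypothetical names, only the statuses listed above are handled, and `FakeFlow` is a stand-in so the example runs without Redis or a worker.

```ruby
# Statuses after which polling stops. Only the statuses shown above are
# considered; treating exactly these two as terminal is an assumption.
TERMINAL_STATUSES = %i[finished failed].freeze

# Polls any object that responds to `reload` and `status` until it reaches
# a terminal status or the timeout (in seconds) expires.
def wait_for_workflow(flow, interval: 1, timeout: 60)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout

  loop do
    flow.reload
    status = flow.status
    return status if TERMINAL_STATUSES.include?(status)

    if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
      raise "workflow still #{status} after #{timeout} seconds"
    end

    sleep interval
  end
end

# Stand-in for a workflow: reports :running twice, then :finished.
class FakeFlow
  attr_reader :status

  def initialize
    @statuses = %i[running running finished]
    @status = nil
  end

  def reload
    @status = @statuses.shift || @status
    self
  end
end

puts wait_for_workflow(FakeFlow.new, interval: 0.01)
```

With a real workflow, the call would be `wait_for_workflow(flow)` after `flow.start!`.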
Loading workflows
Finding a workflow by id
flow = Workflow.find(id)
Paging through workflows
To get workflows with pagination, use start and stop (inclusive) index values:
flows = Workflow.page(0, 99)
Or in reverse order:
flows = Workflow.page(0, 99, order: :desc)
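Since both index values are inclusive, the stop index for a page of N items is start + N - 1. A small sketch of that arithmetic follows; `page_bounds` is a hypothetical helper, not part of Gush.

```ruby
# Hypothetical helper: converts a zero-based page number and a page size
# into the inclusive start/stop index values described above.
def page_bounds(page_number, per_page)
  start = page_number * per_page
  [start, start + per_page - 1]
end

start, stop = page_bounds(0, 100)
puts "first page: #{start}..#{stop}" # first page: 0..99

start, stop = page_bounds(2, 100)
puts "third page: #{start}..#{stop}" # third page: 200..299
```

The returned pair could then be passed on as the start and stop arguments, for example `Workflow.page(*page_bounds(2, 100))`.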
Advanced features
Global parameters for jobs
Workflows can accept a hash of globals that are automatically forwarded as parameters to all jobs.
This is useful to have common functionality across workflow and job classes, such as tracking the creator id for all instances:
class SimpleWorkflow < Gush::Workflow
  def configure(url_to_fetch_from)
    run DownloadJob, params: { url: url_to_fetch_from }
  end
end
flow = SimpleWorkflow.create('http://foo.com', globals: { creator_id: 123 })
flow.globals
=> {:creator_id=>123}
flow.jobs.first.params
=> {:creator_id=>123, :url=>"http://foo.com"}
Note: job params with the same key as globals will take precedence over the globals.
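One way to picture this precedence rule is a plain Hash merge in which the job params are merged over the globals. This is only a model of the documented behavior, not necessarily how Gush implements it, and the `locale` key is a hypothetical addition used to show a collision.

```ruby
globals    = { creator_id: 123, locale: "en" }
job_params = { url: "http://foo.com", locale: "pl" }

# In Hash#merge the argument's values win on duplicate keys,
# so job params override globals with the same key.
effective_params = globals.merge(job_params)

puts effective_params[:creator_id] # 123 (only in globals)
puts effective_params[:locale]     # pl  (job param overrides the global)
puts effective_params[:url]        # http://foo.com (only in job params)
```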
Pipelining
Gush offers a useful tool to pass the results of a job to the jobs that depend on it, so they can act differently.
Example:
Let's assume you have two jobs, DownloadVideo and EncodeVideo.
The latter needs to know where the first one saved the file to be able to open it.
class DownloadVideo < Gush::Job
  def perform
    downloader = VideoDownloader.fetch("http://youtube.com/?v=someytvideo")
    output(downloader.file_path)
  end
end
The output method is used to output data from the job to all dependent jobs.
Now, since DownloadVideo finished and its dependent job EncodeVideo started, we can access that payload inside it:
class EncodeVideo < Gush::Job
  def perform
    video_path = payloads.first[:output]
  end
end
payloads is an array containing outputs from all ancestor jobs. So for our EncodeVideo job from above, the array will look like:
[
  {
    id: "DownloadVideo-41bfb730-b49f-42ac-a808-156327989294", # unique id of the ancestor job
    class: "DownloadVideo",
    output: "https://s3.amazonaws.com/somebucket/downloaded-file.mp4" # the payload returned by DownloadVideo job using `output()` method
  }
]
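When a job has several ancestors, `payloads.first` depends on ordering, so it can be clearer to look a payload up by its class name. The sketch below works on plain data shaped like the array above; `output_from` is a hypothetical helper, `DownloadSubtitles` is a hypothetical second ancestor, and the ids and paths are made up.

```ruby
# Sample data shaped like the payloads array above (ids and paths are made up).
SAMPLE_PAYLOADS = [
  { id: "DownloadVideo-1",     class: "DownloadVideo",     output: "/tmp/video.mp4" },
  { id: "DownloadSubtitles-1", class: "DownloadSubtitles", output: "/tmp/subtitles.srt" }
].freeze

# Returns the output of the first payload produced by the given job class,
# or nil when no ancestor of that class is present.
def output_from(payloads, job_class_name)
  payload = payloads.find { |entry| entry[:class] == job_class_name }
  payload && payload[:output]
end

puts output_from(SAMPLE_PAYLOADS, "DownloadVideo")     # /tmp/video.mp4
puts output_from(SAMPLE_PAYLOADS, "DownloadSubtitles") # /tmp/subtitles.srt
```

Inside a job, the same helper could be called as `output_from(payloads, "DownloadVideo")`.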
Note: Keep in mind that payloads can only contain data which can be serialized as JSON, because that's how Gush stores them internally.
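To see what JSON serialization means in practice, the sketch below round-trips a few values through Ruby's standard `json` library and checks whether they come back unchanged. `json_round_trip` and `json_safe?` are hypothetical helpers; the sketch says nothing about how Gush itself deserializes payloads, it only shows which Ruby values survive JSON intact.

```ruby
require "json"

# Returns the value as it looks after being written to and read back from JSON.
def json_round_trip(value)
  JSON.parse(JSON.generate(value))
end

# True when the value is unchanged by a JSON round trip.
def json_safe?(value)
  json_round_trip(value) == value
end

# Strings, numbers, arrays and string-keyed hashes survive unchanged.
puts json_safe?({ "path" => "/tmp/video.mp4", "sizes" => [1024, 2048] }) # true

# Symbol keys come back as strings.
puts json_safe?({ path: "/tmp/video.mp4" }) # false

# Symbol values come back as strings too.
puts json_safe?({ "status" => :done }) # false
```

Sticking to strings, numbers, booleans, nil, arrays and hashes for the value passed to `output` avoids such surprises.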
Dynamic workflows
There might be a case when you have to construct the workflow dynamically depending on the input.
As an example, let's write a workflow which accepts an array of users and has to send an email to each one. Additionally, after it sends the email to every user, it has to notify the admin that it has finished.
class NotifyWorkflow < Gush::Workflow
  def configure(user_ids)
    notification_jobs = user_ids.map do |user_id|
      run NotificationJob, params: {user_id: user_id}
    end
    run AdminNotificationJob, after: notification_jobs
  end
end
We can achieve that because the run method returns the id of the created job, which we can use for chaining dependencies.
Now, when we create the workflow like this:
flow = NotifyWorkflow.create([54, 21, 24, 154, 65]) # 5 user ids as an argument
it will generate a workflow with 5 NotificationJobs and one AdminNotificationJob which will depend on all of them:
graph TD
A{Start} --> B[NotificationJob]
A --> C[NotificationJob]
A --> D[NotificationJob]
A --> E[NotificationJob]
A
