Workers
Workers is a Ruby gem for performing work in background threads. Multi-threading made simple.
Install / Use
/learn @chadrem/WorkersREADME
Workers

Workers is a Ruby gem for performing work in background threads. Design goals include high performance, low latency, simple API, customizability, and multi-layered architecture. It provides a number of simple to use classes that solve a wide range of concurrency problems. It is used by Tribe to implement event-driven actors.
Contents
- Installation
- Parallel Map
- Tasks
- Workers
- Pool
- Timers
- Schedulers
- Concurrency and performance
- Contributing
Installation
Add this line to your application's Gemfile:
gem 'workers'
And then execute:
$ bundle
Or install it yourself as:
$ gem install workers
Parallel Map
Parallel map is the simplest way to get started with the Workers gem. It is similar to Ruby's built-in Array#map method except each element is mapped in parallel.
Workers.map([1, 2, 3, 4, 5]) { |i| i * i }
Any exceptions while mapping with cause the entire map method to fail. If your block is prone to temporary failures (exceptions), you can retry it.
Workers.map([1, 2, 3, 4, 5], :max_tries => 100) do |i|
if rand <= 0.8
puts "Sometimes I like to fail while computing #{i} * #{i}."
raise 'sad face'
end
i * i
end
Tasks
Tasks and task groups provide more flexibility than parallel map. For example, you get to decide how you want to handle exceptions.
# Create a task group (it contains a pool of worker threads).
group = Workers::TaskGroup.new
# Add tasks to the group.
10.times do |i|
10.times do |j|
group.add(:max_tries => 10) do
group.synchronize { puts "Computing #{i} * #{j}..." }
if rand <= 0.9
group.synchronize { puts "Sometimes I like to fail while computing #{i} * #{i}." }
raise 'sad face'
end
i * j # Last statement executed is the result of the task.
end
end
end
# Execute the tasks (blocks until the tasks complete).
# Returns true if all tasks succeed. False if any fail.
group.run
# Return an array of all the tasks.
group.tasks
# Return an array of the successful tasks.
group.successes
# Return an array of the failed tasks (raised an exception).
group.failures
# Review the results.
group.tasks.each do |t|
t.succeeded? # True or false (false if an exception occurred).
t.failed? # True or false (true if an exception occurred).
t.input # Input value.
t.result # Output value (the result of i * i in this example).
t.exception # The exception if one exists.
t.max_tries # Maximum number of attempts.
t.tries # Actual number of attempts.
end
Note that instances of TaskGroup provide a 'synchronize' method. This method uses a mutex so you can serialize portions of your tasks that aren't thread safe.
Options
group = Workers::TaskGroup.new(
:logger => nil, # Ruby logger instance.
:pool => Workers.pool # The workers pool used to execute timer callbacks.
)
task = Workers::Task.new(
:logger => nil, # Ruby logger instance.
:on_perform => proc {}, # Required option. Block of code to run.
:input => [], # Array of arguments passed to the 'perform' block.
:on_finished => nil, # Callback to execute after attempting to run the task.
:max_tries => 1, # Number of times to try completing the task (without an exception).
)
Workers
Basic
The main purpose of the Worker class is to add an event system on top of Ruby's built-in Thread class. This greatly simplifies inter-thread communication. You must manually dispose of pools and workers so they get garbage collected.
# Initialize a worker pool.
pool = Workers::Pool.new(:on_exception => proc { |e|
puts "A worker encountered an exception: #{e.class}: #{e.message}"
})
# Perform some work in the background.
100.times do
pool.perform do
sleep(rand(3))
raise 'sad face' if rand < 0.5
puts "Hello world from thread #{Thread.current.object_id}"
end
end
# Wait up to 30 seconds for the workers to cleanly shutdown (or forcefully kill them).
pool.dispose(30) do
puts "Worker thread #{Thread.current.object_id} is shutting down."
end
Advanced
The Worker class is designed to be customized through inheritence and its event system:
# Create a subclass that handles custom events.
# Super is called to handle built-in events such as perform and shutdown.
class CustomWorker < Workers::Worker
private
def event_handler(event)
case event.command
when :my_custom
puts "Worker received custom event: #{event.data}"
sleep(1)
else
super(event)
end
end
end
# Create a pool that uses your custom worker class.
pool = Workers::Pool.new(:worker_class => CustomWorker, :on_exception => proc { |e|
puts "A worker encountered an exception: #{e.class}: e.message}"
})
# Tell the workers to do some work using custom events.
100.times do |i|
pool.enqueue(:my_custom, i)
end
# Wait up to 30 seconds for the workers to cleanly shutdown (or forcefully kill them).
pool.dispose(30) do
puts "Worker thread #{Thread.current.object_id} is shutting down."
end
Without pools
In most cases you will be using a group of workers (a pool) as demonstrated above. In certain cases, you may want to use a worker directly without the pool. This gives you direct access to a single event-driven thread that won't die on an exception.
# Create a single worker.
worker = Workers::Worker.new
# Perform some work in the background.
25.times do |i|
worker.perform do
sleep(0.1)
puts "Hello world from thread #{Thread.current.object_id}"
end
end
# Wait up to 30 seconds for the worker to cleanly shutdown (or forcefully kill it).
worker.dispose(30)
Options
worker = Workers::Worker.new(
:logger => nil, # Ruby Logger instance.
:input_queue => nil, # Ruby Queue used for input events.
:on_exception => nil # Callback to execute on exception (exception passed as only argument).
)
Pools
Pools allow a group of workers to share a work queue. The Workers gem has a default pool (Workers.pool) with 20 workers so in most cases you won't need to create your own. Pools can be adjusted using the below methods:
# Create a pool.
pool = Workers::Pool.new
# Return the number of workers in the pool.
pool.size
# Increase the number of workers in the pool.
pool.expand(5)
# Decrease the number of workers in the pool.
pool.contract(5)
# Resize the pool size to a specific value.
pool.resize(20)
Options
pool = Workers::Pool.new(
:size => 20, # Number of threads to create.
:logger => nil, # Ruby Logger instance.
:worker_class => Workers::Worker # Class of worker to use for this pool.
:on_exception => nil # Callback to execute on exception (exception passed as only argument).
)
Timers
Timers provide a way to execute code in the future. You can easily use them to tell a Worker or it's higher level classes (Task, TaskGroup, etc) to perform work in the future.
# Create a one shot timer that executes in 1.5 seconds.
timer = Workers::Timer.new(1.5) do
puts 'Hello world'
end
# Create a periodic timer that loops infinitely or until 'cancel' is called.
timer = Workers::PeriodicTimer.new(1) do
puts 'Hello world many times'
end
# Let the timer print some lines.
sleep(5)
# Shutdown the timer.
timer.cancel
Options
timer = Workers::Timer.new(1,
:logger => nil, # Ruby logger instance.
:repeat => false, # Repeat the timer until 'cancel' is called.
:scheduler => Workers.scheduler, # The scheduler that manages execution.
:callback => nil # The proc to execute (provide this or a block, but not both).
)
timer = Workers::PeriodicTimer.new(1,
:logger => nil, # Ruby logger instance.
:scheduler => Workers.scheduler, # The scheduler that manages execution.
:callback => nil # The proc to execute (provide this or a block, but not both).
)
Schedulers
Schedulers are what trigger Timers to fire. The Workers gem has a default scheduler (Workers.scheduler) so in most cases you won't need to create your own. Schedulers execute timers using a pool of workers so make sure your timer block is thread safe. You can create additional schedulers as necessary:
# Create a workers pool with a larger than default thread count (optional).
pool = Workers::Pool.new(:size => 100)
# Create a scheduler.
scheduler = Workers::Scheduler.new(:pool => pool)
# Create a timer that uses the above scheduler.
timer = Workers::Timer.new(1, :scheduler => scheduler) do
puts 'Hello world'
end
# Wait for the timer to fire.
sleep(5)
# Shutdown the scheduler.
scheduler.dispose
Options
scheduler = Workers::Scheduler.new(
:logger => nil, # Ruby logger instance.
:pool => Workers::Pool.new # The workers pool used to execute timer callbacks.
)
Bucket Schedulers
The Bucket scheduler class is a specialized scheduler designed to work around lock contention.
This is accomplished by using many pools (100 by default) each with a small number of workers (1 by default).
Timers are assigned to a scheduler by their hash value.
Most users will never need to use this class, but it is documented here for completeness.
Both the number of buckets a
