Wukong
Ruby on Hadoop: Efficient, effective Hadoop streaming & bulk data processing. Write micro scripts for terabyte-scale data
Wukong is a toolkit for rapid, agile development of data applications at any scale.
The core concept in Wukong is a Processor. Wukong processors are simple Ruby classes that do one thing and do it well. This codebase implements processors and other core Wukong classes and provides a way to run and combine processors on the command-line.
Wukong's larger theme is powerful black boxes, beautiful glue. The Wukong ecosystem consists of other tools which run Wukong processors in various topologies across a variety of different backends. Code written in Wukong can be easily ported between environments and frameworks: local command-line scripts on your laptop instantly turn into powerful jobs running in Hadoop.
Here is a list of various other projects which you may also want to peruse when trying to understand the full Wukong experience:
- <a href="http://github.com/infochimps-labs/wukong-hadoop">wukong-hadoop</a>: Run Wukong processors as mappers and reducers within the Hadoop framework. Model Hadoop jobs locally before you run them.
- <a href="http://github.com/infochimps-labs/wukong-storm">wukong-storm</a>: Run Wukong processors within the Storm framework. Model flows locally before you run them.
- <a href="http://github.com/infochimps-labs/wukong-load">wukong-load</a>: Load the output data from your local Wukong jobs and flows into a variety of different data stores.
- <a href="http://github.com/infochimps-labs/wonderdog">wonderdog</a>: Connect Wukong processors running within Hadoop to Elasticsearch as either a source or sink for data.
- <a href="http://github.com/infochimps-labs/wukong-deploy">wukong-deploy</a>: Orchestrate Wukong and other wu-tools together to support an application running on the Infochimps Platform.
For a more holistic perspective also see the Infochimps Platform Community Edition (FIXME: link to this) which combines all the Wukong tools together into a jetpack which fits comfortably over the shoulders of developers.
<a name="processors"></a>
Writing Simple Processors
The fundamental unit of computation in Wukong is the processor. A processor is a Ruby class which
- subclasses Wukong::Processor (use the Wukong.processor method as sugar for this)
- defines a process method which takes an input record, does something, and calls yield on the output
Here's a processor that reverses each of its input records:
# in string_reverser.rb
Wukong.processor(:string_reverser) do
  def process string
    yield string.reverse
  end
end
You can run this processor on the command line using text files as
input using the wu-local tool that comes with Wukong:
$ cat novel.txt
It was the best of times, it was the worst of times.
...
$ cat novel.txt | wu-local string_reverser.rb
.semit fo tsrow eht saw ti ,semit fo tseb eht saw tI
The wu-local program consumes one line at a time from STDIN and
calls your processor's process method with that line as a Ruby
String object. Each object you yield within your process method
will be printed back out on STDOUT.
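To make that loop concrete, here is a minimal plain-Ruby sketch of what a wu-local-style runner does. It does not use Wukong itself: StringReverser and run_locally are hypothetical stand-ins, and stripping the trailing newline before calling process is an assumption suggested by the reversed output above.

```ruby
require 'stringio'

# Hypothetical stand-in for the string_reverser processor: a plain Ruby
# class whose `process` method yields zero or more outputs per record.
class StringReverser
  def process(string)
    yield string.reverse
  end
end

# Sketch of a wu-local-style loop: read one line at a time, strip the
# trailing newline (assumption), pass the line to `process`, and print
# every yielded object on its own output line.
def run_locally(processor, input = $stdin, output = $stdout)
  input.each_line do |line|
    processor.process(line.chomp) { |record| output.puts(record) }
  end
end

out = StringIO.new
run_locally(StringReverser.new, StringIO.new("abc\nhello\n"), out)
puts out.string
```

Passing StringIO objects instead of STDIN/STDOUT keeps the loop easy to exercise without a shell.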
Multiple Processors, Multiple (Or No) Yields
Processors are meant to be combined, so related processors like these two can be stored in the same file:
# in processors.rb
Wukong.processor(:splitter) do
  def process line
    line.split.each { |token| yield token }
  end
end
Wukong.processor(:normalizer) do
  def process token
    stripped = token.downcase.gsub(/\W/,'')
    yield stripped if stripped.size > 0
  end
end
Notice how the splitter yields multiple tokens for each of its input
lines and that the normalizer may sometimes not yield at all,
depending on its input. The framework places no obligation on a
processor to yield or return anything, so processors can easily act
as filters or even sinks in data flows.
There are two processors in this file and neither shares a name with
the basename of the file ("processors") so wu-local can't
automatically choose a processor to run. We can specify one
explicitly with the --run option:
$ cat novel.txt | wu-local processors.rb --run=splitter
It
was
the
best
of
times,
...
We can combine the two processors with a Unix pipe,
$ cat novel.txt | wu-local processors.rb --run=splitter | wu-local processors.rb --run=normalizer
it
was
the
best
of
times
...
but there's an easier way of doing this with <a href="#flows">dataflows</a>.
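Conceptually, the shell pipeline hands every record yielded by splitter straight to normalizer. The plain-Ruby sketch below mimics that inside a single process; Splitter, Normalizer, and chain are hypothetical stand-ins and are not Wukong's dataflow API.

```ruby
# Plain-Ruby stand-ins mirroring the splitter and normalizer processors.
class Splitter
  def process(line)
    line.split.each { |token| yield token }
  end
end

class Normalizer
  def process(token)
    stripped = token.downcase.gsub(/\W/, '')
    yield stripped if stripped.size > 0
  end
end

# Chain processors in-process: every record yielded by one stage is fed
# to the next stage; whatever the final stage yields goes to `emit`.
def chain(processors, record, &emit)
  first, *rest = processors
  return emit.call(record) if first.nil?
  first.process(record) { |out| chain(rest, out, &emit) }
end

results = []
chain([Splitter.new, Normalizer.new], "It was the best of times, ...") { |word| results << word }
p results  # => ["it", "was", "the", "best", "of", "times"]
```

The trailing "..." token is dropped because normalizing it leaves an empty string, which is the filtering behavior described above.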
Adding Configurable Options
Processors can have options that can be set in Ruby code, from the command-line, a configuration file, or a variety of other places thanks to Configliere.
This processor calculates the percentile of each observation, assuming
a normal distribution with a given mean and standard deviation.
It uses two fields: the mean of the distribution (mean) and its
standard deviation (std_dev). From these it computes the percentile
of every input value.
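The formula behind this is the standard normal CDF written with the complementary error function. Here μ is mean, σ is std_dev, and Φ is the standard normal CDF; this restates a standard identity and is not specific to Wukong.

```latex
\text{percentile}(x) = 100\,\Phi\!\left(\frac{x-\mu}{\sigma}\right),
\qquad
\Phi(z) = \tfrac{1}{2}\,\operatorname{erfc}\!\left(-\frac{z}{\sqrt{2}}\right)
\;\Longrightarrow\;
\text{percentile}(x) = 50\,\operatorname{erfc}\!\left(\frac{\mu - x}{\sigma}\,\sqrt{\tfrac{1}{2}}\right)
```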
# in percentile.rb
Wukong.processor(:percentile) do
  SQRT_1_HALF = Math.sqrt(0.5)
  field :mean,    Float, :default => 0.0
  field :std_dev, Float, :default => 1.0
  def process value
    observation = value.to_f
    # standard score of the observation under N(mean, std_dev)
    z_score     = (observation - mean) / std_dev
    # normal CDF via the complementary error function, scaled to 0-100
    percentile  = 50 * Math.erfc(-z_score * SQRT_1_HALF)
    yield [observation, percentile].join("\t")
  end
end
These fields have default values, but you can override them on the command line. If you scored a 95 on an exam where the mean score was 80 points and the standard deviation of the scores was 10 points, for example, then you'd be in the 93rd percentile:
$ echo 95 | wu-local /tmp/percentile.rb --mean=80 --std_dev=10
95.0 93.3192798731142
If the exam were more difficult, with a mean of 75 points and a standard deviation of 8 points, you'd be in the 99th percentile!
$ echo 95 | wu-local /tmp/percentile.rb --mean=75 --std_dev=8
95.0 99.37903346742239
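The numbers above can be checked outside Wukong with plain Ruby, whose core Math module provides Math.erfc. The percentile helper below is a hypothetical standalone version of the processor's arithmetic, not part of Wukong.

```ruby
SQRT_1_HALF = Math.sqrt(0.5)

# Percentile (0-100) of `observation` under a normal distribution with the
# given mean and standard deviation: 100 * Phi(z) = 50 * erfc(-z / sqrt(2)).
def percentile(observation, mean: 0.0, std_dev: 1.0)
  z = (observation - mean) / std_dev
  50 * Math.erfc(-z * SQRT_1_HALF)
end

puts percentile(95.0, mean: 80.0, std_dev: 10.0)
puts percentile(95.0, mean: 75.0, std_dev: 8.0)
```

An observation exactly at the mean always lands at the 50th percentile, which is a quick sanity check for the formula.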
The Lifecycle of a Processor
Processors have a lifecycle that they execute when they are run within
the context of a Wukong runner like wu-local or wu-hadoop. Each
lifecycle phase corresponds to a method of the processor that is
called:
- setup: called after the processor is initialized but before the first record is processed. You cannot yield from this method.
- process: called once for each input record; it may yield once, many times, or not at all.
- finalize: called after the last record has been processed, while the processor still has an opportunity to yield records.
- stop: called to signal to the processor that all work should stop and open connections should be closed. You cannot yield from this method.
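Here is a minimal plain-Ruby sketch of the order in which a runner might drive these hooks, without using Wukong. TracingProcessor and run_lifecycle are hypothetical; wrapping the work in ensure so that stop runs even if processing raises is a design choice of this sketch, not documented Wukong behavior.

```ruby
# Hypothetical processor that records which lifecycle hook ran, in order.
class TracingProcessor
  attr_reader :calls

  def initialize
    @calls = []
  end

  def setup
    @calls << :setup
  end

  def process(record)
    @calls << :process
    yield record.upcase
  end

  def finalize
    @calls << :finalize
    yield "done"
  end

  def stop
    @calls << :stop
  end
end

# Sketch of a runner: setup once, process per record, finalize once,
# and always stop (via ensure) so resources can be released.
def run_lifecycle(processor, records)
  outputs = []
  processor.setup
  begin
    records.each { |r| processor.process(r) { |out| outputs << out } }
    processor.finalize { |out| outputs << out }
  ensure
    processor.stop
  end
  outputs
end

tracer = TracingProcessor.new
p run_lifecycle(tracer, %w[a b])  # => ["A", "B", "done"]
p tracer.calls                    # => [:setup, :process, :process, :finalize, :stop]
```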
The examples above have focused on the process method.
The setup and stop methods are often used together to manage
external connections, opening a connection in setup and closing it in stop:
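# in geolocator.rb
Wukong.processor(:geolocator) do
  field :host, String, :default => 'localhost'
  attr_accessor :connection
  # open the connection once, before any records arrive
  # (Database::Connection is a placeholder for your own client library)
  def setup
    self.connection = Database::Connection.new(host)
  end
  def process record
    record.added_value = connection.find("...some query...")
    yield record
  end
  # release the connection once all work is done
  def stop
    connection.close
  end
end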
The finalize method is most useful when writing a "reduce"-type
operation that stores or aggregates information until some
criterion is met. Runners that support it call finalize after the
last record has been passed to process, but you can also call it
yourself from within your own code whenever you want.
Here's an example of using the finalize method to implement a simple
counter that counts all the input records:
# in counter.rb
Wukong.processor(:counter) do
  attr_accessor :count
  def setup
    self.count = 0
  end
  def process thing
    self.count += 1
  end
  def finalize
    yield count
  end
end
It hinges on the fact that the last input record will be passed to
process first and only then will finalize be called. This
allows the last input record to be counted/processed/aggregated and
then the entire aggregate to be dealt with in finalize.
Because of this emphasis on building and processing aggregates, the
finalize method is often useful within processors meant to run as
reducers in a Hadoop environment.
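A reduce-type processor often needs to flush an aggregate each time its criterion is met, not only at the very end. The hypothetical GroupCounter below, in plain Ruby without Wukong, counts runs of identical keys in sorted input (as a reducer typically sees them), calls its own finalize whenever the key changes, and relies on the runner calling finalize once more to flush the last group. The driving loop at the bottom stands in for a runner.

```ruby
# Hypothetical reduce-style processor: counts consecutive identical keys.
class GroupCounter
  def setup
    @key = nil
    @count = 0
  end

  def process(key)
    # Criterion met: the key changed, so emit the finished group first.
    if @key && key != @key
      finalize { |out| yield out }
      @count = 0
    end
    @key = key
    @count += 1
  end

  # Emit the current group as "key<TAB>count" (nothing if no input yet).
  def finalize
    yield [@key, @count].join("\t") if @key
  end
end

counter = GroupCounter.new
counter.setup
outputs = []
%w[apple apple banana cherry cherry cherry].each do |word|
  counter.process(word) { |out| outputs << out }
end
counter.finalize { |out| outputs << out }  # the runner's final call
p outputs  # => ["apple\t2", "banana\t1", "cherry\t3"]
```

Without the runner's last finalize call, the final group ("cherry") would never be emitted, which is why environments that skip finalize need a different approach.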
Note: finalize is not guaranteed to be called in every environment;
whether it is called depends on the chosen runner. In a local or Hadoop
environment, the notion of a "last record" makes sense, so the
corresponding runners call finalize. In an environment like Storm,
where the concept of a last record is not (supposed to be)
meaningful, the corresponding runner never calls it.
Serialization
wu-local (and many similar tools) deal with inputs and outputs as
strings.
Processors want to process objects as close to their domain as is possible. A processor which decorates address book entries with Twitter handles doesn't want to think of its inputs as Strings but Hashes or, better yet, Persons.
Wukong makes it easy to wrap a processor with other processors dedicated to handling the common tasks of parsing records into or out of formats like JSON and turning them into Ruby model instances.
De-serializing data formats like JSON or TSV
Wukong can parse and emit common data formats like JSON and delimited formats like TSV or CSV so that you don't pollute or tie down your own processors with protocol logic.
Here's an example of a processor that wants to deal with Hashes as input.
# in extractor.rb
Wukong.processor(:extractor) do
  def process hsh
    yield hsh["first_name"]
  end
end
Given JSON data,
$ cat input.json
{"first_name": "John", "last_name": "Smith"}
{"first_name": "Sally", "last_name": "Johnson"}
...
you can feed it directly to a processor
$ cat input.json | wu-local --from=json extractor.rb
John
Sally
...
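Under the hood, this amounts to wrapping the processor with a parsing step. The sketch below shows the idea with Ruby's standard json library; FromJson is a hypothetical wrapper and not Wukong's actual --from=json implementation.

```ruby
require 'json'

# Plain-Ruby stand-in for the extractor processor.
class Extractor
  def process(hsh)
    yield hsh["first_name"]
  end
end

# Hypothetical deserializing wrapper: parse each line as JSON, then hand
# the resulting Hash (and the caller's block) to the wrapped processor.
class FromJson
  def initialize(processor)
    @processor = processor
  end

  def process(line, &block)
    @processor.process(JSON.parse(line), &block)
  end
end

names = []
input = ['{"first_name": "John", "last_name": "Smith"}',
         '{"first_name": "Sally", "last_name": "Johnson"}']
wrapped = FromJson.new(Extractor.new)
input.each { |line| wrapped.process(line) { |name| names << name } }
p names  # => ["John", "Sally"]
```

Keeping the parsing in a separate wrapper is what lets Extractor stay free of any JSON logic.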
Other processors really like Arrays:
# in summer.rb
Wukong.processor(:summer) do
  def process values
    yield values.map(&:to_f).inject(&:+)
  end
end
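A delimited-format wrapper for a processor like this can be sketched the same way: split each line into fields before calling process. process_tsv is a hypothetical helper and the tab delimiter is an assumption; it illustrates the idea rather than Wukong's own parser.

```ruby
# Plain-Ruby stand-in for the summer processor.
class Summer
  def process(values)
    yield values.map(&:to_f).inject(&:+)
  end
end

# Hypothetical TSV wrapper: split the line on tabs so the wrapped
# processor receives an Array of field strings.
def process_tsv(processor, line, &block)
  processor.process(line.chomp.split("\t"), &block)
end

sums = []
["1\t2\t3", "10\t0.5"].each { |line| process_tsv(Summer.new, line) { |s| sums << s } }
p sums  # => [6.0, 10.5]
```

Note that a blank line produces an empty Array, for which inject(&:+) returns nil, so a real processor may want to guard against empty input.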
