Briefly
Briefly - A Python Meta-programming Library for Job Flow Control
Briefly Job Flow Control
Chou-han Yang (chyang@bloomreach.com)
(Version 1.0)
Overview
Briefly is a Python-based meta-programming library designed to manage complex workflows with various types of tasks, such as Hadoop (local, Amazon EMR, or Qubole), Java processes, and shell commands, with a minimal and elegant Hartman pipeline syntax. Briefly provides resource management for job execution, i.e. the infrastructure and operational logic, so that developers can focus on their job logic.
At BloomReach, we run thousands of Hadoop jobs every day on clusters with hundreds of machines. We use Briefly to manage our critical pipelines and to maintain the robustness and efficiency of our data processing jobs.
Supported types of processes
- Simple Python process
- Local Hadoop process
- Amazon Elastic MapReduce Hadoop process
- Qubole Hadoop process
- Java process
- Shell process
Each process can define its own dependencies and declare its own output. The library provides many default wrappers so that you can get started with minimal customization.
Features
- Use of a Hartman pipeline to create job flow.
- Resource management for multiple Hadoop clusters (Amazon EMR, Qubole) for parallel execution, with support for custom Hadoop clusters.
- Individual logs for each process to make debugging easier.
- Fully resumable pipeline with customizable execution check and error handling.
- Encapsulated local and remote filesystem (s3) for unified access.
- Automatic download of files from s3 for local process and upload of files to s3 for remote process with s4cmd.
- Automatic fail/retry logic for all failed processes.
- Automatic price upgrades for EMR clusters with spot instances.
- Timeout for Hadoop jobs to prevent long-running clusters.
- Dynamic cluster size adjustment for EMR steps (to be implemented)
External and Third-Party Library Requirements
- s4cmd (>=1.5.20): https://github.com/bloomreach/s4cmd
- boto (>=2.30.0): https://github.com/boto/boto
- Qubole SDK (>=1.4.0): https://github.com/qubole/qds-sdk-py
Required libraries will be installed automatically with setup.py.
Installation
Clone Briefly from GitHub:
git clone https://github.com/bloomreach/briefly.git
Install the Python package:
cd briefly
python setup.py install
Getting Started
Your First Briefly Pipeline
from briefly import *
from briefly.common import *

objs = Pipeline("My first pipeline")
prop = objs.prop

@simple_process
def dump(self):
  '''Dump all data from the source node'''
  for line in self.read():
    self.write(line)
    print line,

target = objs.source(prop.input) | sort() | dump()
objs.run(target)
Run your pipeline:
mkdir build
python pipeline.py -Dinput=demo.txt
You will see the sorted output of the file:
Configuring targets...
Ann 5003
Jonh 5001
Lee 5004
Smith 5002
Releasing resources...
1 total target(s), 1 executed.
Note that if you run it again, you won't see any output, because Briefly checks the source and target timestamps and sees that there is no need to generate the target again.
Configuring targets...
Releasing resources...
1 total target(s), 1 executed.
To execute it again, delete the contents of the build directory.
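The skip decision described above can be pictured as a timestamp comparison between sources and target. The following is only a minimal sketch of that idea, not Briefly's actual check; the function name `needs_rebuild` is hypothetical.

```python
import os


def needs_rebuild(sources, target):
    """Return True when target is missing or older than any source file.

    Hypothetical helper: it illustrates a timestamp-based skip check,
    and is not part of Briefly's API.
    """
    if not os.path.exists(target):
        return True
    target_mtime = os.path.getmtime(target)
    # Rebuild as soon as one source was modified after the target.
    return any(os.path.getmtime(src) > target_mtime for src in sources)
```

With a rule like this, emptying the build directory removes every target, so every node is considered out of date on the next run.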
Create a Property File for Options
As the pipeline grows more complex, we will have more configurations. It makes sense to put all pipeline configurations into a file (or multiple files). We can create demo.conf:
my_input = "demo.txt"
some_opt1 = ["1", "2", "3"]
some_opt2 = {"a": 1, "b": 2, "c": 3}
some_opt3 = true
The value of each option can be any valid JSON value. See the next chapter for full details on property files and how to read values from them. Now you can execute the pipeline again with the -p option:
python pipeline.py -pdemo.conf
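To make the "key = JSON value" format concrete, here is a minimal sketch of how such a file could be read with the standard json module. It is an illustration under stated assumptions, not Briefly's own parser: the helper name `parse_properties` is hypothetical, and skipping blank lines and `#` comments is an assumption.

```python
import json


def parse_properties(text):
    """Parse 'key = <JSON value>' lines into a dict (hypothetical helper)."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        # Assumption: blank lines and '#' comment lines are ignored.
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = json.loads(value.strip())
    return props


demo_conf = '''
my_input = "demo.txt"
some_opt1 = ["1", "2", "3"]
some_opt2 = {"a": 1, "b": 2, "c": 3}
some_opt3 = true
'''

props = parse_properties(demo_conf)
print(props["my_input"])   # demo.txt
```

Because each value goes through a JSON decoder, `true` becomes a Python boolean and the list and object values become a list and a dict.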
Multiple configuration files can be passed in; a configuration passed in later overrides the ones loaded before it:
python pipeline.py -p demo.conf -p foo.conf -p bar.conf -Dopt1=xxx -Dopt2=yyy
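The override order can be sketched as a series of dictionary updates. This example uses argparse purely for illustration and makes several assumptions: Briefly's real option parsing may work differently, `parse_cli` and `merge` are hypothetical names, and treating -D values as plain strings applied last is a guess.

```python
import argparse


def parse_cli(argv):
    """Collect repeated -p files and -D key=value definitions."""
    parser = argparse.ArgumentParser()
    parser.add_argument("-p", dest="files", action="append", default=[])
    parser.add_argument("-D", dest="defines", action="append", default=[])
    return parser.parse_args(argv)


def merge(layers, defines):
    """Apply property layers in order, so later layers win."""
    merged = {}
    for layer in layers:
        merged.update(layer)
    # Assumption: -D definitions are applied after all property files.
    for item in defines:
        key, _, value = item.partition("=")
        merged[key] = value
    return merged


args = parse_cli(["-p", "demo.conf", "-p", "foo.conf", "-Dopt1=xxx"])
# Stand-ins for the parsed contents of demo.conf and foo.conf.
layers = [{"my_input": "demo.txt", "opt1": "old"}, {"my_input": "other.txt"}]
merged = merge(layers, args.defines)
```

Here `merged["my_input"]` comes from the second layer and `merged["opt1"]` from the -D definition, which mirrors the "later wins" rule.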
Chain to a Hadoop Process in the Pipeline
OK, so now we have some basic pipelines running. We can add more complex processes, such as a Hadoop process. We are going to use Hadoop's word count example in hadoop-examples.jar. First, we have to set a property in demo.conf:
my_input = "demo.txt"
# This tells Briefly to run hadoop locally. Valid options are local, emr, and qubole
hadoop.runner = "local"
Now we can chain the pipeline with our first hadoop job:
from briefly import *
from briefly.common import *

objs = Pipeline("My first hadoop pipeline")
prop = objs.prop

@simple_process
def dump(self):
  for line in self.read():
    self.write(line)
    print line,

@simple_hadoop_process
def word_count(self):
  self.config.hadoop.jar = 'hadoop-examples.jar' # path to your local jar
  self.config.defaults(
    main_class = 'wordcount', # This is special for hadoop-examples.jar. Use full class name instead.
    args = ['${input}', '${output}']
  )

target = objs.source(prop.my_input) | sort() | dump()
target2 = objs.source(prop.my_input) | word_count() | dump()
objs.run(target, target2)
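The `${input}` and `${output}` entries in args look like placeholders that are presumably filled in with the node's input and output paths before the job is launched. The sketch below shows that kind of substitution with the standard string.Template class. It is an assumption about the mechanism, not Briefly's code, and the helper name and the paths are hypothetical.

```python
from string import Template


def expand_args(args, values):
    """Replace ${name} placeholders in each argument (hypothetical helper)."""
    return [Template(arg).substitute(values) for arg in args]


# The paths below are made up for illustration.
expanded = expand_args(
    ['${input}', '${output}'],
    {'input': 'demo.txt', 'output': 'build/word_count.out'},
)
print(expanded)   # ['demo.txt', 'build/word_count.out']
```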
Run it again and you will see the output:
Configuring targets...
Ann 5003
Jonh 5001
Lee 5004
Smith 5002
5001 1
5002 1
5003 1
5004 1
Ann 1
Jonh 1
Lee 1
Smith 1
Releasing resources...
2 total target(s), 2 executed.
The full log for each process can be found in the build directory. You can find the main execution log in build/execute.log:
Running pipeline: My first pipeline
Configuring targets...
- sort-6462031860a6f17b : executing
- word_count-7f55d503e26321d7 : executing
- sort-6462031860a6f17b : done
- dump-ea94f33ba627c5f6 : executing
- dump-ea94f33ba627c5f6 : done
- word_count-7f55d503e26321d7 : done
- dump-7f06c703ee1089fb : executing
- dump-7f06c703ee1089fb : done
Releasing resources...
2 total target(s), 2 executed.
Congratulations! Now that you've completed the demo pipeline, it is easy to go from here to build more complex pipelines. Things you can do:
- Create multiple property files for test, staging, and production runs
- Put your files on Amazon S3
- Change hadoop.runner to use Amazon EMR or Qubole
- And more...
Pipeline Basics
Briefly pipelines always bind to a property collection. The collection provides the basic settings and environment for the entire pipeline. You can set default values in code and override them with properties loaded through the -p command-line parameter. See the properties section for more details.
Internally, Briefly creates a DAG (directed acyclic graph) to resolve the dependencies between processes. Every function-like process in Briefly is actually a node class factory, so all execution is deferred until you call
objs.run(targets)
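Deferred execution of this kind can be illustrated by overloading the `|` operator so that chaining only records dependencies. The following is a simplified sketch with a linear chain and hypothetical names (`Step`, `source`, `sort`); it is not how Briefly implements its nodes.

```python
class Step(object):
    """One node in a linear chain; work happens only inside run()."""

    def __init__(self, name, func):
        self.name = name
        self.func = func
        self.upstream = None

    def __or__(self, other):
        # "a | b" only records that b depends on a; nothing executes here.
        other.upstream = self
        return other

    def run(self, log):
        # Resolve the dependency first, then do this step's own work.
        data = self.upstream.run(log) if self.upstream is not None else None
        log.append(self.name)
        return self.func(data)


def source(lines):
    return Step("source", lambda _unused: list(lines))


def sort():
    return Step("sort", sorted)


log = []
target = source(["Lee 5004", "Ann 5003"]) | sort()
built_without_running = (log == [])   # the chain is only a description so far
result = target.run(log)              # execution happens here
```

Building `target` leaves `log` empty; only the explicit `run` call walks the chain from the source to the final step.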
Pipeline Creation
Creating a new pipeline is simple:
objs = Pipeline("Name of your pipeline")
prop = objs.prop
The pipeline constructor will also generate a new property collection, from which you can get or set values.
Node Executions
Each process you chain in the pipeline is turned into a class by its decorator, such as @simple_process. Different types of processes require different initialization. For example, a function decorated with @simple_hadoop_process only configures the parameters needed to invoke the Hadoop process; it does not process data inside the function itself.
The Node class in Briefly looks like:
class Node(object):
  ...
  def configure(self):
    '''Main configuration method.
       Setup the dependencies and create hash id here.
       Child class can override this method to modify the job flow.
    '''
    ...

  def check(self):
    '''Check if we can skip the execution.
       Child class should override this function to provide
       customized check.
    '''
    ...

  def execute(self):
    '''Execute this node. Child class should override.'''
    ...
- configure(): sets up a node for execution, rewires the process flow, and sets up the necessary parameters.
- check(): verifies whether a node has already been executed by checking file timestamps, so that it can be skipped.
- execute(): the actual implementation of the process execution.
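As a rough illustration of how a decorator can turn a plain function into a node class with these three methods, consider the sketch below. Everything in it is a stand-in: `Node` here is a simplified substitute for Briefly's class, `process_sketch` and `run_node` are hypothetical names, and treating a True result from check() as "skip" is an assumption based on the docstring above.

```python
class Node(object):
    """Simplified stand-in for the Node interface (not Briefly's class)."""

    def __init__(self, **kwargs):
        self.kwargs = kwargs
        self.done = False

    def configure(self):
        pass

    def check(self):
        # Assumption: True means the work is already done and can be skipped.
        return self.done

    def execute(self):
        raise NotImplementedError


def process_sketch(func):
    """Hypothetical decorator: build a Node subclass whose execute() is func."""
    return type(func.__name__, (Node,), {"execute": func, "__doc__": func.__doc__})


def run_node(node):
    """Drive one node through configure -> check -> execute."""
    node.configure()
    if node.check():
        return "skipped"
    node.execute()
    node.done = True
    return "executed"


@process_sketch
def greet(self):
    self.kwargs["out"].append("hello " + self.kwargs["name"])


out = []
node = greet(name="Ann", out=out)   # calling the decorated name builds a node
first = run_node(node)              # runs the function body
second = run_node(node)             # check() now reports the node as done
```

After decoration, `greet` is a class rather than a function, which is why calling it in a pipeline expression can create a node without running any work.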
Phases of execution
There are 3 major phases of pipeline construction and execution:
- DAG construction: Following the execution order of the Python program, your job flow is constructed with a DAG as its internal representation. This phase runs in a single thread.
- Node configure: Checks whether each node meets the dependency requirements for execution, and rewires the DAG for automatic file transfer if needed. This phase runs in a single thread.
- Node execute: Executes all processes in the order and precedence given by the DAG. This phase runs with multiple threads; each node holds a thread while it executes.
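The execute phase can be pictured as a scheduler that starts a node on a worker thread as soon as all of its dependencies have finished. The sketch below uses the standard concurrent.futures module and hypothetical names (`run_dag`, `deps`, `work`); it is one possible way to do this, not Briefly's scheduler.

```python
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED


def run_dag(deps, work, max_workers=4):
    """Run callables in dependency order on a thread pool.

    deps maps each node name to the set of names it depends on;
    work maps each node name to a zero-argument callable.
    Returns the node names in completion order.
    """
    remaining = {node: set(pre) for node, pre in deps.items()}
    done, running, order = set(), {}, []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while remaining or running:
            # Submit every node whose dependencies have all completed.
            ready = [n for n, pre in remaining.items() if pre <= done]
            for node in ready:
                del remaining[node]
                running[pool.submit(work[node])] = node
            if not running:
                raise ValueError("cycle or missing dependency in DAG")
            finished, _ = wait(list(running), return_when=FIRST_COMPLETED)
            for future in finished:
                future.result()   # re-raise any error from the node
                node = running.pop(future)
                done.add(node)
                order.append(node)
    return order


# Shape of the two-target demo: each dump waits for its upstream node.
deps = {
    "sort": set(),
    "word_count": set(),
    "dump_sorted": {"sort"},
    "dump_counts": {"word_count"},
}
work = {name: (lambda: None) for name in deps}
order = run_dag(deps, work)
```

Independent branches such as sort and word_count can run at the same time, while each dump only starts after its own upstream node has completed.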
Default Properties
All Briefly system configurations have default values set in defaults.py. This file is also a good reference for the options Briefly provides.
Property Collection
As job flows get more complicated, we may want more options to control them. More and more control options and parameters will be included into the property coll
