Briefly

Briefly - A Python Meta-programming Library for Job Flow Control

Briefly Job Flow Control

Join the chat at https://gitter.im/bloomreach/briefly

Chou-han Yang (chyang@bloomreach.com) (Version 1.0)

Overview

Briefly is a Python-based metaprogramming library designed to manage complex workflows with various types of tasks, such as Hadoop (local, Amazon EMR, or Qubole), Java processes, and shell commands, using a minimal and elegant Hartman pipeline syntax. Briefly provides resource management for job execution, that is, the infrastructure and operational logic, so that developers can focus on their job logic.

At BloomReach, we run thousands of Hadoop jobs every day on clusters with hundreds of machines. We use Briefly to manage our critical pipelines, and to maintain the robustness and efficiency of data processing jobs.

Supported types of processes

  • Simple Python process
  • Local Hadoop process
  • Amazon Elastic MapReduce Hadoop process
  • Qubole Hadoop process
  • Java process
  • Shell process

Each process can define its own dependencies and declare its own output. The library provides many default wrappers so that you can get started with minimal customization.

Features

  • Use of a Hartman pipeline to create job flow.
  • Resource management for multiple Hadoop clusters (Amazon EMR, Qubole) for parallel execution, with support for customized Hadoop clusters.
  • Individual logs for each process to make debugging easier.
  • Fully resumable pipeline with customizable execution check and error handling.
  • Encapsulated local and remote filesystem (s3) for unified access.
  • Automatic download of files from s3 for local process and upload of files to s3 for remote process with s4cmd.
  • Automatic fail/retry logic for all failed processes.
  • Automatic price upgrades for EMR clusters with spot instances.
  • Timeout for Hadoop jobs to prevent long-running clusters.
  • Dynamic cluster size adjustment for EMR steps (to be implemented)

External and Third-Party Library Requirements

Required libraries will be installed automatically with setup.py.

Installation

Clone Briefly from GitHub:

git clone https://github.com/bloomreach/briefly.git

Install the Python package:

cd briefly
python setup.py install

Getting Started

Your First Briefly Pipeline

from briefly import *
from briefly.common import *

objs = Pipeline("My first pipeline")
prop = objs.prop

@simple_process
def dump(self):
  '''Dump all data from the source node'''
  for line in self.read():
    self.write(line)
    print line,

target = objs.source(prop.input) | sort() | dump()
objs.run(target)

Run your pipeline:

mkdir build
python pipeline.py -Dinput=demo.txt

You will see the sorted output of the file:

Configuring targets...
Ann     5003
Jonh    5001
Lee     5004
Smith   5002
Releasing resources...
1 total target(s), 1 executed.

Note that if you run it again, you won't see any output, because Briefly compares the source and target dates and sees that there is no need to generate the target again.

Configuring targets...
Releasing resources...
1 total target(s), 1 executed.

To execute it again, delete the contents of the build directory.
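The skip-if-up-to-date behavior described above can be illustrated with a make-style timestamp comparison. This is a minimal sketch, not Briefly's actual implementation: the function name `needs_rebuild` and the file names are assumptions made for illustration.

```python
import os
import tempfile
import time

def needs_rebuild(source_path, target_path):
    """Return True when the target is missing or older than the source."""
    if not os.path.exists(target_path):
        return True
    return os.path.getmtime(target_path) < os.path.getmtime(source_path)

with tempfile.TemporaryDirectory() as build_dir:
    source = os.path.join(build_dir, "demo.txt")
    target = os.path.join(build_dir, "sorted.txt")
    with open(source, "w") as f:
        f.write("Smith\t5002\nAnn\t5003\n")

    # First run: the target does not exist yet, so it must be generated.
    first_run = needs_rebuild(source, target)

    with open(target, "w") as f:
        f.write("Ann\t5003\nSmith\t5002\n")
    # Pin the timestamps so the target is clearly newer than the source.
    now = time.time()
    os.utime(source, (now - 10, now - 10))
    os.utime(target, (now, now))

    # Second run: the target is up to date, so the work can be skipped.
    second_run = needs_rebuild(source, target)

    # Deleting the build output forces a rebuild on the next run.
    os.remove(target)
    third_run = needs_rebuild(source, target)
```

Under this scheme, clearing the build directory is what makes the next run regenerate every target.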

Create a Property File for Options

As the pipeline grows more complex, it accumulates more configuration options. It makes sense to put all pipeline configuration into a file (or multiple files). For example, create demo.conf:

my_input = "demo.txt"
some_opt1 = ["1", "2", "3"]
some_opt2 = {"a": 1, "b": 2, "c": 3}
some_opt3 = true

The value of each option can be any valid JSON value. See the next chapter for full usage of property files and how to read values from them. Now you can run the pipeline again with the -p option:

python pipeline.py -pdemo.conf

Multiple configuration files can be passed in; settings passed in later overwrite those loaded earlier:

python pipeline.py -p demo.conf -p foo.conf -p bar.conf -Dopt1=xxx -Dopt2=yyy
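The override order can be pictured as layered dictionaries where each later layer wins. The sketch below is only an illustration of that idea under stated assumptions: `parse_properties` and `merge_layers` are hypothetical helpers, the parser handles only flat `key = JSON value` lines and `#` comments, and Briefly's real property loader may behave differently.

```python
import json

def parse_properties(text):
    """Parse 'key = <JSON value>' lines, skipping blanks and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = json.loads(value.strip())
    return props

def merge_layers(*layers):
    """Merge dictionaries left to right; later layers overwrite earlier ones."""
    merged = {}
    for layer in layers:
        merged.update(layer)
    return merged

demo_conf = '''
my_input = "demo.txt"
some_opt3 = true
'''
foo_conf = '''
# loaded after demo.conf, so its values win
my_input = "other.txt"
'''
# Stand-in for options given with -D on the command line.
command_line = {"opt1": "xxx"}

settings = merge_layers(
    parse_properties(demo_conf),
    parse_properties(foo_conf),
    command_line,
)
```

Here `settings["my_input"]` ends up as the value from the second file, while `some_opt3` survives from the first because nothing later redefines it.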

Chain to a Hadoop Process in the Pipeline

OK, so now we have some basic pipelines running. We can add more complex processes, such as a Hadoop process. We are going to use Hadoop's Word Count from hadoop-examples.jar. First, we have to set a property in demo.conf:

my_input = "demo.txt"

# This tells Briefly to run hadoop locally. Valid options are local, emr, and qubole
hadoop.runner = "local"

Now we can chain the pipeline with our first hadoop job:

from briefly import *
from briefly.common import *

objs = Pipeline("My first hadoop pipeline")
prop = objs.prop

@simple_process
def dump(self):
  for line in self.read():
    self.write(line)
    print line,

@simple_hadoop_process
def word_count(self):
  self.config.hadoop.jar = 'hadoop-examples.jar' # path to your local jar
  self.config.defaults(
    main_class = 'wordcount', # Short name is special to hadoop-examples.jar; use the full class name for your own jar.
    args = ['${input}', '${output}']
  )

target = objs.source(prop.my_input) | sort() | dump()
target2 = objs.source(prop.my_input) | word_count() | dump()

objs.run(target, target2)
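The `${input}` and `${output}` entries in `args` are placeholders that are presumably replaced with concrete paths before the job is launched. As a rough illustration of that kind of substitution (not Briefly's own code), Python's standard `string.Template` uses the same `${name}` syntax. The helper `expand_args` and the paths below are hypothetical.

```python
from string import Template

def expand_args(args, values):
    """Replace ${name} placeholders in each argument with concrete values."""
    return [Template(arg).substitute(values) for arg in args]

# Hypothetical paths chosen for illustration only.
paths = {"input": "build/demo.txt", "output": "build/word_count.out"}
command_args = expand_args(["${input}", "${output}"], paths)
```

`Template.substitute` raises `KeyError` when a placeholder has no value, which surfaces a misconfigured argument early.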

Run it again, and you will see the output:

Configuring targets...
Ann 5003
Jonh  5001
Lee 5004
Smith 5002
5001  1
5002  1
5003  1
5004  1
Ann 1
Jonh  1
Lee 1
Smith 1
Releasing resources...
2 total target(s), 2 executed.

The full log for each process can be found in the build directory. You can find the main execution log in build/execute.log:

Running pipeline: My first pipeline
Configuring targets...
 - sort-6462031860a6f17b : executing
 - word_count-7f55d503e26321d7 : executing
 - sort-6462031860a6f17b : done
 - dump-ea94f33ba627c5f6 : executing
 - dump-ea94f33ba627c5f6 : done
 - word_count-7f55d503e26321d7 : done
 - dump-7f06c703ee1089fb : executing
 - dump-7f06c703ee1089fb : done
Releasing resources...
2 total target(s), 2 executed.

Congratulations! Now that you've completed the demo pipeline, it is easy to go from here to build more complex pipelines. Things you can do:

  • Create multiple property files for test, staging, and production runs
  • Put your files on Amazon S3
  • Change hadoop.runner to use Amazon EMR or Qubole
  • And more...

Pipeline Basics

Briefly pipelines always bind to a property collection. The collection provides basic settings and the environment for the entire pipeline. You can set default values and then override them with properties loaded via the -p command-line parameter. See the properties section for more details.

Internally, Briefly creates a DAG (directed acyclic graph) to resolve the dependencies between processes. Every function-like process in Briefly is actually a node class factory, so all execution is deferred until you call:

objs.run(targets)
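To make the idea of deferred execution concrete, here is a toy sketch of how the `|` operator can link nodes without running anything until a separate `run` call. It is not Briefly's implementation: the `Node` class, the `source`, `sort`, and `upper` factories, and `run` are all simplified stand-ins, and the sketch handles only a linear chain rather than a general DAG.

```python
class Node:
    """A deferred step: remembers what to do and which node feeds it."""
    def __init__(self, name, func):
        self.name = name
        self.func = func
        self.parent = None

    def __or__(self, other):
        # 'a | b' records a as the input of b and returns b for further chaining.
        other.parent = self
        return other

def source(lines):
    return Node("source", lambda _: list(lines))

def sort():
    return Node("sort", lambda data: sorted(data))

def upper():
    return Node("upper", lambda data: [s.upper() for s in data])

def run(target, log):
    """Walk back from the target to the source, then execute in order."""
    chain = []
    node = target
    while node is not None:
        chain.append(node)
        node = node.parent
    data = None
    for node in reversed(chain):
        log.append(node.name)
        data = node.func(data)
    return data

log = []
target = source(["lee", "ann", "smith"]) | sort() | upper()
built_without_running = (log == [])   # nothing has executed yet
result = run(target, log)
```

Building `target` only wires nodes together; the work happens inside `run`, which is why the log stays empty until then.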

Pipeline Creation

Creating a new pipeline is simple:

objs = Pipeline("Name of your pipeline")
prop = objs.prop

The pipeline constructor will also generate a new property collection, from which you can get or set values.
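The examples in this README read and write properties with attribute syntax, such as `prop.my_input` and dotted names like `hadoop.runner`. One way such a collection could be built is a dictionary with attribute access that creates nested levels on demand. The `Properties` class below is a hypothetical sketch of that pattern, not the class Briefly ships.

```python
class Properties(dict):
    """Dictionary with attribute access; missing names become nested levels."""
    def __getattr__(self, name):
        # Leave Python's special-method lookups alone.
        if name.startswith("__"):
            raise AttributeError(name)
        if name not in self:
            self[name] = Properties()
        return self[name]

    def __setattr__(self, name, value):
        self[name] = value

prop = Properties()
prop.my_input = "demo.txt"       # set a top-level value
prop.hadoop.runner = "local"     # nested level is created on first access
```

A trade-off of this design is that a mistyped name silently yields an empty nested collection instead of raising an error.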

Node Executions

Each process you chain in the pipeline is turned into a class by its decorator, such as @simple_process. Different types of processes require different initialization. For example, a function decorated with @simple_hadoop_process only configures the parameters needed to invoke the Hadoop process; it does not process data inside the function itself.

The Node class in Briefly looks like:

class Node(object):
  ...
  def configure(self):
    '''Main configuration method.
       Setup the dependencies and create hash id here.
       Child class can override this method to modify the job flow.
    '''
    ...

  def check(self):
    '''Check if we can skip the execution.
       Child class should override this function to provide
       customized check.
    '''
    ...

  def execute(self):
    '''Execute this node. Child class should override.'''
    ...

  • configure() -- sets up a node for execution, rewires the process flow, and sets the necessary parameters.
  • check() -- verifies whether a node has already been executed by checking file timestamps, in which case it can be skipped.
  • execute() -- the actual implementation of the process execution.
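The following sketch shows how a child class might override `check()` and `execute()`. The `Node` class here is a local stand-in with the same three method names, not an import from Briefly, and `CopyUpper` and `run_node` are hypothetical. It also assumes that a true result from `check()` means the node can be skipped.

```python
import os
import tempfile

class Node(object):
    """Stand-in with the three methods described above; not Briefly's class."""
    def configure(self):
        pass

    def check(self):
        return False

    def execute(self):
        raise NotImplementedError

class CopyUpper(Node):
    """Hypothetical node that writes an upper-cased copy of its source file."""
    def __init__(self, source, output):
        self.source = source
        self.output = output

    def check(self):
        # Customized check: skip whenever the output file already exists.
        return os.path.exists(self.output)

    def execute(self):
        with open(self.source) as src, open(self.output, "w") as dst:
            dst.write(src.read().upper())

def run_node(node):
    """Drive one node through configure, check, and execute."""
    node.configure()
    if node.check():
        return "skipped"
    node.execute()
    return "executed"

with tempfile.TemporaryDirectory() as tmp:
    src = os.path.join(tmp, "in.txt")
    out = os.path.join(tmp, "out.txt")
    with open(src, "w") as f:
        f.write("ann\n")
    first = run_node(CopyUpper(src, out))    # output missing, so it runs
    second = run_node(CopyUpper(src, out))   # output present, so it is skipped
    with open(out) as f:
        content = f.read()
```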

Phases of execution

There are 3 major phases of pipeline construction and execution:

  • DAG construction - Following the execution order of your Python program, the job flow is constructed with a DAG as its internal representation. This phase runs in a single thread.
  • Node configure - Checks whether each node meets the dependency requirements for execution, and rewires the DAG for automatic file transfer if needed. This phase runs in a single thread.
  • Node execute - Executes all processes in the order and precedence given by the DAG. This phase runs with multiple threads; each node holds a thread while it executes.
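The execute phase can be approximated with a thread pool that starts a node only after all of its dependencies have finished. This is a generic scheduling sketch using Python's `concurrent.futures`, not Briefly's scheduler. The function `execute_dag` and the node names are assumptions, with the names borrowed from the demo pipeline above.

```python
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
import threading

def execute_dag(deps, work, max_workers=4):
    """Run work(node) for every node, honoring deps[node] = set of prerequisites.

    Returns the nodes in the order they finished.
    """
    remaining = {node: set(d) for node, d in deps.items()}
    finished = []
    lock = threading.Lock()

    def run(node):
        work(node)
        with lock:
            finished.append(node)

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        running = {}
        while remaining or running:
            # Submit every node whose prerequisites are all done.
            ready = [n for n, d in remaining.items() if not d]
            for node in ready:
                del remaining[node]
                running[pool.submit(run, node)] = node
            if not running:
                raise ValueError("cycle detected in dependencies")
            done, _ = wait(running, return_when=FIRST_COMPLETED)
            for future in done:
                node = running.pop(future)
                future.result()  # re-raise any exception from the worker
                for d in remaining.values():
                    d.discard(node)
    return finished

deps = {
    "sort": set(),
    "word_count": set(),
    "dump_sorted": {"sort"},
    "dump_counts": {"word_count"},
}
order = execute_dag(deps, lambda node: None)
```

Independent branches such as `sort` and `word_count` can run at the same time, which matches the interleaved entries seen in the execution log earlier.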

Default Properties

All Briefly system configurations have default values set in defaults.py. This file is also a good reference for the options Briefly provides.

Property Collection

As job flows get more complicated, we may want more options to control them. More and more control options and parameters will be included into the property coll
