Sparkplug

A framework for creating composable and pluggable data processing pipelines using Apache Spark, and running them on a cluster.

Please note that this project is early stage work in progress, and will be subject to some breaking changes and refactoring.

Apache Spark is awesome: it lets you define distributed data transformation and analysis with elegant functional Scala code.

Unfortunately, the application code that glues these processes together can end up a bit of a mess, and some thought needs to go into how to make it testable. The mechanism for cluster execution that comes out of the box is also somewhat inefficient and clunky.

This project aims to bridge the gap. In particular, it addresses these two specific requirements:

  1. Creating data processing pipelines, with elegant Scala code, that are easy to reuse and test in isolation.
  2. Providing a lightweight mechanism for launching and executing Spark processes on a cluster.

These two requirements are quite different. Indeed, it is possible to use Sparkplug for either of them without taking advantage of the other. For example, you can create composable data pipelines as described below, then execute them directly or with any other Spark cluster execution mechanism or job manager of your choice.

Functional data processing pipelines

The key abstraction here is the SparkOperation monad.

sealed trait SparkOperation[+A] {
  def run(ctx: SparkContext): A
}

SparkOperations are typically created using the companion object. Here is a simple example:

val textRDDOperation: SparkOperation[RDD[String]] = SparkOperation {
  ctx ⇒ ctx.makeRDD("There is nothing either good or bad, but thinking makes it so.".split(' '))
}

This is a simple SparkOperation that, when run, splits a sentence into words and returns them as an RDD[String].

We can then use this SparkOperation to create another operation.

val letterCount: SparkOperation[Long] = for {
  logData ← textRDDOperation
} yield logData.filter(_.contains("a")).count()

In this case we are counting the number of words that contain the letter 'a'.

Proceeding as in this simple example, we can create complex data processing pipelines, mainly using monadic operations.

These include:

  • Bread and butter map and flatMap to compose operations (as above).
  • Combining operations (e.g. convert a tuple of SparkOperations to a SparkOperation of tuples).
  • Sequence operations (e.g. convert a list of SparkOperations to a SparkOperation of list).

Then, once we have composed the SparkOperation as desired, it is run against a given SparkContext.

val answer = letterCount.run(sparkContext)

The result types of SparkOperations are typically RDDs, at least until the final step of the pipeline.
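To illustrate why nothing happens until run is called, here is a minimal Spark-free sketch. It is not Sparkplug's implementation: Op and Ctx are hypothetical stand-ins for SparkOperation and SparkContext, and a List stands in for an RDD, so that the example runs without a cluster.

```scala
// Spark-free model of the idea; Op, Ctx and `executions` are illustrative names only.
object DeferredSketch {
  final case class Ctx(name: String) // stand-in for SparkContext

  // An operation is just a wrapped function from the context to a result.
  final case class Op[A](run: Ctx => A) {
    def map[B](f: A => B): Op[B] = Op(ctx => f(run(ctx)))
    def flatMap[B](f: A => Op[B]): Op[B] = Op(ctx => f(run(ctx)).run(ctx))
  }

  var executions = 0 // counts how often the source step actually runs

  val words: Op[List[String]] = Op { _ =>
    executions += 1
    "There is nothing either good or bad, but thinking makes it so.".split(' ').toList
  }

  // Composing the pipeline only builds a new function; the source is not touched yet.
  val letterCount: Op[Long] =
    for (ws <- words) yield ws.count(_.contains("a")).toLong
}
```

In this model, DeferredSketch.executions stays at 0 until letterCount.run(DeferredSketch.Ctx("local")) is called, at which point the source step runs once and the count is produced.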

Why go to all this trouble?

For simple processes, as above, it is overkill. However, non-trivial data processing pipelines typically involve many stages, and often there are many permutations over which these steps may be applied in different scenarios.

Splitting the process into discrete, separate operations has three main advantages:

  1. SparkOperations, modular in nature, can easily be reused or shared across different data processing pipelines.
  2. They can be unit tested in isolation. There are several utilities included in the project that facilitate this. This is covered in the section on testing below.
  3. Operations can be glued together using compact functional code.

Note that this pattern involves decoupling the pipeline definition from the pipeline execution, which enables a great deal of flexibility over how one defines pipelines and executes them. It enables cases in which it is useful to reverse the order of operations, and in certain cases to avoid their execution completely.

It does lead to one drawback: stack traces are not normally very meaningful. For this reason, good logging and error handling are important.
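One possible way to make failures easier to trace is to wrap each step so that any error carries the step's name. The sketch below assumes a simplified, Spark-free model of an operation; Op, Ctx and the labelled helper are hypothetical and are not part of Sparkplug.

```scala
import scala.util.Try
import scala.util.control.NonFatal

// Illustrative only: Op and Ctx stand in for SparkOperation and SparkContext.
object LabelledSketch {
  final case class Ctx(name: String)
  final case class Op[A](run: Ctx => A)

  // Hypothetical helper: rethrows any non-fatal failure with the step label attached,
  // keeping the original exception as the cause.
  def labelled[A](label: String)(op: Op[A]): Op[A] = Op { ctx =>
    try op.run(ctx)
    catch {
      case NonFatal(e) =>
        throw new RuntimeException(s"step '$label' failed: ${e.getMessage}", e)
    }
  }

  // A step that always fails when run, wrapped with a label.
  val failing: Op[Int] = labelled("parse")(Op[Int](_ => "not a number".toInt))

  // Running it surfaces the label in the error message.
  val result: Try[Int] = Try(failing.run(Ctx("local")))
}
```

Because the wrapper is itself an operation, it composes like any other step and adds nothing until the pipeline is run.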

Wiring together SparkOperation components

As a common use case, consider a data transformation pipeline. The first step in the pipeline is the data input step. The subsequent steps involve processing this data, or running algorithms on it.

For example, consider a pipeline processing a corpus of documents:

  1. The first step processes input data into an RDD of Documents.
  2. The next step is to transform Documents into ParserInfos.
  3. The final step is to calculate document statistics, and return a DocumentStats object with summary statistics.

This can be represented by the following pipeline trait.

trait DocumentPipeline {
  def dataSource: SparkOperation[RDD[InputType]]

  lazy val createOperation: SparkOperation[RDD[Document]] = dataSource.map {
    input ⇒ createDocument(input)
  }

  lazy val parseOperation: SparkOperation[RDD[ParserInfo]] = createOperation.map {
    doc ⇒ parseDocument(doc)
  }

  lazy val statsOperation: SparkOperation[DocumentStats] = parseOperation.map {
    parsedDoc ⇒ calculateStats(parsedDoc)
  }

  lazy val saveParsedDocOperation: SparkOperation[SaveStatus] = parseOperation.flatMap {
    parsedDoc ⇒ saveParsedDoc(parsedDoc)
  }
}

Note that SparkOperations need not return RDDs. The final step in the pipeline will generally return something other than an RDD.

This will generally be the final status after writing data to a database (Try[Unit] is good for this), or some summary/aggregate result.

The final result of the pipeline should be a serializable type.

We wish to use a different data source for test and production environments. This can be done by applying the following overrides:

E.g. for the production environment, we may be using Cassandra as a data source:

import springnz.sparkplug.cassandra.CassandraRDDFetcher

trait ProdDocPipeline extends DocumentPipeline {
  override lazy val dataSource = CassandraRDDFetcher.selectAll[InputType](keySpace, table)
}
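The test-side counterpart of this override can be sketched in the same way. The example below is a Spark-free illustration under stated assumptions: Op and Ctx are hypothetical stand-ins for SparkOperation and SparkContext, List stands in for RDD, and WordPipeline is a cut-down pipeline invented for the example. It does not use the project's own test utilities.

```scala
// Illustrative model only; none of these names come from Sparkplug.
object PipelineOverrideSketch {
  final case class Ctx(name: String)
  final case class Op[A](run: Ctx => A) {
    def map[B](f: A => B): Op[B] = Op(ctx => f(run(ctx)))
  }

  // Cut-down pipeline: only the data source is left abstract.
  trait WordPipeline {
    def dataSource: Op[List[String]]
    lazy val lengths: Op[List[Int]] = dataSource.map(_.map(_.length))
    lazy val total: Op[Int] = lengths.map(_.sum)
  }

  // Test wiring: a small in-memory fixture replaces the production data source.
  object TestWordPipeline extends WordPipeline {
    override lazy val dataSource: Op[List[String]] =
      Op(_ => List("good", "or", "bad"))
  }
}
```

The downstream steps are defined once in the trait and are exercised unchanged by the test wiring, which is the point of keeping dataSource abstract.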

Functional pipeline composition patterns

SparkOperation[A] is a monad. As such all the monad patterns are available. The monad implementation is provided by scalaz.

To take advantage of some of the operations, certain imports from scalaz are necessary. Implementations of map and flatMap are provided, so no imports for these, and for comprehensions, are necessary.

Here are some examples of functional pipeline patterns:

Map

This is the most commonly used pattern, and examples of its usage are given in the pipeline above. Map is best suited to constructing a single-step extension to the pipeline.

FlatMap

Many SparkOperations are constructed via a function of the following form:

object OperationFactory {
  def createOperation(rdd: RDD[A]): SparkOperation[B] = ???
}

This pattern is often used for operations that persist data to a database such as Cassandra or to an HDFS file store.

E.g. in the document example given above,

def saveParsedDoc(parsedDocs: RDD[ParserInfo]): SparkOperation[SaveStatus]

is a function that generates a SparkOperation from an RDD. To plug it into the pipeline, it must be flatMapped.

map is generally useful for connecting a single process to the end of a pipeline. flatMap is more powerful. It can connect two entire pipelines.
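The difference between the two can be sketched with a Spark-free model. Op and Ctx are hypothetical stand-ins for SparkOperation and SparkContext, and sumAll and save are invented for the example; this is an illustration of the pattern, not Sparkplug's code.

```scala
// Illustrative model only; none of these names come from Sparkplug.
object MapVsFlatMapSketch {
  final case class Ctx(name: String)
  final case class Op[A](run: Ctx => A) {
    def map[B](f: A => B): Op[B] = Op(ctx => f(run(ctx)))
    def flatMap[B](f: A => Op[B]): Op[B] = Op(ctx => f(run(ctx)).run(ctx))
  }

  val parsed: Op[List[Int]] = Op(_ => List(1, 2, 3))

  // A plain function of the data: attach it with map.
  def sumAll(xs: List[Int]): Int = xs.sum

  // A function that needs the context itself (for example to write somewhere),
  // so it returns an operation: attach it with flatMap.
  def save(xs: List[Int]): Op[String] =
    Op(ctx => s"saved ${xs.size} rows via ${ctx.name}")

  val total: Op[Int] = parsed.map(sumAll)
  val status: Op[String] = parsed.flatMap(save)

  // Using map with `save` also compiles, but leaves a nested operation
  // that has to be run twice to reach the result.
  val nested: Op[Op[String]] = parsed.map(save)
}
```

The nested type in the last line is the usual signal that flatMap was intended.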

Joins and pairs / tuples

FlatMap can be used to do joins. However, applicative functors are the functional abstraction most naturally suited to this operation.

Here is an example of a join operation (with A, B and C obviously being the appropriately compatible types):


def joinRDDs(rddA: RDD[A], rddB: RDD[B]): RDD[C] = {
  ...
  rddA.join(rddB)
}

Here is how they can be applied to create a SparkOperation representing this join:

import scalaz.syntax.bind._

def operationC: SparkOperation[RDD[C]] = (operationA |@| operationB)(joinRDDs)

The result of this is a new operation that when executed, will perform the following:

  • Execute operationA and operationB to produce an RDD[A] and an RDD[B] respectively.
  • Perform the join to produce a result of type RDD[C].

Note that it is necessary to import scalaz.syntax.bind._ to bring the |@| operator (or its Unicode variant, ⊛) into scope.
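What the applicative combination amounts to can be sketched without scalaz. In the example below, map2 is a hand-written stand-in for the effect of (operationA |@| operationB)(f), not scalaz's implementation. Op and Ctx are hypothetical stand-ins for SparkOperation and SparkContext, and a Map keyed by Int stands in for a pair RDD.

```scala
// Illustrative model only; none of these names come from Sparkplug or scalaz.
object CombineSketch {
  final case class Ctx(name: String)
  final case class Op[A](run: Ctx => A)

  // Run both operations against the same context, then combine the two results.
  def map2[A, B, C](opA: Op[A], opB: Op[B])(f: (A, B) => C): Op[C] =
    Op(ctx => f(opA.run(ctx), opB.run(ctx)))

  val users: Op[Map[Int, String]] = Op(_ => Map(1 -> "ann", 2 -> "bob"))
  val scores: Op[Map[Int, Int]] = Op(_ => Map(1 -> 10, 3 -> 30))

  // Inner join on the key, standing in for rddA.join(rddB).
  def joinByKey(a: Map[Int, String], b: Map[Int, Int]): Map[Int, (String, Int)] =
    a.collect { case (k, name) if b.contains(k) => (k, (name, b(k))) }

  val joined: Op[Map[Int, (String, Int)]] = map2(users, scores)(joinByKey)
}
```

Neither input operation depends on the other's result, which is why an applicative combination is sufficient here and flatMap is not needed.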

Sequences and Traversables

Sequence operators are a natural generalisation of an applicative, which takes a pair of Spark operations and produces a Spark operation of pairs. A sequence operator takes a List of Spark operations and generates a single Spark operation of Lists. Traversable operations are a further generalisation of this to allow an extra function in between, and can be applied across a more general class of data structures than just Lists.

An example of this is the following:

Suppose we have a SparkOperation that processes a single day's data, and we wish to run the same operation on a month of data.

def daysOperation(date: Date): SparkOperation[A]  = ???

The following code generates a list of operations, each of which processes data for a single date:

val listOfOperations: List[SparkOperation[A]] = for (day <- daysInMonthList) yield daysOperation(day)

This can be transformed as:

import scalaz.std.list.listInstance

val combinedOperation: SparkOperation[List[A]] = SparkOperation.monad.sequence(listOfOperations)

The syntax is not as cute, but it is still one line of code to create a SparkOperation that generates a list of As.

No more ugly looping code.
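For intuition, the effect of sequence on a list can be sketched by hand. The version below is a Spark-free illustration and not scalaz's implementation: Op and Ctx are hypothetical stand-ins for SparkOperation and SparkContext, and the day is modelled as a plain Int rather than a Date.

```scala
// Illustrative model only; none of these names come from Sparkplug or scalaz.
object SequenceSketch {
  final case class Ctx(name: String)
  final case class Op[A](run: Ctx => A)

  // Turns a list of operations into one operation that yields a list of results,
  // running each element against the same context, in order.
  def sequence[A](ops: List[Op[A]]): Op[List[A]] =
    Op(ctx => ops.map(_.run(ctx)))

  // Hypothetical per-day operation; the Int is a day of the month.
  def daysOperation(day: Int): Op[String] =
    Op(ctx => s"${ctx.name}-day-$day")

  val daysInMonthList: List[Int] = (1 to 3).toList

  val combined: Op[List[String]] =
    sequence(daysInMonthList.map(daysOperation))
}
```

A traverse would fuse the map and sequence steps into a single call that takes the list of days and the per-day function.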

Anti-patterns

There aren't too many ways of going wrong, but the one pattern to avoid is performing operations on RDDs when they could be performed on SparkOperations.

As general guidelines:

  • Use the SparkOperation { ctx => ??? } generally for the first step of t