SkillAgentSearch skills...

Sputnik

No description available

Install / Use

/learn @airbnb/Sputnik
About this skill

Quality Score

0/100

Supported Platforms

Universal

README

Sputnik

Framework for writing daily Spark batch jobs, which use Hive as primary storage.

Sputnik was the first artificial Earth satellite. The Soviet Union launched it into an elliptical low Earth orbit on 4 October 1957, orbiting for three weeks before its batteries died, then silently for two more months before falling back into the atmosphere. sputnik schema

Motivation

Apache Spark is general purpose execution engine which provides a lot of power and flexibility. It allows a data engineer to read from different sources in different manners. Daily batch jobs, which read from Hive and write to Hive usually do not require such flexibility. On the opposite some restrictive code is required to implement some good practices of data engineering. Example of that might be a code, which reads partitioned data for current date and writes to this date's partition in result table. Backfilling of the result table is something what Spark does not do and require user to define. Sputnik is a framework which helps follow good practices for data engineering of daily batch jobs working with data in Hive. It contains most of the code, which data engineer would need to write and operate their job. This includes, but not limited to:

  • Reading data from source table filtered by date or date range specified in console for this job run.
  • Backfilling data
  • Running checks on result data before inserting it into the result table
  • Writing data to testing version of the table, when job runs in testing or staging mode
  • Utils to easily write unit test for a job
  • Creating schema for result table from annotated case class or java bean
  • Updating result table meta-information
  • Creating result table through Hive “create statement” to improve compatibility

Job logic vs run logic

Because of Spark's flexibility data engineer needs to specify not only the domain specific logic for processing the data, but orchestration logic for reading and writing the data. Sputnik tries to keep these two sets of logic separate and asks data engineer to define job logic, but keep run logic to Sputnik.

Example of job specific logic:

  • job multiply every value from input table by 2
  • job specifies that source table is “core.numbers” and result table is “core.numbers_multiplied”
  • job specifies that result table is partitioned by date
  • job specifies retention of result tables
  • job specifies checks for data, before it's written

Example of run specific logic:

  • we run job for one date, so we retrieve input data only for that date
  • job tries to write to table, which does not exists, so we need to create the table
  • job runs in staging mode, so all result tables are created with “_testing”

sputnik schema

Code example

User(data engineer) needs to extend SparkJob, define run method and use HiveTableReader and HiveTableWriter to read and write data.


object VisitsAggregationJob extends SparkJob {

  def run(): Unit = {

    val input = hiveTableReader.getDataframe("user_data.visits")

    val result = input
      .groupBy("userId", "ds")
      .agg(countDistinct("url").as("distinctUrlCount"))

    hiveTableWriter.saveAsHiveTable(
      dataFrame = result,
      dbTableName = "user_data.visits_aggregation",
      partitionSpec = HivePartitionSpec.DS_PARTITIONING
    )
  }

}

When user would run this job in console, they can specify the date to process and other parameters for this run:

spark-submit --class VisitsAggregationJob --ds 2019-01-10 \
 --dropResultTables true \
 --writeEnv PROD \
 --logLevel ERROR

User Guide

Writing spark job

VisitsAggregationJob

Let's look at simple job. Imagine that you have input table user_data.visits with information about how users visited Airbnb site. You need to generate table user_data.visits_aggregation, where url's would be aggregated by userId and ds. So the schemas for the tables are:

  user_data.visits
    -- userId
    -- url
    -- ds
  user_data.visits_aggregation
    -- userId
    -- distinct_url_count
    -- ds

Like majority of other tables, table user_data.visits is partitioned by ds. So, we need the result table to be partitioned by ds as well. Let's look at the code of the job:

object VisitsAggregationJob extends SputnikJob with AutoModeSputnikJob {

  override val startDate: LocalDate = DateConverter.stringToDate("2015-01-01")
  override val outputTables: Seq[String] = List("user_data.visits_aggregation")

  def run(): Unit = {

    val inputTable = "user_data.visits"

    val spark = sputnikSession.ss
    import spark.implicits._

    val input: DataFrame = hiveTableReader
      .getDataframe(tableName = inputTable)

    val result = input
      .groupBy("userId", DS_FIELD)
      .agg(countDistinct("url").as("distinctUrlCount"))
      .as[VisitAggregated]

    hiveTableWriter.saveDatasetAsHiveTable(
      dataset = result,
      itemClass = classOf[VisitAggregated]
    )
  }

}

The job extends class SputnikJob, which is base class for all spark jobs written in Sputnik. Block which does the core logic of the job is

val result = input
      .groupBy("userId", DS_FIELD)
      .agg(countDistinct("url").as("distinctUrlCount"))
      .as[VisitAggregated]

It has not special magic, just pure spark. Line:

.as[VisitAggregated]

means, that we convert result DataFrame to Dataset of VisitAggregated. You can skip this step if you prefer to work with DataFrames. Everything in Sputnik builds around the DataFrame API, but user can use Datasets for convenience. We have block

val spark = ss
import spark.implicits._

to be able to convert DataFrame to Dataset, but it's just spark thing and has nothing to do with Sputnik. Now let's look more closely at blocks of code where Sputnik provides functionality additional to Spark. We read from input table with HiveTableReader:

val input: DataFrame = hiveTableReader.getDataframe(inputTable)

Although the table is partitioned, we do not specify that or specify a filter to take data from the certain partition. The reason for not specifying partition logic here is that choosing the partition to take data from is “run specific” logic, not “job specific”. When we would run this job for one day, Sputnik would filter out the data only for that day. When we run the job for batch of days, HiveTableReader would get the data for batch of days. After we've done aggregation we need to write the result:

    hiveTableWriter.saveDatasetAsHiveTable(
      dataset = result,
      itemClass = classOf[VisitAggregated]
    )

When we write the result with Sputnik we specify where to write to and what the result table looks like. We do not specify how we want to write data. We specify all meta information about the table in annotations to case class of result Dataset. Or we use HiveTableProperties if we use Dataframe API(would be later in examples).

  @TableName("user_data.visits_aggregation")
  @TableDescription("Counting distinct visited url for a user")
  @FieldsFormatting(CaseFormat.LOWER_UNDERSCORE)
  @TableFormat(TableFileFormat.RCFILE)
  case class VisitAggregated(
                              userId: String@Comment("Id of a user"),
                              distinctUrlCount: Long,
                              @PartitioningField ds: String
                            )

Annotation TableName specify the name of table with data for this case class. TableDescription specify description of the table in meta information in Hive. We annotate class with FieldsFormatting, when we want to convert field names before writing it to Hive. For example schema of the hive table for this class would have next field names: user_id, distinct_count, ds. Motivation for that is the fact that naming conventions in Java/Scala and Hive are different: in Java you use camelCase to name field and in Hive you use snake_case to name columns. PartitioningField allows use to specify fields on which data would be partitioned. In 99% of the cases, when we partition, we want to partition by ds field.

Logic, which HiveTableWriter does:

  • It changes the result table name, if we run job in staging mode
  • It creates table with “CREATE TABLE” hive statement, if table does not yet exist. Default spark writer to hive does not do that and it creates problems with compatibility with other systems.
  • It updates table meta information
  • It drops the result table and creates a new one if we specify such behavior with a command-line flag, so we can easily iterate in developer mode
  • It changes schema of dataframe according to result schema of table, so even if we change the logic and it would result in change in order of the fields we would write correctly.
  • It repartitions and tries to reduce number of result files on disk
  • It does checks on result, before inserting it.

So, please use HiveTableWriter.

UsersToCountryJob

Let's move to the next job to explore more of Sputnik functionality. Job UsersToCountryJob joins tables to get user -> country mapping. Input is

user_data.users
    -- userId
    -- areaCode

user_data.area_codes
    -- areaCode
    -- country

to get

user_data.users_to_country
    -- userId
    -- country

The job code:


object NullCheck extends SQLCheck {

  def sql(temporaryTableName: String): String = {
    s"""
       | SELECT
       | IF(country is null OR country = "", false, true) as countryExist,
       | IF(userId is null OR userId = "", false, true) as userExists
       | from $temporaryTableName
     """.stripMargin
  }

}

object UsersToCountryJob extends SputnikJob {

  def run(): Unit = {
    val userTable = "user_data.users"
    val areaCodesTable
View on GitHub
GitHub Stars63
CategoryDevelopment
Updated16h ago
Forks20

Languages

Scala

Security Score

75/100

Audited on Mar 31, 2026

No findings