Copyright 2018 ABSA Group Limited
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
# Hyperdrive - An extensible streaming ingestion pipeline on top of Apache Spark
## Build Status

| master | develop |
| :---: | :---: |
| | |
## What is Hyperdrive?
Hyperdrive is a configurable and scalable ingestion platform that allows data movement and transformation from streaming sources with exactly-once fault-tolerance semantics by using Apache Spark Structured Streaming.
In Hyperdrive, each ingestion is defined by three components: a reader, a transformer and a writer. This separation makes it possible to adapt to different streaming sources and sinks, while reusing transformations that are common across multiple ingestion pipelines.
## Motivation
Similar to batch processing, data ingestion pipelines are needed to process streaming data sources. While solutions for data pipelines exist, exactly-once fault-tolerance in stream processing is an intricate problem and cannot be solved with the same strategies that work for batch processing.
This is the gap Hyperdrive aims to fill, by leveraging the exactly-once guarantee of Spark Structured Streaming and by providing a flexible data pipeline.
## Architecture
The data ingestion pipeline of Hyperdrive consists of three components: readers, transformers and writers.
- Readers define how to connect to sources, e.g. how to connect to Kafka in a secure cluster by providing security directives, and which topic and brokers to connect to.
- Transformers define transformations to be applied to the decoded DataFrame, e.g. dropping columns.
- Writers define where DataFrames should be sent after the transformations, e.g. into HDFS as Parquet files.
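As an illustration of the reader-transformer-writer split, a minimal pipeline configuration might look like the following sketch. The component class names are taken from the built-in components listed below; the transformer id is an arbitrary placeholder:

```properties
component.ingestor=spark
component.reader=za.co.absa.hyperdrive.ingestor.implementation.reader.kafka.KafkaStreamReader
component.transformer.id.0=selector
component.transformer.class.selector=za.co.absa.hyperdrive.ingestor.implementation.transformer.column.selection.ColumnSelectorStreamTransformer
component.writer=za.co.absa.hyperdrive.ingestor.implementation.writer.parquet.ParquetStreamWriter
```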
### Built-in components
- `KafkaStreamReader` - reads from a Kafka topic.
- `ParquetStreamReader` - reads Parquet files from a source directory.
- `ConfluentAvroDecodingTransformer` - decodes the payload as Confluent Avro (through ABRiS), retrieving the schema from the specified Schema Registry. This transformer is capable of seamlessly handling whatever schemas the payload messages are using.
- `ConfluentAvroEncodingTransformer` - encodes the payload as Confluent Avro (through ABRiS), updating the schema to the specified Schema Registry. This transformer is capable of seamlessly handling whatever schema the dataframe is using.
- `ColumnSelectorStreamTransformer` - selects all columns from the decoded DataFrame.
- `AddDateVersionTransformer` - adds columns for the ingestion date and an auto-incremented version number, to be used for partitioning.
- `ParquetStreamWriter` - writes the DataFrame as Parquet, in append mode.
- `KafkaStreamWriter` - writes to a Kafka topic.
- `DeltaCDCToSnapshotWriter` - writes the DataFrame in Delta format. It expects CDC events, performs merge logic and creates the latest snapshot table.
- `DeltaCDCToSCD2Writer` - writes the DataFrame in Delta format. It expects CDC events, performs merge logic and creates an SCD2 table.
- `HudiCDCToSCD2Writer` - writes the DataFrame in Hudi format. It expects CDC events, performs merge logic and creates an SCD2 table.
### Custom components
Custom components can be implemented using the Component Archetype, following the API defined in the package `za.co.absa.hyperdrive.ingestor.api`.

- A custom component has to be a class which extends one of the abstract classes `StreamReader`, `StreamTransformer` or `StreamWriter`.
- The class needs to have a companion object which implements the corresponding trait `StreamReaderFactory`, `StreamTransformerFactory` or `StreamWriterFactory`.
- The implemented components have to be packaged into a jar file, which can then be added to the classpath of the driver. To use a component, it has to be configured as described under Usage.

After that, the new component can be seamlessly invoked from the driver.
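For example, a custom transformer packaged in its own jar could be wired in purely through configuration. The class name `com.example.transformer.MyTransformer` and the jar name below are hypothetical placeholders:

```properties
# Add the jar to the classpath, e.g. via spark-submit --jars my-transformer.jar,
# then reference the custom class like any built-in component:
component.transformer.id.0=my.transformer
component.transformer.class.my.transformer=com.example.transformer.MyTransformer
```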
## Usage
Hyperdrive has to be executed with Spark. Due to Spark-Kafka integration issues, it only works with Spark 2.3 or higher.
### How to run
```shell
git clone git@github.com:AbsaOSS/hyperdrive.git
mvn clean package
```
Given that a configuration file has already been created, Hyperdrive can be executed as follows:
```shell
spark-submit --class za.co.absa.hyperdrive.driver.drivers.PropertiesIngestionDriver driver/target/driver*.jar config.properties
```
Alternatively, configuration properties can also be passed as command-line arguments:
```shell
spark-submit --class za.co.absa.hyperdrive.driver.drivers.CommandLineIngestionDriver driver/target/driver*.jar \
  component.ingestor=spark \
  component.reader=za.co.absa.hyperdrive.ingestor.implementation.reader.kafka.KafkaStreamReader
  # more properties ...
```
### Configuration
The configuration file may be created from the template located at `driver/src/resources/Ingestion.properties.template`.
`CommandLineIngestionDriverDockerTest` may be consulted for a working pipeline configuration.
#### General settings
##### Pipeline settings
| Property Name | Required | Description |
| :--- | :---: | :--- |
| `component.ingestor` | Yes | Defines the ingestion pipeline. Only `spark` is currently supported. |
| `component.reader` | Yes | Fully qualified name of the reader component, e.g. `za.co.absa.hyperdrive.ingestor.implementation.reader.kafka.KafkaStreamReader` |
| `component.transformer.id.{order}` | No | An arbitrary but unique string, referenced in this documentation as `{transformer-id}` |
| `component.transformer.class.{transformer-id}` | No | Fully qualified name of the transformer component, e.g. `za.co.absa.hyperdrive.ingestor.implementation.transformer.column.selection.ColumnSelectorStreamTransformer` |
| `component.writer` | Yes | Fully qualified name of the writer component, e.g. `za.co.absa.hyperdrive.ingestor.implementation.writer.parquet.ParquetStreamWriter` |
Multiple transformers can be configured in the pipeline, including multiple instances of the same transformer.
For each transformer instance, `component.transformer.id.{order}` and `component.transformer.class.{transformer-id}` have to be specified, where `{order}` and `{transformer-id}` need to be unique.
In the above table, `{order}` must be an integer and may be negative. `{transformer-id}` is only used within the configuration
to identify which configuration options belong to a certain transformer instance.
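For instance, two instances of the same transformer might be configured as follows. The ids `selector.first` and `selector.second` are arbitrary placeholders:

```properties
# Two instances of the same transformer; {order} and {transformer-id} must each be unique
component.transformer.id.0=selector.first
component.transformer.class.selector.first=za.co.absa.hyperdrive.ingestor.implementation.transformer.column.selection.ColumnSelectorStreamTransformer
component.transformer.id.1=selector.second
component.transformer.class.selector.second=za.co.absa.hyperdrive.ingestor.implementation.transformer.column.selection.ColumnSelectorStreamTransformer
```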
##### Spark settings
| Property Name | Required | Description |
| :--- | :---: | :--- |
| `ingestor.spark.termination.method` | No | Either `processAllAvailable` (stop query when no more messages are incoming) or `awaitTermination` (stop query on signal, e.g. Ctrl-C). Default: `awaitTermination`. See also Combination of trigger and termination method |
| `ingestor.spark.await.termination.timeout` | No | Timeout in milliseconds. Stops the query when the timeout is reached. This option is only valid with termination method `awaitTermination` |
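As a sketch, a query that drains all currently available messages and then stops, and a variant that runs until a signal but at most one hour, would be configured as:

```properties
# Stop the query once no more messages are incoming
ingestor.spark.termination.method=processAllAvailable

# Or: run until a signal, but at most one hour (3600000 ms)
#ingestor.spark.termination.method=awaitTermination
#ingestor.spark.await.termination.timeout=3600000
```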
#### Settings for built-in components
##### KafkaStreamReader
| Property Name | Required | Description |
| :--- | :---: | :--- |
| `reader.kafka.topic` | Yes | The name of the Kafka topic to ingest data from. Equivalent to Spark property `subscribe` |
| `reader.kafka.brokers` | Yes | List of Kafka broker URLs. Equivalent to Spark property `kafka.bootstrap.servers` |
Any additional properties for Kafka can be added with the prefix `reader.option.`, e.g. the property `kafka.security.protocol` can be added as `reader.option.kafka.security.protocol`.
See e.g. the Structured Streaming + Kafka Integration Guide for optional Kafka properties.
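Putting these together, a secured Kafka source might be configured like this. The topic name, broker addresses and security protocol are placeholders:

```properties
reader.kafka.topic=my.topic
reader.kafka.brokers=broker1:9092,broker2:9092
# Passed through to the Kafka source as kafka.security.protocol
reader.option.kafka.security.protocol=SASL_SSL
```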
##### ParquetStreamReader
The Parquet stream reader infers the schema from Parquet files that already exist in the source directory. If no file exists, the reader will fail.
| Property Name | Required | Description |
| :--- | :---: | :--- |
| `reader.parquet.source.directory` | Yes | Source path for the Parquet files. Equivalent to Spark property `path` for the `DataStreamReader` |
Any additional properties can be added with the prefix `reader.parquet.options.`. See the Spark Structured Streaming documentation.
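A minimal configuration could look like this. The source path is a placeholder, and `maxFilesPerTrigger` stands in for any option the Spark file source accepts:

```properties
reader.parquet.source.directory=/data/landing/parquet
# Passed through to the DataStreamReader as maxFilesPerTrigger
reader.parquet.options.maxFilesPerTrigger=10
```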
##### ConfluentAvroDecodingTransformer
The ConfluentAvroDecodingTransformer is built on ABRiS. More details about the configuration properties can be found there.
Caution: The ConfluentAvroDecodingTransformer requires the property `reader.kafka.topic` to be set.
| Property Name | Required | Description |
| :--- | :---: | :--- |
| `transformer.{transformer-id}.schema.registry.url` | Yes | URL of Schema Registry, e.g. `http://localhost:8081`. Equivalent to ABRiS property `SchemaManager.PARAM_SCHEMA_REGISTRY_URL` |
| `transformer.{transformer-id}.value.schema.id` | Yes | The schema id. Use `latest`
