ABRiS
Avro SerDe for Apache Spark structured APIs.
Install / Use
/learn @AbsaOSS/ABRiSREADME
ABRiS - Avro Bridge for Spark
-
Pain free Spark/Avro integration.
-
Seamlessly integrate with Confluent platform, including Schema Registry with all available naming strategies and schema evolution.
-
Seamlessly convert your Avro records from anywhere (e.g. Kafka, Parquet, HDFS, etc) into Spark Rows.
-
Convert your Dataframes into Avro records without even specifying a schema.
-
Go back-and-forth Spark Avro (since Spark 2.4).
Coordinates for Maven POM dependency
| Scala | Abris |
|:------:|:-------:|
| 2.11 | |
| 2.12 |
|
| 2.13 |
|
Supported versions
| Abris | Spark | Scala | |:-----: |:-------------:|:-----: | | 6.2.0 - 6.x.x | 3.2.1 - 3.5.x | 2.12 / 2.13 | | 6.0.0 - 6.1.1 | 3.2.0 | 2.12 / 2.13 | | 5.0.0 - 5.x.x | 3.0.x / 3.1.x | 2.12 | | 5.0.0 - 5.x.x | 2.4.x | 2.11 / 2.12 |
From version 6.0.0, ABRiS only supports Spark 3.2.x.
ABRiS 5.0.x is still supported for older versions of Spark (see branch-5)
Older Versions
This is documentation for Abris version 6. Documentation for older versions is located in corresponding branches: branch-5, branch-4, branch-3.2.
Confluent Schema Registry Version
Abris by default uses Confluent client version 6.2.0.
Installation
Abris needs spark-avro to run, make sure you include the spark-avro dependency when using Abris.
The version of spark-avro and Spark should be identical.
Example: submitting a Spark job:
./bin/spark-submit \
--packages org.apache.spark:spark-avro_2.12:3.5.0,za.co.absa:abris_2.12:6.4.0 \
...rest of submit params...
Example: using Abris in maven project:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.5.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_2.12</artifactId>
<version>3.5.0</version> <!-- version must be the same as Spark -->
</dependency>
<dependency>
<groupId>za.co.absa</groupId>
<artifactId>abris_2.12</artifactId>
<version>6.4.0</version>
</dependency>
Example: using Abris in SBT project:
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "3.5.0" % Provided,
"org.apache.spark" %% "spark-avro" % "3.5.0",
"za.co.absa" %% "abris" % "6.4.0"
)
Usage
ABRiS API is in it's most basic form almost identical to Spark built-in support for Avro, but it provides additional functionality. Mainly it's support of schema registry and also seamless integration with confluent Avro data format.
The API consists of two Spark SQL expressions (to_avro and from_avro) and fluent configurator (AbrisConfig)
Using the configurator you can choose from four basic config types:
toSimpleAvro,toConfluentAvro,fromSimpleAvroandfromConfluentAvro
And configure what you want to do, mainly how to get the avro schema.
Example of usage:
val abrisConfig = AbrisConfig
.fromConfluentAvro
.downloadReaderSchemaByLatestVersion
.andTopicNameStrategy("topic123")
.usingSchemaRegistry("http://localhost:8081")
import za.co.absa.abris.avro.functions.from_avro
val deserialized = dataFrame.select(from_avro(col("value"), abrisConfig) as 'data)
Detailed instructions for many use cases are in separated documents:
- How to use Abris with vanilla avro (with examples)
- How to use Abris with Confluent avro (with examples)
- How to use Abris in Python (with examples)
Full runnable examples can be found in the za.co.absa.abris.examples package. You can also take a look at unit tests in package za.co.absa.abris.avro.sql.
IMPORTANT: Spark dependencies have provided scope in the pom.xml, so when running the examples, please make sure that you either, instruct your IDE to include dependencies with
provided scope, or change the scope directly.
Confluent Avro format
The format of Avro binary data is defined in Avro specification. Confluent format extends it and prepends the schema id before the actual record. The Confluent expressions in this library expect this format and add the id after the Avro data are generated or remove it before they are parsed.
You can find more about Confluent and Schema Registry in Confluent documentation.
Schema Registry security and other additional settings
Only Schema registry client setting that is mandatory is the url, but if you need to provide more the configurer allows you to provide a whole map.
For example, you may want to provide basic.auth.user.info and basic.auth.credentials.source required for user authentication.
You can do it this way:
val registryConfig = Map(
AbrisConfig.SCHEMA_REGISTRY_URL -> "http://localhost:8081",
"basic.auth.credentials.source" -> "USER_INFO",
"basic.auth.user.info" -> "srkey:srvalue"
)
val abrisConfig = AbrisConfig
.fromConfluentAvro
.downloadReaderSchemaByLatestVersion
.andTopicNameStrategy("topic123")
.usingSchemaRegistry(registryConfig) // use the map instead of just url
Other Features
Generating Avro schema from Spark data frame column
There is a helper method that allows you to generate schema automatically from spark column. Assuming you have a data frame containing column "input". You can generate schema for data in that column like this:
val schema = AvroSchemaUtils.toAvroSchema(dataFrame, "input")
Using schema manager to directly download or register schema
You can use SchemaManager directly to do operations with schema registry. The configuration is identical to Schema Registry Client. The SchemaManager is just a wrapper around the client providing helpful methods and abstractions.
val schemaRegistryClientConfig = Map( ...configuration... )
val schemaManager = SchemaManagerFactory.create(schemaRegistryClientConfig)
// Downloading schema:
val schema = schemaManager.getSchemaById(42)
// Registering schema:
val schemaString = "{...avro schema json...}"
val subject = SchemaSubject.usingTopicNameStrategy("fooTopic")
val schemaId = schemaManager.register(subject, schemaString)
// and more, check SchemaManager's methods
De-serialisation Error Handling
There are 2 ways ABRiS handles de-serialisation errors:
FailFast (Default)
Given no provided de-serialisation handler, a failure will result in a spark exception being thrown and with the error being outputted. This is the default procedure.
SpecificRecordHandler
The second option requires providing a default record that will be outputted in the event of a failure. This should be used as a flag to be deleted outside ABRiS that should mean the spark job will not stop. Beware however, a null or empty record will also result in an error so a record with a different input should be chosen.
This can be provided as such:
val abrisConfig = AbrisConfig
.fromConfluentAvro
.downloadReaderSchemaByLatestVersion
.andTopicNameStrategy("topic123")
.usingSchemaRegistry(registryConfig)
.withSchemaConverter("custom")
.withExceptionHandler(new SpecificRecordExceptionHandler(providedDefaultRecord))
This is only for confluent-based configuration, not for standard avro.
PermissiveRecordExceptionHandler
The third option is to use the PermissiveRecordExceptionHandler. In case of a deserialization failure, this handler replaces the problematic record with a fully null record, instead of throwing an exception. This allows the data processing pipeline to continue without interruption.
The main use case for this option is when you want to prioritize continuity of processing over individual record integrity. It's especially useful when dealing with large datasets where occasional malformed records could be tolerated.
Here's how to use it:
val abrisConfig = AbrisConfig
.fromConfluentAvro
.downloadReaderSchemaByLatestVersion
.andTopicNameStrategy("topic123")
.usingSchemaRegistry(registryConfig)
.withSchemaConverter("custom")
.withExceptionHandler(new PermissiveRecordExceptionHandler())
With this configuration, in the event of a deserialization error, the PermissiveRecordExceptionHandler will log a warning, substitute the malformed record with a fully null one, and allow the data processing pipeline to continue.
Data Conversions
This library also provides convenient methods to convert between Avro and Spark schemas.
If you have an Avro schema which you want to convert into a Spark SQL one - to generate your Dataframes, for instance - you can do as follows:
val avroSchema: Schema = AvroSchemaUtils.load("path_to_avro_schema")
val sqlSchema: StructType = SparkAvroConversions.toSqlType(avroSchema)
You can also do the inverse operation by running:
val sqlSchema = new StructType(new StructField ....
val avroSchema = SparkAvroConversions.toAv
Related Skills
node-connect
350.8kDiagnose OpenClaw node connection and pairing failures for Android, iOS, and macOS companion apps
frontend-design
110.4kCreate distinctive, production-grade frontend interfaces with high design quality. Use this skill when the user asks to build web components, pages, or applications. Generates creative, polished code that avoids generic AI aesthetics.
openai-whisper-api
350.8kTranscribe audio via OpenAI Audio Transcriptions API (Whisper).
qqbot-media
350.8kQQBot 富媒体收发能力。使用 <qqmedia> 标签,系统根据文件扩展名自动识别类型(图片/语音/视频/文件)。
