
SparkORM

ORM for Apache Spark and DataFrames schema manager


SparkORM ✨

PyPI version License: MIT

Python Spark SQL & DataFrame schema management and basic Object Relational Mapping.

Why use SparkORM

SparkORM takes the pain out of working with DataFrame schemas in PySpark. It makes schema definition more Pythonic, and it's particularly useful when you're dealing with structured data.

In plain old PySpark, you might find that you write schemas like this:

from pyspark.sql.types import FloatType, StringType, StructField, StructType

CITY_SCHEMA = StructType()
CITY_NAME_FIELD = "name"
CITY_SCHEMA.add(StructField(CITY_NAME_FIELD, StringType(), False))
CITY_LAT_FIELD = "latitude"
CITY_SCHEMA.add(StructField(CITY_LAT_FIELD, FloatType()))
CITY_LONG_FIELD = "longitude"
CITY_SCHEMA.add(StructField(CITY_LONG_FIELD, FloatType()))

CONFERENCE_SCHEMA = StructType()
CONF_NAME_FIELD = "name"
CONFERENCE_SCHEMA.add(StructField(CONF_NAME_FIELD, StringType(), False))
CONF_CITY_FIELD = "city"
CONFERENCE_SCHEMA.add(StructField(CONF_CITY_FIELD, CITY_SCHEMA))

And then plain old PySpark makes you deal with nested fields like this:

dframe.withColumn("city_name", dframe[CONF_CITY_FIELD][CITY_NAME_FIELD])

Instead, with SparkORM, schemas become a lot more literate:

class City(Struct):
    name = String()
    latitude = Float()
    longitude = Float()
    date_created = Date()

class Conference(TableModel):
    class Meta:
        name = "conference_table"
    name = String(nullable=False)
    city = City()

class LocalConferenceView(ViewModel):
    class Meta:
        name = "city_table"

Conference(spark).create()

Conference(spark).ensure_exists()  # Creates the table; if it already exists, validates the schema and raises an exception if it doesn't match

LocalConferenceView(spark).create_or_replace(select_statement=f"SELECT * FROM {Conference.get_name()}")

Conference(spark).insert([("Bucharest", 44.4268, 26.1025, date(2020, 1, 1))])

Conference(spark).drop()

As does dealing with nested fields:

dframe.withColumn("city_name", Conference.city.name.COL)

Here's a summary of SparkORM's features.

  • ORM-like class-based Spark schema definitions.
  • Automated field naming: The attribute name of a field as it appears in its Struct is (by default) used as its field name. This name can be optionally overridden.
  • Programmatically reference nested fields in your structs with the PATH and COL special properties. Avoid hand-constructing strings (or Columns) to reference your nested fields.
  • Validate that a DataFrame matches a SparkORM schema.
  • Reuse and build composite schemas with inheritance, includes, and implements.
  • Get a human-readable Spark schema representation with pretty_schema.
  • Create an instance of a schema as a dictionary, with validation of the input values.

Read on for documentation on these features.

Defining a schema

Each Spark atomic type has a counterpart SparkORM field:

| PySpark type | SparkORM field |
|---|---|
| ByteType | Byte |
| IntegerType | Integer |
| LongType | Long |
| ShortType | Short |
| DecimalType | Decimal |
| DoubleType | Double |
| FloatType | Float |
| StringType | String |
| BinaryType | Binary |
| BooleanType | Boolean |
| DateType | Date |
| TimestampType | Timestamp |

Array (counterpart to ArrayType in PySpark) allows the definition of arrays of objects. By creating a subclass of Struct, we can define a custom class that will be converted to a StructType.

For example, given the SparkORM schema definition:

from SparkORM import TableModel, String, Array

class Article(TableModel):
    title = String(nullable=False)
    tags = Array(String(), nullable=False)
    comments = Array(String(nullable=False))

Then we can build the equivalent PySpark schema (a StructType) with:


pyspark_struct = Article.get_schema()

Pretty printing the schema with the expression SparkORM.pretty_schema(pyspark_struct) will give the following:

StructType([
    StructField('title', StringType(), False),
    StructField('tags',
        ArrayType(StringType(), True),
        False),
    StructField('comments',
        ArrayType(StringType(), False),
        True)])

Features

Many examples of how to use SparkORM can be found in examples.

ORM-like class-based schema definitions

The SparkORM table schema definition is based on classes. Each column is declared as an instance of a field class, which accepts a number of arguments that are used to generate the schema.

The following arguments are supported:

  • nullable - whether the column is nullable (default: True)
  • name - the name of the column (default: the name of the attribute)
  • comment - the comment attached to the column (default: None)
  • auto_increment - whether the column is auto-incremented (default: False). Note: applies only to Long columns
  • sql_modifiers - extra SQL modifiers for the column (default: None)
  • partitioned_by - whether the table is partitioned by this column (default: False)

Examples:

class City(TableModel):
    name = String(nullable=False)
    latitude = Long(auto_increment=True) # auto_increment is a special property that will generate a unique value for each row
    longitude = Float(comment="Some comment")
    date_created = Date(sql_modifiers="GENERATED ALWAYS AS (CAST(birthDate AS DATE))") # sql_modifiers will be added to the CREATE clause for the column
    birthDate = Date(nullable=False, partitioned_by=True) # partitioned_by is a special property that will generate a partitioned_by clause for the column
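To make the effect of these arguments more concrete, here is a rough sketch of how column options of this kind could be turned into a CREATE TABLE statement. It is not SparkORM's implementation: ColumnSpec, render_column, and render_create_table are hypothetical names invented for this illustration, the SQL type names are written out by hand, and auto_increment is left out because its SQL form depends on the table format. The exact DDL that SparkORM emits may differ.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class ColumnSpec:
    """Hypothetical stand-in for a column definition (not a SparkORM class)."""
    name: str
    sql_type: str
    nullable: bool = True
    comment: Optional[str] = None
    sql_modifiers: Optional[str] = None
    partitioned_by: bool = False


def render_column(col: ColumnSpec) -> str:
    """Render one column definition for the body of a CREATE TABLE statement."""
    parts = [col.name, col.sql_type]
    if not col.nullable:
        parts.append("NOT NULL")
    if col.sql_modifiers:
        # Modifiers are passed through verbatim, as the README describes.
        parts.append(col.sql_modifiers)
    if col.comment:
        escaped = col.comment.replace("'", "\\'")  # naive quote escaping
        parts.append(f"COMMENT '{escaped}'")
    return " ".join(parts)


def render_create_table(table: str, columns: List[ColumnSpec]) -> str:
    """Assemble the statement; columns flagged partitioned_by feed PARTITIONED BY."""
    body = ",\n  ".join(render_column(c) for c in columns)
    ddl = f"CREATE TABLE {table} (\n  {body}\n)"
    partition_cols = [c.name for c in columns if c.partitioned_by]
    if partition_cols:
        ddl += f"\nPARTITIONED BY ({', '.join(partition_cols)})"
    return ddl


ddl = render_create_table("city", [
    ColumnSpec("name", "STRING", nullable=False),
    ColumnSpec("longitude", "FLOAT", comment="Some comment"),
    ColumnSpec("date_created", "DATE",
               sql_modifiers="GENERATED ALWAYS AS (CAST(birthDate AS DATE))"),
    ColumnSpec("birthDate", "DATE", nullable=False, partitioned_by=True),
])
print(ddl)
```

In this sketch, nullable=False becomes NOT NULL, sql_modifiers is appended unchanged to the column definition, and the partitioned_by flag is collected into a single PARTITIONED BY clause at the end. A real statement would usually also need a USING clause naming the table format.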

Automated field naming

By default, field names are inferred from the attribute name in the struct in which they are declared.

For example, given the struct

class Geolocation(TableModel):
    latitude = Float()
    longitude = Float()

the concrete name of the Geolocation.latitude field is latitude.

Names can also be overridden by explicitly specifying the field name as an argument to the field:

class Geolocation(TableModel):
    latitude = Float(name="lat")
    longitude = Float(name="lon")

which would mean the concrete name of the Geolocation.latitude field is lat.
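For readers curious how a field can learn its own attribute name, the following is a minimal sketch using Python's standard `__set_name__` hook. The Field and Float classes here are hypothetical stand-ins written for this illustration; whether SparkORM uses this exact mechanism internally is an assumption, not something this README states.

```python
class Field:
    """Hypothetical field base class (a stand-in, not SparkORM's own)."""

    def __init__(self, name=None):
        self._explicit_name = name   # optional override passed by the user
        self._attr_name = None       # filled in when the owning class is created

    def __set_name__(self, owner, attr_name):
        # Python calls this hook once for each class attribute that defines it,
        # passing the attribute name used in the class body.
        self._attr_name = attr_name

    @property
    def NAME(self):
        # An explicit name wins; otherwise fall back to the attribute name.
        if self._explicit_name is not None:
            return self._explicit_name
        return self._attr_name


class Float(Field):
    pass


class Geolocation:
    latitude = Float(name="lat")   # explicit override
    longitude = Float()            # name inferred from the attribute


print(Geolocation.latitude.NAME)   # lat
print(Geolocation.longitude.NAME)  # longitude
```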

Field paths and nested objects

Referencing fields in nested data can be a chore. SparkORM simplifies this with path referencing.

For example, if we have a schema with nested objects:

class Address(Struct):
    post_code = String()
    city = String()


class User(Struct):
    username = String(nullable=False)
    address = Address()


class Comment(Struct):
    message = String()
    author = User(nullable=False)


class Article(TableModel):
    title = String(nullable=False)
    author = User(nullable=False)
    comments = Array(Comment())

We can use the special PATH property to turn a path into a Spark-understandable string:

author_city_str = Article.author.address.city.PATH
"author.address.city"

COL is a counterpart to PATH that returns a Spark Column object for the path, allowing it to be used in all places where Spark requires a column.

Function equivalents path_str, path_col, and name are also available. This table demonstrates the equivalence of the property styles and the function styles:

| Property style | Function style | Result (both styles are equivalent) |
| --- | --- | --- |
| Article.author.address.city.PATH | SparkORM.path_str(Article.author.address.city) | "author.address.city" |
| Article.author.address.city.COL | SparkORM.path_col(Article.author.address.city) | Column pointing to author.address.city |
| Article.author.address.city.NAME | SparkORM.name(Article.author.address.city) | "city" |

For paths that include an array, two approaches are provided:

comment_usernames_str = Article.comments.e.author.username.PATH
"comments.author.username"

comment_usernames_str = Article.comments.author.username.PATH
"comments.author.username"

Both give the same result. However, the former (e) is more type-oriented. The e attribute corresponds to the array's element field. Although this looks strange at first, it has the advantage of being inspectable by IDEs and other tools, allowing goodness such as IDE auto-completion, automated refactoring, and identifying errors before runtime.
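The path behaviour described above, including the pass-through for arrays, can be sketched in plain Python. This is an illustration of the idea only: the Ref class and the dictionary-based schema are hypothetical and are not how SparkORM represents schemas. Because this sketch resolves attributes dynamically, it does not offer the IDE inspectability that SparkORM's class-based fields do.

```python
class Ref:
    """Hypothetical path node: a dict is a struct, a one-item list is an array,
    and None marks an atomic field."""

    def __init__(self, schema, segments=()):
        self._schema = schema
        self._segments = tuple(segments)

    def __getattr__(self, attr):
        schema = self._schema
        if isinstance(schema, list):
            # Arrays add no path segment: `e` steps into the element type,
            # and any other attribute is forwarded to the element directly.
            element = Ref(schema[0], self._segments)
            return element if attr == "e" else getattr(element, attr)
        if isinstance(schema, dict) and attr in schema:
            return Ref(schema[attr], self._segments + (attr,))
        raise AttributeError(attr)

    @property
    def PATH(self):
        # Dotted path from the root to this field.
        return ".".join(self._segments)

    @property
    def NAME(self):
        # Name of the final field in the path.
        return self._segments[-1]


ADDRESS = {"post_code": None, "city": None}
USER = {"username": None, "address": ADDRESS}
COMMENT = {"message": None, "author": USER}
Article = Ref({"title": None, "author": USER, "comments": [COMMENT]})

print(Article.author.address.city.PATH)          # author.address.city
print(Article.comments.e.author.username.PATH)   # comments.author.username
print(Article.comments.author.username.PATH)     # comments.author.username
```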

Field metadata

Field metadata can be specified with the metadata argument to a field, which accepts a dictionary of key-value pairs.

class Article(TableModel):
    title = String(nullable=False,
                   metadata={"description": "The title of the article", "max_length": 100})

The metadata can be accessed with the METADATA property of the field:

Article.title.METADATA
{"description": "The title of the article", "max_length": 100}

DataFrame validation

The Struct method validate_data_frame verifies whether a given DataFrame's schema matches the Struct. For example, suppose we have our Article struct and a DataFrame that we want to ensure adheres to the Article schema:

dframe = spark_session.createDataFrame([{"title": "abc"}])

class Article(TableModel):
    title = String()
    body = String()

Then we can validate with:

validation_re