sparkql ✨
Python Spark SQL DataFrame schema management for sensible humans, with no dependencies aside from pyspark.
Don't sweat it... sparkql it ✨
Why use sparkql
sparkql takes the pain out of working with DataFrame schemas in PySpark.
It makes schema definition more Pythonic, and it's
particularly useful when you're dealing with structured data.
In plain old PySpark, you might find that you write schemas like this:
```python
from pyspark.sql.types import FloatType, StringType, StructField, StructType

CITY_SCHEMA = StructType()
CITY_NAME_FIELD = "name"
CITY_SCHEMA.add(StructField(CITY_NAME_FIELD, StringType(), False))
CITY_LAT_FIELD = "latitude"
CITY_SCHEMA.add(StructField(CITY_LAT_FIELD, FloatType()))
CITY_LONG_FIELD = "longitude"
CITY_SCHEMA.add(StructField(CITY_LONG_FIELD, FloatType()))

CONFERENCE_SCHEMA = StructType()
CONF_NAME_FIELD = "name"
CONFERENCE_SCHEMA.add(StructField(CONF_NAME_FIELD, StringType(), False))
CONF_CITY_FIELD = "city"
CONFERENCE_SCHEMA.add(StructField(CONF_CITY_FIELD, CITY_SCHEMA))
```
And then plain old PySpark makes you deal with nested fields like this:
```python
dframe.withColumn("city_name", dframe[CONF_CITY_FIELD][CITY_NAME_FIELD])
```
Instead, with sparkql, schemas become a lot
more literate:
```python
from sparkql import Struct, String, Float

class City(Struct):
    name = String(nullable=False)
    latitude = Float()
    longitude = Float()

class Conference(Struct):
    name = String(nullable=False)
    city = City()
```
As does dealing with nested fields:
```python
dframe.withColumn("city_name", Conference.city.name.COL)
```
Here's a summary of sparkql's features.
- ORM-like class-based Spark schema definitions.
- Automated field naming: the attribute name of a field as it appears in its `Struct` is (by default) used as its field name. This name can be optionally overridden.
- Programmatically reference nested fields in your structs with the `PATH` and `COL` special properties. Avoid hand-constructing strings (or `Column`s) to reference your nested fields.
- Validate that a DataFrame matches a sparkql schema.
- Reuse and build composite schemas with inheritance, `includes`, and `implements`.
- Get a human-readable Spark schema representation with `pretty_schema`.
- Create an instance of a schema as a dictionary, with validation of the input values.
Read on for documentation on these features.
Defining a schema
Each Spark atomic type has a counterpart sparkql field:
| PySpark type | sparkql field |
|---|---|
| ByteType | Byte |
| IntegerType | Integer |
| LongType | Long |
| ShortType | Short |
| DecimalType | Decimal |
| DoubleType | Double |
| FloatType | Float |
| StringType | String |
| BinaryType | Binary |
| BooleanType | Boolean |
| DateType | Date |
| TimestampType | Timestamp |
Array (counterpart to ArrayType in PySpark) allows the definition
of arrays of objects. By creating a subclass of Struct, we can
define a custom class that will be converted to a StructType.
For example, given the sparkql schema definition:
```python
from sparkql import Struct, String, Array

class Article(Struct):
    title = String(nullable=False)
    tags = Array(String(), nullable=False)
    comments = Array(String(nullable=False))
```
Then we can build the equivalent PySpark schema (a StructType)
with:
```python
from sparkql import schema

pyspark_struct = schema(Article)
```
Pretty printing the schema with the expression
sparkql.pretty_schema(pyspark_struct) will give the following:
```text
StructType([
    StructField('title', StringType(), False),
    StructField('tags',
                ArrayType(StringType(), True),
                False),
    StructField('comments',
                ArrayType(StringType(), False),
                True)])
```
Features
Many examples of how to use sparkql can be found in the examples directory of the repository.
Automated field naming
By default, field names are inferred from the attribute name in the struct in which they are declared.
For example, given the struct
```python
class Geolocation(Struct):
    latitude = Float()
    longitude = Float()
```
the concrete name of the Geolocation.latitude field is latitude.
Names can also be overridden by explicitly specifying the field name as an argument to the field:
```python
class Geolocation(Struct):
    latitude = Float(name="lat")
    longitude = Float(name="lon")
```
which would mean the concrete name of the Geolocation.latitude field
is lat.
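This attribute-to-name inference can be pictured with Python's descriptor protocol. The sketch below is a simplification for illustration only, not sparkql's actual implementation; the `Field` class and its `field_name` attribute are hypothetical names.

```python
class Field:
    """Sketch of a schema field that learns its own name."""

    def __init__(self, name=None):
        self._explicit_name = name
        self.field_name = None

    def __set_name__(self, owner, attr_name):
        # Python calls this hook when the owning class body is executed:
        # fall back to the attribute name unless a name was given.
        self.field_name = self._explicit_name or attr_name


class Geolocation:
    latitude = Field(name="lat")  # explicit override
    longitude = Field()           # inferred from the attribute name
```

Here `Geolocation.latitude.field_name` is `"lat"` while `Geolocation.longitude.field_name` is `"longitude"`.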
Field paths and nested objects
Referencing fields in nested data can be a chore. sparkql simplifies this
with path referencing.
For example, if we have a schema with nested objects:
```python
class Address(Struct):
    post_code = String()
    city = String()

class User(Struct):
    username = String(nullable=False)
    address = Address()

class Comment(Struct):
    message = String()
    author = User(nullable=False)

class Article(Struct):
    title = String(nullable=False)
    author = User(nullable=False)
    comments = Array(Comment())
```
We can use the special PATH property to turn a path into a
Spark-understandable string:
```python
author_city_str = Article.author.address.city.PATH
# "author.address.city"
```
COL is a counterpart to PATH that returns a Spark Column
object for the path, allowing it to be used in all places where Spark
requires a column.
Function equivalents path_str, path_col, and name are also available.
This table demonstrates the equivalence of the property styles and the function
styles:
| Property style | Function style | Result (both styles are equivalent) |
| --- | --- | --- |
| Article.author.address.city.PATH | sparkql.path_str(Article.author.address.city) | "author.address.city" |
| Article.author.address.city.COL | sparkql.path_col(Article.author.address.city) | Column pointing to author.address.city |
| Article.author.address.city.NAME | sparkql.name(Article.author.address.city) | "city" |
For paths that include an array, two approaches are provided:
```python
comment_usernames_str = Article.comments.e.author.username.PATH
# "comments.author.username"
```

```python
comment_usernames_str = Article.comments.author.username.PATH
# "comments.author.username"
```
Both give the same result. However, the former (e) is more
type-oriented. The e attribute corresponds to the array's element
field. Although this looks strange at first, it has the advantage of
being inspectable by IDEs and other tools, allowing goodness such as
IDE auto-completion, automated refactoring, and identifying errors
before runtime.
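A dotted path like this can be derived by having each nested field keep a reference to its container. The following is a hand-rolled sketch of that idea, not sparkql's implementation; `Node` and its attributes are made-up names for illustration.

```python
class Node:
    """Sketch: a field that knows its parent, so a dotted path can be
    rebuilt by walking up the chain of containers."""

    def __init__(self, name, parent=None):
        self.name = name
        self.parent = parent

    @property
    def PATH(self):
        # Collect names from leaf to root, then reverse and join.
        parts, node = [], self
        while node is not None:
            parts.append(node.name)
            node = node.parent
        return ".".join(reversed(parts))


city = Node("city", parent=Node("address", parent=Node("author")))
# city.PATH == "author.address.city"
```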
Field metadata
Field metadata can be specified with the metadata argument to a field, which accepts a dictionary
of key-value pairs.
```python
class Article(Struct):
    title = String(nullable=False,
                   metadata={"description": "The title of the article", "max_length": 100})
```
The metadata can be accessed with the METADATA property of the field:
```python
Article.title.METADATA
# {"description": "The title of the article", "max_length": 100}
```
DataFrame validation
Struct method validate_data_frame will verify if a given DataFrame's
schema matches the Struct.
For example, suppose we have our Article struct and a DataFrame we want to ensure adheres to the Article schema:
```python
dframe = spark_session.createDataFrame([{"title": "abc"}])

class Article(Struct):
    title = String()
    body = String()
```
Then we can validate with:

```python
validation_result = Article.validate_data_frame(dframe)
```
validation_result.is_valid indicates whether the DataFrame is valid
(False in this case), and validation_result.report is a
human-readable string describing the differences:
```text
Struct schema...

StructType([
    StructField('title', StringType(), True),
    StructField('body', StringType(), True)])

DataFrame schema...

StructType([
    StructField('title', StringType(), True)])

Diff of struct -> data frame...

  StructType([
-     StructField('title', StringType(), True)])
+     StructField('title', StringType(), True),
+     StructField('body', StringType(), True)])
```
For convenience,

```python
Article.validate_data_frame(dframe).raise_on_invalid()
```

will raise an `InvalidDataFrameError` (see `sparkql.exceptions`) if the
DataFrame is not valid.
Creating an instance of a schema
sparkql simplifies the process of creating an instance of a struct.
You might need to do this, for example, when creating test data, or
when creating an object (a dict or a row) to return from a UDF.
Use Struct.make_dict(...) to instantiate a struct as a dictionary.
This has the advantage that the input values will be correctly
validated, and it will convert schema property names into their
underlying field names.
For example, given some simple Structs:
```python
class User(Struct):
    id = Integer(name="user_id", nullable=False)
    username = String()

class Article(Struc
```
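The attribute-to-field-name translation that `make_dict` performs can be sketched in plain Python. This simplification skips sparkql's value validation, and `make_dict_sketch` and `USER_NAME_MAP` are illustrative names only:

```python
def make_dict_sketch(name_map, **kwargs):
    """Translate attribute names into their underlying field names."""
    return {name_map[attr]: value for attr, value in kwargs.items()}


# Mirrors the User struct above: the `id` attribute's concrete
# field name is overridden to "user_id".
USER_NAME_MAP = {"id": "user_id", "username": "username"}
row = make_dict_sketch(USER_NAME_MAP, id=1, username="abc")
# row == {"user_id": 1, "username": "abc"}
```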