# pysparkdt (PySpark Delta Testing)

<p align="center"> <a href="https://pypi.org/project/pysparkdt"> <img src="https://img.shields.io/pypi/pyversions/pysparkdt.svg?color=%2334D058" alt="Supported Python versions"> </a> <a href="https://pypi.org/project/pysparkdt" target="_blank"> <img src="https://img.shields.io/pypi/v/pysparkdt?color=%2334D058&label=pypi%20package" alt="Package version"> </a> <a href="https://pypi.org/project/pysparkdt"> <img alt="PyPI - Downloads" src="https://img.shields.io/pypi/dm/pysparkdt.svg?label=PyPI%20downloads"> </a> <a href="https://github.com/astral-sh/ruff"> <img alt="Ruff" src="https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/astral-sh/ruff/main/assets/badge/v2.json"> </a> </p>

An open-source Python library for simplifying local testing of Databricks workflows that use PySpark and Delta tables.
This library enables seamless testing of PySpark processing logic outside Databricks by emulating Unity Catalog behavior. It dynamically generates a local metastore to mimic Unity Catalog and supports simplified handling of Delta tables for both batch and streaming workloads.
## Guideline

### Overview

#### Scope
This guideline helps you test Databricks Python pipelines with a focus on PySpark code. While basic unit testing knowledge with pytest is helpful, it's not the central focus.
#### Key Points

- **Standalone Testing**: The setup allows you to test code without Databricks access, enabling easy CI integration.
- **Local Metastore**: Mimic the Databricks Unity Catalog using a dynamically generated local metastore with local Delta tables.
- **Code Testability**: Move core processing logic from notebooks to Python modules. Notebooks then serve as entrypoints.
### Setup

In the following sections, we assume you are creating tests for a job that reads one input Delta table and writes one output Delta table, using PySpark for its processing.
#### 1. Installation

Install `pysparkdt` from PyPI. It is only needed in your test environment:

```bash
pip install pysparkdt
```
#### 2. Testable code

- **Modularization**: Move processing logic from notebooks to modules.
- **Notebook Role**: Notebooks primarily handle initialization and triggering of processing. They should contain all the code specific to Databricks (e.g. `dbutils` usage).
<div align="center">
<strong>myjobpackage/entrypoint.py</strong>
</div>

```python
# Databricks notebook source
import sys
from pathlib import Path

MODULE_DIR = Path.cwd().parent
sys.path.append(MODULE_DIR.as_posix())

# COMMAND ----------

from myjobpackage.processing import process_data

# COMMAND ----------

input_table = dbutils.widgets.get('input_table')
output_table = dbutils.widgets.get('output_table')

# COMMAND ----------

process_data(
    spark=spark,
    input_table=input_table,
    output_table=output_table,
)
```
`myjobpackage.processing`

- Contains the core logic to test.
- Our test focuses on the core function `myjobpackage.processing.process_data`.
#### 3. File structure

```
myjobpackage
├── __init__.py
├── entrypoint.py    # Databricks Notebook
└── processing.py
tests
├── __init__.py
├── test_processing.py
└── data
    └── tables
        ├── example_input.ndjson
        ├── expected_output.ndjson
        └── schema
            ├── example_input.json
            └── expected_output.json
```
**Data Format**

- **Test data**: newline-delimited JSON (`.ndjson`)
- **Optional schema**: JSON
  - If present, the full schema must be provided (all columns included).
  - The schema file format is the JSON representation of a PySpark `StructType`.
<div align="center">
<strong>example_input.ndjson</strong>
</div>

```json
{"id": 0, "time_utc": "2024-01-08T11:00:00", "name": "Jorge", "feature": 0.5876}
{"id": 1, "time_utc": "2024-01-11T14:28:00", "name": "Ricardo", "feature": 0.42}
```
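An `.ndjson` fixture is simply one JSON object per line, so test data like the sample above can be generated with the standard library alone (the file name and records here are illustrative):

```python
import json

# Illustrative records matching the sample fixture above
records = [
    {'id': 0, 'time_utc': '2024-01-08T11:00:00', 'name': 'Jorge', 'feature': 0.5876},
    {'id': 1, 'time_utc': '2024-01-11T14:28:00', 'name': 'Ricardo', 'feature': 0.42},
]

# Write one JSON object per line (newline-delimited JSON)
with open('example_input.ndjson', 'w') as f:
    for record in records:
        f.write(json.dumps(record) + '\n')
```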
<div align="center">
<strong>example_input.json</strong>
</div>
```json
{
    "type": "struct",
    "fields":
    [
        {
            "name": "id",
            "type": "long",
            "nullable": false,
            "metadata": {}
        },
        {
            "name": "time_utc",
            "type": "timestamp",
            "nullable": false,
            "metadata": {}
        },
        {
            "name": "name",
            "type": "string",
            "nullable": true,
            "metadata": {}
        },
        {
            "name": "feature",
            "type": "double",
            "nullable": true,
            "metadata": {}
        }
    ]
}
```
**Tip**: A schema file for a loaded PySpark DataFrame `df` can be created using:

```python
import json

with open('example_input.json', 'w') as file:
    file.write(json.dumps(df.schema.jsonValue(), indent=4))
```

Thus, you can first load a table without a schema, then create the schema file from it and modify the types to the desired ones.
#### 4. Tests

**Constants**: Define paths for the test data and the temporary metastore.

```python
import os

DATA_DIR = f'{os.path.dirname(__file__)}/data'
JSON_TABLES_DIR = f'{DATA_DIR}/tables'
TMP_DIR = f'{DATA_DIR}/tmp'
METASTORE_DIR = f'{TMP_DIR}/metastore'
```
**Spark fixture**: Define a fixture for the local Spark session using the `spark_base` function from `pysparkdt`, specifying the temporary metastore location.

```python
from pytest import fixture

from pysparkdt import spark_base


@fixture(scope='module')
def spark():
    yield from spark_base(METASTORE_DIR)
```
**Metastore initialization**: At the beginning of your test function, call the `reinit_local_metastore` function from `pysparkdt` to initialize the metastore with the tables from your JSON folder (`JSON_TABLES_DIR`). You can also enable or disable deletion vectors for the Delta tables (default: enabled). If the function is called while the metastore already exists, it deletes all existing tables before initializing the new ones.

Alternatively, you can call this function only once per testing module, but then individual test functions might affect each other by modifying metastore tables.
```python
from myjobpackage.processing import process_data
from pyspark.sql import SparkSession
from pyspark.testing import assertDataFrameEqual

from pysparkdt import reinit_local_metastore


def test_process_data(
    spark: SparkSession,
):
    reinit_local_metastore(spark, JSON_TABLES_DIR, deletion_vectors=True)
    process_data(
        spark=spark,
        input_table='example_input',
        output_table='output',
    )
    output = spark.read.format('delta').table('output')
    expected = spark.read.format('delta').table('expected_output')
    assertDataFrameEqual(
        actual=output.select(sorted(output.columns)),
        expected=expected.select(sorted(expected.columns)),
    )
```
In the example above, we use assertDataFrameEqual to compare PySpark
DataFrames. We ensure the columns are ordered so that the order of result
columns does not matter. By default, the order of rows does not matter in
assertDataFrameEqual (this can be adjusted using the checkRowOrder
parameter).
ℹ️ For a complete example, see the example in the repository.
⚠️ Manual deletion of local metastore
Deleting the local metastore manually invalidates any Spark session configured
for that location. You would need to start a new Spark session because
the original session’s state is no longer valid. Avoid manual deletion —
use reinit_local_metastore for reinitialization instead.
⚠️ Note on running tests in parallel
With the setup above, the metastore is shared on the module scope. Therefore, if tests defined in the same module are run in parallel, race conditions can occur if multiple test functions use the same tables.
To mitigate this, make sure each test in the module uses its own set of tables.
### Advanced

#### Testing Stream Processing

Let's now focus on a case where a job reads an input Delta table using PySpark streaming, performs some computation on the data, and saves the result to an output Delta table.

To be able to test the processing, we need to explicitly wait for its completion. The best way to do this is to await the streaming query that performs the processing.

For the test function to await the streaming query, it needs access to it. Make sure the streaming query (in Databricks terms) is accessible, for example by returning it from the processing function.
<div align="center">
<strong>myjobpackage/processing.py</strong>
</div>

```python
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.streaming import StreamingQuery


def process_data(
    spark: SparkSession,
    input_table: str,
    output_table: str,
    checkpoint_location: str,
) -> StreamingQuery:
    load_query = spark.readStream.format('delta').table(input_table)

    def process_batch(df: pyspark.sql.DataFrame, _) -> None:
        ...  # process df
        df.write.mode('append').format('delta').saveAsTable(output_table)

    return (
        load_query.writeStream.format('delta')
        .foreachBatch(process_batch)
        .trigger(availableNow=True)
        .option('checkpointLocation', checkpoint_location)
        .start()
    )
```
