jodie
This library provides helpful Delta Lake and filesystem utility functions.

Accessing the library
Fetch the JAR file from Maven.
libraryDependencies += "com.github.mrpowers" %% "jodie" % "0.0.3"
You can find the jodie releases for different Scala versions in the Maven repository.
Delta Helpers
Type 2 SCDs
This library provides an opinionated, conventions-over-configuration approach to Type 2 SCD management. Let's look at an example before covering the conventions required to take advantage of the functionality.
Suppose you have the following SCD table with the pkey primary key:
+----+-----+-----+----------+-------------------+--------+
|pkey|attr1|attr2|is_current|     effective_time|end_time|
+----+-----+-----+----------+-------------------+--------+
|   1|    A|    A|      true|2019-01-01 00:00:00|    null|
|   2|    B|    B|      true|2019-01-01 00:00:00|    null|
|   4|    D|    D|      true|2019-01-01 00:00:00|    null|
+----+-----+-----+----------+-------------------+--------+
You'd like to perform an upsert with this data:
+----+-----+-----+-------------------+
|pkey|attr1|attr2|     effective_time|
+----+-----+-----+-------------------+
|   2|    Z| null|2020-01-01 00:00:00| // upsert data
|   3|    C|    C|2020-09-15 00:00:00| // new pkey
+----+-----+-----+-------------------+
Here's how to perform the upsert:
Type2Scd.upsert(deltaTable, updatesDF, "pkey", Seq("attr1", "attr2"))
Here's the table after the upsert:
+----+-----+-----+----------+-------------------+-------------------+
|pkey|attr1|attr2|is_current|     effective_time|           end_time|
+----+-----+-----+----------+-------------------+-------------------+
|   2|    B|    B|     false|2019-01-01 00:00:00|2020-01-01 00:00:00|
|   4|    D|    D|      true|2019-01-01 00:00:00|               null|
|   1|    A|    A|      true|2019-01-01 00:00:00|               null|
|   3|    C|    C|      true|2020-09-15 00:00:00|               null|
|   2|    Z| null|      true|2020-01-01 00:00:00|               null|
+----+-----+-----+----------+-------------------+-------------------+
You can leverage the upsert code if your SCD table meets these requirements:
- Contains a unique primary key column
- Any change in an attribute column triggers an upsert
- SCD logic is exposed via effective_time, end_time, and is_current columns

Merge logic can get really messy, so it's easiest to follow these conventions. See this blog post if you'd like to build a SCD with custom logic.
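To make the conventions concrete, here's a plain-Scala sketch of the upsert semantics on ordinary case classes. All names here are illustrative, not jodie's internals, and jodie performs this as a Delta Lake merge rather than in memory:

```scala
// Sketch of Type 2 SCD upsert semantics: close out changed current rows,
// then insert a new current row for every changed or brand-new key.
case class ScdRow(
    pkey: Int,
    attr1: Option[String],
    attr2: Option[String],
    isCurrent: Boolean,
    effectiveTime: String,
    endTime: Option[String]
)
case class Update(pkey: Int, attr1: Option[String], attr2: Option[String], effectiveTime: String)

def upsert(table: Seq[ScdRow], updates: Seq[Update]): Seq[ScdRow] = {
  val byKey = updates.map(u => u.pkey -> u).toMap
  // Close current rows whose attributes changed: flip is_current,
  // set end_time to the update's effective_time.
  val closed = table.map { row =>
    byKey.get(row.pkey) match {
      case Some(u) if row.isCurrent && (row.attr1 != u.attr1 || row.attr2 != u.attr2) =>
        row.copy(isCurrent = false, endTime = Some(u.effectiveTime))
      case _ => row
    }
  }
  // Insert a new current row for each update that is not already current.
  val inserted = updates.collect {
    case u if !table.exists(r => r.pkey == u.pkey && r.isCurrent && r.attr1 == u.attr1 && r.attr2 == u.attr2) =>
      ScdRow(u.pkey, u.attr1, u.attr2, isCurrent = true, u.effectiveTime, None)
  }
  closed ++ inserted
}

val table = Seq(
  ScdRow(1, Some("A"), Some("A"), isCurrent = true, "2019-01-01 00:00:00", None),
  ScdRow(2, Some("B"), Some("B"), isCurrent = true, "2019-01-01 00:00:00", None),
  ScdRow(4, Some("D"), Some("D"), isCurrent = true, "2019-01-01 00:00:00", None)
)
val updates = Seq(
  Update(2, Some("Z"), None, "2020-01-01 00:00:00"),
  Update(3, Some("C"), Some("C"), "2020-09-15 00:00:00")
)
val result = upsert(table, updates)
```

Running this on the example data yields the five rows shown above: pkey 2's old row closed with an end_time, and new current rows for pkeys 2 and 3.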
Kill Duplicates
The killDuplicateRecords function deletes every record that is duplicated with respect to a given set of columns; no occurrence is kept.
Suppose you have the following table:
+----+---------+---------+
|  id|firstname| lastname|
+----+---------+---------+
|   1|   Benito|  Jackson| # duplicate
|   2|    Maria|   Willis|
|   3|     Jose| Travolta| # duplicate
|   4|   Benito|  Jackson| # duplicate
|   5|     Jose| Travolta| # duplicate
|   6|    Maria|     Pitt|
|   9|   Benito|  Jackson| # duplicate
+----+---------+---------+
We can run the following function to remove all duplicates:
DeltaHelpers.killDuplicateRecords(
  deltaTable = deltaTable,
  duplicateColumns = Seq("firstname", "lastname")
)
The result of running the previous function is the following table:
+----+---------+---------+
|  id|firstname| lastname|
+----+---------+---------+
|   2|    Maria|   Willis|
|   6|    Maria|     Pitt|
+----+---------+---------+
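The semantics can be sketched in plain Scala (illustrative names; jodie runs this as a Delta delete, not in memory): every row whose key occurs more than once is dropped, including the "original":

```scala
// Sketch of killDuplicateRecords semantics: keep only rows whose
// (firstname, lastname) pair occurs exactly once.
case class Person(id: Int, firstname: String, lastname: String)

def killDuplicates(rows: Seq[Person]): Seq[Person] = {
  val counts = rows.groupBy(r => (r.firstname, r.lastname)).view.mapValues(_.size).toMap
  rows.filter(r => counts((r.firstname, r.lastname)) == 1)
}

val rows = Seq(
  Person(1, "Benito", "Jackson"),
  Person(2, "Maria", "Willis"),
  Person(3, "Jose", "Travolta"),
  Person(4, "Benito", "Jackson"),
  Person(5, "Jose", "Travolta"),
  Person(6, "Maria", "Pitt"),
  Person(9, "Benito", "Jackson")
)
val survivors = killDuplicates(rows)  // only ids 2 and 6 remain
```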
Remove Duplicates
The removeDuplicateRecords function deletes duplicates but keeps one occurrence of each record that was duplicated.
There are two versions of this function; let's look at an example of each, starting with the first version.
Suppose you have the following table:
+----+---------+---------+
|  id|firstname| lastname|
+----+---------+---------+
|   2|    Maria|   Willis|
|   3|     Jose| Travolta|
|   4|   Benito|  Jackson|
|   1|   Benito|  Jackson| # duplicate
|   5|     Jose| Travolta| # duplicate
|   6|    Maria|   Willis| # duplicate
|   9|   Benito|  Jackson| # duplicate
+----+---------+---------+
We can run the following function to remove all duplicates:
DeltaHelpers.removeDuplicateRecords(
  deltaTable = deltaTable,
  duplicateColumns = Seq("firstname", "lastname")
)
The result of running the previous function is the following table:
+----+---------+---------+
|  id|firstname| lastname|
+----+---------+---------+
|   2|    Maria|   Willis|
|   3|     Jose| Travolta|
|   4|   Benito|  Jackson|
+----+---------+---------+
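This first version can be sketched in plain Scala (illustrative names; without a primary key, which occurrence survives within each group is effectively arbitrary, so don't rely on it):

```scala
// Sketch of removeDuplicateRecords (no primary key): keep exactly one
// row per (firstname, lastname) group, discard the rest.
case class Person(id: Int, firstname: String, lastname: String)

def removeDuplicatesKeepOne(rows: Seq[Person]): Seq[Person] =
  rows.groupBy(r => (r.firstname, r.lastname)).values.map(_.head).toSeq

val rows = Seq(
  Person(2, "Maria", "Willis"),
  Person(3, "Jose", "Travolta"),
  Person(4, "Benito", "Jackson"),
  Person(1, "Benito", "Jackson"),
  Person(5, "Jose", "Travolta"),
  Person(6, "Maria", "Willis"),
  Person(9, "Benito", "Jackson")
)
val kept = removeDuplicatesKeepOne(rows)  // one row per distinct name pair
```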
Now let’s see an example of how to use the second version:
Suppose you have a similar table:
+----+---------+---------+
|  id|firstname| lastname|
+----+---------+---------+
|   2|    Maria|   Willis|
|   3|     Jose| Travolta| # duplicate
|   4|   Benito|  Jackson| # duplicate
|   1|   Benito|  Jackson| # duplicate
|   5|     Jose| Travolta| # duplicate
|   6|    Maria|     Pitt|
|   9|   Benito|  Jackson| # duplicate
+----+---------+---------+
This time the function takes an additional input parameter, a primary key, which is used to order the duplicated records in ascending order; within each group of duplicates, the record with the smallest primary key value is kept and the rest are removed.
DeltaHelpers.removeDuplicateRecords(
  deltaTable = deltaTable,
  primaryKey = "id",
  duplicateColumns = Seq("firstname", "lastname")
)
The result of running the previous function is the following:
+----+---------+---------+
|  id|firstname| lastname|
+----+---------+---------+
|   1|   Benito|  Jackson|
|   2|    Maria|   Willis|
|   3|     Jose| Travolta|
|   6|    Maria|     Pitt|
+----+---------+---------+
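The keep-the-lowest-key behavior can be sketched in plain Scala (illustrative names; jodie performs this as a Delta delete, not in memory):

```scala
// Sketch of removeDuplicateRecords with a primary key: within each
// (firstname, lastname) group, keep the row with the smallest key.
case class Person(id: Int, firstname: String, lastname: String)

def removeDuplicates(rows: Seq[Person], primaryKey: Person => Int): Seq[Person] =
  rows.groupBy(r => (r.firstname, r.lastname)).values.map(_.minBy(primaryKey)).toSeq

val rows = Seq(
  Person(2, "Maria", "Willis"),
  Person(3, "Jose", "Travolta"),
  Person(4, "Benito", "Jackson"),
  Person(1, "Benito", "Jackson"),
  Person(5, "Jose", "Travolta"),
  Person(6, "Maria", "Pitt"),
  Person(9, "Benito", "Jackson")
)
val kept = removeDuplicates(rows, _.id)  // ids 1, 2, 3, 6 survive
```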
These functions come in handy when you are doing data cleansing.
Copy Delta Table
This function takes an existing Delta table and makes a copy of all its data, properties, and partitions to a new Delta table. The new table can be created at a specified path or under a given table name.
Copying does not include the delta log, which means you will not be able to restore the new table to an old version of the original table.
Here's how to perform the copy to a specific path:
DeltaHelpers.copyTable(deltaTable = deltaTable, targetPath = Some(targetPath))
Here's how to perform the copy using a table name:
DeltaHelpers.copyTable(deltaTable = deltaTable, targetTableName = Some(tableName))
Note that with this last function call, the location where the table is stored is determined by the Spark conf property spark.sql.warehouse.dir.
Validate append
The validateAppend function provides a mechanism for allowing specific columns for schema evolution, while rejecting appends with columns that aren't explicitly allowlisted.
Suppose you have the following Delta table:
+----+----+----+
|col1|col2|col3|
+----+----+----+
|   2|   b|   B|
|   1|   a|   A|
+----+----+----+
Here's how to call validateAppend, specifying the required and optional columns:
DeltaHelpers.validateAppend(
  deltaTable = deltaTable,
  appendDF = appendDf,
  requiredCols = List("col1", "col2"),
  optionalCols = List("col4")
)
You can append the following DataFrame that contains the required columns and the optional columns:
+----+----+----+
|col1|col2|col4|
+----+----+----+
|   3|   c| cat|
|   4|   d| dog|
+----+----+----+
Here's what the Delta table will contain after that data is appended:
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|   3|   c|null| cat|
|   4|   d|null| dog|
|   2|   b|   B|null|
|   1|   a|   A|null|
+----+----+----+----+
You cannot append the following DataFrame which contains the required columns, but also contains another column (col5) that's not specified as an optional column.
+----+----+----+
|col1|col2|col5|
+----+----+----+
|   4|   b|   A|
|   5|   y|   C|
|   6|   z|   D|
+----+----+----+
Here's the error you'll get when you attempt this write: "The following columns are not part of the current Delta table. If you want to add these columns to the table, you must set the optionalCols parameter: List(col5)"
You also cannot append the following DataFrame which is missing one of the required columns.
+----+----+
|col1|col4|
+----+----+
|   4|   A|
|   5|   C|
|   6|   D|
+----+----+
Here's the error you'll get: "The base Delta table has these columns List(col1, col4), but these columns are required List(col1, col2)"
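The two checks can be sketched in plain Scala. The function name and message strings below are illustrative, not jodie's exact internals:

```scala
// Validate an append's columns against the table schema, a required
// list, and an optional allowlist, mirroring the two failure modes
// shown above: a missing required column, or a non-allowlisted column.
def validateColumns(
    tableCols: Set[String],
    appendCols: Set[String],
    requiredCols: Set[String],
    optionalCols: Set[String]
): Either[String, Unit] = {
  val missingRequired = requiredCols -- appendCols
  val notAllowlisted  = appendCols -- tableCols -- optionalCols
  if (missingRequired.nonEmpty)
    Left(s"Missing required columns: ${missingRequired.mkString(", ")}")
  else if (notAllowlisted.nonEmpty)
    Left(s"Columns not in the table or optionalCols: ${notAllowlisted.mkString(", ")}")
  else Right(())
}

val tableCols = Set("col1", "col2", "col3")
// Matches the three appends walked through above:
val ok         = validateColumns(tableCols, Set("col1", "col2", "col4"), Set("col1", "col2"), Set("col4"))
val badExtra   = validateColumns(tableCols, Set("col1", "col2", "col5"), Set("col1", "col2"), Set("col4"))
val badMissing = validateColumns(tableCols, Set("col1", "col4"), Set("col1", "col2"), Set("col4"))
```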
Latest Version of Delta Table
The latestVersion function returns the latest version number of a table given its storage path.
Here's how to use the function:
DeltaHelpers.latestVersion(path = "file:/path/to/your/delta-lake/table")
Insert Data Without Duplicates
The function appendWithoutDuplicates inserts data into an existing delta table and prevents data duplication in the process.
Let's see an example of how it works.
Suppose we have the following table:
+----+---------+---------+
|  id|firstname| lastname|
+----+---------+---------+
|   1|   Benito|  Jackson|
|   4|    Maria|     Pitt|
|   6|  Rosalia|     Pitt|
+----+---------+---------+
And we want to insert this new dataframe:
+----+---------+---------+
|  id|firstname| lastname|
+----+---------+---------+
|   6|  Rosalia|     Pitt| # duplicate
|   2|    Maria|   Willis|
|   3|     Jose| Travolta|
|   4|    Maria|     Pitt| # duplicate
+----+---------+---------+
We can use the following function to insert new data and avoid data duplication:
DeltaHelpers.appendWithoutDuplicates(
  deltaTable = deltaTable,
  appendData = newDataDF,
  compositeKey = Seq("firstname", "lastname")
)
The resulting table contains the original rows plus the new rows with ids 2 and 3; the rows flagged as duplicates (ids 4 and 6) are not inserted.
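The dedup-on-append semantics can be sketched in plain Scala (illustrative names; jodie implements this against Delta storage, not in memory):

```scala
// Sketch of appendWithoutDuplicates: append only the rows whose
// composite key (firstname, lastname) is not already in the table.
case class Person(id: Int, firstname: String, lastname: String)

def appendWithoutDuplicates(table: Seq[Person], appendRows: Seq[Person]): Seq[Person] = {
  val existingKeys = table.map(r => (r.firstname, r.lastname)).toSet
  table ++ appendRows.filter(r => !existingKeys.contains((r.firstname, r.lastname)))
}

val table = Seq(
  Person(1, "Benito", "Jackson"),
  Person(4, "Maria", "Pitt"),
  Person(6, "Rosalia", "Pitt")
)
val newRows = Seq(
  Person(6, "Rosalia", "Pitt"),
  Person(2, "Maria", "Willis"),
  Person(3, "Jose", "Travolta"),
  Person(4, "Maria", "Pitt")
)
val result = appendWithoutDuplicates(table, newRows)  // ids 1, 4, 6 plus new 2, 3
```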
