SkillAgentSearch skills...

Jodie

Delta lake and filesystem helper methods

Install / Use

/learn @mrpowers-io/Jodie
About this skill

Quality Score

0/100

Supported Platforms

Universal

README

jodie

This library provides helpful Delta Lake and filesystem utility functions.

jodie

Accessing the library

Fetch the JAR file from Maven.

libraryDependencies += "com.github.mrpowers" %% "jodie" % "0.0.3"

You can find the spark-daria releases for different Scala versions:

Delta Helpers

Type 2 SCDs

This library provides an opinionated, conventions over configuration, approach to Type 2 SCD management. Let's look at an example before covering the conventions required to take advantage of the functionality.

Suppose you have the following SCD table with the pkey primary key:

+----+-----+-----+----------+-------------------+--------+
|pkey|attr1|attr2|is_current|     effective_time|end_time|
+----+-----+-----+----------+-------------------+--------+
|   1|    A|    A|      true|2019-01-01 00:00:00|    null|
|   2|    B|    B|      true|2019-01-01 00:00:00|    null|
|   4|    D|    D|      true|2019-01-01 00:00:00|    null|
+----+-----+-----+----------+-------------------+--------+

You'd like to perform an upsert with this data:

+----+-----+-----+-------------------+
|pkey|attr1|attr2|     effective_time|
+----+-----+-----+-------------------+
|   2|    Z| null|2020-01-01 00:00:00| // upsert data
|   3|    C|    C|2020-09-15 00:00:00| // new pkey
+----+-----+-----+-------------------+

Here's how to perform the upsert:

Type2Scd.upsert(deltaTable, updatesDF, "pkey", Seq("attr1", "attr2"))

Here's the table after the upsert:

+----+-----+-----+----------+-------------------+-------------------+
|pkey|attr1|attr2|is_current|     effective_time|           end_time|
+----+-----+-----+----------+-------------------+-------------------+
|   2|    B|    B|     false|2019-01-01 00:00:00|2020-01-01 00:00:00|
|   4|    D|    D|      true|2019-01-01 00:00:00|               null|
|   1|    A|    A|      true|2019-01-01 00:00:00|               null|
|   3|    C|    C|      true|2020-09-15 00:00:00|               null|
|   2|    Z| null|      true|2020-01-01 00:00:00|               null|
+----+-----+-----+----------+-------------------+-------------------+

You can leverage the upsert code if your SCD table meets these requirements:

  • Contains a unique primary key column
  • Any change in an attribute column triggers an upsert
  • SCD logic is exposed via effective_time, end_time and is_current column

merge logic can get really messy, so it's easiest to follow these conventions. See this blog post if you'd like to build a SCD with custom logic.

Kill Duplicates

The function killDuplicateRecords deletes all the duplicated records from a table given a set of columns.

Suppose you have the following table:

+----+---------+---------+
|  id|firstname| lastname|
+----+---------+---------+
|   1|   Benito|  Jackson| # duplicate
|   2|    Maria|   Willis|
|   3|     Jose| Travolta| # duplicate
|   4|   Benito|  Jackson| # duplicate
|   5|     Jose| Travolta| # duplicate
|   6|    Maria|     Pitt|
|   9|   Benito|  Jackson| # duplicate
+----+---------+---------+

We can Run the following function to remove all duplicates:

DeltaHelpers.killDuplicateRecords(
  deltaTable = deltaTable, 
  duplicateColumns = Seq("firstname","lastname")
)

The result of running the previous function is the following table:

+----+---------+---------+
|  id|firstname| lastname|
+----+---------+---------+
|   2|    Maria|   Willis|
|   6|    Maria|     Pitt| 
+----+---------+---------+

Remove Duplicates

The functions removeDuplicateRecords deletes duplicates but keeps one occurrence of each record that was duplicated. There are two versions of that function, lets look an example of each,

Let’s see an example of how to use the first version:

Suppose you have the following table:

+----+---------+---------+
|  id|firstname| lastname|
+----+---------+---------+
|   2|    Maria|   Willis|
|   3|     Jose| Travolta|
|   4|   Benito|  Jackson|
|   1|   Benito|  Jackson| # duplicate
|   5|     Jose| Travolta| # duplicate
|   6|    Maria|   Willis| # duplicate
|   9|   Benito|  Jackson| # duplicate
+----+---------+---------+

We can Run the following function to remove all duplicates:

DeltaHelpers.removeDuplicateRecords(
  deltaTable = deltaTable,
  duplicateColumns = Seq("firstname","lastname")
)

The result of running the previous function is the following table:

+----+---------+---------+
|  id|firstname| lastname|
+----+---------+---------+
|   2|    Maria|   Willis|
|   3|     Jose| Travolta|
|   4|   Benito|  Jackson| 
+----+---------+---------+

Now let’s see an example of how to use the second version:

Suppose you have a similar table:

+----+---------+---------+
|  id|firstname| lastname|
+----+---------+---------+
|   2|    Maria|   Willis|
|   3|     Jose| Travolta| # duplicate
|   4|   Benito|  Jackson| # duplicate
|   1|   Benito|  Jackson| # duplicate
|   5|     Jose| Travolta| # duplicate
|   6|    Maria|     Pitt|
|   9|   Benito|  Jackson| # duplicate
+----+---------+---------+

This time the function takes an additional input parameter, a primary key that will be used to sort the duplicated records in ascending order and remove them according to that order.

DeltaHelpers.removeDuplicateRecords(
  deltaTable = deltaTable,
  primaryKey = "id",
  duplicateColumns = Seq("firstname","lastname")
)

The result of running the previous function is the following:

+----+---------+---------+
|  id|firstname| lastname|
+----+---------+---------+
|   1|   Benito|  Jackson|
|   2|    Maria|   Willis|
|   3|     Jose| Travolta|
|   6|    Maria|     Pitt|
+----+---------+---------+

These functions come in handy when you are doing data cleansing.

Copy Delta Table

This function takes an existing delta table and makes a copy of all its data, properties, and partitions to a new delta table. The new table could be created based on a specified path or just a given table name.

Copying does not include the delta log, which means that you will not be able to restore the new table to an old version of the original table.

Here's how to perform the copy to a specific path:

DeltaHelpers.copyTable(deltaTable = deltaTable, targetPath = Some(targetPath))

Here's how to perform the copy using a table name:

DeltaHelpers.copyTable(deltaTable = deltaTable, targetTableName = Some(tableName))

Note the location where the table will be stored in this last function call will be based on the spark conf property spark.sql.warehouse.dir.

Validate append

The validateAppend function provides a mechanism for allowing some columns for schema evolution, but rejecting appends with columns that aren't specificly allowlisted.

Suppose you have the following Delta table:

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   2|   b|   B|
|   1|   a|   A|
+----+----+----+

Here's an appender function that wraps validateAppend:

DeltaHelpers.validateAppend(
  deltaTable = deltaTable,
  appendDF = appendDf,
  requiredCols = List("col1", "col2"),
  optionalCols = List("col4")
)

You can append the following DataFrame that contains the required columns and the optional columns:

+----+----+----+
|col1|col2|col4|
+----+----+----+
|   3|   c| cat|
|   4|   d| dog|
+----+----+----+

Here's what the Delta table will contain after that data is appended:

+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|   3|   c|null| cat|
|   4|   d|null| dog|
|   2|   b|   B|null|
|   1|   a|   A|null|
+----+----+----+----+

You cannot append the following DataFrame which contains the required columns, but also contains another column (col5) that's not specified as an optional column.

+----+----+----+
|col1|col2|col5|
+----+----+----+
|   4|   b|   A|
|   5|   y|   C|
|   6|   z|   D|
+----+----+----+

Here's the error you'll get when you attempt this write: "The following columns are not part of the current Delta table. If you want to add these columns to the table, you must set the optionalCols parameter: List(col5)"

You also cannot append the following DataFrame which is missing one of the required columns.

+----+----+
|col1|col4|
+----+----+
|   4|   A|
|   5|   C|
|   6|   D|
+----+----+

Here's the error you'll get: "The base Delta table has these columns List(col1, col4), but these columns are required List(col1, col2)"

Latest Version of Delta Table

The function latestVersion return the latest version number of a table given its storage path.

Here's how to use the function:

DeltaHelpers.latestVersion(path = "file:/path/to/your/delta-lake/table")

Insert Data Without Duplicates

The function appendWithoutDuplicates inserts data into an existing delta table and prevents data duplication in the process. Let's see an example of how it works.

Suppose we have the following table:

+----+---------+---------+
|  id|firstname| lastname|
+----+---------+---------+
|   1|   Benito|  Jackson|
|   4|    Maria|     Pitt|
|   6|  Rosalia|     Pitt|
+----+---------+---------+

And we want to insert this new dataframe:

+----+---------+---------+
|  id|firstname| lastname|
+----+---------+---------+
|   6|  Rosalia|     Pitt| # duplicate
|   2|    Maria|   Willis|
|   3|     Jose| Travolta|
|   4|    Maria|     Pitt| # duplicate
+----+---------+---------+

We can use the following function to insert new data and avoid data duplication:

DeltaHelpers.appendWithoutDuplicates(
  deltaTable = deltaTable,
  appendData = newDataDF, 
  compositeKey = Seq("firstname","lastname")
)

The result table wil

Related Skills

View on GitHub
GitHub Stars50
CategoryDevelopment
Updated2mo ago
Forks11

Languages

Scala

Security Score

95/100

Audited on Jan 23, 2026

No findings