Southpaw
:baseball: Streaming left joins in Kafka for change data capture
Overview
Southpaw is a tool that creates denormalized records from input records based on hierarchical relationships. These relationships are similar to a LEFT OUTER JOIN defined by the following SQL statement:
```sql
SELECT ...
FROM table_a LEFT OUTER JOIN table_b ON a_key = b_key
```
In this case, 'table_b' is a child relation of 'table_a': 'a_key' corresponds to the parent key and 'b_key' to the join key of the child relation. Ultimately, one 'table' serves as the root relation. The record key in each topic, for both input and denormalized records, is treated as the primary key, which is used by the various indices and within the denormalized entities themselves.
Why?
While robust tools like Flink or Kafka Streams support joins, their join support is limited: the typical use case is enriching a stream of records with a second stream used as a small lookup table. With Southpaw, we wanted to create denormalized records in a streaming fashion as the input topics receive new records or updates to existing records. The results should be similar to running large JOIN queries against a standard SQL DB, but they should be produced in a streaming fashion.
How?
Southpaw maintains a state of all records it sees, keeping the latest version of each record. In addition, it builds two types of indices. The first is the parent index, which tells Southpaw which denormalized records it should create whenever it sees a new or updated child record. The second is the join index, which tells Southpaw which child records to include in a denormalized record when it is being created. With these two types of indices, Southpaw can create and recreate the denormalized records as input records are streamed from the input topics.
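The mechanics above can be sketched in a few lines. This is an illustrative toy model, not Southpaw's actual API: it keeps a latest-version state, a join index (join key value → child primary keys), and a parent index (parent key value → root primary keys), for a single `playlist` → `playlist_tag` relation.

```python
# Toy model of Southpaw's two index types (names and structure are
# illustrative, not Southpaw's real classes).
# Relation: playlist LEFT OUTER JOIN playlist_tag
#           ON playlist.id = playlist_tag.playlist_id

state = {}         # (entity, primary_key) -> latest record
join_index = {}    # (child_entity, join_key_value) -> set of child PKs
parent_index = {}  # (child_entity, parent_key_value) -> set of root PKs

def ingest_root(pk, record):
    state[("playlist", pk)] = record
    # Index the root by its parent key ("id") for the playlist_tag relation
    parent_index.setdefault(("playlist_tag", record["id"]), set()).add(pk)

def ingest_child(pk, record):
    state[("playlist_tag", pk)] = record
    # Index the child by its join key ("playlist_id")
    join_index.setdefault(("playlist_tag", record["playlist_id"]), set()).add(pk)

def denormalize(root_pk):
    # The join index tells us which child records to embed
    root = state[("playlist", root_pk)]
    child_pks = join_index.get(("playlist_tag", root["id"]), set())
    children = [state[("playlist_tag", c)] for c in sorted(child_pks)]
    return {**root, "playlist_tag": children}

ingest_root(1, {"id": 1, "title": "news"})
ingest_child(10, {"id": 10, "playlist_id": 1, "tag": "sports"})

# A new child arrives: the parent index tells us which roots to rebuild.
new_child = {"id": 11, "playlist_id": 1, "tag": "breaking"}
ingest_child(11, new_child)
to_rebuild = parent_index.get(("playlist_tag", new_child["playlist_id"]), set())
rebuilt = [denormalize(r) for r in sorted(to_rebuild)]
```

After the second child arrives, only playlist 1 is rebuilt, and its denormalized record now embeds both tags.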
Running Southpaw
Southpaw accepts command line arguments and has a help option:
```
Option (* = required)   Description
---------------------   -----------
--build                 Builds denormalized records using an
                          existing state.
* --config              Path to the Southpaw config file
--delete-backup         Deletes existing backups specified in
                          the config file. BE VERY CAREFUL
                          WITH THIS!!!
--delete-state          Deletes the existing state specified
                          in the config file. BE VERY CAREFUL
                          WITH THIS!!!
--help                  Since you are seeing this, you
                          probably know what this is for. :)
* --relations           Paths to one or more files containing
                          input record relations
--restore               Restores the state from existing
                          backups.
--verify-state          Compares the state index to the reverse
                          index for each relational join and
                          logs any errors
```
NOTE: The --restore flag functions similarly to the rocks.db.restore.mode: always config option, except that it can be used without depending on Kafka or opening the RocksDB state. If --restore is used together with --build and rocks.db.restore.mode is set to always or when_needed, a restore may be performed twice before Southpaw fully starts up.
A typical use would look like this:
```
java -cp ./southpaw.jar com.jwplayer.southpaw.Southpaw --config conf/stretch.yaml --relations relations/media.json --build
```
Project Structure
- conf - Configuration
- relations - Relation definitions
- src - Java code
- index - Index classes
- json - Auto-generated POJO objects created from the JSON schemas
- record - Record abstractions (e.g. JSON and Avro)
- serde - Kafka serializers and deserializers (e.g. JSON and Avro)
- state - State abstraction used for storing indices and data
- topic - Topic (log) abstractions used for reading and storing records
- util - Utility code
State
Southpaw uses RocksDB, an embedded key/value store, for its state. RocksDB supports both persistence and backups, and Southpaw can sync backups to S3. While RocksDB is currently the only supported state, other states, such as Redis, could be added.
S3 backups
If you specify an S3 URI (using the 's3' scheme) for the rocks.db.backup.uri config option, Southpaw will sync backups to S3 (they are first staged locally under the RocksDB URI). Southpaw resolves the region and credentials via the standard AWS S3 methods, the same as the AWS CLI (environment variables, config file, etc.), so you only need to use one of those methods to store backups in S3.
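A minimal sketch of the relevant settings is shown below. The bucket name, local path, and values are placeholders; the keys are the rocks.db.* and backup options described in the Config section.

```yaml
# Hypothetical fragment: store RocksDB backups in S3.
# Bucket, prefix, and local path are placeholders.
rocks.db.uri: "file:///tmp/southpaw/state"
rocks.db.backup.uri: "s3://my-backup-bucket/southpaw/backups"
rocks.db.backups.to.keep: 5
backup.time.s: 3600
```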
Relations
Here is an example relations file:
```json
[
  {
    "DenormalizedName": "DeFeed",
    "Entity": "playlist",
    "Children": [
      {
        "Entity": "user",
        "JoinKey": "user_id",
        "ParentKey": "user_id"
      },
      {
        "Entity": "playlist_tag",
        "JoinKey": "playlist_id",
        "ParentKey": "id",
        "Children": [
          {
            "Entity": "user_tag",
            "JoinKey": "id",
            "ParentKey": "user_tag_id"
          }
        ]
      },
      {
        "Entity": "playlist_custom_params",
        "JoinKey": "playlist_id",
        "ParentKey": "id"
      },
      {
        "Entity": "playlist_media",
        "JoinKey": "playlist_id",
        "ParentKey": "id",
        "Children": [
          {
            "Entity": "media",
            "JoinKey": "id",
            "ParentKey": "media_id"
          }
        ]
      }
    ]
  }
]
```
As specified above, the best way to think about this is as a series of LEFT OUTER JOIN statements. The above would translate to:
```sql
SELECT ...
FROM
    playlist
    LEFT OUTER JOIN user ON playlist.user_id = user.user_id
    LEFT OUTER JOIN playlist_tag ON playlist.id = playlist_tag.playlist_id
    LEFT OUTER JOIN user_tag ON playlist_tag.user_tag_id = user_tag.id
    LEFT OUTER JOIN playlist_custom_params ON playlist.id = playlist_custom_params.playlist_id
    LEFT OUTER JOIN playlist_media ON playlist.id = playlist_media.playlist_id
    LEFT OUTER JOIN media ON playlist_media.media_id = media.id
```
The root node in this relationship tree ('playlist' in the example) is special: it must have a DenormalizedName in addition to an Entity, but it has no ParentKey or JoinKey. Each child node has an Entity in addition to a ParentKey and a JoinKey. Any node (root or child) may or may not have children.
The Entity and DenormalizedName fields should match corresponding entries under topics in the configuration. This allows different input and output topics to have different configuration. You could even specify different servers for each topic.
You can also specify multiple types of denormalized records in a single file, though a typical deployment creates only a single type per instance of Southpaw.
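To make the Entity/DenormalizedName-to-topic mapping concrete, here is a hypothetical sketch (the topics-section shape and the topic.name option are assumptions for illustration, not taken verbatim from Southpaw's config reference): each Entity and DenormalizedName from the relations file gets its own entry, so input and output topics can be configured independently.

```yaml
# Hypothetical sketch: one entry per Entity/DenormalizedName.
# Option names under each topic are placeholders.
topics:
  playlist:
    topic.name: "playlist"
  playlist_tag:
    topic.name: "playlist_tag"
  DeFeed:
    topic.name: "denormalized_feed"
```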
Config
The config is broken up into multiple sections:
Generic Config
- backup.time.s - The amount of time in seconds between backups
- commit.time.s - The amount of time in seconds between full state commits
- create.records.time.s - The amount of time spent creating denormalized records before returning to the main loop
- create.records.trigger - The number of denormalized record create actions to queue before creating denormalized records. Creation is only queued when lagging.
- index.lru.cache.size - The number of index entries to cache in memory
- index.write.batch.size - The number of entries each index holds in memory before flushing to the state
- metrics.report.time.s - The amount of time in seconds between calculating and reporting metrics
- queueing.strategy.class - When specified, allows a custom QueueingStrategy class that gives finer control over how denormalized record primary keys are queued and created. By default, everything is put in the medium priority queue.
- topic.lag.trigger - For performance, Southpaw sticks to a single topic until its lag falls below a certain threshold before switching to the next topic. This option controls that threshold.
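Put together, a generic-config section might look like the fragment below. The keys are the options listed above; every value is illustrative only, not a recommended default.

```yaml
# Illustrative values only; tune for your workload.
backup.time.s: 3600
commit.time.s: 60
create.records.time.s: 10
create.records.trigger: 250000
index.lru.cache.size: 1000000
index.write.batch.size: 25000
metrics.report.time.s: 30
topic.lag.trigger: 1000
```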
RocksDB Config
Currently, Southpaw uses RocksDB for its state, though this could be made pluggable in the future. Many of these options correspond directly to RocksDB options. Check the RocksDB documentation for more information.
- rocks.db.backup.uri - Where to store backups. The local file system and S3 are supported.
- rocks.db.backups.auto.rollback (default: false) - Roll back to the previous RocksDB backup if state restoration fails due to corruption
- rocks.db.backups.to.keep - Number of backups to keep
- rocks.db.compaction.read.ahead.size - Heap allocated to the compaction read-ahead process
- rocks.db.log.level (default: INFO_LEVEL) - The log level of the native RocksDB layer logs. Acceptable values are:
- DEBUG_LEVEL
- INFO_LEVEL
- WARN_LEVEL
- ERROR_LEVEL
- FATAL_LEVEL
- HEADER_LEVEL
- rocks.db.max.background.compactions - Number of threads used for background compactions
- rocks.db.max.background.flushes - Number of threads used for background flushes
- rocks.db.max.subcompactions - Number o
