ETL
Extract, Transform, and Load data with Ruby
Install / Use
/learn @square/ETLREADME
ETL
Extract, transform, and load data with ruby!
Installation
Add this line to your application's Gemfile:
gem 'ETL'
And then execute:
$ bundle
Or install it yourself as:
$ gem install ETL
ETL Dependencies
ETL depends on having a database connection object that must respond
to #query. The mysql2 gem is a good option.
You can also proxy another library using Ruby's SimpleDelegator and add a #query
method if need be.
The gem comes bundled with a default logger. If you'd like to write your own
just make sure that it implements #debug and #info. For more information
on what is logged and when, view the logger details.
Basic ETL
Assume that we have a database connection represented by connection.
To run a basic ETL that is composed of sequential SQL statements, start by creating a new ETL instance:
# setting connection at the class level
ETL.connection = connection
etl = ETL.new(description: "a description of what this ETL does")
or
# setting connection at the instance level
etl = ETL.new(
description: "a description of what this ETL does",
connection: connection
)
which can then be configured:
etl.config do |etl|
etl.ensure_destination do |etl|
# For most ETLs you may want to ensure that the destination exists, so the
# #ensure_destination block is ideally suited to fulfill this requirement.
#
# By way of example:
#
etl.query %[
CREATE TABLE IF NOT EXISTS some_database.some_destination_table (
user_id INT UNSIGNED NOT NULL
, created_date DATE NOT NULL
, total_amount INT SIGNED NOT NULL
, message VARCHAR(100) DEFAULT NULL
, PRIMARY KEY (user_id, created_date)
, KEY (created_date)
)
]
end
etl.before_etl do |etl|
# All pre-ETL work is performed in this block.
#
# This can be thought of as a before-ETL hook that will fire only once. When
# you are not leveraging the ETL iteration capabilities, the value of this
# block vs the #etl block is not very clear. We will see how and when to
# leverage this block effectively when we introduce iteration.
#
# As an example, let's say we want to get rid of all entries that have an
# amount less than zero before moving on to our actual etl:
#
etl.query %[DELETE FROM some_database.some_source_table WHERE amount < 0]
end
etl.etl do |etl|
# Here is where the magic happens! This block contains the main ETL
# operation.
#
# For example:
#
etl.query %[
REPLACE INTO some_database.some_destination_table (
user_id
, created_date
, total_amount
) SELECT
user_id
, DATE(created_at) AS created_date
, SUM(amount) AS total_amount
FROM
some_database.some_source_table sst
GROUP BY
sst.user_id
, DATE(sst.created_at)
]
end
etl.after_etl do |etl|
# All post-ETL work is performed in this block.
#
# Again, to finish up with an example:
#
etl.query %[
UPDATE some_database.some_destination_table
SET message = "WOW"
WHERE total_amount > 100
]
end
end
At this point it is possible to run the ETL instance via:
etl.run
which executes #ensure_destination, #before_etl, #etl, and #after_etl in
that order.
ETL with iteration
To add in iteration, simply supply #start, #step, and #stop blocks. This
is useful when dealing with large data sets or when executing queries that,
while optimized, are still slow.
Again, to kick things off:
etl = ETL.new(
description: "a description of what this ETL does",
connection: connection
)
where connection is the same as described above.
Next we can configure the ETL:
# assuming we have the ETL instance from above
etl.config do |etl|
etl.ensure_destination do |etl|
# For most ETLs you may want to ensure that the destination exists, so the
# #ensure_destination block is ideally suited to fulfill this requirement.
#
# By way of example:
#
etl.query %[
CREATE TABLE IF NOT EXISTS some_database.some_destination_table (
user_id INT UNSIGNED NOT NULL
, created_date DATE NOT NULL
, total_amount INT SIGNED NOT NULL
, message VARCHAR(100) DEFAULT NULL
, PRIMARY KEY (user_id, created_date)
, KEY (created_date)
)
]
end
etl.before_etl do |etl|
# All pre-ETL work is performed in this block.
#
# Now that we are leveraging iteration the #before_etl block becomes
# more useful as a way to execute an operation once before we begin
# our iteration.
#
# As an example, let's say we want to get rid of all entries that have an
# amount less than zero before moving on to our actual etl:
#
etl.query %[
DELETE FROM some_database.some_source_table
WHERE amount < 0
]
end
etl.start do |etl|
# This defines where the ETL should start. This can be a flat number
# or date, or even SQL / other code can be executed to produce a starting
# value.
#
# Usually, this is the last known entry for the destination table with
# some sensible default if the destination does not yet contain data.
#
# As an example:
#
# Note that we cast the default date as a DATE. If we don't, it will be
# treated as a string and our iterator will fail under the hood when testing
# if it is complete.
res = etl.query %[
SELECT COALESCE(MAX(created_date), DATE('2010-01-01')) AS the_max
FROM some_database.some_destination_table
]
res.to_a.first['the_max']
end
etl.step do |etl|
# The step block defines the size of the iteration block. To iterate by
# ten records, the step block should be set to return 10.
#
# As an alternative example, to set the iteration to go 10,000 units
# at a time, the following value should be provided:
#
# 10_000 (Note: an underscore is used for readability)
#
# As an example, to iterate 7 days at a time:
#
7
end
etl.stop do |etl|
# The stop block defines when the iteration should halt.
# Again, this can be a flat value or code. Either way, one value *must* be
# returned.
#
# As a flat value:
#
# 1_000_000
#
# Or a date value:
#
# Time.now.to_date
#
# Or as a code example:
#
res = etl.query %[
SELECT DATE(MAX(created_at)) AS the_max
FROM some_database.some_source_table
]
res.to_a.first['the_max']
end
etl.etl do |etl, lbound, ubound|
# The etl block is the main part of the framework. Note: there are
# two extra args with the iterator this time around: "lbound" and "ubound"
#
# "lbound" is the lower bound of the current iteration. When iterating
# from 0 to 10 and stepping by 2, the lbound would equal 2 on the
# second iteration.
#
# "ubound" is the upper bound of the current iteration. In continuing with the
# example above, when iterating from 0 to 10 and stepping by 2, the ubound would
# equal 4 on the second iteration.
#
# These args can be used to "window" SQL queries or other code operations.
#
# As a first example, to iterate over a set of ids:
#
# etl.query %[
# REPLACE INTO some_database.some_destination_table (
# created_date
# , user_id
# , total_amount
# ) SELECT
# DATE(sst.created_at) AS created_date
# , sst.user_id
# , SUM(sst.amount) AS total_amount
# FROM
# some_database.some_source_table sst
# WHERE
# sst.user_id > #{lbound} AND sst.user_id <= #{ubound}
# GROUP BY
# DATE(sst.created_at)
# , sst.user_id]
#
# To "window" a SQL query using dates:
#
etl.query %[
REPLACE INTO some_database.some_destination_table (
created_date
, user_id
, total_amount
) SELECT
DATE(sst.created_at) AS created_date
, sst.user_id
, SUM(sst.amount) AS total_amount
FROM
some_database.some_source_table sst
WHERE
-- Note the usage of quotes surrounding the lbound and ubound vars.
-- This is is required when dealing with dates / datetimes
sst.created_at >= '#{lbound}' AND sst.created_at < '#{ubound}'
GROUP BY
DATE(sst.created_at)
, sst.user_id
]
# Note that there is no sql sanitization here so there is *potential* for SQL
# injection. That being said you'll likely be using this gem in an internal
# tool so hopefully your co-workers are not looking to sabotage your ETL
# pipeline. Just be aware of this and handle it as you see fit.
end
etl.after_etl do |etl|
# All post-ETL work is performed in this block.
#
# Again, to finish up with an example:
#
etl.query %[
UPDATE some_database.some_destination_table
SET message = "WOW"
WHERE total_amount > 100
]
end
end
At this point it is possible to run the ETL instance via:
etl.run
which executes #ensure_destination, #before_etl, #etl, and #after_etl in
that order.
Note that #etl executes #start and #stop once and memoizes the result for
each. It then begins to iterate from what #start evaluated to up until what #stop
evaluated to by what #step evaluates to.
Examples
There are two examples found in ./examples that demonstrate the basic ETL and
iteration ETL. Each file uses the mysql2
gem and reads / writes data to localhost using the root user with no password.
Adjust as needed.
Logger Details
A logger must support two methods: #info and #warn.
Both methods should accept a
