mypipe
mypipe latches onto a MySQL server with binary log replication enabled and allows for the creation of pipes that can consume the replication stream and act on the data (primarily integrated with Apache Kafka).
Features
- streams binary logs remotely, emulating a slave
- writes binlog events into Kafka using a generic or specific Avro schema
- supports saving / loading binary log positions in a modular fashion (files, MySQL, or custom Java/Scala code to do so)
- handles ALTER TABLE events and can refresh the Avro schema being used
- built in a modular way allowing binlog events to be published into any system, not just Kafka
- can preload an entire MySQL table into Kafka, then resume from binary logs (useful with Kafka compaction, infinite retention, can be used to bootstrap downstream systems with the entire data for a table)
- configurable Kafka topic names based on the database and table
- whitelist / blacklist support for what to process or what not to process in a binary log (based on database and table)
- configurable error handling with the ability to specify custom handlers written in Java or Scala
- Kafka generic console consumer that interfaces with an in memory Avro schema repo for quick and easy data exploration in Kafka
API
mypipe tries to provide enough information, which is usually not part of the
MySQL binary log stream, so that the data is meaningful. mypipe requires a
row-based binary log format and provides Insert, Update, and Delete
mutations representing changed rows. Each change is related back to its
table, and the API provides metadata such as column types, primary key
information (composite, key order), and other such useful information.
Look at ColumnType.scala and Mutation.scala for more details.
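As a rough illustration only, acting on changed rows might look like the sketch below. The trait and case classes here are hypothetical, simplified shapes; the real definitions in Mutation.scala and ColumnType.scala carry richer column and key metadata.

// Hypothetical, simplified shapes for illustration only; see Mutation.scala
// and ColumnType.scala for the real API.
sealed trait Mutation { def database: String; def table: String }
case class InsertMutation(database: String, table: String, rows: Seq[Map[String, Any]]) extends Mutation
case class UpdateMutation(database: String, table: String, rows: Seq[(Map[String, Any], Map[String, Any])]) extends Mutation
case class DeleteMutation(database: String, table: String, rows: Seq[Map[String, Any]]) extends Mutation

def handle(m: Mutation): Unit = m match {
  case InsertMutation(db, table, rows) => println(s"$db.$table: inserted ${rows.size} row(s)")
  case UpdateMutation(db, table, rows) => println(s"$db.$table: updated ${rows.size} row(s)")
  case DeleteMutation(db, table, rows) => println(s"$db.$table: deleted ${rows.size} row(s)")
}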
Producers
Producers receive MySQL binary log events and act on them. They can funnel the events into another data store, send them to some service, or simply print them to the screen, as the stdout producer does.
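The sketch below shows what a stdout-style producer could look like; the Producer trait and its queue / flush methods are assumptions for illustration (building on the simplified Mutation trait from the previous sketch), not mypipe's actual producer API.

// Hypothetical producer shape for illustration only; mypipe's real producer API differs.
trait Producer {
  def queue(mutations: Seq[Mutation]): Boolean // buffer incoming mutations
  def flush(): Boolean                         // push buffered mutations downstream
}

class StdoutProducer extends Producer {
  private val buffer = scala.collection.mutable.ListBuffer.empty[Mutation]
  override def queue(mutations: Seq[Mutation]): Boolean = { buffer ++= mutations; true }
  override def flush(): Boolean = { buffer.foreach(println); buffer.clear(); true }
}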
Pipes
Pipes tie one or more MySQL binary log consumers to a producer. Pipes can be used to create a fan-in data flow from several MySQL servers to other data stores such as Hadoop, Cassandra, Kafka, or other MySQL servers. They can also be used to update or flush caches.
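As a rough sketch only, a pipe tying a consumer to the stdout producer could be configured along these lines; the pipe and producer key names here are assumptions modeled on the kafka-specific sample shown later in this README, so check reference.conf and your application.conf for the real layout.

mypipe {
  pipes {
    # hypothetical pipe: one MySQL consumer feeding the stdout producer
    stdout-pipe {
      enabled = true
      consumers = ["localhost"]
      producer {
        stdout {}
      }
    }
  }
}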
Kafka integration
mypipe's main goal is to replicate a MySQL binlog stream into Apache Kafka. mypipe supports Avro encoding and can use a schema repository to figure out how to encode data. Data can either be encoded generically and stored in Avro maps by type (integers, strings, longs, etc.) or it can encode data more specifically if the schema repository can return specific schemas. The latter will allow the table's structure to be reflected in the Avro structure.
Kafka message format
Binary log events, specifically mutation events (insert, update, delete) are pushed into Kafka and are binary encoded. Every message has the following format:
 -----------------
| MAGIC | 1 byte  |
|-----------------|
| MTYPE | 1 byte  |
|-----------------|
| SCMID | N bytes |
|-----------------|
| DATA  | N bytes |
 -----------------
The above fields are:
- MAGIC: magic byte, used to figure out the protocol version
- MTYPE: mutation type, a single byte indicating insert (0x1), update (0x2), or delete (0x3)
- SCMID: Avro schema ID, variable number of bytes
- DATA: the actual mutation data as bytes, variable size
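A minimal decoding sketch for this layout follows. It assumes 2-byte (Short) schema IDs, matching the schemaIdSizeInBytes = 2 used later in this README; the DATA bytes still need to be Avro-decoded against the schema identified by SCMID. The MutationMessage class is a hypothetical holder, not a mypipe type.

import java.nio.ByteBuffer

// Hypothetical holder for a decoded message; field sizes follow the layout above.
case class MutationMessage(magic: Byte, mutationType: Byte, schemaId: Short, data: Array[Byte])

def decode(bytes: Array[Byte]): MutationMessage = {
  val buf = ByteBuffer.wrap(bytes)
  val magic    = buf.get()      // MAGIC: protocol version
  val mtype    = buf.get()      // MTYPE: 0x1 insert, 0x2 update, 0x3 delete
  val schemaId = buf.getShort() // SCMID: assuming a 2-byte Avro schema ID
  val data     = new Array[Byte](buf.remaining())
  buf.get(data)                 // DATA: Avro-encoded mutation payload
  MutationMessage(magic, mtype, schemaId, data)
}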
MySQL to "generic" Kafka topics
If you do not have an Avro schema repository running that contains schemas
for each of your tables, you can use generic Avro encoding. This will take
binary log mutations (insert, update, or delete) and encode them into the
following structure, shown here for an insert (InsertMutation.avsc):
{
  "namespace": "mypipe.avro",
  "type": "record",
  "name": "InsertMutation",
  "fields": [
    {
      "name": "database",
      "type": "string"
    },
    {
      "name": "table",
      "type": "string"
    },
    {
      "name": "tableId",
      "type": "long"
    },
    {
      "name": "txid",
      "type": ["null", "Guid"],
      "default": "null"
    },
    {
      "name": "bytes",
      "type": {"type": "map", "values": "bytes"}
    },
    {
      "name": "integers",
      "type": {"type": "map", "values": "int"}
    },
    {
      "name": "strings",
      "type": {"type": "map", "values": "string"}
    },
    {
      "name": "longs",
      "type": {"type": "map", "values": "long"}
    }
  ]
}
Updates will contain both the old row values and the new ones (see
UpdateMutation.avsc) and deletes are similar to inserts (DeleteMutation.avsc).
Once transformed into Avro data, the mutations are pushed into Kafka topics
based on the following convention (this is configurable):
topicName = s"${db}_${table}_generic"
This ensures that all mutations destined for a specific database / table pair are added to a single topic with mutation ordering guarantees. For example, mutations on the users table in the shop database would all land in the shop_users_generic topic.
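A consumer could then decode the DATA portion of such a message with Avro's generic API, roughly as sketched below. This is not mypipe's own consumer code; it assumes schemaJson holds the full InsertMutation schema (the Guid type referenced by the txid field must also be known to the parser) and data holds only the Avro-encoded payload.

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory

// Sketch only: `schemaJson` is assumed to contain the InsertMutation schema shown above.
def decodeGenericInsert(schemaJson: String, data: Array[Byte]): GenericRecord = {
  val schema  = new Schema.Parser().parse(schemaJson)
  val reader  = new GenericDatumReader[GenericRecord](schema)
  val decoder = DecoderFactory.get().binaryDecoder(data, null)
  val record  = reader.read(null, decoder)
  // column values are grouped by type in the "integers", "strings", "longs" and "bytes" maps
  println(s"${record.get("database")}.${record.get("table")} (tableId=${record.get("tableId")})")
  record
}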
MySQL to "specific" Kafka topics
If you are running an Avro schema repository you can encode binary log events based on the structures specified in that repository for the incoming database / table streams.
In order to configure a specific producer you should add a pipe using a mypipe.kafka.producer.KafkaMutationSpecificAvroProducer
as its producer. The producer needs some configuration values in order to find the Kafka brokers,
ZooKeeper ensemble, and the Avro schema repository. Here is a sample configuration:
kafka-specific {
  enabled = true
  consumers = ["localhost"]
  producer {
    kafka-specific {
      schema-repo-client = "mypipe.avro.schema.SchemaRepo"
      metadata-brokers = "localhost:9092"
      zk-connect = "localhost:2181"
    }
  }
}
Note that if you use mypipe.avro.schema.SchemaRepo as the schema repository client, you have to
provide the running JVM with the system property avro.repo.server-url in order for the client to
know where to reach the repository.
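For example, the property can be passed on the JVM command line when launching mypipe; the URL below is illustrative only, and the rest of the launch command depends on how you run mypipe:

# illustrative: point the client at your schema repository before starting mypipe
java -Davro.repo.server-url="http://localhost:8080/schema-repo" ...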
Configuring Kafka topic names
mypipe will push events into Kafka topics named according to the topic-format configuration value. reference.conf
has the default values under mypipe.kafka, in the specific-producer and generic-producer sections.
specific-producer {
  topic-format = "${db}_${table}_specific"
}

generic-producer {
  topic-format = "${db}_${table}_generic"
}
ALTER queries and "generic" Kafka topics
mypipe handles ALTER TABLE queries (as described below), allowing it to add
new columns to, or stop including removed columns in, the "generic" Avro records published
into Kafka. Since the "generic" Avro beans consist of typed maps (ints, strings, etc.),
mypipe can easily include or remove columns based on ALTER queries. Once a table's
metadata is refreshed (a blocking operation), all subsequent mutations to the
table will use the new structure when published into Kafka.
Consuming from "generic" Kafka topics
In order to consume from generic Kafka topics, the KafkaGenericMutationAvroConsumer
can be used. This consumer allows you to react to insert, update, and delete mutations.
The consumer needs an Avro schema repository as well as some helpers to be defined. A quick
(and incomplete) example follows:
val kafkaConsumer = new KafkaGenericMutationAvroConsumer[Short](
  topic = KafkaUtil.genericTopic("databaseName", "tableName"),
  zkConnect = "localhost:2181",
  groupId = "someGroupId",
  schemaIdSizeInBytes = 2)(
  insertCallback = { insertMutation ⇒ ??? },
  updateCallback = { updateMutation ⇒ ??? },
  deleteCallback = { deleteMutation ⇒ ??? }
) {
  protected val schemaRepoClient: GenericSchemaRepository[Short, Schema] = GenericInMemorySchemaRepo
}
For a more complete example take a look at KafkaGenericSpec.scala.
Alternatively you can implement your own Kafka consumer given the binary structure
of the messages as shown above if the KafkaGenericMutationAvroConsumer does not
satisfy your needs.
MySQL Event Processing Internals
mypipe uses the mysql-binlog-connector-java library to tap into the MySQL server's binary log stream and handles several types of events.
TABLE_MAP
This event causes mypipe to look up a table's metadata (primary key, column names and types, etc.). This is done by issuing the following query to the MySQL server to determine column information:
select COLUMN_NAME, DATA_TYPE, COLUMN_KEY
from COLUMNS
where TABLE_SCHEMA="$db" and TABLE_NAME = "$table"
order by ORDINAL_POSITION
The following query is also issued to the server to determine the primary key:
select COLUMN_NAME
from KEY_COLUMN_USAGE
where TABLE_SCHEMA='${db}' and TABLE_NAME='${table}' and CONSTRAINT_NAME='PRIMARY'
order by ORDINAL_POSITION
While a TABLE_MAP event is being handled, no other events will be handled concurrently.
QUERY
mypipe handles a few different types of raw queries (besides mutations) like:
BEGIN
If transaction event grouping is enabled, mypipe will queue up all events that
arrive after a BEGIN query has been encountered. While queuing is occurring,
mypipe will not save its binary log position as it receives events and will
only do so once the transaction is committed.
COMMIT
If transaction event grouping is enabled, mypipe will wait for a COMMIT
query to arrive before "flushing" all queued events (mutations), and then
saves its binary log position to mark the processing of the entire transaction.
ROLLBACK
If transaction event grouping is enabled, mypipe will clear, and not flush, the
queued up events (mutations) upon receiving a ROLLBACK.
ALTER
Upon receiving an ALTER query, mypipe will attempt to reload the affected table's
metadata. This allows mypipe to pick up new columns, or stop including removed ones,
in subsequent mutations published for that table.
