SkillAgentSearch skills...

Mypipe

MySQL binary log consumer with the ability to act on changed rows and publish changes to different systems with emphasis on Apache Kafka.

Install / Use

/learn @mardambey/Mypipe
About this skill

Quality Score

0/100

Supported Platforms

Universal

README

mypipe

mypipe latches onto a MySQL server with binary log replication enabled and allows for the creation of pipes that can consume the replication stream and act on the data (primarily integrated with Apache Kafka).

Features

  • streams binary logs remotely, emulating a slave
  • writes binlog events into Kafka using a generic or specific Avro schema
  • supports saving / loading binary log positions in a modular fashion (files, MySQL, or custom Java/Scala code to do so)
  • handles ALTER TABLE events and can refresh Avro schema being used
  • built in a modular way allowing binlog events to be published into any system, not just Kafka
  • can preload an entire MySQL table into Kafka, then resume from binary logs (useful with Kafka compaction, infinite retention, can be used to bootstrap downstream systems with the entire data for a table)
  • configurable Kafka topic names based on the database and table
  • whitelist / blacklist support for what to process or what not to process in a binary log (based on database and table)
  • configurable error handling with the ability to specify custom handlers written in Java or Scala
  • Kafka generic console consumer that interfaces with an in memory Avro schema repo for quick and easy data exploration in Kafka

API

mypipe tries to provide enough information that usually is not part of the MySQL binary log stream so that the data is meaningful. mypipe requires a row based binary log format and provides Insert, Update, and Delete mutations representing changed rows. Each change is related back to it's table and the API provides metadata like column types, primary key information (composite, key order), and other such useful information.

Look at ColumnType.scala and Mutation.scala for more details.

Producers

Producers receive MySQL binary log events and act on them. They can funnel down to another data store, send them to some service, or just print them out to the screen in the case of the stdout producer.

Pipes

Pipes tie one or more MySQL binary log consumers to a producer. Pipes can be used to create a system of fan-in data flow from several MySQL servers to other data sources such as Hadoop, Cassandra, Kafka, or other MySQL servers. They can also be used to update or flush caches.

Kafka integration

mypipe's main goal is to replicate a MySQL binlog stream into Apache Kafka. mypipe supports Avro encoding and can use a schema repository to figure out how to encode data. Data can either be encoded generically and stored in Avro maps by type (integers, strings, longs, etc.) or it can encode data more specifically if the schema repository can return specific schemas. The latter will allow the table's structure to be reflected in the Avro structure.

Kafka message format

Binary log events, specifically mutation events (insert, update, delete) are pushed into Kafka and are binary encoded. Every message has the following format:

 -----------------
| MAGIC | 1 byte  |
|-----------------|
| MTYPE | 1 byte  |
|-----------------|
| SCMID | N bytes |
|-----------------|
| DATA  | N bytes |
 -----------------

The above fields are:

  • MAGIC: magic byte, used to figure out protocol version
  • MTYPE: mutation type, a single byte indicating insert (0x1), update (0x2), or delete (0x3)
  • SCMID: Avro schema ID, variable number of bytes
  • DATA: the actual mutation data as bytes, variable size

MySQL to "generic" Kafka topics

If you do not have an Avro schema repository running that contains schemas for each of your tables you can use generic Avro encoding. This will take binary log mutations (insert, update, or delete) and encode them into the following structure in the case of an insert (InsertMutation.avsc):

{
  "namespace": "mypipe.avro",
  "type": "record",
  "name": "InsertMutation",
  "fields": [
    {
      "name": "database",
      "type": "string"
    },
    {
      "name": "table",
      "type": "string"
    },
    {
      "name": "tableId",
      "type": "long"
    },
    {
      "name": "txid",
      "type": ["null", "Guid"],
      "default": "null"
    },
    {
      "name": "bytes",
      "type": {"type": "map", "values": "bytes"}
    },
    {
      "name": "integers",
      "type": {"type": "map", "values": "int"}
    },
    {
      "name": "strings",
      "type": {"type": "map", "values": "string"}
    },
    {
      "name": "longs",
      "type": {"type": "map", "values": "long"}
    }
  ]
}

Updates will contain both the old row values and the new ones (see UpdateMutation.avsc) and deletes are similar to inserts (DeleteMutation.avsc). Once transformed into Avro data the mutations are pushed into Kafka topics based on the following convention (this is configurable):

topicName = s"$db_$table_generic"

This ensures that all mutations destined to a specific database / table tuple are all added to a single topic with mutation ordering guarantees.

MySQL to "specific" Kafka topics

If you are running an Avro schema repository you can encode binary log events based on the structures specified in that repository for the incoming database / table streams.

In order to configure a specific producer you should add a pipe using a mypipe.kafka.producer.KafkaMutationSpecificAvroProducer as it's producer. The producer needs some configuration values in order to find the Kafka brokers, ZooKeeper ensemble, and the Avro schema repository. Here is a sample configuration:

kafka-specific {
  enabled = true
  consumers = ["localhost"]
  producer {
    kafka-specific {
      schema-repo-client = "mypipe.avro.schema.SchemaRepo"
      metadata-brokers = "localhost:9092"
      zk-connect = "localhost:2181"
    }
  }
}

Note that if you use mypipe.avro.schema.SchemaRepo as the schema repository client, you have to provide the running JVM with the system property avro.repo.server-url in order for the client to know where to reach the repository.

Configuring Kafka topic names

mypipe will push events into Kafka based on the topic-format configuration value. reference.conf has the default values under mypipe.kafka, specific-producer and generic-producer.

specific-producer {
  topic-format = "${db}_${table}_specific"
}
 
generic-producer {
  topic-format = "${db}_${table}_generic"
}

ALTER queries and "generic" Kafka topics

mypipe handles ALTER table queries (as described below) allowing it to add new columns or stop including removed ones into "generic" Avro records published into Kafka. Since the "generic" Avro beans consist of typed maps (ints, strings, etc.) mypipe can easily include or remove columns based on ALTER queries. Once a table's metadata is refreshed (blocking operation) all subsequent mutations to the table will use the new structure and publish that into Kafka.

Consuming from "generic" Kafka topics

In order to consume from generic Kafka topics the KafkaGenericMutationAvroConsumer can be used. This consumer will allow you react to insert, update, and delete mutations. The consumer needs an Avro schema repository as well as some helpers to be defined. A quick (and incomplete) example follows:

val kafkaConsumer = new KafkaGenericMutationAvroConsumer[Short](
  topic = KafkaUtil.genericTopic("databaseName", "tableName"),
  zkConnect = "localhost:2181",
  groupId = "someGroupId",
  schemaIdSizeInBytes = 2)(
  
  insertCallback = { insertMutation ⇒ ??? }
  updateCallback = { updateMutation ⇒ ??? }
  deleteCallback = { deleteMutation ⇒ ??? } 
) {
  protected val schemaRepoClient: GenericSchemaRepository[Short, Schema] = GenericInMemorySchemaRepo
}

For a more complete example take a look at KafkaGenericSpec.scala.

Alternatively you can implement your own Kafka consumer given the binary structure of the messages as shown above if the KafkaGenericMutationAvroConsumer does not satisfy your needs.

MySQL Event Processing Internals

mypipe uses the mysql-binlog-connector-java to tap into the MySQL server's binary log stream and handles several types of events.

TABLE_MAP

This event causes mypipe to look up a table's metadata (primary key, column names and types, etc.). This is done by issuing the following query to the MySQL server to determine column information:

select COLUMN_NAME, DATA_TYPE, COLUMN_KEY 
from COLUMNS 
where TABLE_SCHEMA="$db" and TABLE_NAME = "$table" 
order by ORDINAL_POSITION

The following query is issued to the server also to determine the primary key:

select COLUMN_NAME
from KEY_COLUMN_USAGE 
where TABLE_SCHEMA='${db}' and TABLE_NAME='${table}' and CONSTRAINT_NAME='PRIMARY' 
order by ORDINAL_POSITION

While a TABLE_MAP event is being handled no other events will be handled concurrently.

QUERY

mypipe handles a few different types of raw queries (besides mutations) like:

BEGIN

If transaction event grouping is enabled mypipe will queue up all events that arrive after a BEGIN query has been encountered. While queuing is occuring mypipe will not save it's binary log position as it receives events and will only do so once the transaction is committed.

COMMIT

If transaction event grouping is enabled mypipe will hold wait for a COMMIT query to arrive before "flushing" all queued events (mutations) at and then saves it's binary log position to mark the processing of the entire transaction.

ROLLBACK

If transaction event grouping is enabled mypipe will clear and not flush the queued up events (mutations) upon receiving a ROLBACK.

ALTER

Upon receiving an ALTER query mypipe will attempt to reload the affected table's metadata. This allows myp

View on GitHub
GitHub Stars428
CategoryData
Updated25d ago
Forks79

Languages

Scala

Security Score

95/100

Audited on Mar 4, 2026

No findings