# Change Data Capture (Postgres -> Debezium/Kafka -> ClickHouse -> Superset)
## Table of Contents

- Introduction
- Folder Structure
- How to Start
  - Project Setup
  - OLTP Database: Postgres
  - OLAP ClickHouse Database: Table Creation and Copying Postgres Records
  - CDC with Debezium and Kafka
  - Databases Sync: Connect ClickHouse Tables to Kafka Topics
  - Data Visualization: Superset

## Introduction
This data integration project uses Debezium, Kafka, and ClickHouse to stream data changes from a PostgreSQL database into a ClickHouse data warehouse. The core objective is a streamlined, real-time replication process that lets businesses efficiently analyze and report on their PostgreSQL data, ensuring timely and accurate insights.

The implementation leverages the publish-subscribe (pub/sub) pattern to move data smoothly from Postgres to ClickHouse, improving data accessibility and analysis.
## Folder Structure

Folders and files are listed alphabetically rather than in the order they are used. The order in which the components are actually used is described in the "How to Start" section.
```
.
├── cdc                     # Change Data Capture (Debezium configs)
│   ├── connectors          # Debezium connectors
│   ├── kafka               # Scripts to monitor Kafka topics
│   └── docker-compose.yml  # CDC images/containers
├── db                      # Databases (Postgres & ClickHouse)
│   ├── olap                # ClickHouse section
│   │   ├── ddl             # Tables and views definitions
│   │   └── dml             # Ingest data from Postgres using scripts
│   └── oltp                # Postgres section
│       ├── ddl             # Tables definitions
│       └── dml             # Seed the db with random records
├── img                     # BI screenshots
├── .gitignore
├── README.md
└── requirements.txt        # Python packages to be installed
```
Superset is not included in this repository. Instead, instructions are provided for installing it as a separate service and integrating it with ClickHouse.
## How to Start

### Project Setup
Follow these steps to get started with CDC:
- Clone the Rembo CDC repository to your local machine:

  ```sh
  git clone git@github.com:joelatiam/cdc.git
  ```

- (Optional) Create and activate a Python virtual environment:

  - Create a virtual environment:

    ```sh
    python -m venv .venv
    ```

  - Activate the virtual environment (on macOS and Linux):

    ```sh
    source .venv/bin/activate
    ```

- Install dependencies:

  ```sh
  pip install -r requirements.txt
  ```
### OLTP Database: Postgres

Instructions on setting up and configuring the PostgreSQL database. (Update your oltp/.env following oltp/.env.example.)

For the OLTP component, the database must use logical replication (`wal_level = logical`), and the db user needs to have replication privileges (`ALTER ROLE db_user WITH REPLICATION LOGIN;`). Follow these steps to set up the OLTP component:
- Create tables:

  ```sh
  python db/oltp/ddl/tables_creations.py
  ```

- Alter tables with replica identity:

  ```sh
  python db/oltp/ddl/alter_tables.py
  ```

- Populate (fake) records in the rembo db. We generate random records (500k: customer, sales | 11 sales_territory | 5 employee) to be used in our project:

  ```sh
  python db/oltp/dml/create_random_records.py
  ```

These steps ensure that the PostgreSQL database is ready for logical replication and data capture.
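The replica-identity step matters because, without it, Debezium's update and delete events may not carry enough of the old row. As a minimal sketch of what a script like `db/oltp/ddl/alter_tables.py` might emit (the table names follow the seed step above; `REPLICA IDENTITY FULL` is one common choice, and the actual script may use a different identity mode or table list):

```python
# Sketch: generate REPLICA IDENTITY statements for the CDC-tracked tables.
# Table names come from the seeding step above; the real alter_tables.py
# may target different tables or use an index-based identity instead.
TABLES = ["customer", "sales", "sales_territory", "employee"]

def replica_identity_statements(tables, mode="FULL"):
    """Build ALTER TABLE statements setting each table's replica identity."""
    return [f"ALTER TABLE {t} REPLICA IDENTITY {mode};" for t in tables]

for stmt in replica_identity_statements(TABLES):
    print(stmt)
```

With `FULL`, Postgres logs the entire old row on updates and deletes, which is the simplest (if heaviest) way to guarantee complete change events downstream.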
### OLAP ClickHouse Database: Table Creation and Copying Postgres Records

Instructions on setting up and configuring the ClickHouse database. (Update your olap/.env following olap/.env.example.)

You can use this approach to install and run ClickHouse. Follow these steps to set up the OLAP component:
- Create tables:

  ```sh
  python db/olap/ddl/tables_creations.py
  ```

  The db/olap/ddl/ directory contains:

  - Table definitions
  - Kafka table engines & materialized views
  - Views that help fetch the most recent record versions (this might not be the best approach, since ClickHouse provides rich functionality for organizing records at table-creation time).

- Ingest records from OLTP (Postgres). Since we initialize our Postgres DB with many records (500k: customer, sales | 11 sales_territory | 5 employee), getting all the records at once through Kafka would be challenging in a development environment, so this script copies all the Postgres records:

  ```sh
  python db/olap/dml/import_from_oltp.py
  ```
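Copying half a million rows is usually done in batches rather than in one giant insert. A generic sketch of the batching logic such a script might use (the helper name and batch size are illustrative; `db/olap/dml/import_from_oltp.py` may work differently):

```python
from itertools import islice

def chunked(rows, size=10_000):
    """Yield successive batches of at most `size` rows from any iterable,
    so each ClickHouse insert stays a manageable size."""
    it = iter(rows)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch

# Toy usage: 25 "rows" in batches of 10 -> batch sizes 10, 10, 5
sizes = [len(b) for b in chunked(range(25), size=10)]
print(sizes)  # [10, 10, 5]
```

Batching keeps memory bounded on the copying side and plays well with ClickHouse, which prefers fewer, larger inserts over many tiny ones.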
### CDC with Debezium and Kafka

We will use Debezium to capture changes from our Postgres database and publish them to Kafka topics.
- Start the applications (zookeeper, kafka, debezium, kafdrop: view Kafka topics and groups in the browser at http://localhost:9090/):

  ```sh
  cd cdc
  docker-compose up -d
  ```

- Create the Debezium connector. The file cdc/connectors/pg-src.json.example is an example of the connector we will create by posting a request to Debezium (you can use Postman, VS Code Thunder Client, ...). Remember to update your database info if you use cdc/connectors/pg-src.json.example.

  e.g. POST http://localhost:8083/connectors

  ```sh
  curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
    "name": "inventory-connector",
    "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "tasks.max": "1",
      "database.hostname": "mysql",
      "database.port": "3306",
      "database.user": "debezium",
      "database.password": "dbz",
      "database.server.id": "184054",
      "topic.prefix": "dbserver1",
      "database.include.list": "inventory",
      "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
      "schema.history.internal.kafka.topic": "schemahistory.inventory"
    }
  }'
  ```

  Debezium also provides other endpoints that help manage the connector.
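The curl payload above is Debezium's generic MySQL example; for this project the connector would use Debezium's Postgres connector class instead. A minimal sketch of such a payload, assuming placeholder hostnames, credentials, and table names rather than the actual contents of cdc/connectors/pg-src.json.example:

```python
import json

# Illustrative Debezium Postgres connector config. The connector name, host,
# credentials, and table list are placeholders -- adapt them to your setup.
connector = {
    "name": "rembo-pg-connector",  # hypothetical name
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "db_user",
        "database.password": "changeme",
        "database.dbname": "rembo",
        "topic.prefix": "rembo",
        "table.include.list": "public.customer,public.sales",
        "plugin.name": "pgoutput",  # built-in logical decoding plugin
    },
}

# This JSON string is what gets POSTed to http://localhost:8083/connectors
payload = json.dumps(connector, indent=2)
print(payload)
```

The `plugin.name` of `pgoutput` uses Postgres's built-in logical decoding output plugin, which avoids installing extra server-side extensions.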
- Test Postgres to Kafka (update records and watch the Kafka messages):

  - Create/update records in our tables.

  - View/read the Kafka messages using Kafdrop: http://localhost:9090/

    Message example:

    ```json
    {
      "schema": {
        "type": "struct",
        "fields": [
          {"type": "int32", "optional": false, "default": 0, "field": "sales_id"},
          {"type": "int32", "optional": true, "field": "customer_id"},
          {"type": "string", "optional": true, "field": "discount_amount"}
        ]
      },
      "payload": {
        "sales_id": 2,
        "customer_id": 2496,
        "discount_amount": "85",
        "duedate": "2023-10-19",
        "extended_amount": 843,
        "order_quantity": 96,
        "product_standard_cost": 22,
        "revision_number": 62,
        "sales_amount": 7,
        "sales_order_line_number": "2",
        "sales_order_number": "2",
        "sales_territory_id": 3,
        "shipdate": 19657,
        "tax_amt": 55,
        "total_product_cost": 77,
        "unit_price": 21677,
        "employee_id": 2,
        "__table": "sales",
        "__op": "u",
        "__lsn": 12912566024,
        "__source_ts_ms": 1698765730595,
        "__deleted": "false"
      }
    }
    ```

  - (Optional) Listen to a Kafka topic from the terminal. We provide a Python script (cdc/kafka/watch-topics.py) to watch a topic's messages (remember to update the .env):

    ```sh
    python cdc/kafka/watch-topics.py
    ```
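The flat payload with `__table`, `__op`, and `__deleted` metadata fields suggests the connector flattens events (as Debezium's ExtractNewRecordState transform does), so a consumer can route records by inspecting those fields. A small sketch, assuming messages arrive as JSON strings shaped like the example above:

```python
import json

def route_change(message: str):
    """Classify a flattened Debezium message by table and operation.

    Assumes the payload carries Debezium's __op ('c', 'u', 'd', 'r' for
    create/update/delete/snapshot-read) and __table metadata fields.
    """
    payload = json.loads(message)["payload"]
    op = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot"}.get(
        payload.get("__op", ""), "unknown"
    )
    return payload["__table"], op, payload

# Hypothetical flattened message, trimmed to a few fields for brevity
raw = json.dumps({
    "schema": {},
    "payload": {"sales_id": 2, "__table": "sales", "__op": "u", "__deleted": "false"},
})
table, op, row = route_change(raw)
print(table, op)  # sales update
```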
- Our ClickHouse DB will be connected to Kafka in the following section, so at this phase changes from the OLTP DB won't yet be reflected in the OLAP DB.
### Databases Sync: Connect ClickHouse Tables to Kafka Topics

Instructions on connecting ClickHouse tables to Kafka topics. (Update your olap/.env following olap/.env.example.)

- Connect tables to Kafka:

  ```sh
  python db/olap/ddl/create_kafka_connect.py
  ```

From here, any create, update, or delete operation in Postgres will be reflected in our ClickHouse db.
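Because each change arrives as a new row, reads typically have to collapse multiple versions of the same record. One common ClickHouse pattern for this (a sketch only; the views in db/olap/ddl/ may use a different technique, such as a ReplacingMergeTree with FINAL) is `argMax` keyed on the LSN, using the metadata columns from the message example above:

```python
# Sketch of a "latest version" query over a CDC-fed ClickHouse table.
# Table and column names (sales, sales_id, __lsn, __deleted) follow the
# Kafka message example above; the repo's actual views may differ.
LATEST_SALES_SQL = """
SELECT
    sales_id,
    argMax(sales_amount, __lsn) AS sales_amount,
    argMax(__deleted, __lsn)    AS __deleted
FROM sales
GROUP BY sales_id
HAVING __deleted = 'false'
"""

print(LATEST_SALES_SQL.strip())
```

`argMax(col, __lsn)` picks each column's value from the row with the highest LSN, i.e. the most recent change, and the HAVING clause drops records whose latest event was a delete.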
### Data Visualization: Superset

You can install Superset by referring to the vendor's official website. We have adopted the "git clone" option and used Docker Compose to run Superset. Depending on your specific requirements, you may need to adjust the provided Docker Compose and .env files.
