Change Data Capture (Postgres -> Debezium/Kafka -> Clickhouse -> Superset)

Introduction

This data integration project uses Debezium, Kafka, and ClickHouse to stream data changes from a PostgreSQL database to a ClickHouse data warehouse. The core objective is a streamlined, real-time replication process, so that businesses can analyze and report on their PostgreSQL data with timely and accurate results.

The implementation uses the publish–subscribe pattern (PubSub) to move data from Postgres to ClickHouse: Debezium publishes the captured changes to Kafka topics, and the ClickHouse tables subscribe to those topics.
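To make the pattern concrete, here is a minimal in-memory sketch of publish–subscribe. It is not how Kafka works internally; the `Broker` class, the topic name `rembo.sales`, and the `warehouse_rows` list are hypothetical stand-ins for Kafka, a Debezium topic, and a ClickHouse table.

```python
from collections import defaultdict
from typing import Callable, Dict, List

Handler = Callable[[dict], None]


class Broker:
    """Tiny in-memory stand-in for Kafka: each topic maps to subscriber callbacks."""

    def __init__(self) -> None:
        self._subscribers: Dict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Handler) -> None:
        """Register a callback that receives every event published to the topic."""
        self._subscribers[topic].append(handler)

    def publish(self, topic: str, event: dict) -> int:
        """Deliver the event to every subscriber of the topic; return how many got it."""
        handlers = self._subscribers.get(topic, [])
        for handler in handlers:
            handler(event)
        return len(handlers)


broker = Broker()
warehouse_rows: List[dict] = []  # plays the role of a ClickHouse table

# Subscriber side: the warehouse listens to the (hypothetical) "rembo.sales" topic.
broker.subscribe("rembo.sales", warehouse_rows.append)

# Publisher side: a change captured from Postgres is published once.
delivered = broker.publish("rembo.sales", {"sales_id": 2, "__op": "u"})
print(delivered, warehouse_rows)
```

The publisher never references the warehouse directly, which is the property that lets Postgres and ClickHouse stay decoupled in this project.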

Folder Structure

The folders and files below are listed alphabetically rather than in the order in which they are used. For the functional order of the project's components, refer to the "How to Start" section.

.
├── cdc                                 # Change Data Capture (Debezium configs)
│   ├── connectors                      # Debezium connectors
│   ├── kafka                           # Scripts to monitor Kafka topics
│   └── docker-compose.yml              # CDC images/containers
├── db                                  # Databases (Postgres & ClickHouse)
│   ├── olap                            # ClickHouse section
│   │   ├── ddl                         # Tables and views definitions
│   │   └── dml                         # Ingest data from Postgres using scripts
│   └── oltp                            # Postgres section
│       ├── ddl                         # Tables definitions
│       └── dml                         # Seed the db with random records
├── img                                 # BI screenshots
├── .gitignore
├── README.md
└── requirements.txt                    # Python packages to be installed

Superset is not included in this repository. Instead, instructions for installing it as a separate service and integrating it with ClickHouse are provided in the "Data Visualization: Superset" section.

How to Start

Project Setup

Follow these steps to get started with CDC:

  1. Clone the Rembo CDC repository to your local machine using the following command:

    git clone git@github.com:joelatiam/cdc.git

  2. (Optional) Create and activate a Python virtual environment:
  • Create a virtual environment:

    python -m venv .venv
    
  • Activate the virtual environment:

    • On macOS and Linux:

      source .venv/bin/activate
      
  3. Install dependencies:

    pip install -r requirements.txt
    

OLTP Database: Postgres

Instructions on setting up and configuring the PostgreSQL database. (Update your oltp/.env following oltp/.env.example)

For the OLTP component, Postgres must use logical replication (wal_level = logical), and the db user needs replication privileges (ALTER ROLE db_user WITH REPLICATION LOGIN;). Follow these steps to set up the OLTP component:

  1. Create tables:

    • To create/recreate the necessary tables, run the following command:
      python db/oltp/ddl/tables_creations.py
      
  2. Alter tables with Replica Identity:

    python db/oltp/ddl/alter_tables.py
    
  3. Populate the rembo db with fake records:

    • This script generates the random records used in the project (500k: customer, sales | 11 sales_territory | 5 employee)
    python db/oltp/dml/create_random_records.py
    

These steps will ensure that the PostgreSQL database is ready for logical replication and data capture.
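The prerequisites above boil down to a handful of SQL statements. The sketch below only builds them as strings; it is not the content of `alter_tables.py`, which is not shown here. The table names come from the record counts listed above, and `REPLICA IDENTITY FULL` is an assumption (the repository may use a different replica identity).

```python
from typing import Iterable, List

# Table names taken from the record counts mentioned above.
TABLES = ("customer", "sales", "sales_territory", "employee")


def replication_statements(role: str, tables: Iterable[str]) -> List[str]:
    """Build the SQL that prepares a Postgres database for logical decoding."""
    statements = [
        # wal_level only changes after a server restart.
        "ALTER SYSTEM SET wal_level = 'logical';",
        f'ALTER ROLE "{role}" WITH REPLICATION LOGIN;',
    ]
    # Assumption: FULL replica identity, so update and delete events
    # carry the complete previous row.
    statements += [f'ALTER TABLE "{table}" REPLICA IDENTITY FULL;' for table in tables]
    return statements


statements = replication_statements("db_user", TABLES)
for statement in statements:
    print(statement)
```

The identifiers are quoted naively, so this is only suitable for trusted, hard-coded names such as the ones above.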

OLAP Clickhouse Database: Tables Creations and Copy Postgres Records

Instructions on setting up and configuring the Clickhouse database. (Update your olap/.env following olap/.env.example)

You can use this approach to install and run ClickHouse. Follow these steps to set up the OLAP component:

  • Create tables:

    • To create/recreate tables, run the following command:

      python db/olap/ddl/tables_creations.py
      
    • The db/olap/ddl/ directory contains:

      • Table definitions
      • Kafka table engines & materialized views
      • Views that fetch the most recent version of each record (this might not be the best approach, since ClickHouse provides rich functionality for organizing records at table creation).
  • Ingest records from OLTP (Postgres)

    • We initialize our Postgres DB with many records (500k: customer, sales | 11 sales_territory | 5 employee), and receiving all of them at once through Kafka would be challenging in a development environment. This script copies all the existing Postgres records directly:
    python db/olap/dml/import_from_oltp.py
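One common way to copy a large table without holding every row in memory is to move it in fixed-size batches. The sketch below illustrates that idea only; it is not the code of `import_from_oltp.py`. The `source` iterable stands in for a Postgres cursor and `insert_batch` for a ClickHouse insert call, both hypothetical.

```python
from itertools import islice
from typing import Callable, Iterable, Iterator, List, Sequence

Row = Sequence[object]


def batched(rows: Iterable[Row], size: int) -> Iterator[List[Row]]:
    """Yield consecutive lists of at most `size` rows."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(rows)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


def copy_in_batches(
    source: Iterable[Row],
    insert_batch: Callable[[List[Row]], None],
    size: int = 10_000,
) -> int:
    """Pass the source rows to `insert_batch` chunk by chunk; return the row count."""
    copied = 0
    for batch in batched(source, size):
        insert_batch(batch)
        copied += len(batch)
    return copied


# Stand-ins: a generator instead of a Postgres cursor, a list instead of ClickHouse.
source = ((i, f"customer-{i}") for i in range(25))
sink: List[List[Row]] = []
copied = copy_in_batches(source, sink.append, size=10)
print(copied, [len(batch) for batch in sink])
```

With 25 rows and a batch size of 10, the sink receives three batches of 10, 10, and 5 rows.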
    

CDC with Debezium and Kafka

We will use Debezium to capture changes from our Postgres and publish to Kafka topics.

  • Start the applications (ZooKeeper, Kafka, Debezium, and Kafdrop, which lets you view Kafka topics and groups in the browser at http://localhost:9090/)

    cd cdc
    
    docker-compose up -d
    
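  • Create the Debezium connector. cdc/connectors/pg-src.json.example is an example of the connector configuration; create the connector by posting it to Debezium (you can use Postman, the VS Code Thunder Client, ...). Remember to update the database connection details if you use cdc/connectors/pg-src.json.example.

    e.g. POST http://localhost:8083/connectors. The sample command below shows the shape of the request; note that it registers a MySQL connector, so for this project send the Postgres configuration from cdc/connectors/pg-src.json.example as the request body instead.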

    curl -i -X POST \
      -H "Accept:application/json" \
      -H "Content-Type:application/json" \
      localhost:8083/connectors/ \
      -d '{
        "name": "inventory-connector",
        "config": {
          "connector.class": "io.debezium.connector.mysql.MySqlConnector",
          "tasks.max": "1",
          "database.hostname": "mysql",
          "database.port": "3306",
          "database.user": "debezium",
          "database.password": "dbz",
          "database.server.id": "184054",
          "topic.prefix": "dbserver1",
          "database.include.list": "inventory",
          "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
          "schema.history.internal.kafka.topic": "schemahistory.inventory"
        }
      }'
    

    Debezium provides other endpoints to help manage the connector.

  • Test Postgres to Kafka (update records and watch the Kafka messages)

    • Create or update records in our tables

    • View the Kafka messages using Kafdrop: http://localhost:9090/

      Message example

      {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"sales_id"},{"type":"int32","optional":true,"field":"customer_id"},{"type":"string","optional":true,"field":"discount_amount"}]},"payload":{"sales_id":2,"customer_id":2496,"discount_amount":"85","duedate":"2023-10-19","extended_amount":843,"order_quantity":96,"product_standard_cost":22,"revision_number":62,"sales_amount":7,"sales_order_line_number":"2","sales_order_number":"2","sales_territory_id":3,"shipdate":19657,"tax_amt":55,"total_product_cost":77,"unit_price":21677,"employee_id":2,"__table":"sales","__op":"u","__lsn":12912566024,"__source_ts_ms":1698765730595,"__deleted":"false"}}
      
    • (Optional) Listen to a Kafka topic from the terminal. We provide a Python script (cdc/kafka/watch-topics.py) to watch a topic's messages (remember to update the .env)

      python cdc/kafka/watch-topics.py 
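The connector creation step above can also be scripted. The sketch below builds a Postgres connector request with the standard library only. It is not a copy of `cdc/connectors/pg-src.json.example`: the connector name, host, database, topic prefix, and table list are hypothetical placeholders, and `pgoutput` is assumed as the logical decoding plugin.

```python
import json
import urllib.request
from typing import Iterable


def postgres_connector(
    name: str, host: str, port: int, user: str, password: str,
    dbname: str, topic_prefix: str, tables: Iterable[str],
) -> dict:
    """Build the JSON body for registering a Debezium Postgres connector."""
    return {
        "name": name,
        "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "plugin.name": "pgoutput",  # assumption: Postgres built-in plugin
            "database.hostname": host,
            "database.port": str(port),
            "database.user": user,
            "database.password": password,
            "database.dbname": dbname,
            "topic.prefix": topic_prefix,
            "table.include.list": ",".join(tables),
        },
    }


def build_request(connector: dict, base_url: str = "http://localhost:8083") -> urllib.request.Request:
    """Wrap the connector definition in a POST request to the /connectors endpoint."""
    return urllib.request.Request(
        url=f"{base_url}/connectors",
        data=json.dumps(connector).encode("utf-8"),
        headers={"Accept": "application/json", "Content-Type": "application/json"},
        method="POST",
    )


# All values below are placeholders; replace them with your own settings.
connector = postgres_connector(
    name="rembo-connector", host="postgres", port=5432, user="db_user",
    password="change-me", dbname="rembo", topic_prefix="rembo",
    tables=["public.sales", "public.customer"],
)
request = build_request(connector)
print(request.get_method(), request.full_url)

# Sending is left commented out so the sketch runs without a Debezium server:
# with urllib.request.urlopen(request) as response:
#     print(response.status)
```

The flattened `__op`, `__lsn`, and `__deleted` fields in the message example suggest that the real connector also configures Debezium's ExtractNewRecordState transform; that part is omitted here.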
      

Our ClickHouse DB will only be connected to Kafka in the following section, so at this phase changes from the OLTP DB are not yet reflected in the OLAP DB.
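To show what a consumer has to do with messages like the example above, here is a minimal sketch that decodes the `payload` part and keeps only the newest version of each row, ordered by `__lsn`. The helper names are hypothetical, the sample messages are trimmed-down versions of the example, and reading `shipdate` as a number of days since 1970-01-01 assumes Debezium's default integer encoding for DATE columns.

```python
import json
from datetime import date, timedelta
from typing import Dict, List


def decode(message: str) -> dict:
    """Return the row carried in the `payload` part of a change message."""
    return json.loads(message)["payload"]


def days_to_date(days: int) -> date:
    """Convert a days-since-epoch integer (assumed DATE encoding) to a date."""
    return date(1970, 1, 1) + timedelta(days=days)


def apply_change(latest: Dict[int, dict], row: dict, key: str = "sales_id") -> None:
    """Store the row only if it is newer (higher __lsn) than the stored version."""
    current = latest.get(row[key])
    if current is None or row["__lsn"] > current["__lsn"]:
        latest[row[key]] = row


def live_rows(latest: Dict[int, dict]) -> List[dict]:
    """Return the newest versions, hiding rows whose last event was a delete."""
    return [row for row in latest.values() if row["__deleted"] != "true"]


def sample_message(sales_id: int, lsn: int, op: str, deleted: str = "false") -> str:
    """Build a trimmed-down message with the same envelope as the example."""
    payload = {"sales_id": sales_id, "shipdate": 19657, "__table": "sales",
               "__op": op, "__lsn": lsn, "__deleted": deleted}
    return json.dumps({"schema": {"type": "struct"}, "payload": payload})


latest: Dict[int, dict] = {}
events = [
    sample_message(2, 100, "c"),
    sample_message(2, 300, "u"),
    sample_message(2, 200, "u"),                  # arrives late, must be ignored
    sample_message(3, 400, "c"),
    sample_message(3, 500, "d", deleted="true"),  # row 3 ends up deleted
]
for raw in events:
    apply_change(latest, decode(raw))

print([(row["sales_id"], row["__lsn"]) for row in live_rows(latest)])
print(days_to_date(latest[2]["shipdate"]))
```

Deleted rows are kept in `latest` as tombstones and filtered on read, so a late event with a lower `__lsn` cannot resurrect them. This is the same idea as the ClickHouse views that fetch the most recent version of each record.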

Databases Sync: Connect Clickhouse Tables to Kafka Topics

Instructions on connecting clickhouse tables to kafka topics. (Update your olap/.env following olap/.env.example)

  • Connect tables to kafka
    python  db/olap/ddl/create_kafka_connect.py  
    

From here, any create, update, or delete operation in Postgres will be reflected in our ClickHouse db.
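For orientation, connecting a ClickHouse table to a Kafka topic usually takes two statements: a table with the Kafka engine that reads the topic, and a materialized view that copies each message into the target table. The sketch below only generates such DDL as strings and is not the content of `create_kafka_connect.py`. The topic name, the consumer group name, the `_queue` and `_mv` suffixes, and the column list are assumptions; it also assumes that the target table already exists and that messages keep the `schema`/`payload` envelope shown earlier.

```python
from typing import Dict, List


def kafka_sync_ddl(
    table: str, topic: str, columns: Dict[str, str], broker: str = "kafka:9092",
) -> List[str]:
    """Return DDL for a Kafka engine table plus a materialized view feeding `table`.

    `columns` maps each payload field to the ClickHouse JSONExtract* function
    used to read it from the raw message.
    """
    queue = f"{table}_queue"
    selects = ",\n    ".join(
        f"{extractor}(raw, 'payload', '{name}') AS {name}"
        for name, extractor in columns.items()
    )
    kafka_table = (
        f"CREATE TABLE IF NOT EXISTS {queue} (raw String)\n"
        f"ENGINE = Kafka\n"
        f"SETTINGS kafka_broker_list = '{broker}',\n"
        f"         kafka_topic_list = '{topic}',\n"
        f"         kafka_group_name = 'clickhouse_{table}',\n"
        f"         kafka_format = 'JSONAsString'"
    )
    materialized_view = (
        f"CREATE MATERIALIZED VIEW IF NOT EXISTS {table}_mv TO {table} AS\n"
        f"SELECT\n    {selects}\nFROM {queue}"
    )
    return [kafka_table, materialized_view]


# Hypothetical topic name and a subset of the fields from the message example.
ddl = kafka_sync_ddl(
    table="sales",
    topic="rembo.public.sales",
    columns={
        "sales_id": "JSONExtractInt",
        "discount_amount": "JSONExtractString",
        "__lsn": "JSONExtractUInt",
    },
)
for statement in ddl:
    print(statement, end=";\n\n")
```

Reading each message as a single string and extracting fields in the view keeps the Kafka table independent of the payload schema, at the cost of parsing the JSON once per extracted column.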

Data Visualization: Superset

You can install Superset by referring to the vendor's official website. We have adopted the "git clone" option and employed Docker Compose for running Superset. Depending on your specific requirements, you may need to make adjustments to the provided Docker Compose and **.en
