
<div align="center">

PGSync

Real-time PostgreSQL to Elasticsearch/OpenSearch sync

Keep your relational database as the source of truth while powering lightning-fast search


Website · Documentation · Examples · Report Bug

</div>

What is PGSync?

PGSync is a change data capture tool that syncs data from PostgreSQL, MySQL, or MariaDB to Elasticsearch or OpenSearch in real time. Define your document structure in JSON, and PGSync handles the rest — no custom code required.

```mermaid
flowchart LR
    subgraph Source["🗄️ Source Database"]
        DB[(PostgreSQL<br/>MySQL<br/>MariaDB)]
    end

    subgraph CDC["⚡ Change Data Capture"]
        P[PGSync]
    end

    subgraph Search["🔍 Search Engine"]
        ES[(Elasticsearch<br/>OpenSearch)]
    end

    DB -->|WAL / Binlog| P
    P -->|Bulk Index| ES
```

Key Features

| Feature | Description |
|---------|-------------|
| Real-time sync | Changes propagate instantly via logical replication |
| Zero code | Define mappings in JSON — no ETL pipelines to build |
| Nested documents | Automatically denormalize complex relationships |
| Fault tolerant | Resumes from checkpoints after crashes |
| Transactionally consistent | Documents appear in commit order |
| Minimal overhead | Lightweight CDC with negligible database impact |


Quick Start

Using Docker (Fastest)

```shell
docker run --rm -it \
  -e PG_URL=postgres://user:pass@host/db \
  -e ELASTICSEARCH_URL=http://localhost:9200 \
  -e REDIS_HOST=localhost \
  -v "$(pwd)/schema.json:/app/schema.json" \
  toluaina1/pgsync:latest -c schema.json -d -b
```

Using pip

```shell
pip install pgsync

# Bootstrap (one-time setup)
bootstrap --config schema.json

# Run sync
pgsync --config schema.json -d
```

Using Docker Compose

Default (Elasticsearch + Kibana):

```shell
git clone https://github.com/toluaina/pgsync
cd pgsync
docker-compose up
```

This starts PostgreSQL, Redis, Elasticsearch, Kibana, and PGSync configured for Elasticsearch.

For OpenSearch:

```shell
docker-compose --profile opensearch up
```

This starts PostgreSQL, Redis, OpenSearch, and PGSync configured for OpenSearch.

Ports:

  • PostgreSQL: 15432
  • Elasticsearch: 9201 (default)
  • Kibana: 5601 (default)
  • OpenSearch: 9400 (OpenSearch profile)

How It Works

1. Define your schema — Map tables to document structure:

```json
{
  "table": "book",
  "columns": ["isbn", "title", "description"],
  "children": [{
    "table": "author",
    "columns": ["name"]
  }]
}
```

2. PGSync generates optimized queries — Complex JOINs handled automatically:

```sql
SELECT JSON_BUILD_OBJECT(
  'isbn', book.isbn,
  'title', book.title,
  'authors', (SELECT JSON_AGG(author.name) FROM author ...)
) FROM book
```

3. Get denormalized documents — Ready for search:

```json
{
  "isbn": "9785811243570",
  "title": "Charlie and the Chocolate Factory",
  "authors": ["Roald Dahl"]
}
```

Changes to any related table automatically update the document in Elasticsearch/OpenSearch.
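The denormalizing join that PGSync's generated SQL performs can be sketched in pure Python. This is illustrative only, not PGSync's actual implementation; the in-memory table rows and the `denormalize` helper are hypothetical:

```python
# Hypothetical in-memory stand-ins for the book and author tables.
books = [
    {"isbn": "9785811243570", "title": "Charlie and the Chocolate Factory"},
]
authors = [
    {"book_isbn": "9785811243570", "name": "Roald Dahl"},
]

def denormalize(books, authors):
    """Attach each book's author names, like the JSON_AGG subquery above."""
    docs = []
    for book in books:
        names = [a["name"] for a in authors if a["book_isbn"] == book["isbn"]]
        docs.append({**book, "authors": names})
    return docs

print(denormalize(books, authors))
```

In PGSync itself this aggregation happens inside PostgreSQL, so only the finished documents cross the wire to the search engine.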


Requirements

| Component | Version |
|-----------|---------|
| Python | 3.10+ |
| PostgreSQL | 9.6+ (or MySQL 5.7.22+ / MariaDB 10.5+) |
| Elasticsearch | 6.3.1+ (or OpenSearch 1.3.7+) |
| Redis | 3.1+ (or Valkey 7.2+) — optional in WAL mode |


Database Setup

<details> <summary><b>PostgreSQL</b></summary>

Enable logical decoding in postgresql.conf:

```
wal_level = logical
max_replication_slots = 1
```

Optionally limit WAL size:

```
max_slot_wal_keep_size = 100GB
```
</details> <details> <summary><b>MySQL / MariaDB</b></summary>

Enable binary logging in my.cnf:

```
server-id = 1
log_bin = mysql-bin
binlog_row_image = FULL
binlog_expire_logs_seconds = 604800
```

Create replication user:

```sql
CREATE USER 'replicator'@'%' IDENTIFIED WITH mysql_native_password BY 'password';
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'replicator'@'%';
FLUSH PRIVILEGES;
```
</details>

Example

Consider a book library with related authors:

**Book**

| isbn (PK) | title | description |
|-----------|-------|-------------|
| 9785811243570 | Charlie and the Chocolate Factory | Willy Wonka's famous... |
| 9781471331435 | 1984 | George Orwell's chilling... |

**Author**

| id (PK) | name |
|---------|------|
| 1 | Roald Dahl |
| 4 | George Orwell |

PGSync transforms this into search-ready documents:

```json
[
  {
    "isbn": "9785811243570",
    "title": "Charlie and the Chocolate Factory",
    "authors": ["Roald Dahl"]
  },
  {
    "isbn": "9781471331435",
    "title": "1984",
    "authors": ["George Orwell"]
  }
]
```

Any change — updating an author's name, adding a new book, deleting a relationship — is automatically synced.


Transforms

PGSync supports built-in transforms to modify field values before indexing. Transforms are applied in order: replace → rename → concat.

Replace

Find and replace substrings within field values:

```json
{
  "table": "product",
  "columns": ["code", "name"],
  "transform": {
    "replace": {
      "code": {
        "-": "/",
        "_": " "
      }
    }
  }
}
```

| Before | After |
|--------|-------|
| `ABC-DEF_GHI` | `ABC/DEF GHI` |
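The semantics can be mimicked in a few lines of Python. This is a hypothetical re-implementation for illustration, not PGSync's own code; `apply_replace` is an assumed name:

```python
def apply_replace(doc, rules):
    # For each named field, apply every substring -> replacement
    # pair in order, leaving non-string values untouched.
    out = dict(doc)
    for field, pairs in rules.items():
        value = out.get(field)
        if isinstance(value, str):
            for old, new in pairs.items():
                value = value.replace(old, new)
            out[field] = value
    return out

print(apply_replace({"code": "ABC-DEF_GHI"}, {"code": {"-": "/", "_": " "}}))
# {'code': 'ABC/DEF GHI'}
```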

Rename

Rename fields in the output document:

```json
{
  "table": "book",
  "columns": ["id", "title"],
  "transform": {
    "rename": {
      "id": "book_id",
      "title": "book_title"
    }
  }
}
```
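In effect, output keys are looked up in the mapping and unmapped keys pass through unchanged. A minimal sketch (hypothetical helper, not PGSync's implementation):

```python
def apply_rename(doc, mapping):
    # Keys found in the mapping are renamed; all others pass through.
    return {mapping.get(key, key): value for key, value in doc.items()}

print(apply_rename({"id": 7, "title": "1984"}, {"id": "book_id", "title": "book_title"}))
# {'book_id': 7, 'book_title': '1984'}
```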

Concat

Combine multiple fields into a new field:

```json
{
  "table": "user",
  "columns": ["first_name", "last_name"],
  "transform": {
    "concat": {
      "columns": ["first_name", "last_name"],
      "destination": "full_name",
      "delimiter": " "
    }
  }
}
```
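The behavior amounts to a delimiter join into a new field. A sketch under that assumption (`apply_concat` is a hypothetical name, and the sample user row is invented):

```python
def apply_concat(doc, spec):
    # Join the listed columns with the delimiter into a new
    # destination field; existing fields are kept as-is.
    out = dict(doc)
    joined = spec.get("delimiter", "").join(str(doc[c]) for c in spec["columns"])
    out[spec["destination"]] = joined
    return out

print(apply_concat(
    {"first_name": "Jane", "last_name": "Doe"},
    {"columns": ["first_name", "last_name"], "destination": "full_name", "delimiter": " "},
))
# {'first_name': 'Jane', 'last_name': 'Doe', 'full_name': 'Jane Doe'}
```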

Combined Example

Transforms can be combined and applied to nested children:

```json
{
  "table": "book",
  "columns": ["isbn", "title"],
  "children": [{
    "table": "publisher",
    "columns": ["code", "name"],
    "transform": {
      "replace": { "code": { "-": "." } },
      "rename": { "name": "publisher_name" }
    }
  }],
  "transform": {
    "concat": {
      "columns": ["isbn", "title"],
      "destination": "search_text",
      "delimiter": " - "
    }
  }
}
```
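The documented ordering (replace, then rename, then concat) matters: a rename can change a key that a later concat reads, but replace always sees the original key names. A self-contained sketch of that pipeline on a single node, with a hypothetical `transform` helper and invented sample data:

```python
def transform(doc, spec):
    # Apply transforms in the documented order: replace -> rename -> concat.
    out = dict(doc)
    for field, pairs in spec.get("replace", {}).items():
        for old, new in pairs.items():
            out[field] = out[field].replace(old, new)
    for old_key, new_key in spec.get("rename", {}).items():
        out[new_key] = out.pop(old_key)
    if "concat" in spec:
        c = spec["concat"]
        out[c["destination"]] = c["delimiter"].join(str(out[k]) for k in c["columns"])
    return out

doc = {"isbn": "9781471331435", "title": "1984"}
spec = {"concat": {"columns": ["isbn", "title"],
                   "destination": "search_text", "delimiter": " - "}}
print(transform(doc, spec))
# {'isbn': '9781471331435', 'title': '1984', 'search_text': '9781471331435 - 1984'}
```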

Why PGSync?

| Challenge | PGSync Solution |
|-----------|-----------------|
| Dual writes are error-prone | Captures changes from WAL — single source of truth |
| Complex JOIN queries | Auto-generates optimized SQL from your schema |
| Nested document updates | Detects changes in any related table |
| Data consistency | Transactionally consistent, ordered delivery |
| Crash recovery | Checkpoint-based resumption |


Environment Variables

Full list at pgsync.com/env-vars

| Variable | Description |
|----------|-------------|
| `PG_URL` | PostgreSQL connection string |
| `ELASTICSEARCH_URL` | Elasticsearch/OpenSearch URL |
| `REDIS_HOST` | Redis/Valkey host |
| `REDIS_CHECKPOINT` | Use Redis for checkpoints (recommended for production) |


One-Click Deploy

Deploy to DigitalOcean


Sponsors

<a href="https://www.digitalocean.com/?utm_medium=opensource&utm_source=pgsync"> <img src="https://opensource.nyc3.cdn.digitaloceanspaces.com/attribution/assets/SVG/DO_Logo_horizontal_blue.svg" alt="DigitalOcean" width="200"> </a>

Contributing

Contributions welcome! See CONTRIBUTING.rst for guidelines.

License

MIT — use it freely in your projects.
