Skip to content
Merged
Show file tree
Hide file tree
Changes from 45 commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
ba40177
feat: add snapshot feature like debezium for initial data
Abdulsametileri Oct 16, 2025
ae05583
chore: lint
Abdulsametileri Oct 16, 2025
a73b25e
feat: multiple instance and table snapshot (#38)
Abdulsametileri Oct 18, 2025
f7c2dca
feat: snapshot lsn cdc integration
Abdulsametileri Oct 18, 2025
2dddde7
feat: add retry to the all snapshot db operation
Abdulsametileri Oct 19, 2025
609c570
feat: add example test plan etc
Abdulsametileri Oct 19, 2025
ac859b9
chore: wip
Abdulsametileri Oct 19, 2025
5621b4c
chore: wip
Abdulsametileri Oct 19, 2025
0b240a7
feat: first test successful
Abdulsametileri Oct 19, 2025
c2916fe
feat: log update
Abdulsametileri Oct 19, 2025
3bd9ef8
feat: some testing updates
Abdulsametileri Oct 20, 2025
0b344b8
feat: fix snapshot conn close issue and advisory lock release issue w…
Abdulsametileri Oct 20, 2025
3c81052
feat: worker transaction wrap and retry
Abdulsametileri Oct 21, 2025
acd2998
feat: snapshot divide (prepare, execute), lsn slot cdc avoid data loss
Abdulsametileri Oct 21, 2025
d7557b1
feat: refactor ^^
Abdulsametileri Oct 21, 2025
19bf74a
feat: refactor worker side, fix basic bug
Abdulsametileri Oct 21, 2025
870ab08
feat: waitForCoordinator refactor, remove workerConn
Abdulsametileri Oct 21, 2025
2687223
feat: review notes
Abdulsametileri Oct 22, 2025
4e275a4
feat: add 2 integration test and documentation, fix estimate count an…
Abdulsametileri Oct 22, 2025
2fd9bdb
feat: add 4 integration test and increase wal sender, max slot
Abdulsametileri Oct 22, 2025
640ad22
feat: refactor snapshot tests
Abdulsametileri Oct 22, 2025
8c7b0a5
feat: linter error
Abdulsametileri Oct 22, 2025
9704c90
feat: refactor isTransient error method
Abdulsametileri Oct 23, 2025
f7f6c1b
chore: fix import
Abdulsametileri Oct 23, 2025
c032551
chore: fix import
Abdulsametileri Oct 23, 2025
30642ec
refactor: time format
Abdulsametileri Oct 23, 2025
8908ca4
refactor: disable timeout when opening snapshot conn for exporting
Abdulsametileri Oct 23, 2025
790dac8
refactor: add keep alive select 1 for snapshot conn
Abdulsametileri Oct 23, 2025
bcfc865
feat: change create table if not exist logic for permission problems
Abdulsametileri Oct 30, 2025
f480320
chore: no lint fun len
Abdulsametileri Oct 30, 2025
8630a26
feat: publication table exist or not support in snapshot
Abdulsametileri Nov 4, 2025
c754171
feat: extract replication and normal connections because of preventin…
Abdulsametileri Nov 4, 2025
0ff70a8
feat: graceful shutdown snapshot conn
Abdulsametileri Nov 5, 2025
60a69b3
feat: coordinator failure worker suspend fix
Abdulsametileri Nov 5, 2025
317463a
Merge remote-tracking branch 'origin/main' into feature/snapshot
Abdulsametileri Nov 13, 2025
0d80a4f
chore: remove unused vars
Abdulsametileri Nov 13, 2025
e482767
Merge remote-tracking branch 'origin/main' into feature/snapshot
Abdulsametileri Nov 13, 2025
9e2a37d
chore: fix lint
Abdulsametileri Nov 13, 2025
9f3cb3b
chore: fix lint
Abdulsametileri Nov 13, 2025
11c7f8d
chore: funlen change
Abdulsametileri Nov 13, 2025
c5f0f8b
feat: add snapshot only feature
Abdulsametileri Nov 13, 2025
5d1567d
chore: docs ref
Abdulsametileri Nov 16, 2025
6632453
feat: introduce snapshot tables field
Abdulsametileri Nov 16, 2025
05f4e2c
chore: fix lint
Abdulsametileri Nov 16, 2025
8d3a0f3
chore: benchmark files of snapshot mode
3n0ugh Nov 17, 2025
285460c
chore: benchmark initial updated, grafana etc.
Abdulsametileri Nov 17, 2025
8940d79
refactor: performance improvements
Abdulsametileri Nov 19, 2025
28cb5a0
chore: log
Abdulsametileri Nov 19, 2025
6148d23
chore: benchmark initial multistage
Abdulsametileri Nov 19, 2025
a9c2e99
chore: mem upgrade for debezium
Abdulsametileri Nov 19, 2025
8e749c0
add pk cache and limit offset integration etc
Abdulsametileri Nov 20, 2025
0f2a491
chore: fix lint
Abdulsametileri Nov 20, 2025
c63cb27
chore: benchmark added
Abdulsametileri Nov 23, 2025
b72390d
chore: docs
Abdulsametileri Nov 24, 2025
8b8aa51
chore: docs
Abdulsametileri Nov 24, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
linters-settings:
funlen:
lines: 70
lines: 85

linters:
disable-all: true
Expand Down
51 changes: 42 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,35 @@ ensuring low resource consumption and high performance.

[Debezium vs go-pq-cdc benchmark](./benchmark)

## 📸 NEW: Snapshot Feature

**Capture existing data before starting CDC!** The new snapshot feature enables initial data synchronization, ensuring downstream systems receive both historical and real-time data.

✨ **Key Highlights:**
- **Zero Data Loss**: Consistent point-in-time snapshot using PostgreSQL's `pg_export_snapshot()`
- **Chunk-Based Processing**: Memory-efficient processing of large tables
- **Multi-Instance Support**: Parallel processing across multiple instances
- **Crash Recovery**: Automatic resume from failures
- **No Duplicates**: Seamless transition from snapshot to CDC
- **Snapshot Only Mode**: One-time data export without CDC (no replication slot required)

📚 **[Read Full Documentation](docs/SNAPSHOT_FEATURE.md)** for detailed architecture, configuration, and best practices.

### Contents

* [Why?](#why)
* [Usage](#usage)
* [Examples](#examples)
* [Availability](#availability)
* [Configuration](#configuration)
* [API](#api)
* [Exposed Metrics](#exposed-metrics)
* [Compatibility](#compatibility)
* [Breaking Changes](#breaking-changes)
- [go-pq-cdc ](#go-pq-cdc---)
- [📸 NEW: Snapshot Feature](#-new-snapshot-feature)
- [Contents](#contents)
- [Why?](#why)
- [Usage](#usage)
- [Examples](#examples)
- [Availability](#availability)
- [Configuration](#configuration)
- [API](#api)
- [Exposed Metrics](#exposed-metrics)
- [Grafana Dashboard](#grafana-dashboard)
- [Compatibility](#compatibility)
- [Breaking Changes](#breaking-changes)

### Why?

Expand Down Expand Up @@ -114,6 +132,8 @@ func Handler(ctx *replication.ListenerContext) {

* [Simple](./example/simple)
* [Simple File Config](./example/simple-file-config)
* [Snapshot Mode (Initial Data Capture)](./example/snapshotmode)
* [Snapshot Only Mode (One-Time Export)](./example/snapshotonlymode)
* [PostgreSQL to Elasticsearch](https://github.com/Trendyol/go-pq-cdc-elasticsearch/tree/main/example/simple)
* [PostgreSQL to Kafka](https://github.com/Trendyol/go-pq-cdc-kafka/tree/main/example/simple)
* [PostgreSQL to PostgreSQL](./example/postgresql)
Expand Down Expand Up @@ -154,6 +174,13 @@ This setup ensures continuous data synchronization and minimal downtime in captu
| `slot.createIfNotExists` | bool | no | - | Create replication slot if not exists. Otherwise, return `replication slot is not exists` error. | |
| `slot.name` | string | yes | - | Set the logical replication slot name | Should be unique and descriptive. |
| `slot.slotActivityCheckerInterval` | int | no | 1000 | Set the slot activity check interval time in milliseconds | Specify as an integer value in milliseconds (e.g., `1000` for 1 second). |
| `snapshot.enabled` | bool | no | false | Enable initial snapshot feature | When enabled, captures existing data before starting CDC. |
| `snapshot.mode` | string | no | never | Snapshot mode: `initial`, `never`, or `snapshot_only` | **initial:** Take snapshot only if no previous snapshot exists, then start CDC. <br> **never:** Skip snapshot, start CDC immediately. <br> **snapshot_only:** Take snapshot and exit (no CDC, no replication slot required). |
| `snapshot.chunkSize` | int64 | no | 8000 | Number of rows per chunk during snapshot | Adjust based on table size. Larger chunks = fewer chunks but more memory per chunk. |
| `snapshot.claimTimeout` | duration | no | 30s | Timeout to reclaim stale chunks | If a worker doesn't send heartbeat for this duration, chunk is reclaimed by another worker. |
| `snapshot.heartbeatInterval` | duration | no | 5s | Interval for worker heartbeat updates | Workers send heartbeat every N seconds to indicate they're processing a chunk. |
| `snapshot.instanceId` | string | no | auto | Custom instance identifier (optional) | Auto-generated as `hostname-pid` if not specified. Useful for tracking workers. |
| `snapshot.tables` | []Table | no* | - | Tables to snapshot (required for `snapshot_only` mode, optional for `initial` mode) | **snapshot_only:** Must be specified here (independent from publication). <br> **initial:** If specified, must be a subset of publication tables. If not specified, all publication tables are snapshotted. |
| `extensionSupport.enableTimescaleDB` | bool | no | false | Enable support for TimescaleDB hypertables. Ensures proper handling of compressed chunks during replication. | |

### API
Expand Down Expand Up @@ -181,6 +208,12 @@ the `/metrics` endpoint.
| go_pq_cdc_replication_slot_slot_is_active | Indicates whether the PostgreSQL replication slot is currently active (1 for active, 0 for inactive). | slot_name, host| Gauge |
| go_pq_cdc_replication_slot_slot_lag | The replication lag measured by the difference between the current LSN and the confirmed flush LSN. | slot_name, host| Gauge |
| go_pq_cdc_replication_slot_slot_retained_wal_size | The size of Write-Ahead Logging (WAL) files retained for the replication slot in bytes. | slot_name, host| Gauge |
| go_pq_cdc_snapshot_in_progress | Indicates whether snapshot is currently in progress (1 for active, 0 for inactive). | slot_name, host| Gauge |
| go_pq_cdc_snapshot_total_tables | Total number of tables to snapshot. | slot_name, host| Gauge |
| go_pq_cdc_snapshot_total_chunks | Total number of chunks to process across all tables. | slot_name, host| Gauge |
| go_pq_cdc_snapshot_completed_chunks | Number of chunks completed in snapshot. | slot_name, host| Gauge |
| go_pq_cdc_snapshot_total_rows | Total number of rows read during snapshot. | slot_name, host| Counter |
| go_pq_cdc_snapshot_duration_seconds | Duration of the last snapshot operation in seconds. | slot_name, host| Gauge |
| runtime metrics | [Prometheus Collector](https://golang.bg/src/runtime/metrics/description.go) | N/A | N/A |

### Grafana Dashboard
Expand Down
File renamed without changes
File renamed without changes.
File renamed without changes
File renamed without changes.
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module github.com/Trendyol/go-pq-cdc/benchmark/go-pq-cdc-kafka

go 1.22.4

replace github.com/Trendyol/go-pq-cdc => ../../
replace github.com/Trendyol/go-pq-cdc => ../../../

require (
github.com/Trendyol/go-pq-cdc v0.0.5
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func main() {
os.Exit(1)
}

defer connector.Close()
connector.Start(ctx)
}

Expand Down
File renamed without changes.
File renamed without changes.
Binary file added benchmark/benchmark_initial/10m_test.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
78 changes: 78 additions & 0 deletions benchmark/benchmark_initial/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
## 10 M Insert Test

### Hardware
```txt
PC: Macbook Apple M1 Pro (2021)
Memory: 32 GB

go-pq-cdc:
resources:
limits:
cpus: 1
memory: 512M
reservations:
cpus: '0.25'
memory: 128M

Debezium:
resources:
limits:
cpus: 2
memory: 1024M
reservations:
cpus: '0.25'
memory: 128M
```

### Result
| | go-pq-cdc | Debezium |
|----------------------|------------|--------------|
| Row Count | 10 m | 10 m |
| Elapsed Time | 2.5 min | 21 min |
| Cpu Usage Max | 44% | 181% |
| Memory Usage Max | 130 MB | 1.07 GB |
| Received Traffic Max | 4.36 MiB/s | 7.97 MiB/s |
| Sent Traffic Max | 5.96 MiB/s | 6.27 MiB/s |

![10m_result](./10m_test.png)


## Requirements
- [Docker](https://docs.docker.com/compose/install/)
- [psql](https://www.postgresql.org/download/)

## Instructions

- Start the containers
```sh
docker compose up -d
```
- Connect to Postgres database:
```sh
psql postgres://cdc_user:[email protected]:5432/cdc_db
```
- Insert data to users table:
```sql
INSERT INTO users (name)
SELECT
'Oyleli' || i
FROM generate_series(1, 1000000) AS i;
```
- Go to grafana dashboard: http://localhost:3000/d/edl1ybvsmc64gb/benchmark?orgId=1
> **Grafana Credentials**
Username: `go-pq-cdc-user` Password: `go-pq-cdc-pass`
- Trace the process
![benchmark_dashboard](./dashboard.png)

## Ports

- RedPanda Console: `8085` http://localhost:8085
- RedPanda: `19092` http://localhost:19092
- Grafana: `3000` http://localhost:3000
- Prometheus: `9090` http://localhost:9090
- cAdvisor: `8080` http://localhost:8080
- PostgreSQL:`5432` http://localhost:5432
- PostgreSQL Metric Exporter: `9187` http://localhost:9187
- Debezium: `9093` http://localhost:9093
- go-pq-cdc Metric: `2112` http://localhost:2112

Binary file added benchmark/benchmark_initial/dashboard.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Loading