Commit 08e8df8

feat: Add Debezium-style Snapshot for Initial Data Capture (#9)
* feat: add snapshot updates and handle it
* feat: bump go-pq-cdc
* chore: fix security gates pipeline problem
* feat: snapshot only mode handle gracefully
* feat: bump go-pq-cdc
* chore: docs added
* chore: add example and docs for snapshot
* chore: fix lint
1 parent 7524d20 commit 08e8df8

File tree

13 files changed: +687 −217 lines

.github/workflows/build.yml

Lines changed: 1 addition & 0 deletions

```diff
@@ -48,3 +48,4 @@ jobs:
       actions: read
       contents: read
       security-events: write
+    secrets: inherit
```

README.md

Lines changed: 76 additions & 1 deletion

```diff
@@ -10,6 +10,7 @@ Kafka connector for [go-pq-cdc](https://github.com/Trendyol/go-pq-cdc).
 
 - **Optimized for Speed and Efficiency**: Minimal resource consumption and faster processing, designed to handle high-throughput data replication.
 - **Real-Time Data Streaming**: Streams data directly from PostgreSQL to Kafka, ensuring up-to-date synchronization across systems.
+- **Initial Snapshot Support**: Capture existing data before starting CDC, ensuring downstream systems receive both historical and real-time data.
 - **Automatic Failover**: In the event of a failure, `go-pq-cdc-kafka` can quickly recover and resume data replication.
 - **Concurrency**: Built with Go's concurrency model (goroutines and channels), ensuring lightweight and highly performant parallel operations.
```

````diff
@@ -38,8 +39,63 @@ The `go-pq-cdc-kafka` ensures high availability with passive/active modes for Po
 - **Passive Mode**: If the replication slot becomes inactive, it automatically captures the slot and resumes data streaming. Additionally, other deployments monitor the slot’s status, ensuring redundancy and failover capabilities.
 
-This architecture guarantees minimal downtime and continuous data synchronization, even in the event of failure. Additionally, Gos faster cold starts provide quicker recovery times compared to Debezium, further minimizing potential downtime.
+This architecture guarantees minimal downtime and continuous data synchronization, even in the event of failure. Additionally, Go's faster cold starts provide quicker recovery times compared to Debezium, further minimizing potential downtime.
 
+## 📸 NEW: Snapshot Feature
+
+**Capture existing data before starting CDC!** The snapshot feature enables initial data synchronization, ensuring downstream systems (Kafka) receive both historical and real-time data.
+
+**Key Highlights:**
+
+- **Zero Data Loss**: Consistent point-in-time snapshot using PostgreSQL's `pg_export_snapshot()`
+- **Chunk-Based Processing**: Memory-efficient processing of large tables
+- **Multi-Instance Support**: Parallel processing across multiple instances for faster snapshots
+- **Crash Recovery**: Automatic resume from failures with chunk-level tracking
+- **No Duplicates**: Seamless transition from snapshot to CDC mode
+- **Flexible Modes**: Choose between `initial`, `never`, or `snapshot_only` based on your needs
+
+### Snapshot Modes
+
+| Mode            | Description                                                           | Use Case                                          |
+|-----------------|-----------------------------------------------------------------------|---------------------------------------------------|
+| `initial`       | Takes a snapshot only if no previous snapshot exists, then starts CDC | First-time setup with existing data               |
+| `never`         | Skips the snapshot entirely and starts CDC immediately                | New tables, or when historical data is not needed |
+| `snapshot_only` | Takes a snapshot and exits (no CDC, no replication slot required)     | One-time data migration or backfill               |
+
+### How It Works
+
+1. **Snapshot Phase**: Captures existing data in chunks for memory efficiency
+2. **Consistent Point**: Uses PostgreSQL's `pg_export_snapshot()` to ensure data consistency
+3. **CDC Phase**: Seamlessly transitions to real-time change data capture
+4. **No Gaps**: Ensures all changes made during the snapshot are captured via CDC
+
+### Identifying Snapshot vs CDC Messages
+
+Your handler function can distinguish between snapshot and CDC messages:
+
+```go
+func Handler(msg *cdc.Message) []gokafka.Message {
+	// Check if this is a snapshot message (historical data)
+	if msg.Type.IsSnapshot() {
+		// Handle snapshot data; key and data are built from msg elsewhere
+		return []gokafka.Message{{
+			Headers: []gokafka.Header{
+				{Key: "operation", Value: []byte("SNAPSHOT")},
+				{Key: "source", Value: []byte("initial-snapshot")},
+			},
+			Key:   key,
+			Value: data,
+		}}
+	}
+
+	// Handle real-time CDC operations
+	if msg.Type.IsInsert() { /* ... */ }
+	if msg.Type.IsUpdate() { /* ... */ }
+	if msg.Type.IsDelete() { /* ... */ }
+	return nil
+}
+```
+
+For detailed configuration and usage, see the [snapshot example](./example/snapshot).
 
 ## Usage
````
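The chunk-based processing described in the README addition above can be illustrated with a small, pure helper: given a table's row count and the configured `cdc.snapshot.chunkSize`, it computes the row ranges that snapshot workers would claim. This is only a sketch of the idea; `planChunks` and the `chunk` type are hypothetical names, not part of the go-pq-cdc API.

```go
package main

import "fmt"

// chunk is a half-open row range [Start, End) that a snapshot worker claims.
// Hypothetical type for illustration; go-pq-cdc's real bookkeeping differs.
type chunk struct {
	Start, End int64
}

// planChunks splits rowCount rows into chunks of at most chunkSize rows,
// mirroring the "chunk-based processing" idea described above.
func planChunks(rowCount, chunkSize int64) []chunk {
	var chunks []chunk
	for start := int64(0); start < rowCount; start += chunkSize {
		end := start + chunkSize
		if end > rowCount {
			end = rowCount
		}
		chunks = append(chunks, chunk{Start: start, End: end})
	}
	return chunks
}

func main() {
	// 20,000 rows with the default chunkSize of 8,000 yields 3 chunks.
	for _, c := range planChunks(20000, 8000) {
		fmt.Printf("chunk [%d, %d)\n", c.Start, c.End)
	}
}
```

Smaller chunks bound per-chunk memory and make crash recovery cheaper (less work is redone when a stale chunk is reclaimed), at the cost of more chunk-tracking overhead.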

```diff
@@ -159,6 +215,7 @@ func Handler(msg *cdc.Message) []gokafka.Message {
 ## Examples
 
 * [Simple](./example/simple)
+* [Snapshot](./example/snapshot)
 
 ## Configuration
 
```
```diff
@@ -182,6 +239,13 @@ func Handler(msg *cdc.Message) []gokafka.Message {
 | `cdc.slot.createIfNotExists` | bool | no | - | Create the replication slot if it does not exist; otherwise a `replication slot is not exists` error is returned. | |
 | `cdc.slot.name` | string | yes | - | Set the logical replication slot name | Should be unique and descriptive. |
 | `cdc.slot.slotActivityCheckerInterval` | int | yes | 1000 | Set the slot activity check interval time in milliseconds | Specify as an integer value in milliseconds (e.g., `1000` for 1 second). |
+| `cdc.snapshot.enabled` | bool | no | false | Enable the initial snapshot feature | When enabled, captures existing data before starting CDC. |
+| `cdc.snapshot.mode` | string | no | never | Snapshot mode: `initial`, `never`, or `snapshot_only` | **initial:** Take a snapshot only if no previous snapshot exists, then start CDC. <br> **never:** Skip the snapshot, start CDC immediately. <br> **snapshot_only:** Take a snapshot and exit (no CDC). |
+| `cdc.snapshot.chunkSize` | int64 | no | 8000 | Number of rows per chunk during the snapshot | Adjust based on table size. Larger chunks mean fewer chunks but more memory per chunk. |
+| `cdc.snapshot.claimTimeout` | time.Duration | no | 30s | Timeout to reclaim stale chunks | If a worker doesn't send a heartbeat for this duration, its chunk is reclaimed by another worker. |
+| `cdc.snapshot.heartbeatInterval` | time.Duration | no | 5s | Interval for worker heartbeat updates | Workers send a heartbeat at this interval to indicate they're processing a chunk. |
+| `cdc.snapshot.instanceId` | string | no | auto | Custom instance identifier (optional) | Auto-generated as hostname-pid if not specified. Useful for tracking workers in multi-instance scenarios. |
+| `cdc.snapshot.tables` | []Table | no* | - | Tables to snapshot (required for `snapshot_only` mode, optional for `initial` mode) | **snapshot_only:** Must be specified here (independent from the publication). <br> **initial:** If specified, must be a subset of the publication tables; if not specified, all publication tables are snapshotted. |
 | `kafka.tableTopicMapping` | map[string]string | yes | - | Mapping of PostgreSQL table events to Kafka topics | Maps table names to Kafka topics. |
 | `kafka.brokers` | []string | yes | - | Broker IP and port information | |
 | `kafka.producerBatchSize` | integer | no | 2000 | Maximum message count per batch; if exceeded, a flush is triggered. | |
```
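Putting the snapshot keys from the table above together, a minimal snapshot section of the connector configuration might look like the following sketch. The exact file layout depends on how you build the `config.Connector` in your project; the values shown are the documented defaults plus an enabled `initial` mode.

```yaml
cdc:
  snapshot:
    enabled: true          # capture existing data before CDC starts
    mode: initial          # initial | never | snapshot_only
    chunkSize: 8000        # rows per chunk (default)
    claimTimeout: 30s      # reclaim a chunk if its worker stops heartbeating
    heartbeatInterval: 5s  # how often workers report progress on a chunk
    # instanceId: worker-1 # optional; auto-generated as hostname-pid
```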
```diff
@@ -223,6 +287,17 @@ the `/metrics` endpoint.
 | go_pq_cdc_kafka_write_total | The total number of successful write operations to Kafka. | slot_name, host, topic_name | Counter |
 | go_pq_cdc_kafka_err_total | The total number of unsuccessful write operations to Kafka. | slot_name, host, topic_name | Counter |
 
+### Snapshot Metrics
+
+| Metric Name                         | Description                                                                           | Labels          | Value Type |
+|-------------------------------------|---------------------------------------------------------------------------------------|-----------------|------------|
+| go_pq_cdc_snapshot_in_progress      | Indicates whether a snapshot is currently in progress (1 for active, 0 for inactive). | slot_name, host | Gauge      |
+| go_pq_cdc_snapshot_total_tables     | Total number of tables to snapshot.                                                   | slot_name, host | Gauge      |
+| go_pq_cdc_snapshot_total_chunks     | Total number of chunks to process across all tables.                                  | slot_name, host | Gauge      |
+| go_pq_cdc_snapshot_completed_chunks | Number of chunks completed in the snapshot.                                           | slot_name, host | Gauge      |
+| go_pq_cdc_snapshot_total_rows       | Total number of rows read during the snapshot.                                        | slot_name, host | Counter    |
+| go_pq_cdc_snapshot_duration_seconds | Duration of the last snapshot operation in seconds.                                   | slot_name, host | Gauge      |
+
 You can also use all CDC-related metrics explained [here](https://github.com/Trendyol/go-pq-cdc#exposed-metrics).
 All CDC-related metrics are injected automatically, so you don't need to do anything.
```
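The chunk gauges above combine naturally into a progress ratio. For example, this illustrative Prometheus query (not shipped with the project) reports per-slot snapshot completion as a fraction between 0 and 1:

```promql
go_pq_cdc_snapshot_completed_chunks / go_pq_cdc_snapshot_total_chunks
```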

connector.go

Lines changed: 18 additions & 0 deletions

```diff
@@ -78,6 +78,22 @@ func NewConnector(ctx context.Context, config config.Connector, handler Handler,
 }
 
 func (c *connector) Start(ctx context.Context) {
+	// Snapshot-only mode: different flow since upstream CDC exits immediately
+	if c.cfg.CDC.IsSnapshotOnlyMode() {
+		logger.Info("starting snapshot-only mode")
+		logger.Info("bulk process started")
+		c.producer.StartBatch()
+
+		// Signal ready immediately since there's no CDC to wait for
+		c.readyCh <- struct{}{}
+
+		// Start CDC synchronously - it will execute the snapshot and return
+		c.cdc.Start(ctx)
+		logger.Info("snapshot-only mode completed")
+		return
+	}
+
+	// Normal CDC mode: async flow
 	go func() {
 		logger.Info("waiting for connector start...")
 		if err := c.cdc.WaitUntilReady(ctx); err != nil {
@@ -119,6 +135,8 @@ func (c *connector) listener(ctx *replication.ListenerContext) {
 		msg = NewUpdateMessage(m)
 	case *format.Delete:
 		msg = NewDeleteMessage(m)
+	case *format.Snapshot:
+		msg = NewSnapshotMessage(m)
 	default:
 		return
 	}
```
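The snapshot-only branch added above signals readiness before running synchronously, whereas normal mode defers the signal until CDC is up. A stripped-down, self-contained sketch of that control flow is shown below; `start`, `readyCh`, and `work` are illustrative names, not the connector's actual API.

```go
package main

import "fmt"

// start mimics the connector's Start: in snapshot-only mode it signals
// readiness up front, runs the work synchronously, and returns.
func start(snapshotOnly bool, readyCh chan struct{}, work func()) {
	if snapshotOnly {
		readyCh <- struct{}{} // nothing to wait for: ready immediately
		work()                // runs the snapshot, then we fall through
		fmt.Println("snapshot-only mode completed")
		return
	}
	// Normal mode: readiness is signaled asynchronously once CDC is up.
	go func() {
		readyCh <- struct{}{}
	}()
	work()
}

func main() {
	readyCh := make(chan struct{}, 1) // buffered so the signal never blocks
	start(true, readyCh, func() { fmt.Println("snapshot running") })
	<-readyCh // the ready signal was sent before the work ran
}
```

The synchronous path matters because in `snapshot_only` mode the process is expected to exit once the snapshot finishes; there is no long-lived replication stream to supervise.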

example/simple/docker-compose.yml

Lines changed: 12 additions & 42 deletions

```diff
@@ -9,7 +9,17 @@ services:
       POSTGRES_PASSWORD: "cdc_pass"
       POSTGRES_DB: "cdc_db"
       POSTGRES_HOST_AUTH_METHOD: trust
-    network_mode: "host"
+    volumes:
+      - postgres_data:/var/lib/postgresql/data
+    ports:
+      - "5432:5432"
+    networks:
+      - cdc_net
+    healthcheck:
+      test: ["CMD-SHELL", "pg_isready -U cdc_user -d cdc_db"]
+      interval: 10s
+      timeout: 5s
+      retries: 5
 
   ##### KAFKA ######
   redpanda:
@@ -57,57 +67,17 @@ services:
       adminApi:
         enabled: true
         urls: ["http://redpanda:9644"]
-      connect:
-        enabled: true
-        clusters:
-          - name: local-connect-cluster
-            url: http://connect:8083
     ports:
       - 8085:8080
     networks:
       - cdc_net
     depends_on:
      - redpanda
 
-  connect:
-    image: docker.redpanda.com/redpandadata/connectors:latest
-    hostname: connect
-    container_name: connect
-    networks:
-      - cdc_net
-    # platform: 'linux/amd64'
-    depends_on:
-      - redpanda
-    ports:
-      - "8083:8083"
-    environment:
-      CONNECT_CONFIGURATION: |
-        key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
-        value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
-        group.id=connectors-cluster
-        offset.storage.topic=_internal_connectors_offsets
-        config.storage.topic=_internal_connectors_configs
-        status.storage.topic=_internal_connectors_status
-        config.storage.replication.factor=-1
-        offset.storage.replication.factor=-1
-        status.storage.replication.factor=-1
-        offset.flush.interval.ms=1000
-        producer.linger.ms=50
-        producer.batch.size=131072
-      CONNECT_BOOTSTRAP_SERVERS: redpanda:9092
-      CONNECT_GC_LOG_ENABLED: "false"
-      CONNECT_HEAP_OPTS: -Xms512M -Xmx512M
-      CONNECT_LOG_LEVEL: info
-
 volumes:
-  couchbase_data: null
   postgres_data: null
-  zookeeper_data: null
-  zookeeper_log: null
-  kafka_data: null
   redpanda_data: null
-  kibana1_data: null
-  es01_data: null
+
 networks:
   cdc_net:
     driver: bridge
```

example/simple/go.mod

Lines changed: 0 additions & 38 deletions
This file was deleted.
