Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
ba40177
feat: add snapshot feature like debezium for initial data
Abdulsametileri Oct 16, 2025
ae05583
chore: lint
Abdulsametileri Oct 16, 2025
a73b25e
feat: multiple instance and table snapshot (#38)
Abdulsametileri Oct 18, 2025
f7c2dca
feat: snapshot lsn cdc integration
Abdulsametileri Oct 18, 2025
2dddde7
feat: add retry to the all snapshot db operation
Abdulsametileri Oct 19, 2025
609c570
feat: add example test plan etc
Abdulsametileri Oct 19, 2025
ac859b9
chore: wip
Abdulsametileri Oct 19, 2025
5621b4c
chore: wip
Abdulsametileri Oct 19, 2025
0b240a7
feat: first test successful
Abdulsametileri Oct 19, 2025
c2916fe
feat: log update
Abdulsametileri Oct 19, 2025
3bd9ef8
feat: some testing updates
Abdulsametileri Oct 20, 2025
0b344b8
feat: fix snapshot conn close issue and advisory lock release issue w…
Abdulsametileri Oct 20, 2025
3c81052
feat: worker transaction wrap and retry
Abdulsametileri Oct 21, 2025
acd2998
feat: snapshot divide (prepare, execute), lsn slot cdc avoid data loss
Abdulsametileri Oct 21, 2025
d7557b1
feat: refactor ^^
Abdulsametileri Oct 21, 2025
19bf74a
feat: refactor worker side, fix basic bug
Abdulsametileri Oct 21, 2025
870ab08
feat: waitForCoordinator refactor, remove workerConn
Abdulsametileri Oct 21, 2025
2687223
feat: review notes
Abdulsametileri Oct 22, 2025
4e275a4
feat: add 2 integration test and documentation, fix estimate count an…
Abdulsametileri Oct 22, 2025
2fd9bdb
feat: add 4 integration test and increase wal sender, max slot
Abdulsametileri Oct 22, 2025
640ad22
feat: refactor snapshot tests
Abdulsametileri Oct 22, 2025
8c7b0a5
feat: linter error
Abdulsametileri Oct 22, 2025
9704c90
feat: refactor isTransient error method
Abdulsametileri Oct 23, 2025
f7f6c1b
chore: fix import
Abdulsametileri Oct 23, 2025
c032551
chore: fix import
Abdulsametileri Oct 23, 2025
30642ec
refactor: time format
Abdulsametileri Oct 23, 2025
8908ca4
refactor: disable timeout when opening snapshot conn for exporting
Abdulsametileri Oct 23, 2025
790dac8
refactor: add keep alive select 1 for snapshot conn
Abdulsametileri Oct 23, 2025
bcfc865
feat: change create table if not exist logic for permission problems
Abdulsametileri Oct 30, 2025
f480320
chore: no lint fun len
Abdulsametileri Oct 30, 2025
8630a26
feat: publication table exist or not support in snapshot
Abdulsametileri Nov 4, 2025
c754171
feat: extract replication and normal connections because of preventin…
Abdulsametileri Nov 4, 2025
0ff70a8
feat: graceful shutdown snapshot conn
Abdulsametileri Nov 5, 2025
60a69b3
feat: coordinator failure worker suspend fix
Abdulsametileri Nov 5, 2025
317463a
Merge remote-tracking branch 'origin/main' into feature/snapshot
Abdulsametileri Nov 13, 2025
0d80a4f
chore: remove unused vars
Abdulsametileri Nov 13, 2025
e482767
Merge remote-tracking branch 'origin/main' into feature/snapshot
Abdulsametileri Nov 13, 2025
9e2a37d
chore: fix lint
Abdulsametileri Nov 13, 2025
9f3cb3b
chore: fix lint
Abdulsametileri Nov 13, 2025
11c7f8d
chore: funlen change
Abdulsametileri Nov 13, 2025
c5f0f8b
feat: add snapshot only feature
Abdulsametileri Nov 13, 2025
5d1567d
chore: docs ref
Abdulsametileri Nov 16, 2025
6632453
feat: introduce snapshot tables field
Abdulsametileri Nov 16, 2025
05f4e2c
chore: fix lint
Abdulsametileri Nov 16, 2025
8d3a0f3
chore: benchmark files of snapshot mode
3n0ugh Nov 17, 2025
285460c
chore: benchmark initial updated, grafana etc.
Abdulsametileri Nov 17, 2025
8940d79
refactor: performance improvements
Abdulsametileri Nov 19, 2025
28cb5a0
chore: log
Abdulsametileri Nov 19, 2025
6148d23
chore: benchmark initial multistage
Abdulsametileri Nov 19, 2025
a9c2e99
chore: mem upgrade for debezium
Abdulsametileri Nov 19, 2025
8e749c0
add pk cache and limit offset integration etc
Abdulsametileri Nov 20, 2025
0f2a491
chore: fix lint
Abdulsametileri Nov 20, 2025
c63cb27
chore: benchmark added
Abdulsametileri Nov 23, 2025
b72390d
chore: docs
Abdulsametileri Nov 24, 2025
8b8aa51
chore: docs
Abdulsametileri Nov 24, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func Handler(ctx *replication.ListenerContext) {

* [Simple](./example/simple)
* [Simple File Config](./example/simple-file-config)
* [Snapshot (Initial Data Capture)](./example/snapshot) - 📸 **NEW!**
* [PostgreSQL to Elasticsearch](https://github.com/Trendyol/go-pq-cdc-elasticsearch/tree/main/example/simple)
* [PostgreSQL to Kafka](https://github.com/Trendyol/go-pq-cdc-kafka/tree/main/example/simple)
* [PostgreSQL to PostgreSQL](./example/postgresql)
Expand Down Expand Up @@ -154,6 +155,13 @@ This setup ensures continuous data synchronization and minimal downtime in captu
| `slot.createIfNotExists` | bool | no | - | Create replication slot if not exists. Otherwise, return `replication slot is not exists` error. | |
| `slot.name` | string | yes | - | Set the logical replication slot name | Should be unique and descriptive. |
| `slot.slotActivityCheckerInterval` | int | no | 1000 | Set the slot activity check interval time in milliseconds | Specify as an integer value in milliseconds (e.g., `1000` for 1 second). |
| `snapshot.enabled` | bool | no | false | Enable initial snapshot feature | When enabled, captures existing data before starting CDC. |
| `snapshot.mode` | string | no | never | Snapshot mode: `initial` or `never` | **initial:** Take snapshot only if no previous snapshot exists. <br> **never:** Skip snapshot. |
| `snapshot.batchSize` | int | no | 10000 | Number of rows to read per batch during snapshot | Adjust based on table size and memory. Larger batches = faster but more memory. |
| `snapshot.checkpointInterval` | int | no | 10 | Save snapshot state every N batches | Lower values = more frequent state saves = better recovery but slower. |
| `snapshot.maxRetries` | int | no | 3 | Maximum retry attempts on snapshot failure | Number of times to retry before giving up. |
| `snapshot.retryDelay` | duration | no | 5s | Delay between retry attempts | Time to wait before retrying (e.g., `5s`, `1m`). |
| `snapshot.timeout` | duration | no | 30m | Overall snapshot timeout | Maximum time allowed for entire snapshot operation. |

### API

Expand All @@ -180,6 +188,11 @@ the `/metrics` endpoint.
| go_pq_cdc_replication_slot_slot_is_active | Indicates whether the PostgreSQL replication slot is currently active (1 for active, 0 for inactive). | slot_name, host| Gauge |
| go_pq_cdc_replication_slot_slot_lag | The replication lag measured by the difference between the current LSN and the confirmed flush LSN. | slot_name, host| Gauge |
| go_pq_cdc_replication_slot_slot_retained_wal_size | The size of Write-Ahead Logging (WAL) files retained for the replication slot in bytes. | slot_name, host| Gauge |
| go_pq_cdc_snapshot_in_progress | Indicates whether snapshot is currently in progress (1 for active, 0 for inactive). | slot_name, host| Gauge |
| go_pq_cdc_snapshot_total_tables | Total number of tables to snapshot. | slot_name, host| Gauge |
| go_pq_cdc_snapshot_completed_tables | Number of tables completed in snapshot. | slot_name, host| Gauge |
| go_pq_cdc_snapshot_total_rows | Total number of rows read during snapshot. | slot_name, host| Counter |
| go_pq_cdc_snapshot_duration_seconds | Duration of the last snapshot operation in seconds. | slot_name, host| Gauge |
| runtime metrics | [Prometheus Collector](https://golang.bg/src/runtime/metrics/description.go) | N/A | N/A |

### Grafana Dashboard
Expand Down
73 changes: 73 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"log/slog"
"strings"
"time"

"github.com/Trendyol/go-pq-cdc/logger"
"github.com/Trendyol/go-pq-cdc/pq/publication"
Expand All @@ -20,6 +21,7 @@ type Config struct {
Database string `json:"database" yaml:"database"`
Publication publication.Config `json:"publication" yaml:"publication"`
Slot slot.Config `json:"slot" yaml:"slot"`
Snapshot SnapshotConfig `json:"snapshot" yaml:"snapshot"`
Port int `json:"port" yaml:"port"`
Metric MetricConfig `json:"metric" yaml:"metric"`
DebugMode bool `json:"debugMode" yaml:"debugMode"`
Expand Down Expand Up @@ -65,6 +67,28 @@ func (c *Config) SetDefault() {
c.Publication.Tables[tableID].Schema = "public"
}
}

// Set default snapshot config
if c.Snapshot.Enabled {
if c.Snapshot.Mode == "" {
c.Snapshot.Mode = SnapshotModeNever
}
if c.Snapshot.Timeout == 0 {
c.Snapshot.Timeout = 30 * time.Minute
}
if c.Snapshot.BatchSize == 0 {
c.Snapshot.BatchSize = 10_000
}
if c.Snapshot.CheckpointInterval == 0 {
c.Snapshot.CheckpointInterval = 10
}
if c.Snapshot.MaxRetries == 0 {
c.Snapshot.MaxRetries = 3
}
if c.Snapshot.RetryDelay == 0 {
c.Snapshot.RetryDelay = 5 * time.Second
}
}
}

func (c *Config) Validate() error {
Expand Down Expand Up @@ -93,6 +117,10 @@ func (c *Config) Validate() error {
err = errors.Join(err, cErr)
}

if cErr := c.Snapshot.Validate(); cErr != nil {
err = errors.Join(err, cErr)
}

return err
}

Expand All @@ -106,3 +134,48 @@ func (c *Config) Print() {
func isEmpty(s string) bool {
return strings.TrimSpace(s) == ""
}

type SnapshotConfig struct {
Mode SnapshotMode `json:"mode" yaml:"mode"`
Timeout time.Duration `json:"timeout" yaml:"timeout"`
BatchSize int `json:"batchSize" yaml:"batchSize"`
CheckpointInterval int `json:"checkpointInterval" yaml:"checkpointInterval"`
MaxRetries int `json:"maxRetries" yaml:"maxRetries"`
RetryDelay time.Duration `json:"retryDelay" yaml:"retryDelay"`
Enabled bool `json:"enabled" yaml:"enabled"`
}

func (s *SnapshotConfig) Validate() error {
if !s.Enabled {
return nil
}

validModes := []SnapshotMode{SnapshotModeInitial, SnapshotModeNever}
isValid := false
for _, mode := range validModes {
if s.Mode == mode {
isValid = true
break
}
}
if !isValid {
return errors.New("snapshot mode must be 'initial' or 'never'")
}

if s.BatchSize <= 0 {
return errors.New("snapshot batch size must be greater than 0")
}

if s.CheckpointInterval <= 0 {
return errors.New("snapshot checkpoint interval must be greater than 0")
}

return nil
}

type SnapshotMode string

const (
SnapshotModeInitial SnapshotMode = "initial" // İlk çalışmada snapshot al
SnapshotModeNever SnapshotMode = "never" // Snapshot alma
)
126 changes: 124 additions & 2 deletions connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import (
"syscall"
"time"

"github.com/Trendyol/go-pq-cdc/pq/message/format"
"github.com/Trendyol/go-pq-cdc/pq/snapshot"

"github.com/Trendyol/go-pq-cdc/pq/timescaledb"

"github.com/Trendyol/go-pq-cdc/config"
Expand Down Expand Up @@ -41,9 +44,10 @@ type connector struct {
cancelCh chan os.Signal
readyCh chan struct{}
timescaleDB *timescaledb.TimescaleDB
snapshotter *snapshot.Snapshotter
listenerFunc replication.ListenerFunc
system pq.IdentifySystemResult

once sync.Once
once sync.Once
}

func NewConnectorWithConfigFile(ctx context.Context, configFilePath string, listenerFunc replication.ListenerFunc) (Connector, error) {
Expand Down Expand Up @@ -97,6 +101,17 @@ func NewConnector(ctx context.Context, cfg config.Config, listenerFunc replicati

m := metric.NewMetric(cfg.Slot.Name)

// Create snapshotter with separate state connection only if snapshot is enabled
var snapshotter *snapshot.Snapshotter
if cfg.Snapshot.Enabled {
// Create separate connection for snapshot state (to avoid transaction rollback)
snapshotStateConn, err := pq.NewConnection(ctx, cfg.DSN())
if err != nil {
return nil, errors.Wrap(err, "create state connection")
}
snapshotter = snapshot.New(cfg.Snapshot, cfg.Publication.Tables, conn, snapshotStateConn, m)
}

stream := replication.NewStream(conn, cfg, m, &system, listenerFunc)

sl, err := slot.NewSlot(ctx, cfg.DSN(), cfg.Slot, m, stream.(slot.XLogUpdater))
Expand Down Expand Up @@ -130,6 +145,8 @@ func NewConnector(ctx context.Context, cfg config.Config, listenerFunc replicati
server: http.NewServer(cfg, prometheusRegistry),
slot: sl,
timescaleDB: tdb,
snapshotter: snapshotter,
listenerFunc: listenerFunc,

cancelCh: make(chan os.Signal, 1),
readyCh: make(chan struct{}, 1),
Expand All @@ -141,6 +158,13 @@ func (c *connector) Start(ctx context.Context) {
go c.server.Listen()
})

if c.cfg.Snapshot.Enabled && c.shouldTakeSnapshot(ctx) {
if err := c.takeSnapshotWithRetry(ctx); err != nil {
logger.Error("snapshot failed after retries", "error", err)
return
}
}

c.CaptureSlot(ctx)

err := c.stream.Open(ctx)
Expand All @@ -166,6 +190,104 @@ func (c *connector) Start(ctx context.Context) {
logger.Debug("cancel channel triggered")
}

func (c *connector) shouldTakeSnapshot(ctx context.Context) bool {
if !c.cfg.Snapshot.Enabled {
return false
}

switch c.cfg.Snapshot.Mode {
case config.SnapshotModeNever:
return false
case config.SnapshotModeInitial:
state, err := c.snapshotter.LoadState(ctx, c.cfg.Slot.Name)
if err != nil {
logger.Debug("failed to load snapshot state, will take snapshot", "error", err)
return true
}
return state == nil || !state.Completed
default:
logger.Warn("invalid snapshot mode, skipping snapshot", "mode", c.cfg.Snapshot.Mode)
return false
}
}

func (c *connector) takeSnapshotWithRetry(ctx context.Context) error {
logger.Info("taking initial snapshot...")

// Create timeout context
timeoutCtx, cancel := context.WithTimeout(ctx, c.cfg.Snapshot.Timeout)
defer cancel()

var lastErr error

maxRetries := c.cfg.Snapshot.MaxRetries
for attempt := 1; attempt <= maxRetries; attempt++ {
logger.Info("snapshot attempt", "attempt", attempt, "maxRetries", maxRetries)

// Execute snapshot
err := c.snapshotter.TakeSnapshot(timeoutCtx, c.snapshotHandler, c.cfg.Slot.Name)
if err == nil {
logger.Info("snapshot completed successfully")

// Load existing state (from last checkpoint)
existingState, loadErr := c.snapshotter.LoadState(ctx, c.cfg.Slot.Name)
if loadErr != nil {
logger.Warn("failed to load final state", "error", loadErr)
}

// Mark snapshot as completed, preserving existing state
finalState := &snapshot.SnapshotState{
SlotName: c.cfg.Slot.Name,
LastSnapshotLSN: c.system.LoadXLogPos(),
LastSnapshotAt: time.Now().UTC(),
Completed: true,
}

// Preserve checkpoint data if exists
if existingState != nil {
finalState.CurrentTable = existingState.CurrentTable
finalState.CurrentOffset = existingState.CurrentOffset
finalState.TotalRows = existingState.TotalRows
}

if saveErr := c.snapshotter.SaveState(ctx, finalState); saveErr != nil {
logger.Warn("failed to mark snapshot as completed", "error", saveErr)
}

return nil
}

lastErr = err
logger.Warn("snapshot attempt failed", "attempt", attempt, "error", err)

// Check if context is done (timeout or cancellation)
select {
case <-timeoutCtx.Done():
logger.Error("snapshot timeout exceeded", "timeout", c.cfg.Snapshot.Timeout)
return errors.Wrap(timeoutCtx.Err(), "snapshot timeout")
default:
}

// Don't sleep after last attempt
if attempt < maxRetries {
logger.Info("retrying snapshot", "retryDelay", c.cfg.Snapshot.RetryDelay)
time.Sleep(c.cfg.Snapshot.RetryDelay)
}
}

return errors.Wrap(lastErr, "snapshot failed after all retries")
}

func (c *connector) snapshotHandler(event *format.Snapshot) error {
c.listenerFunc(&replication.ListenerContext{
Message: event,
Ack: func() error {
return nil // ACK isn't required for snapshot
},
})
return nil
}

func (c *connector) WaitUntilReady(ctx context.Context) error {
select {
case <-c.readyCh:
Expand Down
2 changes: 2 additions & 0 deletions example/snapshot/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
go.sum

Loading
Loading