Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,36 @@ func (a *FlowableActivity) CreateNormalizedTable(
return nil, fmt.Errorf("failed to commit normalized tables tx: %w", err)
}

// For Postgres-to-Postgres flows, migrate triggers and indexes after tables are created
if dstPgConn, ok := conn.(*connpostgres.PostgresConnector); ok {
// Get source peer name from catalog
var sourcePeerName string
var sourcePeerType protos.DBType
err := a.CatalogPool.QueryRow(ctx,
`SELECT COALESCE(sp.name, ''), COALESCE(sp.type, 0)
FROM flows f
LEFT JOIN peers sp ON f.source_peer = sp.id
WHERE f.name = $1`,
config.FlowName).Scan(&sourcePeerName, &sourcePeerType)
if err == nil && sourcePeerName != "" && sourcePeerType == protos.DBType_POSTGRES {
// Get source connector
if srcPgConn, srcPgClose, err := connectors.GetPostgresConnectorByName(ctx, config.Env, a.CatalogPool, sourcePeerName); err == nil {
defer srcPgClose(ctx)
logger.Info("Migrating triggers and indexes for Postgres-to-Postgres flow")
// Migrate full schema first (in case there are differences)
if err := connpostgres.MigrateSchemaFromSource(ctx, srcPgConn, dstPgConn, config.TableMappings); err != nil {
logger.Warn("failed to migrate schema during setup", slog.Any("error", err))
// Don't fail setup if schema migration fails, tables are already created
}
// Migrate triggers and indexes
if err := dstPgConn.MigrateTriggersAndIndexesForPostgresToPostgres(ctx, srcPgConn, config.TableMappings); err != nil {
logger.Warn("failed to migrate triggers and indexes during setup", slog.Any("error", err))
// Don't fail setup if migration fails
}
}
}
}

a.Alerter.LogFlowInfo(ctx, config.FlowName, "All destination tables have been setup")

return &protos.SetupNormalizedTableBatchOutput{
Expand Down
33 changes: 33 additions & 0 deletions flow/activities/flowable_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,19 @@
return nil, fmt.Errorf("failed to sync schema: %w", err)
}

// For Postgres-to-Postgres flows, migrate triggers and indexes after schema changes
// Use type switch to check if destination is Postgres
switch dstPgConn := any(dstConn).(type) {

Check failure on line 243 in flow/activities/flowable_core.go

View workflow job for this annotation

GitHub Actions / lint

singleCaseSwitch: should rewrite switch statement to if statement (gocritic)
case *connpostgres.PostgresConnector:
Comment on lines +243 to +244
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can use type assertion like in flowable.go

if srcPgConn, srcPgClose, err := connectors.GetPostgresConnectorByName(ctx, config.Env, a.CatalogPool, config.SourceName); err == nil {

Check failure on line 245 in flow/activities/flowable_core.go

View workflow job for this annotation

GitHub Actions / lint

The line is 147 characters long, which exceeds the maximum of 144 characters. (lll)
defer srcPgClose(ctx)
if err := dstPgConn.MigrateTriggersAndIndexesForPostgresToPostgres(ctx, srcPgConn, options.TableMappings); err != nil {
logger.Warn("failed to migrate triggers and indexes", slog.Any("error", err))
// Don't fail the flow, just log a warning
}
}
}

return nil, a.applySchemaDeltas(ctx, config, options, recordBatchSync.SchemaDeltas)
}

Expand Down Expand Up @@ -338,6 +351,26 @@
return nil, err
}

// For Postgres-to-Postgres flows, migrate triggers and indexes after schema changes
// Schema deltas are already replayed in syncRecordsCore, but we migrate triggers/indexes here
// where we have access to both source and destination connectors
if len(res.TableSchemaDeltas) > 0 {
// Get destination connector again to check if it's Postgres
dstConnForMigration, dstCloseForMigration, err := connectors.GetByNameAs[connectors.CDCSyncConnectorCore](ctx, config.Env, a.CatalogPool, config.DestinationName)

Check failure on line 359 in flow/activities/flowable_core.go

View workflow job for this annotation

GitHub Actions / lint

The line is 169 characters long, which exceeds the maximum of 144 characters. (lll)
if err == nil {
defer dstCloseForMigration(ctx)
if dstPgConn, ok := dstConnForMigration.(*connpostgres.PostgresConnector); ok {
if srcPgConn, srcPgClose, err := connectors.GetPostgresConnectorByName(ctx, config.Env, a.CatalogPool, config.SourceName); err == nil {

Check failure on line 363 in flow/activities/flowable_core.go

View workflow job for this annotation

GitHub Actions / lint

The line is 151 characters long, which exceeds the maximum of 144 characters. (lll)
defer srcPgClose(ctx)
if err := dstPgConn.MigrateTriggersAndIndexesForPostgresToPostgres(ctx, srcPgConn, options.TableMappings); err != nil {
logger.Warn("failed to migrate triggers and indexes after sync", slog.Any("error", err))
// Don't fail the flow, just log a warning
}
}
}
}
}

if recordBatchSync.NeedsNormalize() {
syncState.Store(shared.Ptr("normalizing"))
normRequests.Update(res.CurrentSyncBatchID)
Expand Down
Loading
Loading