Skip to content

Commit 513f556

Browse files
committed
Implement granular status in cdc and snapshot
1 parent 9acfbab commit 513f556

32 files changed

+1025
-278
lines changed

flow/activities/flowable.go

Lines changed: 236 additions & 88 deletions
Large diffs are not rendered by default.

flow/activities/flowable_core.go

Lines changed: 72 additions & 57 deletions
Large diffs are not rendered by default.

flow/activities/snapshot_activity.go

Lines changed: 73 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import (
1111

1212
"github.com/PeerDB-io/peerdb/flow/alerting"
1313
"github.com/PeerDB-io/peerdb/flow/connectors"
14+
"github.com/PeerDB-io/peerdb/flow/connectors/utils"
15+
"github.com/PeerDB-io/peerdb/flow/connectors/utils/monitoring"
1416
"github.com/PeerDB-io/peerdb/flow/generated/protos"
1517
"github.com/PeerDB-io/peerdb/flow/internal"
1618
"github.com/PeerDB-io/peerdb/flow/shared"
@@ -36,7 +38,7 @@ type SnapshotActivity struct {
3638
}
3739

3840
// closes the slot signal
39-
func (a *SnapshotActivity) CloseSlotKeepAlive(ctx context.Context, flowJobName string) error {
41+
func (a *SnapshotActivity) CloseSlotKeepAlive(ctx context.Context, flowJobName string) {
4042
a.SnapshotStatesMutex.Lock()
4143
defer a.SnapshotStatesMutex.Unlock()
4244

@@ -48,7 +50,29 @@ func (a *SnapshotActivity) CloseSlotKeepAlive(ctx context.Context, flowJobName s
4850
delete(a.SlotSnapshotStates, flowJobName)
4951
}
5052
a.Alerter.LogFlowEvent(ctx, flowJobName, "Ended Snapshot Flow Job")
53+
}
54+
55+
func (a *SnapshotActivity) InitializeSnapshot(
56+
ctx context.Context, flowName string,
57+
) (int32, error) {
58+
ctx = context.WithValue(ctx, shared.FlowNameKey, flowName)
59+
logger := internal.LoggerFromCtx(ctx)
60+
61+
snapshotID, err := monitoring.InitializeSnapshot(ctx, logger, a.CatalogPool, flowName)
62+
if err != nil {
63+
return -1, a.Alerter.LogFlowErrorNoStatus(ctx, flowName, err)
64+
}
65+
return snapshotID, nil
66+
}
5167

68+
func (a *SnapshotActivity) FinishSnapshot(
69+
ctx context.Context, flowName string, snapshotID int32,
70+
) error {
71+
ctx = context.WithValue(ctx, shared.FlowNameKey, flowName)
72+
logger := internal.LoggerFromCtx(ctx)
73+
if err := monitoring.FinishSnapshot(ctx, logger, a.CatalogPool, flowName, snapshotID); err != nil {
74+
return a.Alerter.LogFlowSnapshotError(ctx, flowName, snapshotID, err)
75+
}
5276
return nil
5377
}
5478

@@ -63,7 +87,8 @@ func (a *SnapshotActivity) SetupReplication(
6387

6488
conn, err := connectors.GetByNameAs[connectors.CDCPullConnectorCore](ctx, nil, a.CatalogPool, config.PeerName)
6589
if err != nil {
66-
return nil, a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to get connector: %w", err))
90+
conErr := fmt.Errorf("failed to get connector: %w", err)
91+
return nil, a.Alerter.LogFlowSnapshotError(ctx, config.FlowJobName, config.SnapshotId, conErr)
6792
}
6893

6994
logger.Info("waiting for slot to be created...")
@@ -72,7 +97,8 @@ func (a *SnapshotActivity) SetupReplication(
7297
if err != nil {
7398
connectors.CloseConnector(ctx, conn)
7499
// it is important to close the connection here as it is not closed in CloseSlotKeepAlive
75-
return nil, a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("slot error: %w", err))
100+
slotErr := fmt.Errorf("slot error: %w", err)
101+
return nil, a.Alerter.LogFlowSnapshotError(ctx, config.FlowJobName, config.SnapshotId, slotErr)
76102
} else if slotInfo.Conn == nil && slotInfo.SlotName == "" {
77103
connectors.CloseConnector(ctx, conn)
78104
logger.Info("replication setup without slot")
@@ -99,20 +125,22 @@ func (a *SnapshotActivity) SetupReplication(
99125
}, nil
100126
}
101127

102-
func (a *SnapshotActivity) MaintainTx(ctx context.Context, sessionID string, peer string, env map[string]string) error {
128+
func (a *SnapshotActivity) MaintainTx(ctx context.Context, sessionID string, peer string, env map[string]string, snapshotID int32) error {
103129
shutdown := heartbeatRoutine(ctx, func() string {
104130
return "maintaining transaction snapshot"
105131
})
106132
defer shutdown()
107133
conn, err := connectors.GetByNameAs[connectors.CDCPullConnector](ctx, nil, a.CatalogPool, peer)
108134
if err != nil {
109-
return a.Alerter.LogFlowError(ctx, sessionID, err)
135+
getErr := fmt.Errorf("failed to get connector: %w", err)
136+
return a.Alerter.LogFlowSnapshotError(ctx, sessionID, snapshotID, getErr)
110137
}
111138
defer connectors.CloseConnector(ctx, conn)
112139

113140
exportSnapshotOutput, tx, err := conn.ExportTxSnapshot(ctx, env)
114141
if err != nil {
115-
return err
142+
exportErr := fmt.Errorf("failed to export tx snapshot: %w", err)
143+
return a.Alerter.LogFlowSnapshotError(ctx, sessionID, snapshotID, exportErr)
116144
}
117145

118146
a.SnapshotStatesMutex.Lock()
@@ -134,7 +162,10 @@ func (a *SnapshotActivity) MaintainTx(ctx context.Context, sessionID string, pee
134162
a.SnapshotStatesMutex.Lock()
135163
delete(a.TxSnapshotStates, sessionID)
136164
a.SnapshotStatesMutex.Unlock()
137-
return conn.FinishExport(tx)
165+
if err := conn.FinishExport(tx); err != nil {
166+
finishErr := fmt.Errorf("failed to finish export: %w", err)
167+
return a.Alerter.LogFlowSnapshotError(ctx, sessionID, snapshotID, finishErr)
168+
}
138169
}
139170
time.Sleep(time.Minute)
140171
}
@@ -166,6 +197,40 @@ func (a *SnapshotActivity) LoadTableSchema(
166197
ctx context.Context,
167198
flowName string,
168199
tableName string,
200+
snapshotID int32,
169201
) (*protos.TableSchema, error) {
170-
return internal.LoadTableSchemaFromCatalog(ctx, a.CatalogPool, flowName, tableName)
202+
schema, err := internal.LoadTableSchemaFromCatalog(ctx, a.CatalogPool, flowName, tableName)
203+
if err != nil {
204+
loadErr := fmt.Errorf("failed to load schema from catalog: %w", err)
205+
return nil, a.Alerter.LogFlowSnapshotError(ctx, flowName, snapshotID, loadErr)
206+
}
207+
return schema, nil
208+
}
209+
210+
func (a *SnapshotActivity) ParseSchemaTable(
211+
ctx context.Context,
212+
flowName string,
213+
tableName string,
214+
snapshotID int32,
215+
) (*utils.SchemaTable, error) {
216+
parsedTable, err := utils.ParseSchemaTable(tableName)
217+
if err != nil {
218+
parseErr := fmt.Errorf("failed to parse schema table: %w", err)
219+
return nil, a.Alerter.LogFlowSnapshotError(ctx, flowName, snapshotID, parseErr)
220+
}
221+
return parsedTable, nil
222+
}
223+
224+
func (a *SnapshotActivity) GetPeerType(
225+
ctx context.Context,
226+
flowName string,
227+
peerName string,
228+
snapshotID int32,
229+
) (protos.DBType, error) {
230+
dbtype, err := connectors.LoadPeerType(ctx, a.CatalogPool, peerName)
231+
if err != nil {
232+
peerErr := fmt.Errorf("failed to get peer type: %w", err)
233+
return 0, a.Alerter.LogFlowSnapshotError(ctx, flowName, snapshotID, peerErr)
234+
}
235+
return dbtype, nil
171236
}

flow/alerting/alerting.go

Lines changed: 68 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -433,19 +433,34 @@ func (a *Alerter) LogNonFlowEvent(ctx context.Context, eventType telemetry.Event
433433
a.sendTelemetryMessage(ctx, logger, string(eventType)+":"+key, message, level)
434434
}
435435

436-
// logFlowErrorInternal pushes the error to the errors table and emits a metric as well as a telemetry message
437-
func (a *Alerter) logFlowErrorInternal(ctx context.Context, flowName, errorType string, inErr error, loggerFunc func(string, ...any)) {
438-
logger := internal.LoggerFromCtx(ctx)
436+
// logFlowErrorImpl pushes the error to the errors table and emits a metric as well as a telemetry message
437+
func (a *Alerter) logFlowErrorImpl(ctx context.Context, flowName, errorType string, inErr error, logger log.Logger, loggerFunc func(string, ...any)) {
439438
inErrWithStack := fmt.Sprintf("%+v", inErr)
440439
loggerFunc(inErr.Error(), slog.String("stack", inErrWithStack))
441-
if _, err := a.CatalogPool.Exec(
442-
ctx, "INSERT INTO peerdb_stats.flow_errors(flow_name,error_message,error_type) VALUES($1,$2,$3)",
443-
flowName, inErrWithStack, errorType,
444-
); err != nil {
445-
logger.Error("failed to insert flow error", slog.Any("error", err))
446-
return
440+
retryInterval := time.Second
441+
for {
442+
if _, err := a.CatalogPool.Exec(
443+
ctx, "INSERT INTO peerdb_stats.flow_errors(flow_name,error_message,error_type) VALUES($1,$2,$3)",
444+
flowName, inErrWithStack, errorType,
445+
); err != nil {
446+
insertErr := shared.LogError(logger, fmt.Errorf("failed to insert flow error: %w", err))
447+
errInfo := ErrorInfo{Source: ErrorSourcePostgresCatalog, Code: "UNKNOWN"}
448+
a.emitClassifiedError(ctx, logger, flowName, ErrorInternal, errInfo, insertErr, insertErr.Error(), loggerFunc)
449+
time.Sleep(retryInterval)
450+
retryInterval = min(retryInterval*2, time.Minute)
451+
continue
452+
}
453+
break
447454
}
448455

456+
errorClass, errInfo := GetErrorClass(ctx, inErr)
457+
a.emitClassifiedError(ctx, logger, flowName, errorClass, errInfo, inErr, inErrWithStack, loggerFunc)
458+
}
459+
460+
func (a *Alerter) emitClassifiedError(
461+
ctx context.Context, logger log.Logger, flowName string, errorClass ErrorClass, errInfo ErrorInfo,
462+
inErr error, inErrWithStack string, loggerFunc func(string, ...any),
463+
) {
449464
var tags []string
450465
if errors.Is(inErr, context.Canceled) {
451466
tags = append(tags, string(shared.ErrTypeCanceled))
@@ -477,14 +492,11 @@ func (a *Alerter) logFlowErrorInternal(ctx context.Context, flowName, errorType
477492
if errors.As(inErr, &sshErr) {
478493
tags = append(tags, string(shared.ErrTypeNet))
479494
}
480-
481-
errorClass, errInfo := GetErrorClass(ctx, inErr)
482495
tags = append(tags, "errorClass:"+errorClass.String(), "errorAction:"+errorClass.ErrorAction().String())
483-
484496
if !internal.PeerDBTelemetryErrorActionBasedAlertingEnabled() || errorClass.ErrorAction() == NotifyTelemetry {
485-
// Warnings alert us just like errors until there's a customer warning system
486497
a.sendTelemetryMessage(ctx, logger, flowName, inErrWithStack, telemetry.ERROR, tags...)
487498
}
499+
488500
loggerFunc(fmt.Sprintf("Emitting classified error '%s'", inErr.Error()),
489501
slog.Any("error", inErr),
490502
slog.Any("errorClass", errorClass),
@@ -500,15 +512,55 @@ func (a *Alerter) logFlowErrorInternal(ctx context.Context, flowName, errorType
500512
a.otelManager.Metrics.ErrorEmittedGauge.Record(ctx, 1, errorAttributeSet)
501513
}
502514

503-
func (a *Alerter) LogFlowError(ctx context.Context, flowName string, inErr error) error {
515+
func (a *Alerter) LogFlowErrorNoStatus(ctx context.Context, flowName string, inErr error) error {
516+
logger := internal.LoggerFromCtx(ctx)
517+
// TODO check that this one just logs without updating status
518+
a.logFlowErrorImpl(ctx, flowName, "error", inErr, logger, logger.Error)
519+
return inErr
520+
}
521+
522+
func (a *Alerter) LogFlowSyncError(ctx context.Context, flowName string, batchID int64, inErr error) error {
523+
logger := internal.LoggerFromCtx(ctx)
524+
// TODO use batchID
525+
a.logFlowErrorImpl(ctx, flowName, "error", inErr, logger, logger.Error)
526+
return inErr
527+
}
528+
529+
func (a *Alerter) LogFlowNormalizeError(ctx context.Context, flowName string, batchID int64, inErr error) error {
530+
logger := internal.LoggerFromCtx(ctx)
531+
// TODO use batchID
532+
a.logFlowErrorImpl(ctx, flowName, "error", inErr, logger, logger.Error)
533+
return inErr
534+
}
535+
536+
func (a *Alerter) LogFlowSnapshotError(ctx context.Context, flowName string, snapshotID int32, inErr error) error {
537+
logger := internal.LoggerFromCtx(ctx)
538+
// TODO use snapshotID
539+
a.logFlowErrorImpl(ctx, flowName, "error", inErr, logger, logger.Error)
540+
return inErr
541+
}
542+
543+
func (a *Alerter) LogFlowSnapshotQRepError(
544+
ctx context.Context, flowName string, snapshotID int32, qRepRunID string, inErr error,
545+
) error {
546+
logger := internal.LoggerFromCtx(ctx)
547+
// TODO use snapshotID, qRepRunID
548+
a.logFlowErrorImpl(ctx, flowName, "error", inErr, logger, logger.Error)
549+
return inErr
550+
}
551+
552+
func (a *Alerter) LogFlowSnapshotPartitionError(
553+
ctx context.Context, flowName string, snapshotID int32, partitionID string, inErr error,
554+
) error {
504555
logger := internal.LoggerFromCtx(ctx)
505-
a.logFlowErrorInternal(ctx, flowName, "error", inErr, logger.Error)
556+
// TODO use snapshotID, partitionID
557+
a.logFlowErrorImpl(ctx, flowName, "error", inErr, logger, logger.Error)
506558
return inErr
507559
}
508560

509561
// LogFlowWarning reports inErr for flowName through logFlowErrorImpl with
// error type "warn", logging at warn level. Unlike the LogFlow*Error helpers
// it returns nothing.
func (a *Alerter) LogFlowWarning(ctx context.Context, flowName string, inErr error) {
	flowLogger := internal.LoggerFromCtx(ctx)
	a.logFlowErrorImpl(ctx, flowName, "warn", inErr, flowLogger, flowLogger.Warn)
}
513565

514566
func (a *Alerter) LogFlowEvent(ctx context.Context, flowName string, info string) {

flow/alerting/classifier.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,30 @@ func GetErrorClass(ctx context.Context, err error) (ErrorClass, ErrorInfo) {
205205
}
206206
}
207207

208+
var dynamicConfError *exceptions.DynamicConfError
209+
if errors.As(err, &dynamicConfError) {
210+
return ErrorInternal, ErrorInfo{
211+
Source: ErrorSourceOther,
212+
Code: "UNKNOWN",
213+
}
214+
}
215+
216+
var protoMarshalError *exceptions.ProtoMarshalError
217+
if errors.As(err, &protoMarshalError) {
218+
return ErrorInternal, ErrorInfo{
219+
Source: ErrorSourceOther,
220+
Code: "UNKNOWN",
221+
}
222+
}
223+
224+
var protoUnmarshalError *exceptions.ProtoUnmarshalError
225+
if errors.As(err, &protoUnmarshalError) {
226+
return ErrorInternal, ErrorInfo{
227+
Source: ErrorSourceOther,
228+
Code: "UNKNOWN",
229+
}
230+
}
231+
208232
if errors.Is(err, context.Canceled) {
209233
// Generally happens during workflow cancellation
210234
return ErrorIgnoreContextCancelled, ErrorInfo{

flow/alerting/classifier_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ func TestClickHousePushingToViewShouldBeMvError(t *testing.T) {
155155
is not supported: while converting source column created_at to destination column created_at:
156156
while pushing to view db_name.hello_mv`,
157157
}
158-
errorClass, errInfo := GetErrorClass(t.Context(), exceptions.NewNormalizationError(fmt.Errorf("error in WAL: %w", err)))
158+
errorClass, errInfo := GetErrorClass(t.Context(), exceptions.NewNormalizationError("error in WAL: %w", err))
159159
assert.Equal(t, ErrorNotifyMVOrView, errorClass, "Unexpected error class")
160160
assert.Equal(t, ErrorInfo{
161161
Source: ErrorSourceClickHouse,
@@ -191,8 +191,8 @@ func TestClickHouseChaoticNormalizeErrorShouldBeNotifyMVNow(t *testing.T) {
191191
Left key __table1.column_2 type String. Right key __table2.column_1 type Int64`,
192192
}
193193
errorClass, errInfo := GetErrorClass(t.Context(),
194-
exceptions.NewNormalizationError(fmt.Errorf(`Normalization Error: failed to normalize records:
195-
error while inserting into normalized table table_A: %w`, err)))
194+
exceptions.NewNormalizationError(`Normalization Error: failed to normalize records:
195+
error while inserting into normalized table table_A: %w`, err))
196196
assert.Equal(t, ErrorNotifyMVOrView, errorClass, "Unexpected error class")
197197
assert.Equal(t, ErrorInfo{
198198
Source: ErrorSourceClickHouse,
@@ -382,7 +382,7 @@ func TestClickHouseUnknownTableShouldBeDestinationModified(t *testing.T) {
382382
Message: "Table abc does not exist.",
383383
}
384384
errorClass, errInfo := GetErrorClass(t.Context(),
385-
exceptions.NewNormalizationError(fmt.Errorf("failed to normalize records: %w", err)))
385+
exceptions.NewNormalizationError("failed to normalize records: %w", err))
386386
assert.Equal(t, ErrorNotifyDestinationModified, errorClass, "Unexpected error class")
387387
assert.Equal(t, ErrorInfo{
388388
Source: ErrorSourceClickHouse,
@@ -398,7 +398,7 @@ func TestClickHouseUnkownTableWhilePushingToViewShouldBeNotifyMVNow(t *testing.T
398398
Message: "Table abc does not exist. Maybe you meant abc2?: while executing 'FUNCTION func()': while pushing to view some_mv (some-uuid-here)",
399399
}
400400
errorClass, errInfo := GetErrorClass(t.Context(),
401-
exceptions.NewNormalizationError(fmt.Errorf("failed to normalize records: %w", err)))
401+
exceptions.NewNormalizationError("failed to normalize records: %w", err))
402402
assert.Equal(t, ErrorNotifyMVOrView, errorClass, "Unexpected error class")
403403
assert.Equal(t, ErrorInfo{
404404
Source: ErrorSourceClickHouse,

flow/cmd/handler.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212
tEnums "go.temporal.io/api/enums/v1"
1313
"go.temporal.io/api/workflowservice/v1"
1414
"go.temporal.io/sdk/client"
15-
"google.golang.org/protobuf/proto"
1615
"google.golang.org/protobuf/types/known/durationpb"
1716

1817
"github.com/PeerDB-io/peerdb/flow/alerting"
@@ -146,6 +145,11 @@ func (h *FlowRequestHandler) CreateCDCFlow(
146145
return nil, fmt.Errorf("unable to update flow config in catalog: %w", err)
147146
}
148147

148+
if err := h.createGranularStatusEntry(ctx, cfg); err != nil {
149+
slog.Error("unable to create granular status entry", slog.Any("error", err))
150+
return nil, fmt.Errorf("unable to create granular status entry: %w", err)
151+
}
152+
149153
if _, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, peerflow.CDCFlowWorkflow, cfg, nil); err != nil {
150154
slog.Error("unable to start PeerFlow workflow", slog.Any("error", err))
151155
return nil, fmt.Errorf("unable to start PeerFlow workflow: %w", err)
@@ -163,6 +167,20 @@ func (h *FlowRequestHandler) updateFlowConfigInCatalog(
163167
return internal.UpdateCDCConfigInCatalog(ctx, h.pool, slog.Default(), cfg)
164168
}
165169

170+
func (h *FlowRequestHandler) createGranularStatusEntry(
171+
ctx context.Context,
172+
cfg *protos.FlowConnectionConfigs,
173+
) error {
174+
if _, err := h.pool.Exec(ctx, `
175+
INSERT INTO peerdb_stats.granular_status (flow_name, snapshot_succeeding,
176+
sync_succeeding, normalize_succeeding, slot_lag_low) VALUES ($1, true, true, true, true)
177+
`, cfg.FlowJobName,
178+
); err != nil {
179+
return fmt.Errorf("unable to insert into granular status table for flow %s", cfg.FlowJobName)
180+
}
181+
return nil
182+
}
183+
166184
func (h *FlowRequestHandler) CreateQRepFlow(
167185
ctx context.Context, req *protos.CreateQRepFlowRequest,
168186
) (*protos.CreateQRepFlowResponse, error) {
@@ -220,7 +238,7 @@ func (h *FlowRequestHandler) updateQRepConfigInCatalog(
220238
ctx context.Context,
221239
cfg *protos.QRepConfig,
222240
) error {
223-
cfgBytes, err := proto.Marshal(cfg)
241+
cfgBytes, err := internal.ProtoMarshal(cfg)
224242
if err != nil {
225243
return fmt.Errorf("unable to marshal qrep config: %w", err)
226244
}

0 commit comments

Comments
 (0)