Skip to content

Commit feef191

Browse files
authored
Better metrics for CDC store (#3700)
1 parent e11f9d7 commit feef191

File tree

2 files changed

+88
-10
lines changed

2 files changed

+88
-10
lines changed

flow/connectors/postgres/cdc.go

Lines changed: 75 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -486,6 +486,14 @@ func PullCdcRecords[Items model.Items](
486486
return err
487487
}
488488
var fetchedBytes, totalFetchedBytes, allFetchedBytes atomic.Int64
489+
// Accumulate TOAST metrics locally and flush periodically
490+
type toastMetricKey struct {
491+
table string
492+
status string
493+
}
494+
toastValueMetrics := make(map[toastMetricKey]int64)
495+
toastRowMetrics := make(map[toastMetricKey]int64)
496+
toastMetricsMutex := &sync.Mutex{}
489497
pullStart := time.Now()
490498
defer func() {
491499
if cdcRecordsStorage.IsEmpty() {
@@ -504,13 +512,36 @@ func PullCdcRecords[Items model.Items](
504512
logger.Info("pulling records start")
505513

506514
waitingForCommit := false
515+
flushToastMetrics := func() {
516+
toastMetricsMutex.Lock()
517+
flushingValueMetrics := toastValueMetrics
518+
flushingRowMetrics := toastRowMetrics
519+
toastRowMetrics = make(map[toastMetricKey]int64)
520+
toastValueMetrics = make(map[toastMetricKey]int64)
521+
toastMetricsMutex.Unlock()
522+
523+
for key, count := range flushingValueMetrics {
524+
p.otelManager.Metrics.ToastValuesCounter.Add(ctx, count,
525+
metric.WithAttributeSet(attribute.NewSet(
526+
attribute.String("table", key.table),
527+
attribute.String("backfilled", key.status))))
528+
}
529+
for key, count := range flushingRowMetrics {
530+
p.otelManager.Metrics.ToastRowsCounter.Add(ctx, count,
531+
metric.WithAttributeSet(attribute.NewSet(
532+
attribute.String("table", key.table),
533+
attribute.String("backfill_status", key.status))))
534+
}
535+
}
507536
defer func() {
508537
p.otelManager.Metrics.FetchedBytesCounter.Add(ctx, fetchedBytes.Swap(0))
509538
p.otelManager.Metrics.AllFetchedBytesCounter.Add(ctx, allFetchedBytes.Swap(0))
539+
flushToastMetrics()
510540
}()
511541
shutdown := shared.Interval(ctx, time.Minute, func() {
512542
p.otelManager.Metrics.FetchedBytesCounter.Add(ctx, fetchedBytes.Swap(0))
513543
p.otelManager.Metrics.AllFetchedBytesCounter.Add(ctx, allFetchedBytes.Swap(0))
544+
flushToastMetrics()
514545
logger.Info("pulling records",
515546
slog.Int("records", cdcRecordsStorage.Len()),
516547
slog.Int64("bytes", totalFetchedBytes.Load()),
@@ -705,6 +736,16 @@ func PullCdcRecords[Items model.Items](
705736
// TODO: replident is cached here, should not cache since it can change
706737
isFullReplica := req.TableNameSchemaMapping[tableName].IsReplicaIdentityFull
707738
if isFullReplica {
739+
var colCount int64
740+
if recItems, ok := any(r.NewItems).(model.RecordItems); ok {
741+
colCount = int64(len(recItems.ColToVal))
742+
}
743+
toastMetricsMutex.Lock()
744+
// Backfilling unchanged TOAST values is not applicable with replica identity full, so report these under a dedicated status
745+
toastValueMetrics[toastMetricKey{tableName, "na_replica_identity_full"}] += colCount
746+
toastRowMetrics[toastMetricKey{tableName, "na_replica_identity_full"}]++
747+
toastMetricsMutex.Unlock()
748+
708749
if err := addRecordWithKey(model.TableWithPkey{}, rec); err != nil {
709750
return err
710751
}
@@ -714,23 +755,51 @@ func PullCdcRecords[Items model.Items](
714755
return err
715756
}
716757

758+
var nonUnchangedToastCount int64
759+
if recItems, ok := any(r.NewItems).(model.RecordItems); ok {
760+
nonUnchangedToastCount = int64(len(recItems.ColToVal))
761+
}
762+
unchangedToastCount := len(r.UnchangedToastColumns)
763+
var backfilledCount int
764+
717765
latestRecord, ok, err := cdcRecordsStorage.Get(tablePkeyVal)
718766
if err != nil {
719767
return err
720768
}
721769
if ok {
722770
// iterate through unchanged toast cols and set them in new record
723771
updatedCols := r.NewItems.UpdateIfNotExists(latestRecord.GetItems())
772+
backfilledCount = len(updatedCols)
724773
for _, col := range updatedCols {
725774
delete(r.UnchangedToastColumns, col)
726775
}
727-
p.otelManager.Metrics.UnchangedToastValuesCounter.Add(ctx, int64(len(updatedCols)),
728-
metric.WithAttributeSet(attribute.NewSet(
729-
attribute.Bool("backfilled", true))))
730776
}
731-
p.otelManager.Metrics.UnchangedToastValuesCounter.Add(ctx, int64(len(r.UnchangedToastColumns)),
732-
metric.WithAttributeSet(attribute.NewSet(
733-
attribute.Bool("backfilled", false))))
777+
778+
// Report metrics
779+
toastMetricsMutex.Lock()
780+
if backfilledCount > 0 {
781+
toastValueMetrics[toastMetricKey{tableName, "true"}] += int64(backfilledCount)
782+
}
783+
remainingUnchangedToast := len(r.UnchangedToastColumns)
784+
if remainingUnchangedToast > 0 {
785+
toastValueMetrics[toastMetricKey{tableName, "false"}] += int64(remainingUnchangedToast)
786+
}
787+
if nonUnchangedToastCount > 0 {
788+
toastValueMetrics[toastMetricKey{tableName, "na_not_unchanged_toast"}] += nonUnchangedToastCount
789+
}
790+
var rowBackfillStatus string
791+
switch {
792+
case unchangedToastCount == 0:
793+
rowBackfillStatus = "na_no_toast" // No unchanged TOAST values, nothing to backfill
794+
case backfilledCount == unchangedToastCount:
795+
rowBackfillStatus = "fully" // Everything was backfilled
796+
case backfilledCount > 0:
797+
rowBackfillStatus = "partially" // Some values were backfilled
798+
default:
799+
rowBackfillStatus = "none" // Couldn't backfill anything
800+
}
801+
toastRowMetrics[toastMetricKey{tableName, rowBackfillStatus}]++
802+
toastMetricsMutex.Unlock()
734803

735804
if err := addRecordWithKey(tablePkeyVal, rec); err != nil {
736805
return err

flow/otel_metrics/otel_manager.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,8 @@ const (
7575
WorkloadTotalReplicasGaugeName = "workload_total_replicas"
7676
LogRetentionGaugeName = "log_retention"
7777
LatestConsumedLogEventGaugeName = "latest_consumed_log_event"
78-
UnchangedToastValuesCounterName = "unchanged_toast_values"
78+
ToastRowsCounterName = "toast_rows"
79+
ToastValuesCounterName = "toast_values"
7980
)
8081

8182
type Metrics struct {
@@ -127,7 +128,8 @@ type Metrics struct {
127128
WorkloadTotalReplicasGauge metric.Int64Gauge
128129
LatestConsumedLogEventGauge metric.Int64Gauge
129130
LogRetentionGauge metric.Float64Gauge
130-
UnchangedToastValuesCounter metric.Int64Counter
131+
ToastRowsCounter metric.Int64Counter
132+
ToastValuesCounter metric.Int64Counter
131133
}
132134

133135
type SlotMetricGauges struct {
@@ -543,9 +545,16 @@ func (om *OtelManager) setupMetrics(ctx context.Context) error {
543545
return err
544546
}
545547

546-
if om.Metrics.UnchangedToastValuesCounter, err = om.GetOrInitInt64Counter(BuildMetricName(UnchangedToastValuesCounterName),
548+
if om.Metrics.ToastRowsCounter, err = om.GetOrInitInt64Counter(BuildMetricName(ToastRowsCounterName),
547549
metric.WithDescription(
548-
"Counter of unchanged TOAST values (Postgres only), with `backfilled` indicating whether the original was found in the CDC store"),
550+
"Counter of UPDATE rows by TOAST backfill status (Postgres only)"),
551+
); err != nil {
552+
return err
553+
}
554+
555+
if om.Metrics.ToastValuesCounter, err = om.GetOrInitInt64Counter(BuildMetricName(ToastValuesCounterName),
556+
metric.WithDescription(
557+
"Counter of column values by TOAST backfill status (Postgres only)"),
549558
); err != nil {
550559
return err
551560
}

0 commit comments

Comments
 (0)