Skip to content

Commit 4b2eff7

Browse files
ilidemi and Copilot authored
Add more WAL metrics (#3685)
Adding a bunch of WAL metrics in hopes of better seeing the impact of deployment on a busy instance. `pg_stat_activity.wait_event_type/wait_event` for our backend: these are really the source of truth on when the WAL sender becomes idle (`wait_event_type=Client, wait_event=WalSenderWaitForWAL`) because there's nothing to send (even if the current LSN is ahead of what was sent to us because of an unfinished transaction). However, they can also change many times a second, and polling once every few minutes is pretty sparse; putting them in anyway in case they're still useful. Commit LSN on the receiver: the latest we've received from Postgres so far. `pg_stat_replication.sent_lsn`: we don't always have permission to read this one, but we saw some discrepancies with the commit LSN, so keeping it as optional for debugging purposes. Current LSN: to diff with the commit LSN. Deltas between PG-side LSNs: just for convenience. `pg_stat_replication_slots.spill_txns/spill_count/spill_bytes` (PG16+) and `logical_decoding_work_mem` (PG13+): to see what Postgres is doing after a reconnect and before it has sent anything to us, and to be able to diagnose a bad setting. --- Misc quality-of-life metrics: `pg_replication_slots.safe_wal_size` (PG13+), to monitor the danger zone when we're behind or there's a burst; `pg_replication_slots.active/wal_status`; `pg_stat_activity.state`. --------- Co-authored-by: Copilot <[email protected]>
1 parent 81d02ea commit 4b2eff7

File tree

7 files changed

+386
-75
lines changed

7 files changed

+386
-75
lines changed

flow/activities/flowable.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1191,6 +1191,19 @@ func (a *FlowableActivity) RecordSlotSizes(ctx context.Context) error {
11911191
slotMetricGauges.SlotLagGauge = a.OtelManager.Metrics.SlotLagGauge
11921192
slotMetricGauges.RestartLSNGauge = a.OtelManager.Metrics.RestartLSNGauge
11931193
slotMetricGauges.ConfirmedFlushLSNGauge = a.OtelManager.Metrics.ConfirmedFlushLSNGauge
1194+
slotMetricGauges.SentLSNGauge = a.OtelManager.Metrics.SentLSNGauge
1195+
slotMetricGauges.CurrentWalLSNGauge = a.OtelManager.Metrics.CurrentWalLSNGauge
1196+
slotMetricGauges.RestartToConfirmedMBGauge = a.OtelManager.Metrics.RestartToConfirmedMBGauge
1197+
slotMetricGauges.ConfirmedToCurrentMBGauge = a.OtelManager.Metrics.ConfirmedToCurrentMBGauge
1198+
slotMetricGauges.SafeWalSizeGauge = a.OtelManager.Metrics.SafeWalSizeGauge
1199+
slotMetricGauges.SlotActiveGauge = a.OtelManager.Metrics.SlotActiveGauge
1200+
slotMetricGauges.WalSenderStateGauge = a.OtelManager.Metrics.WalSenderStateGauge
1201+
slotMetricGauges.WalStatusGauge = a.OtelManager.Metrics.WalStatusGauge
1202+
slotMetricGauges.LogicalDecodingWorkMemGauge = a.OtelManager.Metrics.LogicalDecodingWorkMemGauge
1203+
slotMetricGauges.StatsResetGauge = a.OtelManager.Metrics.StatsResetGauge
1204+
slotMetricGauges.SpillTxnsGauge = a.OtelManager.Metrics.SpillTxnsGauge
1205+
slotMetricGauges.SpillCountGauge = a.OtelManager.Metrics.SpillCountGauge
1206+
slotMetricGauges.SpillBytesGauge = a.OtelManager.Metrics.SpillBytesGauge
11941207
slotMetricGauges.OpenConnectionsGauge = a.OtelManager.Metrics.OpenConnectionsGauge
11951208
slotMetricGauges.OpenReplicationConnectionsGauge = a.OtelManager.Metrics.OpenReplicationConnectionsGauge
11961209
slotMetricGauges.IntervalSinceLastNormalizeGauge = a.OtelManager.Metrics.IntervalSinceLastNormalizeGauge

flow/connectors/postgres/cdc.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -881,6 +881,7 @@ func processMessage[Items model.Items](
881881
slog.Any("CommitLSN", msg.CommitLSN),
882882
slog.Any("TransactionEndLSN", msg.TransactionEndLSN))
883883
batch.UpdateLatestCheckpointID(int64(msg.CommitLSN))
884+
p.otelManager.Metrics.ReceivedCommitLSNGauge.Record(ctx, int64(msg.CommitLSN))
884885
p.otelManager.Metrics.CommitLagGauge.Record(ctx, time.Now().UTC().Sub(msg.CommitTime).Microseconds())
885886
p.commitLock = nil
886887
case *pglogrepl.RelationMessage:

flow/connectors/postgres/client.go

Lines changed: 138 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -278,52 +278,166 @@ func (c *PostgresConnector) checkSlotAndPublication(ctx context.Context, slot st
278278
}
279279

280280
func getSlotInfo(ctx context.Context, conn *pgx.Conn, slotName string, database string) ([]*protos.SlotInfo, error) {
281-
var whereClause string
282-
if slotName != "" {
283-
whereClause = "WHERE slot_name=" + utils.QuoteLiteral(slotName)
284-
} else {
285-
whereClause = "WHERE database=" + utils.QuoteLiteral(database)
286-
}
287-
288281
pgversion, err := shared.GetMajorVersion(ctx, conn)
289282
if err != nil {
290283
return nil, err
291284
}
292-
walStatusSelector := "wal_status"
285+
walStatusSelect := "prs.wal_status"
286+
safeWalSizeSelect := "prs.safe_wal_size"
293287
if pgversion < shared.POSTGRES_13 {
294-
walStatusSelector = "'unknown'"
288+
walStatusSelect = "'unknown'"
289+
safeWalSizeSelect = "NULL::bigint"
290+
}
291+
292+
ldwMBSelect := "NULL::bigint"
293+
if pgversion >= shared.POSTGRES_13 {
294+
ldwMBSelect = `(
295+
SELECT (pg_size_bytes(setting || COALESCE(unit,'')) / 1024 / 1024)::bigint
296+
FROM pg_settings WHERE name='logical_decoding_work_mem'
297+
)`
298+
}
299+
300+
statsSelect := `
301+
NULL::bigint,
302+
NULL::bigint,
303+
NULL::bigint,
304+
NULL::bigint
305+
`
306+
statsJoin := ""
307+
if pgversion >= shared.POSTGRES_16 {
308+
statsSelect = `
309+
EXTRACT(EPOCH FROM psrs.stats_reset)::bigint,
310+
psrs.spill_txns,
311+
psrs.spill_count,
312+
psrs.spill_bytes
313+
`
314+
statsJoin = `
315+
LEFT JOIN pg_stat_replication_slots AS psrs
316+
ON psrs.slot_name = prs.slot_name
317+
`
295318
}
296-
rows, err := conn.Query(ctx, fmt.Sprintf(`SELECT slot_name, redo_lsn::Text,restart_lsn::text,%s,
297-
confirmed_flush_lsn::text,active,
298-
round((CASE WHEN pg_is_in_recovery() THEN pg_last_wal_receive_lsn() ELSE pg_current_wal_lsn() END
299-
- restart_lsn) / 1024 / 1024) AS MB_Behind
300-
FROM pg_control_checkpoint(),pg_replication_slots %s`, walStatusSelector, whereClause))
319+
var whereClause string
320+
if slotName != "" {
321+
whereClause = "WHERE prs.slot_name=" + utils.QuoteLiteral(slotName)
322+
} else {
323+
whereClause = "WHERE prs.database=" + utils.QuoteLiteral(database)
324+
}
325+
rows, err := conn.Query(ctx, fmt.Sprintf(`
326+
WITH current_wal AS (
327+
SELECT CASE
328+
WHEN pg_is_in_recovery()
329+
THEN pg_last_wal_receive_lsn()
330+
ELSE pg_current_wal_lsn()
331+
END AS current_lsn
332+
)
333+
SELECT
334+
prs.slot_name,
335+
pcc.redo_lsn::text,
336+
prs.restart_lsn::text,
337+
cw.current_lsn::text,
338+
%s, -- prs.wal_status
339+
%s, -- prs.safe_wal_size
340+
prs.confirmed_flush_lsn::text,
341+
psr.sent_lsn::text,
342+
prs.active,
343+
round((cw.current_lsn - prs.restart_lsn) / 1024 / 1024),
344+
round((prs.confirmed_flush_lsn - prs.restart_lsn) / 1024 / 1024),
345+
round((cw.current_lsn - prs.confirmed_flush_lsn) / 1024 / 1024),
346+
psa.wait_event_type,
347+
psa.wait_event,
348+
psa.state,
349+
%s, -- logical_decoding_work_mem megabytes
350+
%s -- stats
351+
FROM current_wal cw,
352+
pg_control_checkpoint() as pcc,
353+
(pg_replication_slots as prs
354+
LEFT JOIN pg_stat_activity as psa
355+
on psa.pid = prs.active_pid
356+
LEFT JOIN pg_stat_replication as psr
357+
on psr.pid = prs.active_pid
358+
%s)
359+
%s`,
360+
walStatusSelect,
361+
safeWalSizeSelect,
362+
ldwMBSelect,
363+
statsSelect,
364+
statsJoin,
365+
whereClause,
366+
))
301367
if err != nil {
302368
return nil, fmt.Errorf("failed to read information for slots: %w", err)
303369
}
304370
defer rows.Close()
305371
var slotInfoRows []*protos.SlotInfo
306372
for rows.Next() {
307-
var redoLSN pgtype.Text
308373
var slotName pgtype.Text
374+
var redoLSN pgtype.Text
309375
var restartLSN pgtype.Text
376+
var currentLSN pgtype.Text
377+
var walStatus pgtype.Text
378+
var safeWalSize pgtype.Int8
310379
var confirmedFlushLSN pgtype.Text
380+
var sentLSN *string
311381
var active pgtype.Bool
312382
var lagInMB pgtype.Float4
313-
var walStatus pgtype.Text
314-
err := rows.Scan(&slotName, &redoLSN, &restartLSN, &walStatus, &confirmedFlushLSN, &active, &lagInMB)
383+
var restartToConfirmedMB pgtype.Float4
384+
var confirmedToCurrentMB pgtype.Float4
385+
var waitEventType pgtype.Text
386+
var waitEvent pgtype.Text
387+
var backendState pgtype.Text
388+
var ldwMemMB pgtype.Int8
389+
var statsReset *int64
390+
var spillTxns *int64
391+
var spillCount *int64
392+
var spillBytes *int64
393+
394+
err := rows.Scan(
395+
&slotName,
396+
&redoLSN,
397+
&restartLSN,
398+
&currentLSN,
399+
&walStatus,
400+
&safeWalSize,
401+
&confirmedFlushLSN,
402+
&sentLSN,
403+
&active,
404+
&lagInMB,
405+
&restartToConfirmedMB,
406+
&confirmedToCurrentMB,
407+
&waitEventType,
408+
&waitEvent,
409+
&backendState,
410+
&ldwMemMB,
411+
&statsReset,
412+
&spillTxns,
413+
&spillCount,
414+
&spillBytes,
415+
)
315416
if err != nil {
316417
return nil, err
317418
}
318419

319420
slotInfoRows = append(slotInfoRows, &protos.SlotInfo{
320-
RedoLSN: redoLSN.String,
321-
RestartLSN: restartLSN.String,
322-
WalStatus: walStatus.String,
323-
ConfirmedFlushLSN: confirmedFlushLSN.String,
324-
SlotName: slotName.String,
325-
Active: active.Bool,
326-
LagInMb: lagInMB.Float32,
421+
SlotName: slotName.String,
422+
RedoLSN: redoLSN.String,
423+
RestartLSN: restartLSN.String,
424+
CurrentLSN: currentLSN.String,
425+
Active: active.Bool,
426+
LagInMb: lagInMB.Float32,
427+
ConfirmedFlushLSN: confirmedFlushLSN.String,
428+
SentLSN: sentLSN,
429+
RestartToConfirmedMb: restartToConfirmedMB.Float32,
430+
ConfirmedToCurrentMb: confirmedToCurrentMB.Float32,
431+
WalStatus: walStatus.String,
432+
SafeWalSize: safeWalSize.Int64,
433+
WaitEventType: waitEventType.String,
434+
WaitEvent: waitEvent.String,
435+
BackendState: backendState.String,
436+
LogicalDecodingWorkMemMb: ldwMemMB.Int64,
437+
StatsReset: statsReset,
438+
SpillTxns: spillTxns,
439+
SpillCount: spillCount,
440+
SpillBytes: spillBytes,
327441
})
328442
}
329443
return slotInfoRows, nil

0 commit comments

Comments
 (0)