Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -1174,6 +1174,20 @@ func (a *FlowableActivity) RecordSlotSizes(ctx context.Context) error {
slotMetricGauges.SlotLagGauge = a.OtelManager.Metrics.SlotLagGauge
slotMetricGauges.RestartLSNGauge = a.OtelManager.Metrics.RestartLSNGauge
slotMetricGauges.ConfirmedFlushLSNGauge = a.OtelManager.Metrics.ConfirmedFlushLSNGauge
slotMetricGauges.SentLSNGauge = a.OtelManager.Metrics.SentLSNGauge
slotMetricGauges.CurrentWalLSNGauge = a.OtelManager.Metrics.CurrentWalLSNGauge
slotMetricGauges.RestartToConfirmedMBGauge = a.OtelManager.Metrics.RestartToConfirmedMBGauge
slotMetricGauges.ConfirmedToSentMBGauge = a.OtelManager.Metrics.ConfirmedToSentMBGauge
slotMetricGauges.SentToCurrentMBGauge = a.OtelManager.Metrics.SentToCurrentMBGauge
slotMetricGauges.SafeWalSizeGauge = a.OtelManager.Metrics.SafeWalSizeGauge
slotMetricGauges.SlotActiveGauge = a.OtelManager.Metrics.SlotActiveGauge
slotMetricGauges.WalSenderStateGauge = a.OtelManager.Metrics.WalSenderStateGauge
slotMetricGauges.WalStatusGauge = a.OtelManager.Metrics.WalStatusGauge
slotMetricGauges.LogicalDecodingWorkMemGauge = a.OtelManager.Metrics.LogicalDecodingWorkMemGauge
slotMetricGauges.StatsResetGauge = a.OtelManager.Metrics.StatsResetGauge
slotMetricGauges.SpillTxnsGauge = a.OtelManager.Metrics.SpillTxnsGauge
slotMetricGauges.SpillCountGauge = a.OtelManager.Metrics.SpillCountGauge
slotMetricGauges.SpillBytesGauge = a.OtelManager.Metrics.SpillBytesGauge
slotMetricGauges.OpenConnectionsGauge = a.OtelManager.Metrics.OpenConnectionsGauge
slotMetricGauges.OpenReplicationConnectionsGauge = a.OtelManager.Metrics.OpenReplicationConnectionsGauge
slotMetricGauges.IntervalSinceLastNormalizeGauge = a.OtelManager.Metrics.IntervalSinceLastNormalizeGauge
Expand Down
176 changes: 152 additions & 24 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,52 +278,180 @@ func (c *PostgresConnector) checkSlotAndPublication(ctx context.Context, slot st
}

func getSlotInfo(ctx context.Context, conn *pgx.Conn, slotName string, database string) ([]*protos.SlotInfo, error) {
var whereClause string
if slotName != "" {
whereClause = "WHERE slot_name=" + utils.QuoteLiteral(slotName)
} else {
whereClause = "WHERE database=" + utils.QuoteLiteral(database)
}

pgversion, err := shared.GetMajorVersion(ctx, conn)
if err != nil {
return nil, err
}
walStatusSelector := "wal_status"
walStatusSelect := "prs.wal_status"
safeWalSizeSelect := "prs.safe_wal_size"
if pgversion < shared.POSTGRES_13 {
walStatusSelector = "'unknown'"
walStatusSelect = "'unknown'"
safeWalSizeSelect = "NULL::bigint"
}

ldwMBSelect := "NULL::bigint"
ldwPendingSelect := "NULL::boolean"
if pgversion >= shared.POSTGRES_13 {
ldwMBSelect = `(
SELECT (pg_size_bytes(setting || COALESCE(unit,'')) / 1024 / 1024)::bigint
FROM pg_settings WHERE name='logical_decoding_work_mem'
)`
ldwPendingSelect = `(
SELECT pending_restart
FROM pg_settings WHERE name='logical_decoding_work_mem'
)`
}

statsSelect := `
NULL::bigint,
NULL::bigint,
NULL::bigint,
NULL::bigint
`
statsJoin := ""
if pgversion >= shared.POSTGRES_16 {
statsSelect = `
EXTRACT(EPOCH FROM psrs.stats_reset)::bigint,
psrs.spill_txns,
psrs.spill_count,
psrs.spill_bytes
`
statsJoin = `
LEFT JOIN pg_stat_replication_slots AS psrs
ON psrs.slot_name = prs.slot_name
`
}
rows, err := conn.Query(ctx, fmt.Sprintf(`SELECT slot_name, redo_lsn::Text,restart_lsn::text,%s,
confirmed_flush_lsn::text,active,
round((CASE WHEN pg_is_in_recovery() THEN pg_last_wal_receive_lsn() ELSE pg_current_wal_lsn() END
- restart_lsn) / 1024 / 1024) AS MB_Behind
FROM pg_control_checkpoint(),pg_replication_slots %s`, walStatusSelector, whereClause))
var whereClause string
if slotName != "" {
whereClause = "WHERE prs.slot_name=" + utils.QuoteLiteral(slotName)
} else {
whereClause = "WHERE prs.database=" + utils.QuoteLiteral(database)
}
rows, err := conn.Query(ctx, fmt.Sprintf(`
WITH current_wal AS (
SELECT CASE
WHEN pg_is_in_recovery()
THEN pg_last_wal_receive_lsn()
ELSE pg_current_wal_lsn()
END AS current_lsn
)
SELECT
prs.slot_name,
pcc.redo_lsn::text,
prs.restart_lsn::text,
cw.current_lsn::text,
%s, -- prs.wal_status
%s, -- prs.safe_wal_size
prs.confirmed_flush_lsn::text,
psr.sent_lsn::text,
prs.active,
round((cw.current_lsn - prs.restart_lsn) / 1024 / 1024),
round((prs.confirmed_flush_lsn - prs.restart_lsn) / 1024 / 1024),
round((psr.sent_lsn - prs.confirmed_flush_lsn) / 1024 / 1024),
round((cw.current_lsn - psr.sent_lsn) / 1024 / 1024),
psa.wait_event_type,
psa.wait_event,
psa.state,
%s, -- logical_decoding_work_mem megabytes
%s, -- logical_decoding_work_mem pending_restart
%s -- stats
FROM current_wal cw,
pg_control_checkpoint() as pcc,
(pg_replication_slots as prs
LEFT JOIN pg_stat_activity as psa
on psa.pid = prs.active_pid
LEFT JOIN pg_stat_replication as psr
on psr.pid = prs.active_pid
%s)
%s`,
walStatusSelect,
safeWalSizeSelect,
ldwMBSelect,
ldwPendingSelect,
statsSelect,
statsJoin,
whereClause,
))
if err != nil {
return nil, fmt.Errorf("failed to read information for slots: %w", err)
}
defer rows.Close()
var slotInfoRows []*protos.SlotInfo
for rows.Next() {
var redoLSN pgtype.Text
var slotName pgtype.Text
var redoLSN pgtype.Text
var restartLSN pgtype.Text
var currentLSN pgtype.Text
var walStatus pgtype.Text
var safeWalSize pgtype.Int8
var confirmedFlushLSN pgtype.Text
var sentLSN pgtype.Text
var active pgtype.Bool
var lagInMB pgtype.Float4
var walStatus pgtype.Text
err := rows.Scan(&slotName, &redoLSN, &restartLSN, &walStatus, &confirmedFlushLSN, &active, &lagInMB)
var restartToConfirmedMB pgtype.Float4
var confirmedToSentMB pgtype.Float4
var sentToCurrentMB pgtype.Float4
var waitEventType pgtype.Text
var waitEvent pgtype.Text
var backendState pgtype.Text
var ldwMemMB pgtype.Int8
var ldwPendingRestart pgtype.Bool
var statsReset pgtype.Int8
var spillTxns pgtype.Int8
var spillCount pgtype.Int8
var spillBytes pgtype.Int8

err := rows.Scan(
&slotName,
&redoLSN,
&restartLSN,
&currentLSN,
&walStatus,
&safeWalSize,
&confirmedFlushLSN,
&sentLSN,
&active,
&lagInMB,
&restartToConfirmedMB,
&confirmedToSentMB,
&sentToCurrentMB,
&waitEventType,
&waitEvent,
&backendState,
&ldwMemMB,
&ldwPendingRestart,
&statsReset,
&spillTxns,
&spillCount,
&spillBytes,
)
if err != nil {
return nil, err
}

slotInfoRows = append(slotInfoRows, &protos.SlotInfo{
RedoLSN: redoLSN.String,
RestartLSN: restartLSN.String,
WalStatus: walStatus.String,
ConfirmedFlushLSN: confirmedFlushLSN.String,
SlotName: slotName.String,
Active: active.Bool,
LagInMb: lagInMB.Float32,
SlotName: slotName.String,
RedoLSN: redoLSN.String,
RestartLSN: restartLSN.String,
CurrentLSN: currentLSN.String,
Active: active.Bool,
LagInMb: lagInMB.Float32,
ConfirmedFlushLSN: confirmedFlushLSN.String,
SentLSN: sentLSN.String,
RestartToConfirmedMb: restartToConfirmedMB.Float32,
ConfirmedToSentMb: confirmedToSentMB.Float32,
SentToCurrentMb: sentToCurrentMB.Float32,
WalStatus: walStatus.String,
SafeWalSize: safeWalSize.Int64,
WaitEventType: waitEventType.String,
WaitEvent: waitEvent.String,
BackendState: backendState.String,
LogicalDecodingWorkMemMb: ldwMemMB.Int64,
LogicalDecodingWorkMemPendingRestart: ldwPendingRestart.Bool,
StatsReset: statsReset.Int64,
SpillTxns: spillTxns.Int64,
SpillCount: spillCount.Int64,
SpillBytes: spillBytes.Int64,
})
}
return slotInfoRows, nil
Expand Down
Loading
Loading