From 14f223da0f16f6330af8dbc23eefc88758e200b0 Mon Sep 17 00:00:00 2001 From: Mason Sharp Date: Mon, 30 Mar 2026 17:49:43 -0700 Subject: [PATCH] Use non-transactional LogLogicalMessage for sync_event to eliminate WAL send delay sync_event() previously used a transactional LogLogicalMessage which required the reorder buffer to process a full BEGIN/COMMIT cycle before delivery to subscribers. This caused the sync event LSN to not appear in pg_replication_origin_status until another unrelated COMMIT advanced the WAL sender. Switch to a non-transactional message with flush=true, which: - Bypasses the reorder buffer for immediate delivery - Explicitly flushes WAL to wake the walsender via WalSndWakeupProcessRequests - Uses (LogLogicalMessage)() to bypass the 4-arg compat macro and pass the 5th flush parameter directly on PG17+; calls XLogFlush on PG15/16 On the output plugin side, send the startup message before the sync event if it hasn't been sent yet. This is necessary because the startup message is normally sent during pg_decode_begin_txn, but a non-transactional message can arrive before any transaction is decoded on a newly-created subscription, causing a protocol version mismatch that crashes the apply worker. On the apply side, accept non-transactional messages and explicitly call replorigin_session_advance() so pg_replication_origin_status reflects the sync position immediately. Diagnostic logging added to trace sync_event flow across provider, output plugin, and subscriber. --- src/spock_apply.c | 29 ++++++++++++++++++++++------- src/spock_functions.c | 20 +++++++++++++++++++- src/spock_output_plugin.c | 22 ++++++++++++++++++---- 3 files changed, 59 insertions(+), 12 deletions(-) diff --git a/src/spock_apply.c b/src/spock_apply.c index 46d7d96d..e754e629 100644 --- a/src/spock_apply.c +++ b/src/spock_apply.c @@ -2372,7 +2372,7 @@ handle_message(StringInfo s) lsn = pq_getmsgint64(s); transactional = pq_getmsgbyte(s); prefix = pq_getmsgstring(s); - if (lsn == InvalidXLogRecPtr || !transactional || + if (lsn == InvalidXLogRecPtr || strcmp(prefix, SPOCK_MESSAGE_PREFIX) != 0) /* @@ -2396,13 +2396,28 @@ handle_message(StringInfo s) (void) msg; /* unused */ /* - * Do nothing. The main idea is to update the progress status. - * This message must be transactional. That means if we see it - * here, the transaction has been committed on the publisher - * and delivered to the subscriber (note: not exactly for - * streaming mode). The progress status will eventually be - * updated in the commit section of this transaction. + * For non-transactional sync events, explicitly advance the + * replication origin so pg_replication_origin_status.remote_lsn + * reflects this position immediately. For transactional + * messages the origin is advanced at commit time in + * handle_commit(). */ + elog(DEBUG1, "SPOCK %s: received sync_event at %X/%X " + "(transactional=%d, origin=%d)", + MySubscription->name, + LSN_FORMAT_ARGS(lsn), + transactional, + replorigin_session_origin); + + if (!transactional && + replorigin_session_origin != InvalidRepOriginId && + replorigin_session_origin != DoNotReplicateId) + { + replorigin_session_advance(lsn, InvalidXLogRecPtr); + elog(DEBUG1, "SPOCK %s: advanced origin to %X/%X", + MySubscription->name, + LSN_FORMAT_ARGS(lsn)); + } } break; diff --git a/src/spock_functions.c b/src/spock_functions.c index a8e0190f..3f0be6d3 100644 --- a/src/spock_functions.c +++ b/src/spock_functions.c @@ -3293,8 +3293,26 @@ spock_create_sync_event(PG_FUNCTION_ARGS) message.eorigin = node->node->id; memset(NameStr(message.ename), 0, NAMEDATALEN); + /* + * Use a non-transactional message with flush. This bypasses the reorder + * buffer so the walsender delivers it immediately, and the explicit WAL + * flush wakes the walsender via WalSndWakeupProcessRequests. + * + * PG17+ has a 5th flush parameter; on PG15/16 we call XLogFlush manually. + * Parentheses around the function name bypass the 4-arg compat macro on + * PG17/18 so we can pass the 5th parameter directly. + */ +#if PG_VERSION_NUM >= 170000 + lsn = (LogLogicalMessage)(SPOCK_MESSAGE_PREFIX, (char *) &message, + sizeof(message), false, true); +#else lsn = LogLogicalMessage(SPOCK_MESSAGE_PREFIX, (char *) &message, - sizeof(message), true); + sizeof(message), false); + XLogFlush(lsn); +#endif + + elog(DEBUG1, "SPOCK sync_event: emitted non-transactional message at %X/%X", + LSN_FORMAT_ARGS(lsn)); PG_RETURN_LSN(lsn); } diff --git a/src/spock_output_plugin.c b/src/spock_output_plugin.c index 9eaf4524..ef0e6040 100644 --- a/src/spock_output_plugin.c +++ b/src/spock_output_plugin.c @@ -783,16 +783,30 @@ pg_decode_message(LogicalDecodingContext *ctx, MemoryContext oldctx; SpockOutputData *data; - Assert(txn != NULL); - + /* txn is NULL for non-transactional messages */ data = (SpockOutputData *) ctx->output_plugin_private; oldctx = MemoryContextSwitchTo(data->context); + /* + * The startup message is normally sent in pg_decode_begin_txn. + * A non-transactional sync event can arrive before any + * transaction is decoded (e.g. right after slot creation). + * Send the startup message first so the subscriber knows the + * protocol version and can parse the proto-v5 header. + */ + if (!startup_message_sent) + send_startup_message(ctx, data, false); + + elog(DEBUG1, "SPOCK output: sending sync_event at %X/%X " + "(transactional=%d, startup_sent=%d)", + LSN_FORMAT_ARGS(message_lsn), + transactional, startup_message_sent); + OutputPluginPrepareWrite(ctx, true); spock_write_message(ctx->out, - txn->xid, + txn ? txn->xid : InvalidTransactionId, message_lsn, - true, + transactional, prefix, message_size, message);