Merged
29 changes: 22 additions & 7 deletions src/spock_apply.c
@@ -2372,7 +2372,7 @@ handle_message(StringInfo s)
 	lsn = pq_getmsgint64(s);
 	transactional = pq_getmsgbyte(s);
 	prefix = pq_getmsgstring(s);
-	if (lsn == InvalidXLogRecPtr || !transactional ||
+	if (lsn == InvalidXLogRecPtr ||
 		strcmp(prefix, SPOCK_MESSAGE_PREFIX) != 0)
 
 	/*
@@ -2396,13 +2396,28 @@ handle_message(StringInfo s)
 		(void) msg; /* unused */
 
 		/*
-		 * Do nothing. The main idea is to update the progress status.
-		 * This message must be transactional. That means if we see it
-		 * here, the transaction has been committed on the publisher
-		 * and delivered to the subscriber (note: not exactly for
-		 * streaming mode). The progress status will eventually be
-		 * updated in the commit section of this transaction.
+		 * For non-transactional sync events, explicitly advance the
+		 * replication origin so pg_replication_origin_status.remote_lsn
+		 * reflects this position immediately. For transactional
+		 * messages the origin is advanced at commit time in
+		 * handle_commit().
 		 */
+		elog(DEBUG1, "SPOCK %s: received sync_event at %X/%X "
+			 "(transactional=%d, origin=%d)",
+			 MySubscription->name,
+			 LSN_FORMAT_ARGS(lsn),
+			 transactional,
+			 replorigin_session_origin);
+
+		if (!transactional &&
+			replorigin_session_origin != InvalidRepOriginId &&
+			replorigin_session_origin != DoNotReplicateId)
+		{
+			replorigin_session_advance(lsn, InvalidXLogRecPtr);
+			elog(DEBUG1, "SPOCK %s: advanced origin to %X/%X",
+				 MySubscription->name,
+				 LSN_FORMAT_ARGS(lsn));
+		}
 	}
 	break;
 
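The new branch in handle_message() can be read as a small decision rule: only a non-transactional sync event on a real session origin moves progress right away, and everything else is left to commit handling. The sketch below is a toy model of that rule, not the Spock or PostgreSQL code. `ToyOrigin` and `toy_handle_sync_event()` are hypothetical names, and the two origin constants are redefined locally on the assumption that they match the core definitions.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Toy stand-ins for PostgreSQL types (assumed to mirror core definitions). */
typedef uint64_t XLogRecPtr;
typedef uint16_t RepOriginId;

#define InvalidRepOriginId ((RepOriginId) 0)
#define DoNotReplicateId   ((RepOriginId) UINT16_MAX)

/* Hypothetical model of the session origin's progress record. */
typedef struct ToyOrigin
{
	RepOriginId id;
	XLogRecPtr	remote_lsn;
} ToyOrigin;

/*
 * Mirrors the condition in the hunk: advance immediately only for a
 * non-transactional event on a valid origin. Returns true if advanced.
 */
static bool
toy_handle_sync_event(ToyOrigin *origin, XLogRecPtr lsn, bool transactional)
{
	assert(origin != NULL);

	if (transactional ||
		origin->id == InvalidRepOriginId ||
		origin->id == DoNotReplicateId)
		return false;		/* progress is left to commit handling */

	origin->remote_lsn = lsn;
	return true;
}
```

In this model a transactional event leaves `remote_lsn` untouched, which corresponds to the comment's statement that the origin is advanced later in handle_commit().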
20 changes: 19 additions & 1 deletion src/spock_functions.c
@@ -3293,8 +3293,26 @@ spock_create_sync_event(PG_FUNCTION_ARGS)
 	message.eorigin = node->node->id;
 	memset(NameStr(message.ename), 0, NAMEDATALEN);
 
+	/*
+	 * Use a non-transactional message with flush. This bypasses the reorder
+	 * buffer so the walsender delivers it immediately, and the explicit WAL
+	 * flush wakes the walsender via WalSndWakeupProcessRequests.
+	 *
+	 * PG17+ has a 5th flush parameter; on PG15/16 we call XLogFlush manually.
+	 * Parentheses around the function name bypass the 4-arg compat macro on
+	 * PG17/18 so we can pass the 5th parameter directly.
+	 */
+#if PG_VERSION_NUM >= 170000
+	lsn = (LogLogicalMessage)(SPOCK_MESSAGE_PREFIX, (char *) &message,
+							  sizeof(message), false, true);
+#else
 	lsn = LogLogicalMessage(SPOCK_MESSAGE_PREFIX, (char *) &message,
-							sizeof(message), true);
+							sizeof(message), false);
+	XLogFlush(lsn);
+#endif
Comment on lines +3296 to +3312

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# Find and examine the spock_create_sync_event function around the flagged lines
fd -t f spock_functions.c | head -5

Repository: pgEdge/spock

Length of output: 77


🏁 Script executed:

# Get context around lines 3296-3312
head -3320 src/spock_functions.c | tail -100 | cat -n

Repository: pgEdge/spock

Length of output: 4001


🏁 Script executed:

# Search for IsTransactionBlock in the codebase and PostgreSQL
rg -i "IsTransactionBlock|IsSubTransaction" --max-count=20

Repository: pgEdge/spock

Length of output: 215


🏁 Script executed:

# Find callers of spock_create_sync_event to understand usage patterns
rg "spock_create_sync_event" -A 3 -B 3

Repository: pgEdge/spock

Length of output: 1492


🏁 Script executed:

# Check the function definition and any comments about barrier semantics
ast-grep --pattern $'Datum
spock_create_sync_event($_) {
  $$$
}'

Repository: pgEdge/spock

Length of output: 2102


🏁 Script executed:

# Find the exact lines with IsTransactionBlock usage
rg -n "IsTransactionBlock|IsSubTransaction" src/spock_functions.c

Repository: pgEdge/spock

Length of output: 179


🏁 Script executed:

# Get context around those lines to see the pattern
rg -B 10 -A 5 "IsTransactionBlock" src/spock_functions.c

Repository: pgEdge/spock

Length of output: 857


🏁 Script executed:

# Check if there's any documentation or design notes about sync_event in comments
rg -B 5 -A 15 "spock_create_sync_event|sync_event|synchronization event" src/spock_functions.c | head -80

Repository: pgEdge/spock

Length of output: 2697


Add transaction-block guard to spock_create_sync_event().

The function documents barrier semantics ("ensuring that all changes up to and including the event have been replicated"), but it now emits a non-transactional LogLogicalMessage(), which bypasses the reorder buffer and is decoded immediately. This creates a correctness hole: `BEGIN; INSERT ...; SELECT spock_create_sync_event(); COMMIT;` exposes the sync marker before the transactional changes are replayed on the subscriber, violating the barrier contract.

Add a guard at the start of the function (lines 3284–3290, after check_local_node()) to reject explicit transaction blocks and subtransactions, matching the pattern already used in alter_subscription_disable() and alter_subscription_enable() at lines 781 and 817:

Suggested guard
 Datum
 spock_create_sync_event(PG_FUNCTION_ARGS)
 {
 	SpockLocalNode *node;
 	SpockSyncEventMessage message;
 	XLogRecPtr	lsn;
 
 	node = check_local_node(true);
+	if (IsTransactionBlock() || IsSubTransaction())
+		ereport(ERROR,
+				(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
+				 errmsg("spock_create_sync_event() cannot be run inside a transaction block")));
+
 	message.mtype = SPOCK_SYNC_EVENT_MSG;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/spock_functions.c` around lines 3296 - 3312, spock_create_sync_event
currently emits a non-transactional logical message, which can be observed
before surrounding transactional changes are committed. Add an explicit guard
at the start of spock_create_sync_event (immediately after the existing
check_local_node() call) that rejects calls made inside an explicit
transaction block or a subtransaction. Use the same check/ereport pattern as
alter_subscription_disable() and alter_subscription_enable(): test
IsTransactionBlock() || IsSubTransaction() and raise an ERROR with a clear
message, so the function can only run outside an explicit transaction block.
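
The ordering hazard described in this comment can be illustrated with a toy decoder. This is a simplified model under stated assumptions, not PostgreSQL's reorder buffer: it tracks a single transaction, and `ToyDecoder`, `toy_decode()` and the event kinds are hypothetical names. Changes are held until commit, while a non-transactional message is delivered as soon as it is decoded.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical event kinds, listed in the order they appear in the WAL. */
typedef enum
{
	EV_CHANGE,				/* e.g. an INSERT inside the open transaction */
	EV_MESSAGE_NONTXN,		/* non-transactional logical message */
	EV_COMMIT
} ToyEventKind;

#define TOY_MAX 16

typedef struct ToyDecoder
{
	ToyEventKind buffered[TOY_MAX];		/* held back until commit */
	size_t		nbuffered;
	ToyEventKind delivered[TOY_MAX];	/* what the subscriber sees */
	size_t		ndelivered;
} ToyDecoder;

static void
toy_deliver(ToyDecoder *d, ToyEventKind kind)
{
	assert(d->ndelivered < TOY_MAX);
	d->delivered[d->ndelivered++] = kind;
}

static void
toy_decode(ToyDecoder *d, ToyEventKind kind)
{
	switch (kind)
	{
		case EV_MESSAGE_NONTXN:
			toy_deliver(d, kind);	/* bypasses the buffer entirely */
			break;
		case EV_COMMIT:
			/* Replay buffered changes, then the commit itself. */
			for (size_t i = 0; i < d->nbuffered; i++)
				toy_deliver(d, d->buffered[i]);
			d->nbuffered = 0;
			toy_deliver(d, EV_COMMIT);
			break;
		default:
			assert(d->nbuffered < TOY_MAX);
			d->buffered[d->nbuffered++] = kind;
			break;
	}
}
```

Feeding this model a change, then a non-transactional message, then a commit delivers the message first, which is the reordering the proposed transaction-block guard is meant to rule out.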


+	elog(DEBUG1, "SPOCK sync_event: emitted non-transactional message at %X/%X",
+		 LSN_FORMAT_ARGS(lsn));
 
 	PG_RETURN_LSN(lsn);
 }
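
The code comment in this hunk relies on a C preprocessor rule: a function-like macro is expanded only when its name is directly followed by `(`, so wrapping the name in parentheses calls the underlying function instead. A minimal standalone sketch follows, with a hypothetical `log_message()` function and a hypothetical compat macro of the same name standing in for LogLogicalMessage and Spock's 4-arg wrapper.

```c
#include <assert.h>

/* Hypothetical "new" API that gained a trailing flush parameter. */
static int
log_message(int payload, int transactional, int flush)
{
	assert(flush == 0 || flush == 1);
	return payload * 100 + transactional * 10 + flush;
}

/*
 * Hypothetical compat macro so that older 2-argument call sites still
 * compile. The inner log_message is not expanded again, because a macro
 * is never re-expanded inside its own replacement.
 */
#define log_message(payload, transactional) \
	log_message(payload, transactional, 0)

/* Goes through the macro: flush is forced to 0. */
static int
call_via_macro(void)
{
	return log_message(7, 1);
}

/* Parenthesized name: no macro expansion, so all 3 arguments are passed. */
static int
call_direct(void)
{
	return (log_message)(7, 0, 1);
}
```

The function is defined before the macro so that its own definition is not rewritten by the preprocessor. That ordering is a choice made for this sketch only.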
22 changes: 18 additions & 4 deletions src/spock_output_plugin.c
@@ -783,16 +783,30 @@ pg_decode_message(LogicalDecodingContext *ctx,
 	MemoryContext oldctx;
 	SpockOutputData *data;
 
-	Assert(txn != NULL);
-
+	/* txn is NULL for non-transactional messages */
 	data = (SpockOutputData *) ctx->output_plugin_private;
 	oldctx = MemoryContextSwitchTo(data->context);
 
+	/*
+	 * The startup message is normally sent in pg_decode_begin_txn.
+	 * A non-transactional sync event can arrive before any
+	 * transaction is decoded (e.g. right after slot creation).
+	 * Send the startup message first so the subscriber knows the
+	 * protocol version and can parse the proto-v5 header.
+	 */
+	if (!startup_message_sent)
+		send_startup_message(ctx, data, false);
+
+	elog(DEBUG1, "SPOCK output: sending sync_event at %X/%X "
+		 "(transactional=%d, startup_sent=%d)",
+		 LSN_FORMAT_ARGS(message_lsn),
+		 transactional, startup_message_sent);
+
 	OutputPluginPrepareWrite(ctx, true);
 	spock_write_message(ctx->out,
-						txn->xid,
+						txn ? txn->xid : InvalidTransactionId,
 						message_lsn,
-						true,
+						transactional,
 						prefix,
 						message_size,
 						message);
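
The two behaviors this hunk adds to pg_decode_message() are tolerating a NULL txn and sending the startup message at most once, before the first payload. A toy model of both is sketched below. `ToyOutput`, `ToyTxn` and `toy_decode_message()` are hypothetical names, and `InvalidTransactionId` is redefined locally on the assumption that it is 0 as in PostgreSQL. The real function also writes to the output stream, which is not modeled here.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t TransactionId;
#define InvalidTransactionId ((TransactionId) 0)

/* Hypothetical stand-in for the decoded transaction handle. */
typedef struct ToyTxn
{
	TransactionId xid;
} ToyTxn;

/* Hypothetical record of what the plugin has emitted so far. */
typedef struct ToyOutput
{
	bool		startup_sent;
	int			startup_count;
	TransactionId last_xid;
	bool		last_transactional;
} ToyOutput;

static void
toy_decode_message(ToyOutput *out, const ToyTxn *txn, bool transactional)
{
	assert(out != NULL);

	/* Send the startup message lazily, and only once. */
	if (!out->startup_sent)
	{
		out->startup_sent = true;
		out->startup_count++;
	}

	/* txn is NULL for non-transactional messages. */
	out->last_xid = txn ? txn->xid : InvalidTransactionId;
	out->last_transactional = transactional;
}
```

Passing `NULL` with `transactional = false` as the very first call exercises the case the comment describes, where a sync event arrives before any transaction has been decoded.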