Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ SRCS := $(wildcard src/*.c) \
OBJS = $(filter-out src/spock_output.o, $(SRCS:.c=.o))

PG_CPPFLAGS += -I$(libpq_srcdir) \
-I$(realpath include) \
-I$(realpath src/compat/$(PGVER)) \
-I"$(realpath include)" \
-I"$(realpath src/compat/$(PGVER))" \
-Werror=implicit-function-declaration
SHLIB_LINK += $(libpq) $(filter -lintl, $(LIBS))

Expand All @@ -38,11 +38,11 @@ include $(PGXS)
spock_output.o: src/spock_output.c
$(CC) $(CFLAGS) $(CPPFLAGS) -c -o $@ $<

spock_version=$(shell grep "^\#define \<SPOCK_VERSION\>" $(realpath include/spock.h) | cut -d'"' -f2)
spock_version=$(shell grep "^\#define \<SPOCK_VERSION\>" "$(realpath include/spock.h)" | cut -d'"' -f2)
requires =
control_path = $(abspath $(srcdir))/spock.control
spock.control: spock.control.in include/spock.h
sed 's/__SPOCK_VERSION__/$(spock_version)/;s/__REQUIRES__/$(requires)/' $(realpath $(srcdir)/spock.control.in) > $(control_path)
sed 's/__SPOCK_VERSION__/$(spock_version)/;s/__REQUIRES__/$(requires)/' "$(realpath $(srcdir)/spock.control.in)" > "$(control_path)"


all: spock.control
Expand Down
35 changes: 27 additions & 8 deletions docs/configuring.md
Original file line number Diff line number Diff line change
Expand Up @@ -191,14 +191,33 @@ keepalive options, etc.
the upstream server disappears unexpectedly. To disable them add
`keepalives = 0` to `spock.extra_connection_options`.

#### `spock.feedback_frequency`

Controls how many WAL messages the apply worker processes before sending
an LSN feedback packet to the provider. Lower values increase feedback
overhead due to synchronous socket flushes; higher values reduce overhead
during bulk catch-up. There is a time-based guard (wal_sender_timeout / 2)
that ensures connection liveness regardless of this setting. The default
is 200.
### `wal_sender_timeout`

For Spock replication, set `wal_sender_timeout` to a conservative value such
as `5min` (300000ms) on each node in `postgresql.conf`:

```
wal_sender_timeout = '5min'
```

The default PostgreSQL value of `60s` can cause spurious disconnects when
the subscriber is busy applying a large transaction and cannot send feedback
in time. A higher value gives the apply worker enough headroom while still
detecting truly dead connections. Liveness detection is primarily handled by
TCP keepalives, and `spock.apply_idle_timeout` provides an additional
subscriber-side safety net.

### `spock.apply_idle_timeout`

Maximum idle time (in seconds) before the apply worker reconnects to the
provider. This acts as a safety net for detecting a hung walsender that keeps
the TCP connection alive but stops sending data. The timer resets on any
received message. Set to `0` to disable and rely solely on TCP keepalive for
liveness detection. Default: `300` (5 minutes).

```
spock.apply_idle_timeout = 300
```

### `spock.include_ddl_repset`

Expand Down
2 changes: 1 addition & 1 deletion include/spock.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ extern bool allow_ddl_from_functions;
extern int restart_delay_default;
extern int restart_delay_on_exception;
extern int spock_replay_queue_size; /* Deprecated - no longer used */
extern int spock_feedback_frequency;
extern bool check_all_uc_indexes;
extern bool spock_enable_quiet_mode;
extern int log_origin_change;
extern int spock_apply_idle_timeout;

extern char *shorten_hash(const char *str, int maxlen);
extern void gen_slot_name(Name slot_name, char *dbname,
Expand Down
42 changes: 21 additions & 21 deletions src/spock.c
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,10 @@ bool allow_ddl_from_functions = false;
int restart_delay_default;
int restart_delay_on_exception;
int spock_replay_queue_size; /* Deprecated - no longer used */
int spock_feedback_frequency;
bool check_all_uc_indexes = false;
bool spock_enable_quiet_mode = false;
int log_origin_change = SPOCK_ORIGIN_NONE;
int spock_apply_idle_timeout = 300;

static emit_log_hook_type prev_emit_log_hook = NULL;
static Checkpoint_hook_type prev_Checkpoint_hook = NULL;
Expand Down Expand Up @@ -317,7 +317,7 @@ get_spock_table_oid(const char *table)
return reloid;
}

#define CONN_PARAM_ARRAY_SIZE 9
#define CONN_PARAM_ARRAY_SIZE 10

static PGconn *
spock_connect_base(const char *connstr, const char *appname,
Expand Down Expand Up @@ -357,13 +357,13 @@ spock_connect_base(const char *connstr, const char *appname,
vals[i] = "1";
i++;
keys[i] = "keepalives_idle";
vals[i] = "20";
vals[i] = "10";
i++;
keys[i] = "keepalives_interval";
vals[i] = "20";
vals[i] = "5";
i++;
keys[i] = "keepalives_count";
vals[i] = "5";
vals[i] = "3";
i++;
keys[i] = "replication";
vals[i] = replication ? "database" : NULL;
Expand Down Expand Up @@ -1167,22 +1167,6 @@ _PG_init(void)
NULL,
NULL);

DefineCustomIntVariable("spock.feedback_frequency",
"Number of WAL messages between feedback to provider",
"Controls how often the apply worker sends LSN feedback "
"to the provider during replication. Lower values increase "
"feedback overhead; the time-based guard (wal_sender_timeout/2) "
"ensures liveness regardless of this setting.",
&spock_feedback_frequency,
200,
1,
INT_MAX,
PGC_SIGHUP,
0,
NULL,
NULL,
NULL);

DefineCustomEnumVariable("spock.readonly",
gettext_noop("Controls cluster read-only mode."),
NULL,
Expand Down Expand Up @@ -1223,6 +1207,22 @@ _PG_init(void)
PGC_SUSET, 0,
NULL, NULL, NULL);

DefineCustomIntVariable("spock.apply_idle_timeout",
"Maximum idle time in seconds before apply worker reconnects",
"Safety net for detecting a hung walsender that keeps the "
"TCP connection alive but stops sending data. The timer "
"resets on any received message. Set to 0 to disable and "
"rely solely on TCP keepalive for liveness detection.",
&spock_apply_idle_timeout,
300,
0,
INT_MAX,
PGC_SIGHUP,
GUC_UNIT_S,
NULL,
NULL,
NULL);

if (IsBinaryUpgrade)
return;

Expand Down
83 changes: 18 additions & 65 deletions src/spock_apply.c
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,6 @@ static bool should_log_exception(bool failed);
static ApplyReplayEntry * apply_replay_entry_create(int r, char *buf);
static void apply_replay_entry_free(ApplyReplayEntry * entry);
static void apply_replay_queue_reset(void);
static void maybe_send_feedback(PGconn *applyconn, XLogRecPtr lsn_to_send,
TimestampTz *last_receive_timestamp);
static void append_feedback_position(XLogRecPtr recvpos);
static void get_feedback_position(XLogRecPtr *recvpos, XLogRecPtr *writepos,
XLogRecPtr *flushpos, XLogRecPtr *max_recvpos);
Expand Down Expand Up @@ -2967,25 +2965,24 @@ apply_work(PGconn *streamConn)
}

/*
* The walsender is supposed to ping us for a status update every
* wal_sender_timeout / 2 milliseconds. If we don't get those, we
* assume that we have lost the connection.
*
* Note: keepalive configuration is supposed to cover this but is
* apparently unreliable.
* Connection liveness is handled by TCP keepalive (primary)
* and PQstatus == CONNECTION_BAD (above). The idle timeout
* below is a safety net for the case where the walsender
* process is alive but hung -- TCP probes succeed because the
* kernel ACKs them, but no data is being sent.
*/
if (rc & WL_TIMEOUT)
if (rc & WL_TIMEOUT && spock_apply_idle_timeout > 0)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like if walsender just doesn't have data to send for a long time, subscriber will restart. Am I wrong?

It would be better to modify walsender little: skip keepalive messages being busy and rely on TCP status. But send keepalive messages if no data arrives from the WAL. In this case we don't need any subscriber-side GUC at all.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does a slightly different task. We are relying on TCP_KEEPALIVE. The idle time is just for the guard; if the work is stuck at the application level, but the kernel TCP keep-alive will continue, there will be no way to restart the wal_sender.

{
TimestampTz timeout;

timeout = TimestampTzPlusMilliseconds(last_receive_timestamp,
(wal_sender_timeout * 3) / 2);
(long) spock_apply_idle_timeout * 1000L);
if (GetCurrentTimestamp() > timeout)
{
MySpockWorker->worker_status = SPOCK_WORKER_STATUS_STOPPED;
elog(ERROR, "SPOCK %s: terminating apply due to missing "
"walsender ping",
MySubscription->name);
elog(ERROR, "SPOCK %s: no data received for %d seconds, "
"reconnecting (spock.apply_idle_timeout)",
MySubscription->name, spock_apply_idle_timeout);
}
}

Expand Down Expand Up @@ -3015,13 +3012,6 @@ apply_work(PGconn *streamConn)
*/
if (spock_readonly == READONLY_ALL)
{
/*
* Send feedback to keep walsender alive - we may avoid it
* with introduction of TCP keepalive approach.
*/
maybe_send_feedback(applyconn, last_received,
&last_receive_timestamp);

/*
* In case of an exception we can't break out of the loop
* because exception processing code may also modify the
Expand Down Expand Up @@ -3049,8 +3039,6 @@ apply_work(PGconn *streamConn)
/* We are not in replay mode so receive from the stream */
r = PQgetCopyData(applyconn, &buf, 1);

last_receive_timestamp = GetCurrentTimestamp();

/* Check for errors */
if (r == -1)
{
Expand Down Expand Up @@ -3082,6 +3070,14 @@ apply_work(PGconn *streamConn)
break;
}

/*
* We received actual data. Update the idle-timeout clock
* only here, after confirming r > 0, so that a WL_TIMEOUT
* spin with no incoming data does not silently reset the
* timer and mask a hung walsender.
*/
last_receive_timestamp = GetCurrentTimestamp();

/*
* We have a valid message, create an apply queue entry
* but don't add it to the queue yet.
Expand Down Expand Up @@ -3111,16 +3107,6 @@ apply_work(PGconn *streamConn)
end_lsn = pq_getmsgint64(msg);
pq_getmsgint64(msg); /* sendTime */

/*
* Call maybe_send_feedback before last_received is
* updated. This ordering guarantees that feedback LSN
* never advertises a position beyond what has actually
* been received and processed. Prevents skipping over
* unapplied changes due to premature flush LSN.
*/
maybe_send_feedback(applyconn, last_received,
&last_receive_timestamp);

if (last_received < start_lsn)
last_received = start_lsn;

Expand Down Expand Up @@ -4106,39 +4092,6 @@ apply_replay_queue_reset(void)
MemoryContextReset(ApplyReplayContext);
}

/*
* Check if we should send feedback based on message count or timeout.
* Resets internal state if feedback is sent.
*/
static void
maybe_send_feedback(PGconn *applyconn, XLogRecPtr lsn_to_send,
TimestampTz *last_receive_timestamp)
{
static int w_message_count = 0;
TimestampTz now = GetCurrentTimestamp();

w_message_count++;

/*
* Send feedback if wal_sender_timeout/2 has passed or after
* spock.feedback_frequency 'w' messages.
*/
if (TimestampDifferenceExceeds(*last_receive_timestamp, now, wal_sender_timeout / 2) ||
w_message_count >= spock_feedback_frequency)
{
elog(DEBUG2, "SPOCK %s: force sending feedback after %d 'w' messages or timeout",
MySubscription->name, w_message_count);

/*
* We need to send feedback to the walsender process to avoid remote
* wal_sender_timeout.
*/
send_feedback(applyconn, lsn_to_send, now, true);
*last_receive_timestamp = now;
w_message_count = 0;
}
}

/*
* Advance the replication origin for forwarded transactions.
*
Expand Down
Loading