diff --git a/Makefile b/Makefile index c0b0f9aa..e8e5600e 100644 --- a/Makefile +++ b/Makefile @@ -19,8 +19,8 @@ SRCS := $(wildcard src/*.c) \ OBJS = $(filter-out src/spock_output.o, $(SRCS:.c=.o)) PG_CPPFLAGS += -I$(libpq_srcdir) \ - -I$(realpath include) \ - -I$(realpath src/compat/$(PGVER)) \ + -I"$(realpath include)" \ + -I"$(realpath src/compat/$(PGVER))" \ -Werror=implicit-function-declaration SHLIB_LINK += $(libpq) $(filter -lintl, $(LIBS)) @@ -38,11 +38,11 @@ include $(PGXS) spock_output.o: src/spock_output.c $(CC) $(CFLAGS) $(CPPFLAGS) -c -o $@ $< -spock_version=$(shell grep "^\#define \" $(realpath include/spock.h) | cut -d'"' -f2) +spock_version=$(shell grep "^\#define \" "$(realpath include/spock.h)" | cut -d'"' -f2) requires = control_path = $(abspath $(srcdir))/spock.control spock.control: spock.control.in include/spock.h - sed 's/__SPOCK_VERSION__/$(spock_version)/;s/__REQUIRES__/$(requires)/' $(realpath $(srcdir)/spock.control.in) > $(control_path) + sed 's/__SPOCK_VERSION__/$(spock_version)/;s/__REQUIRES__/$(requires)/' "$(realpath $(srcdir)/spock.control.in)" > "$(control_path)" all: spock.control diff --git a/docs/configuring.md b/docs/configuring.md index 7021ba88..de7240a3 100644 --- a/docs/configuring.md +++ b/docs/configuring.md @@ -191,14 +191,33 @@ keepalive options, etc. the upstream server disappears unexpectedly. To disable them add `keepalives = 0` to `spock.extra_connection_options`. -#### `spock.feedback_frequency` - -Controls how many WAL messages the apply worker processes before sending -an LSN feedback packet to the provider. Lower values increase feedback -overhead due to synchronous socket flushes; higher values reduce overhead -during bulk catch-up. There is a time-based guard (wal_sender_timeout / 2) -that ensures connection liveness regardless of this setting. The default -is 200. +### `wal_sender_timeout` + +For Spock replication, set `wal_sender_timeout` to a conservative value such +as `5min` (300000ms) on each node in `postgresql.conf`: + +``` +wal_sender_timeout = '5min' +``` + +The default PostgreSQL value of `60s` can cause spurious disconnects when +the subscriber is busy applying a large transaction and cannot send feedback +in time. A higher value gives the apply worker enough headroom while still +detecting truly dead connections. Liveness detection is primarily handled by +TCP keepalives, and `spock.apply_idle_timeout` provides an additional +subscriber-side safety net. + +### `spock.apply_idle_timeout` + +Maximum idle time (in seconds) before the apply worker reconnects to the +provider. This acts as a safety net for detecting a hung walsender that keeps +the TCP connection alive but stops sending data. The timer resets on any +received message. Set to `0` to disable and rely solely on TCP keepalive for +liveness detection. Default: `300` (5 minutes). + +``` +spock.apply_idle_timeout = 300 +``` ### `spock.include_ddl_repset` diff --git a/include/spock.h b/include/spock.h index 891d616b..291b0975 100644 --- a/include/spock.h +++ b/include/spock.h @@ -51,10 +51,10 @@ extern bool allow_ddl_from_functions; extern int restart_delay_default; extern int restart_delay_on_exception; extern int spock_replay_queue_size; /* Deprecated - no longer used */ -extern int spock_feedback_frequency; extern bool check_all_uc_indexes; extern bool spock_enable_quiet_mode; extern int log_origin_change; +extern int spock_apply_idle_timeout; extern char *shorten_hash(const char *str, int maxlen); extern void gen_slot_name(Name slot_name, char *dbname, diff --git a/src/spock.c b/src/spock.c index f23c9f94..8e58d494 100644 --- a/src/spock.c +++ b/src/spock.c @@ -144,10 +144,10 @@ bool allow_ddl_from_functions = false; int restart_delay_default; int restart_delay_on_exception; int spock_replay_queue_size; /* Deprecated - no longer used */ -int spock_feedback_frequency; bool check_all_uc_indexes = false; bool spock_enable_quiet_mode = false; int log_origin_change = SPOCK_ORIGIN_NONE; +int spock_apply_idle_timeout = 300; static emit_log_hook_type prev_emit_log_hook = NULL; static Checkpoint_hook_type prev_Checkpoint_hook = NULL; @@ -317,7 +317,7 @@ get_spock_table_oid(const char *table) return reloid; } -#define CONN_PARAM_ARRAY_SIZE 9 +#define CONN_PARAM_ARRAY_SIZE 10 static PGconn * spock_connect_base(const char *connstr, const char *appname, @@ -357,13 +357,13 @@ spock_connect_base(const char *connstr, const char *appname, vals[i] = "1"; i++; keys[i] = "keepalives_idle"; - vals[i] = "20"; + vals[i] = "10"; i++; keys[i] = "keepalives_interval"; - vals[i] = "20"; + vals[i] = "5"; i++; keys[i] = "keepalives_count"; - vals[i] = "5"; + vals[i] = "3"; i++; keys[i] = "replication"; vals[i] = replication ? "database" : NULL; @@ -1167,22 +1167,6 @@ _PG_init(void) NULL, NULL); - DefineCustomIntVariable("spock.feedback_frequency", - "Number of WAL messages between feedback to provider", - "Controls how often the apply worker sends LSN feedback " - "to the provider during replication. Lower values increase " - "feedback overhead; the time-based guard (wal_sender_timeout/2) " - "ensures liveness regardless of this setting.", - &spock_feedback_frequency, - 200, - 1, - INT_MAX, - PGC_SIGHUP, - 0, - NULL, - NULL, - NULL); - DefineCustomEnumVariable("spock.readonly", gettext_noop("Controls cluster read-only mode."), NULL, @@ -1223,6 +1207,22 @@ _PG_init(void) PGC_SUSET, 0, NULL, NULL, NULL); + DefineCustomIntVariable("spock.apply_idle_timeout", + "Maximum idle time in seconds before apply worker reconnects", + "Safety net for detecting a hung walsender that keeps the " + "TCP connection alive but stops sending data. The timer " + "resets on any received message. Set to 0 to disable and " + "rely solely on TCP keepalive for liveness detection.", + &spock_apply_idle_timeout, + 300, + 0, + INT_MAX, + PGC_SIGHUP, + GUC_UNIT_S, + NULL, + NULL, + NULL); + if (IsBinaryUpgrade) return; diff --git a/src/spock_apply.c b/src/spock_apply.c index 17e5bc99..2ca69640 100644 --- a/src/spock_apply.c +++ b/src/spock_apply.c @@ -229,8 +229,6 @@ static bool should_log_exception(bool failed); static ApplyReplayEntry * apply_replay_entry_create(int r, char *buf); static void apply_replay_entry_free(ApplyReplayEntry * entry); static void apply_replay_queue_reset(void); -static void maybe_send_feedback(PGconn *applyconn, XLogRecPtr lsn_to_send, - TimestampTz *last_receive_timestamp); static void append_feedback_position(XLogRecPtr recvpos); static void get_feedback_position(XLogRecPtr *recvpos, XLogRecPtr *writepos, XLogRecPtr *flushpos, XLogRecPtr *max_recvpos); @@ -2967,25 +2965,24 @@ apply_work(PGconn *streamConn) } /* - * The walsender is supposed to ping us for a status update every - * wal_sender_timeout / 2 milliseconds. If we don't get those, we - * assume that we have lost the connection. - * - * Note: keepalive configuration is supposed to cover this but is - * apparently unreliable. + * Connection liveness is handled by TCP keepalive (primary) + * and PQstatus == CONNECTION_BAD (above). The idle timeout + * below is a safety net for the case where the walsender + * process is alive but hung -- TCP probes succeed because the + * kernel ACKs them, but no data is being sent. */ - if (rc & WL_TIMEOUT) + if (rc & WL_TIMEOUT && spock_apply_idle_timeout > 0) { TimestampTz timeout; timeout = TimestampTzPlusMilliseconds(last_receive_timestamp, - (wal_sender_timeout * 3) / 2); + (long) spock_apply_idle_timeout * 1000L); if (GetCurrentTimestamp() > timeout) { MySpockWorker->worker_status = SPOCK_WORKER_STATUS_STOPPED; - elog(ERROR, "SPOCK %s: terminating apply due to missing " - "walsender ping", - MySubscription->name); + elog(ERROR, "SPOCK %s: no data received for %d seconds, " + "reconnecting (spock.apply_idle_timeout)", + MySubscription->name, spock_apply_idle_timeout); } } @@ -3015,13 +3012,6 @@ apply_work(PGconn *streamConn) */ if (spock_readonly == READONLY_ALL) { - /* - * Send feedback to keep walsender alive - we may avoid it - * with introduction of TCP keepalive approach. - */ - maybe_send_feedback(applyconn, last_received, - &last_receive_timestamp); - /* * In case of an exception we can't break out of the loop * because exception processing code may also modify the @@ -3049,8 +3039,6 @@ apply_work(PGconn *streamConn) /* We are not in replay mode so receive from the stream */ r = PQgetCopyData(applyconn, &buf, 1); - last_receive_timestamp = GetCurrentTimestamp(); - /* Check for errors */ if (r == -1) { @@ -3082,6 +3070,14 @@ apply_work(PGconn *streamConn) break; } + /* + * We received actual data. Update the idle-timeout clock + * only here, after confirming r > 0, so that a WL_TIMEOUT + * spin with no incoming data does not silently reset the + * timer and mask a hung walsender. + */ + last_receive_timestamp = GetCurrentTimestamp(); + /* * We have a valid message, create an apply queue entry * but don't add it to the queue yet. @@ -3111,16 +3107,6 @@ apply_work(PGconn *streamConn) end_lsn = pq_getmsgint64(msg); pq_getmsgint64(msg); /* sendTime */ - /* - * Call maybe_send_feedback before last_received is - * updated. This ordering guarantees that feedback LSN - * never advertises a position beyond what has actually - * been received and processed. Prevents skipping over - * unapplied changes due to premature flush LSN. - */ - maybe_send_feedback(applyconn, last_received, - &last_receive_timestamp); - if (last_received < start_lsn) last_received = start_lsn; @@ -4106,39 +4092,6 @@ apply_replay_queue_reset(void) MemoryContextReset(ApplyReplayContext); } -/* - * Check if we should send feedback based on message count or timeout. - * Resets internal state if feedback is sent. - */ -static void -maybe_send_feedback(PGconn *applyconn, XLogRecPtr lsn_to_send, - TimestampTz *last_receive_timestamp) -{ - static int w_message_count = 0; - TimestampTz now = GetCurrentTimestamp(); - - w_message_count++; - - /* - * Send feedback if wal_sender_timeout/2 has passed or after - * spock.feedback_frequency 'w' messages. - */ - if (TimestampDifferenceExceeds(*last_receive_timestamp, now, wal_sender_timeout / 2) || - w_message_count >= spock_feedback_frequency) - { - elog(DEBUG2, "SPOCK %s: force sending feedback after %d 'w' messages or timeout", - MySubscription->name, w_message_count); - - /* - * We need to send feedback to the walsender process to avoid remote - * wal_sender_timeout. - */ - send_feedback(applyconn, lsn_to_send, now, true); - *last_receive_timestamp = now; - w_message_count = 0; - } -} - /* * Advance the replication origin for forwarded transactions. *