From 89056e2ea8b80d6e6483a10b78ea55c290f1ae57 Mon Sep 17 00:00:00 2001 From: Loudghiri Ahmed Date: Fri, 10 Apr 2026 19:00:16 +0100 Subject: [PATCH 1/7] fix: subscriber thread never self heals after half open tcp connection (#1) --- lib/message_bus.rb | 15 ++++++- spec/lib/message_bus_spec.rb | 81 ++++++++++++++++++++++++++++++++++++ 2 files changed, 94 insertions(+), 2 deletions(-) diff --git a/lib/message_bus.rb b/lib/message_bus.rb index cb927a5c..7c83f411 100644 --- a/lib/message_bus.rb +++ b/lib/message_bus.rb @@ -752,9 +752,20 @@ def new_subscriber_thread publish("/__mb_keepalive__/", Process.pid, user_ids: [-1]) if (Time.now - (@last_message || Time.now)) > keepalive_interval * 3 logger.warn "Global messages on #{Process.pid} timed out, message bus is no longer functioning correctly" + # The subscriber thread is stuck (half-open TCP connection or similar network issue) + # and is no longer receiving messages. Kill it and let ensure_subscriber_thread + # create a fresh one with a new Redis connection. + @mutex.synchronize do + if @subscriber_thread == thread + thread.kill + @subscriber_thread = nil + end + end + ensure_subscriber_thread + # The new thread sets up its own keepalive blk stop re-queuing this one. + else + timer.queue(keepalive_interval, &blk) if keepalive_interval > MIN_KEEPALIVE end - - timer.queue(keepalive_interval, &blk) if keepalive_interval > MIN_KEEPALIVE end end diff --git a/spec/lib/message_bus_spec.rb b/spec/lib/message_bus_spec.rb index 5c7a6c0a..864f0e53 100644 --- a/spec/lib/message_bus_spec.rb +++ b/spec/lib/message_bus_spec.rb @@ -398,4 +398,85 @@ channel.must_equal('/test') end end + + describe "keepalive recovery" do + # MIN_KEEPALIVE is 20s in production, making real-timer tests take 60s+. + # We temporarily lower it so keepalive_interval=1 is accepted, which + # triggers recovery at ~4s fast enough for CI, slow enough to be real. + FAST_KEEPALIVE = 1 + + before do + @original_min_keepalive = MessageBus::Implementation::MIN_KEEPALIVE + MessageBus::Implementation.send(:remove_const, :MIN_KEEPALIVE) + MessageBus::Implementation.const_set(:MIN_KEEPALIVE, 0) + + @log_output = StringIO.new + @bus.configure(keepalive_interval: FAST_KEEPALIVE, logger: Logger.new(@log_output)) + + # Capture the real backend method before stubbing. + @real_global_subscribe = @bus.backend_instance.method(:global_subscribe) + @call_count = 0 + + # First call to global_subscribe: simulate a half-open TCP connection. + # The thread stays alive but never yields messages, so @last_message + # goes stale and the keepalive eventually fires recovery. + # + # Subsequent calls: delegate to the real backend so the recovered + # subscriber thread actually processes messages end-to-end. + real_gs = @real_global_subscribe + call_count_ref = -> { @call_count += 1 } + backend = @bus.backend_instance + + backend.define_singleton_method(:global_subscribe) do |last_id = nil, &blk| + if call_count_ref.call == 1 + @subscribed = true + loop { sleep 0.05; break unless @subscribed } + else + # Remove both stubs so the real backend handles the full lifecycle + # of the recovered thread especially global_unsubscribe, which + # destroy needs to signal client.subscribe to exit. + backend.singleton_class.remove_method(:global_subscribe) + backend.singleton_class.remove_method(:global_unsubscribe) + real_gs.call(last_id, &blk) + end + end + + backend.define_singleton_method(:global_unsubscribe) do + @subscribed = false + end + + @bus.after_fork + wait_for(2000) { @bus.listening? } + end + + after do + MessageBus::Implementation.send(:remove_const, :MIN_KEEPALIVE) + MessageBus::Implementation.const_set(:MIN_KEEPALIVE, @original_min_keepalive) + end + + it "recovers message delivery after a stuck subscriber thread" do + @bus.listening?.must_equal true + + received = [] + @bus.subscribe("/recovery-test") { |msg| received << msg.data } + + # Wait for the keepalive to detect the stuck thread and recover. + # With keepalive_interval=1, timeout = 1*3 = 3s, first eligible check at ~4s. + wait_for(8000) do + @bus.publish("/recovery-test", "post-recovery") + sleep 0.1 + received.include?("post-recovery") + end + + received.must_include("post-recovery") + @bus.listening?.must_equal true + end + + it "logs a warning when the keepalive detects a stuck subscriber" do + wait_for(8000) { @log_output.string.include?("no longer functioning correctly") } + + @log_output.string.must_include "timed out" + @log_output.string.must_include "no longer functioning correctly" + end + end end From 39050ba117e868131aa6697da97cc16fdbfc328e Mon Sep 17 00:00:00 2001 From: Ahmed Loudghiri Date: Mon, 13 Apr 2026 16:13:57 +0100 Subject: [PATCH 2/7] fix: subscriber thread never self heals after half open tcp connection --- lib/message_bus.rb | 20 +++++++------------- lib/message_bus/backends/base.rb | 6 ++++++ lib/message_bus/backends/postgres.rb | 9 +++++++++ lib/message_bus/backends/redis.rb | 9 ++++++++- 4 files changed, 30 insertions(+), 14 deletions(-) diff --git a/lib/message_bus.rb b/lib/message_bus.rb index 7c83f411..866fc973 100644 --- a/lib/message_bus.rb +++ b/lib/message_bus.rb @@ -752,20 +752,14 @@ def new_subscriber_thread publish("/__mb_keepalive__/", Process.pid, user_ids: [-1]) if (Time.now - (@last_message || Time.now)) > keepalive_interval * 3 logger.warn "Global messages on #{Process.pid} timed out, message bus is no longer functioning correctly" - # The subscriber thread is stuck (half-open TCP connection or similar network issue) - # and is no longer receiving messages. Kill it and let ensure_subscriber_thread - # create a fresh one with a new Redis connection. - @mutex.synchronize do - if @subscriber_thread == thread - thread.kill - @subscriber_thread = nil - end - end - ensure_subscriber_thread - # The new thread sets up its own keepalive blk stop re-queuing this one. - else - timer.queue(keepalive_interval, &blk) if keepalive_interval > MIN_KEEPALIVE + # Close the subscriber connection cooperatively so the blocked read + # errors and the backend's rescue/retry reconnects on a fresh socket. + # Reset @last_message to avoid cascade-triggering during the 1s reconnect window. + @last_message = Time.now + backend_instance.request_reconnect end + + timer.queue(keepalive_interval, &blk) if keepalive_interval > MIN_KEEPALIVE end end diff --git a/lib/message_bus/backends/base.rb b/lib/message_bus/backends/base.rb index be83e5f6..08481e63 100644 --- a/lib/message_bus/backends/base.rb +++ b/lib/message_bus/backends/base.rb @@ -175,6 +175,12 @@ def global_unsubscribe raise ConcreteClassMustImplementError end + # Closes the subscriber's connection so its blocked read returns an error + # and the backend's own rescue/retry re-establishes it cooperatively. + # Backends that cannot get a stuck connection may leave this as a no-op. + def request_reconnect + end + # Subscribe to messages on all channels. Each message since the last ID # specified will be delivered by yielding to the passed block as soon as # it is available. This will block until subscription is terminated. diff --git a/lib/message_bus/backends/postgres.rb b/lib/message_bus/backends/postgres.rb index dc9cc2f2..ef05746d 100644 --- a/lib/message_bus/backends/postgres.rb +++ b/lib/message_bus/backends/postgres.rb @@ -180,6 +180,10 @@ def unsubscribe sync { @listening_on.clear } end + def close_subscribe_connection + @subscribe_connection&.close + end + private def exec_prepared(conn, *a) @@ -378,6 +382,11 @@ def global_unsubscribe @subscribed = false end + # (see Base#request_reconnect) + def request_reconnect + client.close_subscribe_connection + end + # (see Base#global_subscribe) def global_subscribe(last_id = nil) raise ArgumentError unless block_given? diff --git a/lib/message_bus/backends/redis.rb b/lib/message_bus/backends/redis.rb index dabcd742..41f5e655 100644 --- a/lib/message_bus/backends/redis.rb +++ b/lib/message_bus/backends/redis.rb @@ -60,6 +60,7 @@ def initialize(config = {}, max_backlog_size = 1000) @flush_backlog_thread = nil @pub_redis = nil @subscribed = false + @subscriber_connection = nil # after 7 days inactive backlogs will be removed @max_backlog_age = 604_800 end @@ -286,7 +287,7 @@ def global_subscribe(last_id = nil, &blk) end begin - global_redis = new_redis_connection + global_redis = @subscriber_connection = new_redis_connection clear_backlog.call(&blk) if highest_id @@ -327,9 +328,15 @@ def global_subscribe(last_id = nil, &blk) retry ensure global_redis&.disconnect! + @subscriber_connection = nil end end + # (see Base#request_reconnect) + def request_reconnect + @subscriber_connection&.disconnect! + end + private def new_redis_connection From 779d65467a080223ba6d3f0f0c931c507912ceb9 Mon Sep 17 00:00:00 2001 From: Ahmed Loudghiri Date: Mon, 13 Apr 2026 16:15:14 +0100 Subject: [PATCH 3/7] fix: adjust keepalive settings for improved subscriber recovery --- spec/lib/message_bus_spec.rb | 72 +++++++++++++++++------------------- 1 file changed, 34 insertions(+), 38 deletions(-) diff --git a/spec/lib/message_bus_spec.rb b/spec/lib/message_bus_spec.rb index 864f0e53..ab860726 100644 --- a/spec/lib/message_bus_spec.rb +++ b/spec/lib/message_bus_spec.rb @@ -400,10 +400,8 @@ end describe "keepalive recovery" do - # MIN_KEEPALIVE is 20s in production, making real-timer tests take 60s+. - # We temporarily lower it so keepalive_interval=1 is accepted, which - # triggers recovery at ~4s fast enough for CI, slow enough to be real. - FAST_KEEPALIVE = 1 + # Lower MIN_KEEPALIVE to 0 so a sub-second keepalive_interval is accepted. + FAST_KEEPALIVE = 0.5 before do @original_min_keepalive = MessageBus::Implementation::MIN_KEEPALIVE @@ -413,40 +411,35 @@ @log_output = StringIO.new @bus.configure(keepalive_interval: FAST_KEEPALIVE, logger: Logger.new(@log_output)) - # Capture the real backend method before stubbing. - @real_global_subscribe = @bus.backend_instance.method(:global_subscribe) - @call_count = 0 - - # First call to global_subscribe: simulate a half-open TCP connection. - # The thread stays alive but never yields messages, so @last_message - # goes stale and the keepalive eventually fires recovery. - # - # Subsequent calls: delegate to the real backend so the recovered - # subscriber thread actually processes messages end-to-end. - real_gs = @real_global_subscribe - call_count_ref = -> { @call_count += 1 } + # Capture before stubbing so each test can expose it via @real_gs. + # Setting @real_gs = nil in a test skips entering the real backend after + # recovery (used by the "logs a warning" test to avoid a Postgres-specific + # teardown race: if LISTEN hasn't been established yet when destroy calls + # global_unsubscribe, the pg_notify is silently dropped and join hangs). + @real_gs = @bus.backend_instance.method(:global_subscribe) backend = @bus.backend_instance + reconnect_requested = false + test_ctx = self - backend.define_singleton_method(:global_subscribe) do |last_id = nil, &blk| - if call_count_ref.call == 1 - @subscribed = true - loop { sleep 0.05; break unless @subscribed } - else - # Remove both stubs so the real backend handles the full lifecycle - # of the recovered thread especially global_unsubscribe, which - # destroy needs to signal client.subscribe to exit. - backend.singleton_class.remove_method(:global_subscribe) - backend.singleton_class.remove_method(:global_unsubscribe) - real_gs.call(last_id, &blk) - end + backend.define_singleton_method(:request_reconnect) do + reconnect_requested = true end - backend.define_singleton_method(:global_unsubscribe) do - @subscribed = false + # Simulate a half-open connection: block without yielding messages so + # @last_message goes stale and the keepalive watchdog fires request_reconnect. + # The loop also breaks on teardown (!@subscribed) so destroy can join cleanly. + backend.define_singleton_method(:global_subscribe) do |last_id = nil, &blk| + @subscribed = true + loop { sleep 0.05; break if reconnect_requested || !@subscribed } + + backend.singleton_class.remove_method(:global_subscribe) + backend.singleton_class.remove_method(:request_reconnect) + + test_ctx.instance_variable_get(:@real_gs)&.call(last_id, &blk) if reconnect_requested && @subscribed end @bus.after_fork - wait_for(2000) { @bus.listening? } + wait_for(1000) { @bus.listening? } end after do @@ -454,26 +447,29 @@ MessageBus::Implementation.const_set(:MIN_KEEPALIVE, @original_min_keepalive) end - it "recovers message delivery after a stuck subscriber thread" do - @bus.listening?.must_equal true + it "recovers message delivery after a stuck subscriber" do + subscriber_thread_before = @bus.instance_variable_get(:@subscriber_thread) received = [] @bus.subscribe("/recovery-test") { |msg| received << msg.data } - # Wait for the keepalive to detect the stuck thread and recover. - # With keepalive_interval=1, timeout = 1*3 = 3s, first eligible check at ~4s. - wait_for(8000) do + wait_for(4000) do @bus.publish("/recovery-test", "post-recovery") - sleep 0.1 + sleep 0.05 received.include?("post-recovery") end received.must_include("post-recovery") @bus.listening?.must_equal true + @bus.instance_variable_get(:@subscriber_thread).must_equal subscriber_thread_before end it "logs a warning when the keepalive detects a stuck subscriber" do - wait_for(8000) { @log_output.string.include?("no longer functioning correctly") } + # Don't enter the real backend after recovery: the thread exits after the + # stuck loop, so destroy can join it immediately with no LISTEN/UNSUB race. + @real_gs = nil + + wait_for(4000) { @log_output.string.include?("no longer functioning correctly") } @log_output.string.must_include "timed out" @log_output.string.must_include "no longer functioning correctly" From 9593d4e577d03922d56010cd910f27b24b31a365 Mon Sep 17 00:00:00 2001 From: Ahmed Loudghiri Date: Tue, 14 Apr 2026 17:00:40 +0100 Subject: [PATCH 4/7] fix: enhance keepalive recovery tests for subscriber resilience --- spec/lib/message_bus_spec.rb | 169 ++++++++++++++++++++++++++++------- 1 file changed, 135 insertions(+), 34 deletions(-) diff --git a/spec/lib/message_bus_spec.rb b/spec/lib/message_bus_spec.rb index ab860726..3b392236 100644 --- a/spec/lib/message_bus_spec.rb +++ b/spec/lib/message_bus_spec.rb @@ -399,80 +399,181 @@ end end + # === Context for test generation === + # MessageBus::Implementation + # Key state: @last_message (Time), @subscribed (bool), @destroyed (bool) + # Keepalive: timer fires every keepalive_interval; when + # Time.now - @last_message > keepalive_interval * 3 + # → logs warning + calls backend_instance.request_reconnect + # → resets @last_message to avoid cascade during reconnect window + # Reconnect: backend#global_subscribe wraps in rescue/retry; request_reconnect + # closes the underlying socket so the blocked read raises and retry fires. + # Backend contract: request_reconnect is a no-op on Base; Postgres closes the + # PG subscribe connection; Redis disconnects the subscriber connection. + # ===================================== describe "keepalive recovery" do - # Lower MIN_KEEPALIVE to 0 so a sub-second keepalive_interval is accepted. FAST_KEEPALIVE = 0.5 before do @original_min_keepalive = MessageBus::Implementation::MIN_KEEPALIVE - MessageBus::Implementation.send(:remove_const, :MIN_KEEPALIVE) MessageBus::Implementation.const_set(:MIN_KEEPALIVE, 0) @log_output = StringIO.new @bus.configure(keepalive_interval: FAST_KEEPALIVE, logger: Logger.new(@log_output)) + end - # Capture before stubbing so each test can expose it via @real_gs. - # Setting @real_gs = nil in a test skips entering the real backend after - # recovery (used by the "logs a warning" test to avoid a Postgres-specific - # teardown race: if LISTEN hasn't been established yet when destroy calls - # global_unsubscribe, the pg_notify is silently dropped and join hangs). - @real_gs = @bus.backend_instance.method(:global_subscribe) - backend = @bus.backend_instance - reconnect_requested = false - test_ctx = self + after do + MessageBus::Implementation.const_set(:MIN_KEEPALIVE, @original_min_keepalive) + end + + # Simulates a half-open connection by replacing global_subscribe with a + # blocking loop that never yields messages. The loop exits when the bus's + # keepalive watchdog fires request_reconnect or when global_unsubscribe sets + # @subscribed = false (teardown path). + # + # @param backend [MessageBus::Backend::Base] the backend to patch + # @param resume_after_stall [Boolean] whether to delegate to the real + # global_subscribe after the stall clears, restoring message flow + # @return [Array(Proc, Proc)] two zero-argument callables: + # - first returns true once the keepalive watchdog has fired request_reconnect + # - second returns true once the fake stall has exited and @subscribed has + # been cleared, meaning any subsequent subscribed=true is from the real backend + def simulate_stuck_subscriber(backend, resume_after_stall: true) + real_gs = backend.method(:global_subscribe) + reconnect_fired = false + stall_exited = false backend.define_singleton_method(:request_reconnect) do - reconnect_requested = true + reconnect_fired = true end - # Simulate a half-open connection: block without yielding messages so - # @last_message goes stale and the keepalive watchdog fires request_reconnect. - # The loop also breaks on teardown (!@subscribed) so destroy can join cleanly. backend.define_singleton_method(:global_subscribe) do |last_id = nil, &blk| @subscribed = true - loop { sleep 0.05; break if reconnect_requested || !@subscribed } + # 1ms sleep so the subscriber thread reacts within ~1ms of reconnect_fired + loop { sleep 0.001; break if reconnect_fired || !@subscribed } + + should_resume = reconnect_fired && @subscribed && resume_after_stall backend.singleton_class.remove_method(:global_subscribe) backend.singleton_class.remove_method(:request_reconnect) - test_ctx.instance_variable_get(:@real_gs)&.call(last_id, &blk) if reconnect_requested && @subscribed + if should_resume + @subscribed = false + stall_exited = true # set after @subscribed=false, before real_gs + real_gs.call(last_id, &blk) + else + stall_exited = true + end end - @bus.after_fork - wait_for(1000) { @bus.listening? } - end - - after do - MessageBus::Implementation.send(:remove_const, :MIN_KEEPALIVE) - MessageBus::Implementation.const_set(:MIN_KEEPALIVE, @original_min_keepalive) + [-> { reconnect_fired }, -> { stall_exited }] end it "recovers message delivery after a stuck subscriber" do - subscriber_thread_before = @bus.instance_variable_get(:@subscriber_thread) + reconnect_fired, stall_exited = simulate_stuck_subscriber(@bus.backend_instance) + + @bus.after_fork + wait_for(1000) { @bus.listening? } received = [] @bus.subscribe("/recovery-test") { |msg| received << msg.data } - wait_for(4000) do - @bus.publish("/recovery-test", "post-recovery") - sleep 0.05 - received.include?("post-recovery") - end + # Wait for watchdog to fire, then for the fake stall to fully exit + # (stall_exited is set after @subscribed=false), then for the real + # backend to confirm it is subscribed before publishing. + wait_for(4000) { reconnect_fired.call } + wait_for(2000) { stall_exited.call } + wait_for(2000) { @bus.backend_instance.subscribed } + + @bus.publish("/recovery-test", "post-recovery") + + wait_for(2000) { received.include?("post-recovery") } received.must_include("post-recovery") @bus.listening?.must_equal true - @bus.instance_variable_get(:@subscriber_thread).must_equal subscriber_thread_before end it "logs a warning when the keepalive detects a stuck subscriber" do - # Don't enter the real backend after recovery: the thread exits after the - # stuck loop, so destroy can join it immediately with no LISTEN/UNSUB race. - @real_gs = nil + _reconnect_fired, _stall_exited = simulate_stuck_subscriber(@bus.backend_instance, resume_after_stall: false) + + @bus.after_fork + wait_for(1000) { @bus.listening? } wait_for(4000) { @log_output.string.include?("no longer functioning correctly") } @log_output.string.must_include "timed out" @log_output.string.must_include "no longer functioning correctly" end + + it "exits cleanly when destroy is called during a stall" do + _reconnect_fired, _stall_exited = simulate_stuck_subscriber(@bus.backend_instance, resume_after_stall: false) + + @bus.after_fork + wait_for(1000) { @bus.listening? } + + @bus.destroy + @bus.listening?.must_equal false + end + + it "resets @last_message so the watchdog does not cascade during reconnect" do + reconnect_count = 0 + backend = @bus.backend_instance + real_gs = backend.method(:global_subscribe) + + backend.define_singleton_method(:request_reconnect) do + reconnect_count += 1 + end + + backend.define_singleton_method(:global_subscribe) do |last_id = nil, &blk| + @subscribed = true + loop { sleep 0.05; break if reconnect_count > 0 || !@subscribed } + backend.singleton_class.remove_method(:global_subscribe) + backend.singleton_class.remove_method(:request_reconnect) + @subscribed = false + real_gs.call(last_id, &blk) if reconnect_count > 0 + end + + @bus.after_fork + wait_for(1000) { @bus.listening? } + + # Wait for the first reconnect to fire, then let two more keepalive cycles + # pass. The @last_message reset in the bus must prevent a second request. + wait_for(4000) { reconnect_count > 0 } + sleep(FAST_KEEPALIVE * 3) + + reconnect_count.must_equal 1 + end + + it "does not crash the timer thread when request_reconnect raises" do + backend = @bus.backend_instance + real_gs = backend.method(:global_subscribe) + error_raised = false + + backend.define_singleton_method(:request_reconnect) do + error_raised = true + raise IOError, "simulated connection error" + end + + # The IOError is raised in the timer thread and caught by its on_error + # handler. The subscriber loop still exits via error_raised flag because + # the assignment happens before the raise. + backend.define_singleton_method(:global_subscribe) do |last_id = nil, &blk| + @subscribed = true + loop { sleep 0.05; break if error_raised || !@subscribed } + backend.singleton_class.remove_method(:global_subscribe) + backend.singleton_class.remove_method(:request_reconnect) + @subscribed = false + real_gs.call(last_id, &blk) + end + + @bus.after_fork + wait_for(1000) { @bus.listening? } + + # Timer on_error logs "Failed to process job: " + wait_for(4000) { @log_output.string.include?("Failed to process job") } + + @bus.listening?.must_equal true + @log_output.string.must_include "simulated connection error" + end end end From cdf96a67c711fc28a0096dbbb194a0c48654c836 Mon Sep 17 00:00:00 2001 From: Ahmed Loudghiri Date: Tue, 14 Apr 2026 18:23:16 +0100 Subject: [PATCH 5/7] fix: improve subscriber resilience in keepalive recovery tests --- spec/lib/message_bus_spec.rb | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/spec/lib/message_bus_spec.rb b/spec/lib/message_bus_spec.rb index 3b392236..493971d6 100644 --- a/spec/lib/message_bus_spec.rb +++ b/spec/lib/message_bus_spec.rb @@ -517,6 +517,7 @@ def simulate_stuck_subscriber(backend, resume_after_stall: true) it "resets @last_message so the watchdog does not cascade during reconnect" do reconnect_count = 0 + stall_exited = false backend = @bus.backend_instance real_gs = backend.method(:global_subscribe) @@ -526,10 +527,11 @@ def simulate_stuck_subscriber(backend, resume_after_stall: true) backend.define_singleton_method(:global_subscribe) do |last_id = nil, &blk| @subscribed = true - loop { sleep 0.05; break if reconnect_count > 0 || !@subscribed } + loop { sleep 0.001; break if reconnect_count > 0 || !@subscribed } backend.singleton_class.remove_method(:global_subscribe) backend.singleton_class.remove_method(:request_reconnect) @subscribed = false + stall_exited = true real_gs.call(last_id, &blk) if reconnect_count > 0 end @@ -542,12 +544,17 @@ def simulate_stuck_subscriber(backend, resume_after_stall: true) sleep(FAST_KEEPALIVE * 3) reconnect_count.must_equal 1 + + # Ensure real subscriber's Queue is in @listeners before teardown. + wait_for(2000) { stall_exited } + wait_for(2000) { @bus.backend_instance.subscribed } end it "does not crash the timer thread when request_reconnect raises" do backend = @bus.backend_instance real_gs = backend.method(:global_subscribe) error_raised = false + stall_exited = false backend.define_singleton_method(:request_reconnect) do error_raised = true @@ -555,14 +562,14 @@ def simulate_stuck_subscriber(backend, resume_after_stall: true) end # The IOError is raised in the timer thread and caught by its on_error - # handler. The subscriber loop still exits via error_raised flag because - # the assignment happens before the raise. + # handler. The subscriber loop exits via error_raised (set before raise). backend.define_singleton_method(:global_subscribe) do |last_id = nil, &blk| @subscribed = true - loop { sleep 0.05; break if error_raised || !@subscribed } + loop { sleep 0.001; break if error_raised || !@subscribed } backend.singleton_class.remove_method(:global_subscribe) backend.singleton_class.remove_method(:request_reconnect) @subscribed = false + stall_exited = true real_gs.call(last_id, &blk) end @@ -572,6 +579,12 @@ def simulate_stuck_subscriber(backend, resume_after_stall: true) # Timer on_error logs "Failed to process job: " wait_for(4000) { @log_output.string.include?("Failed to process job") } + # Wait for the real subscriber's Queue to be in @listeners before the + # test ends. Without this, @bus.destroy (in after) races with Queue + # setup and global_unsubscribe's push(nil) becomes a no-op, hanging join. + wait_for(2000) { stall_exited } + wait_for(2000) { @bus.backend_instance.subscribed } + @bus.listening?.must_equal true @log_output.string.must_include "simulated connection error" end From 53be4f63da752840e638b4c6890072e7a698f6d6 Mon Sep 17 00:00:00 2001 From: Ahmed Loudghiri Date: Tue, 14 Apr 2026 18:55:19 +0100 Subject: [PATCH 6/7] fix: refine keepalive reconnect tests for improved subscriber handling --- spec/lib/message_bus_spec.rb | 200 ++++++++++++----------------------- 1 file changed, 66 insertions(+), 134 deletions(-) diff --git a/spec/lib/message_bus_spec.rb b/spec/lib/message_bus_spec.rb index 493971d6..8ceac36a 100644 --- a/spec/lib/message_bus_spec.rb +++ b/spec/lib/message_bus_spec.rb @@ -399,23 +399,12 @@ end end - # === Context for test generation === - # MessageBus::Implementation - # Key state: @last_message (Time), @subscribed (bool), @destroyed (bool) - # Keepalive: timer fires every keepalive_interval; when - # Time.now - @last_message > keepalive_interval * 3 - # → logs warning + calls backend_instance.request_reconnect - # → resets @last_message to avoid cascade during reconnect window - # Reconnect: backend#global_subscribe wraps in rescue/retry; request_reconnect - # closes the underlying socket so the blocked read raises and retry fires. - # Backend contract: request_reconnect is a no-op on Base; Postgres closes the - # PG subscribe connection; Redis disconnects the subscriber connection. - # ===================================== - describe "keepalive recovery" do + describe "keepalive reconnect watchdog" do FAST_KEEPALIVE = 0.5 before do @original_min_keepalive = MessageBus::Implementation::MIN_KEEPALIVE + MessageBus::Implementation.send(:remove_const, :MIN_KEEPALIVE) MessageBus::Implementation.const_set(:MIN_KEEPALIVE, 0) @log_output = StringIO.new @@ -423,170 +412,113 @@ end after do + MessageBus::Implementation.send(:remove_const, :MIN_KEEPALIVE) MessageBus::Implementation.const_set(:MIN_KEEPALIVE, @original_min_keepalive) end - # Simulates a half-open connection by replacing global_subscribe with a - # blocking loop that never yields messages. The loop exits when the bus's - # keepalive watchdog fires request_reconnect or when global_unsubscribe sets - # @subscribed = false (teardown path). - # - # @param backend [MessageBus::Backend::Base] the backend to patch - # @param resume_after_stall [Boolean] whether to delegate to the real - # global_subscribe after the stall clears, restoring message flow - # @return [Array(Proc, Proc)] two zero-argument callables: - # - first returns true once the keepalive watchdog has fired request_reconnect - # - second returns true once the fake stall has exited and @subscribed has - # been cleared, meaning any subsequent subscribed=true is from the real backend - def simulate_stuck_subscriber(backend, resume_after_stall: true) - real_gs = backend.method(:global_subscribe) - reconnect_fired = false - stall_exited = false - - backend.define_singleton_method(:request_reconnect) do - reconnect_fired = true - end - - backend.define_singleton_method(:global_subscribe) do |last_id = nil, &blk| + # Replaces global_subscribe with a blocked read simulation that never yields. + # The loop exits when global_unsubscribe flips @subscribed to false. + def stall_global_subscribe(backend) + backend.define_singleton_method(:global_subscribe) do |_last_id = nil, &_blk| @subscribed = true - # 1ms sleep so the subscriber thread reacts within ~1ms of reconnect_fired - loop { sleep 0.001; break if reconnect_fired || !@subscribed } - - should_resume = reconnect_fired && @subscribed && resume_after_stall - - backend.singleton_class.remove_method(:global_subscribe) - backend.singleton_class.remove_method(:request_reconnect) - - if should_resume - @subscribed = false - stall_exited = true # set after @subscribed=false, before real_gs - real_gs.call(last_id, &blk) - else - stall_exited = true - end + sleep(0.001) while @subscribed end - - [-> { reconnect_fired }, -> { stall_exited }] end - it "recovers message delivery after a stuck subscriber" do - reconnect_fired, stall_exited = simulate_stuck_subscriber(@bus.backend_instance) - - @bus.after_fork - wait_for(1000) { @bus.listening? } - - received = [] - @bus.subscribe("/recovery-test") { |msg| received << msg.data } - - # Wait for watchdog to fire, then for the fake stall to fully exit - # (stall_exited is set after @subscribed=false), then for the real - # backend to confirm it is subscribed before publishing. - wait_for(4000) { reconnect_fired.call } - wait_for(2000) { stall_exited.call } - wait_for(2000) { @bus.backend_instance.subscribed } - - @bus.publish("/recovery-test", "post-recovery") - - wait_for(2000) { received.include?("post-recovery") } - - received.must_include("post-recovery") - @bus.listening?.must_equal true - end + it "invokes request_reconnect and logs a warning when subscriber reads stall" do + backend = @bus.backend_instance + reconnect_calls = 0 + stall_global_subscribe(backend) - it "logs a warning when the keepalive detects a stuck subscriber" do - _reconnect_fired, _stall_exited = simulate_stuck_subscriber(@bus.backend_instance, resume_after_stall: false) + backend.define_singleton_method(:request_reconnect) do + reconnect_calls += 1 + # Simulate reconnect teardown finishing, allowing this fake subscribe loop to exit. + @subscribed = false + end @bus.after_fork wait_for(1000) { @bus.listening? } + wait_for(4000) { reconnect_calls == 1 } wait_for(4000) { @log_output.string.include?("no longer functioning correctly") } + reconnect_calls.must_equal 1 @log_output.string.must_include "timed out" @log_output.string.must_include "no longer functioning correctly" end - it "exits cleanly when destroy is called during a stall" do - _reconnect_fired, _stall_exited = simulate_stuck_subscriber(@bus.backend_instance, resume_after_stall: false) + it "resets @last_message so reconnect attempts are not immediate cascades" do + backend = @bus.backend_instance + reconnect_times = [] + stall_global_subscribe(backend) + + backend.define_singleton_method(:request_reconnect) do + reconnect_times << Process.clock_gettime(Process::CLOCK_MONOTONIC) + end @bus.after_fork wait_for(1000) { @bus.listening? } + wait_for(7000) { reconnect_times.length >= 2 } - @bus.destroy - @bus.listening?.must_equal false + first_gap = reconnect_times[1] - reconnect_times[0] + first_gap.must_be :>=, FAST_KEEPALIVE * 2.8 end - it "resets @last_message so the watchdog does not cascade during reconnect" do - reconnect_count = 0 - stall_exited = false + it "keeps the timer thread running when request_reconnect raises" do backend = @bus.backend_instance - real_gs = backend.method(:global_subscribe) + reconnect_failed = false + stall_global_subscribe(backend) backend.define_singleton_method(:request_reconnect) do - reconnect_count += 1 - end - - backend.define_singleton_method(:global_subscribe) do |last_id = nil, &blk| - @subscribed = true - loop { sleep 0.001; break if reconnect_count > 0 || !@subscribed } - backend.singleton_class.remove_method(:global_subscribe) - backend.singleton_class.remove_method(:request_reconnect) - @subscribed = false - stall_exited = true - real_gs.call(last_id, &blk) if reconnect_count > 0 + reconnect_failed = true + raise IOError, "simulated connection error" end @bus.after_fork wait_for(1000) { @bus.listening? } - # Wait for the first reconnect to fire, then let two more keepalive cycles - # pass. The @last_message reset in the bus must prevent a second request. - wait_for(4000) { reconnect_count > 0 } - sleep(FAST_KEEPALIVE * 3) + wait_for(4000) { reconnect_failed } + wait_for(4000) { @log_output.string.include?("Failed to process job") } - reconnect_count.must_equal 1 + timer_job_ran = false + @bus.timer.queue(0.01) { timer_job_ran = true } + wait_for(1000) { timer_job_ran } - # Ensure real subscriber's Queue is in @listeners before teardown. - wait_for(2000) { stall_exited } - wait_for(2000) { @bus.backend_instance.subscribed } + @bus.listening?.must_equal true + @log_output.string.must_include "simulated connection error" + end + end + + describe "request_reconnect backend wiring" do + it "is a no-op on the abstract base backend" do + backend = MessageBus::Backends::Base.new + backend.request_reconnect.must_be_nil end - it "does not crash the timer thread when request_reconnect raises" do + it "disconnects the redis subscriber connection" do + skip "Redis backend only" unless CURRENT_BACKEND == :redis + backend = @bus.backend_instance - real_gs = backend.method(:global_subscribe) - error_raised = false - stall_exited = false + disconnect_calls = 0 + fake_connection = Object.new + fake_connection.define_singleton_method(:disconnect!) { disconnect_calls += 1 } + backend.instance_variable_set(:@subscriber_connection, fake_connection) - backend.define_singleton_method(:request_reconnect) do - error_raised = true - raise IOError, "simulated connection error" - end + backend.request_reconnect - # The IOError is raised in the timer thread and caught by its on_error - # handler. The subscriber loop exits via error_raised (set before raise). - backend.define_singleton_method(:global_subscribe) do |last_id = nil, &blk| - @subscribed = true - loop { sleep 0.001; break if error_raised || !@subscribed } - backend.singleton_class.remove_method(:global_subscribe) - backend.singleton_class.remove_method(:request_reconnect) - @subscribed = false - stall_exited = true - real_gs.call(last_id, &blk) - end + disconnect_calls.must_equal 1 + end - @bus.after_fork - wait_for(1000) { @bus.listening? } + it "closes the postgres subscribe connection" do + skip "Postgres backend only" unless CURRENT_BACKEND == :postgres - # Timer on_error logs "Failed to process job: " - wait_for(4000) { @log_output.string.include?("Failed to process job") } + backend = @bus.backend_instance + close_calls = 0 + backend.send(:client).define_singleton_method(:close_subscribe_connection) { close_calls += 1 } - # Wait for the real subscriber's Queue to be in @listeners before the - # test ends. Without this, @bus.destroy (in after) races with Queue - # setup and global_unsubscribe's push(nil) becomes a no-op, hanging join. - wait_for(2000) { stall_exited } - wait_for(2000) { @bus.backend_instance.subscribed } + backend.request_reconnect - @bus.listening?.must_equal true - @log_output.string.must_include "simulated connection error" + close_calls.must_equal 1 end end end From 46399f9d3456cb01c249afe103bf7e6677b09655 Mon Sep 17 00:00:00 2001 From: Ahmed Loudghiri Date: Tue, 14 Apr 2026 19:39:23 +0100 Subject: [PATCH 7/7] fix: add tests for redis global_subscribe retry behavior on connection failures --- spec/lib/message_bus/backend_spec.rb | 92 ++++++++++++++++++++++++++++ 1 file changed, 92 insertions(+) diff --git a/spec/lib/message_bus/backend_spec.rb b/spec/lib/message_bus/backend_spec.rb index 794d4fee..21b580dd 100644 --- a/spec/lib/message_bus/backend_spec.rb +++ b/spec/lib/message_bus/backend_spec.rb @@ -392,4 +392,96 @@ got.map { |m| m.data }.must_equal ["12"] end + it "retries global_subscribe after a redis subscribe failure" do + skip "Redis backend only" unless CURRENT_BACKEND == :redis + + log_output = StringIO.new + @bus.instance_variable_set(:@logger, Logger.new(log_output)) + + fail_first_subscribe = true + real_new_redis_connection = @bus.method(:new_redis_connection) + + @bus.define_singleton_method(:new_redis_connection) do + redis = real_new_redis_connection.call + + if fail_first_subscribe + fail_first_subscribe = false + redis.define_singleton_method(:subscribe) do |_channel, &_blk| + raise IOError, "forced subscribe failure" + end + end + + redis + end + + got = [] + + t = Thread.new do + @bus.global_subscribe(0) do |msg| + got << msg + end + end + + wait_for(4000) { log_output.string.include?("forced subscribe failure") } + wait_for(5000) { @bus.subscribed } + + @bus.publish("/redis-retry", "delivered-after-retry") + wait_for(3000) { got.any? { |m| m.data == "delivered-after-retry" } } + + @bus.global_unsubscribe + t.join(2) + t.kill if t.alive? + + log_output.string.must_include "subscribe failed, reconnecting in 1 second" + got.map(&:data).must_include "delivered-after-retry" + end + + it "retries global_subscribe after request_reconnect disconnects redis" do + skip "Redis backend only" unless CURRENT_BACKEND == :redis + + log_output = StringIO.new + @bus.instance_variable_set(:@logger, Logger.new(log_output)) + subscribe_attempts = 0 + real_new_redis_connection = @bus.method(:new_redis_connection) + + @bus.define_singleton_method(:new_redis_connection) do + redis = real_new_redis_connection.call + real_subscribe = redis.method(:subscribe) + redis.define_singleton_method(:subscribe) do |*args, &blk| + subscribe_attempts += 1 + real_subscribe.call(*args, &blk) + end + redis + end + + got = [] + + t = Thread.new do + @bus.global_subscribe(0) do |msg| + got << msg + end + end + + wait_for(5000) { @bus.subscribed } + wait_for(5000) { subscribe_attempts >= 1 } + + @bus.publish("/redis-reconnect", "before-reconnect") + wait_for(3000) { got.any? { |m| m.data == "before-reconnect" } } + + @bus.request_reconnect + + wait_for(7000) { log_output.string.include?("subscribe failed, reconnecting in 1 second") } + wait_for(7000) { subscribe_attempts >= 2 } + + @bus.publish("/redis-reconnect", "after-reconnect") + wait_for(5000) { got.any? { |m| m.data == "after-reconnect" } } + + @bus.global_unsubscribe + t.join(2) + t.kill if t.alive? + + got.map(&:data).must_include "after-reconnect" + subscribe_attempts.must_be :>=, 2 + end + end