diff --git a/lib/message_bus.rb b/lib/message_bus.rb index cb927a5c..866fc973 100644 --- a/lib/message_bus.rb +++ b/lib/message_bus.rb @@ -752,6 +752,11 @@ def new_subscriber_thread publish("/__mb_keepalive__/", Process.pid, user_ids: [-1]) if (Time.now - (@last_message || Time.now)) > keepalive_interval * 3 logger.warn "Global messages on #{Process.pid} timed out, message bus is no longer functioning correctly" + # Close the subscriber connection cooperatively so the blocked read + # errors and the backend's rescue/retry reconnects on a fresh socket. + # Reset @last_message to avoid cascade-triggering during the 1s reconnect window. + @last_message = Time.now + backend_instance.request_reconnect end timer.queue(keepalive_interval, &blk) if keepalive_interval > MIN_KEEPALIVE diff --git a/lib/message_bus/backends/base.rb b/lib/message_bus/backends/base.rb index be83e5f6..08481e63 100644 --- a/lib/message_bus/backends/base.rb +++ b/lib/message_bus/backends/base.rb @@ -175,6 +175,12 @@ def global_unsubscribe raise ConcreteClassMustImplementError end + # Closes the subscriber's connection so its blocked read returns an error + # and the backend's own rescue/retry re-establishes it cooperatively. + # Backends that cannot get a stuck connection may leave this as a no-op. + def request_reconnect + end + # Subscribe to messages on all channels. Each message since the last ID # specified will be delivered by yielding to the passed block as soon as # it is available. This will block until subscription is terminated. diff --git a/lib/message_bus/backends/postgres.rb b/lib/message_bus/backends/postgres.rb index dc9cc2f2..ef05746d 100644 --- a/lib/message_bus/backends/postgres.rb +++ b/lib/message_bus/backends/postgres.rb @@ -180,6 +180,10 @@ def unsubscribe sync { @listening_on.clear } end + def close_subscribe_connection + @subscribe_connection&.close + end + private def exec_prepared(conn, *a) @@ -378,6 +382,11 @@ def global_unsubscribe @subscribed = false end + # (see Base#request_reconnect) + def request_reconnect + client.close_subscribe_connection + end + # (see Base#global_subscribe) def global_subscribe(last_id = nil) raise ArgumentError unless block_given? diff --git a/lib/message_bus/backends/redis.rb b/lib/message_bus/backends/redis.rb index dabcd742..41f5e655 100644 --- a/lib/message_bus/backends/redis.rb +++ b/lib/message_bus/backends/redis.rb @@ -60,6 +60,7 @@ def initialize(config = {}, max_backlog_size = 1000) @flush_backlog_thread = nil @pub_redis = nil @subscribed = false + @subscriber_connection = nil # after 7 days inactive backlogs will be removed @max_backlog_age = 604_800 end @@ -286,7 +287,7 @@ def global_subscribe(last_id = nil, &blk) end begin - global_redis = new_redis_connection + global_redis = @subscriber_connection = new_redis_connection clear_backlog.call(&blk) if highest_id @@ -327,9 +328,15 @@ def global_subscribe(last_id = nil, &blk) retry ensure global_redis&.disconnect! + @subscriber_connection = nil end end + # (see Base#request_reconnect) + def request_reconnect + @subscriber_connection&.disconnect! + end + private def new_redis_connection diff --git a/spec/lib/message_bus/backend_spec.rb b/spec/lib/message_bus/backend_spec.rb index 794d4fee..21b580dd 100644 --- a/spec/lib/message_bus/backend_spec.rb +++ b/spec/lib/message_bus/backend_spec.rb @@ -392,4 +392,96 @@ got.map { |m| m.data }.must_equal ["12"] end + it "retries global_subscribe after a redis subscribe failure" do + skip "Redis backend only" unless CURRENT_BACKEND == :redis + + log_output = StringIO.new + @bus.instance_variable_set(:@logger, Logger.new(log_output)) + + fail_first_subscribe = true + real_new_redis_connection = @bus.method(:new_redis_connection) + + @bus.define_singleton_method(:new_redis_connection) do + redis = real_new_redis_connection.call + + if fail_first_subscribe + fail_first_subscribe = false + redis.define_singleton_method(:subscribe) do |_channel, &_blk| + raise IOError, "forced subscribe failure" + end + end + + redis + end + + got = [] + + t = Thread.new do + @bus.global_subscribe(0) do |msg| + got << msg + end + end + + wait_for(4000) { log_output.string.include?("forced subscribe failure") } + wait_for(5000) { @bus.subscribed } + + @bus.publish("/redis-retry", "delivered-after-retry") + wait_for(3000) { got.any? { |m| m.data == "delivered-after-retry" } } + + @bus.global_unsubscribe + t.join(2) + t.kill if t.alive? + + log_output.string.must_include "subscribe failed, reconnecting in 1 second" + got.map(&:data).must_include "delivered-after-retry" + end + + it "retries global_subscribe after request_reconnect disconnects redis" do + skip "Redis backend only" unless CURRENT_BACKEND == :redis + + log_output = StringIO.new + @bus.instance_variable_set(:@logger, Logger.new(log_output)) + subscribe_attempts = 0 + real_new_redis_connection = @bus.method(:new_redis_connection) + + @bus.define_singleton_method(:new_redis_connection) do + redis = real_new_redis_connection.call + real_subscribe = redis.method(:subscribe) + redis.define_singleton_method(:subscribe) do |*args, &blk| + subscribe_attempts += 1 + real_subscribe.call(*args, &blk) + end + redis + end + + got = [] + + t = Thread.new do + @bus.global_subscribe(0) do |msg| + got << msg + end + end + + wait_for(5000) { @bus.subscribed } + wait_for(5000) { subscribe_attempts >= 1 } + + @bus.publish("/redis-reconnect", "before-reconnect") + wait_for(3000) { got.any? { |m| m.data == "before-reconnect" } } + + @bus.request_reconnect + + wait_for(7000) { log_output.string.include?("subscribe failed, reconnecting in 1 second") } + wait_for(7000) { subscribe_attempts >= 2 } + + @bus.publish("/redis-reconnect", "after-reconnect") + wait_for(5000) { got.any? { |m| m.data == "after-reconnect" } } + + @bus.global_unsubscribe + t.join(2) + t.kill if t.alive? + + got.map(&:data).must_include "after-reconnect" + subscribe_attempts.must_be :>=, 2 + end + end diff --git a/spec/lib/message_bus_spec.rb b/spec/lib/message_bus_spec.rb index 5c7a6c0a..8ceac36a 100644 --- a/spec/lib/message_bus_spec.rb +++ b/spec/lib/message_bus_spec.rb @@ -398,4 +398,127 @@ channel.must_equal('/test') end end + + describe "keepalive reconnect watchdog" do + FAST_KEEPALIVE = 0.5 + + before do + @original_min_keepalive = MessageBus::Implementation::MIN_KEEPALIVE + MessageBus::Implementation.send(:remove_const, :MIN_KEEPALIVE) + MessageBus::Implementation.const_set(:MIN_KEEPALIVE, 0) + + @log_output = StringIO.new + @bus.configure(keepalive_interval: FAST_KEEPALIVE, logger: Logger.new(@log_output)) + end + + after do + MessageBus::Implementation.send(:remove_const, :MIN_KEEPALIVE) + MessageBus::Implementation.const_set(:MIN_KEEPALIVE, @original_min_keepalive) + end + + # Replaces global_subscribe with a blocked read simulation that never yields. + # The loop exits when global_unsubscribe flips @subscribed to false. + def stall_global_subscribe(backend) + backend.define_singleton_method(:global_subscribe) do |_last_id = nil, &_blk| + @subscribed = true + sleep(0.001) while @subscribed + end + end + + it "invokes request_reconnect and logs a warning when subscriber reads stall" do + backend = @bus.backend_instance + reconnect_calls = 0 + stall_global_subscribe(backend) + + backend.define_singleton_method(:request_reconnect) do + reconnect_calls += 1 + # Simulate reconnect teardown finishing, allowing this fake subscribe loop to exit. + @subscribed = false + end + + @bus.after_fork + wait_for(1000) { @bus.listening? } + + wait_for(4000) { reconnect_calls == 1 } + wait_for(4000) { @log_output.string.include?("no longer functioning correctly") } + + reconnect_calls.must_equal 1 + @log_output.string.must_include "timed out" + @log_output.string.must_include "no longer functioning correctly" + end + + it "resets @last_message so reconnect attempts are not immediate cascades" do + backend = @bus.backend_instance + reconnect_times = [] + stall_global_subscribe(backend) + + backend.define_singleton_method(:request_reconnect) do + reconnect_times << Process.clock_gettime(Process::CLOCK_MONOTONIC) + end + + @bus.after_fork + wait_for(1000) { @bus.listening? } + wait_for(7000) { reconnect_times.length >= 2 } + + first_gap = reconnect_times[1] - reconnect_times[0] + first_gap.must_be :>=, FAST_KEEPALIVE * 2.8 + end + + it "keeps the timer thread running when request_reconnect raises" do + backend = @bus.backend_instance + reconnect_failed = false + stall_global_subscribe(backend) + + backend.define_singleton_method(:request_reconnect) do + reconnect_failed = true + raise IOError, "simulated connection error" + end + + @bus.after_fork + wait_for(1000) { @bus.listening? } + + wait_for(4000) { reconnect_failed } + wait_for(4000) { @log_output.string.include?("Failed to process job") } + + timer_job_ran = false + @bus.timer.queue(0.01) { timer_job_ran = true } + wait_for(1000) { timer_job_ran } + + @bus.listening?.must_equal true + @log_output.string.must_include "simulated connection error" + end + end + + describe "request_reconnect backend wiring" do + it "is a no-op on the abstract base backend" do + backend = MessageBus::Backends::Base.new + backend.request_reconnect.must_be_nil + end + + it "disconnects the redis subscriber connection" do + skip "Redis backend only" unless CURRENT_BACKEND == :redis + + backend = @bus.backend_instance + disconnect_calls = 0 + fake_connection = Object.new + fake_connection.define_singleton_method(:disconnect!) { disconnect_calls += 1 } + backend.instance_variable_set(:@subscriber_connection, fake_connection) + + backend.request_reconnect + + disconnect_calls.must_equal 1 + end + + it "closes the postgres subscribe connection" do + skip "Postgres backend only" unless CURRENT_BACKEND == :postgres + + backend = @bus.backend_instance + close_calls = 0 + backend.send(:client).define_singleton_method(:close_subscribe_connection) { close_calls += 1 } + + backend.request_reconnect + + close_calls.must_equal 1 + end + end end