Skip to content
5 changes: 5 additions & 0 deletions lib/message_bus.rb
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,11 @@ def new_subscriber_thread
publish("/__mb_keepalive__/", Process.pid, user_ids: [-1])
if (Time.now - (@last_message || Time.now)) > keepalive_interval * 3
logger.warn "Global messages on #{Process.pid} timed out, message bus is no longer functioning correctly"
# Close the subscriber connection cooperatively so the blocked read
# errors and the backend's rescue/retry reconnects on a fresh socket.
# Reset @last_message to avoid cascade-triggering during the 1s reconnect window.
@last_message = Time.now
backend_instance.request_reconnect
end

timer.queue(keepalive_interval, &blk) if keepalive_interval > MIN_KEEPALIVE
Expand Down
6 changes: 6 additions & 0 deletions lib/message_bus/backends/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,12 @@ def global_unsubscribe
raise ConcreteClassMustImplementError
end

# Closes the subscriber's connection so its blocked read returns an error
# and the backend's own rescue/retry re-establishes it cooperatively.
# Backends that cannot get a stuck connection may leave this as a no-op.
def request_reconnect
end

# Subscribe to messages on all channels. Each message since the last ID
# specified will be delivered by yielding to the passed block as soon as
# it is available. This will block until subscription is terminated.
Expand Down
9 changes: 9 additions & 0 deletions lib/message_bus/backends/postgres.rb
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ def unsubscribe
sync { @listening_on.clear }
end

def close_subscribe_connection
@subscribe_connection&.close
end

private

def exec_prepared(conn, *a)
Expand Down Expand Up @@ -378,6 +382,11 @@ def global_unsubscribe
@subscribed = false
end

# (see Base#request_reconnect)
def request_reconnect
client.close_subscribe_connection
end

# (see Base#global_subscribe)
def global_subscribe(last_id = nil)
raise ArgumentError unless block_given?
Expand Down
9 changes: 8 additions & 1 deletion lib/message_bus/backends/redis.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ def initialize(config = {}, max_backlog_size = 1000)
@flush_backlog_thread = nil
@pub_redis = nil
@subscribed = false
@subscriber_connection = nil
# after 7 days inactive backlogs will be removed
@max_backlog_age = 604_800
end
Expand Down Expand Up @@ -286,7 +287,7 @@ def global_subscribe(last_id = nil, &blk)
end

begin
global_redis = new_redis_connection
global_redis = @subscriber_connection = new_redis_connection

clear_backlog.call(&blk) if highest_id

Expand Down Expand Up @@ -327,9 +328,15 @@ def global_subscribe(last_id = nil, &blk)
retry
ensure
global_redis&.disconnect!
@subscriber_connection = nil
end
end

# (see Base#request_reconnect)
def request_reconnect
@subscriber_connection&.disconnect!
end

private

def new_redis_connection
Expand Down
92 changes: 92 additions & 0 deletions spec/lib/message_bus/backend_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -392,4 +392,96 @@
got.map { |m| m.data }.must_equal ["12"]
end

it "retries global_subscribe after a redis subscribe failure" do
skip "Redis backend only" unless CURRENT_BACKEND == :redis

log_output = StringIO.new
@bus.instance_variable_set(:@logger, Logger.new(log_output))

fail_first_subscribe = true
real_new_redis_connection = @bus.method(:new_redis_connection)

@bus.define_singleton_method(:new_redis_connection) do
redis = real_new_redis_connection.call

if fail_first_subscribe
fail_first_subscribe = false
redis.define_singleton_method(:subscribe) do |_channel, &_blk|
raise IOError, "forced subscribe failure"
end
end

redis
end

got = []

t = Thread.new do
@bus.global_subscribe(0) do |msg|
got << msg
end
end

wait_for(4000) { log_output.string.include?("forced subscribe failure") }
wait_for(5000) { @bus.subscribed }

@bus.publish("/redis-retry", "delivered-after-retry")
wait_for(3000) { got.any? { |m| m.data == "delivered-after-retry" } }

@bus.global_unsubscribe
t.join(2)
t.kill if t.alive?

log_output.string.must_include "subscribe failed, reconnecting in 1 second"
got.map(&:data).must_include "delivered-after-retry"
end

it "retries global_subscribe after request_reconnect disconnects redis" do
skip "Redis backend only" unless CURRENT_BACKEND == :redis

log_output = StringIO.new
@bus.instance_variable_set(:@logger, Logger.new(log_output))
subscribe_attempts = 0
real_new_redis_connection = @bus.method(:new_redis_connection)

@bus.define_singleton_method(:new_redis_connection) do
redis = real_new_redis_connection.call
real_subscribe = redis.method(:subscribe)
redis.define_singleton_method(:subscribe) do |*args, &blk|
subscribe_attempts += 1
real_subscribe.call(*args, &blk)
end
redis
end

got = []

t = Thread.new do
@bus.global_subscribe(0) do |msg|
got << msg
end
end

wait_for(5000) { @bus.subscribed }
wait_for(5000) { subscribe_attempts >= 1 }

@bus.publish("/redis-reconnect", "before-reconnect")
wait_for(3000) { got.any? { |m| m.data == "before-reconnect" } }

@bus.request_reconnect

wait_for(7000) { log_output.string.include?("subscribe failed, reconnecting in 1 second") }
wait_for(7000) { subscribe_attempts >= 2 }

@bus.publish("/redis-reconnect", "after-reconnect")
wait_for(5000) { got.any? { |m| m.data == "after-reconnect" } }

@bus.global_unsubscribe
t.join(2)
t.kill if t.alive?

got.map(&:data).must_include "after-reconnect"
subscribe_attempts.must_be :>=, 2
end

end
123 changes: 123 additions & 0 deletions spec/lib/message_bus_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -398,4 +398,127 @@
channel.must_equal('/test')
end
end

describe "keepalive reconnect watchdog" do
FAST_KEEPALIVE = 0.5

before do
@original_min_keepalive = MessageBus::Implementation::MIN_KEEPALIVE
MessageBus::Implementation.send(:remove_const, :MIN_KEEPALIVE)
MessageBus::Implementation.const_set(:MIN_KEEPALIVE, 0)

@log_output = StringIO.new
@bus.configure(keepalive_interval: FAST_KEEPALIVE, logger: Logger.new(@log_output))
end

after do
MessageBus::Implementation.send(:remove_const, :MIN_KEEPALIVE)
MessageBus::Implementation.const_set(:MIN_KEEPALIVE, @original_min_keepalive)
end

# Replaces global_subscribe with a blocked read simulation that never yields.
# The loop exits when global_unsubscribe flips @subscribed to false.
def stall_global_subscribe(backend)
backend.define_singleton_method(:global_subscribe) do |_last_id = nil, &_blk|
@subscribed = true
sleep(0.001) while @subscribed
end
end

it "invokes request_reconnect and logs a warning when subscriber reads stall" do
backend = @bus.backend_instance
reconnect_calls = 0
stall_global_subscribe(backend)

backend.define_singleton_method(:request_reconnect) do
reconnect_calls += 1
# Simulate reconnect teardown finishing, allowing this fake subscribe loop to exit.
@subscribed = false
end

@bus.after_fork
wait_for(1000) { @bus.listening? }

wait_for(4000) { reconnect_calls == 1 }
wait_for(4000) { @log_output.string.include?("no longer functioning correctly") }

reconnect_calls.must_equal 1
@log_output.string.must_include "timed out"
@log_output.string.must_include "no longer functioning correctly"
end

it "resets @last_message so reconnect attempts are not immediate cascades" do
backend = @bus.backend_instance
reconnect_times = []
stall_global_subscribe(backend)

backend.define_singleton_method(:request_reconnect) do
reconnect_times << Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

@bus.after_fork
wait_for(1000) { @bus.listening? }
wait_for(7000) { reconnect_times.length >= 2 }

first_gap = reconnect_times[1] - reconnect_times[0]
first_gap.must_be :>=, FAST_KEEPALIVE * 2.8
end

it "keeps the timer thread running when request_reconnect raises" do
backend = @bus.backend_instance
reconnect_failed = false
stall_global_subscribe(backend)

backend.define_singleton_method(:request_reconnect) do
reconnect_failed = true
raise IOError, "simulated connection error"
end

@bus.after_fork
wait_for(1000) { @bus.listening? }

wait_for(4000) { reconnect_failed }
wait_for(4000) { @log_output.string.include?("Failed to process job") }

timer_job_ran = false
@bus.timer.queue(0.01) { timer_job_ran = true }
wait_for(1000) { timer_job_ran }

@bus.listening?.must_equal true
@log_output.string.must_include "simulated connection error"
end
end

describe "request_reconnect backend wiring" do
it "is a no-op on the abstract base backend" do
backend = MessageBus::Backends::Base.new
backend.request_reconnect.must_be_nil
end

it "disconnects the redis subscriber connection" do
skip "Redis backend only" unless CURRENT_BACKEND == :redis

backend = @bus.backend_instance
disconnect_calls = 0
fake_connection = Object.new
fake_connection.define_singleton_method(:disconnect!) { disconnect_calls += 1 }
backend.instance_variable_set(:@subscriber_connection, fake_connection)

backend.request_reconnect

disconnect_calls.must_equal 1
end

it "closes the postgres subscribe connection" do
skip "Postgres backend only" unless CURRENT_BACKEND == :postgres

backend = @bus.backend_instance
close_calls = 0
backend.send(:client).define_singleton_method(:close_subscribe_connection) { close_calls += 1 }

backend.request_reconnect

close_calls.must_equal 1
end
end
end