Reactive API observeChannels does not propagate sharded messages #3494

@drub0y

Bug Report

The observeChannels() API does not propagate sharded pub/sub messages.

Current Behavior

I was switching some existing code from classic pub/sub to sharded pub/sub. After we switched to subscribing with ssubscribe, messages suddenly stopped flowing. It turns out that the RedisPubSubAdapter<> that observeChannels() creates doesn't override the smessage method, so sharded messages are silently dropped.

Input Code

The original code boils down to simply doing:

connection.reactive().observeChannels()
    .map(cm -> {
        // ... do stuff w/message here ...
        return cm;
    });

This still "works" in the sense that the call succeeds, but the resulting stream never receives sharded messages.

Expected behavior/code

I would expect that sharded messages flow through this API as well for a consistent experience.

Environment

  • Lettuce version(s): 6.6.0.RELEASE
  • Redis version: 7.x (n/a here really)

Possible Solution

Here's the current implementation from main as of writing that creates a RedisPubSubAdapter<>:

RedisPubSubAdapter<K, V> listener = new RedisPubSubAdapter<K, V>() {
    @Override
    public void message(K channel, V message) {
        sink.next(new ChannelMessage<>(channel, message));
    }
};

One option would be to add an override of the smessage callback method to the RedisPubSubAdapter<> that gets created and forward those messages to the sink:

@Override
public void smessage(K shardChannel, V message) {
    sink.next(new ChannelMessage<>(shardChannel, message));
}
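
To illustrate why the missing override drops messages, here is a minimal, self-contained sketch that does not use Lettuce at all. `PubSubAdapter`, `messageOnly`, and `messageAndSmessage` are hypothetical stand-ins invented for this example, and a plain `List<String>` stands in for the sink. The only assumption carried over from the report is that the adapter's callbacks are no-ops unless overridden.

```java
import java.util.ArrayList;
import java.util.List;

public class ShardedAdapterSketch {

    /** Hypothetical stand-in for an adapter whose callbacks do nothing by default. */
    static class PubSubAdapter<K, V> {
        public void message(K channel, V message) { }
        public void smessage(K shardChannel, V message) { }
    }

    /** Mirrors the current behavior described above: only message() is overridden. */
    static PubSubAdapter<String, String> messageOnly(List<String> sink) {
        return new PubSubAdapter<String, String>() {
            @Override
            public void message(String channel, String message) {
                sink.add(channel + "=" + message);
            }
        };
    }

    /** Mirrors the proposed fix: smessage() is forwarded to the same sink. */
    static PubSubAdapter<String, String> messageAndSmessage(List<String> sink) {
        return new PubSubAdapter<String, String>() {
            @Override
            public void message(String channel, String message) {
                sink.add(channel + "=" + message);
            }

            @Override
            public void smessage(String shardChannel, String message) {
                sink.add(shardChannel + "=" + message);
            }
        };
    }

    public static void main(String[] args) {
        List<String> before = new ArrayList<>();
        PubSubAdapter<String, String> current = messageOnly(before);
        current.message("news", "classic");
        current.smessage("orders", "sharded"); // hits the no-op default
        System.out.println(before); // prints [news=classic]

        List<String> after = new ArrayList<>();
        PubSubAdapter<String, String> proposed = messageAndSmessage(after);
        proposed.message("news", "classic");
        proposed.smessage("orders", "sharded");
        System.out.println(after); // prints [news=classic, orders=sharded]
    }
}
```

With only `message` overridden, the sharded delivery falls through to the empty default and never reaches the sink, which matches the symptom reported above.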
