Description
Bug Report
The observeChannels() API does not propagate sharded pub/sub messages.
Current Behavior
While migrating existing code from classic pub/sub to sharded pub/sub, we switched to subscribing with ssubscribe and messages suddenly stopped flowing. It turns out that the RedisPubSubAdapter<> that observeChannels() creates doesn't override the smessage method, so sharded messages are silently dropped.
Input Code
The original code boils down to simply doing:
connection.reactive().observeChannels()
.map(cm -> {
// ... do stuff w/message here ...
});
This still "works" from an API call perspective obviously, but never receives sharded messages.
Expected behavior/code
I would expect that sharded messages flow through this API as well for a consistent experience.
Environment
- Lettuce version(s): 6.6.0.RELEASE
- Redis version: 7.x (n/a here really)
Possible Solution
Here's the current implementation from main as of writing that creates a RedisPubSubAdapter<>:
lettuce/src/main/java/io/lettuce/core/pubsub/RedisPubSubReactiveCommandsImpl.java
Lines 98 to 105 in ec10c60
RedisPubSubAdapter<K, V> listener = new RedisPubSubAdapter<K, V>() {
    @Override
    public void message(K channel, V message) {
        sink.next(new ChannelMessage<>(channel, message));
    }
};
One fix would be to also override the smessage callback on that RedisPubSubAdapter<> and forward sharded messages to the same sink:
@Override
public void smessage(K shardChannel, V message) {
sink.next(new ChannelMessage<>(shardChannel, message));
}
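To illustrate why the missing override drops messages, here is a minimal, library-free sketch of the same pattern. It does not use Lettuce: `Adapter`, `classicOnly`, and `classicAndSharded` are hypothetical stand-ins, and the only assumption carried over is that the adapter's callbacks are no-ops unless overridden.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ShardedListenerSketch {

    // Hypothetical stand-in for an adapter whose callbacks do nothing by default.
    static class Adapter<K, V> {
        public void message(K channel, V message) { }
        public void smessage(K shardChannel, V message) { }
    }

    // Mirrors the current behavior: only classic messages reach the sink.
    static <K, V> Adapter<K, V> classicOnly(Consumer<String> sink) {
        return new Adapter<K, V>() {
            @Override
            public void message(K channel, V message) {
                sink.accept(channel + "=" + message);
            }
        };
    }

    // Mirrors the proposed fix: sharded messages are forwarded to the same sink.
    static <K, V> Adapter<K, V> classicAndSharded(Consumer<String> sink) {
        return new Adapter<K, V>() {
            @Override
            public void message(K channel, V message) {
                sink.accept(channel + "=" + message);
            }

            @Override
            public void smessage(K shardChannel, V message) {
                sink.accept(shardChannel + "=" + message);
            }
        };
    }

    public static void main(String[] args) {
        List<String> before = new ArrayList<>();
        Adapter<String, String> current = classicOnly(before::add);
        current.message("news", "hello");
        current.smessage("orders", "42"); // hits the no-op default and is dropped
        System.out.println(before); // prints [news=hello]

        List<String> after = new ArrayList<>();
        Adapter<String, String> fixed = classicAndSharded(after::add);
        fixed.message("news", "hello");
        fixed.smessage("orders", "42"); // now forwarded
        System.out.println(after); // prints [news=hello, orders=42]
    }
}
```

In this sketch both kinds of message end up in one stream, which matches the "consistent experience" described above. Whether sharded and classic messages should be distinguishable downstream is a separate design question that the sketch does not address.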