Skip to content

Commit f38cb57

Browse files
committed
- improve extensbility that will needed in aa-failover feature
1 parent 5d96c67 commit f38cb57

File tree

5 files changed

+60
-6
lines changed

5 files changed

+60
-6
lines changed

src/main/java/io/lettuce/core/CommandListenerWriter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
* @since 6.1
4646
*/
4747
@SuppressWarnings("unchecked")
48-
public class CommandListenerWriter implements RedisChannelWriter {
48+
public class CommandListenerWriter implements RedisChannelWriter, Delegating<RedisChannelWriter> {
4949

5050
private final RedisChannelWriter delegate;
5151

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package io.lettuce.core;
2+
3+
/**
4+
* A delegating interface that allows access to the underlying delegate.
5+
*
6+
* @param <T> the type of the delegate.
7+
*
8+
* @author Ali Takavci
9+
* @since 7.1
10+
*/
11+
public interface Delegating<T> {
12+
13+
/**
14+
* The underlying delegate.
15+
*
16+
* @return never {@code null}.
17+
*/
18+
T getDelegate();
19+
20+
/**
21+
* Unwrap the underlying delegate, through the recursively wrapped {@link Delegating} interfaces if available.
22+
*
23+
* @return the unwrapped delegate.
24+
*/
25+
default T unwrap() {
26+
T delegate = getDelegate();
27+
if (delegate instanceof Delegating) {
28+
@SuppressWarnings("unchecked")
29+
T unwrapped = ((Delegating<T>) delegate).unwrap();
30+
return unwrapped;
31+
}
32+
return delegate;
33+
}
34+
35+
}

src/main/java/io/lettuce/core/RedisClient.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ private <K, V> ConnectionFuture<StatefulRedisConnection<K, V>> connectStandalone
275275

276276
logger.debug("Trying to get a Redis connection for: {}", redisURI);
277277

278-
DefaultEndpoint endpoint = new DefaultEndpoint(getOptions(), getResources());
278+
DefaultEndpoint endpoint = createEndpoint();
279279
RedisChannelWriter writer = endpoint;
280280

281281
if (CommandExpiryWriter.isSupported(getOptions())) {
@@ -408,7 +408,7 @@ private <K, V> ConnectionFuture<StatefulRedisPubSubConnection<K, V>> connectPubS
408408
assertNotNull(codec);
409409
checkValidRedisURI(redisURI);
410410

411-
PubSubEndpoint<K, V> endpoint = new PubSubEndpoint<>(getOptions(), getResources());
411+
PubSubEndpoint<K, V> endpoint = createPubSubEndpoint();
412412
RedisChannelWriter writer = endpoint;
413413

414414
if (CommandExpiryWriter.isSupported(getOptions())) {
@@ -575,7 +575,7 @@ private <K, V> ConnectionFuture<StatefulRedisSentinelConnection<K, V>> doConnect
575575
connectionBuilder.clientOptions(ClientOptions.copyOf(getOptions()));
576576
connectionBuilder.clientResources(getResources());
577577

578-
DefaultEndpoint endpoint = new DefaultEndpoint(getOptions(), getResources());
578+
DefaultEndpoint endpoint = createEndpoint();
579579
RedisChannelWriter writer = endpoint;
580580

581581
if (CommandExpiryWriter.isSupported(getOptions())) {
@@ -840,4 +840,22 @@ private void checkForRedisURI() {
840840
checkValidRedisURI(this.redisURI);
841841
}
842842

843+
/**
844+
* Create a new {@link DefaultEndpoint}. Subclasses may override this method to change the default behavior.
845+
*
846+
* @return a new {@link DefaultEndpoint}.
847+
*/
848+
protected DefaultEndpoint createEndpoint() {
849+
return new DefaultEndpoint(getOptions(), getResources());
850+
}
851+
852+
/**
853+
* Create a new {@link PubSubEndpoint}. Subclasses may override this method to change the default behavior.
854+
*
855+
* @return a new {@link PubSubEndpoint}.
856+
*/
857+
protected <K, V> PubSubEndpoint<K, V> createPubSubEndpoint() {
858+
return new PubSubEndpoint<>(getOptions(), getResources());
859+
}
860+
843861
}

src/main/java/io/lettuce/core/cluster/NodeSelectionInvocationHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ private Object getExecutions(Map<RedisClusterNode, Object> executions, long time
178178

179179
if (executionModel == ExecutionModel.REACTIVE) {
180180
Map<RedisClusterNode, CompletionStage<? extends Publisher<?>>> reactiveExecutions = (Map) executions;
181-
return new ReactiveExecutionsImpl<>(reactiveExecutions);
181+
return new ReactiveExecutionsImpl<>((Map) reactiveExecutions);
182182
}
183183

184184
Map<RedisClusterNode, CompletionStage<?>> asyncExecutions = (Map) executions;

src/main/java/io/lettuce/core/protocol/CommandExpiryWriter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.concurrent.TimeUnit;
2929

3030
import io.lettuce.core.ClientOptions;
31+
import io.lettuce.core.Delegating;
3132
import io.lettuce.core.RedisChannelWriter;
3233
import io.lettuce.core.TimeoutOptions;
3334
import io.lettuce.core.internal.ExceptionFactory;
@@ -45,7 +46,7 @@
4546
* @since 5.1
4647
* @see io.lettuce.core.TimeoutOptions
4748
*/
48-
public class CommandExpiryWriter implements RedisChannelWriter {
49+
public class CommandExpiryWriter implements RedisChannelWriter, Delegating<RedisChannelWriter> {
4950

5051
private final RedisChannelWriter delegate;
5152

0 commit comments

Comments
 (0)