Skip to content

Commit b99ba65

Browse files
committed
- draft implementation for automatic-failover
1 parent 5d96c67 commit b99ba65

39 files changed

+5192
-5
lines changed
Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
package io.lettuce.core.failover;
2+
3+
import java.io.IOException;
4+
import java.net.ConnectException;
5+
import java.util.Arrays;
6+
import java.util.HashSet;
7+
import java.util.Set;
8+
import java.util.concurrent.ConcurrentHashMap;
9+
import java.util.concurrent.TimeoutException;
10+
import java.util.function.Predicate;
11+
12+
import org.slf4j.Logger;
13+
import org.slf4j.LoggerFactory;
14+
15+
import io.lettuce.core.RedisCommandTimeoutException;
16+
import io.lettuce.core.RedisConnectionException;
17+
import io.lettuce.core.failover.api.CircuitBreakerStateListener;
18+
import io.lettuce.core.failover.metrics.CircuitBreakerMetrics;
19+
import io.lettuce.core.failover.metrics.CircuitBreakerMetricsImpl;
20+
21+
/**
22+
* Circuit breaker for tracking command metrics and managing circuit breaker state. Wraps CircuitBreakerMetrics and exposes it
23+
* via {@link #getMetrics()}.
24+
*
25+
* @author Ali Takavci
26+
* @since 7.1
27+
*/
28+
public class CircuitBreaker {
29+
30+
private static final Logger log = LoggerFactory.getLogger(CircuitBreaker.class);
31+
32+
private final CircuitBreakerMetrics metrics;
33+
34+
private final CircuitBreakerConfig config;
35+
36+
private volatile State currentState = State.CLOSED;
37+
38+
private Predicate<Throwable> exceptionsPredicate;
39+
40+
private final Set<CircuitBreakerStateListener> listeners = ConcurrentHashMap.newKeySet();
41+
42+
/**
43+
* Create a circuit breaker instance.
44+
*/
45+
public CircuitBreaker(CircuitBreakerConfig config) {
46+
this.metrics = new CircuitBreakerMetricsImpl();
47+
this.config = config;
48+
this.exceptionsPredicate = createExceptionsPredicate(config.trackedExceptions);
49+
}
50+
51+
/**
52+
* Get the metrics tracked by this circuit breaker.
53+
*
54+
* @return the circuit breaker metrics
55+
*/
56+
public CircuitBreakerMetrics getMetrics() {
57+
return metrics;
58+
}
59+
60+
@Override
61+
public String toString() {
62+
return "CircuitBreaker{" + "metrics=" + metrics + ", config=" + config + '}';
63+
}
64+
65+
public boolean isCircuitBreakerTrackedException(Throwable error) {
66+
return exceptionsPredicate.test(error);
67+
}
68+
69+
private static Predicate<Throwable> createExceptionsPredicate(Set<Class<? extends Throwable>> trackedExceptions) {
70+
return throwable -> {
71+
Class<? extends Throwable> errorClass = throwable.getClass();
72+
for (Class<? extends Throwable> trackedException : trackedExceptions) {
73+
if (trackedException.isAssignableFrom(errorClass)) {
74+
return true;
75+
}
76+
}
77+
return false;
78+
};
79+
}
80+
81+
public void evaluateMetrics() {
82+
boolean evaluationResult = metrics.getSnapshot().getFailureRate() >= config.getFailureRateThreshold()
83+
&& metrics.getSnapshot().getFailureCount() >= config.getMinimumNumberOfFailures();
84+
if (evaluationResult) {
85+
stateTransitionTo(State.OPEN);
86+
}
87+
}
88+
89+
private void stateTransitionTo(State newState) {
90+
State previousState = this.currentState;
91+
if (previousState != newState) {
92+
this.currentState = newState;
93+
fireStateChanged(previousState, newState);
94+
}
95+
}
96+
97+
public State getCurrentState() {
98+
return currentState;
99+
}
100+
101+
/**
102+
* Add a listener for circuit breaker state change events.
103+
*
104+
* @param listener the listener to add, must not be {@code null}
105+
*/
106+
public void addListener(CircuitBreakerStateListener listener) {
107+
listeners.add(listener);
108+
}
109+
110+
/**
111+
* Remove a listener for circuit breaker state change events.
112+
*
113+
* @param listener the listener to remove, must not be {@code null}
114+
*/
115+
public void removeListener(CircuitBreakerStateListener listener) {
116+
listeners.remove(listener);
117+
}
118+
119+
/**
120+
* Fire a state change event to all registered listeners.
121+
*
122+
* @param previousState the previous state
123+
* @param newState the new state
124+
*/
125+
private void fireStateChanged(State previousState, State newState) {
126+
CircuitBreakerStateChangeEvent event = new CircuitBreakerStateChangeEvent(this, previousState, newState);
127+
for (CircuitBreakerStateListener listener : listeners) {
128+
try {
129+
listener.onCircuitBreakerStateChange(event);
130+
} catch (Exception e) {
131+
// Ignore listener exceptions to prevent one bad listener from affecting others
132+
log.error("Error notifying listener " + listener + " of state change " + event, e);
133+
}
134+
}
135+
}
136+
137+
public static enum State {
138+
CLOSED, OPEN
139+
}
140+
141+
public static class CircuitBreakerConfig {
142+
143+
private final static float DEFAULT_FAILURE_RATE_THRESHOLD = 10;
144+
145+
private final static int DEFAULT_MINIMUM_NUMBER_OF_FAILURES = 1000;
146+
147+
private final static Set<Class<? extends Throwable>> DEFAULT_TRACKED_EXCEPTIONS = new HashSet<>(Arrays.asList(
148+
149+
// Connection failures
150+
RedisConnectionException.class, // Connection establishment failures
151+
IOException.class, // Network I/O failures (includes ClosedChannelException)
152+
ConnectException.class, // Connection refused, etc.
153+
154+
// Timeout failures
155+
RedisCommandTimeoutException.class, // Command execution timeout
156+
TimeoutException.class // Generic timeout
157+
158+
));
159+
160+
public static final CircuitBreakerConfig DEFAULT = new CircuitBreakerConfig();
161+
162+
private final Set<Class<? extends Throwable>> trackedExceptions;
163+
164+
private final float failureThreshold;
165+
166+
private final int minimumNumberOfFailures;
167+
168+
private CircuitBreakerConfig() {
169+
this(DEFAULT_FAILURE_RATE_THRESHOLD, DEFAULT_MINIMUM_NUMBER_OF_FAILURES, DEFAULT_TRACKED_EXCEPTIONS);
170+
}
171+
172+
public CircuitBreakerConfig(float failureThreshold, int minimumNumberOfFailures,
173+
Set<Class<? extends Throwable>> trackedExceptions) {
174+
this.trackedExceptions = trackedExceptions;
175+
this.failureThreshold = failureThreshold;
176+
this.minimumNumberOfFailures = minimumNumberOfFailures;
177+
}
178+
179+
public Set<Class<? extends Throwable>> getTrackedExceptions() {
180+
return trackedExceptions;
181+
}
182+
183+
public float getFailureRateThreshold() {
184+
return failureThreshold;
185+
}
186+
187+
public int getMinimumNumberOfFailures() {
188+
return minimumNumberOfFailures;
189+
}
190+
191+
}
192+
193+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package io.lettuce.core.failover;
2+
3+
/**
4+
* Event representing a circuit breaker state change.
5+
*
6+
* @author Ali Takavci
7+
* @since 7.1
8+
*/
9+
public class CircuitBreakerStateChangeEvent {
10+
11+
private final CircuitBreaker circuitBreaker;
12+
13+
private final CircuitBreaker.State previousState;
14+
15+
private final CircuitBreaker.State newState;
16+
17+
private final long timestamp;
18+
19+
/**
20+
* Create a new circuit breaker state change event.
21+
*
22+
* @param circuitBreaker the circuit breaker instance
23+
* @param previousState the previous state
24+
* @param newState the new state
25+
*/
26+
public CircuitBreakerStateChangeEvent(CircuitBreaker circuitBreaker, CircuitBreaker.State previousState,
27+
CircuitBreaker.State newState) {
28+
this.circuitBreaker = circuitBreaker;
29+
this.previousState = previousState;
30+
this.newState = newState;
31+
this.timestamp = System.currentTimeMillis();
32+
}
33+
34+
/**
35+
* Get the circuit breaker instance that changed state.
36+
*
37+
* @return the circuit breaker instance
38+
*/
39+
public CircuitBreaker getCircuitBreaker() {
40+
return circuitBreaker;
41+
}
42+
43+
/**
44+
* Get the previous state before the transition.
45+
*
46+
* @return the previous state
47+
*/
48+
public CircuitBreaker.State getPreviousState() {
49+
return previousState;
50+
}
51+
52+
/**
53+
* Get the new state after the transition.
54+
*
55+
* @return the new state
56+
*/
57+
public CircuitBreaker.State getNewState() {
58+
return newState;
59+
}
60+
61+
/**
62+
* Get the timestamp when the state change occurred.
63+
*
64+
* @return the timestamp in milliseconds since epoch
65+
*/
66+
public long getTimestamp() {
67+
return timestamp;
68+
}
69+
70+
@Override
71+
public String toString() {
72+
return "CircuitBreakerStateChangeEvent{" + "previousState=" + previousState + ", newState=" + newState + ", timestamp="
73+
+ timestamp + '}';
74+
}
75+
76+
}

0 commit comments

Comments
 (0)