Skip to content

Commit a91c005

Browse files
ti-chi-bot and rleungx authored
client: introduce circuit breaker for region calls (#8856) (#9913)
ref #8678 Signed-off-by: Ryan Leung <[email protected]> Co-authored-by: Ryan Leung <[email protected]>
1 parent a3a405b commit a91c005

File tree

15 files changed

+1221
-327
lines changed

15 files changed

+1221
-327
lines changed
Lines changed: 346 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,346 @@
1+
// Copyright 2024 TiKV Project Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
package circuitbreaker
15+
16+
import (
17+
"context"
18+
"fmt"
19+
"strings"
20+
"sync"
21+
"time"
22+
23+
"github.com/tikv/pd/client/errs"
24+
m "github.com/tikv/pd/client/metrics"
25+
26+
"github.com/prometheus/client_golang/prometheus"
27+
"go.uber.org/zap"
28+
29+
"github.com/pingcap/log"
30+
)
31+
32+
// Overloading is the outcome a call reports back to the circuit breaker:
// whether the called service turned out to be overloaded.
type Overloading bool

// Outcomes a call can report. They are untyped boolean constants, so they can
// be used wherever an Overloading value is expected.
const (
	// No reports that the service is not overloaded.
	No = false
	// Yes reports that the service is overloaded.
	Yes = true
)
41+
42+
// Settings holds the tunable parameters of a CircuitBreaker.
type Settings struct {
	// ErrorRateThresholdPct is the error rate, in percent, at or above which the
	// circuit breaker trips. Zero disables the circuit breaker.
	ErrorRateThresholdPct uint32
	// MinQPSForOpen is the average QPS over ErrorRateWindow that must be reached
	// before ErrorRateThresholdPct is evaluated at all.
	MinQPSForOpen uint32
	// ErrorRateWindow is how long results are collected before the error rate is
	// compared against ErrorRateThresholdPct.
	ErrorRateWindow time.Duration
	// CoolDownInterval is how long the circuit breaker stays open before moving
	// to the half-open state and letting probe requests through.
	CoolDownInterval time.Duration
	// HalfOpenSuccessCount is the number of consecutive probe requests that are
	// tested after the cooldown period before the circuit is fully closed again.
	HalfOpenSuccessCount uint32
}
55+
56+
// AlwaysClosedSettings is a configuration that never trips the circuit breaker.
57+
var AlwaysClosedSettings = Settings{
58+
ErrorRateThresholdPct: 0, // never trips
59+
ErrorRateWindow: 10 * time.Second, // effectively results in testing for new settings every 10 seconds
60+
MinQPSForOpen: 10,
61+
CoolDownInterval: 10 * time.Second,
62+
HalfOpenSuccessCount: 1,
63+
}
64+
65+
// CircuitBreaker is a state machine to prevent sending requests that are likely to fail.
66+
type CircuitBreaker struct {
67+
config *Settings
68+
name string
69+
70+
sync.RWMutex
71+
state *State
72+
73+
successCounter prometheus.Counter
74+
errorCounter prometheus.Counter
75+
overloadCounter prometheus.Counter
76+
fastFailCounter prometheus.Counter
77+
}
78+
79+
// StateType identifies which state a CircuitBreaker is in.
type StateType int

// The states a CircuitBreaker can be in.
const (
	// StateClosed lets all requests pass through.
	StateClosed StateType = iota
	// StateOpen fails all requests.
	StateOpen
	// StateHalfOpen lets a limited number of probe requests through.
	StateHalfOpen
)

// String implements fmt.Stringer. Values outside the declared states are
// rendered as "unknown state: <n>".
func (s StateType) String() string {
	names := [...]string{
		StateClosed:   "closed",
		StateOpen:     "open",
		StateHalfOpen: "half-open",
	}
	if s < 0 || int(s) >= len(names) {
		return fmt.Sprintf("unknown state: %d", s)
	}
	return names[s]
}
102+
103+
// replacer turns spaces and hyphens into underscores; it is applied to the
// circuit breaker name before the name is used as a metric label value.
var replacer = strings.NewReplacer(
	" ", "_",
	"-", "_",
)
104+
105+
// NewCircuitBreaker returns a new CircuitBreaker configured with the given Settings.
106+
func NewCircuitBreaker(name string, st Settings) *CircuitBreaker {
107+
cb := new(CircuitBreaker)
108+
cb.name = name
109+
cb.config = &st
110+
cb.state = cb.newState(time.Now(), StateClosed)
111+
112+
m.RegisterConsumer(func() {
113+
registerMetrics(cb)
114+
})
115+
return cb
116+
}
117+
118+
func registerMetrics(cb *CircuitBreaker) {
119+
metricName := replacer.Replace(cb.name)
120+
cb.successCounter = m.CircuitBreakerCounters.WithLabelValues(metricName, "success")
121+
cb.errorCounter = m.CircuitBreakerCounters.WithLabelValues(metricName, "error")
122+
cb.overloadCounter = m.CircuitBreakerCounters.WithLabelValues(metricName, "overload")
123+
cb.fastFailCounter = m.CircuitBreakerCounters.WithLabelValues(metricName, "fast_fail")
124+
}
125+
126+
// IsEnabled returns true if the circuit breaker is enabled.
127+
func (cb *CircuitBreaker) IsEnabled() bool {
128+
cb.RLock()
129+
defer cb.RUnlock()
130+
return cb.config.ErrorRateThresholdPct > 0
131+
}
132+
133+
// ChangeSettings changes the CircuitBreaker settings.
134+
// The changes will be reflected only in the next evaluation window.
135+
func (cb *CircuitBreaker) ChangeSettings(apply func(config *Settings)) {
136+
cb.Lock()
137+
defer cb.Unlock()
138+
139+
apply(cb.config)
140+
log.Debug("circuit breaker settings changed", zap.Any("config", cb.config))
141+
}
142+
143+
// Execute calls the given function if the CircuitBreaker is closed and returns the result of execution.
144+
// Execute returns an error instantly if the CircuitBreaker is open.
145+
// https://github.com/tikv/rfcs/blob/master/text/0115-circuit-breaker.md
146+
func (cb *CircuitBreaker) Execute(call func() (Overloading, error)) error {
147+
state, err := cb.onRequest()
148+
if err != nil {
149+
cb.fastFailCounter.Inc()
150+
return err
151+
}
152+
153+
defer func() {
154+
e := recover()
155+
if e != nil {
156+
cb.emitMetric(Yes, err)
157+
cb.onResult(state, Yes)
158+
panic(e)
159+
}
160+
}()
161+
162+
overloaded, err := call()
163+
cb.emitMetric(overloaded, err)
164+
cb.onResult(state, overloaded)
165+
return err
166+
}
167+
168+
func (cb *CircuitBreaker) onRequest() (*State, error) {
169+
cb.Lock()
170+
defer cb.Unlock()
171+
172+
state, err := cb.state.onRequest(cb)
173+
cb.state = state
174+
return state, err
175+
}
176+
177+
func (cb *CircuitBreaker) onResult(state *State, overloaded Overloading) {
178+
cb.Lock()
179+
defer cb.Unlock()
180+
181+
// even if the circuit breaker already moved to a new state while the request was in progress,
182+
// it is still ok to update the old state, but it is not relevant anymore
183+
state.onResult(overloaded)
184+
}
185+
186+
func (cb *CircuitBreaker) emitMetric(overloaded Overloading, err error) {
187+
switch overloaded {
188+
case No:
189+
cb.successCounter.Inc()
190+
case Yes:
191+
cb.overloadCounter.Inc()
192+
default:
193+
panic("unknown state")
194+
}
195+
if err != nil {
196+
cb.errorCounter.Inc()
197+
}
198+
}
199+
200+
// State represents the state of CircuitBreaker.
201+
type State struct {
202+
stateType StateType
203+
cb *CircuitBreaker
204+
end time.Time
205+
206+
pendingCount uint32
207+
successCount uint32
208+
failureCount uint32
209+
}
210+
211+
// newState creates a new State with the given configuration and reset all success/failure counters.
212+
func (cb *CircuitBreaker) newState(now time.Time, stateType StateType) *State {
213+
var end time.Time
214+
var pendingCount uint32
215+
switch stateType {
216+
case StateClosed:
217+
end = now.Add(cb.config.ErrorRateWindow)
218+
case StateOpen:
219+
end = now.Add(cb.config.CoolDownInterval)
220+
case StateHalfOpen:
221+
// we transition to HalfOpen state on the first request after the cooldown period,
222+
// so we start with 1 pending request
223+
pendingCount = 1
224+
default:
225+
panic("unknown state")
226+
}
227+
return &State{
228+
cb: cb,
229+
stateType: stateType,
230+
pendingCount: pendingCount,
231+
end: end,
232+
}
233+
}
234+
235+
// onRequest transitions the state to the next state based on the current state and the previous requests results
236+
// The implementation represents a state machine for CircuitBreaker
237+
// All state transitions happens at the request evaluation time only
238+
// Circuit breaker start with a closed state, allows all requests to pass through and always lasts for a fixed duration of `Settings.ErrorRateWindow`.
239+
// If `Settings.ErrorRateThresholdPct` is breached at the end of the window, then it moves to Open state, otherwise it moves to a new Closed state with a new window.
240+
// Open state fails all request, it has a fixed duration of `Settings.CoolDownInterval` and always moves to HalfOpen state at the end of the interval.
241+
// HalfOpen state does not have a fixed duration and lasts till `Settings.HalfOpenSuccessCount` are evaluated.
242+
// If any of `Settings.HalfOpenSuccessCount` fails then it moves back to Open state, otherwise it moves to Closed state.
243+
func (s *State) onRequest(cb *CircuitBreaker) (*State, error) {
244+
var now = time.Now()
245+
switch s.stateType {
246+
case StateClosed:
247+
if now.After(s.end) {
248+
// ErrorRateWindow is over, let's evaluate the error rate
249+
if s.cb.config.ErrorRateThresholdPct > 0 { // otherwise circuit breaker is disabled
250+
total := s.failureCount + s.successCount
251+
if total > 0 {
252+
observedErrorRatePct := s.failureCount * 100 / total
253+
if total >= uint32(s.cb.config.ErrorRateWindow.Seconds())*s.cb.config.MinQPSForOpen && observedErrorRatePct >= s.cb.config.ErrorRateThresholdPct {
254+
// the error threshold is breached, let's move to open state and start failing all requests
255+
log.Error("circuit breaker tripped and starting to fail all requests",
256+
zap.String("name", cb.name),
257+
zap.Uint32("observed-err-rate-pct", observedErrorRatePct),
258+
zap.Any("config", cb.config))
259+
return cb.newState(now, StateOpen), errs.ErrCircuitBreakerOpen
260+
}
261+
}
262+
}
263+
// the error threshold is not breached or there were not enough requests to evaluate it,
264+
// continue in the closed state and allow all requests
265+
return cb.newState(now, StateClosed), nil
266+
}
267+
// continue in closed state till ErrorRateWindow is over
268+
return s, nil
269+
case StateOpen:
270+
if s.cb.config.ErrorRateThresholdPct == 0 {
271+
return cb.newState(now, StateClosed), nil
272+
}
273+
274+
if now.After(s.end) {
275+
// CoolDownInterval is over, it is time to transition to half-open state
276+
log.Info("circuit breaker cooldown period is over. Transitioning to half-open state to test the service",
277+
zap.String("name", cb.name),
278+
zap.Any("config", cb.config))
279+
return cb.newState(now, StateHalfOpen), nil
280+
} else {
281+
// continue in the open state till CoolDownInterval is over
282+
return s, errs.ErrCircuitBreakerOpen
283+
}
284+
case StateHalfOpen:
285+
if s.cb.config.ErrorRateThresholdPct == 0 {
286+
return cb.newState(now, StateClosed), nil
287+
}
288+
289+
// do we need some expire time here in case of one of pending requests is stuck forever?
290+
if s.failureCount > 0 {
291+
// there were some failures during half-open state, let's go back to open state to wait a bit longer
292+
log.Error("circuit breaker goes from half-open to open again as errors persist and continue to fail all requests",
293+
zap.String("name", cb.name),
294+
zap.Any("config", cb.config))
295+
return cb.newState(now, StateOpen), errs.ErrCircuitBreakerOpen
296+
} else if s.successCount == s.cb.config.HalfOpenSuccessCount {
297+
// all probe requests are succeeded, we can move to closed state and allow all requests
298+
log.Info("circuit breaker is closed and start allowing all requests",
299+
zap.String("name", cb.name),
300+
zap.Any("config", cb.config))
301+
return cb.newState(now, StateClosed), nil
302+
} else if s.pendingCount < s.cb.config.HalfOpenSuccessCount {
303+
// allow more probe requests and continue in half-open state
304+
s.pendingCount++
305+
return s, nil
306+
} else {
307+
// continue in half-open state till all probe requests are done and fail all other requests for now
308+
return s, errs.ErrCircuitBreakerOpen
309+
}
310+
default:
311+
panic("unknown state")
312+
}
313+
}
314+
315+
func (s *State) onResult(overloaded Overloading) {
316+
switch overloaded {
317+
case No:
318+
s.successCount++
319+
case Yes:
320+
s.failureCount++
321+
default:
322+
panic("unknown state")
323+
}
324+
}
325+
326+
// cbCtxKey is the dedicated type of the context key used for circuit breakers.
type cbCtxKey struct{}

// CircuitBreakerKey is the context key under which a circuit breaker is stored.
var CircuitBreakerKey cbCtxKey
331+
332+
// FromContext retrieves the circuit breaker from the context
333+
func FromContext(ctx context.Context) *CircuitBreaker {
334+
if ctx == nil {
335+
return nil
336+
}
337+
if cb, ok := ctx.Value(CircuitBreakerKey).(*CircuitBreaker); ok {
338+
return cb
339+
}
340+
return nil
341+
}
342+
343+
// WithCircuitBreaker stores the circuit breaker into a new context
344+
func WithCircuitBreaker(ctx context.Context, cb *CircuitBreaker) context.Context {
345+
return context.WithValue(ctx, CircuitBreakerKey, cb)
346+
}

0 commit comments

Comments
 (0)