-
Notifications
You must be signed in to change notification settings - Fork 752
client: introduce circuit breaker for region calls (#8856) #9913
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
ti-chi-bot
merged 11 commits into
tikv:release-8.5
from
ti-chi-bot:cherry-pick-8856-to-release-8.5
Dec 10, 2025
Merged
Changes from all commits
Commits
Show all changes
11 commits
Select commit
Hold shift + click to select a range
2c5a79e
This is an automated cherry-pick of #8856
Tema b98b59c
fix
rleungx 6effde1
fix
rleungx ad72800
client: fix pd client metrics registration (#8994)
dc7e548
metrics: fix typo (#9046)
rleungx 588154f
client: add `IsEnabled` for circuit breaker (#9049)
rleungx c46a272
client: skip if circuit breaker is disabled (#9078)
rleungx 9594e7a
client: use debug level (#9119)
rleungx b2ecca5
fix
rleungx f5275a9
fix
rleungx 73eb87b
*: closed immediately if error rate is changed to 0 (#8887)
rleungx File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,346 @@ | ||
| // Copyright 2024 TiKV Project Authors. | ||
| // | ||
| // Licensed under the Apache License, Version 2.0 (the "License"); | ||
| // you may not use this file except in compliance with the License. | ||
| // You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, software | ||
| // distributed under the License is distributed on an "AS IS" BASIS, | ||
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| // See the License for the specific language governing permissions and | ||
| // limitations under the License. | ||
| package circuitbreaker | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
| "strings" | ||
| "sync" | ||
| "time" | ||
|
|
||
| "github.com/tikv/pd/client/errs" | ||
| m "github.com/tikv/pd/client/metrics" | ||
|
|
||
| "github.com/prometheus/client_golang/prometheus" | ||
| "go.uber.org/zap" | ||
|
|
||
| "github.com/pingcap/log" | ||
| ) | ||
|
|
||
| // Overloading is a type describing service return value | ||
| type Overloading bool | ||
|
|
||
| const ( | ||
| // No means the service is not overloaded | ||
| No = false | ||
| // Yes means the service is overloaded | ||
| Yes = true | ||
| ) | ||
|
|
||
| // Settings describes configuration for Circuit Breaker | ||
| type Settings struct { | ||
| // Defines the error rate threshold to trip the circuit breaker. | ||
| ErrorRateThresholdPct uint32 | ||
| // Defines the average qps over the `error_rate_window` that must be met before evaluating the error rate threshold. | ||
| MinQPSForOpen uint32 | ||
| // Defines how long to track errors before evaluating error_rate_threshold. | ||
| ErrorRateWindow time.Duration | ||
| // Defines how long to wait after circuit breaker is open before go to half-open state to send a probe request. | ||
| CoolDownInterval time.Duration | ||
| // Defines how many subsequent requests to test after cooldown period before fully close the circuit. | ||
| HalfOpenSuccessCount uint32 | ||
| } | ||
|
|
||
| // AlwaysClosedSettings is a configuration that never trips the circuit breaker. | ||
| var AlwaysClosedSettings = Settings{ | ||
| ErrorRateThresholdPct: 0, // never trips | ||
| ErrorRateWindow: 10 * time.Second, // effectively results in testing for new settings every 10 seconds | ||
| MinQPSForOpen: 10, | ||
| CoolDownInterval: 10 * time.Second, | ||
| HalfOpenSuccessCount: 1, | ||
| } | ||
|
|
||
| // CircuitBreaker is a state machine to prevent sending requests that are likely to fail. | ||
| type CircuitBreaker struct { | ||
| config *Settings | ||
| name string | ||
|
|
||
| sync.RWMutex | ||
| state *State | ||
|
|
||
| successCounter prometheus.Counter | ||
| errorCounter prometheus.Counter | ||
| overloadCounter prometheus.Counter | ||
| fastFailCounter prometheus.Counter | ||
| } | ||
|
|
||
| // StateType is a type that represents a state of CircuitBreaker. | ||
| type StateType int | ||
|
|
||
| // States of CircuitBreaker. | ||
| const ( | ||
| StateClosed StateType = iota | ||
| StateOpen | ||
| StateHalfOpen | ||
| ) | ||
|
|
||
| // String implements stringer interface. | ||
| func (s StateType) String() string { | ||
| switch s { | ||
| case StateClosed: | ||
| return "closed" | ||
| case StateOpen: | ||
| return "open" | ||
| case StateHalfOpen: | ||
| return "half-open" | ||
| default: | ||
| return fmt.Sprintf("unknown state: %d", s) | ||
| } | ||
| } | ||
|
|
||
| var replacer = strings.NewReplacer(" ", "_", "-", "_") | ||
|
|
||
| // NewCircuitBreaker returns a new CircuitBreaker configured with the given Settings. | ||
| func NewCircuitBreaker(name string, st Settings) *CircuitBreaker { | ||
| cb := new(CircuitBreaker) | ||
| cb.name = name | ||
| cb.config = &st | ||
| cb.state = cb.newState(time.Now(), StateClosed) | ||
|
|
||
| m.RegisterConsumer(func() { | ||
| registerMetrics(cb) | ||
| }) | ||
| return cb | ||
| } | ||
|
|
||
| func registerMetrics(cb *CircuitBreaker) { | ||
| metricName := replacer.Replace(cb.name) | ||
| cb.successCounter = m.CircuitBreakerCounters.WithLabelValues(metricName, "success") | ||
| cb.errorCounter = m.CircuitBreakerCounters.WithLabelValues(metricName, "error") | ||
| cb.overloadCounter = m.CircuitBreakerCounters.WithLabelValues(metricName, "overload") | ||
| cb.fastFailCounter = m.CircuitBreakerCounters.WithLabelValues(metricName, "fast_fail") | ||
| } | ||
|
|
||
| // IsEnabled returns true if the circuit breaker is enabled. | ||
| func (cb *CircuitBreaker) IsEnabled() bool { | ||
| cb.RLock() | ||
| defer cb.RUnlock() | ||
| return cb.config.ErrorRateThresholdPct > 0 | ||
| } | ||
|
|
||
| // ChangeSettings changes the CircuitBreaker settings. | ||
| // The changes will be reflected only in the next evaluation window. | ||
| func (cb *CircuitBreaker) ChangeSettings(apply func(config *Settings)) { | ||
| cb.Lock() | ||
| defer cb.Unlock() | ||
|
|
||
| apply(cb.config) | ||
| log.Debug("circuit breaker settings changed", zap.Any("config", cb.config)) | ||
| } | ||
|
|
||
| // Execute calls the given function if the CircuitBreaker is closed and returns the result of execution. | ||
| // Execute returns an error instantly if the CircuitBreaker is open. | ||
| // https://github.com/tikv/rfcs/blob/master/text/0115-circuit-breaker.md | ||
| func (cb *CircuitBreaker) Execute(call func() (Overloading, error)) error { | ||
| state, err := cb.onRequest() | ||
| if err != nil { | ||
| cb.fastFailCounter.Inc() | ||
| return err | ||
| } | ||
|
|
||
| defer func() { | ||
| e := recover() | ||
| if e != nil { | ||
| cb.emitMetric(Yes, err) | ||
| cb.onResult(state, Yes) | ||
| panic(e) | ||
| } | ||
| }() | ||
|
|
||
| overloaded, err := call() | ||
| cb.emitMetric(overloaded, err) | ||
| cb.onResult(state, overloaded) | ||
| return err | ||
| } | ||
|
|
||
| func (cb *CircuitBreaker) onRequest() (*State, error) { | ||
| cb.Lock() | ||
| defer cb.Unlock() | ||
|
|
||
| state, err := cb.state.onRequest(cb) | ||
| cb.state = state | ||
| return state, err | ||
| } | ||
|
|
||
| func (cb *CircuitBreaker) onResult(state *State, overloaded Overloading) { | ||
| cb.Lock() | ||
| defer cb.Unlock() | ||
|
|
||
| // even if the circuit breaker already moved to a new state while the request was in progress, | ||
| // it is still ok to update the old state, but it is not relevant anymore | ||
| state.onResult(overloaded) | ||
| } | ||
|
|
||
| func (cb *CircuitBreaker) emitMetric(overloaded Overloading, err error) { | ||
| switch overloaded { | ||
| case No: | ||
| cb.successCounter.Inc() | ||
| case Yes: | ||
| cb.overloadCounter.Inc() | ||
| default: | ||
| panic("unknown state") | ||
| } | ||
| if err != nil { | ||
| cb.errorCounter.Inc() | ||
| } | ||
| } | ||
|
|
||
| // State represents the state of CircuitBreaker. | ||
| type State struct { | ||
| stateType StateType | ||
| cb *CircuitBreaker | ||
| end time.Time | ||
|
|
||
| pendingCount uint32 | ||
| successCount uint32 | ||
| failureCount uint32 | ||
| } | ||
|
|
||
| // newState creates a new State with the given configuration and reset all success/failure counters. | ||
| func (cb *CircuitBreaker) newState(now time.Time, stateType StateType) *State { | ||
| var end time.Time | ||
| var pendingCount uint32 | ||
| switch stateType { | ||
| case StateClosed: | ||
| end = now.Add(cb.config.ErrorRateWindow) | ||
| case StateOpen: | ||
| end = now.Add(cb.config.CoolDownInterval) | ||
| case StateHalfOpen: | ||
| // we transition to HalfOpen state on the first request after the cooldown period, | ||
| // so we start with 1 pending request | ||
| pendingCount = 1 | ||
| default: | ||
| panic("unknown state") | ||
| } | ||
| return &State{ | ||
| cb: cb, | ||
| stateType: stateType, | ||
| pendingCount: pendingCount, | ||
| end: end, | ||
| } | ||
| } | ||
|
|
||
| // onRequest transitions the state to the next state based on the current state and the previous requests results | ||
| // The implementation represents a state machine for CircuitBreaker | ||
| // All state transitions happens at the request evaluation time only | ||
| // Circuit breaker start with a closed state, allows all requests to pass through and always lasts for a fixed duration of `Settings.ErrorRateWindow`. | ||
| // If `Settings.ErrorRateThresholdPct` is breached at the end of the window, then it moves to Open state, otherwise it moves to a new Closed state with a new window. | ||
| // Open state fails all request, it has a fixed duration of `Settings.CoolDownInterval` and always moves to HalfOpen state at the end of the interval. | ||
| // HalfOpen state does not have a fixed duration and lasts till `Settings.HalfOpenSuccessCount` are evaluated. | ||
| // If any of `Settings.HalfOpenSuccessCount` fails then it moves back to Open state, otherwise it moves to Closed state. | ||
| func (s *State) onRequest(cb *CircuitBreaker) (*State, error) { | ||
| var now = time.Now() | ||
| switch s.stateType { | ||
| case StateClosed: | ||
| if now.After(s.end) { | ||
| // ErrorRateWindow is over, let's evaluate the error rate | ||
| if s.cb.config.ErrorRateThresholdPct > 0 { // otherwise circuit breaker is disabled | ||
| total := s.failureCount + s.successCount | ||
| if total > 0 { | ||
| observedErrorRatePct := s.failureCount * 100 / total | ||
| if total >= uint32(s.cb.config.ErrorRateWindow.Seconds())*s.cb.config.MinQPSForOpen && observedErrorRatePct >= s.cb.config.ErrorRateThresholdPct { | ||
| // the error threshold is breached, let's move to open state and start failing all requests | ||
| log.Error("circuit breaker tripped and starting to fail all requests", | ||
| zap.String("name", cb.name), | ||
| zap.Uint32("observed-err-rate-pct", observedErrorRatePct), | ||
| zap.Any("config", cb.config)) | ||
| return cb.newState(now, StateOpen), errs.ErrCircuitBreakerOpen | ||
| } | ||
| } | ||
| } | ||
| // the error threshold is not breached or there were not enough requests to evaluate it, | ||
| // continue in the closed state and allow all requests | ||
| return cb.newState(now, StateClosed), nil | ||
| } | ||
| // continue in closed state till ErrorRateWindow is over | ||
| return s, nil | ||
| case StateOpen: | ||
| if s.cb.config.ErrorRateThresholdPct == 0 { | ||
| return cb.newState(now, StateClosed), nil | ||
| } | ||
|
|
||
| if now.After(s.end) { | ||
| // CoolDownInterval is over, it is time to transition to half-open state | ||
| log.Info("circuit breaker cooldown period is over. Transitioning to half-open state to test the service", | ||
| zap.String("name", cb.name), | ||
| zap.Any("config", cb.config)) | ||
| return cb.newState(now, StateHalfOpen), nil | ||
| } else { | ||
| // continue in the open state till CoolDownInterval is over | ||
| return s, errs.ErrCircuitBreakerOpen | ||
| } | ||
| case StateHalfOpen: | ||
| if s.cb.config.ErrorRateThresholdPct == 0 { | ||
| return cb.newState(now, StateClosed), nil | ||
| } | ||
|
|
||
| // do we need some expire time here in case of one of pending requests is stuck forever? | ||
| if s.failureCount > 0 { | ||
| // there were some failures during half-open state, let's go back to open state to wait a bit longer | ||
| log.Error("circuit breaker goes from half-open to open again as errors persist and continue to fail all requests", | ||
| zap.String("name", cb.name), | ||
| zap.Any("config", cb.config)) | ||
| return cb.newState(now, StateOpen), errs.ErrCircuitBreakerOpen | ||
| } else if s.successCount == s.cb.config.HalfOpenSuccessCount { | ||
| // all probe requests are succeeded, we can move to closed state and allow all requests | ||
| log.Info("circuit breaker is closed and start allowing all requests", | ||
| zap.String("name", cb.name), | ||
| zap.Any("config", cb.config)) | ||
| return cb.newState(now, StateClosed), nil | ||
| } else if s.pendingCount < s.cb.config.HalfOpenSuccessCount { | ||
| // allow more probe requests and continue in half-open state | ||
| s.pendingCount++ | ||
| return s, nil | ||
| } else { | ||
| // continue in half-open state till all probe requests are done and fail all other requests for now | ||
| return s, errs.ErrCircuitBreakerOpen | ||
| } | ||
| default: | ||
| panic("unknown state") | ||
| } | ||
| } | ||
|
|
||
| func (s *State) onResult(overloaded Overloading) { | ||
| switch overloaded { | ||
| case No: | ||
| s.successCount++ | ||
| case Yes: | ||
| s.failureCount++ | ||
| default: | ||
| panic("unknown state") | ||
| } | ||
| } | ||
|
|
||
| // Define context key type | ||
| type cbCtxKey struct{} | ||
|
|
||
| // Key used to store circuit breaker | ||
| var CircuitBreakerKey = cbCtxKey{} | ||
|
|
||
| // FromContext retrieves the circuit breaker from the context | ||
| func FromContext(ctx context.Context) *CircuitBreaker { | ||
| if ctx == nil { | ||
| return nil | ||
| } | ||
| if cb, ok := ctx.Value(CircuitBreakerKey).(*CircuitBreaker); ok { | ||
| return cb | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| // WithCircuitBreaker stores the circuit breaker into a new context | ||
| func WithCircuitBreaker(ctx context.Context, cb *CircuitBreaker) context.Context { | ||
| return context.WithValue(ctx, CircuitBreakerKey, cb) | ||
| } | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.