Skip to content

Commit aa75cbb

Browse files
authored
fix(resourcemanager): add validation for token assignment anomaly (#9597)
ref #9455 Add safeguard checks and warnings during token assignment to handle potential invalid token values. Signed-off-by: JmPotato <[email protected]>
1 parent ca23c92 commit aa75cbb

File tree

4 files changed

+99
-33
lines changed

4 files changed

+99
-33
lines changed

pkg/mcs/resourcemanager/server/resource_group.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,9 @@ func (rus *RequestUnitSettings) Clone() *RequestUnitSettings {
6969
}
7070

7171
// NewRequestUnitSettings creates a new RequestUnitSettings with the given token bucket.
72-
func NewRequestUnitSettings(tokenBucket *rmpb.TokenBucket) *RequestUnitSettings {
72+
func NewRequestUnitSettings(resourceGroupName string, tokenBucket *rmpb.TokenBucket) *RequestUnitSettings {
7373
return &RequestUnitSettings{
74-
RU: NewGroupTokenBucket(tokenBucket),
74+
RU: NewGroupTokenBucket(resourceGroupName, tokenBucket),
7575
}
7676
}
7777

@@ -241,9 +241,9 @@ func FromProtoResourceGroup(group *rmpb.ResourceGroup) *ResourceGroup {
241241
switch group.GetMode() {
242242
case rmpb.GroupMode_RUMode:
243243
if group.GetRUSettings() == nil {
244-
rg.RUSettings = NewRequestUnitSettings(nil)
244+
rg.RUSettings = NewRequestUnitSettings(rg.Name, nil)
245245
} else {
246-
rg.RUSettings = NewRequestUnitSettings(group.GetRUSettings().GetRU())
246+
rg.RUSettings = NewRequestUnitSettings(rg.Name, group.GetRUSettings().GetRU())
247247
}
248248
if group.RUStats != nil {
249249
rg.RUConsumption = group.RUStats
@@ -268,6 +268,9 @@ func (rg *ResourceGroup) RequestRU(
268268
}
269269
// First, try to get tokens from the resource group.
270270
tb, trickleTimeMs := rg.RUSettings.RU.request(now, requiredToken, targetPeriodMs, clientUniqueID)
271+
if tb == nil {
272+
return nil
273+
}
271274
// Then, try to apply the service limit.
272275
grantedTokens := tb.GetTokens()
273276
limitedTokens := sl.applyServiceLimit(now, grantedTokens)

pkg/mcs/resourcemanager/server/resource_group_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import (
2626

2727
func TestPatchResourceGroup(t *testing.T) {
2828
re := require.New(t)
29-
rg := &ResourceGroup{Name: "test", Mode: rmpb.GroupMode_RUMode, RUSettings: NewRequestUnitSettings(nil)}
29+
rg := &ResourceGroup{Name: testResourceGroupName, Mode: rmpb.GroupMode_RUMode, RUSettings: NewRequestUnitSettings(testResourceGroupName, nil)}
3030
testCaseRU := []struct {
3131
patchJSONString string
3232
expectJSONString string

pkg/mcs/resourcemanager/server/token_buckets.go

Lines changed: 83 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -148,9 +148,24 @@ type tokenSlot struct {
148148
lastReqTime time.Time
149149
}
150150

151+
func (ts *tokenSlot) logFields() []zap.Field {
152+
return []zap.Field{
153+
zap.Uint64("slot-fill-rate", ts.fillRate),
154+
zap.Int64("slot-burst-limit", ts.burstLimit),
155+
zap.Float64("slot-require-tokens-sum", ts.requireTokensSum),
156+
zap.Float64("slot-token-capacity", ts.tokenCapacity),
157+
zap.Float64("slot-last-token-capacity", ts.lastTokenCapacity),
158+
zap.Time("slot-last-req-time", ts.lastReqTime),
159+
}
160+
}
161+
151162
// GroupTokenBucketState is the running state of TokenBucket.
152163
type GroupTokenBucketState struct {
153-
Tokens float64 `json:"tokens,omitempty"`
164+
Tokens float64 `json:"tokens,omitempty"`
165+
LastUpdate *time.Time `json:"last_update,omitempty"`
166+
Initialized bool `json:"initialized"`
167+
168+
resourceGroupName string
154169
// ClientUniqueID -> TokenSlot
155170
tokenSlots map[uint64]*tokenSlot
156171
clientConsumptionTokensSum float64
@@ -173,8 +188,6 @@ type GroupTokenBucketState struct {
173188
// means the burst limit is overridden.
174189
overrideBurstLimit int64
175190

176-
LastUpdate *time.Time `json:"last_update,omitempty"`
177-
Initialized bool `json:"initialized"`
178191
// settingChanged is used to avoid that the number of tokens returned is jitter because of changing fill rate.
179192
settingChanged bool
180193
lastCheckExpireSlot time.Time
@@ -198,6 +211,7 @@ func (gts *GroupTokenBucketState) clone() *GroupTokenBucketState {
198211
Tokens: gts.Tokens,
199212
LastUpdate: lastUpdate,
200213
Initialized: gts.Initialized,
214+
resourceGroupName: gts.resourceGroupName,
201215
tokenSlots: tokenSlots,
202216
overrideFillRate: gts.overrideFillRate,
203217
overrideBurstLimit: gts.overrideBurstLimit,
@@ -210,16 +224,11 @@ func (gts *GroupTokenBucketState) resetLoan() {
210224
gts.settingChanged = false
211225
gts.Tokens = 0
212226
gts.clientConsumptionTokensSum = 0
213-
evenRatio := 1.0
214-
if l := len(gts.tokenSlots); l > 0 {
215-
evenRatio = 1 / float64(l)
216-
}
217-
218-
evenTokens := gts.Tokens * evenRatio
227+
// Reset all slots.
219228
for _, slot := range gts.tokenSlots {
220229
slot.requireTokensSum = 0
221-
slot.tokenCapacity = evenTokens
222-
slot.lastTokenCapacity = evenTokens
230+
slot.tokenCapacity = 0
231+
slot.lastTokenCapacity = 0
223232
}
224233
}
225234

@@ -330,14 +339,15 @@ func (gtb *GroupTokenBucket) calcRateAndBurstLimit(ratio float64) (fillRate uint
330339
}
331340

332341
// NewGroupTokenBucket returns a new GroupTokenBucket
333-
func NewGroupTokenBucket(tokenBucket *rmpb.TokenBucket) *GroupTokenBucket {
342+
func NewGroupTokenBucket(resourceGroupName string, tokenBucket *rmpb.TokenBucket) *GroupTokenBucket {
334343
if tokenBucket == nil || tokenBucket.Settings == nil {
335344
return &GroupTokenBucket{}
336345
}
337346
return &GroupTokenBucket{
338347
Settings: tokenBucket.GetSettings(),
339348
GroupTokenBucketState: GroupTokenBucketState{
340349
Tokens: tokenBucket.GetTokens(),
350+
resourceGroupName: resourceGroupName,
341351
tokenSlots: make(map[uint64]*tokenSlot),
342352
overrideFillRate: -1,
343353
overrideBurstLimit: -1,
@@ -418,43 +428,93 @@ func (gtb *GroupTokenBucket) updateTokens(now time.Time, burstLimit int64, clien
418428
gtb.balanceSlotTokens(clientUniqueID, requiredToken, elapseTokens)
419429
}
420430

431+
func (gtb *GroupTokenBucket) inspectAnomalies(
432+
tb *rmpb.TokenBucket,
433+
slot *tokenSlot,
434+
logFields []zap.Field,
435+
) bool {
436+
var errMsg string
437+
// Verify whether the allocated token is invalid, such as negative values, math.Inf, or math.NaN.
438+
if tb.Tokens <= 0 || math.IsInf(tb.Tokens, 0) || math.IsNaN(tb.Tokens) {
439+
errMsg = "assigned token is invalid"
440+
}
441+
// Verify whether the state of the slot is abnormal.
442+
if math.IsInf(slot.tokenCapacity, 0) || math.IsNaN(slot.tokenCapacity) {
443+
errMsg = "slot token capacity is invalid"
444+
}
445+
// If there is any error, reset the group token bucket to avoid the group token bucket is in a bad state.
446+
isAnomaly := len(errMsg) > 0
447+
if isAnomaly {
448+
logFields = append(logFields,
449+
append(
450+
slot.logFields(),
451+
zap.String("resource-group-name", gtb.resourceGroupName),
452+
zap.String("settings", gtb.Settings.String()),
453+
zap.Float64("tokens", gtb.Tokens),
454+
zap.Float64("client-consumption-tokens-sum", gtb.clientConsumptionTokensSum),
455+
zap.Int("slot-len", len(gtb.tokenSlots)),
456+
)...,
457+
)
458+
log.Error(errMsg, logFields...)
459+
// Reset after logging to keep the original context.
460+
gtb.resetLoan()
461+
}
462+
return isAnomaly
463+
}
464+
421465
// request requests tokens from the corresponding slot.
422-
func (gtb *GroupTokenBucket) request(now time.Time,
466+
func (gtb *GroupTokenBucket) request(
467+
now time.Time,
423468
requiredToken float64,
424469
targetPeriodMs, clientUniqueID uint64,
425470
) (*rmpb.TokenBucket, int64) {
426471
burstLimit := gtb.getBurstLimit()
427472
gtb.updateTokens(now, burstLimit, clientUniqueID, requiredToken)
428473
slot, ok := gtb.tokenSlots[clientUniqueID]
429474
if !ok {
430-
return &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{BurstLimit: burstLimit}}, 0
475+
return &rmpb.TokenBucket{
476+
Settings: &rmpb.TokenLimitSettings{BurstLimit: burstLimit},
477+
Tokens: 0.0,
478+
}, 0
431479
}
432480
res, trickleDuration := slot.assignSlotTokens(requiredToken, targetPeriodMs)
481+
// Inspect the group token bucket and the assigned token result to catch any anomalies.
482+
if isAnomaly := gtb.inspectAnomalies(res, slot, []zap.Field{
483+
zap.Time("now", now),
484+
zap.Uint64("client-unique-id", clientUniqueID),
485+
zap.Uint64("target-period-ms", targetPeriodMs),
486+
zap.Float64("required-token", requiredToken),
487+
zap.Float64("assigned-tokens", res.Tokens),
488+
}); isAnomaly {
489+
// Return nil here to prevent sending any unexpected result to the client.
490+
// The client has to retry later to access the resource group whose state has been reset.
491+
return nil, 0
492+
}
433493
// Update bucket to record all tokens.
434494
gtb.Tokens -= slot.lastTokenCapacity - slot.tokenCapacity
435495
slot.lastTokenCapacity = slot.tokenCapacity
436-
437496
return res, trickleDuration
438497
}
439498

440499
func (ts *tokenSlot) assignSlotTokens(requiredToken float64, targetPeriodMs uint64) (*rmpb.TokenBucket, int64) {
441-
var res rmpb.TokenBucket
442-
burstLimit := ts.burstLimit
443-
res.Settings = &rmpb.TokenLimitSettings{BurstLimit: burstLimit}
500+
res := &rmpb.TokenBucket{
501+
Settings: &rmpb.TokenLimitSettings{BurstLimit: ts.burstLimit},
502+
Tokens: 0.0,
503+
}
444504
if getBurstableMode(res.Settings) == unlimited {
445505
res.Tokens = requiredToken
446-
return &res, 0
506+
return res, 0
447507
}
448508
// FillRate is used for the token server unavailable in abnormal situation.
449509
if requiredToken <= 0 {
450-
return &res, 0
510+
return res, 0
451511
}
452512
// If the current tokens can directly meet the requirement, returns the need token.
453513
if ts.tokenCapacity >= requiredToken {
454514
ts.tokenCapacity -= requiredToken
455515
// granted the total request tokens
456516
res.Tokens = requiredToken
457-
return &res, 0
517+
return res, 0
458518
}
459519

460520
// Firstly allocate the remaining tokens
@@ -472,6 +532,7 @@ func (ts *tokenSlot) assignSlotTokens(requiredToken float64, targetPeriodMs uint
472532
targetPeriodTimeSec = targetPeriodTime.Seconds()
473533
trickleTime = 0.
474534
fillRate = ts.fillRate
535+
burstLimit = ts.burstLimit
475536
)
476537

477538
loanCoefficient := defaultLoanCoefficient
@@ -547,5 +608,5 @@ func (ts *tokenSlot) assignSlotTokens(requiredToken float64, targetPeriodMs uint
547608
} else {
548609
trickleDuration = targetPeriodTime
549610
}
550-
return &res, trickleDuration.Milliseconds()
611+
return res, trickleDuration.Milliseconds()
551612
}

pkg/mcs/resourcemanager/server/token_buckets_test.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ import (
2424
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
2525
)
2626

27+
const testResourceGroupName = "test"
28+
2729
func TestGroupTokenBucketUpdateAndPatch(t *testing.T) {
2830
re := require.New(t)
2931
tbSetting := &rmpb.TokenBucket{
@@ -35,7 +37,7 @@ func TestGroupTokenBucketUpdateAndPatch(t *testing.T) {
3537
}
3638

3739
clientUniqueID := uint64(0)
38-
tb := NewGroupTokenBucket(tbSetting)
40+
tb := NewGroupTokenBucket(testResourceGroupName, tbSetting)
3941
time1 := time.Now()
4042
tb.request(time1, 0, 0, clientUniqueID)
4143
re.LessOrEqual(math.Abs(tbSetting.Tokens-tb.Tokens), 1e-7)
@@ -62,7 +64,7 @@ func TestGroupTokenBucketUpdateAndPatch(t *testing.T) {
6264
BurstLimit: -1,
6365
},
6466
}
65-
tb = NewGroupTokenBucket(tbSetting)
67+
tb = NewGroupTokenBucket(testResourceGroupName, tbSetting)
6668
tb.request(time2, 0, 0, clientUniqueID)
6769
re.LessOrEqual(math.Abs(tbSetting.Tokens), 1e-7)
6870
time3 := time.Now()
@@ -76,7 +78,7 @@ func TestGroupTokenBucketUpdateAndPatch(t *testing.T) {
7678
BurstLimit: -1,
7779
},
7880
}
79-
tb = NewGroupTokenBucket(tbSetting)
81+
tb = NewGroupTokenBucket(testResourceGroupName, tbSetting)
8082
tb.request(time3, 0, 0, clientUniqueID)
8183
re.LessOrEqual(math.Abs(tbSetting.Tokens-200000), 1e-7)
8284
time.Sleep(10 * time.Millisecond)
@@ -95,7 +97,7 @@ func TestGroupTokenBucketRequest(t *testing.T) {
9597
},
9698
}
9799

98-
gtb := NewGroupTokenBucket(tbSetting)
100+
gtb := NewGroupTokenBucket(testResourceGroupName, tbSetting)
99101
time1 := time.Now()
100102
clientUniqueID := uint64(0)
101103
tb, trickle := gtb.request(time1, 190000, uint64(time.Second)*10/uint64(time.Millisecond), clientUniqueID)
@@ -128,7 +130,7 @@ func TestGroupTokenBucketRequest(t *testing.T) {
128130
func TestGroupTokenBucketRequestBurstLimit(t *testing.T) {
129131
re := require.New(t)
130132
testGroupSetting := func(tbSetting *rmpb.TokenBucket, expectedFillRate, expectedBurstLimit int64) {
131-
gtb := NewGroupTokenBucket(tbSetting)
133+
gtb := NewGroupTokenBucket(testResourceGroupName, tbSetting)
132134
time1 := time.Now()
133135
clientUniqueID := uint64(0)
134136
gtb.request(time1, 190000, uint64(time.Second)*10/uint64(time.Millisecond), clientUniqueID)
@@ -219,7 +221,7 @@ func TestGroupTokenBucketRequestLoop(t *testing.T) {
219221
},
220222
}
221223

222-
gtb := NewGroupTokenBucket(tbSetting)
224+
gtb := NewGroupTokenBucket(testResourceGroupName, tbSetting)
223225
clientUniqueID := uint64(0)
224226
initialTime := time.Now()
225227

0 commit comments

Comments
 (0)