Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 16 additions & 20 deletions pkg/mcs/resourcemanager/server/token_buckets.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@ import (
)

const (
defaultRefillRate = 10000
defaultModeratedBurstRate = 10000
defaultInitialTokens = 10 * 10000
defaultReserveRatio = 0.5
defaultLoanCoefficient = 2
maxAssignTokens = math.MaxFloat64 / 1024 // assume max client connect is 1024
slotExpireTimeout = 10 * time.Minute
defaultRefillRate = 10000
defaultModeratedBurstRate = 10000
defaultInitialTokens = 10 * 10000
defaultReserveRatio = 0.5
defaultLoanCoefficient = 2
maxAssignTokens = math.MaxFloat64 / 1024 // assume max client connect is 1024
slotExpireTimeout = 10 * time.Minute
defaultConsumptionBiasWeight = 0.75
)

type burstableMode int
Expand Down Expand Up @@ -233,10 +234,10 @@ func (gts *GroupTokenBucketState) resetLoan() {
}

func (gtb *GroupTokenBucket) balanceSlotTokens(
now time.Time,
clientUniqueID uint64,
requiredToken, tokensForBalance float64,
) {
now := time.Now()
slot, exist := gtb.tokenSlots[clientUniqueID]
if !exist {
// Only slots that require a positive number will be considered alive,
Expand All @@ -258,10 +259,10 @@ func (gtb *GroupTokenBucket) balanceSlotTokens(
}
}

if time.Since(gtb.lastCheckExpireSlot) >= slotExpireTimeout {
if now.Sub(gtb.lastCheckExpireSlot) >= slotExpireTimeout {
gtb.lastCheckExpireSlot = now
for clientUniqueID, slot := range gtb.tokenSlots {
if time.Since(slot.lastReqTime) >= slotExpireTimeout {
if now.Sub(slot.lastReqTime) >= slotExpireTimeout {
delete(gtb.tokenSlots, clientUniqueID)
log.Info("delete resource group slot because expire",
zap.Time("last-req-time", slot.lastReqTime),
Expand Down Expand Up @@ -293,15 +294,10 @@ func (gtb *GroupTokenBucket) balanceSlotTokens(

slot.fillRate, slot.burstLimit = gtb.calcRateAndBurstLimit(evenRatio)
} else {
// In order to have fewer tokens available to clients that are currently consuming more.
// We have the following formula:
// client1: (1 - a/N + 1/N) * 1/N
// client2: (1 - b/N + 1/N) * 1/N
// ...
// clientN: (1 - n/N + 1/N) * 1/N
// Sum is:
// (N - (a+b+...+n)/N +1) * 1/N => (N - 1 + 1) * 1/N => 1
ratio := (1 - slot.requireTokensSum/gtb.clientConsumptionTokensSum + evenRatio) * evenRatio
// Blend historical consumption with an even share so active clients receive
// proportionally higher refill rates without starving low-traffic ones.
consumptionShare := slot.requireTokensSum / gtb.clientConsumptionTokensSum
ratio := evenRatio*(1-defaultConsumptionBiasWeight) + consumptionShare*defaultConsumptionBiasWeight

assignTokens := tokensForBalance * ratio
fillRate, burstLimit := gtb.calcRateAndBurstLimit(ratio)
Expand Down Expand Up @@ -428,7 +424,7 @@ func (gtb *GroupTokenBucket) updateTokens(now time.Time, burstLimit int64, clien
gtb.resetLoan()
}
// Balance each slots.
gtb.balanceSlotTokens(clientUniqueID, requiredToken, tokensForBalance)
gtb.balanceSlotTokens(now, clientUniqueID, requiredToken, tokensForBalance)
}

func (gtb *GroupTokenBucket) inspectAnomalies(
Expand Down