Skip to content

Commit ca23c92

Browse files
authored
test(resourcemanager): add service limit integration tests (#9594)
ref #9296 Add integration tests for the service limit. Signed-off-by: JmPotato <[email protected]>
1 parent 910b97f commit ca23c92

File tree

9 files changed

+215
-52
lines changed

9 files changed

+215
-52
lines changed

pkg/mcs/resourcemanager/server/grpc_service.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -196,9 +196,9 @@ func (s *Service) AcquireTokenBuckets(stream rmpb.ResourceManager_AcquireTokenBu
196196
logFields[0] = zap.Uint32("keyspace-id", keyspaceID)
197197
logFields[1] = zap.String("resource-group", resourceGroupName)
198198
// Get keyspace resource group manager to apply service limit later.
199-
krgm := s.manager.getKeyspaceResourceGroupManager(keyspaceID)
199+
krgm, err := s.manager.accessKeyspaceResourceGroupManager(keyspaceID, resourceGroupName)
200200
if krgm == nil {
201-
log.Warn("keyspace resource group manager not found", logFields...)
201+
log.Warn("keyspace resource group manager not found", append(logFields, zap.Error(err))...)
202202
continue
203203
}
204204
// Get the resource group from manager to acquire token buckets.

pkg/mcs/resourcemanager/server/keyspace_manager.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,10 @@ const (
3636
maxGroupNameLength = 32
3737
middlePriority = 8
3838
maxPriority = 16
39-
unlimitedRate = math.MaxInt32
40-
unlimitedBurstLimit = -1
39+
// UnlimitedRate is the unlimited fill rate used for the default resource group.
40+
UnlimitedRate = math.MaxInt32
41+
// UnlimitedBurstLimit is the unlimited burst limit used for the default resource group.
42+
UnlimitedBurstLimit = -1
4143
// DefaultResourceGroupName is the reserved default resource group name within each keyspace.
4244
DefaultResourceGroupName = "default"
4345
// defaultRUTrackerTimeConstant is the default time-aware EMA time constant for the RU tracker.
@@ -121,8 +123,8 @@ func (krgm *keyspaceResourceGroupManager) initDefaultResourceGroup() {
121123
RUSettings: &RequestUnitSettings{
122124
RU: &GroupTokenBucket{
123125
Settings: &rmpb.TokenLimitSettings{
124-
FillRate: unlimitedRate,
125-
BurstLimit: unlimitedBurstLimit,
126+
FillRate: UnlimitedRate,
127+
BurstLimit: UnlimitedBurstLimit,
126128
},
127129
},
128130
},
@@ -638,7 +640,7 @@ func (krgm *keyspaceResourceGroupManager) cleanupOverrides() {
638640
// This ensures the burstability of the resource groups can be properly invalidated.
639641
func (krgm *keyspaceResourceGroupManager) invalidateBurstability(serviceLimit float64) {
640642
for _, group := range krgm.getMutableResourceGroupList() {
641-
if group.getBurstLimit() >= 0 {
643+
if group.getBurstLimit(true) >= 0 {
642644
continue
643645
}
644646
group.overrideBurstLimit(int64(serviceLimit))

pkg/mcs/resourcemanager/server/keyspace_manager_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,8 @@ func TestInitDefaultResourceGroup(t *testing.T) {
5353
re.Equal(uint32(middlePriority), defaultGroup.Priority)
5454

5555
// Verify the default resource group has unlimited rate and burst limit.
56-
re.Equal(float64(unlimitedRate), defaultGroup.getFillRate())
57-
re.Equal(int64(unlimitedBurstLimit), defaultGroup.getBurstLimit())
56+
re.Equal(float64(UnlimitedRate), defaultGroup.getFillRate())
57+
re.Equal(int64(UnlimitedBurstLimit), defaultGroup.getBurstLimit())
5858
}
5959

6060
func TestAddResourceGroup(t *testing.T) {
@@ -815,8 +815,8 @@ func TestConciliateFillRate(t *testing.T) {
815815
name: "Default group with unlimited burst limit",
816816
serviceLimit: 100,
817817
priorityList: []uint32{1},
818-
fillRateSettingList: []uint64{unlimitedRate},
819-
burstLimitSettingList: []int64{unlimitedBurstLimit},
818+
fillRateSettingList: []uint64{UnlimitedRate},
819+
burstLimitSettingList: []int64{UnlimitedBurstLimit},
820820
ruDemandList: []float64{1000},
821821
// Default group with unlimited settings, basic demand = min(1000, unlimitedRate) = 1000
822822
// Basic demand 1000 > service limit 100, so gets proportional allocation: 100*(1000/1000)=100

pkg/mcs/resourcemanager/server/resource_group.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,6 @@ func (rg *ResourceGroup) RequestRU(
263263
) *rmpb.GrantedRUTokenBucket {
264264
rg.Lock()
265265
defer rg.Unlock()
266-
267266
if rg.RUSettings == nil || rg.RUSettings.RU.Settings == nil {
268267
return nil
269268
}

pkg/mcs/resourcemanager/server/token_buckets.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,7 @@ func (gtb *GroupTokenBucket) balanceSlotTokens(
320320

321321
func (gtb *GroupTokenBucket) calcRateAndBurstLimit(ratio float64) (fillRate uint64, burstLimit int64) {
322322
if gtb.getBurstableMode() == moderated {
323-
fillRate = uint64(math.Min(gtb.getFillRate()+defaultModeratedBurstRate, unlimitedRate) * ratio)
323+
fillRate = uint64(math.Min(gtb.getFillRate()+defaultModeratedBurstRate, UnlimitedRate) * ratio)
324324
burstLimit = int64(fillRate)
325325
return
326326
}

pkg/mcs/resourcemanager/server/token_buckets_test.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -163,9 +163,9 @@ func TestGroupTokenBucketRequestBurstLimit(t *testing.T) {
163163
Tokens: 200000,
164164
Settings: &rmpb.TokenLimitSettings{
165165
FillRate: 2000,
166-
BurstLimit: unlimitedBurstLimit,
166+
BurstLimit: UnlimitedBurstLimit,
167167
},
168-
}, 2000, unlimitedBurstLimit)
168+
}, 2000, UnlimitedBurstLimit)
169169

170170
testGroupSetting(&rmpb.TokenBucket{
171171
Tokens: 200000,
@@ -179,34 +179,34 @@ func TestGroupTokenBucketRequestBurstLimit(t *testing.T) {
179179
testGroupSetting(&rmpb.TokenBucket{
180180
Tokens: 200000,
181181
Settings: &rmpb.TokenLimitSettings{
182-
FillRate: unlimitedRate,
182+
FillRate: UnlimitedRate,
183183
BurstLimit: 2000,
184184
},
185-
}, unlimitedRate, 2000)
185+
}, UnlimitedRate, 2000)
186186

187187
testGroupSetting(&rmpb.TokenBucket{
188188
Tokens: 200000,
189189
Settings: &rmpb.TokenLimitSettings{
190-
FillRate: unlimitedRate,
190+
FillRate: UnlimitedRate,
191191
BurstLimit: 0,
192192
},
193-
}, unlimitedRate, 0) // burstLimit = 0 is a special case
193+
}, UnlimitedRate, 0) // burstLimit = 0 is a special case
194194

195195
testGroupSetting(&rmpb.TokenBucket{
196196
Tokens: 200000,
197197
Settings: &rmpb.TokenLimitSettings{
198-
FillRate: unlimitedRate,
199-
BurstLimit: unlimitedBurstLimit,
198+
FillRate: UnlimitedRate,
199+
BurstLimit: UnlimitedBurstLimit,
200200
},
201-
}, unlimitedRate, unlimitedBurstLimit)
201+
}, UnlimitedRate, UnlimitedBurstLimit)
202202

203203
testGroupSetting(&rmpb.TokenBucket{
204204
Tokens: 200000,
205205
Settings: &rmpb.TokenLimitSettings{
206-
FillRate: unlimitedRate,
206+
FillRate: UnlimitedRate,
207207
BurstLimit: -2,
208208
},
209-
}, unlimitedRate, unlimitedRate)
209+
}, UnlimitedRate, UnlimitedRate)
210210
}
211211

212212
func TestGroupTokenBucketRequestLoop(t *testing.T) {

tests/integrations/mcs/resourcemanager/api_test.go

Lines changed: 20 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -65,18 +65,10 @@ func (suite *resourceManagerAPITestSuite) TearDownTest() {
6565
suite.cluster.Destroy()
6666
}
6767

68-
func (suite *resourceManagerAPITestSuite) getEndpoint(re *require.Assertions, elems ...string) string {
69-
endpoint, err := url.JoinPath(
70-
suite.cluster.GetLeaderServer().GetAddr(),
71-
append([]string{apis.APIPathPrefix}, elems...)...,
72-
)
73-
re.NoError(err)
74-
return endpoint
75-
}
76-
7768
// sendRequest is a helper function to send HTTP requests and handle common response processing
78-
func (suite *resourceManagerAPITestSuite) sendRequest(
69+
func sendRequest(
7970
re *require.Assertions,
71+
leaderAddr string,
8072
method, path string,
8173
queryParams url.Values,
8274
body any,
@@ -87,7 +79,8 @@ func (suite *resourceManagerAPITestSuite) sendRequest(
8779
re.NoError(err)
8880
bodyReader = bytes.NewBuffer(data)
8981
}
90-
path = suite.getEndpoint(re, path)
82+
path, err := url.JoinPath(leaderAddr, apis.APIPathPrefix, path)
83+
re.NoError(err)
9184
if len(queryParams) > 0 {
9285
path += "?" + queryParams.Encode()
9386
}
@@ -108,7 +101,7 @@ func (suite *resourceManagerAPITestSuite) mustSendRequest(
108101
method, path string,
109102
body any,
110103
) []byte {
111-
bodyBytes, statusCode := suite.sendRequest(re, method, path, nil, body)
104+
bodyBytes, statusCode := sendRequest(re, suite.cluster.GetLeaderServer().GetAddr(), method, path, nil, body)
112105
re.Equal(http.StatusOK, statusCode, string(bodyBytes))
113106
return bodyBytes
114107
}
@@ -270,7 +263,7 @@ func (suite *resourceManagerAPITestSuite) mustUpdateResourceGroup(re *require.As
270263
func (suite *resourceManagerAPITestSuite) mustGetResourceGroup(re *require.Assertions, name string, keyspaceName string) *server.ResourceGroup {
271264
queryParams := url.Values{}
272265
queryParams.Set("keyspace_name", keyspaceName)
273-
bodyBytes, statusCode := suite.sendRequest(re, http.MethodGet, "/config/group/"+name, queryParams, nil)
266+
bodyBytes, statusCode := sendRequest(re, suite.server.GetAddr(), http.MethodGet, "/config/group/"+name, queryParams, nil)
274267
if statusCode != http.StatusOK {
275268
re.Equal(http.StatusNotFound, statusCode)
276269
return nil
@@ -283,7 +276,7 @@ func (suite *resourceManagerAPITestSuite) mustGetResourceGroup(re *require.Asser
283276
func (suite *resourceManagerAPITestSuite) mustGetResourceGroupList(re *require.Assertions, keyspaceName string) []*server.ResourceGroup {
284277
queryParams := url.Values{}
285278
queryParams.Set("keyspace_name", keyspaceName)
286-
bodyBytes, statusCode := suite.sendRequest(re, http.MethodGet, "/config/groups", queryParams, nil)
279+
bodyBytes, statusCode := sendRequest(re, suite.server.GetAddr(), http.MethodGet, "/config/groups", queryParams, nil)
287280
if statusCode != http.StatusOK {
288281
re.Equal(http.StatusNotFound, statusCode)
289282
return nil
@@ -296,7 +289,7 @@ func (suite *resourceManagerAPITestSuite) mustGetResourceGroupList(re *require.A
296289
func (suite *resourceManagerAPITestSuite) mustDeleteResourceGroup(re *require.Assertions, name string, keyspaceName string) {
297290
queryParams := url.Values{}
298291
queryParams.Set("keyspace_name", keyspaceName)
299-
bodyBytes, statusCode := suite.sendRequest(re, http.MethodDelete, "/config/group/"+name, queryParams, nil)
292+
bodyBytes, statusCode := sendRequest(re, suite.server.GetAddr(), http.MethodDelete, "/config/group/"+name, queryParams, nil)
300293
if statusCode != http.StatusOK {
301294
re.Equal(http.StatusNotFound, statusCode)
302295
return
@@ -344,9 +337,10 @@ func (suite *resourceManagerAPITestSuite) TestKeyspaceServiceLimitAPI() {
344337
},
345338
)
346339
re.NoError(err)
340+
leaderAddr := suite.cluster.GetLeaderServer().GetAddr()
347341
for _, keyspaceName := range []string{"", "test_keyspace"} {
348342
// Get the keyspace service limit.
349-
limit, statusCode := suite.tryToGetKeyspaceServiceLimit(re, keyspaceName)
343+
limit, statusCode := tryToGetKeyspaceServiceLimit(re, leaderAddr, keyspaceName)
350344
if len(keyspaceName) == 0 {
351345
// The null keyspace is always available.
352346
re.Equal(http.StatusOK, statusCode)
@@ -357,29 +351,29 @@ func (suite *resourceManagerAPITestSuite) TestKeyspaceServiceLimitAPI() {
357351
re.Equal(0.0, limit)
358352
}
359353
// Try to set the keyspace service limit to a negative value.
360-
resp, statusCode := suite.tryToSetKeyspaceServiceLimit(re, keyspaceName, -1.0)
354+
resp, statusCode := tryToSetKeyspaceServiceLimit(re, leaderAddr, keyspaceName, -1.0)
361355
re.Equal(http.StatusBadRequest, statusCode)
362356
re.Equal("service_limit must be non-negative", resp)
363357
// Set the keyspace service limit to a positive value.
364-
resp, statusCode = suite.tryToSetKeyspaceServiceLimit(re, keyspaceName, 1.0)
358+
resp, statusCode = tryToSetKeyspaceServiceLimit(re, leaderAddr, keyspaceName, 1.0)
365359
re.Equal(http.StatusOK, statusCode)
366360
re.Equal("Success!", resp)
367-
limit, statusCode = suite.tryToGetKeyspaceServiceLimit(re, keyspaceName)
361+
limit, statusCode = tryToGetKeyspaceServiceLimit(re, leaderAddr, keyspaceName)
368362
re.Equal(http.StatusOK, statusCode)
369363
re.Equal(1.0, limit)
370364
}
371365
// Try to set a non-existing keyspace's service limit.
372-
resp, statusCode := suite.tryToSetKeyspaceServiceLimit(re, "non_existing_keyspace", 1.0)
366+
resp, statusCode := tryToSetKeyspaceServiceLimit(re, leaderAddr, "non_existing_keyspace", 1.0)
373367
re.Equal(http.StatusBadRequest, statusCode)
374368
re.Equal("keyspace not found with name: non_existing_keyspace", resp)
375369
// Try to get a non-existing keyspace's service limit.
376-
limit, statusCode := suite.tryToGetKeyspaceServiceLimit(re, "non_existing_keyspace")
370+
limit, statusCode := tryToGetKeyspaceServiceLimit(re, leaderAddr, "non_existing_keyspace")
377371
re.Equal(http.StatusBadRequest, statusCode)
378372
re.Equal(0.0, limit)
379373
}
380374

381-
func (suite *resourceManagerAPITestSuite) tryToGetKeyspaceServiceLimit(re *require.Assertions, keyspaceName string) (float64, int) {
382-
bodyBytes, statusCode := suite.sendRequest(re, http.MethodGet, "/config/keyspace/service-limit/"+keyspaceName, nil, nil)
375+
func tryToGetKeyspaceServiceLimit(re *require.Assertions, leaderAddr, keyspaceName string) (float64, int) {
376+
bodyBytes, statusCode := sendRequest(re, leaderAddr, http.MethodGet, "/config/keyspace/service-limit/"+keyspaceName, nil, nil)
383377
if statusCode != http.StatusOK {
384378
return 0.0, statusCode
385379
}
@@ -390,9 +384,10 @@ func (suite *resourceManagerAPITestSuite) tryToGetKeyspaceServiceLimit(re *requi
390384
return limiter.ServiceLimit, statusCode
391385
}
392386

393-
func (suite *resourceManagerAPITestSuite) tryToSetKeyspaceServiceLimit(re *require.Assertions, keyspaceName string, limit float64) (string, int) {
394-
bodyBytes, statusCode := suite.sendRequest(
387+
func tryToSetKeyspaceServiceLimit(re *require.Assertions, leaderAddr, keyspaceName string, limit float64) (string, int) {
388+
bodyBytes, statusCode := sendRequest(
395389
re,
390+
leaderAddr,
396391
http.MethodPost,
397392
"/config/keyspace/service-limit/"+keyspaceName,
398393
nil,

tests/integrations/mcs/resourcemanager/resource_manager_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ func (suite *resourceManagerClientTestSuite) SetupSuite() {
9292
re.NoError(err)
9393
leader := suite.cluster.GetServer(suite.cluster.WaitLeader())
9494
re.NotNil(leader)
95-
waitLeader(re, suite.client, leader.GetAddr())
95+
waitLeaderServingClient(re, suite.client, leader.GetAddr())
9696

9797
suite.initGroups = []*rmpb.ResourceGroup{
9898
{
@@ -149,7 +149,7 @@ func (suite *resourceManagerClientTestSuite) SetupSuite() {
149149
}
150150
}
151151

152-
func waitLeader(re *require.Assertions, cli pd.Client, leaderAddr string) {
152+
func waitLeaderServingClient(re *require.Assertions, cli pd.Client, leaderAddr string) {
153153
innerCli, ok := cli.(interface{ GetServiceDiscovery() sd.ServiceDiscovery })
154154
re.True(ok)
155155
re.NotNil(innerCli)
@@ -191,7 +191,7 @@ func (suite *resourceManagerClientTestSuite) resignAndWaitLeader(re *require.Ass
191191
re.NoError(suite.cluster.ResignLeader())
192192
newLeader := suite.cluster.GetServer(suite.cluster.WaitLeader())
193193
re.NotNil(newLeader)
194-
waitLeader(re, suite.client, newLeader.GetAddr())
194+
waitLeaderServingClient(re, suite.client, newLeader.GetAddr())
195195
}
196196

197197
func (suite *resourceManagerClientTestSuite) TestWatchResourceGroup() {

0 commit comments

Comments
 (0)