diff --git a/tests/integrations/mcs/resource_manager/resource_manager_test.go b/tests/integrations/mcs/resource_manager/resource_manager_test.go index 96c555ccd54..cd03e814800 100644 --- a/tests/integrations/mcs/resource_manager/resource_manager_test.go +++ b/tests/integrations/mcs/resource_manager/resource_manager_test.go @@ -179,15 +179,15 @@ func (suite *resourceManagerClientTestSuite) resignAndWaitLeader() { func (suite *resourceManagerClientTestSuite) TestWatchResourceGroup() { re := suite.Require() cli := suite.client + groupNamePrefix := "watch_test" group := &rmpb.ResourceGroup{ - Name: "test", + Name: groupNamePrefix, Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{ RU: &rmpb.TokenBucket{ Settings: &rmpb.TokenLimitSettings{ FillRate: 10000, }, - Tokens: 100000, }, }, } @@ -199,7 +199,7 @@ func (suite *resourceManagerClientTestSuite) TestWatchResourceGroup() { var meta *rmpb.ResourceGroup groupsNum := 10 for i := 0; i < groupsNum; i++ { - group.Name = "test" + strconv.Itoa(i) + group.Name = groupNamePrefix + strconv.Itoa(i) resp, err := cli.AddResourceGroup(suite.ctx, group) re.NoError(err) re.Contains(resp, "Success!") @@ -212,25 +212,25 @@ func (suite *resourceManagerClientTestSuite) TestWatchResourceGroup() { re.NotNil(meta) } // Mock modify resource groups - modifySettings := func(gs *rmpb.ResourceGroup) { + modifySettings := func(gs *rmpb.ResourceGroup, fillRate uint64) { gs.RUSettings = &rmpb.GroupRequestUnitSettings{ RU: &rmpb.TokenBucket{ Settings: &rmpb.TokenLimitSettings{ - FillRate: 20000, + FillRate: fillRate, }, }, } } for i := 0; i < groupsNum; i++ { - group.Name = "test" + strconv.Itoa(i) - modifySettings(group) + group.Name = groupNamePrefix + strconv.Itoa(i) + modifySettings(group, 20000) resp, err := cli.ModifyResourceGroup(suite.ctx, group) re.NoError(err) re.Contains(resp, "Success!") } for i := 0; i < groupsNum; i++ { testutil.Eventually(re, func() bool { - name := "test" + strconv.Itoa(i) + name := groupNamePrefix + strconv.Itoa(i) meta = controller.GetActiveResourceGroup(name) if meta != nil { return meta.RUSettings.RU.Settings.FillRate == uint64(20000) @@ -241,7 +241,7 @@ func (suite *resourceManagerClientTestSuite) TestWatchResourceGroup() { // Mock reset watch stream re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/watchStreamError", "return(true)")) - group.Name = "test" + strconv.Itoa(groupsNum) + group.Name = groupNamePrefix + strconv.Itoa(groupsNum) resp, err := cli.AddResourceGroup(suite.ctx, group) re.NoError(err) re.Contains(resp, "Success!") @@ -249,13 +249,13 @@ func (suite *resourceManagerClientTestSuite) TestWatchResourceGroup() { meta, err = controller.GetResourceGroup(group.Name) re.NotNil(meta) re.NoError(err) - modifySettings(group) + modifySettings(group, 30000) resp, err = cli.ModifyResourceGroup(suite.ctx, group) re.NoError(err) re.Contains(resp, "Success!") testutil.Eventually(re, func() bool { meta = controller.GetActiveResourceGroup(group.Name) - return meta.RUSettings.RU.Settings.FillRate == uint64(20000) + return meta.RUSettings.RU.Settings.FillRate == uint64(30000) }, testutil.WithTickInterval(100*time.Millisecond)) re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/watchStreamError")) @@ -263,7 +263,7 @@ func (suite *resourceManagerClientTestSuite) TestWatchResourceGroup() { suite.cleanupResourceGroups() for i := 0; i < groupsNum; i++ { testutil.Eventually(re, func() bool { - name := "test" + strconv.Itoa(i) + name := groupNamePrefix + strconv.Itoa(i) meta = controller.GetActiveResourceGroup(name) return meta == nil }, testutil.WithTickInterval(50*time.Millisecond)) @@ -289,7 +289,7 @@ func (suite *resourceManagerClientTestSuite) TestWatchWithSingleGroupByKeyspace( // Mock add resource group. group := &rmpb.ResourceGroup{ - Name: "test", + Name: "keyspace_test", Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{ RU: &rmpb.TokenBucket{ @@ -312,6 +312,28 @@ func (suite *resourceManagerClientTestSuite) TestWatchWithSingleGroupByKeyspace( controllerKeySpace.OnRequestWait(suite.ctx, group.Name, tcs.makeReadRequest()) metaKeySpace := controllerKeySpace.GetActiveResourceGroup(group.Name) re.Equal(metaKeySpace.RUSettings.RU, group.RUSettings.RU) + + // Mock modify resource groups + modifySettings := func(gs *rmpb.ResourceGroup, fillRate uint64) { + gs.RUSettings = &rmpb.GroupRequestUnitSettings{ + RU: &rmpb.TokenBucket{ + Settings: &rmpb.TokenLimitSettings{ + FillRate: fillRate, + }, + }, + } + } + modifySettings(group, 20000) + resp, err = cli.ModifyResourceGroup(suite.ctx, group) + re.NoError(err) + re.Contains(resp, "Success!") + + testutil.Eventually(re, func() bool { + meta = controller.GetActiveResourceGroup(group.Name) + return meta.RUSettings.RU.Settings.FillRate == uint64(20000) + }, testutil.WithTickInterval(100*time.Millisecond)) + metaKeySpace = controllerKeySpace.GetActiveResourceGroup(group.Name) + re.Equal(metaKeySpace.RUSettings.RU.Settings.FillRate, uint64(10000)) } const buffDuration = time.Millisecond * 300 @@ -351,11 +373,21 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() { re := suite.Require() cli := suite.client - for _, group := range suite.initGroups { - resp, err := cli.AddResourceGroup(suite.ctx, group) - re.NoError(err) - re.Contains(resp, "Success!") + rg := &rmpb.ResourceGroup{ + Name: "controller_test", + Mode: rmpb.GroupMode_RUMode, + RUSettings: &rmpb.GroupRequestUnitSettings{ + RU: &rmpb.TokenBucket{ + Settings: &rmpb.TokenLimitSettings{ + FillRate: 10000, + }, + Tokens: 100000, + }, + }, } + resp, err := cli.AddResourceGroup(suite.ctx, rg) + re.NoError(err) + re.Contains(resp, "Success!") cfg := &controller.RequestUnitConfig{ ReadBaseCost: 1, @@ -374,7 +406,7 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() { len int }{ { - resourceGroupName: suite.initGroups[0].Name, + resourceGroupName: rg.Name, len: 8, tcs: []tokenConsumptionPerSecond{ {rruTokensAtATime: 50, wruTokensAtATime: 20, times: 100, waitDuration: 0}, @@ -425,7 +457,7 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() { re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/triggerUpdate", "return(true)")) tcs := tokenConsumptionPerSecond{rruTokensAtATime: 1, wruTokensAtATime: 900000000, times: 1, waitDuration: 0} wreq := tcs.makeWriteRequest() - _, _, err := controller.OnRequestWait(suite.ctx, suite.initGroups[0].Name, wreq) + _, _, err = controller.OnRequestWait(suite.ctx, rg.Name, wreq) re.Error(err) time.Sleep(time.Millisecond * 200) re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/triggerUpdate")) @@ -568,7 +600,22 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() { re := suite.Require() cli := suite.client - for _, group := range suite.initGroups { + groupNames := []string{"penalty_test1", "penalty_test2"} + // Mock add 2 resource groups. + group := &rmpb.ResourceGroup{ + Name: groupNames[0], + Mode: rmpb.GroupMode_RUMode, + RUSettings: &rmpb.GroupRequestUnitSettings{ + RU: &rmpb.TokenBucket{ + Settings: &rmpb.TokenLimitSettings{ + FillRate: 10000, + }, + Tokens: 100000, + }, + }, + } + for _, name := range groupNames { + group.Name = name resp, err := cli.AddResourceGroup(suite.ctx, group) re.NoError(err) re.Contains(resp, "Success!") @@ -584,7 +631,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() { c, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, cfg, controller.EnableSingleGroupByKeyspace()) c.Start(suite.ctx) - resourceGroupName := suite.initGroups[1].Name + resourceGroupName := groupNames[0] // init req := controller.NewTestRequestInfo(false, 0, 2 /* store2 */) resp := controller.NewTestResponseInfo(0, time.Duration(30), true) @@ -643,7 +690,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() { re.NoError(err) // from different group, should be zero - resourceGroupName = suite.initGroups[2].Name + resourceGroupName = groupNames[1] req4 := controller.NewTestRequestInfo(true, 50, 1 /* store2 */) resp4 := controller.NewTestResponseInfo(0, time.Duration(10), true) _, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req4) @@ -657,8 +704,7 @@ func (suite *resourceManagerClientTestSuite) TestAcquireTokenBucket() { re := suite.Require() cli := suite.client - groups := make([]*rmpb.ResourceGroup, 0) - groups = append(groups, suite.initGroups...) + groups := suite.initGroups for _, group := range groups { resp, err := cli.AddResourceGroup(suite.ctx, group) re.NoError(err) @@ -670,7 +716,6 @@ func (suite *resourceManagerClientTestSuite) TestAcquireTokenBucket() { } re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/mcs/resource_manager/server/fastPersist", `return(true)`)) suite.resignAndWaitLeader() - groups = append(groups, &rmpb.ResourceGroup{Name: "test3"}) for i := 0; i < 3; i++ { for _, group := range groups { requests := make([]*rmpb.RequestUnitItem, 0) @@ -730,8 +775,8 @@ func (suite *resourceManagerClientTestSuite) TestBasicResourceGroupCURD() { expectMarshal string modifySettings func(*rmpb.ResourceGroup) }{ - {"test1", rmpb.GroupMode_RUMode, true, true, - `{"name":"test1","mode":1,"r_u_settings":{"r_u":{"settings":{"fill_rate":10000},"state":{"initialized":false}}},"priority":0}`, + {"CRUD_test1", rmpb.GroupMode_RUMode, true, true, + `{"name":"CRUD_test1","mode":1,"r_u_settings":{"r_u":{"settings":{"fill_rate":10000},"state":{"initialized":false}}},"priority":0}`, func(gs *rmpb.ResourceGroup) { gs.RUSettings = &rmpb.GroupRequestUnitSettings{ RU: &rmpb.TokenBucket{ @@ -743,8 +788,8 @@ func (suite *resourceManagerClientTestSuite) TestBasicResourceGroupCURD() { }, }, - {"test2", rmpb.GroupMode_RUMode, true, true, - `{"name":"test2","mode":1,"r_u_settings":{"r_u":{"settings":{"fill_rate":20000},"state":{"initialized":false}}},"priority":0}`, + {"CRUD_test2", rmpb.GroupMode_RUMode, true, true, + `{"name":"CRUD_test2","mode":1,"r_u_settings":{"r_u":{"settings":{"fill_rate":20000},"state":{"initialized":false}}},"priority":0}`, func(gs *rmpb.ResourceGroup) { gs.RUSettings = &rmpb.GroupRequestUnitSettings{ RU: &rmpb.TokenBucket{ @@ -755,8 +800,8 @@ func (suite *resourceManagerClientTestSuite) TestBasicResourceGroupCURD() { } }, }, - {"test2", rmpb.GroupMode_RUMode, false, true, - `{"name":"test2","mode":1,"r_u_settings":{"r_u":{"settings":{"fill_rate":30000,"burst_limit":-1},"state":{"initialized":false}}},"priority":0}`, + {"CRUD_test2", rmpb.GroupMode_RUMode, false, true, + `{"name":"CRUD_test2","mode":1,"r_u_settings":{"r_u":{"settings":{"fill_rate":30000,"burst_limit":-1},"state":{"initialized":false}}},"priority":0}`, func(gs *rmpb.ResourceGroup) { gs.RUSettings = &rmpb.GroupRequestUnitSettings{ RU: &rmpb.TokenBucket{ @@ -971,15 +1016,10 @@ func (suite *resourceManagerClientTestSuite) TestResourceManagerClientFailover() cli := suite.client group := &rmpb.ResourceGroup{ - Name: "test3", + Name: "failover_test", Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{ - RU: &rmpb.TokenBucket{ - Settings: &rmpb.TokenLimitSettings{ - FillRate: 10000, - }, - Tokens: 100000, - }, + RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{}}, }, } addResp, err := cli.AddResourceGroup(suite.ctx, group) @@ -1008,8 +1048,9 @@ func (suite *resourceManagerClientTestSuite) TestResourceManagerClientDegradedMo re := suite.Require() cli := suite.client + groupName := "mode_test" group := &rmpb.ResourceGroup{ - Name: "modetest", + Name: groupName, Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{ RU: &rmpb.TokenBucket{ @@ -1044,15 +1085,15 @@ func (suite *resourceManagerClientTestSuite) TestResourceManagerClientDegradedMo rruTokensAtATime: 0, wruTokensAtATime: 2, } - controller.OnRequestWait(suite.ctx, "modetest", tc.makeWriteRequest()) + controller.OnRequestWait(suite.ctx, groupName, tc.makeWriteRequest()) time.Sleep(time.Second * 2) beginTime := time.Now() // This is used to make sure resource group in lowRU. for i := 0; i < 100; i++ { - controller.OnRequestWait(suite.ctx, "modetest", tc2.makeWriteRequest()) + controller.OnRequestWait(suite.ctx, groupName, tc2.makeWriteRequest()) } for i := 0; i < 100; i++ { - controller.OnRequestWait(suite.ctx, "modetest", tc.makeWriteRequest()) + controller.OnRequestWait(suite.ctx, groupName, tc.makeWriteRequest()) } endTime := time.Now() // we can not check `inDegradedMode` because of data race. @@ -1104,18 +1145,20 @@ func (suite *resourceManagerClientTestSuite) TestRemoveStaleResourceGroup() { re := suite.Require() cli := suite.client - for _, group := range suite.initGroups { - resp, err := cli.AddResourceGroup(suite.ctx, group) - re.NoError(err) - re.Contains(resp, "Success!") + // Mock add resource group. + group := &rmpb.ResourceGroup{ + Name: "stale_test", + Mode: rmpb.GroupMode_RUMode, + RUSettings: &rmpb.GroupRequestUnitSettings{ + RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{}}, + }, } + resp, err := cli.AddResourceGroup(suite.ctx, group) + re.NoError(err) + re.Contains(resp, "Success!") - ruConfig := &controller.RequestUnitConfig{ - ReadBaseCost: 1, - ReadCostPerByte: 1, - } re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/fastCleanup", `return(true)`)) - controller, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, ruConfig) + controller, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, nil) controller.Start(suite.ctx) testConfig := struct { @@ -1131,13 +1174,13 @@ func (suite *resourceManagerClientTestSuite) TestRemoveStaleResourceGroup() { rreq := testConfig.tcs.makeReadRequest() rres := testConfig.tcs.makeReadResponse() for j := 0; j < testConfig.times; j++ { - controller.OnRequestWait(suite.ctx, suite.initGroups[0].Name, rreq) - controller.OnResponse(suite.initGroups[0].Name, rreq, rres) + controller.OnRequestWait(suite.ctx, group.Name, rreq) + controller.OnResponse(group.Name, rreq, rres) time.Sleep(100 * time.Microsecond) } time.Sleep(1 * time.Second) - re.Nil(controller.GetActiveResourceGroup(suite.initGroups[0].Name)) + re.Nil(controller.GetActiveResourceGroup(group.Name)) re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/fastCleanup")) controller.Stop() @@ -1146,11 +1189,18 @@ func (suite *resourceManagerClientTestSuite) TestRemoveStaleResourceGroup() { func (suite *resourceManagerClientTestSuite) TestResourceGroupControllerConfigChanged() { re := suite.Require() cli := suite.client - for _, group := range suite.initGroups { - resp, err := cli.AddResourceGroup(suite.ctx, group) - re.NoError(err) - re.Contains(resp, "Success!") + // Mock add resource group. + group := &rmpb.ResourceGroup{ + Name: "config_change_test", + Mode: rmpb.GroupMode_RUMode, + RUSettings: &rmpb.GroupRequestUnitSettings{ + RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{}}, + }, } + resp, err := cli.AddResourceGroup(suite.ctx, group) + re.NoError(err) + re.Contains(resp, "Success!") + c1, err := controller.NewResourceGroupController(suite.ctx, 1, cli, nil) re.NoError(err) c1.Start(suite.ctx)