Skip to content

Commit

Permalink
resource_manager: Fix for delayed deletion of created groups in TestW…
Browse files Browse the repository at this point in the history
…atchResourceGroup (#7154)

close #7144

Signed-off-by: husharp <jinhao.hu@pingcap.com>
  • Loading branch information
HuSharp authored and nolouch committed Dec 6, 2023
1 parent c6189f6 commit b239ee5
Showing 1 changed file with 108 additions and 58 deletions.
166 changes: 108 additions & 58 deletions tests/integrations/mcs/resource_manager/resource_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,15 +179,15 @@ func (suite *resourceManagerClientTestSuite) resignAndWaitLeader() {
func (suite *resourceManagerClientTestSuite) TestWatchResourceGroup() {
re := suite.Require()
cli := suite.client
groupNamePrefix := "watch_test"
group := &rmpb.ResourceGroup{
Name: "test",
Name: groupNamePrefix,
Mode: rmpb.GroupMode_RUMode,
RUSettings: &rmpb.GroupRequestUnitSettings{
RU: &rmpb.TokenBucket{
Settings: &rmpb.TokenLimitSettings{
FillRate: 10000,
},
Tokens: 100000,
},
},
}
Expand All @@ -199,7 +199,7 @@ func (suite *resourceManagerClientTestSuite) TestWatchResourceGroup() {
var meta *rmpb.ResourceGroup
groupsNum := 10
for i := 0; i < groupsNum; i++ {
group.Name = "test" + strconv.Itoa(i)
group.Name = groupNamePrefix + strconv.Itoa(i)
resp, err := cli.AddResourceGroup(suite.ctx, group)
re.NoError(err)
re.Contains(resp, "Success!")
Expand All @@ -212,25 +212,25 @@ func (suite *resourceManagerClientTestSuite) TestWatchResourceGroup() {
re.NotNil(meta)
}
// Mock modify resource groups
modifySettings := func(gs *rmpb.ResourceGroup) {
modifySettings := func(gs *rmpb.ResourceGroup, fillRate uint64) {
gs.RUSettings = &rmpb.GroupRequestUnitSettings{
RU: &rmpb.TokenBucket{
Settings: &rmpb.TokenLimitSettings{
FillRate: 20000,
FillRate: fillRate,
},
},
}
}
for i := 0; i < groupsNum; i++ {
group.Name = "test" + strconv.Itoa(i)
modifySettings(group)
group.Name = groupNamePrefix + strconv.Itoa(i)
modifySettings(group, 20000)
resp, err := cli.ModifyResourceGroup(suite.ctx, group)
re.NoError(err)
re.Contains(resp, "Success!")
}
for i := 0; i < groupsNum; i++ {
testutil.Eventually(re, func() bool {
name := "test" + strconv.Itoa(i)
name := groupNamePrefix + strconv.Itoa(i)
meta = controller.GetActiveResourceGroup(name)
if meta != nil {
return meta.RUSettings.RU.Settings.FillRate == uint64(20000)
Expand All @@ -241,29 +241,29 @@ func (suite *resourceManagerClientTestSuite) TestWatchResourceGroup() {

// Mock reset watch stream
re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/watchStreamError", "return(true)"))
group.Name = "test" + strconv.Itoa(groupsNum)
group.Name = groupNamePrefix + strconv.Itoa(groupsNum)
resp, err := cli.AddResourceGroup(suite.ctx, group)
re.NoError(err)
re.Contains(resp, "Success!")
// Make sure the resource group active
meta, err = controller.GetResourceGroup(group.Name)
re.NotNil(meta)
re.NoError(err)
modifySettings(group)
modifySettings(group, 30000)
resp, err = cli.ModifyResourceGroup(suite.ctx, group)
re.NoError(err)
re.Contains(resp, "Success!")
testutil.Eventually(re, func() bool {
meta = controller.GetActiveResourceGroup(group.Name)
return meta.RUSettings.RU.Settings.FillRate == uint64(20000)
return meta.RUSettings.RU.Settings.FillRate == uint64(30000)
}, testutil.WithTickInterval(100*time.Millisecond))
re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/watchStreamError"))

// Mock delete resource groups
suite.cleanupResourceGroups()
for i := 0; i < groupsNum; i++ {
testutil.Eventually(re, func() bool {
name := "test" + strconv.Itoa(i)
name := groupNamePrefix + strconv.Itoa(i)
meta = controller.GetActiveResourceGroup(name)
return meta == nil
}, testutil.WithTickInterval(50*time.Millisecond))
Expand All @@ -289,7 +289,7 @@ func (suite *resourceManagerClientTestSuite) TestWatchWithSingleGroupByKeyspace(

// Mock add resource group.
group := &rmpb.ResourceGroup{
Name: "test",
Name: "keyspace_test",
Mode: rmpb.GroupMode_RUMode,
RUSettings: &rmpb.GroupRequestUnitSettings{
RU: &rmpb.TokenBucket{
Expand All @@ -312,6 +312,28 @@ func (suite *resourceManagerClientTestSuite) TestWatchWithSingleGroupByKeyspace(
controllerKeySpace.OnRequestWait(suite.ctx, group.Name, tcs.makeReadRequest())
metaKeySpace := controllerKeySpace.GetActiveResourceGroup(group.Name)
re.Equal(metaKeySpace.RUSettings.RU, group.RUSettings.RU)

// Mock modify resource groups
modifySettings := func(gs *rmpb.ResourceGroup, fillRate uint64) {
gs.RUSettings = &rmpb.GroupRequestUnitSettings{
RU: &rmpb.TokenBucket{
Settings: &rmpb.TokenLimitSettings{
FillRate: fillRate,
},
},
}
}
modifySettings(group, 20000)
resp, err = cli.ModifyResourceGroup(suite.ctx, group)
re.NoError(err)
re.Contains(resp, "Success!")

testutil.Eventually(re, func() bool {
meta = controller.GetActiveResourceGroup(group.Name)
return meta.RUSettings.RU.Settings.FillRate == uint64(20000)
}, testutil.WithTickInterval(100*time.Millisecond))
metaKeySpace = controllerKeySpace.GetActiveResourceGroup(group.Name)
re.Equal(metaKeySpace.RUSettings.RU.Settings.FillRate, uint64(10000))
}

const buffDuration = time.Millisecond * 300
Expand Down Expand Up @@ -351,11 +373,21 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() {
re := suite.Require()
cli := suite.client

for _, group := range suite.initGroups {
resp, err := cli.AddResourceGroup(suite.ctx, group)
re.NoError(err)
re.Contains(resp, "Success!")
rg := &rmpb.ResourceGroup{
Name: "controller_test",
Mode: rmpb.GroupMode_RUMode,
RUSettings: &rmpb.GroupRequestUnitSettings{
RU: &rmpb.TokenBucket{
Settings: &rmpb.TokenLimitSettings{
FillRate: 10000,
},
Tokens: 100000,
},
},
}
resp, err := cli.AddResourceGroup(suite.ctx, rg)
re.NoError(err)
re.Contains(resp, "Success!")

cfg := &controller.RequestUnitConfig{
ReadBaseCost: 1,
Expand All @@ -374,7 +406,7 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() {
len int
}{
{
resourceGroupName: suite.initGroups[0].Name,
resourceGroupName: rg.Name,
len: 8,
tcs: []tokenConsumptionPerSecond{
{rruTokensAtATime: 50, wruTokensAtATime: 20, times: 100, waitDuration: 0},
Expand Down Expand Up @@ -425,7 +457,7 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() {
re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/triggerUpdate", "return(true)"))
tcs := tokenConsumptionPerSecond{rruTokensAtATime: 1, wruTokensAtATime: 900000000, times: 1, waitDuration: 0}
wreq := tcs.makeWriteRequest()
_, _, err := controller.OnRequestWait(suite.ctx, suite.initGroups[0].Name, wreq)
_, _, err = controller.OnRequestWait(suite.ctx, rg.Name, wreq)
re.Error(err)
time.Sleep(time.Millisecond * 200)
re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/triggerUpdate"))
Expand Down Expand Up @@ -568,7 +600,22 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() {
re := suite.Require()
cli := suite.client

for _, group := range suite.initGroups {
groupNames := []string{"penalty_test1", "penalty_test2"}
// Mock add 2 resource groups.
group := &rmpb.ResourceGroup{
Name: groupNames[0],
Mode: rmpb.GroupMode_RUMode,
RUSettings: &rmpb.GroupRequestUnitSettings{
RU: &rmpb.TokenBucket{
Settings: &rmpb.TokenLimitSettings{
FillRate: 10000,
},
Tokens: 100000,
},
},
}
for _, name := range groupNames {
group.Name = name
resp, err := cli.AddResourceGroup(suite.ctx, group)
re.NoError(err)
re.Contains(resp, "Success!")
Expand All @@ -584,7 +631,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() {
c, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, cfg, controller.EnableSingleGroupByKeyspace())
c.Start(suite.ctx)

resourceGroupName := suite.initGroups[1].Name
resourceGroupName := groupNames[0]
// init
req := controller.NewTestRequestInfo(false, 0, 2 /* store2 */)
resp := controller.NewTestResponseInfo(0, time.Duration(30), true)
Expand Down Expand Up @@ -643,7 +690,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() {
re.NoError(err)

// from different group, should be zero
resourceGroupName = suite.initGroups[2].Name
resourceGroupName = groupNames[1]
req4 := controller.NewTestRequestInfo(true, 50, 1 /* store2 */)
resp4 := controller.NewTestResponseInfo(0, time.Duration(10), true)
_, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req4)
Expand All @@ -657,8 +704,7 @@ func (suite *resourceManagerClientTestSuite) TestAcquireTokenBucket() {
re := suite.Require()
cli := suite.client

groups := make([]*rmpb.ResourceGroup, 0)
groups = append(groups, suite.initGroups...)
groups := suite.initGroups
for _, group := range groups {
resp, err := cli.AddResourceGroup(suite.ctx, group)
re.NoError(err)
Expand All @@ -670,7 +716,6 @@ func (suite *resourceManagerClientTestSuite) TestAcquireTokenBucket() {
}
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/mcs/resource_manager/server/fastPersist", `return(true)`))
suite.resignAndWaitLeader()
groups = append(groups, &rmpb.ResourceGroup{Name: "test3"})
for i := 0; i < 3; i++ {
for _, group := range groups {
requests := make([]*rmpb.RequestUnitItem, 0)
Expand Down Expand Up @@ -730,8 +775,8 @@ func (suite *resourceManagerClientTestSuite) TestBasicResourceGroupCURD() {
expectMarshal string
modifySettings func(*rmpb.ResourceGroup)
}{
{"test1", rmpb.GroupMode_RUMode, true, true,
`{"name":"test1","mode":1,"r_u_settings":{"r_u":{"settings":{"fill_rate":10000},"state":{"initialized":false}}},"priority":0}`,
{"CRUD_test1", rmpb.GroupMode_RUMode, true, true,
`{"name":"CRUD_test1","mode":1,"r_u_settings":{"r_u":{"settings":{"fill_rate":10000},"state":{"initialized":false}}},"priority":0}`,
func(gs *rmpb.ResourceGroup) {
gs.RUSettings = &rmpb.GroupRequestUnitSettings{
RU: &rmpb.TokenBucket{
Expand All @@ -743,8 +788,8 @@ func (suite *resourceManagerClientTestSuite) TestBasicResourceGroupCURD() {
},
},

{"test2", rmpb.GroupMode_RUMode, true, true,
`{"name":"test2","mode":1,"r_u_settings":{"r_u":{"settings":{"fill_rate":20000},"state":{"initialized":false}}},"priority":0}`,
{"CRUD_test2", rmpb.GroupMode_RUMode, true, true,
`{"name":"CRUD_test2","mode":1,"r_u_settings":{"r_u":{"settings":{"fill_rate":20000},"state":{"initialized":false}}},"priority":0}`,
func(gs *rmpb.ResourceGroup) {
gs.RUSettings = &rmpb.GroupRequestUnitSettings{
RU: &rmpb.TokenBucket{
Expand All @@ -755,8 +800,8 @@ func (suite *resourceManagerClientTestSuite) TestBasicResourceGroupCURD() {
}
},
},
{"test2", rmpb.GroupMode_RUMode, false, true,
`{"name":"test2","mode":1,"r_u_settings":{"r_u":{"settings":{"fill_rate":30000,"burst_limit":-1},"state":{"initialized":false}}},"priority":0}`,
{"CRUD_test2", rmpb.GroupMode_RUMode, false, true,
`{"name":"CRUD_test2","mode":1,"r_u_settings":{"r_u":{"settings":{"fill_rate":30000,"burst_limit":-1},"state":{"initialized":false}}},"priority":0}`,
func(gs *rmpb.ResourceGroup) {
gs.RUSettings = &rmpb.GroupRequestUnitSettings{
RU: &rmpb.TokenBucket{
Expand Down Expand Up @@ -971,15 +1016,10 @@ func (suite *resourceManagerClientTestSuite) TestResourceManagerClientFailover()
cli := suite.client

group := &rmpb.ResourceGroup{
Name: "test3",
Name: "failover_test",
Mode: rmpb.GroupMode_RUMode,
RUSettings: &rmpb.GroupRequestUnitSettings{
RU: &rmpb.TokenBucket{
Settings: &rmpb.TokenLimitSettings{
FillRate: 10000,
},
Tokens: 100000,
},
RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{}},
},
}
addResp, err := cli.AddResourceGroup(suite.ctx, group)
Expand Down Expand Up @@ -1008,8 +1048,9 @@ func (suite *resourceManagerClientTestSuite) TestResourceManagerClientDegradedMo
re := suite.Require()
cli := suite.client

groupName := "mode_test"
group := &rmpb.ResourceGroup{
Name: "modetest",
Name: groupName,
Mode: rmpb.GroupMode_RUMode,
RUSettings: &rmpb.GroupRequestUnitSettings{
RU: &rmpb.TokenBucket{
Expand Down Expand Up @@ -1044,15 +1085,15 @@ func (suite *resourceManagerClientTestSuite) TestResourceManagerClientDegradedMo
rruTokensAtATime: 0,
wruTokensAtATime: 2,
}
controller.OnRequestWait(suite.ctx, "modetest", tc.makeWriteRequest())
controller.OnRequestWait(suite.ctx, groupName, tc.makeWriteRequest())
time.Sleep(time.Second * 2)
beginTime := time.Now()
// This is used to make sure resource group in lowRU.
for i := 0; i < 100; i++ {
controller.OnRequestWait(suite.ctx, "modetest", tc2.makeWriteRequest())
controller.OnRequestWait(suite.ctx, groupName, tc2.makeWriteRequest())
}
for i := 0; i < 100; i++ {
controller.OnRequestWait(suite.ctx, "modetest", tc.makeWriteRequest())
controller.OnRequestWait(suite.ctx, groupName, tc.makeWriteRequest())
}
endTime := time.Now()
// we can not check `inDegradedMode` because of data race.
Expand Down Expand Up @@ -1104,18 +1145,20 @@ func (suite *resourceManagerClientTestSuite) TestRemoveStaleResourceGroup() {
re := suite.Require()
cli := suite.client

for _, group := range suite.initGroups {
resp, err := cli.AddResourceGroup(suite.ctx, group)
re.NoError(err)
re.Contains(resp, "Success!")
// Mock add resource group.
group := &rmpb.ResourceGroup{
Name: "stale_test",
Mode: rmpb.GroupMode_RUMode,
RUSettings: &rmpb.GroupRequestUnitSettings{
RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{}},
},
}
resp, err := cli.AddResourceGroup(suite.ctx, group)
re.NoError(err)
re.Contains(resp, "Success!")

ruConfig := &controller.RequestUnitConfig{
ReadBaseCost: 1,
ReadCostPerByte: 1,
}
re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/fastCleanup", `return(true)`))
controller, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, ruConfig)
controller, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, nil)
controller.Start(suite.ctx)

testConfig := struct {
Expand All @@ -1131,13 +1174,13 @@ func (suite *resourceManagerClientTestSuite) TestRemoveStaleResourceGroup() {
rreq := testConfig.tcs.makeReadRequest()
rres := testConfig.tcs.makeReadResponse()
for j := 0; j < testConfig.times; j++ {
controller.OnRequestWait(suite.ctx, suite.initGroups[0].Name, rreq)
controller.OnResponse(suite.initGroups[0].Name, rreq, rres)
controller.OnRequestWait(suite.ctx, group.Name, rreq)
controller.OnResponse(group.Name, rreq, rres)
time.Sleep(100 * time.Microsecond)
}
time.Sleep(1 * time.Second)

re.Nil(controller.GetActiveResourceGroup(suite.initGroups[0].Name))
re.Nil(controller.GetActiveResourceGroup(group.Name))

re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/fastCleanup"))
controller.Stop()
Expand All @@ -1146,11 +1189,18 @@ func (suite *resourceManagerClientTestSuite) TestRemoveStaleResourceGroup() {
func (suite *resourceManagerClientTestSuite) TestResourceGroupControllerConfigChanged() {
re := suite.Require()
cli := suite.client
for _, group := range suite.initGroups {
resp, err := cli.AddResourceGroup(suite.ctx, group)
re.NoError(err)
re.Contains(resp, "Success!")
// Mock add resource group.
group := &rmpb.ResourceGroup{
Name: "config_change_test",
Mode: rmpb.GroupMode_RUMode,
RUSettings: &rmpb.GroupRequestUnitSettings{
RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{}},
},
}
resp, err := cli.AddResourceGroup(suite.ctx, group)
re.NoError(err)
re.Contains(resp, "Success!")

c1, err := controller.NewResourceGroupController(suite.ctx, 1, cli, nil)
re.NoError(err)
c1.Start(suite.ctx)
Expand Down

0 comments on commit b239ee5

Please sign in to comment.