diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index 79128037fc5..8cdd827baa9 100755 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -212,9 +212,12 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { if err != nil { log.Warn("load resource group revision failed", zap.Error(err)) } - watchChannel, err := c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(revision), pd.WithPrefix()) + var watchChannel chan []*meta_storagepb.Event + if !c.config.isSingleGroupByKeyspace { + watchChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(revision), pd.WithPrefix()) + } watchRetryTimer := time.NewTimer(watchRetryInterval) - if err == nil { + if err == nil || c.config.isSingleGroupByKeyspace { watchRetryTimer.Stop() } defer watchRetryTimer.Stop() @@ -254,6 +257,11 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { case <-emergencyTokenAcquisitionTicker.C: c.executeOnAllGroups((*groupCostController).resetEmergencyTokenAcquisition) case resp, ok := <-watchChannel: + failpoint.Inject("disableWatch", func() { + if c.config.isSingleGroupByKeyspace { + panic("disableWatch") + } + }) if !ok { watchChannel = nil watchRetryTimer.Reset(watchRetryInterval) diff --git a/tests/integrations/mcs/resource_manager/resource_manager_test.go b/tests/integrations/mcs/resource_manager/resource_manager_test.go index 3758c41be75..cd88d312305 100644 --- a/tests/integrations/mcs/resource_manager/resource_manager_test.go +++ b/tests/integrations/mcs/resource_manager/resource_manager_test.go @@ -203,6 +203,11 @@ func (suite *resourceManagerClientTestSuite) TestWatchResourceGroup() { re.Equal(len(lresp), 4) re.Greater(revision, int64(0)) tcs := tokenConsumptionPerSecond{rruTokensAtATime: 100} + re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/disableWatch", "return(true)")) + defer func() { + re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/disableWatch")) + }() + controllerKeySpace, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, nil, controller.EnableSingleGroupByKeyspace()) controller, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, nil) controller.Start(suite.ctx) defer controller.Stop() @@ -212,6 +217,11 @@ func (suite *resourceManagerClientTestSuite) TestWatchResourceGroup() { re.NoError(err) re.Equal(meta.RUSettings.RU, group.RUSettings.RU) re.Equal(metaShadow.RUSettings.RU, group.RUSettings.RU) + + controllerKeySpace.OnRequestWait(suite.ctx, "test0", tcs.makeReadRequest()) + metaKeySpace := controllerKeySpace.GetActiveResourceGroup("test0") + re.Equal(metaKeySpace.RUSettings.RU, group.RUSettings.RU) + controller.OnRequestWait(suite.ctx, "test1", tcs.makeReadRequest()) meta = controller.GetActiveResourceGroup("test1") metaShadow, err = controller.GetResourceGroup("test1") @@ -243,6 +253,8 @@ func (suite *resourceManagerClientTestSuite) TestWatchResourceGroup() { meta = controller.GetActiveResourceGroup("test0") return meta.RUSettings.RU.Settings.FillRate == uint64(20000) }, testutil.WithTickInterval(50*time.Millisecond)) + metaKeySpace = controllerKeySpace.GetActiveResourceGroup("test0") + re.Equal(metaKeySpace.RUSettings.RU.Settings.FillRate, uint64(10000)) re.NoError(failpoint.Enable("github.com/tikv/pd/client/watchStreamError", "return(true)")) } group.Name = "test" + strconv.Itoa(i)