Skip to content

Commit

Permalink
Resource manager: disable watch when isSingleGroupByKeyspace is true (t…
Browse files Browse the repository at this point in the history
…ikv#6624)

close tikv#6623

Signed-off-by: Cabinfever_B <cabinfeveroier@gmail.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
2 people authored and nolouch committed Dec 6, 2023
1 parent 2a826c1 commit ba26ef4
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 2 deletions.
12 changes: 10 additions & 2 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,12 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
if err != nil {
log.Warn("load resource group revision failed", zap.Error(err))
}
watchChannel, err := c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(revision), pd.WithPrefix())
var watchChannel chan []*meta_storagepb.Event
if !c.config.isSingleGroupByKeyspace {
watchChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(revision), pd.WithPrefix())
}
watchRetryTimer := time.NewTimer(watchRetryInterval)
if err == nil {
if err == nil || c.config.isSingleGroupByKeyspace {
watchRetryTimer.Stop()
}
defer watchRetryTimer.Stop()
Expand Down Expand Up @@ -254,6 +257,11 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
case <-emergencyTokenAcquisitionTicker.C:
c.executeOnAllGroups((*groupCostController).resetEmergencyTokenAcquisition)
case resp, ok := <-watchChannel:
failpoint.Inject("disableWatch", func() {
if c.config.isSingleGroupByKeyspace {
panic("disableWatch")
}
})
if !ok {
watchChannel = nil
watchRetryTimer.Reset(watchRetryInterval)
Expand Down
12 changes: 12 additions & 0 deletions tests/integrations/mcs/resource_manager/resource_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,11 @@ func (suite *resourceManagerClientTestSuite) TestWatchResourceGroup() {
re.Equal(len(lresp), 4)
re.Greater(revision, int64(0))
tcs := tokenConsumptionPerSecond{rruTokensAtATime: 100}
re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/disableWatch", "return(true)"))
defer func() {
re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/disableWatch"))
}()
controllerKeySpace, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, nil, controller.EnableSingleGroupByKeyspace())
controller, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, nil)
controller.Start(suite.ctx)
defer controller.Stop()
Expand All @@ -212,6 +217,11 @@ func (suite *resourceManagerClientTestSuite) TestWatchResourceGroup() {
re.NoError(err)
re.Equal(meta.RUSettings.RU, group.RUSettings.RU)
re.Equal(metaShadow.RUSettings.RU, group.RUSettings.RU)

controllerKeySpace.OnRequestWait(suite.ctx, "test0", tcs.makeReadRequest())
metaKeySpace := controllerKeySpace.GetActiveResourceGroup("test0")
re.Equal(metaKeySpace.RUSettings.RU, group.RUSettings.RU)

controller.OnRequestWait(suite.ctx, "test1", tcs.makeReadRequest())
meta = controller.GetActiveResourceGroup("test1")
metaShadow, err = controller.GetResourceGroup("test1")
Expand Down Expand Up @@ -243,6 +253,8 @@ func (suite *resourceManagerClientTestSuite) TestWatchResourceGroup() {
meta = controller.GetActiveResourceGroup("test0")
return meta.RUSettings.RU.Settings.FillRate == uint64(20000)
}, testutil.WithTickInterval(50*time.Millisecond))
metaKeySpace = controllerKeySpace.GetActiveResourceGroup("test0")
re.Equal(metaKeySpace.RUSettings.RU.Settings.FillRate, uint64(10000))
re.NoError(failpoint.Enable("github.com/tikv/pd/client/watchStreamError", "return(true)"))
}
group.Name = "test" + strconv.Itoa(i)
Expand Down

0 comments on commit ba26ef4

Please sign in to comment.