Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

resource_control: watch delete with prev and refine test #7092

Merged
merged 11 commits into from
Sep 22, 2023
25 changes: 18 additions & 7 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,8 @@
cfgRevision := resp.GetHeader().GetRevision()
var watchMetaChannel, watchConfigChannel chan []*meta_storagepb.Event
if !c.ruConfig.isSingleGroupByKeyspace {
watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(metaRevision), pd.WithPrefix())
// Use WithPrevKV() to get the previous key-value pair when get Delete Event.
watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(metaRevision), pd.WithPrefix(), pd.WithPrevKV())
if err != nil {
log.Warn("watch resource group meta failed", zap.Error(err))
}
Expand All @@ -260,7 +261,8 @@
}
case <-watchRetryTimer.C:
if !c.ruConfig.isSingleGroupByKeyspace && watchMetaChannel == nil {
watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(metaRevision), pd.WithPrefix())
// Use WithPrevKV() to get the previous key-value pair when get Delete Event.
watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(metaRevision), pd.WithPrefix(), pd.WithPrevKV())

Check warning on line 265 in client/resource_group/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

client/resource_group/controller/controller.go#L265

Added line #L265 was not covered by tests
if err != nil {
log.Warn("watch resource group meta failed", zap.Error(err))
watchRetryTimer.Reset(watchRetryInterval)
Expand Down Expand Up @@ -319,18 +321,27 @@
for _, item := range resp {
metaRevision = item.Kv.ModRevision
group := &rmpb.ResourceGroup{}
if err := proto.Unmarshal(item.Kv.Value, group); err != nil {
continue
}
switch item.Type {
case meta_storagepb.Event_PUT:
if err = proto.Unmarshal(item.Kv.Value, group); err != nil {
continue

Check warning on line 327 in client/resource_group/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

client/resource_group/controller/controller.go#L327

Added line #L327 was not covered by tests
}
if item, ok := c.groupsController.Load(group.Name); ok {
gc := item.(*groupCostController)
gc.modifyMeta(group)
}
case meta_storagepb.Event_DELETE:
if _, ok := c.groupsController.LoadAndDelete(group.Name); ok {
resourceGroupStatusGauge.DeleteLabelValues(group.Name)
if item.PrevKv != nil {
if err = proto.Unmarshal(item.PrevKv.Value, group); err != nil {
continue

Check warning on line 336 in client/resource_group/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

client/resource_group/controller/controller.go#L336

Added line #L336 was not covered by tests
}
if _, ok := c.groupsController.LoadAndDelete(group.Name); ok {
resourceGroupStatusGauge.DeleteLabelValues(group.Name)
}
} else {

Check warning on line 341 in client/resource_group/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

client/resource_group/controller/controller.go#L341

Added line #L341 was not covered by tests
// Prev-kv is compacted means there must have been a delete event before this event,
// which means that this is just a duplicated event, so we can just ignore it.
log.Info("previous key-value pair has been compacted", zap.String("required-key", string(item.Kv.Key)), zap.String("value", string(item.Kv.Value)))

Check warning on line 344 in client/resource_group/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

client/resource_group/controller/controller.go#L344

Added line #L344 was not covered by tests
}
}
}
Expand Down
47 changes: 0 additions & 47 deletions client/resource_manager_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ type ResourceManagerClient interface {
ModifyResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error)
DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error)
LoadResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, int64, error)
WatchResourceGroup(ctx context.Context, revision int64) (chan []*rmpb.ResourceGroup, error)
AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error)
Watch(ctx context.Context, key []byte, opts ...OpOption) (chan []*meta_storagepb.Event, error)
}
Expand Down Expand Up @@ -188,52 +187,6 @@ func (c *client) LoadResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup,
return groups, resp.Header.Revision, nil
}

// WatchResourceGroup [just for TEST] watches resource groups changes.
// It returns a stream of slices of resource groups.
// The first message in stream contains all current resource groups,
// all subsequent messages contains new events[PUT/DELETE] for all resource groups.
func (c *client) WatchResourceGroup(ctx context.Context, revision int64) (chan []*rmpb.ResourceGroup, error) {
configChan, err := c.Watch(ctx, GroupSettingsPathPrefixBytes, WithRev(revision), WithPrefix())
if err != nil {
return nil, err
}
resourceGroupWatcherChan := make(chan []*rmpb.ResourceGroup)
go func() {
defer func() {
close(resourceGroupWatcherChan)
if r := recover(); r != nil {
log.Error("[pd] panic in ResourceManagerClient `WatchResourceGroups`", zap.Any("error", r))
return
}
}()
for {
select {
case <-ctx.Done():
return
case res, ok := <-configChan:
if !ok {
return
}
groups := make([]*rmpb.ResourceGroup, 0, len(res))
for _, item := range res {
switch item.Type {
case meta_storagepb.Event_PUT:
group := &rmpb.ResourceGroup{}
if err := proto.Unmarshal(item.Kv.Value, group); err != nil {
return
}
groups = append(groups, group)
case meta_storagepb.Event_DELETE:
continue
}
}
resourceGroupWatcherChan <- groups
}
}
}()
return resourceGroupWatcherChan, err
}

func (c *client) AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error) {
req := &tokenRequest{
done: make(chan error, 1),
Expand Down
2 changes: 1 addition & 1 deletion server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2638,7 +2638,7 @@
} else {
// Prev-kv is compacted means there must have been a delete event before this event,
// which means that this is just a duplicated event, so we can just ignore it.
log.Info("previous key-value pair has been compacted", zap.String("previous key", string(e.Kv.Key)))
log.Info("previous key-value pair has been compacted", zap.String("required-key", string(e.Kv.Key)))

Check warning on line 2641 in server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

server/grpc_service.go#L2641

Added line #L2641 was not covered by tests
}
}
}
Expand Down
166 changes: 86 additions & 80 deletions tests/integrations/mcs/resourcemanager/resource_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,51 +205,25 @@ func (suite *resourceManagerClientTestSuite) TestWatchResourceGroup() {
},
},
}
// Mock get revision by listing
for i := 0; i < 3; i++ {
group.Name += strconv.Itoa(i)
resp, err := cli.AddResourceGroup(suite.ctx, group)
group.Name = "test"
re.NoError(err)
re.Contains(resp, "Success!")
}
lresp, revision, err := cli.LoadResourceGroups(suite.ctx)
re.NoError(err)
re.Equal(len(lresp), 4)
re.Greater(revision, int64(0))
tcs := tokenConsumptionPerSecond{rruTokensAtATime: 100}
re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/disableWatch", "return(true)"))
defer func() {
re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/disableWatch"))
}()
controllerKeySpace, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, nil, controller.EnableSingleGroupByKeyspace())
controller, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, nil)
controller.Start(suite.ctx)
defer controller.Stop()
controller.OnRequestWait(suite.ctx, "test0", tcs.makeReadRequest())
meta := controller.GetActiveResourceGroup("test0")
metaShadow, err := controller.GetResourceGroup("test0")
re.NoError(err)
re.Equal(meta.RUSettings.RU, group.RUSettings.RU)
re.Equal(metaShadow.RUSettings.RU, group.RUSettings.RU)

controllerKeySpace.OnRequestWait(suite.ctx, "test0", tcs.makeReadRequest())
metaKeySpace := controllerKeySpace.GetActiveResourceGroup("test0")
re.Equal(metaKeySpace.RUSettings.RU, group.RUSettings.RU)

controller.OnRequestWait(suite.ctx, "test1", tcs.makeReadRequest())
meta = controller.GetActiveResourceGroup("test1")
metaShadow, err = controller.GetResourceGroup("test1")
re.NoError(err)
re.Equal(meta.RUSettings.RU, group.RUSettings.RU)
re.Equal(metaShadow.RUSettings.RU, group.RUSettings.RU)
suite.NoError(err)
// Mock add resource groups
for i := 3; i < 9; i++ {
var meta *rmpb.ResourceGroup
groupsNum := 10
for i := 0; i < groupsNum; i++ {
group.Name = "test" + strconv.Itoa(i)
resp, err := cli.AddResourceGroup(suite.ctx, group)
re.NoError(err)
re.Contains(resp, "Success!")

// Make sure the resource group active
meta, err = controller.GetResourceGroup(group.Name)
re.NotNil(meta)
re.NoError(err)
meta = controller.GetActiveResourceGroup(group.Name)
re.NotNil(meta)
}
// Mock modify resource groups
modifySettings := func(gs *rmpb.ResourceGroup) {
Expand All @@ -261,65 +235,97 @@ func (suite *resourceManagerClientTestSuite) TestWatchResourceGroup() {
},
}
}
re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/watchStreamError", "return(true)"))
for i := 0; i < 2; i++ {
if i == 1 {
testutil.Eventually(re, func() bool {
meta = controller.GetActiveResourceGroup("test0")
return meta.RUSettings.RU.Settings.FillRate == uint64(20000)
}, testutil.WithTickInterval(50*time.Millisecond))
metaKeySpace = controllerKeySpace.GetActiveResourceGroup("test0")
re.Equal(metaKeySpace.RUSettings.RU.Settings.FillRate, uint64(10000))
re.NoError(failpoint.Enable("github.com/tikv/pd/client/watchStreamError", "return(true)"))
}
for i := 0; i < groupsNum; i++ {
group.Name = "test" + strconv.Itoa(i)
modifySettings(group)
resp, err := cli.ModifyResourceGroup(suite.ctx, group)
re.NoError(err)
re.Contains(resp, "Success!")
}
time.Sleep(time.Millisecond * 50)
meta = controller.GetActiveResourceGroup("test1")
re.Equal(meta.RUSettings.RU.Settings.FillRate, uint64(10000))
re.NoError(failpoint.Disable("github.com/tikv/pd/client/watchStreamError"))
for i := 0; i < groupsNum; i++ {
testutil.Eventually(re, func() bool {
name := "test" + strconv.Itoa(i)
meta = controller.GetActiveResourceGroup(name)
if meta != nil {
return meta.RUSettings.RU.Settings.FillRate == uint64(20000)
}
return false
}, testutil.WithTickInterval(50*time.Millisecond))
}

// Mock reset watch stream
re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/watchStreamError", "return(true)"))
group.Name = "test" + strconv.Itoa(groupsNum)
resp, err := cli.AddResourceGroup(suite.ctx, group)
re.NoError(err)
re.Contains(resp, "Success!")
// Make sure the resource group active
meta, err = controller.GetResourceGroup(group.Name)
re.NotNil(meta)
re.NoError(err)
modifySettings(group)
resp, err = cli.ModifyResourceGroup(suite.ctx, group)
re.NoError(err)
re.Contains(resp, "Success!")
testutil.Eventually(re, func() bool {
meta = controller.GetActiveResourceGroup("test1")
meta = controller.GetActiveResourceGroup(group.Name)
return meta.RUSettings.RU.Settings.FillRate == uint64(20000)
}, testutil.WithTickInterval(100*time.Millisecond))
re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/watchStreamError"))

for i := 2; i < 9; i++ {
group.Name = "test" + strconv.Itoa(i)
modifySettings(group)
resp, err := cli.ModifyResourceGroup(suite.ctx, group)
re.NoError(err)
re.Contains(resp, "Success!")
}
// Mock delete resource groups
suite.cleanupResourceGroups()
time.Sleep(time.Second)
meta = controller.GetActiveResourceGroup(group.Name)
re.Nil(meta)
for i := 0; i < groupsNum; i++ {
testutil.Eventually(re, func() bool {
name := "test" + strconv.Itoa(i)
meta = controller.GetActiveResourceGroup(name)
return meta == nil
}, testutil.WithTickInterval(50*time.Millisecond))
}
}

// Check watch result
watchChan, err := suite.client.WatchResourceGroup(suite.ctx, revision)
re.NoError(err)
i := 0
for {
select {
case <-time.After(time.Second):
return
case res := <-watchChan:
for _, r := range res {
if i < 6 {
suite.Equal(uint64(10000), r.RUSettings.RU.Settings.FillRate)
} else {
suite.Equal(uint64(20000), r.RUSettings.RU.Settings.FillRate)
}
i++
}
}
func (suite *resourceManagerClientTestSuite) TestWatchWithSingleGroupByKeyspace() {
re := suite.Require()
cli := suite.client

// We need to disable watch stream for `isSingleGroupByKeyspace`.
re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/disableWatch", "return(true)"))
defer func() {
re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/disableWatch"))
}()
// Distinguish the controller with and without enabling `isSingleGroupByKeyspace`.
controllerKeySpace, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, nil, controller.EnableSingleGroupByKeyspace())
controller, _ := controller.NewResourceGroupController(suite.ctx, 2, cli, nil)
controller.Start(suite.ctx)
controllerKeySpace.Start(suite.ctx)
defer controllerKeySpace.Stop()
defer controller.Stop()

// Mock add resource group.
group := &rmpb.ResourceGroup{
Name: "test",
Mode: rmpb.GroupMode_RUMode,
RUSettings: &rmpb.GroupRequestUnitSettings{
RU: &rmpb.TokenBucket{
Settings: &rmpb.TokenLimitSettings{
FillRate: 10000,
},
Tokens: 100000,
},
},
}
resp, err := cli.AddResourceGroup(suite.ctx, group)
re.NoError(err)
re.Contains(resp, "Success!")

tcs := tokenConsumptionPerSecond{rruTokensAtATime: 100}
controller.OnRequestWait(suite.ctx, group.Name, tcs.makeReadRequest())
meta := controller.GetActiveResourceGroup(group.Name)
re.Equal(meta.RUSettings.RU, group.RUSettings.RU)

controllerKeySpace.OnRequestWait(suite.ctx, group.Name, tcs.makeReadRequest())
metaKeySpace := controllerKeySpace.GetActiveResourceGroup(group.Name)
re.Equal(metaKeySpace.RUSettings.RU, group.RUSettings.RU)
}

const buffDuration = time.Millisecond * 300
Expand Down
Loading