From 985e2a487d4d9014d03aa5a2b56952f0dc6d83fd Mon Sep 17 00:00:00 2001 From: Hu# Date: Wed, 27 Sep 2023 15:48:20 +0800 Subject: [PATCH 1/8] resource_manager: Fix for delayed deletion of created groups in TestWatchResourceGroup (#7154) close tikv/pd#7144 Signed-off-by: husharp --- .../resourcemanager/resource_manager_test.go | 236 ++++++++++-------- 1 file changed, 137 insertions(+), 99 deletions(-) diff --git a/tests/integrations/mcs/resourcemanager/resource_manager_test.go b/tests/integrations/mcs/resourcemanager/resource_manager_test.go index f72f0ff817b..ed6a3ee501c 100644 --- a/tests/integrations/mcs/resourcemanager/resource_manager_test.go +++ b/tests/integrations/mcs/resourcemanager/resource_manager_test.go @@ -129,20 +129,6 @@ func (suite *resourceManagerClientTestSuite) SetupSuite() { }, }, }, - { - Name: "background_job", - Mode: rmpb.GroupMode_RUMode, - RUSettings: &rmpb.GroupRequestUnitSettings{ - RU: &rmpb.TokenBucket{ - Settings: &rmpb.TokenLimitSettings{ - FillRate: 10000, - BurstLimit: -1, - }, - Tokens: 100000, - }, - }, - BackgroundSettings: &rmpb.BackgroundSettings{JobTypes: []string{"br", "lightning"}}, - }, } } @@ -191,19 +177,17 @@ func (suite *resourceManagerClientTestSuite) resignAndWaitLeader() { } func (suite *resourceManagerClientTestSuite) TestWatchResourceGroup() { - // TODO: fix the unstable part at line 248. - suite.T().Skip() re := suite.Require() cli := suite.client + groupNamePrefix := "watch_test" group := &rmpb.ResourceGroup{ - Name: "test", + Name: groupNamePrefix, Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{ RU: &rmpb.TokenBucket{ Settings: &rmpb.TokenLimitSettings{ FillRate: 10000, }, - Tokens: 100000, }, }, } @@ -215,7 +199,7 @@ func (suite *resourceManagerClientTestSuite) TestWatchResourceGroup() { var meta *rmpb.ResourceGroup groupsNum := 10 for i := 0; i < groupsNum; i++ { - group.Name = "test" + strconv.Itoa(i) + group.Name = groupNamePrefix + strconv.Itoa(i) resp, err := cli.AddResourceGroup(suite.ctx, group) re.NoError(err) re.Contains(resp, "Success!") @@ -228,25 +212,25 @@ func (suite *resourceManagerClientTestSuite) TestWatchResourceGroup() { re.NotNil(meta) } // Mock modify resource groups - modifySettings := func(gs *rmpb.ResourceGroup) { + modifySettings := func(gs *rmpb.ResourceGroup, fillRate uint64) { gs.RUSettings = &rmpb.GroupRequestUnitSettings{ RU: &rmpb.TokenBucket{ Settings: &rmpb.TokenLimitSettings{ - FillRate: 20000, + FillRate: fillRate, }, }, } } for i := 0; i < groupsNum; i++ { - group.Name = "test" + strconv.Itoa(i) - modifySettings(group) + group.Name = groupNamePrefix + strconv.Itoa(i) + modifySettings(group, 20000) resp, err := cli.ModifyResourceGroup(suite.ctx, group) re.NoError(err) re.Contains(resp, "Success!") } for i := 0; i < groupsNum; i++ { testutil.Eventually(re, func() bool { - name := "test" + strconv.Itoa(i) + name := groupNamePrefix + strconv.Itoa(i) meta = controller.GetActiveResourceGroup(name) if meta != nil { return meta.RUSettings.RU.Settings.FillRate == uint64(20000) @@ -257,7 +241,7 @@ func (suite *resourceManagerClientTestSuite) TestWatchResourceGroup() { // Mock reset watch stream re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/watchStreamError", "return(true)")) - group.Name = "test" + strconv.Itoa(groupsNum) + group.Name = groupNamePrefix + strconv.Itoa(groupsNum) resp, err := cli.AddResourceGroup(suite.ctx, group) re.NoError(err) re.Contains(resp, "Success!") @@ -265,13 +249,13 @@ func (suite *resourceManagerClientTestSuite) 
TestWatchResourceGroup() { meta, err = controller.GetResourceGroup(group.Name) re.NotNil(meta) re.NoError(err) - modifySettings(group) + modifySettings(group, 30000) resp, err = cli.ModifyResourceGroup(suite.ctx, group) re.NoError(err) re.Contains(resp, "Success!") testutil.Eventually(re, func() bool { meta = controller.GetActiveResourceGroup(group.Name) - return meta.RUSettings.RU.Settings.FillRate == uint64(20000) + return meta.RUSettings.RU.Settings.FillRate == uint64(30000) }, testutil.WithTickInterval(100*time.Millisecond)) re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/watchStreamError")) @@ -279,7 +263,7 @@ func (suite *resourceManagerClientTestSuite) TestWatchResourceGroup() { suite.cleanupResourceGroups() for i := 0; i < groupsNum; i++ { testutil.Eventually(re, func() bool { - name := "test" + strconv.Itoa(i) + name := groupNamePrefix + strconv.Itoa(i) meta = controller.GetActiveResourceGroup(name) return meta == nil }, testutil.WithTickInterval(50*time.Millisecond)) @@ -305,7 +289,7 @@ func (suite *resourceManagerClientTestSuite) TestWatchWithSingleGroupByKeyspace( // Mock add resource group. group := &rmpb.ResourceGroup{ - Name: "test", + Name: "keyspace_test", Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{ RU: &rmpb.TokenBucket{ @@ -328,6 +312,28 @@ func (suite *resourceManagerClientTestSuite) TestWatchWithSingleGroupByKeyspace( controllerKeySpace.OnRequestWait(suite.ctx, group.Name, tcs.makeReadRequest()) metaKeySpace := controllerKeySpace.GetActiveResourceGroup(group.Name) re.Equal(metaKeySpace.RUSettings.RU, group.RUSettings.RU) + + // Mock modify resource groups + modifySettings := func(gs *rmpb.ResourceGroup, fillRate uint64) { + gs.RUSettings = &rmpb.GroupRequestUnitSettings{ + RU: &rmpb.TokenBucket{ + Settings: &rmpb.TokenLimitSettings{ + FillRate: fillRate, + }, + }, + } + } + modifySettings(group, 20000) + resp, err = cli.ModifyResourceGroup(suite.ctx, group) + re.NoError(err) + re.Contains(resp, "Success!") + + testutil.Eventually(re, func() bool { + meta = controller.GetActiveResourceGroup(group.Name) + return meta.RUSettings.RU.Settings.FillRate == uint64(20000) + }, testutil.WithTickInterval(100*time.Millisecond)) + metaKeySpace = controllerKeySpace.GetActiveResourceGroup(group.Name) + re.Equal(metaKeySpace.RUSettings.RU.Settings.FillRate, uint64(10000)) } const buffDuration = time.Millisecond * 300 @@ -367,11 +373,21 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() { re := suite.Require() cli := suite.client - for _, group := range suite.initGroups { - resp, err := cli.AddResourceGroup(suite.ctx, group) - re.NoError(err) - re.Contains(resp, "Success!") + rg := &rmpb.ResourceGroup{ + Name: "controller_test", + Mode: rmpb.GroupMode_RUMode, + RUSettings: &rmpb.GroupRequestUnitSettings{ + RU: &rmpb.TokenBucket{ + Settings: &rmpb.TokenLimitSettings{ + FillRate: 10000, + }, + Tokens: 100000, + }, + }, } + resp, err := cli.AddResourceGroup(suite.ctx, rg) + re.NoError(err) + re.Contains(resp, "Success!") cfg := &controller.RequestUnitConfig{ ReadBaseCost: 1, @@ -390,7 +406,7 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() { len int }{ { - resourceGroupName: suite.initGroups[0].Name, + resourceGroupName: rg.Name, len: 8, tcs: []tokenConsumptionPerSecond{ {rruTokensAtATime: 50, wruTokensAtATime: 20, times: 100, waitDuration: 0}, @@ -441,7 +457,7 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() { 
re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/triggerUpdate", "return(true)")) tcs := tokenConsumptionPerSecond{rruTokensAtATime: 1, wruTokensAtATime: 900000000, times: 1, waitDuration: 0} wreq := tcs.makeWriteRequest() - _, _, err := controller.OnRequestWait(suite.ctx, suite.initGroups[0].Name, wreq) + _, _, err = controller.OnRequestWait(suite.ctx, rg.Name, wreq) re.Error(err) time.Sleep(time.Millisecond * 200) re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/triggerUpdate")) @@ -584,7 +600,22 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() { re := suite.Require() cli := suite.client - for _, group := range suite.initGroups { + groupNames := []string{"penalty_test1", "penalty_test2"} + // Mock add 2 resource groups. + group := &rmpb.ResourceGroup{ + Name: groupNames[0], + Mode: rmpb.GroupMode_RUMode, + RUSettings: &rmpb.GroupRequestUnitSettings{ + RU: &rmpb.TokenBucket{ + Settings: &rmpb.TokenLimitSettings{ + FillRate: 10000, + }, + Tokens: 100000, + }, + }, + } + for _, name := range groupNames { + group.Name = name resp, err := cli.AddResourceGroup(suite.ctx, group) re.NoError(err) re.Contains(resp, "Success!") @@ -600,7 +631,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() { c, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, cfg, controller.EnableSingleGroupByKeyspace()) c.Start(suite.ctx) - resourceGroupName := suite.initGroups[1].Name + resourceGroupName := groupNames[0] // init req := controller.NewTestRequestInfo(false, 0, 2 /* store2 */) resp := controller.NewTestResponseInfo(0, time.Duration(30), true) @@ -659,7 +690,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() { re.NoError(err) // from different group, should be zero - resourceGroupName = suite.initGroups[2].Name + resourceGroupName = groupNames[1] req4 := controller.NewTestRequestInfo(true, 50, 1 /* store2 */) resp4 := controller.NewTestResponseInfo(0, time.Duration(10), true) _, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req4) @@ -676,8 +707,7 @@ func (suite *resourceManagerClientTestSuite) TestAcquireTokenBucket() { re := suite.Require() cli := suite.client - groups := make([]*rmpb.ResourceGroup, 0) - groups = append(groups, suite.initGroups...) 
+ groups := suite.initGroups for _, group := range groups { resp, err := cli.AddResourceGroup(suite.ctx, group) re.NoError(err) @@ -689,7 +719,6 @@ func (suite *resourceManagerClientTestSuite) TestAcquireTokenBucket() { } re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/mcs/resourcemanager/server/fastPersist", `return(true)`)) suite.resignAndWaitLeader() - groups = append(groups, &rmpb.ResourceGroup{Name: "test3"}) for i := 0; i < 3; i++ { for _, group := range groups { requests := make([]*rmpb.RequestUnitItem, 0) @@ -762,8 +791,8 @@ func (suite *resourceManagerClientTestSuite) TestBasicResourceGroupCURD() { expectMarshal string modifySettings func(*rmpb.ResourceGroup) }{ - {"test1", rmpb.GroupMode_RUMode, true, true, - `{"name":"test1","mode":1,"r_u_settings":{"r_u":{"settings":{"fill_rate":10000},"state":{"initialized":false}}},"priority":0}`, + {"CRUD_test1", rmpb.GroupMode_RUMode, true, true, + `{"name":"CRUD_test1","mode":1,"r_u_settings":{"r_u":{"settings":{"fill_rate":10000},"state":{"initialized":false}}},"priority":0}`, func(gs *rmpb.ResourceGroup) { gs.RUSettings = &rmpb.GroupRequestUnitSettings{ RU: &rmpb.TokenBucket{ @@ -775,8 +804,8 @@ func (suite *resourceManagerClientTestSuite) TestBasicResourceGroupCURD() { }, }, - {"test2", rmpb.GroupMode_RUMode, true, true, - `{"name":"test2","mode":1,"r_u_settings":{"r_u":{"settings":{"fill_rate":20000},"state":{"initialized":false}}},"priority":0,"runaway_settings":{"rule":{"exec_elapsed_time_ms":10000},"action":2},"background_settings":{"job_types":["test"]}}`, + {"CRUD_test2", rmpb.GroupMode_RUMode, true, true, + `{"name":"CRUD_test2","mode":1,"r_u_settings":{"r_u":{"settings":{"fill_rate":20000},"state":{"initialized":false}}},"priority":0,"runaway_settings":{"rule":{"exec_elapsed_time_ms":10000},"action":2},"background_settings":{"job_types":["test"]}}`, func(gs *rmpb.ResourceGroup) { gs.RUSettings = &rmpb.GroupRequestUnitSettings{ RU: &rmpb.TokenBucket{ @@ -796,8 +825,8 @@ func (suite *resourceManagerClientTestSuite) TestBasicResourceGroupCURD() { } }, }, - {"test2", rmpb.GroupMode_RUMode, false, true, - `{"name":"test2","mode":1,"r_u_settings":{"r_u":{"settings":{"fill_rate":30000,"burst_limit":-1},"state":{"initialized":false}}},"priority":0,"runaway_settings":{"rule":{"exec_elapsed_time_ms":1000},"action":3,"watch":{"lasting_duration_ms":100000,"type":2}},"background_settings":{"job_types":["br","lightning"]}}`, + {"CRUD_test2", rmpb.GroupMode_RUMode, false, true, + `{"name":"CRUD_test2","mode":1,"r_u_settings":{"r_u":{"settings":{"fill_rate":30000,"burst_limit":-1},"state":{"initialized":false}}},"priority":0,"runaway_settings":{"rule":{"exec_elapsed_time_ms":1000},"action":3,"watch":{"lasting_duration_ms":100000,"type":2}},"background_settings":{"job_types":["br","lightning"]}}`, func(gs *rmpb.ResourceGroup) { gs.RUSettings = &rmpb.GroupRequestUnitSettings{ RU: &rmpb.TokenBucket{ @@ -1033,15 +1062,10 @@ func (suite *resourceManagerClientTestSuite) TestResourceManagerClientFailover() cli := suite.client group := &rmpb.ResourceGroup{ - Name: "test3", + Name: "failover_test", Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{ - RU: &rmpb.TokenBucket{ - Settings: &rmpb.TokenLimitSettings{ - FillRate: 10000, - }, - Tokens: 100000, - }, + RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{}}, }, } addResp, err := cli.AddResourceGroup(suite.ctx, group) @@ -1070,8 +1094,9 @@ func (suite *resourceManagerClientTestSuite) TestResourceManagerClientDegradedMo re := suite.Require() cli := suite.client + 
groupName := "mode_test" group := &rmpb.ResourceGroup{ - Name: "modetest", + Name: groupName, Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{ RU: &rmpb.TokenBucket{ @@ -1106,16 +1131,15 @@ func (suite *resourceManagerClientTestSuite) TestResourceManagerClientDegradedMo rruTokensAtATime: 0, wruTokensAtATime: 2, } - resourceName := "modetest" - controller.OnRequestWait(suite.ctx, resourceName, tc.makeWriteRequest()) + controller.OnRequestWait(suite.ctx, groupName, tc.makeWriteRequest()) time.Sleep(time.Second * 2) beginTime := time.Now() // This is used to make sure resource group in lowRU. for i := 0; i < 100; i++ { - controller.OnRequestWait(suite.ctx, resourceName, tc2.makeWriteRequest()) + controller.OnRequestWait(suite.ctx, groupName, tc2.makeWriteRequest()) } for i := 0; i < 100; i++ { - controller.OnRequestWait(suite.ctx, resourceName, tc.makeWriteRequest()) + controller.OnRequestWait(suite.ctx, groupName, tc.makeWriteRequest()) } endTime := time.Now() // we can not check `inDegradedMode` because of data race. @@ -1167,18 +1191,20 @@ func (suite *resourceManagerClientTestSuite) TestRemoveStaleResourceGroup() { re := suite.Require() cli := suite.client - for _, group := range suite.initGroups { - resp, err := cli.AddResourceGroup(suite.ctx, group) - re.NoError(err) - re.Contains(resp, "Success!") + // Mock add resource group. + group := &rmpb.ResourceGroup{ + Name: "stale_test", + Mode: rmpb.GroupMode_RUMode, + RUSettings: &rmpb.GroupRequestUnitSettings{ + RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{}}, + }, } + resp, err := cli.AddResourceGroup(suite.ctx, group) + re.NoError(err) + re.Contains(resp, "Success!") - ruConfig := &controller.RequestUnitConfig{ - ReadBaseCost: 1, - ReadCostPerByte: 1, - } re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/fastCleanup", `return(true)`)) - controller, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, ruConfig) + controller, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, nil) controller.Start(suite.ctx) testConfig := struct { @@ -1194,13 +1220,13 @@ func (suite *resourceManagerClientTestSuite) TestRemoveStaleResourceGroup() { rreq := testConfig.tcs.makeReadRequest() rres := testConfig.tcs.makeReadResponse() for j := 0; j < testConfig.times; j++ { - controller.OnRequestWait(suite.ctx, suite.initGroups[0].Name, rreq) - controller.OnResponse(suite.initGroups[0].Name, rreq, rres) + controller.OnRequestWait(suite.ctx, group.Name, rreq) + controller.OnResponse(group.Name, rreq, rres) time.Sleep(100 * time.Microsecond) } time.Sleep(1 * time.Second) - re.Nil(controller.GetActiveResourceGroup(suite.initGroups[0].Name)) + re.Nil(controller.GetActiveResourceGroup(group.Name)) re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/fastCleanup")) controller.Stop() @@ -1210,47 +1236,52 @@ func (suite *resourceManagerClientTestSuite) TestCheckBackgroundJobs() { re := suite.Require() cli := suite.client - for _, group := range suite.initGroups { - resp, err := cli.AddResourceGroup(suite.ctx, group) - re.NoError(err) - re.Contains(resp, "Success!") + enableBackgroundGroup := func(enable bool) string { + if enable { + return "background_enable" + } else { + return "background_unable" + } } - - cfg := &controller.RequestUnitConfig{ - ReadBaseCost: 1, - ReadCostPerByte: 1, - WriteBaseCost: 1, - WriteCostPerByte: 1, - CPUMsCost: 1, + // Mock add resource group. 
+ group := &rmpb.ResourceGroup{ + Name: enableBackgroundGroup(false), + Mode: rmpb.GroupMode_RUMode, + RUSettings: &rmpb.GroupRequestUnitSettings{ + RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{}}, + }, } - c, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, cfg) + resp, err := cli.AddResourceGroup(suite.ctx, group) + re.NoError(err) + re.Contains(resp, "Success!") + group.Name = enableBackgroundGroup(true) + group.BackgroundSettings = &rmpb.BackgroundSettings{JobTypes: []string{"br", "lightning"}} + resp, err = cli.AddResourceGroup(suite.ctx, group) + re.NoError(err) + re.Contains(resp, "Success!") + + c, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, nil) c.Start(suite.ctx) - resourceGroupName := suite.initGroups[0].Name + resourceGroupName := enableBackgroundGroup(false) re.False(c.IsBackgroundRequest(suite.ctx, resourceGroupName, "internal_default")) // test fallback for nil. re.False(c.IsBackgroundRequest(suite.ctx, resourceGroupName, "internal_lightning")) re.False(c.IsBackgroundRequest(suite.ctx, resourceGroupName, "internal_ddl")) re.False(c.IsBackgroundRequest(suite.ctx, resourceGroupName, "")) - resourceGroupName = "background_job" + resourceGroupName = enableBackgroundGroup(true) re.True(c.IsBackgroundRequest(suite.ctx, resourceGroupName, "internal_br")) re.True(c.IsBackgroundRequest(suite.ctx, resourceGroupName, "internal_lightning")) // test fallback for nil. re.False(c.IsBackgroundRequest(suite.ctx, resourceGroupName, "internal_ddl")) // modify `Default` to check fallback. - resp, err := cli.ModifyResourceGroup(suite.ctx, &rmpb.ResourceGroup{ + resp, err = cli.ModifyResourceGroup(suite.ctx, &rmpb.ResourceGroup{ Name: "default", Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{ - RU: &rmpb.TokenBucket{ - Settings: &rmpb.TokenLimitSettings{ - FillRate: 1, - BurstLimit: -1, - }, - Tokens: 1, - }, + RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{}}, }, BackgroundSettings: &rmpb.BackgroundSettings{JobTypes: []string{"lightning", "ddl"}}, }) @@ -1265,14 +1296,14 @@ func (suite *resourceManagerClientTestSuite) TestCheckBackgroundJobs() { return false }, testutil.WithTickInterval(50*time.Millisecond)) - resourceGroupName = suite.initGroups[0].Name + resourceGroupName = enableBackgroundGroup(false) re.False(c.IsBackgroundRequest(suite.ctx, resourceGroupName, "internal_default")) // test fallback for `"lightning", "ddl"`. re.True(c.IsBackgroundRequest(suite.ctx, resourceGroupName, "internal_lightning")) re.True(c.IsBackgroundRequest(suite.ctx, resourceGroupName, "internal_ddl")) re.False(c.IsBackgroundRequest(suite.ctx, resourceGroupName, "")) - resourceGroupName = "background_job" + resourceGroupName = enableBackgroundGroup(true) re.True(c.IsBackgroundRequest(suite.ctx, resourceGroupName, "internal_br")) re.True(c.IsBackgroundRequest(suite.ctx, resourceGroupName, "internal_lightning")) // test fallback for `"lightning", "ddl"`. @@ -1284,11 +1315,18 @@ func (suite *resourceManagerClientTestSuite) TestCheckBackgroundJobs() { func (suite *resourceManagerClientTestSuite) TestResourceGroupControllerConfigChanged() { re := suite.Require() cli := suite.client - for _, group := range suite.initGroups { - resp, err := cli.AddResourceGroup(suite.ctx, group) - re.NoError(err) - re.Contains(resp, "Success!") + // Mock add resource group. 
+ group := &rmpb.ResourceGroup{ + Name: "config_change_test", + Mode: rmpb.GroupMode_RUMode, + RUSettings: &rmpb.GroupRequestUnitSettings{ + RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{}}, + }, } + resp, err := cli.AddResourceGroup(suite.ctx, group) + re.NoError(err) + re.Contains(resp, "Success!") + c1, err := controller.NewResourceGroupController(suite.ctx, 1, cli, nil) re.NoError(err) c1.Start(suite.ctx) From 8e844d8483655ca3cd902220c372621c6c14cb43 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Wed, 27 Sep 2023 16:43:47 +0800 Subject: [PATCH 2/8] *: use syncutil lock (#7157) ref tikv/pd#4399 Signed-off-by: Ryan Leung Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/balancer/round_robin.go | 5 +++-- pkg/btree/btree_generic.go | 5 +++-- pkg/btree/btree_generic_test.go | 6 ++++-- pkg/election/leadership.go | 4 ++-- pkg/keyspace/tso_keyspace_group.go | 3 ++- pkg/mcs/resourcemanager/server/manager.go | 4 ++-- pkg/mcs/resourcemanager/server/resource_group.go | 4 ++-- pkg/memory/meminfo.go | 10 +++++----- pkg/ratelimit/limiter_test.go | 11 ++++++----- pkg/schedule/schedulers/scheduler_controller.go | 3 ++- pkg/statistics/region_collection.go | 6 +++--- pkg/tso/keyspace_group_manager.go | 3 ++- pkg/tso/keyspace_group_manager_test.go | 7 ++++--- pkg/utils/etcdutil/etcdutil.go | 3 ++- pkg/utils/etcdutil/etcdutil_test.go | 9 +++++---- pkg/utils/tempurl/tempurl.go | 4 ++-- server/apiv2/handlers/tso_keyspace_group.go | 4 ++-- server/grpc_service.go | 3 ++- server/server.go | 5 +++-- tests/cluster.go | 9 +++++---- tests/integrations/client/global_config_test.go | 4 ++-- tests/server/id/id_test.go | 3 ++- tests/server/tso/consistency_test.go | 3 ++- 23 files changed, 67 insertions(+), 51 deletions(-) diff --git a/pkg/balancer/round_robin.go b/pkg/balancer/round_robin.go index 5013a447d3e..51abb575481 100644 --- a/pkg/balancer/round_robin.go +++ b/pkg/balancer/round_robin.go @@ -15,13 +15,14 @@ package balancer import ( - "sync" "sync/atomic" + + "github.com/tikv/pd/pkg/utils/syncutil" ) // RoundRobin is a balancer that selects nodes in a round-robin fashion. type RoundRobin[T uint32 | string] struct { - sync.RWMutex + syncutil.RWMutex nodes []T exists map[T]struct{} next uint32 diff --git a/pkg/btree/btree_generic.go b/pkg/btree/btree_generic.go index 634a24b5bd8..630cb25abcd 100644 --- a/pkg/btree/btree_generic.go +++ b/pkg/btree/btree_generic.go @@ -78,7 +78,8 @@ package btree import ( "sort" - "sync" + + "github.com/tikv/pd/pkg/utils/syncutil" ) // Item represents a single object in the tree. @@ -101,7 +102,7 @@ const ( // FreeList, in particular when they're created with Clone. // Two Btrees using the same freelist are safe for concurrent write access. type FreeListG[T Item[T]] struct { - mu sync.Mutex + mu syncutil.Mutex freelist []*node[T] } diff --git a/pkg/btree/btree_generic_test.go b/pkg/btree/btree_generic_test.go index 228269056a9..751fb2744e9 100644 --- a/pkg/btree/btree_generic_test.go +++ b/pkg/btree/btree_generic_test.go @@ -36,6 +36,8 @@ import ( "sort" "sync" "testing" + + "github.com/tikv/pd/pkg/utils/syncutil" ) // perm returns a random permutation of n Int items in the range [0, n). 
@@ -752,7 +754,7 @@ func BenchmarkDescendLessOrEqual(b *testing.B) { const cloneTestSize = 10000 -func cloneTestG[T Item[T]](t *testing.T, b *BTreeG[T], start int, p []T, wg *sync.WaitGroup, trees *[]*BTreeG[T], lock *sync.Mutex) { +func cloneTestG[T Item[T]](t *testing.T, b *BTreeG[T], start int, p []T, wg *sync.WaitGroup, trees *[]*BTreeG[T], lock *syncutil.Mutex) { t.Logf("Starting new clone at %v", start) lock.Lock() *trees = append(*trees, b) @@ -773,7 +775,7 @@ func TestCloneConcurrentOperationsG(t *testing.T) { p := perm(cloneTestSize) var wg sync.WaitGroup wg.Add(1) - go cloneTestG(t, b, 0, p, &wg, &trees, &sync.Mutex{}) + go cloneTestG(t, b, 0, p, &wg, &trees, &syncutil.Mutex{}) wg.Wait() want := rang(cloneTestSize) t.Logf("Starting equality checks on %d trees", len(trees)) diff --git a/pkg/election/leadership.go b/pkg/election/leadership.go index 692344eb52a..d5d73e90b58 100644 --- a/pkg/election/leadership.go +++ b/pkg/election/leadership.go @@ -16,7 +16,6 @@ package election import ( "context" - "sync" "sync/atomic" "time" @@ -27,6 +26,7 @@ import ( "github.com/tikv/pd/pkg/storage/kv" "github.com/tikv/pd/pkg/utils/etcdutil" "github.com/tikv/pd/pkg/utils/grpcutil" + "github.com/tikv/pd/pkg/utils/syncutil" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/mvcc/mvccpb" "go.uber.org/zap" @@ -61,7 +61,7 @@ type Leadership struct { keepAliveCtx context.Context keepAliveCancelFunc context.CancelFunc - keepAliveCancelFuncLock sync.Mutex + keepAliveCancelFuncLock syncutil.Mutex } // NewLeadership creates a new Leadership. diff --git a/pkg/keyspace/tso_keyspace_group.go b/pkg/keyspace/tso_keyspace_group.go index a092e5b18a6..c8694c4a7c6 100644 --- a/pkg/keyspace/tso_keyspace_group.go +++ b/pkg/keyspace/tso_keyspace_group.go @@ -35,6 +35,7 @@ import ( "github.com/tikv/pd/pkg/storage/kv" "github.com/tikv/pd/pkg/utils/etcdutil" "github.com/tikv/pd/pkg/utils/logutil" + "github.com/tikv/pd/pkg/utils/syncutil" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/mvcc/mvccpb" "go.uber.org/zap" @@ -60,7 +61,7 @@ type GroupManager struct { client *clientv3.Client clusterID uint64 - sync.RWMutex + syncutil.RWMutex // groups is the cache of keyspace group related information. // user kind -> keyspace group groups map[endpoint.UserKind]*indexedHeap diff --git a/pkg/mcs/resourcemanager/server/manager.go b/pkg/mcs/resourcemanager/server/manager.go index 1731faf8af1..03db817cf12 100644 --- a/pkg/mcs/resourcemanager/server/manager.go +++ b/pkg/mcs/resourcemanager/server/manager.go @@ -20,7 +20,6 @@ import ( "math" "sort" "strings" - "sync" "time" "github.com/gogo/protobuf/proto" @@ -34,6 +33,7 @@ import ( "github.com/tikv/pd/pkg/storage/kv" "github.com/tikv/pd/pkg/utils/jsonutil" "github.com/tikv/pd/pkg/utils/logutil" + "github.com/tikv/pd/pkg/utils/syncutil" "go.uber.org/zap" ) @@ -49,7 +49,7 @@ const ( // Manager is the manager of resource group. 
type Manager struct { - sync.RWMutex + syncutil.RWMutex srv bs.Server controllerConfig *ControllerConfig groups map[string]*ResourceGroup diff --git a/pkg/mcs/resourcemanager/server/resource_group.go b/pkg/mcs/resourcemanager/server/resource_group.go index 74bc463002b..863cfd19026 100644 --- a/pkg/mcs/resourcemanager/server/resource_group.go +++ b/pkg/mcs/resourcemanager/server/resource_group.go @@ -17,19 +17,19 @@ package server import ( "encoding/json" - "sync" "time" "github.com/pingcap/errors" rmpb "github.com/pingcap/kvproto/pkg/resource_manager" "github.com/pingcap/log" "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/utils/syncutil" "go.uber.org/zap" ) // ResourceGroup is the definition of a resource group, for REST API. type ResourceGroup struct { - sync.RWMutex + syncutil.RWMutex Name string `json:"name"` Mode rmpb.GroupMode `json:"mode"` // RU settings diff --git a/pkg/memory/meminfo.go b/pkg/memory/meminfo.go index 0981ddacdfb..5a81cf57d3d 100644 --- a/pkg/memory/meminfo.go +++ b/pkg/memory/meminfo.go @@ -15,7 +15,6 @@ package memory import ( - "sync" "time" "github.com/pingcap/failpoint" @@ -23,6 +22,7 @@ import ( "github.com/pingcap/sysutil" "github.com/shirou/gopsutil/v3/mem" "github.com/tikv/pd/pkg/cgroup" + "github.com/tikv/pd/pkg/utils/syncutil" "go.uber.org/zap" "golang.org/x/exp/constraints" ) @@ -76,7 +76,7 @@ func MemUsedNormal() (uint64, error) { type memInfoCache struct { updateTime time.Time - mu *sync.RWMutex + mu *syncutil.RWMutex mem uint64 } @@ -168,13 +168,13 @@ func init() { MemUsed = MemUsedNormal } memLimit = &memInfoCache{ - mu: &sync.RWMutex{}, + mu: &syncutil.RWMutex{}, } memUsage = &memInfoCache{ - mu: &sync.RWMutex{}, + mu: &syncutil.RWMutex{}, } serverMemUsage = &memInfoCache{ - mu: &sync.RWMutex{}, + mu: &syncutil.RWMutex{}, } _, err := MemTotal() mustNil(err) diff --git a/pkg/ratelimit/limiter_test.go b/pkg/ratelimit/limiter_test.go index 6443028eec6..d5d9829816a 100644 --- a/pkg/ratelimit/limiter_test.go +++ b/pkg/ratelimit/limiter_test.go @@ -20,6 +20,7 @@ import ( "time" "github.com/stretchr/testify/require" + "github.com/tikv/pd/pkg/utils/syncutil" "golang.org/x/time/rate" ) @@ -33,7 +34,7 @@ func TestUpdateConcurrencyLimiter(t *testing.T) { label := "test" status := limiter.Update(label, opts...) re.True(status&ConcurrencyChanged != 0) - var lock sync.Mutex + var lock syncutil.Mutex successCount, failedCount := 0, 0 var wg sync.WaitGroup for i := 0; i < 15; i++ { @@ -118,7 +119,7 @@ func TestUpdateQPSLimiter(t *testing.T) { status := limiter.Update(label, opts...) 
re.True(status&QPSChanged != 0) - var lock sync.Mutex + var lock syncutil.Mutex successCount, failedCount := 0, 0 var wg sync.WaitGroup wg.Add(3) @@ -173,7 +174,7 @@ func TestQPSLimiter(t *testing.T) { opt(label, limiter) } - var lock sync.Mutex + var lock syncutil.Mutex successCount, failedCount := 0, 0 var wg sync.WaitGroup wg.Add(200) @@ -208,7 +209,7 @@ func TestTwoLimiters(t *testing.T) { opt(label, limiter) } - var lock sync.Mutex + var lock syncutil.Mutex successCount, failedCount := 0, 0 var wg sync.WaitGroup wg.Add(200) @@ -245,7 +246,7 @@ func TestTwoLimiters(t *testing.T) { } func countRateLimiterHandleResult(limiter *Limiter, label string, successCount *int, - failedCount *int, lock *sync.Mutex, wg *sync.WaitGroup) { + failedCount *int, lock *syncutil.Mutex, wg *sync.WaitGroup) { result := limiter.Allow(label) lock.Lock() defer lock.Unlock() diff --git a/pkg/schedule/schedulers/scheduler_controller.go b/pkg/schedule/schedulers/scheduler_controller.go index 4d72699b0fe..4f22f50f81d 100644 --- a/pkg/schedule/schedulers/scheduler_controller.go +++ b/pkg/schedule/schedulers/scheduler_controller.go @@ -30,6 +30,7 @@ import ( "github.com/tikv/pd/pkg/schedule/plan" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/logutil" + "github.com/tikv/pd/pkg/utils/syncutil" "go.uber.org/zap" ) @@ -39,7 +40,7 @@ var denySchedulersByLabelerCounter = labeler.LabelerEventCounter.WithLabelValues // Controller is used to manage all schedulers. type Controller struct { - sync.RWMutex + syncutil.RWMutex wg sync.WaitGroup ctx context.Context cluster sche.SchedulerCluster diff --git a/pkg/statistics/region_collection.go b/pkg/statistics/region_collection.go index 6d3ac3f1760..c79eb0a3132 100644 --- a/pkg/statistics/region_collection.go +++ b/pkg/statistics/region_collection.go @@ -15,13 +15,13 @@ package statistics import ( - "sync" "time" "github.com/pingcap/log" "github.com/tikv/pd/pkg/core" sc "github.com/tikv/pd/pkg/schedule/config" "github.com/tikv/pd/pkg/schedule/placement" + "github.com/tikv/pd/pkg/utils/syncutil" ) // RegionInfoProvider is an interface to provide the region information. @@ -85,7 +85,7 @@ type RegionInfoWithTS struct { // RegionStatistics is used to record the status of regions. type RegionStatistics struct { - sync.RWMutex + syncutil.RWMutex rip RegionInfoProvider conf sc.CheckerConfigProvider stats map[RegionStatisticType]map[uint64]*RegionInfoWithTS @@ -284,7 +284,7 @@ func (r *RegionStatistics) Reset() { // LabelStatistics is the statistics of the level of labels. type LabelStatistics struct { - sync.RWMutex + syncutil.RWMutex regionLabelStats map[uint64]string labelCounter map[string]int } diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 3b352884eab..badcb18d5d8 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -43,6 +43,7 @@ import ( "github.com/tikv/pd/pkg/utils/etcdutil" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/memberutil" + "github.com/tikv/pd/pkg/utils/syncutil" "github.com/tikv/pd/pkg/utils/tsoutil" "github.com/tikv/pd/pkg/utils/typeutil" "go.etcd.io/etcd/clientv3" @@ -62,7 +63,7 @@ const ( ) type state struct { - sync.RWMutex + syncutil.RWMutex // ams stores the allocator managers of the keyspace groups. Each keyspace group is // assigned with an allocator manager managing its global/local tso allocators. 
// Use a fixed size array to maximize the efficiency of concurrent access to diff --git a/pkg/tso/keyspace_group_manager_test.go b/pkg/tso/keyspace_group_manager_test.go index 91c68a5a268..c20abfc5f79 100644 --- a/pkg/tso/keyspace_group_manager_test.go +++ b/pkg/tso/keyspace_group_manager_test.go @@ -36,6 +36,7 @@ import ( mcsutils "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/etcdutil" + "github.com/tikv/pd/pkg/utils/syncutil" "github.com/tikv/pd/pkg/utils/tempurl" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/pkg/utils/tsoutil" @@ -751,7 +752,7 @@ func (suite *keyspaceGroupManagerTestSuite) runTestLoadKeyspaceGroupsAssignment( defer mgr.Close() step := 30 - mux := sync.Mutex{} + mux := syncutil.Mutex{} wg := sync.WaitGroup{} for i := 0; i < numberOfKeyspaceGroupsToAdd; i += step { wg.Add(1) @@ -872,12 +873,12 @@ func addKeyspaceGroupAssignment( groupID uint32, rootPath string, svcAddrs []string, - priorites []int, + priorities []int, keyspaces []uint32, ) error { members := make([]endpoint.KeyspaceGroupMember, len(svcAddrs)) for i, svcAddr := range svcAddrs { - members[i] = endpoint.KeyspaceGroupMember{Address: svcAddr, Priority: priorites[i]} + members[i] = endpoint.KeyspaceGroupMember{Address: svcAddr, Priority: priorities[i]} } group := &endpoint.KeyspaceGroup{ ID: groupID, diff --git a/pkg/utils/etcdutil/etcdutil.go b/pkg/utils/etcdutil/etcdutil.go index 336e0593f8a..1432b6e37c3 100644 --- a/pkg/utils/etcdutil/etcdutil.go +++ b/pkg/utils/etcdutil/etcdutil.go @@ -31,6 +31,7 @@ import ( "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/utils/grpcutil" "github.com/tikv/pd/pkg/utils/logutil" + "github.com/tikv/pd/pkg/utils/syncutil" "github.com/tikv/pd/pkg/utils/typeutil" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/etcdserver" @@ -566,7 +567,7 @@ type LoopWatcher struct { postEventFn func() error // forceLoadMu is used to ensure two force loads have minimal interval. - forceLoadMu sync.RWMutex + forceLoadMu syncutil.RWMutex // lastTimeForceLoad is used to record the last time force loading data from etcd. 
lastTimeForceLoad time.Time diff --git a/pkg/utils/etcdutil/etcdutil_test.go b/pkg/utils/etcdutil/etcdutil_test.go index 80194a6287e..f7fadd3bbf6 100644 --- a/pkg/utils/etcdutil/etcdutil_test.go +++ b/pkg/utils/etcdutil/etcdutil_test.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/failpoint" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/tikv/pd/pkg/utils/syncutil" "github.com/tikv/pd/pkg/utils/tempurl" "github.com/tikv/pd/pkg/utils/testutil" "go.etcd.io/etcd/clientv3" @@ -398,7 +399,7 @@ func (suite *loopWatcherTestSuite) TearDownSuite() { func (suite *loopWatcherTestSuite) TestLoadWithoutKey() { cache := struct { - sync.RWMutex + syncutil.RWMutex data map[string]struct{} }{ data: make(map[string]struct{}), @@ -428,7 +429,7 @@ func (suite *loopWatcherTestSuite) TestLoadWithoutKey() { func (suite *loopWatcherTestSuite) TestCallBack() { cache := struct { - sync.RWMutex + syncutil.RWMutex data map[string]struct{} }{ data: make(map[string]struct{}), @@ -494,7 +495,7 @@ func (suite *loopWatcherTestSuite) TestWatcherLoadLimit() { suite.put(fmt.Sprintf("TestWatcherLoadLimit%d", i), "") } cache := struct { - sync.RWMutex + syncutil.RWMutex data []string }{ data: make([]string, 0), @@ -532,7 +533,7 @@ func (suite *loopWatcherTestSuite) TestWatcherLoadLimit() { func (suite *loopWatcherTestSuite) TestWatcherBreak() { cache := struct { - sync.RWMutex + syncutil.RWMutex data string }{} checkCache := func(expect string) { diff --git a/pkg/utils/tempurl/tempurl.go b/pkg/utils/tempurl/tempurl.go index 4b3a680bd14..421513ff001 100644 --- a/pkg/utils/tempurl/tempurl.go +++ b/pkg/utils/tempurl/tempurl.go @@ -17,15 +17,15 @@ package tempurl import ( "fmt" "net" - "sync" "time" "github.com/pingcap/log" "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/utils/syncutil" ) var ( - testAddrMutex sync.Mutex + testAddrMutex syncutil.Mutex testAddrMap = make(map[string]struct{}) ) diff --git a/server/apiv2/handlers/tso_keyspace_group.go b/server/apiv2/handlers/tso_keyspace_group.go index 1e64e125038..a580b21f705 100644 --- a/server/apiv2/handlers/tso_keyspace_group.go +++ b/server/apiv2/handlers/tso_keyspace_group.go @@ -18,7 +18,6 @@ import ( "net/http" "strconv" "strings" - "sync" "github.com/gin-gonic/gin" "github.com/pingcap/errors" @@ -26,6 +25,7 @@ import ( "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/utils/syncutil" "github.com/tikv/pd/server" "github.com/tikv/pd/server/apiv2/middlewares" ) @@ -210,7 +210,7 @@ type SplitKeyspaceGroupByIDParams struct { } var patrolKeyspaceAssignmentState struct { - sync.RWMutex + syncutil.RWMutex patrolled bool } diff --git a/server/grpc_service.go b/server/grpc_service.go index dd53416d30d..febb666c22d 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -42,6 +42,7 @@ import ( "github.com/tikv/pd/pkg/tso" "github.com/tikv/pd/pkg/utils/grpcutil" "github.com/tikv/pd/pkg/utils/logutil" + "github.com/tikv/pd/pkg/utils/syncutil" "github.com/tikv/pd/pkg/utils/tsoutil" "github.com/tikv/pd/pkg/versioninfo" "github.com/tikv/pd/server/cluster" @@ -199,7 +200,7 @@ func (s *GrpcServer) GetMinTSFromTSOService(dcLocation string) (*pdpb.Timestamp, } // Get the minimal timestamp from the TSO servers/pods - var mutex sync.Mutex + var mutex syncutil.Mutex resps := make([]*tsopb.GetMinTSResponse, len(addrs)) wg := sync.WaitGroup{} wg.Add(len(addrs)) diff --git a/server/server.go b/server/server.go index ca131debb29..d3fe5446a03 100644 --- 
a/server/server.go +++ b/server/server.go @@ -73,6 +73,7 @@ import ( "github.com/tikv/pd/pkg/utils/grpcutil" "github.com/tikv/pd/pkg/utils/jsonutil" "github.com/tikv/pd/pkg/utils/logutil" + "github.com/tikv/pd/pkg/utils/syncutil" "github.com/tikv/pd/pkg/utils/tsoutil" "github.com/tikv/pd/pkg/utils/typeutil" "github.com/tikv/pd/pkg/versioninfo" @@ -201,7 +202,7 @@ type Server struct { clientConns sync.Map tsoClientPool struct { - sync.RWMutex + syncutil.RWMutex clients map[string]tsopb.TSO_TsoClient } @@ -259,7 +260,7 @@ func CreateServer(ctx context.Context, cfg *config.Config, services []string, le DiagnosticsServer: sysutil.NewDiagnosticsServer(cfg.Log.File.Filename), mode: mode, tsoClientPool: struct { - sync.RWMutex + syncutil.RWMutex clients map[string]tsopb.TSO_TsoClient }{ clients: make(map[string]tsopb.TSO_TsoClient), diff --git a/tests/cluster.go b/tests/cluster.go index c49f3cd982d..5b1cb7f06fc 100644 --- a/tests/cluster.go +++ b/tests/cluster.go @@ -39,6 +39,7 @@ import ( "github.com/tikv/pd/pkg/swaggerserver" "github.com/tikv/pd/pkg/tso" "github.com/tikv/pd/pkg/utils/logutil" + "github.com/tikv/pd/pkg/utils/syncutil" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/server" "github.com/tikv/pd/server/api" @@ -68,7 +69,7 @@ var ( // TestServer is only for test. type TestServer struct { - sync.RWMutex + syncutil.RWMutex server *server.Server grpcServer *server.GrpcServer state int32 @@ -445,7 +446,7 @@ type TestCluster struct { servers map[string]*TestServer // tsPool is used to check the TSO uniqueness among the test cluster tsPool struct { - sync.Mutex + syncutil.Mutex pool map[uint64]struct{} } schedulingCluster *TestSchedulingCluster @@ -491,7 +492,7 @@ func createTestCluster(ctx context.Context, initialServerCount int, isAPIService config: config, servers: servers, tsPool: struct { - sync.Mutex + syncutil.Mutex pool map[uint64]struct{} }{ pool: make(map[uint64]struct{}), @@ -512,7 +513,7 @@ func restartTestCluster( config: cluster.config, servers: make(map[string]*TestServer, len(cluster.servers)), tsPool: struct { - sync.Mutex + syncutil.Mutex pool map[uint64]struct{} }{ pool: make(map[uint64]struct{}), diff --git a/tests/integrations/client/global_config_test.go b/tests/integrations/client/global_config_test.go index 83a3384b92f..bfe432807ce 100644 --- a/tests/integrations/client/global_config_test.go +++ b/tests/integrations/client/global_config_test.go @@ -17,7 +17,6 @@ package client_test import ( "path" "strconv" - "sync" "testing" "time" @@ -28,6 +27,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/tikv/pd/pkg/utils/assertutil" + "github.com/tikv/pd/pkg/utils/syncutil" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/server" "go.uber.org/zap" @@ -54,7 +54,7 @@ type globalConfigTestSuite struct { server *server.GrpcServer client pd.Client cleanup testutil.CleanupFunc - mu sync.Mutex + mu syncutil.Mutex } func TestGlobalConfigTestSuite(t *testing.T) { diff --git a/tests/server/id/id_test.go b/tests/server/id/id_test.go index 737aa4deac2..4742424d9b5 100644 --- a/tests/server/id/id_test.go +++ b/tests/server/id/id_test.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/stretchr/testify/require" + "github.com/tikv/pd/pkg/utils/syncutil" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/tests" "go.uber.org/goleak" @@ -55,7 +56,7 @@ func TestID(t *testing.T) { var wg sync.WaitGroup - var m sync.Mutex + var m syncutil.Mutex ids := make(map[uint64]struct{}) for i := 
0; i < 10; i++ { diff --git a/tests/server/tso/consistency_test.go b/tests/server/tso/consistency_test.go index 9cfadbf5ba3..c50b791b47f 100644 --- a/tests/server/tso/consistency_test.go +++ b/tests/server/tso/consistency_test.go @@ -28,6 +28,7 @@ import ( "github.com/stretchr/testify/suite" "github.com/tikv/pd/pkg/tso" "github.com/tikv/pd/pkg/utils/grpcutil" + "github.com/tikv/pd/pkg/utils/syncutil" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/pkg/utils/tsoutil" "github.com/tikv/pd/server/config" @@ -42,7 +43,7 @@ type tsoConsistencyTestSuite struct { leaderServer *tests.TestServer dcClientMap map[string]pdpb.PDClient - tsPoolMutex sync.Mutex + tsPoolMutex syncutil.Mutex tsPool map[uint64]struct{} } From 67529748f4ac89a7eec9d7ac1e5764d2afac0082 Mon Sep 17 00:00:00 2001 From: Hu# Date: Wed, 27 Sep 2023 17:04:47 +0800 Subject: [PATCH 3/8] scheduler: fix scheduler save config (#7108) close tikv/pd#6897 Signed-off-by: husharp Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/schedule/schedulers/evict_leader.go | 2 +- pkg/schedule/schedulers/scheduler.go | 14 +-- .../schedulers/scheduler_controller.go | 8 ++ server/cluster/cluster_test.go | 3 +- tests/server/cluster/cluster_test.go | 115 ++++++++++++++++++ 5 files changed, 133 insertions(+), 9 deletions(-) diff --git a/pkg/schedule/schedulers/evict_leader.go b/pkg/schedule/schedulers/evict_leader.go index 3c3f0603408..a5c67856df8 100644 --- a/pkg/schedule/schedulers/evict_leader.go +++ b/pkg/schedule/schedulers/evict_leader.go @@ -181,7 +181,7 @@ func newEvictLeaderScheduler(opController *operator.Controller, conf *evictLeade } } -// EvictStores returns the IDs of the evict-stores. +// EvictStoreIDs returns the IDs of the evict-stores. func (s *evictLeaderScheduler) EvictStoreIDs() []uint64 { return s.conf.getStores() } diff --git a/pkg/schedule/schedulers/scheduler.go b/pkg/schedule/schedulers/scheduler.go index 1c624dcd916..ba02c280d40 100644 --- a/pkg/schedule/schedulers/scheduler.go +++ b/pkg/schedule/schedulers/scheduler.go @@ -124,16 +124,16 @@ func CreateScheduler(typ string, oc *operator.Controller, storage endpoint.Confi return nil, errs.ErrSchedulerCreateFuncNotRegistered.FastGenByArgs(typ) } - s, err := fn(oc, storage, dec, removeSchedulerCb...) - if err != nil { - return nil, err - } + return fn(oc, storage, dec, removeSchedulerCb...) +} + +// SaveSchedulerConfig saves the config of the specified scheduler. +func SaveSchedulerConfig(storage endpoint.ConfigStorage, s Scheduler) error { data, err := s.EncodeConfig() if err != nil { - return nil, err + return err } - err = storage.SaveSchedulerConfig(s.GetName(), data) - return s, err + return storage.SaveSchedulerConfig(s.GetName(), data) } // FindSchedulerTypeByName finds the type of the specified name. 
diff --git a/pkg/schedule/schedulers/scheduler_controller.go b/pkg/schedule/schedulers/scheduler_controller.go index 4f22f50f81d..46b4947b6cd 100644 --- a/pkg/schedule/schedulers/scheduler_controller.go +++ b/pkg/schedule/schedulers/scheduler_controller.go @@ -138,6 +138,10 @@ func (c *Controller) AddSchedulerHandler(scheduler Scheduler, args ...string) er } c.schedulerHandlers[name] = scheduler + if err := SaveSchedulerConfig(c.storage, scheduler); err != nil { + log.Error("can not save HTTP scheduler config", zap.String("scheduler-name", scheduler.GetName()), errs.ZapError(err)) + return err + } c.cluster.GetSchedulerConfig().AddSchedulerCfg(scheduler.GetType(), args) return nil } @@ -188,6 +192,10 @@ func (c *Controller) AddScheduler(scheduler Scheduler, args ...string) error { c.wg.Add(1) go c.runScheduler(s) c.schedulers[s.Scheduler.GetName()] = s + if err := SaveSchedulerConfig(c.storage, scheduler); err != nil { + log.Error("can not save scheduler config", zap.String("scheduler-name", scheduler.GetName()), errs.ZapError(err)) + return err + } c.cluster.GetSchedulerConfig().AddSchedulerCfg(s.Scheduler.GetType(), args) return nil } diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index aa826e34406..33236c5d40c 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -3111,8 +3111,9 @@ func TestPersistScheduler(t *testing.T) { // whether the schedulers added or removed in dynamic way are recorded in opt _, newOpt, err := newTestScheduleConfig() re.NoError(err) - _, err = schedulers.CreateScheduler(schedulers.ShuffleRegionType, oc, storage, schedulers.ConfigJSONDecoder([]byte("null"))) + shuffle, err := schedulers.CreateScheduler(schedulers.ShuffleRegionType, oc, storage, schedulers.ConfigJSONDecoder([]byte("null"))) re.NoError(err) + re.NoError(controller.AddScheduler(shuffle)) // suppose we add a new default enable scheduler sc.DefaultSchedulers = append(sc.DefaultSchedulers, sc.SchedulerConfig{Type: "shuffle-region"}) defer func() { diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index e1b04c4ebc1..701eb9b5d69 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -37,6 +37,7 @@ import ( "github.com/tikv/pd/pkg/mock/mockid" sc "github.com/tikv/pd/pkg/schedule/config" "github.com/tikv/pd/pkg/schedule/operator" + "github.com/tikv/pd/pkg/schedule/schedulers" "github.com/tikv/pd/pkg/storage" "github.com/tikv/pd/pkg/syncer" "github.com/tikv/pd/pkg/tso" @@ -47,6 +48,7 @@ import ( "github.com/tikv/pd/server/cluster" "github.com/tikv/pd/server/config" "github.com/tikv/pd/tests" + "github.com/tikv/pd/tests/server/api" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -1275,6 +1277,119 @@ func TestStaleTermHeartbeat(t *testing.T) { re.NoError(err) } +func TestTransferLeaderForScheduler(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker", `return(true)`)) + tc, err := tests.NewTestCluster(ctx, 2) + defer tc.Destroy() + re.NoError(err) + err = tc.RunInitialServers() + re.NoError(err) + tc.WaitLeader() + // start + leaderServer := tc.GetServer(tc.GetLeader()) + re.NoError(leaderServer.BootstrapCluster()) + rc := leaderServer.GetServer().GetRaftCluster() + re.NotNil(rc) + + storesNum := 2 + grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr()) + for i := 1; i <= storesNum; i++ { + 
store := &metapb.Store{ + Id: uint64(i), + Address: "127.0.0.1:" + strconv.Itoa(i), + } + resp, err := putStore(grpcPDClient, leaderServer.GetClusterID(), store) + re.NoError(err) + re.Equal(pdpb.ErrorType_OK, resp.GetHeader().GetError().GetType()) + } + // region heartbeat + id := leaderServer.GetAllocator() + putRegionWithLeader(re, rc, id, 1) + + time.Sleep(time.Second) + re.True(leaderServer.GetRaftCluster().IsPrepared()) + // Add evict leader scheduler + api.MustAddScheduler(re, leaderServer.GetAddr(), schedulers.EvictLeaderName, map[string]interface{}{ + "store_id": 1, + }) + api.MustAddScheduler(re, leaderServer.GetAddr(), schedulers.EvictLeaderName, map[string]interface{}{ + "store_id": 2, + }) + // Check scheduler updated. + schedulersController := rc.GetCoordinator().GetSchedulersController() + re.Len(schedulersController.GetSchedulerNames(), 6) + checkEvictLeaderSchedulerExist(re, schedulersController, true) + checkEvictLeaderStoreIDs(re, schedulersController, []uint64{1, 2}) + + // transfer PD leader to another PD + tc.ResignLeader() + rc.Stop() + tc.WaitLeader() + leaderServer = tc.GetServer(tc.GetLeader()) + rc1 := leaderServer.GetServer().GetRaftCluster() + rc1.Start(leaderServer.GetServer()) + re.NoError(err) + re.NotNil(rc1) + // region heartbeat + id = leaderServer.GetAllocator() + putRegionWithLeader(re, rc1, id, 1) + time.Sleep(time.Second) + re.True(leaderServer.GetRaftCluster().IsPrepared()) + // Check scheduler updated. + schedulersController = rc1.GetCoordinator().GetSchedulersController() + re.Len(schedulersController.GetSchedulerNames(), 6) + checkEvictLeaderSchedulerExist(re, schedulersController, true) + checkEvictLeaderStoreIDs(re, schedulersController, []uint64{1, 2}) + + // transfer PD leader back to the previous PD + tc.ResignLeader() + rc1.Stop() + tc.WaitLeader() + leaderServer = tc.GetServer(tc.GetLeader()) + rc = leaderServer.GetServer().GetRaftCluster() + rc.Start(leaderServer.GetServer()) + re.NotNil(rc) + // region heartbeat + id = leaderServer.GetAllocator() + putRegionWithLeader(re, rc, id, 1) + time.Sleep(time.Second) + re.True(leaderServer.GetRaftCluster().IsPrepared()) + // Check scheduler updated + schedulersController = rc.GetCoordinator().GetSchedulersController() + re.Len(schedulersController.GetSchedulerNames(), 6) + checkEvictLeaderSchedulerExist(re, schedulersController, true) + checkEvictLeaderStoreIDs(re, schedulersController, []uint64{1, 2}) + + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker")) +} + +func checkEvictLeaderSchedulerExist(re *require.Assertions, sc *schedulers.Controller, exist bool) { + testutil.Eventually(re, func() bool { + if !exist { + return sc.GetScheduler(schedulers.EvictLeaderName) == nil + } + return sc.GetScheduler(schedulers.EvictLeaderName) != nil + }) +} + +func checkEvictLeaderStoreIDs(re *require.Assertions, sc *schedulers.Controller, expected []uint64) { + handler, ok := sc.GetSchedulerHandlers()[schedulers.EvictLeaderName] + re.True(ok) + h, ok := handler.(interface { + EvictStoreIDs() []uint64 + }) + re.True(ok) + var evictStoreIDs []uint64 + testutil.Eventually(re, func() bool { + evictStoreIDs = h.EvictStoreIDs() + return len(evictStoreIDs) == len(expected) + }) + re.ElementsMatch(evictStoreIDs, expected) +} + func putRegionWithLeader(re *require.Assertions, rc *cluster.RaftCluster, id id.Allocator, storeID uint64) { for i := 0; i < 3; i++ { regionID, err := id.Alloc() From 7a9e56679ddd336d0402bca69338b07eebcf188e Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Wed, 
27 Sep 2023 19:58:46 +0800 Subject: [PATCH 4/8] mcs: make scheduling server support operator http interface (#7090) ref tikv/pd#5839 Signed-off-by: lhy1024 Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/mcs/scheduling/server/apis/v1/api.go | 138 +++++-- pkg/mcs/scheduling/server/server.go | 6 + pkg/schedule/handler/handler.go | 222 ++++++++++++ pkg/schedule/operator/kind.go | 3 + pkg/utils/apiutil/apiutil.go | 14 + server/api/operator.go | 263 +------------- server/api/server_test.go | 49 +++ server/api/trend.go | 12 +- tests/cluster.go | 3 + tests/integrations/mcs/scheduling/api_test.go | 17 +- tests/pdctl/operator/operator_test.go | 86 ++--- {server => tests/server}/api/operator_test.go | 339 +++++++++--------- tests/testutil.go | 87 ++++- 13 files changed, 730 insertions(+), 509 deletions(-) rename {server => tests/server}/api/operator_test.go (54%) diff --git a/pkg/mcs/scheduling/server/apis/v1/api.go b/pkg/mcs/scheduling/server/apis/v1/api.go index 3d1c3921470..e66bf00ef94 100644 --- a/pkg/mcs/scheduling/server/apis/v1/api.go +++ b/pkg/mcs/scheduling/server/apis/v1/api.go @@ -27,6 +27,9 @@ import ( "github.com/joho/godotenv" scheserver "github.com/tikv/pd/pkg/mcs/scheduling/server" "github.com/tikv/pd/pkg/mcs/utils" + "github.com/tikv/pd/pkg/schedule" + sche "github.com/tikv/pd/pkg/schedule/core" + "github.com/tikv/pd/pkg/schedule/handler" "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi" @@ -35,6 +38,7 @@ import ( // APIPathPrefix is the prefix of the API path. const APIPathPrefix = "/scheduling/api/v1" +const handlerKey = "handler" var ( once sync.Once @@ -62,6 +66,18 @@ type Service struct { rd *render.Render } +type server struct { + server *scheserver.Server +} + +func (s *server) GetCoordinator() *schedule.Coordinator { + return s.server.GetCoordinator() +} + +func (s *server) GetCluster() sche.SharedCluster { + return s.server.GetCluster() +} + func createIndentRender() *render.Render { return render.New(render.Options{ IndentJSON: true, @@ -81,6 +97,7 @@ func NewService(srv *scheserver.Service) *Service { apiHandlerEngine.Use(gzip.Gzip(gzip.DefaultCompression)) apiHandlerEngine.Use(func(c *gin.Context) { c.Set(multiservicesapi.ServiceContextKey, srv.Server) + c.Set(handlerKey, handler.NewHandler(&server{server: srv.Server})) c.Next() }) apiHandlerEngine.Use(multiservicesapi.ServiceRedirector()) @@ -115,7 +132,10 @@ func (s *Service) RegisterCheckersRouter() { func (s *Service) RegisterOperatorsRouter() { router := s.root.Group("operators") router.GET("", getOperators) - router.GET("/:id", getOperatorByID) + router.POST("", createOperator) + router.GET("/:id", getOperatorByRegion) + router.DELETE("/:id", deleteOperatorByRegion) + router.GET("/records", getOperatorRecords) } // @Tags operators @@ -126,8 +146,8 @@ func (s *Service) RegisterOperatorsRouter() { // @Failure 400 {string} string "The input is invalid." // @Failure 500 {string} string "PD server failed to proceed the request." 
// @Router /operators/{id} [GET] -func getOperatorByID(c *gin.Context) { - svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server) +func getOperatorByRegion(c *gin.Context) { + handler := c.MustGet(handlerKey).(*handler.Handler) id := c.Param("id") regionID, err := strconv.ParseUint(id, 10, 64) @@ -136,13 +156,13 @@ func getOperatorByID(c *gin.Context) { return } - opController := svr.GetCoordinator().GetOperatorController() - if opController == nil { + op, err := handler.GetOperatorStatus(regionID) + if err != nil { c.String(http.StatusInternalServerError, err.Error()) return } - c.IndentedJSON(http.StatusOK, opController.GetOperatorStatus(regionID)) + c.IndentedJSON(http.StatusOK, op) } // @Tags operators @@ -153,40 +173,104 @@ func getOperatorByID(c *gin.Context) { // @Failure 500 {string} string "PD server failed to proceed the request." // @Router /operators [GET] func getOperators(c *gin.Context) { - svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server) + handler := c.MustGet(handlerKey).(*handler.Handler) var ( results []*operator.Operator - ops []*operator.Operator err error ) - opController := svr.GetCoordinator().GetOperatorController() - if opController == nil { - c.String(http.StatusInternalServerError, err.Error()) - return - } kinds := c.QueryArray("kind") if len(kinds) == 0 { - results = opController.GetOperators() + results, err = handler.GetOperators() } else { - for _, kind := range kinds { - switch kind { - case "admin": - ops = opController.GetOperatorsOfKind(operator.OpAdmin) - case "leader": - ops = opController.GetOperatorsOfKind(operator.OpLeader) - case "region": - ops = opController.GetOperatorsOfKind(operator.OpRegion) - case "waiting": - ops = opController.GetWaitingOperators() - } - results = append(results, ops...) - } + results, err = handler.GetOperatorsByKinds(kinds) } + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } c.IndentedJSON(http.StatusOK, results) } +// @Tags operator +// @Summary Cancel a Region's pending operator. +// @Param region_id path int true "A Region's Id" +// @Produce json +// @Success 200 {string} string "The pending operator is canceled." +// @Failure 400 {string} string "The input is invalid." +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /operators/{region_id} [delete] +func deleteOperatorByRegion(c *gin.Context) { + handler := c.MustGet(handlerKey).(*handler.Handler) + id := c.Param("id") + + regionID, err := strconv.ParseUint(id, 10, 64) + if err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + + if err = handler.RemoveOperator(regionID); err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + + c.String(http.StatusOK, "The pending operator is canceled.") +} + +// @Tags operator +// @Summary lists the finished operators since the given timestamp in second. +// @Param from query integer false "From Unix timestamp" +// @Produce json +// @Success 200 {object} []operator.OpRecord +// @Failure 400 {string} string "The request is invalid." +// @Failure 500 {string} string "PD server failed to proceed the request." 
+// @Router /operators/records [get] +func getOperatorRecords(c *gin.Context) { + handler := c.MustGet(handlerKey).(*handler.Handler) + from, err := apiutil.ParseTime(c.Query("from")) + if err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + records, err := handler.GetRecords(from) + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + c.IndentedJSON(http.StatusOK, records) +} + +// FIXME: details of input json body params +// @Tags operator +// @Summary Create an operator. +// @Accept json +// @Param body body object true "json params" +// @Produce json +// @Success 200 {string} string "The operator is created." +// @Failure 400 {string} string "The input is invalid." +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /operators [post] +func createOperator(c *gin.Context) { + handler := c.MustGet(handlerKey).(*handler.Handler) + var input map[string]interface{} + if err := c.BindJSON(&input); err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + statusCode, result, err := handler.HandleOperatorCreation(input) + if err != nil { + c.String(statusCode, err.Error()) + return + } + if statusCode == http.StatusOK && result == nil { + c.String(http.StatusOK, "The operator is created.") + return + } + c.IndentedJSON(statusCode, result) +} + // @Tags checkers // @Summary Get checker by name // @Param name path string true "The name of the checker." diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go index fd7621bf2cb..c1aecc2f18b 100644 --- a/pkg/mcs/scheduling/server/server.go +++ b/pkg/mcs/scheduling/server/server.go @@ -469,6 +469,12 @@ func (s *Server) startWatcher() (err error) { return err } +// GetPersistConfig returns the persist config. +// It's used to test. +func (s *Server) GetPersistConfig() *config.PersistConfig { + return s.persistConfig +} + // CreateServer creates the Server func CreateServer(ctx context.Context, cfg *config.Config) *Server { svr := &Server{ diff --git a/pkg/schedule/handler/handler.go b/pkg/schedule/handler/handler.go index d48941726d0..d9c162ac1cc 100644 --- a/pkg/schedule/handler/handler.go +++ b/pkg/schedule/handler/handler.go @@ -17,6 +17,7 @@ package handler import ( "bytes" "encoding/hex" + "net/http" "strings" "time" @@ -32,6 +33,7 @@ import ( "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/placement" "github.com/tikv/pd/pkg/schedule/scatter" + "github.com/tikv/pd/pkg/utils/typeutil" ) // Server is the interface for handler about schedule. @@ -126,6 +128,32 @@ func (h *Handler) GetOperators() ([]*operator.Operator, error) { return c.GetOperators(), nil } +// GetOperatorsByKinds returns the running operators by kinds. +func (h *Handler) GetOperatorsByKinds(kinds []string) ([]*operator.Operator, error) { + var ( + results []*operator.Operator + ops []*operator.Operator + err error + ) + for _, kind := range kinds { + switch kind { + case operator.OpAdmin.String(): + ops, err = h.GetAdminOperators() + case operator.OpLeader.String(): + ops, err = h.GetLeaderOperators() + case operator.OpRegion.String(): + ops, err = h.GetRegionOperators() + case operator.OpWaiting: + ops, err = h.GetWaitingOperators() + } + if err != nil { + return nil, err + } + results = append(results, ops...) + } + return results, nil +} + // GetWaitingOperators returns the waiting operators. 
func (h *Handler) GetWaitingOperators() ([]*operator.Operator, error) { c, err := h.GetOperatorController() @@ -184,6 +212,170 @@ func (h *Handler) GetRecords(from time.Time) ([]*operator.OpRecord, error) { return records, nil } +// HandleOperatorCreation processes the request and creates an operator based on the provided input. +// It supports various types of operators such as transfer-leader, transfer-region, add-peer, remove-peer, merge-region, split-region, scatter-region, and scatter-regions. +// The function validates the input, performs the corresponding operation, and returns the HTTP status code, response body, and any error encountered during the process. +func (h *Handler) HandleOperatorCreation(input map[string]interface{}) (int, interface{}, error) { + name, ok := input["name"].(string) + if !ok { + return http.StatusBadRequest, nil, errors.Errorf("missing operator name") + } + switch name { + case "transfer-leader": + regionID, ok := input["region_id"].(float64) + if !ok { + return http.StatusBadRequest, nil, errors.Errorf("missing region id") + } + storeID, ok := input["to_store_id"].(float64) + if !ok { + return http.StatusBadRequest, nil, errors.Errorf("missing store id to transfer leader to") + } + if err := h.AddTransferLeaderOperator(uint64(regionID), uint64(storeID)); err != nil { + return http.StatusInternalServerError, nil, err + } + case "transfer-region": + regionID, ok := input["region_id"].(float64) + if !ok { + return http.StatusBadRequest, nil, errors.Errorf("missing region id") + } + storeIDs, ok := parseStoreIDsAndPeerRole(input["to_store_ids"], input["peer_roles"]) + if !ok { + return http.StatusBadRequest, nil, errors.Errorf("invalid store ids to transfer region to") + } + if len(storeIDs) == 0 { + return http.StatusBadRequest, nil, errors.Errorf("missing store ids to transfer region to") + } + if err := h.AddTransferRegionOperator(uint64(regionID), storeIDs); err != nil { + return http.StatusInternalServerError, nil, err + } + case "transfer-peer": + regionID, ok := input["region_id"].(float64) + if !ok { + return http.StatusBadRequest, nil, errors.Errorf("missing region id") + } + fromID, ok := input["from_store_id"].(float64) + if !ok { + return http.StatusBadRequest, nil, errors.Errorf("invalid store id to transfer peer from") + } + toID, ok := input["to_store_id"].(float64) + if !ok { + return http.StatusBadRequest, nil, errors.Errorf("invalid store id to transfer peer to") + } + if err := h.AddTransferPeerOperator(uint64(regionID), uint64(fromID), uint64(toID)); err != nil { + return http.StatusInternalServerError, nil, err + } + case "add-peer": + regionID, ok := input["region_id"].(float64) + if !ok { + return http.StatusBadRequest, nil, errors.Errorf("missing region id") + } + storeID, ok := input["store_id"].(float64) + if !ok { + return http.StatusBadRequest, nil, errors.Errorf("invalid store id to transfer peer to") + } + if err := h.AddAddPeerOperator(uint64(regionID), uint64(storeID)); err != nil { + return http.StatusInternalServerError, nil, err + } + case "add-learner": + regionID, ok := input["region_id"].(float64) + if !ok { + return http.StatusBadRequest, nil, errors.Errorf("missing region id") + } + storeID, ok := input["store_id"].(float64) + if !ok { + return http.StatusBadRequest, nil, errors.Errorf("invalid store id to transfer peer to") + } + if err := h.AddAddLearnerOperator(uint64(regionID), uint64(storeID)); err != nil { + return http.StatusInternalServerError, nil, err + } + case "remove-peer": + regionID, ok := 
input["region_id"].(float64) + if !ok { + return http.StatusBadRequest, nil, errors.Errorf("missing region id") + } + storeID, ok := input["store_id"].(float64) + if !ok { + return http.StatusBadRequest, nil, errors.Errorf("invalid store id to transfer peer to") + } + if err := h.AddRemovePeerOperator(uint64(regionID), uint64(storeID)); err != nil { + return http.StatusInternalServerError, nil, err + } + case "merge-region": + regionID, ok := input["source_region_id"].(float64) + if !ok { + return http.StatusBadRequest, nil, errors.Errorf("missing region id") + } + targetID, ok := input["target_region_id"].(float64) + if !ok { + return http.StatusBadRequest, nil, errors.Errorf("invalid target region id to merge to") + } + if err := h.AddMergeRegionOperator(uint64(regionID), uint64(targetID)); err != nil { + return http.StatusInternalServerError, nil, err + } + case "split-region": + regionID, ok := input["region_id"].(float64) + if !ok { + return http.StatusBadRequest, nil, errors.Errorf("missing region id") + } + policy, ok := input["policy"].(string) + if !ok { + return http.StatusBadRequest, nil, errors.Errorf("missing split policy") + } + var keys []string + if ks, ok := input["keys"]; ok { + for _, k := range ks.([]interface{}) { + key, ok := k.(string) + if !ok { + return http.StatusBadRequest, nil, errors.Errorf("bad format keys") + } + keys = append(keys, key) + } + } + if err := h.AddSplitRegionOperator(uint64(regionID), policy, keys); err != nil { + return http.StatusInternalServerError, nil, err + } + case "scatter-region": + regionID, ok := input["region_id"].(float64) + if !ok { + return http.StatusBadRequest, nil, errors.Errorf("missing region id") + } + group, _ := input["group"].(string) + if err := h.AddScatterRegionOperator(uint64(regionID), group); err != nil { + return http.StatusInternalServerError, nil, err + } + case "scatter-regions": + // support both receiving key ranges or regionIDs + startKey, _ := input["start_key"].(string) + endKey, _ := input["end_key"].(string) + ids, ok := typeutil.JSONToUint64Slice(input["region_ids"]) + if !ok { + return http.StatusBadRequest, nil, errors.Errorf("region_ids is invalid") + } + group, _ := input["group"].(string) + // retry 5 times if retryLimit not defined + retryLimit := 5 + if rl, ok := input["retry_limit"].(float64); ok { + retryLimit = int(rl) + } + processedPercentage, err := h.AddScatterRegionsOperators(ids, startKey, endKey, group, retryLimit) + errorMessage := "" + if err != nil { + errorMessage = err.Error() + } + s := struct { + ProcessedPercentage int `json:"processed-percentage"` + Error string `json:"error"` + }{ + ProcessedPercentage: processedPercentage, + Error: errorMessage, + } + return http.StatusOK, s, nil + default: + return http.StatusBadRequest, nil, errors.Errorf("unknown operator") + } + return http.StatusOK, nil, nil +} + // AddTransferLeaderOperator adds an operator to transfer leader to the store. 
func (h *Handler) AddTransferLeaderOperator(regionID uint64, storeID uint64) error { c := h.GetCluster() @@ -498,3 +690,33 @@ func checkStoreState(c sche.SharedCluster, storeID uint64) error { } return nil } + +func parseStoreIDsAndPeerRole(ids interface{}, roles interface{}) (map[uint64]placement.PeerRoleType, bool) { + items, ok := ids.([]interface{}) + if !ok { + return nil, false + } + storeIDToPeerRole := make(map[uint64]placement.PeerRoleType) + storeIDs := make([]uint64, 0, len(items)) + for _, item := range items { + id, ok := item.(float64) + if !ok { + return nil, false + } + storeIDs = append(storeIDs, uint64(id)) + storeIDToPeerRole[uint64(id)] = "" + } + + peerRoles, ok := roles.([]interface{}) + // only consider roles having the same length with ids as the valid case + if ok && len(peerRoles) == len(storeIDs) { + for i, v := range storeIDs { + switch pr := peerRoles[i].(type) { + case string: + storeIDToPeerRole[v] = placement.PeerRoleType(pr) + default: + } + } + } + return storeIDToPeerRole, true +} diff --git a/pkg/schedule/operator/kind.go b/pkg/schedule/operator/kind.go index 265eea5ade6..0187a64c568 100644 --- a/pkg/schedule/operator/kind.go +++ b/pkg/schedule/operator/kind.go @@ -20,6 +20,9 @@ import ( "github.com/pingcap/errors" ) +// OpWaiting is the status of a waiting operators. +const OpWaiting = "waiting" + // OpKind is a bit field to identify operator types. type OpKind uint32 diff --git a/pkg/utils/apiutil/apiutil.go b/pkg/utils/apiutil/apiutil.go index 2c476042da0..633dc8fa557 100644 --- a/pkg/utils/apiutil/apiutil.go +++ b/pkg/utils/apiutil/apiutil.go @@ -27,6 +27,7 @@ import ( "path" "strconv" "strings" + "time" "github.com/gorilla/mux" "github.com/pingcap/errcode" @@ -477,3 +478,16 @@ func copyHeader(dst, src http.Header) { } } } + +// ParseTime parses a time string with the format "1694580288" +// If the string is empty, it returns a zero time. +func ParseTime(t string) (time.Time, error) { + if len(t) == 0 { + return time.Time{}, nil + } + i, err := strconv.ParseInt(t, 10, 64) + if err != nil { + return time.Time{}, err + } + return time.Unix(i, 0), nil +} diff --git a/server/api/operator.go b/server/api/operator.go index 6645a601fb0..7ff7d2d7c51 100644 --- a/server/api/operator.go +++ b/server/api/operator.go @@ -21,9 +21,7 @@ import ( "github.com/gorilla/mux" "github.com/tikv/pd/pkg/schedule/operator" - "github.com/tikv/pd/pkg/schedule/placement" "github.com/tikv/pd/pkg/utils/apiutil" - "github.com/tikv/pd/pkg/utils/typeutil" "github.com/tikv/pd/server" "github.com/unrolled/render" ) @@ -76,37 +74,20 @@ func (h *operatorHandler) GetOperatorsByRegion(w http.ResponseWriter, r *http.Re func (h *operatorHandler) GetOperators(w http.ResponseWriter, r *http.Request) { var ( results []*operator.Operator - ops []*operator.Operator err error ) kinds, ok := r.URL.Query()["kind"] if !ok { results, err = h.Handler.GetOperators() - if err != nil { - h.r.JSON(w, http.StatusInternalServerError, err.Error()) - return - } } else { - for _, kind := range kinds { - switch kind { - case "admin": - ops, err = h.GetAdminOperators() - case "leader": - ops, err = h.GetLeaderOperators() - case "region": - ops, err = h.GetRegionOperators() - case "waiting": - ops, err = h.GetWaitingOperators() - } - if err != nil { - h.r.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - results = append(results, ops...) 
- } + results, err = h.Handler.GetOperatorsByKinds(kinds) } + if err != nil { + h.r.JSON(w, http.StatusInternalServerError, err.Error()) + return + } h.r.JSON(w, http.StatusOK, results) } @@ -126,198 +107,16 @@ func (h *operatorHandler) CreateOperator(w http.ResponseWriter, r *http.Request) return } - name, ok := input["name"].(string) - if !ok { - h.r.JSON(w, http.StatusBadRequest, "missing operator name") + statusCode, result, err := h.HandleOperatorCreation(input) + if err != nil { + h.r.JSON(w, statusCode, err.Error()) return } - - switch name { - case "transfer-leader": - regionID, ok := input["region_id"].(float64) - if !ok { - h.r.JSON(w, http.StatusBadRequest, "missing region id") - return - } - storeID, ok := input["to_store_id"].(float64) - if !ok { - h.r.JSON(w, http.StatusBadRequest, "missing store id to transfer leader to") - return - } - if err := h.AddTransferLeaderOperator(uint64(regionID), uint64(storeID)); err != nil { - h.r.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - case "transfer-region": - regionID, ok := input["region_id"].(float64) - if !ok { - h.r.JSON(w, http.StatusBadRequest, "missing region id") - return - } - storeIDs, ok := parseStoreIDsAndPeerRole(input["to_store_ids"], input["peer_roles"]) - if !ok { - h.r.JSON(w, http.StatusBadRequest, "invalid store ids to transfer region to") - return - } - if len(storeIDs) == 0 { - h.r.JSON(w, http.StatusBadRequest, "missing store ids to transfer region to") - return - } - if err := h.AddTransferRegionOperator(uint64(regionID), storeIDs); err != nil { - h.r.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - case "transfer-peer": - regionID, ok := input["region_id"].(float64) - if !ok { - h.r.JSON(w, http.StatusBadRequest, "missing region id") - return - } - fromID, ok := input["from_store_id"].(float64) - if !ok { - h.r.JSON(w, http.StatusBadRequest, "invalid store id to transfer peer from") - return - } - toID, ok := input["to_store_id"].(float64) - if !ok { - h.r.JSON(w, http.StatusBadRequest, "invalid store id to transfer peer to") - return - } - if err := h.AddTransferPeerOperator(uint64(regionID), uint64(fromID), uint64(toID)); err != nil { - h.r.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - case "add-peer": - regionID, ok := input["region_id"].(float64) - if !ok { - h.r.JSON(w, http.StatusBadRequest, "missing region id") - return - } - storeID, ok := input["store_id"].(float64) - if !ok { - h.r.JSON(w, http.StatusBadRequest, "invalid store id to transfer peer to") - return - } - if err := h.AddAddPeerOperator(uint64(regionID), uint64(storeID)); err != nil { - h.r.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - case "add-learner": - regionID, ok := input["region_id"].(float64) - if !ok { - h.r.JSON(w, http.StatusBadRequest, "missing region id") - return - } - storeID, ok := input["store_id"].(float64) - if !ok { - h.r.JSON(w, http.StatusBadRequest, "invalid store id to transfer peer to") - return - } - if err := h.AddAddLearnerOperator(uint64(regionID), uint64(storeID)); err != nil { - h.r.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - case "remove-peer": - regionID, ok := input["region_id"].(float64) - if !ok { - h.r.JSON(w, http.StatusBadRequest, "missing region id") - return - } - storeID, ok := input["store_id"].(float64) - if !ok { - h.r.JSON(w, http.StatusBadRequest, "invalid store id to transfer peer to") - return - } - if err := h.AddRemovePeerOperator(uint64(regionID), uint64(storeID)); err != nil { - 
h.r.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - case "merge-region": - regionID, ok := input["source_region_id"].(float64) - if !ok { - h.r.JSON(w, http.StatusBadRequest, "missing region id") - return - } - targetID, ok := input["target_region_id"].(float64) - if !ok { - h.r.JSON(w, http.StatusBadRequest, "invalid target region id to merge to") - return - } - if err := h.AddMergeRegionOperator(uint64(regionID), uint64(targetID)); err != nil { - h.r.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - case "split-region": - regionID, ok := input["region_id"].(float64) - if !ok { - h.r.JSON(w, http.StatusBadRequest, "missing region id") - return - } - policy, ok := input["policy"].(string) - if !ok { - h.r.JSON(w, http.StatusBadRequest, "missing split policy") - return - } - var keys []string - if ks, ok := input["keys"]; ok { - for _, k := range ks.([]interface{}) { - key, ok := k.(string) - if !ok { - h.r.JSON(w, http.StatusBadRequest, "bad format keys") - return - } - keys = append(keys, key) - } - } - if err := h.AddSplitRegionOperator(uint64(regionID), policy, keys); err != nil { - h.r.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - case "scatter-region": - regionID, ok := input["region_id"].(float64) - if !ok { - h.r.JSON(w, http.StatusBadRequest, "missing region id") - return - } - group, _ := input["group"].(string) - if err := h.AddScatterRegionOperator(uint64(regionID), group); err != nil { - h.r.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - case "scatter-regions": - // support both receiving key ranges or regionIDs - startKey, _ := input["start_key"].(string) - endKey, _ := input["end_key"].(string) - ids, ok := typeutil.JSONToUint64Slice(input["region_ids"]) - if !ok { - h.r.JSON(w, http.StatusBadRequest, "region_ids is invalid") - return - } - group, _ := input["group"].(string) - // retry 5 times if retryLimit not defined - retryLimit := 5 - if rl, ok := input["retry_limit"].(float64); ok { - retryLimit = int(rl) - } - processedPercentage, err := h.AddScatterRegionsOperators(ids, startKey, endKey, group, retryLimit) - errorMessage := "" - if err != nil { - errorMessage = err.Error() - } - s := struct { - ProcessedPercentage int `json:"processed-percentage"` - Error string `json:"error"` - }{ - ProcessedPercentage: processedPercentage, - Error: errorMessage, - } - h.r.JSON(w, http.StatusOK, &s) - return - default: - h.r.JSON(w, http.StatusBadRequest, "unknown operator") + if statusCode == http.StatusOK && result == nil { + h.r.JSON(w, http.StatusOK, "The operator is created.") return } - h.r.JSON(w, http.StatusOK, "The operator is created.") + h.r.JSON(w, statusCode, result) } // @Tags operator @@ -354,14 +153,16 @@ func (h *operatorHandler) DeleteOperatorByRegion(w http.ResponseWriter, r *http. // @Failure 500 {string} string "PD server failed to proceed the request." 
// @Router /operators/records [get] func (h *operatorHandler) GetOperatorRecords(w http.ResponseWriter, r *http.Request) { - var from time.Time - if fromStr := r.URL.Query()["from"]; len(fromStr) > 0 { - fromInt, err := strconv.ParseInt(fromStr[0], 10, 64) + var ( + from time.Time + err error + ) + if froms := r.URL.Query()["from"]; len(froms) > 0 { + from, err = apiutil.ParseTime(froms[0]) if err != nil { h.r.JSON(w, http.StatusBadRequest, err.Error()) return } - from = time.Unix(fromInt, 0) } records, err := h.GetRecords(from) if err != nil { @@ -370,33 +171,3 @@ func (h *operatorHandler) GetOperatorRecords(w http.ResponseWriter, r *http.Requ } h.r.JSON(w, http.StatusOK, records) } - -func parseStoreIDsAndPeerRole(ids interface{}, roles interface{}) (map[uint64]placement.PeerRoleType, bool) { - items, ok := ids.([]interface{}) - if !ok { - return nil, false - } - storeIDToPeerRole := make(map[uint64]placement.PeerRoleType) - storeIDs := make([]uint64, 0, len(items)) - for _, item := range items { - id, ok := item.(float64) - if !ok { - return nil, false - } - storeIDs = append(storeIDs, uint64(id)) - storeIDToPeerRole[uint64(id)] = "" - } - - peerRoles, ok := roles.([]interface{}) - // only consider roles having the same length with ids as the valid case - if ok && len(peerRoles) == len(storeIDs) { - for i, v := range storeIDs { - switch pr := peerRoles[i].(type) { - case string: - storeIDToPeerRole[v] = placement.PeerRoleType(pr) - default: - } - } - } - return storeIDToPeerRole, true -} diff --git a/server/api/server_test.go b/server/api/server_test.go index 2e89ad797c3..22989b92a03 100644 --- a/server/api/server_test.go +++ b/server/api/server_test.go @@ -28,10 +28,12 @@ import ( "github.com/pingcap/log" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/assertutil" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/testutil" + "github.com/tikv/pd/pkg/versioninfo" "github.com/tikv/pd/server" "github.com/tikv/pd/server/config" "go.uber.org/goleak" @@ -135,6 +137,53 @@ func mustBootstrapCluster(re *require.Assertions, s *server.Server) { re.Equal(pdpb.ErrorType_OK, resp.GetHeader().GetError().GetType()) } +func mustPutRegion(re *require.Assertions, svr *server.Server, regionID, storeID uint64, start, end []byte, opts ...core.RegionCreateOption) *core.RegionInfo { + leader := &metapb.Peer{ + Id: regionID, + StoreId: storeID, + } + metaRegion := &metapb.Region{ + Id: regionID, + StartKey: start, + EndKey: end, + Peers: []*metapb.Peer{leader}, + RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 1}, + } + r := core.NewRegionInfo(metaRegion, leader, opts...) 
+ err := svr.GetRaftCluster().HandleRegionHeartbeat(r) + re.NoError(err) + return r +} + +func mustPutStore(re *require.Assertions, svr *server.Server, id uint64, state metapb.StoreState, nodeState metapb.NodeState, labels []*metapb.StoreLabel) { + s := &server.GrpcServer{Server: svr} + _, err := s.PutStore(context.Background(), &pdpb.PutStoreRequest{ + Header: &pdpb.RequestHeader{ClusterId: svr.ClusterID()}, + Store: &metapb.Store{ + Id: id, + Address: fmt.Sprintf("tikv%d", id), + State: state, + NodeState: nodeState, + Labels: labels, + Version: versioninfo.MinSupportedVersion(versioninfo.Version2_0).String(), + }, + }) + re.NoError(err) + if state == metapb.StoreState_Up { + _, err = s.StoreHeartbeat(context.Background(), &pdpb.StoreHeartbeatRequest{ + Header: &pdpb.RequestHeader{ClusterId: svr.ClusterID()}, + Stats: &pdpb.StoreStats{StoreId: id}, + }) + re.NoError(err) + } +} + +func mustRegionHeartbeat(re *require.Assertions, svr *server.Server, region *core.RegionInfo) { + cluster := svr.GetRaftCluster() + err := cluster.HandleRegionHeartbeat(region) + re.NoError(err) +} + type serviceTestSuite struct { suite.Suite svr *server.Server diff --git a/server/api/trend.go b/server/api/trend.go index 79c43f3c5fb..5dd82e79ec7 100644 --- a/server/api/trend.go +++ b/server/api/trend.go @@ -16,10 +16,10 @@ package api import ( "net/http" - "strconv" "time" "github.com/tikv/pd/pkg/statistics" + "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/typeutil" "github.com/tikv/pd/server" "github.com/unrolled/render" @@ -89,14 +89,16 @@ func newTrendHandler(s *server.Server, rd *render.Render) *trendHandler { // @Failure 500 {string} string "PD server failed to proceed the request." // @Router /trend [get] func (h *trendHandler) GetTrend(w http.ResponseWriter, r *http.Request) { - var from time.Time - if fromStr := r.URL.Query()["from"]; len(fromStr) > 0 { - fromInt, err := strconv.ParseInt(fromStr[0], 10, 64) + var ( + from time.Time + err error + ) + if froms := r.URL.Query()["from"]; len(froms) > 0 { + from, err = apiutil.ParseTime(froms[0]) if err != nil { h.rd.JSON(w, http.StatusBadRequest, err.Error()) return } - from = time.Unix(fromInt, 0) } stores, err := h.getTrendStores() diff --git a/tests/cluster.go b/tests/cluster.go index 5b1cb7f06fc..ae1ae331856 100644 --- a/tests/cluster.go +++ b/tests/cluster.go @@ -835,6 +835,9 @@ func (c *TestCluster) Destroy() { log.Error("failed to destroy the cluster:", errs.ZapError(err)) } } + if c.schedulingCluster != nil { + c.schedulingCluster.Destroy() + } } // CheckClusterDCLocation will force the cluster to do the dc-location check in order to speed up the test. 
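Both GetOperatorRecords and GetTrend above now delegate their optional "from" query parameter to apiutil.ParseTime, which treats the value as a Unix timestamp in seconds and returns the zero time when the string is empty. The sketch below only illustrates that calling pattern and is not part of the patch; the handler name, route, and port are invented for the example, and it assumes a Go module that depends on github.com/tikv/pd so the apiutil package resolves.

package main

import (
	"fmt"
	"log"
	"net/http"
	"time"

	"github.com/tikv/pd/pkg/utils/apiutil"
)

// recordsSince mirrors the pattern used by GetOperatorRecords and GetTrend:
// an optional "from" query parameter is parsed as a Unix timestamp in seconds,
// and an absent parameter falls back to the zero time.
func recordsSince(w http.ResponseWriter, r *http.Request) {
	var (
		from time.Time
		err  error
	)
	if froms := r.URL.Query()["from"]; len(froms) > 0 {
		from, err = apiutil.ParseTime(froms[0])
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
	}
	fmt.Fprintf(w, "records since %s\n", from)
}

func main() {
	http.HandleFunc("/records", recordsSince)
	log.Fatal(http.ListenAndServe(":8080", nil)) // illustrative port
}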
diff --git a/tests/integrations/mcs/scheduling/api_test.go b/tests/integrations/mcs/scheduling/api_test.go index 311c8a3fbed..e91d3cd633e 100644 --- a/tests/integrations/mcs/scheduling/api_test.go +++ b/tests/integrations/mcs/scheduling/api_test.go @@ -132,10 +132,21 @@ func (suite *apiTestSuite) TestAPIForward() { re.NoError(err) re.Len(slice, 0) - err = testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "operators/2"), &resp, - testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) + err = testutil.CheckPostJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "operators"), []byte(``), + testutil.StatusNotOK(re), testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) + suite.NoError(err) + + err = testutil.CheckGetJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "operators/2"), nil, + testutil.StatusNotOK(re), testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) + re.NoError(err) + + err = testutil.CheckDelete(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "operators/2"), + testutil.StatusNotOK(re), testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) + re.NoError(err) + + err = testutil.CheckGetJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "operators/records"), nil, + testutil.StatusNotOK(re), testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) re.NoError(err) - re.Nil(resp) // Test checker: only read-only requests are forwarded err = testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "checker/merge"), &resp, diff --git a/tests/pdctl/operator/operator_test.go b/tests/pdctl/operator/operator_test.go index a95c620adcf..1752c28a3c0 100644 --- a/tests/pdctl/operator/operator_test.go +++ b/tests/pdctl/operator/operator_test.go @@ -15,7 +15,6 @@ package operator_test import ( - "context" "encoding/hex" "encoding/json" "strconv" @@ -24,7 +23,7 @@ import ( "time" "github.com/pingcap/kvproto/pkg/metapb" - "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/server/config" "github.com/tikv/pd/tests" @@ -32,14 +31,18 @@ import ( pdctlCmd "github.com/tikv/pd/tools/pd-ctl/pdctl" ) -func TestOperator(t *testing.T) { - re := require.New(t) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - var err error +type operatorTestSuite struct { + suite.Suite +} + +func TestOperatorTestSuite(t *testing.T) { + suite.Run(t, new(operatorTestSuite)) +} + +func (suite *operatorTestSuite) TestOperator() { var start time.Time start = start.Add(time.Hour) - cluster, err := tests.NewTestCluster(ctx, 1, + opts := []tests.ConfigOption{ // TODO: enable placementrules func(conf *config.Config, serverName string) { conf.Replication.MaxReplicas = 2 @@ -48,12 +51,14 @@ func TestOperator(t *testing.T) { func(conf *config.Config, serverName string) { conf.Schedule.MaxStoreDownTime.Duration = time.Since(start) }, - ) - re.NoError(err) - err = cluster.RunInitialServers() - re.NoError(err) - cluster.WaitLeader() - pdAddr := cluster.GetConfig().GetClientURL() + } + env := tests.NewSchedulingTestEnvironment(suite.T(), opts...) 
+ env.RunTestInTwoModes(suite.checkOperator) +} + +func (suite *operatorTestSuite) checkOperator(cluster *tests.TestCluster) { + re := suite.Require() + cmd := pdctlCmd.GetRootCmd() stores := []*metapb.Store{ @@ -79,8 +84,6 @@ func TestOperator(t *testing.T) { }, } - leaderServer := cluster.GetLeaderServer() - re.NoError(leaderServer.BootstrapCluster()) for _, store := range stores { tests.MustPutStore(re, cluster, store) } @@ -93,7 +96,18 @@ func TestOperator(t *testing.T) { {Id: 3, StoreId: 1}, {Id: 4, StoreId: 2}, })) - defer cluster.Destroy() + + pdAddr := cluster.GetLeaderServer().GetAddr() + args := []string{"-u", pdAddr, "operator", "show"} + var slice []string + output, err := pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + re.NoError(json.Unmarshal(output, &slice)) + re.Len(slice, 0) + args = []string{"-u", pdAddr, "operator", "check", "2"} + output, err = pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + re.Contains(string(output), "operator not found") var testCases = []struct { cmd []string @@ -175,9 +189,10 @@ func TestOperator(t *testing.T) { } for _, testCase := range testCases { - _, err := pdctl.ExecuteCommand(cmd, testCase.cmd...) + output, err = pdctl.ExecuteCommand(cmd, testCase.cmd...) re.NoError(err) - output, err := pdctl.ExecuteCommand(cmd, testCase.show...) + re.NotContains(string(output), "Failed") + output, err = pdctl.ExecuteCommand(cmd, testCase.show...) re.NoError(err) re.Contains(string(output), testCase.expect) start := time.Now() @@ -190,11 +205,11 @@ func TestOperator(t *testing.T) { } // operator add merge-region - args := []string{"-u", pdAddr, "operator", "add", "merge-region", "1", "3"} + args = []string{"-u", pdAddr, "operator", "add", "merge-region", "1", "3"} _, err = pdctl.ExecuteCommand(cmd, args...) re.NoError(err) args = []string{"-u", pdAddr, "operator", "show"} - output, err := pdctl.ExecuteCommand(cmd, args...) + output, err = pdctl.ExecuteCommand(cmd, args...) re.NoError(err) re.Contains(string(output), "merge region 1 into region 3") args = []string{"-u", pdAddr, "operator", "remove", "1"} @@ -252,32 +267,3 @@ func TestOperator(t *testing.T) { return strings.Contains(string(output1), "Success!") || strings.Contains(string(output2), "Success!") }) } - -func TestForwardOperatorRequest(t *testing.T) { - re := require.New(t) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - cluster, err := tests.NewTestAPICluster(ctx, 1) - re.NoError(err) - re.NoError(cluster.RunInitialServers()) - re.NotEmpty(cluster.WaitLeader()) - server := cluster.GetLeaderServer() - re.NoError(server.BootstrapCluster()) - backendEndpoints := server.GetAddr() - tc, err := tests.NewTestSchedulingCluster(ctx, 2, backendEndpoints) - re.NoError(err) - defer tc.Destroy() - tc.WaitForPrimaryServing(re) - - cmd := pdctlCmd.GetRootCmd() - args := []string{"-u", backendEndpoints, "operator", "show"} - var slice []string - output, err := pdctl.ExecuteCommand(cmd, args...) - re.NoError(err) - re.NoError(json.Unmarshal(output, &slice)) - re.Len(slice, 0) - args = []string{"-u", backendEndpoints, "operator", "check", "2"} - output, err = pdctl.ExecuteCommand(cmd, args...) 
- re.NoError(err) - re.Contains(string(output), "null") -} diff --git a/server/api/operator_test.go b/tests/server/api/operator_test.go similarity index 54% rename from server/api/operator_test.go rename to tests/server/api/operator_test.go index 1675fdd40c7..a6f11a49889 100644 --- a/server/api/operator_test.go +++ b/tests/server/api/operator_test.go @@ -15,62 +15,88 @@ package api import ( - "context" "errors" "fmt" - "io" + "net/http" "strconv" "strings" "testing" "time" - "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" - "github.com/pingcap/kvproto/pkg/pdpb" - "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/tikv/pd/pkg/core" - "github.com/tikv/pd/pkg/mock/mockhbstream" pdoperator "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/placement" tu "github.com/tikv/pd/pkg/utils/testutil" - "github.com/tikv/pd/pkg/versioninfo" - "github.com/tikv/pd/server" "github.com/tikv/pd/server/config" + "github.com/tikv/pd/tests" +) + +var ( + // testDialClient used to dial http request. only used for test. + testDialClient = &http.Client{ + Transport: &http.Transport{ + DisableKeepAlives: true, + }, + } ) type operatorTestSuite struct { suite.Suite - svr *server.Server - cleanup tu.CleanupFunc - urlPrefix string } func TestOperatorTestSuite(t *testing.T) { suite.Run(t, new(operatorTestSuite)) } -func (suite *operatorTestSuite) SetupSuite() { - re := suite.Require() - suite.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/unexpectedOperator", "return(true)")) - suite.svr, suite.cleanup = mustNewServer(re, func(cfg *config.Config) { cfg.Replication.MaxReplicas = 1 }) - server.MustWaitLeader(re, []*server.Server{suite.svr}) - - addr := suite.svr.GetAddr() - suite.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix) +func (suite *operatorTestSuite) TestOperator() { + opts := []tests.ConfigOption{ + func(conf *config.Config, serverName string) { + conf.Replication.MaxReplicas = 1 + }, + } + env := tests.NewSchedulingTestEnvironment(suite.T(), opts...) + env.RunTestInTwoModes(suite.checkAddRemovePeer) - mustBootstrapCluster(re, suite.svr) -} + env = tests.NewSchedulingTestEnvironment(suite.T(), opts...) + env.RunTestInTwoModes(suite.checkMergeRegionOperator) -func (suite *operatorTestSuite) TearDownSuite() { - suite.cleanup() + opts = []tests.ConfigOption{ + func(conf *config.Config, serverName string) { + conf.Replication.MaxReplicas = 3 + }, + } + env = tests.NewSchedulingTestEnvironment(suite.T(), opts...) 
+ env.RunTestInTwoModes(suite.checkTransferRegionWithPlacementRule) } -func (suite *operatorTestSuite) TestAddRemovePeer() { +func (suite *operatorTestSuite) checkAddRemovePeer(cluster *tests.TestCluster) { re := suite.Require() - mustPutStore(re, suite.svr, 1, metapb.StoreState_Up, metapb.NodeState_Serving, nil) - mustPutStore(re, suite.svr, 2, metapb.StoreState_Up, metapb.NodeState_Serving, nil) + stores := []*metapb.Store{ + { + Id: 1, + State: metapb.StoreState_Up, + NodeState: metapb.NodeState_Serving, + LastHeartbeat: time.Now().UnixNano(), + }, + { + Id: 2, + State: metapb.StoreState_Up, + NodeState: metapb.NodeState_Serving, + LastHeartbeat: time.Now().UnixNano(), + }, + { + Id: 3, + State: metapb.StoreState_Up, + NodeState: metapb.NodeState_Serving, + LastHeartbeat: time.Now().UnixNano(), + }, + } + for _, store := range stores { + tests.MustPutStore(re, cluster, store) + } peer1 := &metapb.Peer{Id: 1, StoreId: 1} peer2 := &metapb.Peer{Id: 2, StoreId: 2} region := &metapb.Region{ @@ -82,123 +108,126 @@ func (suite *operatorTestSuite) TestAddRemovePeer() { }, } regionInfo := core.NewRegionInfo(region, peer1) - mustRegionHeartbeat(re, suite.svr, regionInfo) + tests.MustPutRegionInfo(re, cluster, regionInfo) - regionURL := fmt.Sprintf("%s/operators/%d", suite.urlPrefix, region.GetId()) - operator := mustReadURL(re, regionURL) - suite.Contains(operator, "operator not found") - recordURL := fmt.Sprintf("%s/operators/records?from=%s", suite.urlPrefix, strconv.FormatInt(time.Now().Unix(), 10)) - records := mustReadURL(re, recordURL) - suite.Contains(records, "operator not found") + urlPrefix := fmt.Sprintf("%s/pd/api/v1", cluster.GetLeaderServer().GetAddr()) + regionURL := fmt.Sprintf("%s/operators/%d", urlPrefix, region.GetId()) + err := tu.CheckGetJSON(testDialClient, regionURL, nil, + tu.StatusNotOK(re), tu.StringContain(re, "operator not found")) + suite.NoError(err) + recordURL := fmt.Sprintf("%s/operators/records?from=%s", urlPrefix, strconv.FormatInt(time.Now().Unix(), 10)) + err = tu.CheckGetJSON(testDialClient, recordURL, nil, + tu.StatusNotOK(re), tu.StringContain(re, "operator not found")) + suite.NoError(err) - mustPutStore(re, suite.svr, 3, metapb.StoreState_Up, metapb.NodeState_Serving, nil) - err := tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/operators", suite.urlPrefix), []byte(`{"name":"add-peer", "region_id": 1, "store_id": 3}`), tu.StatusOK(re)) + err = tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/operators", urlPrefix), []byte(`{"name":"add-peer", "region_id": 1, "store_id": 3}`), tu.StatusOK(re)) + suite.NoError(err) + err = tu.CheckGetJSON(testDialClient, regionURL, nil, + tu.StatusOK(re), tu.StringContain(re, "add learner peer 1 on store 3"), tu.StringContain(re, "RUNNING")) suite.NoError(err) - operator = mustReadURL(re, regionURL) - suite.Contains(operator, "add learner peer 1 on store 3") - suite.Contains(operator, "RUNNING") err = tu.CheckDelete(testDialClient, regionURL, tu.StatusOK(re)) suite.NoError(err) - records = mustReadURL(re, recordURL) - suite.Contains(records, "admin-add-peer {add peer: store [3]}") + err = tu.CheckGetJSON(testDialClient, recordURL, nil, + tu.StatusOK(re), tu.StringContain(re, "admin-add-peer {add peer: store [3]}")) + suite.NoError(err) - err = tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/operators", suite.urlPrefix), []byte(`{"name":"remove-peer", "region_id": 1, "store_id": 2}`), tu.StatusOK(re)) + err = tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/operators", urlPrefix), []byte(`{"name":"remove-peer", 
"region_id": 1, "store_id": 2}`), tu.StatusOK(re)) + suite.NoError(err) + err = tu.CheckGetJSON(testDialClient, regionURL, nil, + tu.StatusOK(re), tu.StringContain(re, "remove peer on store 2"), tu.StringContain(re, "RUNNING")) suite.NoError(err) - operator = mustReadURL(re, regionURL) - suite.Contains(operator, "RUNNING") - suite.Contains(operator, "remove peer on store 2") err = tu.CheckDelete(testDialClient, regionURL, tu.StatusOK(re)) suite.NoError(err) - records = mustReadURL(re, recordURL) - suite.Contains(records, "admin-remove-peer {rm peer: store [2]}") + err = tu.CheckGetJSON(testDialClient, recordURL, nil, + tu.StatusOK(re), tu.StringContain(re, "admin-remove-peer {rm peer: store [2]}")) + suite.NoError(err) - mustPutStore(re, suite.svr, 4, metapb.StoreState_Up, metapb.NodeState_Serving, nil) - err = tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/operators", suite.urlPrefix), []byte(`{"name":"add-learner", "region_id": 1, "store_id": 4}`), tu.StatusOK(re)) + tests.MustPutStore(re, cluster, &metapb.Store{ + Id: 4, + State: metapb.StoreState_Up, + NodeState: metapb.NodeState_Serving, + LastHeartbeat: time.Now().UnixNano(), + }) + err = tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/operators", urlPrefix), []byte(`{"name":"add-learner", "region_id": 1, "store_id": 4}`), tu.StatusOK(re)) + suite.NoError(err) + err = tu.CheckGetJSON(testDialClient, regionURL, nil, + tu.StatusOK(re), tu.StringContain(re, "add learner peer 2 on store 4")) suite.NoError(err) - operator = mustReadURL(re, regionURL) - suite.Contains(operator, "add learner peer 2 on store 4") // Fail to add peer to tombstone store. - err = suite.svr.GetRaftCluster().RemoveStore(3, true) + err = cluster.GetLeaderServer().GetRaftCluster().RemoveStore(3, true) suite.NoError(err) - err = tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/operators", suite.urlPrefix), []byte(`{"name":"add-peer", "region_id": 1, "store_id": 3}`), tu.StatusNotOK(re)) + err = tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/operators", urlPrefix), []byte(`{"name":"add-peer", "region_id": 1, "store_id": 3}`), tu.StatusNotOK(re)) suite.NoError(err) - err = tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/operators", suite.urlPrefix), []byte(`{"name":"transfer-peer", "region_id": 1, "from_store_id": 1, "to_store_id": 3}`), tu.StatusNotOK(re)) + err = tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/operators", urlPrefix), []byte(`{"name":"transfer-peer", "region_id": 1, "from_store_id": 1, "to_store_id": 3}`), tu.StatusNotOK(re)) suite.NoError(err) - err = tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/operators", suite.urlPrefix), []byte(`{"name":"transfer-region", "region_id": 1, "to_store_ids": [1, 2, 3]}`), tu.StatusNotOK(re)) + err = tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/operators", urlPrefix), []byte(`{"name":"transfer-region", "region_id": 1, "to_store_ids": [1, 2, 3]}`), tu.StatusNotOK(re)) suite.NoError(err) // Fail to get operator if from is latest. 
time.Sleep(time.Second) - records = mustReadURL(re, fmt.Sprintf("%s/operators/records?from=%s", suite.urlPrefix, strconv.FormatInt(time.Now().Unix(), 10))) - suite.Contains(records, "operator not found") + url := fmt.Sprintf("%s/operators/records?from=%s", urlPrefix, strconv.FormatInt(time.Now().Unix(), 10)) + err = tu.CheckGetJSON(testDialClient, url, nil, + tu.StatusNotOK(re), tu.StringContain(re, "operator not found")) + suite.NoError(err) } -func (suite *operatorTestSuite) TestMergeRegionOperator() { +func (suite *operatorTestSuite) checkMergeRegionOperator(cluster *tests.TestCluster) { re := suite.Require() r1 := core.NewTestRegionInfo(10, 1, []byte(""), []byte("b"), core.SetWrittenBytes(1000), core.SetReadBytes(1000), core.SetRegionConfVer(1), core.SetRegionVersion(1)) - mustRegionHeartbeat(re, suite.svr, r1) + tests.MustPutRegionInfo(re, cluster, r1) r2 := core.NewTestRegionInfo(20, 1, []byte("b"), []byte("c"), core.SetWrittenBytes(2000), core.SetReadBytes(0), core.SetRegionConfVer(2), core.SetRegionVersion(3)) - mustRegionHeartbeat(re, suite.svr, r2) + tests.MustPutRegionInfo(re, cluster, r2) r3 := core.NewTestRegionInfo(30, 1, []byte("c"), []byte(""), core.SetWrittenBytes(500), core.SetReadBytes(800), core.SetRegionConfVer(3), core.SetRegionVersion(2)) - mustRegionHeartbeat(re, suite.svr, r3) + tests.MustPutRegionInfo(re, cluster, r3) - err := tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/operators", suite.urlPrefix), []byte(`{"name":"merge-region", "source_region_id": 10, "target_region_id": 20}`), tu.StatusOK(re)) + urlPrefix := fmt.Sprintf("%s/pd/api/v1", cluster.GetLeaderServer().GetAddr()) + err := tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/operators", urlPrefix), []byte(`{"name":"merge-region", "source_region_id": 10, "target_region_id": 20}`), tu.StatusOK(re)) suite.NoError(err) - suite.svr.GetHandler().RemoveOperator(10) - suite.svr.GetHandler().RemoveOperator(20) - err = tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/operators", suite.urlPrefix), []byte(`{"name":"merge-region", "source_region_id": 20, "target_region_id": 10}`), tu.StatusOK(re)) + tu.CheckDelete(testDialClient, fmt.Sprintf("%s/operators/%d", urlPrefix, 10), tu.StatusOK(re)) + err = tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/operators", urlPrefix), []byte(`{"name":"merge-region", "source_region_id": 20, "target_region_id": 10}`), tu.StatusOK(re)) suite.NoError(err) - suite.svr.GetHandler().RemoveOperator(10) - suite.svr.GetHandler().RemoveOperator(20) - err = tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/operators", suite.urlPrefix), []byte(`{"name":"merge-region", "source_region_id": 10, "target_region_id": 30}`), + tu.CheckDelete(testDialClient, fmt.Sprintf("%s/operators/%d", urlPrefix, 10), tu.StatusOK(re)) + err = tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/operators", urlPrefix), []byte(`{"name":"merge-region", "source_region_id": 10, "target_region_id": 30}`), tu.StatusNotOK(re), tu.StringContain(re, "not adjacent")) suite.NoError(err) - err = tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/operators", suite.urlPrefix), []byte(`{"name":"merge-region", "source_region_id": 30, "target_region_id": 10}`), + err = tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/operators", urlPrefix), []byte(`{"name":"merge-region", "source_region_id": 30, "target_region_id": 10}`), tu.StatusNotOK(re), tu.StringContain(re, "not adjacent")) suite.NoError(err) } -type transferRegionOperatorTestSuite struct { - suite.Suite - svr *server.Server - cleanup tu.CleanupFunc - urlPrefix string -} - -func 
TestTransferRegionOperatorTestSuite(t *testing.T) { - suite.Run(t, new(transferRegionOperatorTestSuite)) -} - -func (suite *transferRegionOperatorTestSuite) SetupSuite() { - re := suite.Require() - suite.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/unexpectedOperator", "return(true)")) - suite.svr, suite.cleanup = mustNewServer(re, func(cfg *config.Config) { cfg.Replication.MaxReplicas = 3 }) - server.MustWaitLeader(re, []*server.Server{suite.svr}) - - addr := suite.svr.GetAddr() - suite.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix) - - mustBootstrapCluster(re, suite.svr) -} - -func (suite *transferRegionOperatorTestSuite) TearDownSuite() { - suite.cleanup() -} - -func (suite *transferRegionOperatorTestSuite) TestTransferRegionWithPlacementRule() { +func (suite *operatorTestSuite) checkTransferRegionWithPlacementRule(cluster *tests.TestCluster) { re := suite.Require() - mustPutStore(re, suite.svr, 1, metapb.StoreState_Up, metapb.NodeState_Serving, []*metapb.StoreLabel{{Key: "key", Value: "1"}}) - mustPutStore(re, suite.svr, 2, metapb.StoreState_Up, metapb.NodeState_Serving, []*metapb.StoreLabel{{Key: "key", Value: "2"}}) - mustPutStore(re, suite.svr, 3, metapb.StoreState_Up, metapb.NodeState_Serving, []*metapb.StoreLabel{{Key: "key", Value: "3"}}) + stores := []*metapb.Store{ + { + Id: 1, + State: metapb.StoreState_Up, + NodeState: metapb.NodeState_Serving, + LastHeartbeat: time.Now().UnixNano(), + Labels: []*metapb.StoreLabel{{Key: "key", Value: "1"}}, + }, + { + Id: 2, + State: metapb.StoreState_Up, + NodeState: metapb.NodeState_Serving, + LastHeartbeat: time.Now().UnixNano(), + Labels: []*metapb.StoreLabel{{Key: "key", Value: "2"}}, + }, + { + Id: 3, + State: metapb.StoreState_Up, + NodeState: metapb.NodeState_Serving, + LastHeartbeat: time.Now().UnixNano(), + Labels: []*metapb.StoreLabel{{Key: "key", Value: "3"}}, + }, + } - hbStream := mockhbstream.NewHeartbeatStream() - suite.svr.GetHBStreams().BindStream(1, hbStream) - suite.svr.GetHBStreams().BindStream(2, hbStream) - suite.svr.GetHBStreams().BindStream(3, hbStream) + for _, store := range stores { + tests.MustPutStore(re, cluster, store) + } peer1 := &metapb.Peer{Id: 1, StoreId: 1} peer2 := &metapb.Peer{Id: 2, StoreId: 2} @@ -211,11 +240,13 @@ func (suite *transferRegionOperatorTestSuite) TestTransferRegionWithPlacementRul Version: 1, }, } - mustRegionHeartbeat(re, suite.svr, core.NewRegionInfo(region, peer1)) + tests.MustPutRegionInfo(re, cluster, core.NewRegionInfo(region, peer1)) - regionURL := fmt.Sprintf("%s/operators/%d", suite.urlPrefix, region.GetId()) - operator := mustReadURL(re, regionURL) - suite.Contains(operator, "operator not found") + urlPrefix := fmt.Sprintf("%s/pd/api/v1", cluster.GetLeaderServer().GetAddr()) + regionURL := fmt.Sprintf("%s/operators/%d", urlPrefix, region.GetId()) + err := tu.CheckGetJSON(testDialClient, regionURL, nil, + tu.StatusNotOK(re), tu.StringContain(re, "operator not found")) + re.NoError(err) convertStepsToStr := func(steps []string) string { stepStrs := make([]string, len(steps)) for i := range steps { @@ -376,95 +407,53 @@ func (suite *transferRegionOperatorTestSuite) TestTransferRegionWithPlacementRul }), }, } + svr := cluster.GetLeaderServer() for _, testCase := range testCases { suite.T().Log(testCase.name) - suite.svr.GetRaftCluster().GetOpts().SetPlacementRuleEnabled(testCase.placementRuleEnable) + // TODO: remove this after we can sync this config to all servers. 
+ if sche := cluster.GetSchedulingPrimaryServer(); sche != nil {
+ sche.GetPersistConfig().SetPlacementRuleEnabled(testCase.placementRuleEnable)
+ } else {
+ svr.GetRaftCluster().GetOpts().SetPlacementRuleEnabled(testCase.placementRuleEnable)
+ }
+ manager := svr.GetRaftCluster().GetRuleManager()
+ if sche := cluster.GetSchedulingPrimaryServer(); sche != nil {
+ manager = sche.GetCluster().GetRuleManager()
+ }
+
 if testCase.placementRuleEnable {
- err := suite.svr.GetRaftCluster().GetRuleManager().Initialize(
- suite.svr.GetRaftCluster().GetOpts().GetMaxReplicas(),
- suite.svr.GetRaftCluster().GetOpts().GetLocationLabels(),
- suite.svr.GetRaftCluster().GetOpts().GetIsolationLevel(),
+ err := manager.Initialize(
+ svr.GetRaftCluster().GetOpts().GetMaxReplicas(),
+ svr.GetRaftCluster().GetOpts().GetLocationLabels(),
+ svr.GetRaftCluster().GetOpts().GetIsolationLevel(),
 )
 suite.NoError(err)
 }
 if len(testCase.rules) > 0 {
 // add customized rule first and then remove default rule
- err := suite.svr.GetRaftCluster().GetRuleManager().SetRules(testCase.rules)
+ err := manager.SetRules(testCase.rules)
 suite.NoError(err)
- err = suite.svr.GetRaftCluster().GetRuleManager().DeleteRule("pd", "default")
+ err = manager.DeleteRule("pd", "default")
 suite.NoError(err)
 }
 var err error
 if testCase.expectedError == nil {
- err = tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/operators", suite.urlPrefix), testCase.input, tu.StatusOK(re))
+ err = tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/operators", urlPrefix), testCase.input, tu.StatusOK(re))
 } else {
- err = tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/operators", suite.urlPrefix), testCase.input,
+ err = tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/operators", urlPrefix), testCase.input,
 tu.StatusNotOK(re), tu.StringContain(re, testCase.expectedError.Error()))
 }
 suite.NoError(err)
 if len(testCase.expectSteps) > 0 {
- operator = mustReadURL(re, regionURL)
- suite.Contains(operator, testCase.expectSteps)
+ err = tu.CheckGetJSON(testDialClient, regionURL, nil,
+ tu.StatusOK(re), tu.StringContain(re, testCase.expectSteps))
+ suite.NoError(err)
 err = tu.CheckDelete(testDialClient, regionURL, tu.StatusOK(re))
 } else {
- err = tu.CheckDelete(testDialClient, regionURL, tu.StatusNotOK(re))
+ // FIXME: we should check the delete result, which should fail,
+ // but deleting the operator may succeed because the cluster creates a new operator to remove the orphan peer.
+ err = tu.CheckDelete(testDialClient, regionURL)
 }
 suite.NoError(err)
 }
 }
-
-func mustPutRegion(re *require.Assertions, svr *server.Server, regionID, storeID uint64, start, end []byte, opts ...core.RegionCreateOption) *core.RegionInfo {
- leader := &metapb.Peer{
- Id: regionID,
- StoreId: storeID,
- }
- metaRegion := &metapb.Region{
- Id: regionID,
- StartKey: start,
- EndKey: end,
- Peers: []*metapb.Peer{leader},
- RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 1},
- }
- r := core.NewRegionInfo(metaRegion, leader, opts...)
- err := svr.GetRaftCluster().HandleRegionHeartbeat(r) - re.NoError(err) - return r -} - -func mustPutStore(re *require.Assertions, svr *server.Server, id uint64, state metapb.StoreState, nodeState metapb.NodeState, labels []*metapb.StoreLabel) { - s := &server.GrpcServer{Server: svr} - _, err := s.PutStore(context.Background(), &pdpb.PutStoreRequest{ - Header: &pdpb.RequestHeader{ClusterId: svr.ClusterID()}, - Store: &metapb.Store{ - Id: id, - Address: fmt.Sprintf("tikv%d", id), - State: state, - NodeState: nodeState, - Labels: labels, - Version: versioninfo.MinSupportedVersion(versioninfo.Version2_0).String(), - }, - }) - re.NoError(err) - if state == metapb.StoreState_Up { - _, err = s.StoreHeartbeat(context.Background(), &pdpb.StoreHeartbeatRequest{ - Header: &pdpb.RequestHeader{ClusterId: svr.ClusterID()}, - Stats: &pdpb.StoreStats{StoreId: id}, - }) - re.NoError(err) - } -} - -func mustRegionHeartbeat(re *require.Assertions, svr *server.Server, region *core.RegionInfo) { - cluster := svr.GetRaftCluster() - err := cluster.HandleRegionHeartbeat(region) - re.NoError(err) -} - -func mustReadURL(re *require.Assertions, url string) string { - res, err := testDialClient.Get(url) - re.NoError(err) - defer res.Body.Close() - data, err := io.ReadAll(res.Body) - re.NoError(err) - return string(data) -} diff --git a/tests/testutil.go b/tests/testutil.go index 3fd8e9dca35..af4560e2609 100644 --- a/tests/testutil.go +++ b/tests/testutil.go @@ -19,9 +19,11 @@ import ( "fmt" "os" "sync" + "testing" "time" "github.com/docker/go-units" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" @@ -196,13 +198,18 @@ func MustPutRegion(re *require.Assertions, cluster *TestCluster, regionID, store RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 1}, } r := core.NewRegionInfo(metaRegion, leader, opts...) - err := cluster.HandleRegionHeartbeat(r) + MustPutRegionInfo(re, cluster, r) + return r +} + +// MustPutRegionInfo is used for test purpose. +func MustPutRegionInfo(re *require.Assertions, cluster *TestCluster, regionInfo *core.RegionInfo) { + err := cluster.HandleRegionHeartbeat(regionInfo) re.NoError(err) if cluster.GetSchedulingPrimaryServer() != nil { - err = cluster.GetSchedulingPrimaryServer().GetCluster().HandleRegionHeartbeat(r) + err = cluster.GetSchedulingPrimaryServer().GetCluster().HandleRegionHeartbeat(regionInfo) re.NoError(err) } - return r } // MustReportBuckets is used for test purpose. @@ -220,3 +227,77 @@ func MustReportBuckets(re *require.Assertions, cluster *TestCluster, regionID ui // TODO: forwards to scheduling server after it supports buckets return buckets } + +// SchedulingTestEnvironment is used for test purpose. +type SchedulingTestEnvironment struct { + t *testing.T + ctx context.Context + cancel context.CancelFunc + cluster *TestCluster + opts []ConfigOption +} + +// NewSchedulingTestEnvironment is to create a new SchedulingTestEnvironment. +func NewSchedulingTestEnvironment(t *testing.T, opts ...ConfigOption) *SchedulingTestEnvironment { + return &SchedulingTestEnvironment{ + t: t, + opts: opts, + } +} + +// RunTestInTwoModes is to run test in two modes. 
+func (s *SchedulingTestEnvironment) RunTestInTwoModes(test func(*TestCluster)) { + // run test in pd mode + s.t.Log("start to run test in pd mode") + re := require.New(s.t) + s.runInPDMode() + test(s.cluster) + s.cleanup() + s.t.Log("finish to run test in pd mode") + + // run test in api mode + s.t.Log("start to run test in api mode") + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/mcs/scheduling/server/fastUpdateMember", `return(true)`)) + s.runInAPIMode() + test(s.cluster) + s.cleanup() + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/mcs/scheduling/server/fastUpdateMember")) + s.t.Log("finish to run test in api mode") +} + +func (s *SchedulingTestEnvironment) cleanup() { + s.cluster.Destroy() + s.cancel() +} + +func (s *SchedulingTestEnvironment) runInPDMode() { + var err error + re := require.New(s.t) + s.ctx, s.cancel = context.WithCancel(context.Background()) + s.cluster, err = NewTestCluster(s.ctx, 1, s.opts...) + re.NoError(err) + err = s.cluster.RunInitialServers() + re.NoError(err) + re.NotEmpty(s.cluster.WaitLeader()) + leaderServer := s.cluster.GetServer(s.cluster.GetLeader()) + re.NoError(leaderServer.BootstrapCluster()) +} + +func (s *SchedulingTestEnvironment) runInAPIMode() { + var err error + re := require.New(s.t) + s.ctx, s.cancel = context.WithCancel(context.Background()) + s.cluster, err = NewTestAPICluster(s.ctx, 1, s.opts...) + re.NoError(err) + err = s.cluster.RunInitialServers() + re.NoError(err) + re.NotEmpty(s.cluster.WaitLeader()) + leaderServer := s.cluster.GetServer(s.cluster.GetLeader()) + re.NoError(leaderServer.BootstrapCluster()) + // start scheduling cluster + tc, err := NewTestSchedulingCluster(s.ctx, 1, leaderServer.GetAddr()) + re.NoError(err) + tc.WaitForPrimaryServing(re) + s.cluster.SetSchedulingCluster(tc) + time.Sleep(200 * time.Millisecond) // wait for scheduling cluster to update member +} From 849d80d79f423984a05f1524f746b6aa463b158d Mon Sep 17 00:00:00 2001 From: ShuNing Date: Thu, 28 Sep 2023 15:39:20 +0800 Subject: [PATCH 5/8] api: supports GetRegion by hex key (#7160) close tikv/pd#7159 api: supports GetRegion by hex key Signed-off-by: nolouch Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- server/api/region.go | 11 +++++++++++ server/api/region_test.go | 6 ++++++ 2 files changed, 17 insertions(+) diff --git a/server/api/region.go b/server/api/region.go index 42b430974c4..68e280f610c 100644 --- a/server/api/region.go +++ b/server/api/region.go @@ -277,6 +277,17 @@ func (h *regionHandler) GetRegion(w http.ResponseWriter, r *http.Request) { h.rd.JSON(w, http.StatusBadRequest, err.Error()) return } + // decode hex if query has params with hex format + formatStr := r.URL.Query().Get("format") + if formatStr == "hex" { + keyBytes, err := hex.DecodeString(key) + if err != nil { + h.rd.JSON(w, http.StatusBadRequest, err.Error()) + return + } + key = string(keyBytes) + } + regionInfo := rc.GetRegionByKey([]byte(key)) h.rd.JSON(w, http.StatusOK, NewAPIRegionInfo(regionInfo)) } diff --git a/server/api/region_test.go b/server/api/region_test.go index acd305884d4..a39a1e5c5fd 100644 --- a/server/api/region_test.go +++ b/server/api/region_test.go @@ -147,6 +147,12 @@ func (suite *regionTestSuite) TestRegion() { suite.NoError(tu.ReadGetJSON(re, testDialClient, url, r2)) r2.Adjust() suite.Equal(NewAPIRegionInfo(r), r2) + + url = fmt.Sprintf("%s/region/key/%s?format=hex", suite.urlPrefix, hex.EncodeToString([]byte("a"))) + r2 = &RegionInfo{} + suite.NoError(tu.ReadGetJSON(re, testDialClient, url, r2)) + 
r2.Adjust() + suite.Equal(NewAPIRegionInfo(r), r2) } func (suite *regionTestSuite) TestRegionCheck() { From 3904a62661cdb8fe95b083c08ff4d69f55e01dc1 Mon Sep 17 00:00:00 2001 From: Hu# Date: Thu, 28 Sep 2023 17:19:51 +0800 Subject: [PATCH 6/8] dashboard_test: extend wait time for dashboard service (#7170) close tikv/pd#3182 Signed-off-by: husharp --- server/config/config.go | 2 +- tests/dashboard/service_test.go | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/server/config/config.go b/server/config/config.go index 5b9088ac8ea..0485e077c67 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -505,7 +505,7 @@ type PDServerConfig struct { // RuntimeServices is the running extension services. RuntimeServices typeutil.StringSlice `toml:"runtime-services" json:"runtime-services"` // MetricStorage is the cluster metric storage. - // Currently we use prometheus as metric storage, we may use PD/TiKV as metric storage later. + // Currently, we use prometheus as metric storage, we may use PD/TiKV as metric storage later. MetricStorage string `toml:"metric-storage" json:"metric-storage"` // There are some values supported: "auto", "none", or a specific address, default: "auto" DashboardAddress string `toml:"dashboard-address" json:"dashboard-address"` diff --git a/tests/dashboard/service_test.go b/tests/dashboard/service_test.go index ab3a2c431cb..5f72efb2c36 100644 --- a/tests/dashboard/service_test.go +++ b/tests/dashboard/service_test.go @@ -86,7 +86,8 @@ func (suite *dashboardTestSuite) checkRespCode(url string, code int) { } func waitForConfigSync() { - time.Sleep(time.Second) + // Need to wait dashboard service start. + time.Sleep(3 * time.Second) } func (suite *dashboardTestSuite) checkServiceIsStarted(internalProxy bool, servers map[string]*tests.TestServer, leader *tests.TestServer) string { From 54219d649fb4c8834cd94362a63988f3c074d33e Mon Sep 17 00:00:00 2001 From: buffer <1045931706@qq.com> Date: Thu, 28 Sep 2023 17:42:21 +0800 Subject: [PATCH 7/8] region: fix the potential panic . (#7143) close tikv/pd#4399 Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/core/region.go | 20 ++++++++++++++++++-- pkg/core/region_test.go | 13 +++++++++++++ 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/pkg/core/region.go b/pkg/core/region.go index 9e32bf7c2f5..8d0379f266f 100644 --- a/pkg/core/region.go +++ b/pkg/core/region.go @@ -23,11 +23,13 @@ import ( "sort" "strings" "sync/atomic" + "time" "unsafe" "github.com/docker/go-units" "github.com/gogo/protobuf/proto" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/kvproto/pkg/replication_modepb" @@ -996,6 +998,11 @@ func (r *RegionsInfo) setRegionLocked(region *RegionInfo, withOverlaps bool, ol // UpdateSubTree updates the subtree. func (r *RegionsInfo) UpdateSubTree(region, origin *RegionInfo, overlaps []*RegionInfo, rangeChanged bool) { + failpoint.Inject("UpdateSubTree", func() { + if origin == nil { + time.Sleep(time.Second) + } + }) r.st.Lock() defer r.st.Unlock() if origin != nil { @@ -1004,8 +1011,17 @@ func (r *RegionsInfo) UpdateSubTree(region, origin *RegionInfo, overlaps []*Regi // TODO: Improve performance by deleting only the different peers. 
r.removeRegionFromSubTreeLocked(origin) } else { - r.updateSubTreeStat(origin, region) - r.subRegions[region.GetID()].RegionInfo = region + // The region tree and the subtree update is not atomic and the region tree is updated first. + // If there are two thread needs to update region tree, + // t1: thread-A update region tree + // t2: thread-B: update region tree again + // t3: thread-B: update subtree + // t4: thread-A: update region subtree + // to keep region tree consistent with subtree, we need to drop this update. + if tree, ok := r.subRegions[region.GetID()]; ok { + r.updateSubTreeStat(origin, region) + tree.RegionInfo = region + } return } } diff --git a/pkg/core/region_test.go b/pkg/core/region_test.go index 5588d9190ec..50302de920e 100644 --- a/pkg/core/region_test.go +++ b/pkg/core/region_test.go @@ -21,6 +21,7 @@ import ( "strconv" "testing" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/stretchr/testify/require" @@ -450,6 +451,18 @@ func TestRegionKey(t *testing.T) { } } +func TestSetRegionConcurrence(t *testing.T) { + re := require.New(t) + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/core/UpdateSubTree", `return()`)) + regions := NewRegionsInfo() + region := NewTestRegionInfo(1, 1, []byte("a"), []byte("b")) + go func() { + regions.AtomicCheckAndPutRegion(region) + }() + regions.AtomicCheckAndPutRegion(region) + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/core/UpdateSubTree")) +} + func TestSetRegion(t *testing.T) { re := require.New(t) regions := NewRegionsInfo() From 95560408ad1c3db206559a826dadaf6cc47a584a Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Sun, 8 Oct 2023 11:34:52 +0800 Subject: [PATCH 8/8] mcs: support changing log level (#7172) ref tikv/pd#5839 Signed-off-by: Ryan Leung Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/mcs/resourcemanager/server/apis/v1/api.go | 31 +++++++++++++++++-- .../resourcemanager/server/grpc_service.go | 3 +- pkg/mcs/resourcemanager/server/server.go | 12 +++++++ pkg/mcs/scheduling/server/apis/v1/api.go | 25 +++++++++++++++ pkg/mcs/scheduling/server/server.go | 12 +++++++ pkg/mcs/tso/server/apis/v1/api.go | 18 +++++++++++ pkg/mcs/tso/server/server.go | 12 +++++++ pkg/utils/logutil/log.go | 10 ++++++ server/server.go | 11 +------ 9 files changed, 120 insertions(+), 14 deletions(-) diff --git a/pkg/mcs/resourcemanager/server/apis/v1/api.go b/pkg/mcs/resourcemanager/server/apis/v1/api.go index 970880788d4..ffcb9318590 100644 --- a/pkg/mcs/resourcemanager/server/apis/v1/api.go +++ b/pkg/mcs/resourcemanager/server/apis/v1/api.go @@ -27,10 +27,12 @@ import ( "github.com/gin-gonic/gin" "github.com/joho/godotenv" rmpb "github.com/pingcap/kvproto/pkg/resource_manager" + "github.com/pingcap/log" rmserver "github.com/tikv/pd/pkg/mcs/resourcemanager/server" "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi" + "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/reflectutil" ) @@ -57,7 +59,7 @@ func init() { // Service is the resource group service. 
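// It serves the resource manager HTTP API on a shared gin engine: the root
// router group carries the existing /config routes as well as the new admin
// group, whose PUT /log handler adjusts the service's log level at runtime.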
type Service struct { apiHandlerEngine *gin.Engine - baseEndpoint *gin.RouterGroup + root *gin.RouterGroup manager *rmserver.Manager } @@ -86,15 +88,22 @@ func NewService(srv *rmserver.Service) *Service { s := &Service{ manager: manager, apiHandlerEngine: apiHandlerEngine, - baseEndpoint: endpoint, + root: endpoint, } + s.RegisterAdminRouter() s.RegisterRouter() return s } +// RegisterAdminRouter registers the router of the TSO admin handler. +func (s *Service) RegisterAdminRouter() { + router := s.root.Group("admin") + router.PUT("/log", changeLogLevel) +} + // RegisterRouter registers the router of the service. func (s *Service) RegisterRouter() { - configEndpoint := s.baseEndpoint.Group("/config") + configEndpoint := s.root.Group("/config") configEndpoint.POST("/group", s.postResourceGroup) configEndpoint.PUT("/group", s.putResourceGroup) configEndpoint.GET("/group/:name", s.getResourceGroup) @@ -110,6 +119,22 @@ func (s *Service) handler() http.Handler { }) } +func changeLogLevel(c *gin.Context) { + svr := c.MustGet(multiservicesapi.ServiceContextKey).(*rmserver.Service) + var level string + if err := c.Bind(&level); err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + + if err := svr.SetLogLevel(level); err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + log.SetLevel(logutil.StringToZapLogLevel(level)) + c.String(http.StatusOK, "The log level is updated.") +} + // postResourceGroup // // @Tags ResourceManager diff --git a/pkg/mcs/resourcemanager/server/grpc_service.go b/pkg/mcs/resourcemanager/server/grpc_service.go index d0fac920f2f..bd46c45fa63 100644 --- a/pkg/mcs/resourcemanager/server/grpc_service.go +++ b/pkg/mcs/resourcemanager/server/grpc_service.go @@ -54,7 +54,8 @@ func (d dummyRestService) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Service is the gRPC service for resource manager. type Service struct { - ctx context.Context + ctx context.Context + *Server manager *Manager // settings } diff --git a/pkg/mcs/resourcemanager/server/server.go b/pkg/mcs/resourcemanager/server/server.go index 78685850e86..9b9bd91c6eb 100644 --- a/pkg/mcs/resourcemanager/server/server.go +++ b/pkg/mcs/resourcemanager/server/server.go @@ -26,6 +26,7 @@ import ( "time" grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/diagnosticspb" "github.com/pingcap/kvproto/pkg/resource_manager" @@ -86,6 +87,17 @@ func (s *Server) GetAddr() string { return s.cfg.ListenAddr } +// SetLogLevel sets log level. +func (s *Server) SetLogLevel(level string) error { + if !logutil.IsLevelLegal(level) { + return errors.Errorf("log level %s is illegal", level) + } + s.cfg.Log.Level = level + log.SetLevel(logutil.StringToZapLogLevel(level)) + log.Warn("log level changed", zap.String("level", log.GetLevel().String())) + return nil +} + // Run runs the Resource Manager server. 
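// The log level chosen at startup is not fixed for the lifetime of the process;
// SetLogLevel above applies a new level immediately via log.SetLevel.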
func (s *Server) Run() (err error) { skipWaitAPIServiceReady := false diff --git a/pkg/mcs/scheduling/server/apis/v1/api.go b/pkg/mcs/scheduling/server/apis/v1/api.go index e66bf00ef94..b34e79b9ea8 100644 --- a/pkg/mcs/scheduling/server/apis/v1/api.go +++ b/pkg/mcs/scheduling/server/apis/v1/api.go @@ -25,6 +25,7 @@ import ( "github.com/gin-contrib/pprof" "github.com/gin-gonic/gin" "github.com/joho/godotenv" + "github.com/pingcap/log" scheserver "github.com/tikv/pd/pkg/mcs/scheduling/server" "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/schedule" @@ -33,6 +34,7 @@ import ( "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi" + "github.com/tikv/pd/pkg/utils/logutil" "github.com/unrolled/render" ) @@ -110,12 +112,19 @@ func NewService(srv *scheserver.Service) *Service { root: root, rd: createIndentRender(), } + s.RegisterAdminRouter() s.RegisterOperatorsRouter() s.RegisterSchedulersRouter() s.RegisterCheckersRouter() return s } +// RegisterAdminRouter registers the router of the admin handler. +func (s *Service) RegisterAdminRouter() { + router := s.root.Group("admin") + router.PUT("/log", changeLogLevel) +} + // RegisterSchedulersRouter registers the router of the schedulers handler. func (s *Service) RegisterSchedulersRouter() { router := s.root.Group("schedulers") @@ -138,6 +147,22 @@ func (s *Service) RegisterOperatorsRouter() { router.GET("/records", getOperatorRecords) } +func changeLogLevel(c *gin.Context) { + svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server) + var level string + if err := c.Bind(&level); err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + + if err := svr.SetLogLevel(level); err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + log.SetLevel(logutil.StringToZapLogLevel(level)) + c.String(http.StatusOK, "The log level is updated.") +} + // @Tags operators // @Summary Get an operator by ID. // @Param region_id path int true "A Region's Id" diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go index c1aecc2f18b..4ec2f2731e7 100644 --- a/pkg/mcs/scheduling/server/server.go +++ b/pkg/mcs/scheduling/server/server.go @@ -27,6 +27,7 @@ import ( "time" grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/diagnosticspb" "github.com/pingcap/kvproto/pkg/pdpb" @@ -119,6 +120,17 @@ func (s *Server) GetBackendEndpoints() string { return s.cfg.BackendEndpoints } +// SetLogLevel sets log level. +func (s *Server) SetLogLevel(level string) error { + if !logutil.IsLevelLegal(level) { + return errors.Errorf("log level %s is illegal", level) + } + s.cfg.Log.Level = level + log.SetLevel(logutil.StringToZapLogLevel(level)) + log.Warn("log level changed", zap.String("level", log.GetLevel().String())) + return nil +} + // Run runs the scheduling server. 
func (s *Server) Run() error { skipWaitAPIServiceReady := false diff --git a/pkg/mcs/tso/server/apis/v1/api.go b/pkg/mcs/tso/server/apis/v1/api.go index c2cbca005d7..f1853bf5483 100644 --- a/pkg/mcs/tso/server/apis/v1/api.go +++ b/pkg/mcs/tso/server/apis/v1/api.go @@ -32,6 +32,7 @@ import ( "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi" + "github.com/tikv/pd/pkg/utils/logutil" "github.com/unrolled/render" "go.uber.org/zap" ) @@ -107,6 +108,7 @@ func NewService(srv *tsoserver.Service) *Service { func (s *Service) RegisterAdminRouter() { router := s.root.Group("admin") router.POST("/reset-ts", ResetTS) + router.PUT("/log", changeLogLevel) } // RegisterKeyspaceGroupRouter registers the router of the TSO keyspace group handler. @@ -115,6 +117,22 @@ func (s *Service) RegisterKeyspaceGroupRouter() { router.GET("/members", GetKeyspaceGroupMembers) } +func changeLogLevel(c *gin.Context) { + svr := c.MustGet(multiservicesapi.ServiceContextKey).(*tsoserver.Service) + var level string + if err := c.Bind(&level); err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + + if err := svr.SetLogLevel(level); err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + log.SetLevel(logutil.StringToZapLogLevel(level)) + c.String(http.StatusOK, "The log level is updated.") +} + // ResetTSParams is the input json body params of ResetTS type ResetTSParams struct { TSO string `json:"tso"` diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go index 40958ca463c..133f87b78f3 100644 --- a/pkg/mcs/tso/server/server.go +++ b/pkg/mcs/tso/server/server.go @@ -27,6 +27,7 @@ import ( "time" grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/diagnosticspb" "github.com/pingcap/kvproto/pkg/tsopb" @@ -129,6 +130,17 @@ func (s *Server) RegisterGRPCService(grpcServer *grpc.Server) { s.service.RegisterGRPCService(grpcServer) } +// SetLogLevel sets log level. +func (s *Server) SetLogLevel(level string) error { + if !logutil.IsLevelLegal(level) { + return errors.Errorf("log level %s is illegal", level) + } + s.cfg.Log.Level = level + log.SetLevel(logutil.StringToZapLogLevel(level)) + log.Warn("log level changed", zap.String("level", log.GetLevel().String())) + return nil +} + // Run runs the TSO server. func (s *Server) Run() error { skipWaitAPIServiceReady := false diff --git a/pkg/utils/logutil/log.go b/pkg/utils/logutil/log.go index abb6a2783a0..3dc4430b066 100644 --- a/pkg/utils/logutil/log.go +++ b/pkg/utils/logutil/log.go @@ -162,3 +162,13 @@ func CondUint32(key string, val uint32, condition bool) zap.Field { } return zap.Skip() } + +// IsLevelLegal checks whether the level is legal. +func IsLevelLegal(level string) bool { + switch strings.ToLower(level) { + case "fatal", "error", "warn", "warning", "debug", "info": + return true + default: + return false + } +} diff --git a/server/server.go b/server/server.go index d3fe5446a03..aa3ef12c4f5 100644 --- a/server/server.go +++ b/server/server.go @@ -1494,7 +1494,7 @@ func (s *Server) GetClusterStatus() (*cluster.Status, error) { // SetLogLevel sets log level. 
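// Levels are validated case-insensitively by logutil.IsLevelLegal; the accepted
// values are fatal, error, warn, warning, debug, and info.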
func (s *Server) SetLogLevel(level string) error { - if !isLevelLegal(level) { + if !logutil.IsLevelLegal(level) { return errors.Errorf("log level %s is illegal", level) } s.cfg.Log.Level = level @@ -1503,15 +1503,6 @@ func (s *Server) SetLogLevel(level string) error { return nil } -func isLevelLegal(level string) bool { - switch strings.ToLower(level) { - case "fatal", "error", "warn", "warning", "debug", "info": - return true - default: - return false - } -} - // GetReplicationModeConfig returns the replication mode config. func (s *Server) GetReplicationModeConfig() *config.ReplicationModeConfig { return s.persistOptions.GetReplicationModeConfig().Clone()
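For reference, a minimal client-side sketch of exercising the admin endpoint introduced in this patch series. The payload shape follows the changeLogLevel handlers above, which bind the request body into a plain string; the target address and the /scheduling/api/v1 path prefix are assumptions for illustration only, since the services' API prefixes are not part of these hunks.

package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
)

func main() {
	// changeLogLevel binds the body into a string, so send a JSON-encoded string.
	body := bytes.NewBufferString(`"debug"`)

	// Host and path prefix are assumptions; only the admin /log route comes from the patch.
	req, err := http.NewRequest(http.MethodPut,
		"http://127.0.0.1:3379/scheduling/api/v1/admin/log", body)
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	msg, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.StatusCode, string(msg)) // expect 200 and "The log level is updated."
}

Since the resource manager and TSO services register the same handler, the same request shape should apply to them under their respective API prefixes.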