From 40f06261bd692fc9bd7a1a7c7d2fb40c37364f1f Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Thu, 3 Aug 2023 15:46:06 +0800 Subject: [PATCH 1/8] merge hot stats Signed-off-by: Ryan Leung --- pkg/statistics/utils/kind.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/statistics/utils/kind.go b/pkg/statistics/utils/kind.go index 4d44b8d57e1..035851dd3dd 100644 --- a/pkg/statistics/utils/kind.go +++ b/pkg/statistics/utils/kind.go @@ -226,7 +226,7 @@ func (rw RWType) DefaultAntiCount() int { case Write: return HotRegionAntiCount default: // Case Read - return HotRegionAntiCount * (RegionHeartBeatReportInterval / StoreHeartBeatReportInterval) + return HotRegionAntiCount * (WriteReportInterval / ReadReportInterval) } } From cb7477e9ee32b2aebdbb95c22d59626f660fb14f Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Thu, 3 Aug 2023 12:26:25 +0800 Subject: [PATCH 2/8] create coordinator with scheduling cluster Signed-off-by: Ryan Leung --- pkg/statistics/utils/kind.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/statistics/utils/kind.go b/pkg/statistics/utils/kind.go index 035851dd3dd..4d44b8d57e1 100644 --- a/pkg/statistics/utils/kind.go +++ b/pkg/statistics/utils/kind.go @@ -226,7 +226,7 @@ func (rw RWType) DefaultAntiCount() int { case Write: return HotRegionAntiCount default: // Case Read - return HotRegionAntiCount * (WriteReportInterval / ReadReportInterval) + return HotRegionAntiCount * (RegionHeartBeatReportInterval / StoreHeartBeatReportInterval) } } From 129a19cc8073d504b26095c45dec38b86a4da1fb Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Fri, 4 Aug 2023 15:59:06 +0800 Subject: [PATCH 3/8] remove some store config Signed-off-by: Ryan Leung --- pkg/mcs/scheduling/server/cluster.go | 33 ++++++++++++++-------- pkg/mcs/scheduling/server/config/config.go | 3 -- pkg/mock/mockcluster/mockcluster.go | 10 +++++++ pkg/schedule/config/config_provider.go | 4 --- pkg/schedule/schedulers/hot_region_test.go | 6 ++-- server/cluster/cluster.go | 1 - server/config/persist_options.go | 3 -- tests/integrations/client/client_test.go | 6 ++-- 8 files changed, 38 insertions(+), 28 deletions(-) diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index 1d56817068a..dc466166fca 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -13,7 +13,6 @@ import ( "github.com/tikv/pd/pkg/statistics/buckets" "github.com/tikv/pd/pkg/statistics/utils" "github.com/tikv/pd/pkg/storage" - "github.com/tikv/pd/pkg/storage/endpoint" ) // Cluster is used to manage all information for scheduling purpose. @@ -23,12 +22,13 @@ type Cluster struct { labelerManager *labeler.RegionLabeler persistConfig *config.PersistConfig hotStat *statistics.HotStat + storage storage.Storage } const regionLabelGCInterval = time.Hour // NewCluster creates a new cluster. -func NewCluster(ctx context.Context, storage endpoint.RuleStorage, cfg *config.Config) (*Cluster, error) { +func NewCluster(ctx context.Context, storage storage.Storage, cfg *config.Config) (*Cluster, error) { basicCluster := core.NewBasicCluster() persistConfig := config.NewPersistConfig(cfg) labelerManager, err := labeler.NewRegionLabeler(ctx, storage, regionLabelGCInterval) @@ -102,22 +102,33 @@ func (c *Cluster) BucketsStats(degree int, regionIDs ...uint64) map[uint64][]*bu return c.hotStat.BucketsStats(degree, regionIDs...) } +<<<<<<< HEAD // TODO: implement the following methods +======= +// GetPersistOptions returns the persist options. +func (c *Cluster) GetPersistOptions() sc.ConfProvider { + return c.persistConfig +} +>>>>>>> remove some store config // GetStorage returns the storage. -func (c *Cluster) GetStorage() storage.Storage { return nil } +func (c *Cluster) GetStorage() storage.Storage { + return c.storage +} + +// GetCheckerConfig returns the checker config. +func (c *Cluster) GetCheckerConfig() sc.CheckerConfigProvider { return c.persistConfig } + +// GetSchedulerConfig returns the scheduler config. +func (c *Cluster) GetSchedulerConfig() sc.SchedulerConfigProvider { return c.persistConfig } + +// TODO: implement the following methods // UpdateRegionsLabelLevelStats updates the region label level stats. func (c *Cluster) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) {} -// GetStoreConfig returns the store config. -func (c *Cluster) GetStoreConfig() sc.StoreConfigProvider { return nil } - // AllocID allocates a new ID. func (c *Cluster) AllocID() (uint64, error) { return 0, nil } -// GetCheckerConfig returns the checker config. -func (c *Cluster) GetCheckerConfig() sc.CheckerConfigProvider { return nil } - -// GetSchedulerConfig returns the scheduler config. -func (c *Cluster) GetSchedulerConfig() sc.SchedulerConfigProvider { return nil } +// GetStoreConfig returns the store config. +func (c *Cluster) GetStoreConfig() sc.StoreConfigProvider { return nil } diff --git a/pkg/mcs/scheduling/server/config/config.go b/pkg/mcs/scheduling/server/config/config.go index 7fa16492178..7b396afe35b 100644 --- a/pkg/mcs/scheduling/server/config/config.go +++ b/pkg/mcs/scheduling/server/config/config.go @@ -587,6 +587,3 @@ func (o *PersistConfig) Persist(storage endpoint.ConfigStorage) error { // RemoveSchedulerCfg removes the scheduler configurations. func (o *PersistConfig) RemoveSchedulerCfg(tp string) {} - -// UseRaftV2 set some config for raft store v2 by default temporary. -func (o *PersistConfig) UseRaftV2() {} diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index a0417a863d3..8baa852117e 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -87,6 +87,16 @@ func (mc *Cluster) GetStoreConfig() sc.StoreConfigProvider { return mc } +// SetRegionBucketEnabled sets the region bucket enabled. +func (mc *Cluster) SetRegionBucketEnabled(enabled bool) { + cfg, ok := mc.GetStoreConfig().(*config.StoreConfig) + if !ok || cfg == nil { + return + } + cfg.Coprocessor.EnableRegionBucket = enabled + mc.SetStoreConfig(cfg) +} + // GetCheckerConfig returns the checker config. func (mc *Cluster) GetCheckerConfig() sc.CheckerConfigProvider { return mc diff --git a/pkg/schedule/config/config_provider.go b/pkg/schedule/config/config_provider.go index b9bb2757663..920467a07e2 100644 --- a/pkg/schedule/config/config_provider.go +++ b/pkg/schedule/config/config_provider.go @@ -131,8 +131,6 @@ type ConfProvider interface { SetSplitMergeInterval(time.Duration) SetMaxReplicas(int) SetAllStoresLimit(storelimit.Type, float64) - // only for store configuration - UseRaftV2() } // StoreConfigProvider is the interface that wraps the StoreConfigProvider related methods. @@ -142,6 +140,4 @@ type StoreConfigProvider interface { CheckRegionSize(uint64, uint64) error CheckRegionKeys(uint64, uint64) error IsEnableRegionBucket() bool - // for test purpose - SetRegionBucketEnabled(bool) } diff --git a/pkg/schedule/schedulers/hot_region_test.go b/pkg/schedule/schedulers/hot_region_test.go index 3446daba8bc..ee569f4b70e 100644 --- a/pkg/schedule/schedulers/hot_region_test.go +++ b/pkg/schedule/schedulers/hot_region_test.go @@ -247,7 +247,7 @@ func TestSplitIfRegionTooHot(t *testing.T) { addRegionInfo(tc, utils.Read, []testRegionInfo{ {1, []uint64{1, 2, 3}, 4 * units.MiB, 0, 0}, }) - tc.GetStoreConfig().SetRegionBucketEnabled(true) + tc.SetRegionBucketEnabled(true) ops, _ := hb.Schedule(tc, false) re.Len(ops, 1) expectOp, _ := operator.CreateSplitRegionOperator(splitHotReadBuckets, tc.GetRegion(1), operator.OpSplit, @@ -283,7 +283,7 @@ func TestSplitBucketsBySize(t *testing.T) { statistics.Denoising = false cancel, _, tc, oc := prepareSchedulersTest() tc.SetHotRegionCacheHitsThreshold(1) - tc.GetStoreConfig().SetRegionBucketEnabled(true) + tc.SetRegionBucketEnabled(true) defer cancel() hb, err := CreateScheduler(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) @@ -336,7 +336,7 @@ func TestSplitBucketsByLoad(t *testing.T) { statistics.Denoising = false cancel, _, tc, oc := prepareSchedulersTest() tc.SetHotRegionCacheHitsThreshold(1) - tc.GetStoreConfig().SetRegionBucketEnabled(true) + tc.SetRegionBucketEnabled(true) defer cancel() hb, err := CreateScheduler(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 68578eb8a0a..dcc94a6669d 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -426,7 +426,6 @@ func (c *RaftCluster) runStoreConfigSync() { for { synced, switchRaftV2Config = c.syncStoreConfig(stores) if switchRaftV2Config { - c.GetOpts().UseRaftV2() if err := c.opt.Persist(c.GetStorage()); err != nil { log.Warn("store config persisted failed", zap.Error(err)) } diff --git a/server/config/persist_options.go b/server/config/persist_options.go index 3b248843e73..1ea0b79424f 100644 --- a/server/config/persist_options.go +++ b/server/config/persist_options.go @@ -216,9 +216,6 @@ func (o *PersistOptions) SetMaxReplicas(replicas int) { o.SetReplicationConfig(v) } -// UseRaftV2 set some config for raft store v2 by default temporary. -func (o *PersistOptions) UseRaftV2() {} - const ( maxSnapshotCountKey = "schedule.max-snapshot-count" maxMergeRegionSizeKey = "schedule.max-merge-region-size" diff --git a/tests/integrations/client/client_test.go b/tests/integrations/client/client_test.go index 8ada9f9d519..712952cc2fc 100644 --- a/tests/integrations/client/client_test.go +++ b/tests/integrations/client/client_test.go @@ -804,7 +804,7 @@ func (suite *clientTestSuite) SetupSuite() { })) suite.grpcSvr.GetRaftCluster().GetBasicCluster().PutStore(newStore) } - cluster.GetStoreConfig().SetRegionBucketEnabled(true) + cluster.GetStoreConfig().(*config.StoreConfig).SetRegionBucketEnabled(true) } func (suite *clientTestSuite) TearDownSuite() { @@ -893,7 +893,7 @@ func (suite *clientTestSuite) TestGetRegion() { } return r.Buckets != nil }) - suite.srv.GetRaftCluster().GetStoreConfig().SetRegionBucketEnabled(false) + suite.srv.GetRaftCluster().GetStoreConfig().(*config.StoreConfig).SetRegionBucketEnabled(false) testutil.Eventually(re, func() bool { r, err := suite.client.GetRegion(context.Background(), []byte("a"), pd.WithBuckets()) @@ -903,7 +903,7 @@ func (suite *clientTestSuite) TestGetRegion() { } return r.Buckets == nil }) - suite.srv.GetRaftCluster().GetStoreConfig().SetRegionBucketEnabled(true) + suite.srv.GetRaftCluster().GetStoreConfig().(*config.StoreConfig).SetRegionBucketEnabled(true) suite.NoError(failpoint.Enable("github.com/tikv/pd/server/grpcClientClosed", `return(true)`)) suite.NoError(failpoint.Enable("github.com/tikv/pd/server/useForwardRequest", `return(true)`)) From 973f3668ab18c2aabd9386ff34c87d38aea476a3 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Fri, 4 Aug 2023 16:48:36 +0800 Subject: [PATCH 4/8] start coordinator Signed-off-by: Ryan Leung --- pkg/mcs/resourcemanager/server/server.go | 4 +++- pkg/mcs/scheduling/server/cluster.go | 1 + pkg/mcs/scheduling/server/server.go | 15 +++++++++++---- 3 files changed, 15 insertions(+), 5 deletions(-) diff --git a/pkg/mcs/resourcemanager/server/server.go b/pkg/mcs/resourcemanager/server/server.go index 645e118e1c9..dd2ae496d41 100644 --- a/pkg/mcs/resourcemanager/server/server.go +++ b/pkg/mcs/resourcemanager/server/server.go @@ -128,9 +128,11 @@ func (s *Server) primaryElectionLoop() { defer s.serverLoopWg.Done() for { - if s.IsClosed() { + select { + case <-s.ctx.Done(): log.Info("server is closed, exit resource manager primary election loop") return + default: } primary, checkAgain := s.participant.CheckLeader() diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index dc466166fca..49f7fb4379f 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -42,6 +42,7 @@ func NewCluster(ctx context.Context, storage storage.Storage, cfg *config.Config labelerManager: labelerManager, persistConfig: persistConfig, hotStat: statistics.NewHotStat(ctx), + storage: storage, }, nil } diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go index c721b4238a1..3d1f9105f2f 100644 --- a/pkg/mcs/scheduling/server/server.go +++ b/pkg/mcs/scheduling/server/server.go @@ -174,9 +174,11 @@ func (s *Server) primaryElectionLoop() { defer s.serverLoopWg.Done() for { - if s.IsClosed() { - log.Info("server is closed, exit scheduling primary election loop") + select { + case <-s.ctx.Done(): + log.Info("server is closed, exit resource manager primary election loop") return + default: } primary, checkAgain := s.participant.CheckLeader() @@ -487,13 +489,18 @@ func (s *Server) startServer() (err error) { s.participant.InitInfo(uniqueName, uniqueID, path.Join(schedulingPrimaryPrefix, fmt.Sprintf("%05d", 0)), utils.PrimaryKey, "primary election", s.cfg.AdvertiseListenAddr) s.storage = endpoint.NewStorageEndpoint( - kv.NewEtcdKVBase(s.etcdClient, endpoint.SchedulingSvcRootPath(s.clusterID)), nil) + kv.NewEtcdKVBase(s.etcdClient, endpoint.PDRootPath(s.clusterID)), nil) s.cluster, err = NewCluster(s.ctx, s.storage, s.cfg) if err != nil { return err } s.hbStreams = hbstream.NewHeartbeatStreams(s.ctx, s.clusterID, s.cluster.GetBasicCluster()) s.coordinator = schedule.NewCoordinator(s.ctx, s.cluster, s.hbStreams) + + s.listenURL, err = url.Parse(s.cfg.ListenAddr) + if err != nil { + return err + } tlsConfig, err := s.cfg.Security.ToTLSConfig() if err != nil { return err @@ -511,7 +518,7 @@ func (s *Server) startServer() (err error) { if err != nil { return err } - + go s.coordinator.RunUntilStop() serverReadyChan := make(chan struct{}) defer close(serverReadyChan) s.serverLoopWg.Add(1) From 7bd020bef547e7aada1b36355552bb58dc83c7cb Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Thu, 10 Aug 2023 15:50:24 +0800 Subject: [PATCH 5/8] resolve the conflicts Signed-off-by: Ryan Leung --- pkg/mcs/scheduling/server/cluster.go | 15 +++---------- pkg/mcs/scheduling/server/config/config.go | 25 ++++++++++++++++++++++ pkg/mock/mockcluster/mockcluster.go | 10 ++++----- tests/integrations/client/client_test.go | 7 +++--- 4 files changed, 37 insertions(+), 20 deletions(-) diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index 49f7fb4379f..f58fba2ed0b 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -103,15 +103,6 @@ func (c *Cluster) BucketsStats(degree int, regionIDs ...uint64) map[uint64][]*bu return c.hotStat.BucketsStats(degree, regionIDs...) } -<<<<<<< HEAD -// TODO: implement the following methods -======= -// GetPersistOptions returns the persist options. -func (c *Cluster) GetPersistOptions() sc.ConfProvider { - return c.persistConfig -} ->>>>>>> remove some store config - // GetStorage returns the storage. func (c *Cluster) GetStorage() storage.Storage { return c.storage @@ -123,6 +114,9 @@ func (c *Cluster) GetCheckerConfig() sc.CheckerConfigProvider { return c.persist // GetSchedulerConfig returns the scheduler config. func (c *Cluster) GetSchedulerConfig() sc.SchedulerConfigProvider { return c.persistConfig } +// GetStoreConfig returns the store config. +func (c *Cluster) GetStoreConfig() sc.StoreConfigProvider { return c.persistConfig } + // TODO: implement the following methods // UpdateRegionsLabelLevelStats updates the region label level stats. @@ -130,6 +124,3 @@ func (c *Cluster) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) {} // AllocID allocates a new ID. func (c *Cluster) AllocID() (uint64, error) { return 0, nil } - -// GetStoreConfig returns the store config. -func (c *Cluster) GetStoreConfig() sc.StoreConfigProvider { return nil } diff --git a/pkg/mcs/scheduling/server/config/config.go b/pkg/mcs/scheduling/server/config/config.go index 7b396afe35b..7839ec7f274 100644 --- a/pkg/mcs/scheduling/server/config/config.go +++ b/pkg/mcs/scheduling/server/config/config.go @@ -564,6 +564,31 @@ func (o *PersistConfig) SetHaltScheduling(halt bool, source string) { o.SetScheduleConfig(v) } +// CheckRegionKeys return error if the smallest region's keys is less than mergeKeys +func (o *PersistConfig) CheckRegionKeys(keys, mergeKeys uint64) error { + return o.GetStoreConfig().CheckRegionKeys(keys, mergeKeys) +} + +// CheckRegionSize return error if the smallest region's size is less than mergeSize +func (o *PersistConfig) CheckRegionSize(size, mergeSize uint64) error { + return o.GetStoreConfig().CheckRegionSize(size, mergeSize) +} + +// GetRegionMaxSize returns the max region size in MB +func (o *PersistConfig) GetRegionMaxSize() uint64 { + return o.GetStoreConfig().GetRegionMaxSize() +} + +// GetRegionMaxKeys returns the region split keys +func (o *PersistConfig) GetRegionMaxKeys() uint64 { + return o.GetStoreConfig().GetRegionMaxKeys() +} + +// IsEnableRegionBucket return true if the region bucket is enabled. +func (o *PersistConfig) IsEnableRegionBucket() bool { + return o.GetStoreConfig().IsEnableRegionBucket() +} + // TODO: implement the following methods // AddSchedulerCfg adds the scheduler configurations. diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index 8baa852117e..ce392d26a39 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -84,12 +84,12 @@ func NewCluster(ctx context.Context, opts *config.PersistOptions) *Cluster { // GetStoreConfig returns the store config. func (mc *Cluster) GetStoreConfig() sc.StoreConfigProvider { - return mc + return mc.PersistOptions.GetStoreConfig() } // SetRegionBucketEnabled sets the region bucket enabled. func (mc *Cluster) SetRegionBucketEnabled(enabled bool) { - cfg, ok := mc.GetStoreConfig().(*config.StoreConfig) + cfg, ok := mc.GetStoreConfig().(*sc.StoreConfig) if !ok || cfg == nil { return } @@ -99,17 +99,17 @@ func (mc *Cluster) SetRegionBucketEnabled(enabled bool) { // GetCheckerConfig returns the checker config. func (mc *Cluster) GetCheckerConfig() sc.CheckerConfigProvider { - return mc + return mc.PersistOptions } // GetSchedulerConfig returns the scheduler config. func (mc *Cluster) GetSchedulerConfig() sc.SchedulerConfigProvider { - return mc + return mc.PersistOptions } // GetSharedConfig returns the shared config. func (mc *Cluster) GetSharedConfig() sc.SharedConfigProvider { - return mc + return mc.PersistOptions } // GetStorage returns the storage. diff --git a/tests/integrations/client/client_test.go b/tests/integrations/client/client_test.go index 712952cc2fc..99b74078ad9 100644 --- a/tests/integrations/client/client_test.go +++ b/tests/integrations/client/client_test.go @@ -39,6 +39,7 @@ import ( "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/mock/mockid" + sc "github.com/tikv/pd/pkg/schedule/config" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/tso" "github.com/tikv/pd/pkg/utils/assertutil" @@ -804,7 +805,7 @@ func (suite *clientTestSuite) SetupSuite() { })) suite.grpcSvr.GetRaftCluster().GetBasicCluster().PutStore(newStore) } - cluster.GetStoreConfig().(*config.StoreConfig).SetRegionBucketEnabled(true) + cluster.GetStoreConfig().(*sc.StoreConfig).SetRegionBucketEnabled(true) } func (suite *clientTestSuite) TearDownSuite() { @@ -893,7 +894,7 @@ func (suite *clientTestSuite) TestGetRegion() { } return r.Buckets != nil }) - suite.srv.GetRaftCluster().GetStoreConfig().(*config.StoreConfig).SetRegionBucketEnabled(false) + suite.srv.GetRaftCluster().GetStoreConfig().(*sc.StoreConfig).SetRegionBucketEnabled(false) testutil.Eventually(re, func() bool { r, err := suite.client.GetRegion(context.Background(), []byte("a"), pd.WithBuckets()) @@ -903,7 +904,7 @@ func (suite *clientTestSuite) TestGetRegion() { } return r.Buckets == nil }) - suite.srv.GetRaftCluster().GetStoreConfig().(*config.StoreConfig).SetRegionBucketEnabled(true) + suite.srv.GetRaftCluster().GetStoreConfig().(*sc.StoreConfig).SetRegionBucketEnabled(true) suite.NoError(failpoint.Enable("github.com/tikv/pd/server/grpcClientClosed", `return(true)`)) suite.NoError(failpoint.Enable("github.com/tikv/pd/server/useForwardRequest", `return(true)`)) From 931626ce346d96c00dba1871f77c58379f712dd1 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Thu, 10 Aug 2023 15:52:20 +0800 Subject: [PATCH 6/8] remove unnecessary log prefix Signed-off-by: Ryan Leung --- pkg/utils/etcdutil/etcdutil.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pkg/utils/etcdutil/etcdutil.go b/pkg/utils/etcdutil/etcdutil.go index d9b5a72d411..b59a9581996 100644 --- a/pkg/utils/etcdutil/etcdutil.go +++ b/pkg/utils/etcdutil/etcdutil.go @@ -270,7 +270,7 @@ func CreateEtcdClient(tlsConfig *tls.Config, acURLs []url.URL) (*clientv3.Client for { select { case <-client.Ctx().Done(): - log.Info("[etcd client] etcd client is closed, exit health check goroutine") + log.Info("etcd client is closed, exit health check goroutine") checker.Range(func(key, value interface{}) bool { client := value.(*healthyClient) client.Close() @@ -287,7 +287,7 @@ func CreateEtcdClient(tlsConfig *tls.Config, acURLs []url.URL) (*clientv3.Client // otherwise, the subconn will be retrying in grpc layer and use exponential backoff, // and it cannot recover as soon as possible. if time.Since(lastAvailable) > etcdServerDisconnectedTimeout { - log.Info("[etcd client] no available endpoint, try to reset endpoints", zap.Strings("last-endpoints", usedEps)) + log.Info("no available endpoint, try to reset endpoints", zap.Strings("last-endpoints", usedEps)) client.SetEndpoints([]string{}...) client.SetEndpoints(usedEps...) } @@ -296,7 +296,7 @@ func CreateEtcdClient(tlsConfig *tls.Config, acURLs []url.URL) (*clientv3.Client client.SetEndpoints(healthyEps...) change := fmt.Sprintf("%d->%d", len(usedEps), len(healthyEps)) etcdStateGauge.WithLabelValues("endpoints").Set(float64(len(healthyEps))) - log.Info("[etcd client] update endpoints", zap.String("num-change", change), + log.Info("update endpoints", zap.String("num-change", change), zap.Strings("last-endpoints", usedEps), zap.Strings("endpoints", client.Endpoints())) } lastAvailable = time.Now() @@ -313,7 +313,7 @@ func CreateEtcdClient(tlsConfig *tls.Config, acURLs []url.URL) (*clientv3.Client for { select { case <-client.Ctx().Done(): - log.Info("[etcd client] etcd client is closed, exit update endpoint goroutine") + log.Info("etcd client is closed, exit update endpoint goroutine") return case <-ticker.C: eps := syncUrls(client) @@ -377,7 +377,7 @@ func (checker *healthyChecker) update(eps []string) { if client, ok := checker.Load(ep); ok { lastHealthy := client.(*healthyClient).lastHealth if time.Since(lastHealthy) > etcdServerOfflineTimeout { - log.Info("[etcd client] some etcd server maybe offline", zap.String("endpoint", ep)) + log.Info("some etcd server maybe offline", zap.String("endpoint", ep)) checker.Delete(ep) } if time.Since(lastHealthy) > etcdServerDisconnectedTimeout { @@ -394,7 +394,7 @@ func (checker *healthyChecker) update(eps []string) { func (checker *healthyChecker) addClient(ep string, lastHealth time.Time) { client, err := newClient(checker.tlsConfig, ep) if err != nil { - log.Error("[etcd client] failed to create etcd healthy client", zap.Error(err)) + log.Error("failed to create etcd healthy client", zap.Error(err)) return } checker.Store(ep, &healthyClient{ @@ -409,7 +409,7 @@ func syncUrls(client *clientv3.Client) []string { defer cancel() mresp, err := client.MemberList(ctx) if err != nil { - log.Error("[etcd client] failed to list members", errs.ZapError(err)) + log.Error("failed to list members", errs.ZapError(err)) return []string{} } var eps []string From d30aa24737290f46ee4aaa721568da0a42c0a5e2 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Thu, 10 Aug 2023 16:35:10 +0800 Subject: [PATCH 7/8] fix Signed-off-by: Ryan Leung --- tests/integrations/client/client_test.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/tests/integrations/client/client_test.go b/tests/integrations/client/client_test.go index 99b74078ad9..41e7e650261 100644 --- a/tests/integrations/client/client_test.go +++ b/tests/integrations/client/client_test.go @@ -39,7 +39,6 @@ import ( "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/mock/mockid" - sc "github.com/tikv/pd/pkg/schedule/config" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/tso" "github.com/tikv/pd/pkg/utils/assertutil" @@ -805,7 +804,7 @@ func (suite *clientTestSuite) SetupSuite() { })) suite.grpcSvr.GetRaftCluster().GetBasicCluster().PutStore(newStore) } - cluster.GetStoreConfig().(*sc.StoreConfig).SetRegionBucketEnabled(true) + cluster.GetOpts().(*config.PersistOptions).SetRegionBucketEnabled(true) } func (suite *clientTestSuite) TearDownSuite() { @@ -894,7 +893,7 @@ func (suite *clientTestSuite) TestGetRegion() { } return r.Buckets != nil }) - suite.srv.GetRaftCluster().GetStoreConfig().(*sc.StoreConfig).SetRegionBucketEnabled(false) + suite.srv.GetRaftCluster().GetOpts().(*config.PersistOptions).SetRegionBucketEnabled(false) testutil.Eventually(re, func() bool { r, err := suite.client.GetRegion(context.Background(), []byte("a"), pd.WithBuckets()) @@ -904,7 +903,7 @@ func (suite *clientTestSuite) TestGetRegion() { } return r.Buckets == nil }) - suite.srv.GetRaftCluster().GetStoreConfig().(*sc.StoreConfig).SetRegionBucketEnabled(true) + suite.srv.GetRaftCluster().GetOpts().(*config.PersistOptions).SetRegionBucketEnabled(true) suite.NoError(failpoint.Enable("github.com/tikv/pd/server/grpcClientClosed", `return(true)`)) suite.NoError(failpoint.Enable("github.com/tikv/pd/server/useForwardRequest", `return(true)`)) From 2d9fab8b18fd3147235df58109cfccb45b39f701 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Thu, 10 Aug 2023 17:52:35 +0800 Subject: [PATCH 8/8] fix Signed-off-by: Ryan Leung --- pkg/mcs/resourcemanager/server/server.go | 2 +- pkg/mcs/scheduling/server/server.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/mcs/resourcemanager/server/server.go b/pkg/mcs/resourcemanager/server/server.go index dd2ae496d41..19c10bb8cf9 100644 --- a/pkg/mcs/resourcemanager/server/server.go +++ b/pkg/mcs/resourcemanager/server/server.go @@ -129,7 +129,7 @@ func (s *Server) primaryElectionLoop() { for { select { - case <-s.ctx.Done(): + case <-s.serverLoopCtx.Done(): log.Info("server is closed, exit resource manager primary election loop") return default: diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go index 3d1f9105f2f..845dbe38aa5 100644 --- a/pkg/mcs/scheduling/server/server.go +++ b/pkg/mcs/scheduling/server/server.go @@ -175,7 +175,7 @@ func (s *Server) primaryElectionLoop() { for { select { - case <-s.ctx.Done(): + case <-s.serverLoopCtx.Done(): log.Info("server is closed, exit resource manager primary election loop") return default: