diff --git a/pkg/mcs/resourcemanager/server/server.go b/pkg/mcs/resourcemanager/server/server.go
index 645e118e1c9..19c10bb8cf9 100644
--- a/pkg/mcs/resourcemanager/server/server.go
+++ b/pkg/mcs/resourcemanager/server/server.go
@@ -128,9 +128,11 @@ func (s *Server) primaryElectionLoop() {
 	defer s.serverLoopWg.Done()
 
 	for {
-		if s.IsClosed() {
+		select {
+		case <-s.serverLoopCtx.Done():
 			log.Info("server is closed, exit resource manager primary election loop")
 			return
+		default:
 		}
 
 		primary, checkAgain := s.participant.CheckLeader()
diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go
index 1d56817068a..f58fba2ed0b 100644
--- a/pkg/mcs/scheduling/server/cluster.go
+++ b/pkg/mcs/scheduling/server/cluster.go
@@ -13,7 +13,6 @@ import (
 	"github.com/tikv/pd/pkg/statistics/buckets"
 	"github.com/tikv/pd/pkg/statistics/utils"
 	"github.com/tikv/pd/pkg/storage"
-	"github.com/tikv/pd/pkg/storage/endpoint"
 )
 
 // Cluster is used to manage all information for scheduling purpose.
@@ -23,12 +22,13 @@ type Cluster struct {
 	labelerManager *labeler.RegionLabeler
 	persistConfig  *config.PersistConfig
 	hotStat        *statistics.HotStat
+	storage        storage.Storage
 }
 
 const regionLabelGCInterval = time.Hour
 
 // NewCluster creates a new cluster.
-func NewCluster(ctx context.Context, storage endpoint.RuleStorage, cfg *config.Config) (*Cluster, error) {
+func NewCluster(ctx context.Context, storage storage.Storage, cfg *config.Config) (*Cluster, error) {
 	basicCluster := core.NewBasicCluster()
 	persistConfig := config.NewPersistConfig(cfg)
 	labelerManager, err := labeler.NewRegionLabeler(ctx, storage, regionLabelGCInterval)
@@ -42,6 +42,7 @@ func NewCluster(ctx context.Context, storage endpoint.RuleStorage, cfg *config.C
 		labelerManager: labelerManager,
 		persistConfig:  persistConfig,
 		hotStat:        statistics.NewHotStat(ctx),
+		storage:        storage,
 	}, nil
 }
 
@@ -102,22 +103,24 @@ func (c *Cluster) BucketsStats(degree int, regionIDs ...uint64) map[uint64][]*bu
 	return c.hotStat.BucketsStats(degree, regionIDs...)
 }
 
-// TODO: implement the following methods
-
 // GetStorage returns the storage.
-func (c *Cluster) GetStorage() storage.Storage { return nil }
+func (c *Cluster) GetStorage() storage.Storage {
+	return c.storage
+}
 
-// UpdateRegionsLabelLevelStats updates the region label level stats.
-func (c *Cluster) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) {}
+// GetCheckerConfig returns the checker config.
+func (c *Cluster) GetCheckerConfig() sc.CheckerConfigProvider { return c.persistConfig }
+
+// GetSchedulerConfig returns the scheduler config.
+func (c *Cluster) GetSchedulerConfig() sc.SchedulerConfigProvider { return c.persistConfig }
 
 // GetStoreConfig returns the store config.
-func (c *Cluster) GetStoreConfig() sc.StoreConfigProvider { return nil }
+func (c *Cluster) GetStoreConfig() sc.StoreConfigProvider { return c.persistConfig }
 
-// AllocID allocates a new ID.
-func (c *Cluster) AllocID() (uint64, error) { return 0, nil }
+// TODO: implement the following methods
 
-// GetCheckerConfig returns the checker config.
-func (c *Cluster) GetCheckerConfig() sc.CheckerConfigProvider { return nil }
+// UpdateRegionsLabelLevelStats updates the region label level stats.
+func (c *Cluster) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) {}
 
-// GetSchedulerConfig returns the scheduler config.
-func (c *Cluster) GetSchedulerConfig() sc.SchedulerConfigProvider { return nil }
+// AllocID allocates a new ID.
+func (c *Cluster) AllocID() (uint64, error) { return 0, nil }
diff --git a/pkg/mcs/scheduling/server/config/config.go b/pkg/mcs/scheduling/server/config/config.go
index 7fa16492178..7839ec7f274 100644
--- a/pkg/mcs/scheduling/server/config/config.go
+++ b/pkg/mcs/scheduling/server/config/config.go
@@ -564,6 +564,31 @@ func (o *PersistConfig) SetHaltScheduling(halt bool, source string) {
 	o.SetScheduleConfig(v)
 }
 
+// CheckRegionKeys return error if the smallest region's keys is less than mergeKeys
+func (o *PersistConfig) CheckRegionKeys(keys, mergeKeys uint64) error {
+	return o.GetStoreConfig().CheckRegionKeys(keys, mergeKeys)
+}
+
+// CheckRegionSize return error if the smallest region's size is less than mergeSize
+func (o *PersistConfig) CheckRegionSize(size, mergeSize uint64) error {
+	return o.GetStoreConfig().CheckRegionSize(size, mergeSize)
+}
+
+// GetRegionMaxSize returns the max region size in MB
+func (o *PersistConfig) GetRegionMaxSize() uint64 {
+	return o.GetStoreConfig().GetRegionMaxSize()
+}
+
+// GetRegionMaxKeys returns the region split keys
+func (o *PersistConfig) GetRegionMaxKeys() uint64 {
+	return o.GetStoreConfig().GetRegionMaxKeys()
+}
+
+// IsEnableRegionBucket return true if the region bucket is enabled.
+func (o *PersistConfig) IsEnableRegionBucket() bool {
+	return o.GetStoreConfig().IsEnableRegionBucket()
+}
+
 // TODO: implement the following methods
 
 // AddSchedulerCfg adds the scheduler configurations.
@@ -587,6 +612,3 @@ func (o *PersistConfig) Persist(storage endpoint.ConfigStorage) error {
 
 // RemoveSchedulerCfg removes the scheduler configurations.
 func (o *PersistConfig) RemoveSchedulerCfg(tp string) {}
-
-// UseRaftV2 set some config for raft store v2 by default temporary.
-func (o *PersistConfig) UseRaftV2() {}
diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go
index c721b4238a1..845dbe38aa5 100644
--- a/pkg/mcs/scheduling/server/server.go
+++ b/pkg/mcs/scheduling/server/server.go
@@ -174,9 +174,11 @@ func (s *Server) primaryElectionLoop() {
 	defer s.serverLoopWg.Done()
 
 	for {
-		if s.IsClosed() {
+		select {
+		case <-s.serverLoopCtx.Done():
 			log.Info("server is closed, exit scheduling primary election loop")
 			return
+		default:
 		}
 
 		primary, checkAgain := s.participant.CheckLeader()
@@ -487,13 +489,18 @@ func (s *Server) startServer() (err error) {
 	s.participant.InitInfo(uniqueName, uniqueID, path.Join(schedulingPrimaryPrefix, fmt.Sprintf("%05d", 0)),
 		utils.PrimaryKey, "primary election", s.cfg.AdvertiseListenAddr)
 	s.storage = endpoint.NewStorageEndpoint(
-		kv.NewEtcdKVBase(s.etcdClient, endpoint.SchedulingSvcRootPath(s.clusterID)), nil)
+		kv.NewEtcdKVBase(s.etcdClient, endpoint.PDRootPath(s.clusterID)), nil)
 	s.cluster, err = NewCluster(s.ctx, s.storage, s.cfg)
 	if err != nil {
 		return err
 	}
 	s.hbStreams = hbstream.NewHeartbeatStreams(s.ctx, s.clusterID, s.cluster.GetBasicCluster())
 	s.coordinator = schedule.NewCoordinator(s.ctx, s.cluster, s.hbStreams)
+
+	s.listenURL, err = url.Parse(s.cfg.ListenAddr)
+	if err != nil {
+		return err
+	}
 	tlsConfig, err := s.cfg.Security.ToTLSConfig()
 	if err != nil {
 		return err
@@ -511,7 +518,7 @@
 	if err != nil {
 		return err
 	}
-
+	go s.coordinator.RunUntilStop()
 	serverReadyChan := make(chan struct{})
 	defer close(serverReadyChan)
 	s.serverLoopWg.Add(1)
diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go
index a0417a863d3..ce392d26a39 100644
--- a/pkg/mock/mockcluster/mockcluster.go
+++ b/pkg/mock/mockcluster/mockcluster.go
@@ -84,22 +84,32 @@ func NewCluster(ctx context.Context, opts *config.PersistOptions) *Cluster {
 
 // GetStoreConfig returns the store config.
 func (mc *Cluster) GetStoreConfig() sc.StoreConfigProvider {
-	return mc
+	return mc.PersistOptions.GetStoreConfig()
+}
+
+// SetRegionBucketEnabled sets the region bucket enabled.
+func (mc *Cluster) SetRegionBucketEnabled(enabled bool) {
+	cfg, ok := mc.GetStoreConfig().(*sc.StoreConfig)
+	if !ok || cfg == nil {
+		return
+	}
+	cfg.Coprocessor.EnableRegionBucket = enabled
+	mc.SetStoreConfig(cfg)
 }
 
 // GetCheckerConfig returns the checker config.
 func (mc *Cluster) GetCheckerConfig() sc.CheckerConfigProvider {
-	return mc
+	return mc.PersistOptions
 }
 
 // GetSchedulerConfig returns the scheduler config.
 func (mc *Cluster) GetSchedulerConfig() sc.SchedulerConfigProvider {
-	return mc
+	return mc.PersistOptions
 }
 
 // GetSharedConfig returns the shared config.
 func (mc *Cluster) GetSharedConfig() sc.SharedConfigProvider {
-	return mc
+	return mc.PersistOptions
 }
 
 // GetStorage returns the storage.
diff --git a/pkg/schedule/config/config_provider.go b/pkg/schedule/config/config_provider.go
index b9bb2757663..920467a07e2 100644
--- a/pkg/schedule/config/config_provider.go
+++ b/pkg/schedule/config/config_provider.go
@@ -131,8 +131,6 @@ type ConfProvider interface {
 	SetSplitMergeInterval(time.Duration)
 	SetMaxReplicas(int)
 	SetAllStoresLimit(storelimit.Type, float64)
-	// only for store configuration
-	UseRaftV2()
 }
 
 // StoreConfigProvider is the interface that wraps the StoreConfigProvider related methods.
@@ -142,6 +140,4 @@ type StoreConfigProvider interface {
 	CheckRegionSize(uint64, uint64) error
 	CheckRegionKeys(uint64, uint64) error
 	IsEnableRegionBucket() bool
-	// for test purpose
-	SetRegionBucketEnabled(bool)
 }
diff --git a/pkg/schedule/schedulers/hot_region_test.go b/pkg/schedule/schedulers/hot_region_test.go
index 3446daba8bc..ee569f4b70e 100644
--- a/pkg/schedule/schedulers/hot_region_test.go
+++ b/pkg/schedule/schedulers/hot_region_test.go
@@ -247,7 +247,7 @@ func TestSplitIfRegionTooHot(t *testing.T) {
 	addRegionInfo(tc, utils.Read, []testRegionInfo{
 		{1, []uint64{1, 2, 3}, 4 * units.MiB, 0, 0},
 	})
-	tc.GetStoreConfig().SetRegionBucketEnabled(true)
+	tc.SetRegionBucketEnabled(true)
 	ops, _ := hb.Schedule(tc, false)
 	re.Len(ops, 1)
 	expectOp, _ := operator.CreateSplitRegionOperator(splitHotReadBuckets, tc.GetRegion(1), operator.OpSplit,
@@ -283,7 +283,7 @@ func TestSplitBucketsBySize(t *testing.T) {
 	statistics.Denoising = false
 	cancel, _, tc, oc := prepareSchedulersTest()
 	tc.SetHotRegionCacheHitsThreshold(1)
-	tc.GetStoreConfig().SetRegionBucketEnabled(true)
+	tc.SetRegionBucketEnabled(true)
 	defer cancel()
 	hb, err := CreateScheduler(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
 	re.NoError(err)
@@ -336,7 +336,7 @@ func TestSplitBucketsByLoad(t *testing.T) {
 	statistics.Denoising = false
 	cancel, _, tc, oc := prepareSchedulersTest()
 	tc.SetHotRegionCacheHitsThreshold(1)
-	tc.GetStoreConfig().SetRegionBucketEnabled(true)
+	tc.SetRegionBucketEnabled(true)
 	defer cancel()
 	hb, err := CreateScheduler(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
 	re.NoError(err)
diff --git a/pkg/utils/etcdutil/etcdutil.go b/pkg/utils/etcdutil/etcdutil.go
index d9b5a72d411..b59a9581996 100644
--- a/pkg/utils/etcdutil/etcdutil.go
+++ b/pkg/utils/etcdutil/etcdutil.go
@@ -270,7 +270,7 @@ func CreateEtcdClient(tlsConfig *tls.Config, acURLs []url.URL) (*clientv3.Client
 		for {
 			select {
 			case <-client.Ctx().Done():
-				log.Info("[etcd client] etcd client is closed, exit health check goroutine")
+				log.Info("etcd client is closed, exit health check goroutine")
 				checker.Range(func(key, value interface{}) bool {
 					client := value.(*healthyClient)
 					client.Close()
@@ -287,7 +287,7 @@ func CreateEtcdClient(tlsConfig *tls.Config, acURLs []url.URL) (*clientv3.Client
 					// otherwise, the subconn will be retrying in grpc layer and use exponential backoff,
 					// and it cannot recover as soon as possible.
 					if time.Since(lastAvailable) > etcdServerDisconnectedTimeout {
-						log.Info("[etcd client] no available endpoint, try to reset endpoints", zap.Strings("last-endpoints", usedEps))
+						log.Info("no available endpoint, try to reset endpoints", zap.Strings("last-endpoints", usedEps))
 						client.SetEndpoints([]string{}...)
 						client.SetEndpoints(usedEps...)
 					}
@@ -296,7 +296,7 @@ func CreateEtcdClient(tlsConfig *tls.Config, acURLs []url.URL) (*clientv3.Client
 					client.SetEndpoints(healthyEps...)
 					change := fmt.Sprintf("%d->%d", len(usedEps), len(healthyEps))
 					etcdStateGauge.WithLabelValues("endpoints").Set(float64(len(healthyEps)))
-					log.Info("[etcd client] update endpoints", zap.String("num-change", change),
+					log.Info("update endpoints", zap.String("num-change", change),
 						zap.Strings("last-endpoints", usedEps), zap.Strings("endpoints", client.Endpoints()))
 				}
 				lastAvailable = time.Now()
@@ -313,7 +313,7 @@ func CreateEtcdClient(tlsConfig *tls.Config, acURLs []url.URL) (*clientv3.Client
 		for {
 			select {
 			case <-client.Ctx().Done():
-				log.Info("[etcd client] etcd client is closed, exit update endpoint goroutine")
+				log.Info("etcd client is closed, exit update endpoint goroutine")
 				return
 			case <-ticker.C:
 				eps := syncUrls(client)
@@ -377,7 +377,7 @@ func (checker *healthyChecker) update(eps []string) {
 		if client, ok := checker.Load(ep); ok {
 			lastHealthy := client.(*healthyClient).lastHealth
 			if time.Since(lastHealthy) > etcdServerOfflineTimeout {
-				log.Info("[etcd client] some etcd server maybe offline", zap.String("endpoint", ep))
+				log.Info("some etcd server maybe offline", zap.String("endpoint", ep))
 				checker.Delete(ep)
 			}
 			if time.Since(lastHealthy) > etcdServerDisconnectedTimeout {
@@ -394,7 +394,7 @@ func (checker *healthyChecker) update(eps []string) {
 func (checker *healthyChecker) addClient(ep string, lastHealth time.Time) {
 	client, err := newClient(checker.tlsConfig, ep)
 	if err != nil {
-		log.Error("[etcd client] failed to create etcd healthy client", zap.Error(err))
+		log.Error("failed to create etcd healthy client", zap.Error(err))
 		return
 	}
 	checker.Store(ep, &healthyClient{
@@ -409,7 +409,7 @@ func syncUrls(client *clientv3.Client) []string {
 	defer cancel()
 	mresp, err := client.MemberList(ctx)
 	if err != nil {
-		log.Error("[etcd client] failed to list members", errs.ZapError(err))
+		log.Error("failed to list members", errs.ZapError(err))
 		return []string{}
 	}
 	var eps []string
diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go
index 59b0cb7b428..06de6f9a56e 100644
--- a/server/cluster/cluster.go
+++ b/server/cluster/cluster.go
@@ -426,7 +426,6 @@ func (c *RaftCluster) runStoreConfigSync() {
 	for {
 		synced, switchRaftV2Config = c.syncStoreConfig(stores)
 		if switchRaftV2Config {
-			c.GetOpts().UseRaftV2()
 			if err := c.opt.Persist(c.GetStorage()); err != nil {
 				log.Warn("store config persisted failed", zap.Error(err))
 			}
diff --git a/server/config/persist_options.go b/server/config/persist_options.go
index 3b248843e73..1ea0b79424f 100644
--- a/server/config/persist_options.go
+++ b/server/config/persist_options.go
@@ -216,9 +216,6 @@ func (o *PersistOptions) SetMaxReplicas(replicas int) {
 	o.SetReplicationConfig(v)
 }
 
-// UseRaftV2 set some config for raft store v2 by default temporary.
-func (o *PersistOptions) UseRaftV2() {}
-
 const (
 	maxSnapshotCountKey    = "schedule.max-snapshot-count"
 	maxMergeRegionSizeKey  = "schedule.max-merge-region-size"
diff --git a/tests/integrations/client/client_test.go b/tests/integrations/client/client_test.go
index 8ada9f9d519..41e7e650261 100644
--- a/tests/integrations/client/client_test.go
+++ b/tests/integrations/client/client_test.go
@@ -804,7 +804,7 @@ func (suite *clientTestSuite) SetupSuite() {
 		}))
 		suite.grpcSvr.GetRaftCluster().GetBasicCluster().PutStore(newStore)
 	}
-	cluster.GetStoreConfig().SetRegionBucketEnabled(true)
+	cluster.GetOpts().(*config.PersistOptions).SetRegionBucketEnabled(true)
 }
 
 func (suite *clientTestSuite) TearDownSuite() {
@@ -893,7 +893,7 @@ func (suite *clientTestSuite) TestGetRegion() {
 		}
 		return r.Buckets != nil
 	})
-	suite.srv.GetRaftCluster().GetStoreConfig().SetRegionBucketEnabled(false)
+	suite.srv.GetRaftCluster().GetOpts().(*config.PersistOptions).SetRegionBucketEnabled(false)
 
 	testutil.Eventually(re, func() bool {
 		r, err := suite.client.GetRegion(context.Background(), []byte("a"), pd.WithBuckets())
@@ -903,7 +903,7 @@ func (suite *clientTestSuite) TestGetRegion() {
 		}
 		return r.Buckets == nil
 	})
-	suite.srv.GetRaftCluster().GetStoreConfig().SetRegionBucketEnabled(true)
+	suite.srv.GetRaftCluster().GetOpts().(*config.PersistOptions).SetRegionBucketEnabled(true)
 
 	suite.NoError(failpoint.Enable("github.com/tikv/pd/server/grpcClientClosed", `return(true)`))
 	suite.NoError(failpoint.Enable("github.com/tikv/pd/server/useForwardRequest", `return(true)`))
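
Both primaryElectionLoop hunks above replace the IsClosed() check with a non-blocking read of the server loop context's Done() channel, so the loop exits promptly on shutdown while still reaching its periodic leader check. Below is a minimal, self-contained sketch of that pattern; the loop body and all names here are illustrative and are not taken from the PD code.

package main

import (
	"context"
	"fmt"
	"time"
)

// electionLoop keeps checking leadership until ctx is canceled.
// The select with a default branch makes the shutdown check
// non-blocking, so each iteration still performs its periodic work.
func electionLoop(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			fmt.Println("server is closed, exit election loop")
			return
		default:
		}
		// Placeholder for the real work, e.g. participant.CheckLeader().
		time.Sleep(100 * time.Millisecond)
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan struct{})
	go func() {
		electionLoop(ctx)
		close(done)
	}()
	time.Sleep(300 * time.Millisecond)
	cancel() // simulates server shutdown; the loop observes ctx.Done() and returns
	<-done
}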