*: make coordinator run #6896

Merged (9 commits, Aug 10, 2023)
4 changes: 3 additions & 1 deletion pkg/mcs/resourcemanager/server/server.go
@@ -128,9 +128,11 @@ func (s *Server) primaryElectionLoop() {
defer s.serverLoopWg.Done()

for {
if s.IsClosed() {
select {
case <-s.serverLoopCtx.Done():
log.Info("server is closed, exit resource manager primary election loop")
return
default:
}

primary, checkAgain := s.participant.CheckLeader()
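Note on the hunk above: the loop now exits via a non-blocking select on `serverLoopCtx` instead of an `s.IsClosed()` check, so cancellation of the loop context is observed directly. A minimal standalone sketch of that pattern (illustrative only, not PD code; the names are made up):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// electionLoop checks for shutdown with a non-blocking select before each
// iteration, the same shape as the primaryElectionLoop hunk above.
func electionLoop(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			fmt.Println("server is closed, exit election loop")
			return
		default:
		}
		// ... check leadership and campaign here ...
		time.Sleep(100 * time.Millisecond)
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	go electionLoop(ctx)
	time.Sleep(300 * time.Millisecond)
	cancel()                           // simulate server shutdown
	time.Sleep(200 * time.Millisecond) // give the loop time to observe it
}
```

The empty `default` case keeps the select non-blocking, so the loop still proceeds to its election work while the context is alive.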
31 changes: 17 additions & 14 deletions pkg/mcs/scheduling/server/cluster.go
@@ -13,7 +13,6 @@ import (
"github.com/tikv/pd/pkg/statistics/buckets"
"github.com/tikv/pd/pkg/statistics/utils"
"github.com/tikv/pd/pkg/storage"
"github.com/tikv/pd/pkg/storage/endpoint"
)

// Cluster is used to manage all information for scheduling purpose.
@@ -23,12 +22,13 @@ type Cluster struct {
labelerManager *labeler.RegionLabeler
persistConfig *config.PersistConfig
hotStat *statistics.HotStat
storage storage.Storage
}

const regionLabelGCInterval = time.Hour

// NewCluster creates a new cluster.
func NewCluster(ctx context.Context, storage endpoint.RuleStorage, cfg *config.Config) (*Cluster, error) {
func NewCluster(ctx context.Context, storage storage.Storage, cfg *config.Config) (*Cluster, error) {
basicCluster := core.NewBasicCluster()
persistConfig := config.NewPersistConfig(cfg)
labelerManager, err := labeler.NewRegionLabeler(ctx, storage, regionLabelGCInterval)
@@ -42,6 +42,7 @@ func NewCluster(ctx context.Context, storage endpoint.RuleStorage, cfg *config.C
labelerManager: labelerManager,
persistConfig: persistConfig,
hotStat: statistics.NewHotStat(ctx),
storage: storage,
}, nil
}

@@ -102,22 +103,24 @@ func (c *Cluster) BucketsStats(degree int, regionIDs ...uint64) map[uint64][]*bu
return c.hotStat.BucketsStats(degree, regionIDs...)
}

// TODO: implement the following methods

// GetStorage returns the storage.
func (c *Cluster) GetStorage() storage.Storage { return nil }
func (c *Cluster) GetStorage() storage.Storage {
return c.storage
}

// UpdateRegionsLabelLevelStats updates the region label level stats.
func (c *Cluster) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) {}
// GetCheckerConfig returns the checker config.
func (c *Cluster) GetCheckerConfig() sc.CheckerConfigProvider { return c.persistConfig }

// GetSchedulerConfig returns the scheduler config.
func (c *Cluster) GetSchedulerConfig() sc.SchedulerConfigProvider { return c.persistConfig }

// GetStoreConfig returns the store config.
func (c *Cluster) GetStoreConfig() sc.StoreConfigProvider { return nil }
func (c *Cluster) GetStoreConfig() sc.StoreConfigProvider { return c.persistConfig }

// AllocID allocates a new ID.
func (c *Cluster) AllocID() (uint64, error) { return 0, nil }
// TODO: implement the following methods

// GetCheckerConfig returns the checker config.
func (c *Cluster) GetCheckerConfig() sc.CheckerConfigProvider { return nil }
// UpdateRegionsLabelLevelStats updates the region label level stats.
func (c *Cluster) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) {}

// GetSchedulerConfig returns the scheduler config.
func (c *Cluster) GetSchedulerConfig() sc.SchedulerConfigProvider { return nil }
// AllocID allocates a new ID.
func (c *Cluster) AllocID() (uint64, error) { return 0, nil }
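With the coordinator about to run inside this server, the `Cluster` getters above must hand back real objects (`c.storage`, `c.persistConfig`) rather than the previous `nil` placeholders. A rough sketch of why that matters, using stand-in types rather than the actual `sc` interfaces:

```go
package main

import "fmt"

// CheckerConfigProvider is a stand-in for the sc.CheckerConfigProvider interface.
type CheckerConfigProvider interface {
	GetMaxSnapshotCount() uint64
}

type persistConfig struct{ maxSnapshotCount uint64 }

func (c *persistConfig) GetMaxSnapshotCount() uint64 { return c.maxSnapshotCount }

type cluster struct{ cfg *persistConfig }

// GetCheckerConfig delegates to the persisted config instead of returning nil.
func (c *cluster) GetCheckerConfig() CheckerConfigProvider { return c.cfg }

func runChecker(p CheckerConfigProvider) {
	// A nil provider here would panic as soon as the coordinator starts its checkers.
	fmt.Println("max snapshot count:", p.GetMaxSnapshotCount())
}

func main() {
	c := &cluster{cfg: &persistConfig{maxSnapshotCount: 3}}
	runChecker(c.GetCheckerConfig())
}
```

Delegating to `persistConfig` (and to the injected storage) is what keeps these calls safe once the coordinator starts exercising them.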
28 changes: 25 additions & 3 deletions pkg/mcs/scheduling/server/config/config.go
@@ -564,6 +564,31 @@
o.SetScheduleConfig(v)
}

// CheckRegionKeys returns an error if the smallest region's keys is less than mergeKeys
func (o *PersistConfig) CheckRegionKeys(keys, mergeKeys uint64) error {
return o.GetStoreConfig().CheckRegionKeys(keys, mergeKeys)
}

// CheckRegionSize returns an error if the smallest region's size is less than mergeSize
func (o *PersistConfig) CheckRegionSize(size, mergeSize uint64) error {
return o.GetStoreConfig().CheckRegionSize(size, mergeSize)
}

// GetRegionMaxSize returns the max region size in MB
func (o *PersistConfig) GetRegionMaxSize() uint64 {
return o.GetStoreConfig().GetRegionMaxSize()
}

// GetRegionMaxKeys returns the max region keys
func (o *PersistConfig) GetRegionMaxKeys() uint64 {
return o.GetStoreConfig().GetRegionMaxKeys()
}

// IsEnableRegionBucket returns true if the region bucket is enabled.
func (o *PersistConfig) IsEnableRegionBucket() bool {
return o.GetStoreConfig().IsEnableRegionBucket()
}

// TODO: implement the following methods

// AddSchedulerCfg adds the scheduler configurations.
@@ -587,6 +612,3 @@

// RemoveSchedulerCfg removes the scheduler configurations.
func (o *PersistConfig) RemoveSchedulerCfg(tp string) {}

// UseRaftV2 set some config for raft store v2 by default temporary.
func (o *PersistConfig) UseRaftV2() {}
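The forwarded methods above expose the store-level split/merge thresholds through `PersistConfig`. A simplified, self-contained reading of the two Check helpers based only on their doc comments (PD's real checks are more involved; this is not the actual implementation):

```go
package main

import (
	"errors"
	"fmt"
)

// storeConfig is a stand-in; PD's StoreConfig check logic is more involved.
type storeConfig struct{}

// CheckRegionSize returns an error when the region size is below the merge threshold.
func (c *storeConfig) CheckRegionSize(size, mergeSize uint64) error {
	if size < mergeSize {
		return errors.New("region size is smaller than the merge threshold")
	}
	return nil
}

// CheckRegionKeys returns an error when the region key count is below the merge threshold.
func (c *storeConfig) CheckRegionKeys(keys, mergeKeys uint64) error {
	if keys < mergeKeys {
		return errors.New("region key count is smaller than the merge threshold")
	}
	return nil
}

func main() {
	cfg := &storeConfig{}
	if err := cfg.CheckRegionSize(10, 20); err != nil {
		fmt.Println("skip:", err)
	}
	if err := cfg.CheckRegionKeys(300000, 200000); err == nil {
		fmt.Println("keys check passed")
	}
}
```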
15 changes: 11 additions & 4 deletions pkg/mcs/scheduling/server/server.go
@@ -174,9 +174,11 @@ func (s *Server) primaryElectionLoop() {
defer s.serverLoopWg.Done()

for {
if s.IsClosed() {
log.Info("server is closed, exit scheduling primary election loop")
select {
case <-s.serverLoopCtx.Done():
log.Info("server is closed, exit resource manager primary election loop")
return
default:
}

primary, checkAgain := s.participant.CheckLeader()
@@ -487,13 +489,18 @@ func (s *Server) startServer() (err error) {
s.participant.InitInfo(uniqueName, uniqueID, path.Join(schedulingPrimaryPrefix, fmt.Sprintf("%05d", 0)),
utils.PrimaryKey, "primary election", s.cfg.AdvertiseListenAddr)
s.storage = endpoint.NewStorageEndpoint(
kv.NewEtcdKVBase(s.etcdClient, endpoint.SchedulingSvcRootPath(s.clusterID)), nil)
kv.NewEtcdKVBase(s.etcdClient, endpoint.PDRootPath(s.clusterID)), nil)
s.cluster, err = NewCluster(s.ctx, s.storage, s.cfg)
if err != nil {
return err
}
s.hbStreams = hbstream.NewHeartbeatStreams(s.ctx, s.clusterID, s.cluster.GetBasicCluster())
s.coordinator = schedule.NewCoordinator(s.ctx, s.cluster, s.hbStreams)

s.listenURL, err = url.Parse(s.cfg.ListenAddr)
if err != nil {
return err
}
tlsConfig, err := s.cfg.Security.ToTLSConfig()
if err != nil {
return err
@@ -511,7 +518,7 @@
if err != nil {
return err
}

go s.coordinator.RunUntilStop()
serverReadyChan := make(chan struct{})
defer close(serverReadyChan)
s.serverLoopWg.Add(1)
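The startServer changes above are the heart of this PR: the storage endpoint now points at the PD root path, the cluster receives that storage, heartbeat streams and the coordinator are constructed, and `go s.coordinator.RunUntilStop()` starts scheduling in the background. A standalone sketch of that run-until-stop wiring (the coordinator type and its loop body here are stand-ins, not the real `schedule.Coordinator`):

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type coordinator struct{ ctx context.Context }

func newCoordinator(ctx context.Context) *coordinator { return &coordinator{ctx: ctx} }

// RunUntilStop drives the scheduling loops until the server context is canceled.
func (c *coordinator) RunUntilStop() {
	ticker := time.NewTicker(50 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-c.ctx.Done():
			fmt.Println("coordinator is stopping")
			return
		case <-ticker.C:
			// dispatch checkers and schedulers here
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	co := newCoordinator(ctx)

	var wg sync.WaitGroup
	wg.Add(1)
	go func() { // mirrors `go s.coordinator.RunUntilStop()` in startServer
		defer wg.Done()
		co.RunUntilStop()
	}()

	time.Sleep(200 * time.Millisecond)
	cancel() // server shutdown
	wg.Wait()
}
```

Shutdown works the same way as in the election loops: canceling the server context is what makes RunUntilStop return.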
18 changes: 14 additions & 4 deletions pkg/mock/mockcluster/mockcluster.go
@@ -84,22 +84,32 @@

// GetStoreConfig returns the store config.
func (mc *Cluster) GetStoreConfig() sc.StoreConfigProvider {
return mc
return mc.PersistOptions.GetStoreConfig()
}

// SetRegionBucketEnabled sets the region bucket enabled.
func (mc *Cluster) SetRegionBucketEnabled(enabled bool) {
cfg, ok := mc.GetStoreConfig().(*sc.StoreConfig)
if !ok || cfg == nil {
return
}
cfg.Coprocessor.EnableRegionBucket = enabled
mc.SetStoreConfig(cfg)
}

// GetCheckerConfig returns the checker config.
func (mc *Cluster) GetCheckerConfig() sc.CheckerConfigProvider {
return mc
return mc.PersistOptions
}

// GetSchedulerConfig returns the scheduler config.
func (mc *Cluster) GetSchedulerConfig() sc.SchedulerConfigProvider {
return mc
return mc.PersistOptions
}

// GetSharedConfig returns the shared config.
func (mc *Cluster) GetSharedConfig() sc.SharedConfigProvider {
return mc
return mc.PersistOptions
}

// GetStorage returns the storage.
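Since `SetRegionBucketEnabled` was dropped from `StoreConfigProvider` (see the config_provider.go hunk below), the mock now owns this setter and guards it with a comma-ok type assertion. A small self-contained illustration of that guard (types are stand-ins for the `sc` package):

```go
package main

import "fmt"

// StoreConfigProvider and StoreConfig are stand-ins for the sc package types.
type StoreConfigProvider interface{ IsEnableRegionBucket() bool }

type StoreConfig struct{ EnableRegionBucket bool }

func (c *StoreConfig) IsEnableRegionBucket() bool { return c.EnableRegionBucket }

func setRegionBucketEnabled(p StoreConfigProvider, enabled bool) {
	cfg, ok := p.(*StoreConfig)
	if !ok || cfg == nil {
		// Unknown provider: silently skip, mirroring the mock's no-op branch.
		return
	}
	cfg.EnableRegionBucket = enabled
}

func main() {
	cfg := &StoreConfig{}
	setRegionBucketEnabled(cfg, true)
	fmt.Println("region bucket enabled:", cfg.IsEnableRegionBucket())
}
```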
4 changes: 0 additions & 4 deletions pkg/schedule/config/config_provider.go
@@ -131,8 +131,6 @@ type ConfProvider interface {
SetSplitMergeInterval(time.Duration)
SetMaxReplicas(int)
SetAllStoresLimit(storelimit.Type, float64)
// only for store configuration
UseRaftV2()
}

// StoreConfigProvider is the interface that wraps the StoreConfigProvider related methods.
@@ -142,6 +140,4 @@ type StoreConfigProvider interface {
CheckRegionSize(uint64, uint64) error
CheckRegionKeys(uint64, uint64) error
IsEnableRegionBucket() bool
// for test purpose
SetRegionBucketEnabled(bool)
}
6 changes: 3 additions & 3 deletions pkg/schedule/schedulers/hot_region_test.go
@@ -247,7 +247,7 @@ func TestSplitIfRegionTooHot(t *testing.T) {
addRegionInfo(tc, utils.Read, []testRegionInfo{
{1, []uint64{1, 2, 3}, 4 * units.MiB, 0, 0},
})
tc.GetStoreConfig().SetRegionBucketEnabled(true)
tc.SetRegionBucketEnabled(true)
ops, _ := hb.Schedule(tc, false)
re.Len(ops, 1)
expectOp, _ := operator.CreateSplitRegionOperator(splitHotReadBuckets, tc.GetRegion(1), operator.OpSplit,
@@ -283,7 +283,7 @@ func TestSplitBucketsBySize(t *testing.T) {
statistics.Denoising = false
cancel, _, tc, oc := prepareSchedulersTest()
tc.SetHotRegionCacheHitsThreshold(1)
tc.GetStoreConfig().SetRegionBucketEnabled(true)
tc.SetRegionBucketEnabled(true)
defer cancel()
hb, err := CreateScheduler(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
re.NoError(err)
@@ -336,7 +336,7 @@ func TestSplitBucketsByLoad(t *testing.T) {
statistics.Denoising = false
cancel, _, tc, oc := prepareSchedulersTest()
tc.SetHotRegionCacheHitsThreshold(1)
tc.GetStoreConfig().SetRegionBucketEnabled(true)
tc.SetRegionBucketEnabled(true)
defer cancel()
hb, err := CreateScheduler(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
re.NoError(err)
14 changes: 7 additions & 7 deletions pkg/utils/etcdutil/etcdutil.go
@@ -270,7 +270,7 @@
for {
select {
case <-client.Ctx().Done():
log.Info("[etcd client] etcd client is closed, exit health check goroutine")
log.Info("etcd client is closed, exit health check goroutine")
checker.Range(func(key, value interface{}) bool {
client := value.(*healthyClient)
client.Close()
@@ -287,7 +287,7 @@
// otherwise, the subconn will be retrying in grpc layer and use exponential backoff,
// and it cannot recover as soon as possible.
if time.Since(lastAvailable) > etcdServerDisconnectedTimeout {
log.Info("[etcd client] no available endpoint, try to reset endpoints", zap.Strings("last-endpoints", usedEps))
log.Info("no available endpoint, try to reset endpoints", zap.Strings("last-endpoints", usedEps))

Check warning on line 290 in pkg/utils/etcdutil/etcdutil.go

View check run for this annotation

Codecov / codecov/patch

pkg/utils/etcdutil/etcdutil.go#L290

Added line #L290 was not covered by tests
client.SetEndpoints([]string{}...)
client.SetEndpoints(usedEps...)
}
@@ -296,7 +296,7 @@
client.SetEndpoints(healthyEps...)
change := fmt.Sprintf("%d->%d", len(usedEps), len(healthyEps))
etcdStateGauge.WithLabelValues("endpoints").Set(float64(len(healthyEps)))
log.Info("[etcd client] update endpoints", zap.String("num-change", change),
log.Info("update endpoints", zap.String("num-change", change),
zap.Strings("last-endpoints", usedEps), zap.Strings("endpoints", client.Endpoints()))
}
lastAvailable = time.Now()
@@ -313,7 +313,7 @@
for {
select {
case <-client.Ctx().Done():
log.Info("[etcd client] etcd client is closed, exit update endpoint goroutine")
log.Info("etcd client is closed, exit update endpoint goroutine")
return
case <-ticker.C:
eps := syncUrls(client)
@@ -377,7 +377,7 @@
if client, ok := checker.Load(ep); ok {
lastHealthy := client.(*healthyClient).lastHealth
if time.Since(lastHealthy) > etcdServerOfflineTimeout {
log.Info("[etcd client] some etcd server maybe offline", zap.String("endpoint", ep))
log.Info("some etcd server maybe offline", zap.String("endpoint", ep))

Check warning on line 380 in pkg/utils/etcdutil/etcdutil.go

View check run for this annotation

Codecov / codecov/patch

pkg/utils/etcdutil/etcdutil.go#L380

Added line #L380 was not covered by tests
checker.Delete(ep)
}
if time.Since(lastHealthy) > etcdServerDisconnectedTimeout {
@@ -394,7 +394,7 @@
func (checker *healthyChecker) addClient(ep string, lastHealth time.Time) {
client, err := newClient(checker.tlsConfig, ep)
if err != nil {
log.Error("[etcd client] failed to create etcd healthy client", zap.Error(err))
log.Error("failed to create etcd healthy client", zap.Error(err))

Check warning on line 397 in pkg/utils/etcdutil/etcdutil.go

View check run for this annotation

Codecov / codecov/patch

pkg/utils/etcdutil/etcdutil.go#L397

Added line #L397 was not covered by tests
return
}
checker.Store(ep, &healthyClient{
@@ -409,7 +409,7 @@
defer cancel()
mresp, err := client.MemberList(ctx)
if err != nil {
log.Error("[etcd client] failed to list members", errs.ZapError(err))
log.Error("failed to list members", errs.ZapError(err))
return []string{}
}
var eps []string
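The etcdutil hunk only strips the hand-written "[etcd client]" prefix from each message. If a per-component tag is still wanted, one generic way is to attach it once when the logger is built rather than in every call site; this is a plain zap sketch and an assumption about intent, not PD's actual logging setup:

```go
package main

import "go.uber.org/zap"

func main() {
	base, err := zap.NewDevelopment()
	if err != nil {
		panic(err)
	}
	defer base.Sync()

	// Name the logger once; every message then carries the component identity.
	etcdLogger := base.Named("etcd-client")
	etcdLogger.Info("update endpoints",
		zap.Strings("endpoints", []string{"http://127.0.0.1:2379"}))
}
```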
1 change: 0 additions & 1 deletion server/cluster/cluster.go
@@ -426,7 +426,6 @@ func (c *RaftCluster) runStoreConfigSync() {
for {
synced, switchRaftV2Config = c.syncStoreConfig(stores)
if switchRaftV2Config {
c.GetOpts().UseRaftV2()
if err := c.opt.Persist(c.GetStorage()); err != nil {
log.Warn("store config persisted failed", zap.Error(err))
}
3 changes: 0 additions & 3 deletions server/config/persist_options.go
@@ -216,9 +216,6 @@ func (o *PersistOptions) SetMaxReplicas(replicas int) {
o.SetReplicationConfig(v)
}

// UseRaftV2 set some config for raft store v2 by default temporary.
func (o *PersistOptions) UseRaftV2() {}

const (
maxSnapshotCountKey = "schedule.max-snapshot-count"
maxMergeRegionSizeKey = "schedule.max-merge-region-size"
6 changes: 3 additions & 3 deletions tests/integrations/client/client_test.go
@@ -804,7 +804,7 @@ func (suite *clientTestSuite) SetupSuite() {
}))
suite.grpcSvr.GetRaftCluster().GetBasicCluster().PutStore(newStore)
}
cluster.GetStoreConfig().SetRegionBucketEnabled(true)
cluster.GetOpts().(*config.PersistOptions).SetRegionBucketEnabled(true)
}

func (suite *clientTestSuite) TearDownSuite() {
@@ -893,7 +893,7 @@ func (suite *clientTestSuite) TestGetRegion() {
}
return r.Buckets != nil
})
suite.srv.GetRaftCluster().GetStoreConfig().SetRegionBucketEnabled(false)
suite.srv.GetRaftCluster().GetOpts().(*config.PersistOptions).SetRegionBucketEnabled(false)

testutil.Eventually(re, func() bool {
r, err := suite.client.GetRegion(context.Background(), []byte("a"), pd.WithBuckets())
@@ -903,7 +903,7 @@
}
return r.Buckets == nil
})
suite.srv.GetRaftCluster().GetStoreConfig().SetRegionBucketEnabled(true)
suite.srv.GetRaftCluster().GetOpts().(*config.PersistOptions).SetRegionBucketEnabled(true)

suite.NoError(failpoint.Enable("github.com/tikv/pd/server/grpcClientClosed", `return(true)`))
suite.NoError(failpoint.Enable("github.com/tikv/pd/server/useForwardRequest", `return(true)`))