schedule: fix progress cannot display when enabling scheduling service #8334

Merged: 5 commits, Jun 28, 2024
Changes from 4 commits
7 changes: 7 additions & 0 deletions pkg/core/region.go
@@ -44,6 +44,7 @@ import (
const (
randomRegionMaxRetry = 10
scanRegionLimit = 1000
+	CollectFactor = 0.9
)

// errRegionIsStale is error info for region is stale.
@@ -1583,6 +1584,12 @@ func (r *RegionsInfo) GetNotFromStorageRegionsCntByStore(storeID uint64) int {
return r.getNotFromStorageRegionsCntByStoreLocked(storeID)
}

+// IsStorePrepared checks if a store is prepared.
+// For each store, the number of active regions should be more than the store's total region count * CollectFactor.
+func (r *RegionsInfo) IsStorePrepared(storeID uint64) bool {
+	return float64(r.GetNotFromStorageRegionsCntByStore(storeID)) >= float64(r.GetStoreRegionCount(storeID))*CollectFactor
+}

// getNotFromStorageRegionsCntByStoreLocked gets the `NotFromStorageRegionsCnt` count of a store's leader, follower and learner by storeID.
func (r *RegionsInfo) getNotFromStorageRegionsCntByStoreLocked(storeID uint64) int {
return r.leaders[storeID].notFromStorageRegionsCount() + r.followers[storeID].notFromStorageRegionsCount() + r.learners[storeID].notFromStorageRegionsCount()
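For reference, the new predicate reduces to a single threshold comparison: a store counts as prepared once activeRegions >= totalRegions * 0.9. A minimal standalone sketch of that math (illustrative names such as storePrepared, not PD's actual API):

package main

import "fmt"

// collectFactor mirrors core.CollectFactor introduced above.
const collectFactor = 0.9

// storePrepared reports whether enough of a store's regions have been
// activated via heartbeats rather than merely loaded from storage.
func storePrepared(activeRegions, totalRegions int) bool {
	return float64(activeRegions) >= float64(totalRegions)*collectFactor
}

func main() {
	fmt.Println(storePrepared(899, 1000)) // false: 899 < 900
	fmt.Println(storePrepared(900, 1000)) // true: 900 >= 900
}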
1 change: 0 additions & 1 deletion pkg/schedule/coordinator.go
@@ -46,7 +46,6 @@ import (
const (
runSchedulerCheckInterval = 3 * time.Second
checkSuspectRangesInterval = 100 * time.Millisecond
-	collectFactor = 0.9
collectTimeout = 5 * time.Minute
maxLoadConfigRetries = 10
// pushOperatorTickInterval is the interval try to push the operator.
9 changes: 4 additions & 5 deletions pkg/schedule/prepare_checker.go
@@ -51,8 +51,8 @@ func (checker *prepareChecker) check(c *core.BasicCluster, collectWaitTime ...ti
}
notLoadedFromRegionsCnt := c.GetClusterNotFromStorageRegionsCnt()
totalRegionsCnt := c.GetTotalRegionCount()
-	// The number of active regions should be more than total region of all stores * collectFactor
-	if float64(totalRegionsCnt)*collectFactor > float64(notLoadedFromRegionsCnt) {
+	// The number of active regions should be more than the total region count of all stores * core.CollectFactor.
+	if float64(totalRegionsCnt)*core.CollectFactor > float64(notLoadedFromRegionsCnt) {
return false
}
for _, store := range c.GetStores() {
Contributor:
Could we use a function similar to isStorePrepared, to avoid maintaining the two functions?

@@ -61,11 +61,10 @@ func (checker *prepareChecker) check(c *core.BasicCluster, collectWaitTime ...ti
}
storeID := store.GetID()
// It is used to avoid sudden scheduling when scheduling service is just started.
-	if len(collectWaitTime) > 0 && (float64(store.GetStoreStats().GetRegionCount())*collectFactor > float64(c.GetNotFromStorageRegionsCntByStore(storeID))) {
+	if len(collectWaitTime) > 0 && (float64(store.GetStoreStats().GetRegionCount())*core.CollectFactor > float64(c.GetNotFromStorageRegionsCntByStore(storeID))) {
return false
}
-	// For each store, the number of active regions should be more than total region of the store * collectFactor
-	if float64(c.GetStoreRegionCount(storeID))*collectFactor > float64(c.GetNotFromStorageRegionsCntByStore(storeID)) {
+	if !c.IsStorePrepared(storeID) {
return false
}
}
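After this change, prepareChecker.check has a simple two-level shape: first gate on the cluster-wide region count, then require every up store to pass core.IsStorePrepared. A condensed sketch of that flow, reusing storePrepared from the sketch above (illustrative, not the real checker):

// clusterPrepared mirrors the post-change structure of prepareChecker.check:
// the cluster-wide total is gated first, then every store must individually
// clear the same collectFactor threshold.
func clusterPrepared(activeTotal, regionTotal int, activeByStore, totalByStore map[uint64]int) bool {
	if float64(regionTotal)*collectFactor > float64(activeTotal) {
		return false // the cluster as a whole has not reported enough active regions
	}
	for id, total := range totalByStore {
		if !storePrepared(activeByStore[id], total) {
			return false // at least one store is still loading
		}
	}
	return true
}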
15 changes: 14 additions & 1 deletion server/cluster/cluster.go
@@ -1590,6 +1590,19 @@ func (c *RaftCluster) setStore(store *core.StoreInfo) error {
return nil
}

+func (c *RaftCluster) isStorePrepared() bool {
+	for _, store := range c.GetStores() {
+		if !store.IsPreparing() && !store.IsServing() {
+			continue
+		}
+		storeID := store.GetID()
+		if !c.IsStorePrepared(storeID) {
+			return false
+		}
+	}
+	return true
+}

func (c *RaftCluster) checkStores() {
var offlineStores []*metapb.Store
var upStoreCount int
@@ -1621,7 +1634,7 @@
zap.Int("region-count", c.GetTotalRegionCount()),
errs.ZapError(err))
}
-	} else if c.IsPrepared() {
+	} else if c.IsPrepared() || (c.IsServiceIndependent(mcsutils.SchedulingServiceName) && c.isStorePrepared()) {
threshold := c.getThreshold(stores, store)
regionSize := float64(store.GetRegionSize())
log.Debug("store serving threshold", zap.Uint64("store-id", storeID), zap.Float64("threshold", threshold), zap.Float64("region-size", regionSize))
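This condition is the core of the fix: with the scheduling service running as an independent microservice, c.IsPrepared() alone presumably does not become true on the PD side, so checkStores never reached the threshold and progress calculation and the preparing progress could not be displayed (this reading is based on the PR title and the diff). The new fallback instead asks each preparing or serving store directly. A small worked example built on the sketch helpers above (illustrative data):

// fallbackPrepared walks per-store counts the way isStorePrepared does.
func fallbackPrepared(active, total map[uint64]int) bool {
	for id := range total {
		if !storePrepared(active[id], total[id]) {
			return false
		}
	}
	return true
}

// With stores 1 and 2 fully reported but store 3 at 500/1000, the fallback
// returns false (500 < 900):
// fallbackPrepared(map[uint64]int{1: 950, 2: 990, 3: 500},
//                  map[uint64]int{1: 1000, 2: 1000, 3: 1000})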
69 changes: 69 additions & 0 deletions tests/integrations/mcs/scheduling/server_test.go
@@ -675,3 +675,72 @@ func (suite *multipleServerTestSuite) TestReElectLeader() {
rc = suite.pdLeader.GetServer().GetRaftCluster()
rc.IsPrepared()
}

func (suite *serverTestSuite) TestOnlineProgress() {
Member:
nit: please move TestOnlineProgress into the serverTestSuite scope instead of placing it behind multipleServerTestSuite :)

re := suite.Require()
tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints)
re.NoError(err)
defer tc.Destroy()
tc.WaitForPrimaryServing(re)

rc := suite.pdLeader.GetServer().GetRaftCluster()
re.NotNil(rc)
s := &server.GrpcServer{Server: suite.pdLeader.GetServer()}
for i := uint64(1); i <= 3; i++ {
resp, err := s.PutStore(
context.Background(), &pdpb.PutStoreRequest{
Header: &pdpb.RequestHeader{ClusterId: suite.pdLeader.GetClusterID()},
Store: &metapb.Store{
Id: i,
Address: fmt.Sprintf("mock://%d", i),
State: metapb.StoreState_Up,
Version: "7.0.0",
},
},
)
re.NoError(err)
re.Empty(resp.GetHeader().GetError())
}
regionLen := 1000
regions := tests.InitRegions(regionLen)
for _, region := range regions {
err = rc.HandleRegionHeartbeat(region)
re.NoError(err)
}
time.Sleep(2 * time.Second)

// add a new store
resp, err := s.PutStore(
context.Background(), &pdpb.PutStoreRequest{
Header: &pdpb.RequestHeader{ClusterId: suite.pdLeader.GetClusterID()},
Store: &metapb.Store{
Id: 4,
Address: fmt.Sprintf("mock://%d", 4),
State: metapb.StoreState_Up,
Version: "7.0.0",
},
},
)
re.NoError(err)
re.Empty(resp.GetHeader().GetError())

time.Sleep(2 * time.Second)
for i, r := range regions {
if i < 50 {
r.GetMeta().Peers[2].StoreId = 4
r.GetMeta().RegionEpoch.ConfVer = 2
r.GetMeta().RegionEpoch.Version = 2
err = rc.HandleRegionHeartbeat(r)
re.NoError(err)
}
}
time.Sleep(2 * time.Second)
action, progress, ls, cs, err := rc.GetProgressByID("4")
re.Equal("preparing", action)
re.NotEmpty(progress)
re.NotEmpty(cs)
re.NotEmpty(ls)
re.NoError(err)
suite.TearDownSuite()
suite.SetupSuite()
}
5 changes: 5 additions & 0 deletions tests/testutil.go
@@ -438,6 +438,11 @@ func InitRegions(regionLen int) []*core.RegionInfo {
{Id: allocator.alloc(), StoreId: uint64(3)},
},
}
+		if i == 0 {
+			r.StartKey = []byte{}
+		} else if i == regionLen-1 {
+			r.EndKey = []byte{}
+		}
region := core.NewRegionInfo(r, r.Peers[0], core.SetSource(core.Heartbeat))
// This is used to simulate the upgrade process.
if i < regionLen/2 {
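The boundary assignment above makes InitRegions produce regions that tile the entire key space: the first region's StartKey and the last region's EndKey become the empty, unbounded key. A small sketch of the invariant this establishes (illustrative, not the actual helper; add "bytes" to the sketch file's imports):

// coversKeySpace checks that sorted [start, end) ranges tile the full key
// space: unbounded first start, unbounded last end, adjacent ranges meeting.
func coversKeySpace(ranges [][2][]byte) bool {
	n := len(ranges)
	if n == 0 || len(ranges[0][0]) != 0 || len(ranges[n-1][1]) != 0 {
		return false
	}
	for i := 1; i < n; i++ {
		if !bytes.Equal(ranges[i-1][1], ranges[i][0]) {
			return false
		}
	}
	return true
}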