
Commit

Merge branch 'release-5.0' into cherry-pick-5326-to-release-5.0
ti-chi-bot authored Sep 20, 2022
2 parents 8eeba93 + 9f14d0a commit 2c4856d
Showing 10 changed files with 146 additions and 40 deletions.
42 changes: 27 additions & 15 deletions server/cluster/cluster.go
@@ -538,6 +538,11 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error {
return nil
}

// IsPrepared return true if the prepare checker is ready.
func (c *RaftCluster) IsPrepared() bool {
return c.prepareChecker.isPrepared()
}

var regionGuide = core.GenerateRegionGuideFunc(true)

// processRegionHeartbeat updates the region information.
@@ -608,7 +613,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
regionEventCounter.WithLabelValues("update_cache").Inc()
}

if isNew {
if !c.IsPrepared() && isNew {
c.prepareChecker.collect(region)
}

@@ -656,7 +661,6 @@ func (c *RaftCluster) updateStoreStatusLocked(id uint64) {
c.core.UpdateStoreStatus(id, leaderCount, regionCount, pendingPeerCount, leaderRegionSize, regionSize)
}

//nolint:unused
func (c *RaftCluster) getClusterID() uint64 {
c.RLock()
defer c.RUnlock()
@@ -1381,13 +1385,6 @@ func (c *RaftCluster) GetComponentManager() *component.Manager {
return c.componentManager
}

// isPrepared if the cluster information is collected
func (c *RaftCluster) isPrepared() bool {
c.RLock()
defer c.RUnlock()
return c.prepareChecker.check(c)
}

// GetStoresLoads returns load stats of all stores.
func (c *RaftCluster) GetStoresLoads() map[uint64][]float64 {
c.RLock()
@@ -1447,10 +1444,11 @@ func (c *RaftCluster) FitRegion(region *core.RegionInfo) *placement.RegionFit {
}

type prepareChecker struct {
sync.RWMutex
reactiveRegions map[uint64]int
start time.Time
sum int
isPrepared bool
prepared bool
}

func newPrepareChecker() *prepareChecker {
@@ -1461,12 +1459,18 @@ func newPrepareChecker() *prepareChecker {
}

// Before starting up the scheduler, we need to take the proportion of the regions on each store into consideration.
func (checker *prepareChecker) check(c *RaftCluster) bool {
if checker.isPrepared || time.Since(checker.start) > collectTimeout {
func (checker *prepareChecker) check(c *core.BasicCluster) bool {
checker.Lock()
defer checker.Unlock()
if checker.prepared {
return true
}
if time.Since(checker.start) > collectTimeout {
checker.prepared = true
return true
}
// The number of active regions should be more than total region of all stores * collectFactor
if float64(c.core.GetRegionCount())*collectFactor > float64(checker.sum) {
if float64(c.GetRegionCount())*collectFactor > float64(checker.sum) {
return false
}
for _, store := range c.GetStores() {
@@ -1475,21 +1479,29 @@ func (checker *prepareChecker) check(c *RaftCluster) bool {
}
storeID := store.GetID()
// For each store, the number of active regions should be more than total region of the store * collectFactor
if float64(c.core.GetStoreRegionCount(storeID))*collectFactor > float64(checker.reactiveRegions[storeID]) {
if float64(c.GetStoreRegionCount(storeID))*collectFactor > float64(checker.reactiveRegions[storeID]) {
return false
}
}
checker.isPrepared = true
checker.prepared = true
return true
}

func (checker *prepareChecker) collect(region *core.RegionInfo) {
checker.Lock()
defer checker.Unlock()
for _, p := range region.GetPeers() {
checker.reactiveRegions[p.GetStoreId()]++
}
checker.sum++
}

func (checker *prepareChecker) isPrepared() bool {
checker.RLock()
defer checker.RUnlock()
return checker.prepared
}

// GetHotWriteRegions gets hot write regions' info.
func (c *RaftCluster) GetHotWriteRegions() *statistics.StoreHotPeersInfos {
c.RLock()
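The cluster.go changes above make the prepare checker self-contained and concurrency-safe: the checker embeds a sync.RWMutex, the isPrepared field becomes prepared, check() now takes a *core.BasicCluster and also flips to prepared once collectTimeout elapses, and heartbeats are only collected while the cluster is not yet prepared. The following is a minimal, self-contained sketch of that pattern, not PD's actual types; the collectFactor and collectTimeout values and the plain-argument check signature are assumptions for illustration.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

const (
	collectFactor  = 0.8             // assumed value for illustration
	collectTimeout = 5 * time.Minute // assumed value for illustration
)

// prepareSketch mirrors the structure of the patched prepareChecker:
// all fields are guarded by the embedded RWMutex, and callers go through
// check()/collect()/isPrepared() instead of touching fields directly.
type prepareSketch struct {
	sync.RWMutex
	reactiveRegions map[uint64]int // active regions reported per store
	start           time.Time
	sum             int
	prepared        bool
}

func newPrepareSketch() *prepareSketch {
	return &prepareSketch{
		reactiveRegions: make(map[uint64]int),
		start:           time.Now(),
	}
}

// collect records one region heartbeat; storeIDs are the stores holding its peers.
func (p *prepareSketch) collect(storeIDs []uint64) {
	p.Lock()
	defer p.Unlock()
	for _, id := range storeIDs {
		p.reactiveRegions[id]++
	}
	p.sum++
}

// check reports readiness once enough regions have been observed,
// or unconditionally after collectTimeout has elapsed.
func (p *prepareSketch) check(totalRegions int, regionsPerStore map[uint64]int) bool {
	p.Lock()
	defer p.Unlock()
	if p.prepared {
		return true
	}
	if time.Since(p.start) > collectTimeout {
		p.prepared = true
		return true
	}
	// Cluster-wide: enough active regions collected overall.
	if float64(totalRegions)*collectFactor > float64(p.sum) {
		return false
	}
	// Per store: enough active regions collected for each store.
	for id, count := range regionsPerStore {
		if float64(count)*collectFactor > float64(p.reactiveRegions[id]) {
			return false
		}
	}
	p.prepared = true
	return true
}

func (p *prepareSketch) isPrepared() bool {
	p.RLock()
	defer p.RUnlock()
	return p.prepared
}

func main() {
	p := newPrepareSketch()
	p.collect([]uint64{1, 2, 3})
	fmt.Println(p.check(1, map[uint64]int{1: 1, 2: 1, 3: 1})) // true: enough regions collected
	fmt.Println(p.isPrepared())                               // true
}
```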
5 changes: 4 additions & 1 deletion server/cluster/coordinator.go
@@ -252,6 +252,9 @@ func (c *coordinator) drivePushOperator() {

func (c *coordinator) run() {
ticker := time.NewTicker(runSchedulerCheckInterval)
failpoint.Inject("changeCoordinatorTicker", func() {
ticker = time.NewTicker(100 * time.Millisecond)
})
defer ticker.Stop()
log.Info("coordinator starts to collect cluster information")
for {
@@ -569,7 +572,7 @@ func (c *coordinator) resetHotSpotMetrics() {
}

func (c *coordinator) shouldRun() bool {
return c.cluster.isPrepared()
return c.cluster.prepareChecker.check(c.cluster.core)
}

func (c *coordinator) addScheduler(scheduler schedule.Scheduler, args ...string) error {
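The coordinator.go hunks add a changeCoordinatorTicker failpoint so tests can shrink the scheduler-check ticker to 100ms, and shouldRun now calls prepareChecker.check(c.cluster.core) directly instead of the removed RaftCluster.isPrepared. A rough sketch of how a test might toggle that failpoint with github.com/pingcap/failpoint follows; the full failpoint path (the module prefix) is an assumption and should be adjusted to the repository's actual module path.

```go
package cluster_test

import (
	"testing"

	"github.com/pingcap/failpoint"
)

// TestFastCoordinatorTicker shows the enable/disable pattern for the new
// failpoint. The path below assumes the module path github.com/tikv/pd;
// adjust it if the module path differs.
func TestFastCoordinatorTicker(t *testing.T) {
	const fp = "github.com/tikv/pd/server/cluster/changeCoordinatorTicker"
	if err := failpoint.Enable(fp, `return(true)`); err != nil {
		t.Fatal(err)
	}
	defer func() {
		if err := failpoint.Disable(fp); err != nil {
			t.Fatal(err)
		}
	}()
	// With the failpoint enabled, coordinator.run() ticks every 100ms instead
	// of runSchedulerCheckInterval, so tests waiting on shouldRun() converge
	// quickly. Start the coordinator under test here.
}
```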
3 changes: 1 addition & 2 deletions server/cluster/coordinator_test.go
@@ -189,9 +189,8 @@ func (s *testCoordinatorSuite) TestBasic(c *C) {
}

func (s *testCoordinatorSuite) TestDispatch(c *C) {
tc, co, cleanup := prepare(nil, func(tc *testCluster) { tc.prepareChecker.isPrepared = true }, nil, c)
tc, co, cleanup := prepare(nil, func(tc *testCluster) { tc.prepareChecker.prepared = true }, nil, c)
defer cleanup()

// Transfer peer from store 4 to store 1.
c.Assert(tc.addRegionStore(4, 40), IsNil)
c.Assert(tc.addRegionStore(3, 30), IsNil)
15 changes: 14 additions & 1 deletion server/core/region.go
@@ -55,6 +55,7 @@ type RegionInfo struct {
approximateKeys int64
interval *pdpb.TimeInterval
replicationStatus *replication_modepb.RegionReplicationStatus
fromHeartbeat bool
}

// NewRegionInfo creates RegionInfo with region's meta and leader peer.
@@ -98,7 +99,7 @@ const (
)

// RegionFromHeartbeat constructs a Region from region heartbeat.
func RegionFromHeartbeat(heartbeat *pdpb.RegionHeartbeatRequest) *RegionInfo {
func RegionFromHeartbeat(heartbeat *pdpb.RegionHeartbeatRequest, opts ...RegionCreateOption) *RegionInfo {
// Convert unit to MB.
// If region is empty or less than 1MB, use 1MB instead.
regionSize := heartbeat.GetApproximateSize() / (1 << 20)
@@ -122,6 +123,10 @@ func RegionFromHeartbeat(heartbeat *pdpb.RegionHeartbeatRequest) *RegionInfo {
replicationStatus: heartbeat.GetReplicationStatus(),
}

for _, opt := range opts {
opt(region)
}

if region.writtenKeys >= ImpossibleFlowSize || region.writtenBytes >= ImpossibleFlowSize {
region.writtenKeys = 0
region.writtenBytes = 0
@@ -439,6 +444,11 @@ func (r *RegionInfo) GetReplicationStatus() *replication_modepb.RegionReplicatio
return r.replicationStatus
}

// IsFromHeartbeat returns whether the region info is from the region heartbeat.
func (r *RegionInfo) IsFromHeartbeat() bool {
return r.fromHeartbeat
}

// RegionGuideFunc is a function that determines which follow-up operations need to be performed based on the origin
// and new region information.
type RegionGuideFunc func(region, origin *RegionInfo, traceRegionFlow bool) (isNew, saveKV, saveCache, needSync bool)
@@ -523,6 +533,9 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
region.GetReplicationStatus().GetStateId() != origin.GetReplicationStatus().GetStateId()) {
saveCache = true
}
if !origin.IsFromHeartbeat() {
isNew = true
}
}
return
}
7 changes: 7 additions & 0 deletions server/core/region_option.go
@@ -276,3 +276,10 @@ func WithInterval(interval *pdpb.TimeInterval) RegionCreateOption {
region.interval = interval
}
}

// SetFromHeartbeat sets if the region info comes from the region heartbeat.
func SetFromHeartbeat(fromHeartbeat bool) RegionCreateOption {
return func(region *RegionInfo) {
region.fromHeartbeat = fromHeartbeat
}
}
2 changes: 1 addition & 1 deletion server/grpc_service.go
@@ -586,7 +586,7 @@ func (s *Server) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error {
lastBind = time.Now()
}

region := core.RegionFromHeartbeat(request)
region := core.RegionFromHeartbeat(request, core.SetFromHeartbeat(true))
if region.GetLeader() == nil {
log.Error("invalid request, the leader is nil", zap.Reflect("request", request), errs.ZapError(errs.ErrLeaderNil))
regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "invalid-leader").Inc()
3 changes: 2 additions & 1 deletion server/region_syncer/client.go
@@ -201,9 +201,10 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
core.SetWrittenKeys(stats[i].KeysWritten),
core.SetReadBytes(stats[i].BytesRead),
core.SetReadKeys(stats[i].KeysRead),
core.SetFromHeartbeat(false),
)
} else {
region = core.NewRegionInfo(r, regionLeader)
region = core.NewRegionInfo(r, regionLeader, core.SetFromHeartbeat(false))
}

origin, err := bc.PreCheckPutRegion(region)
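The region.go, region_option.go, grpc_service.go, and region_syncer/client.go changes thread a fromHeartbeat flag through the RegionCreateOption functional options: the gRPC heartbeat path constructs regions with core.SetFromHeartbeat(true), the syncer with core.SetFromHeartbeat(false), and the region guide marks a heartbeat as isNew when the cached origin was only synced, so the prepare checker still counts it. Below is a minimal stand-alone sketch of that pattern; regionInfo, setFromHeartbeat, and shouldMarkNew are illustrative stand-ins, not PD's API.

```go
package main

import "fmt"

// regionInfo is a stripped-down stand-in for core.RegionInfo,
// keeping only the field this patch cares about.
type regionInfo struct {
	id            uint64
	fromHeartbeat bool
}

// regionOption mirrors the functional-option shape of RegionCreateOption.
type regionOption func(*regionInfo)

func setFromHeartbeat(v bool) regionOption {
	return func(r *regionInfo) { r.fromHeartbeat = v }
}

func newRegion(id uint64, opts ...regionOption) *regionInfo {
	r := &regionInfo{id: id}
	for _, opt := range opts {
		opt(r)
	}
	return r
}

// shouldMarkNew mirrors the added guide rule: if the cached origin was
// synced from the leader (not heartbeat-reported), the first real
// heartbeat must still be treated as "new" so the prepare checker counts it.
func shouldMarkNew(origin *regionInfo) bool {
	return origin == nil || !origin.fromHeartbeat
}

func main() {
	synced := newRegion(1, setFromHeartbeat(false)) // came from the region syncer
	beat := newRegion(1, setFromHeartbeat(true))    // came from a store heartbeat

	fmt.Println(shouldMarkNew(synced)) // true: count it toward preparation
	fmt.Println(shouldMarkNew(beat))   // false: already seen via heartbeat
}
```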
13 changes: 8 additions & 5 deletions server/schedule/operator_controller.go
@@ -277,7 +277,7 @@ func (oc *OperatorController) AddWaitingOperator(ops ...*operator.Operator) int
}
isMerge = true
}
if !oc.checkAddOperator(op) {
if !oc.checkAddOperator(false, op) {
_ = op.Cancel()
oc.buryOperator(op)
if isMerge {
@@ -313,7 +313,10 @@ func (oc *OperatorController) AddOperator(ops ...*operator.Operator) bool {
oc.Lock()
defer oc.Unlock()

if oc.exceedStoreLimitLocked(ops...) || !oc.checkAddOperator(ops...) {
// note: checkAddOperator uses false param for `isPromoting`.
// This is used to keep check logic before fixing issue #4946,
// but maybe user want to add operator when waiting queue is busy
if oc.exceedStoreLimitLocked(ops...) || !oc.checkAddOperator(false, ops...) {
for _, op := range ops {
_ = op.Cancel()
oc.buryOperator(op)
@@ -341,7 +344,7 @@ func (oc *OperatorController) PromoteWaitingOperator() {
}
operatorWaitCounter.WithLabelValues(ops[0].Desc(), "get").Inc()

if oc.exceedStoreLimitLocked(ops...) || !oc.checkAddOperator(ops...) {
if oc.exceedStoreLimitLocked(ops...) || !oc.checkAddOperator(true, ops...) {
for _, op := range ops {
operatorWaitCounter.WithLabelValues(op.Desc(), "promote-canceled").Inc()
_ = op.Cancel()
@@ -368,7 +371,7 @@ func (oc *OperatorController) PromoteWaitingOperator() {
// - The region already has a higher priority or same priority operator.
// - Exceed the max number of waiting operators
// - At least one operator is expired.
func (oc *OperatorController) checkAddOperator(ops ...*operator.Operator) bool {
func (oc *OperatorController) checkAddOperator(isPromoting bool, ops ...*operator.Operator) bool {
for _, op := range ops {
region := oc.cluster.GetRegion(op.RegionID())
if region == nil {
@@ -404,7 +407,7 @@ func (oc *OperatorController) checkAddOperator(ops ...*operator.Operator) bool {
operatorWaitCounter.WithLabelValues(op.Desc(), "unexpected-status").Inc()
return false
}
if oc.wopStatus.ops[op.Desc()] >= oc.cluster.GetOpts().GetSchedulerMaxWaitingOperator() {
if !isPromoting && oc.wopStatus.ops[op.Desc()] >= oc.cluster.GetOpts().GetSchedulerMaxWaitingOperator() {
log.Debug("exceed max return false", zap.Uint64("waiting", oc.wopStatus.ops[op.Desc()]), zap.String("desc", op.Desc()), zap.Uint64("max", oc.cluster.GetOpts().GetSchedulerMaxWaitingOperator()))
operatorWaitCounter.WithLabelValues(op.Desc(), "exceed-max").Inc()
return false
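The operator_controller.go change gives checkAddOperator an isPromoting parameter: AddWaitingOperator and AddOperator pass false and keep enforcing the per-description SchedulerMaxWaitingOperator limit, while PromoteWaitingOperator passes true so operators that already passed admission are not rejected again by the same limit. A minimal sketch of that guard, using a stand-in controller and an assumed limit of 5:

```go
package main

import "fmt"

const maxWaitingOperatorsPerDesc = 5 // assumed limit for illustration

// miniController is a stand-in for OperatorController, keeping only the
// waiting-operator bookkeeping relevant to the isPromoting change.
type miniController struct {
	waiting map[string]uint64 // operators queued per description
}

// checkAdd mirrors the patched checkAddOperator: the per-desc waiting limit
// is enforced only when adding to the waiting queue (isPromoting == false);
// promoting already-queued operators must not be rejected by it a second time.
func (c *miniController) checkAdd(isPromoting bool, desc string) bool {
	if !isPromoting && c.waiting[desc] >= maxWaitingOperatorsPerDesc {
		return false
	}
	return true
}

func main() {
	c := &miniController{waiting: map[string]uint64{"balance-region": 5}}
	fmt.Println(c.checkAdd(false, "balance-region")) // false: waiting queue already full
	fmt.Println(c.checkAdd(true, "balance-region"))  // true: promotion skips the limit
}
```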
26 changes: 13 additions & 13 deletions server/schedule/operator_controller_test.go
@@ -201,36 +201,36 @@ func (t *testOperatorControllerSuite) TestCheckAddUnexpectedStatus(c *C) {
{
// finished op
op := operator.NewOperator("test", "test", 1, &metapb.RegionEpoch{}, operator.OpRegion, operator.TransferLeader{ToStore: 2})
c.Assert(oc.checkAddOperator(op), IsTrue)
c.Assert(oc.checkAddOperator(false, op), IsTrue)
op.Start()
c.Assert(oc.checkAddOperator(op), IsFalse) // started
c.Assert(oc.checkAddOperator(false, op), IsFalse) // started
c.Assert(op.Check(region1), IsNil)
c.Assert(op.Status(), Equals, operator.SUCCESS)
c.Assert(oc.checkAddOperator(op), IsFalse) // success
c.Assert(oc.checkAddOperator(false, op), IsFalse) // success
}
{
// finished op canceled
op := operator.NewOperator("test", "test", 1, &metapb.RegionEpoch{}, operator.OpRegion, operator.TransferLeader{ToStore: 2})
c.Assert(oc.checkAddOperator(op), IsTrue)
c.Assert(oc.checkAddOperator(false, op), IsTrue)
c.Assert(op.Cancel(), IsTrue)
c.Assert(oc.checkAddOperator(op), IsFalse)
c.Assert(oc.checkAddOperator(false, op), IsFalse)
}
{
// finished op replaced
op := operator.NewOperator("test", "test", 1, &metapb.RegionEpoch{}, operator.OpRegion, operator.TransferLeader{ToStore: 2})
c.Assert(oc.checkAddOperator(op), IsTrue)
c.Assert(oc.checkAddOperator(false, op), IsTrue)
c.Assert(op.Start(), IsTrue)
c.Assert(op.Replace(), IsTrue)
c.Assert(oc.checkAddOperator(op), IsFalse)
c.Assert(oc.checkAddOperator(false, op), IsFalse)
}
{
// finished op expired
op1 := operator.NewOperator("test", "test", 1, &metapb.RegionEpoch{}, operator.OpRegion, operator.TransferLeader{ToStore: 2})
op2 := operator.NewOperator("test", "test", 2, &metapb.RegionEpoch{}, operator.OpRegion, operator.TransferLeader{ToStore: 1})
c.Assert(oc.checkAddOperator(op1, op2), IsTrue)
c.Assert(oc.checkAddOperator(false, op1, op2), IsTrue)
operator.SetOperatorStatusReachTime(op1, operator.CREATED, time.Now().Add(-operator.OperatorExpireTime))
operator.SetOperatorStatusReachTime(op2, operator.CREATED, time.Now().Add(-operator.OperatorExpireTime))
c.Assert(oc.checkAddOperator(op1, op2), IsFalse)
c.Assert(oc.checkAddOperator(false, op1, op2), IsFalse)
c.Assert(op1.Status(), Equals, operator.EXPIRED)
c.Assert(op2.Status(), Equals, operator.EXPIRED)
}
@@ -239,11 +239,11 @@ func (t *testOperatorControllerSuite) TestCheckAddUnexpectedStatus(c *C) {
{
// unfinished op timeout
op := operator.NewOperator("test", "test", 1, &metapb.RegionEpoch{}, operator.OpRegion, steps...)
c.Assert(oc.checkAddOperator(op), IsTrue)
c.Assert(oc.checkAddOperator(false, op), IsTrue)
op.Start()
operator.SetOperatorStatusReachTime(op, operator.STARTED, time.Now().Add(-operator.SlowOperatorWaitTime))
c.Assert(op.CheckTimeout(), IsTrue)
c.Assert(oc.checkAddOperator(op), IsFalse)
c.Assert(oc.checkAddOperator(false, op), IsFalse)
}
}

@@ -704,11 +704,11 @@ func (t *testOperatorControllerSuite) TestAddWaitingOperator(c *C) {

// a batch of operators should be added atomically
var batch []*operator.Operator
for i := uint64(0); i < cluster.GetSchedulerMaxWaitingOperator()-1; i++ {
for i := uint64(0); i < cluster.GetSchedulerMaxWaitingOperator(); i++ {
batch = append(batch, addPeerOp(i))
}
added := controller.AddWaitingOperator(batch...)
c.Assert(added, Equals, int(cluster.GetSchedulerMaxWaitingOperator()-1))
c.Assert(added, Equals, int(cluster.GetSchedulerMaxWaitingOperator()))

source := newRegionInfo(1, "1a", "1b", 1, 1, []uint64{101, 1}, []uint64{101, 1})
target := newRegionInfo(0, "0a", "0b", 1, 1, []uint64{101, 1}, []uint64{101, 1})