Skip to content

Commit

Permalink
address
Browse files Browse the repository at this point in the history
Signed-off-by: nolouch <nolouch@gmail.com>
  • Loading branch information
nolouch committed Apr 11, 2024
1 parent 06a1903 commit d22d9c4
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 3 deletions.
6 changes: 5 additions & 1 deletion pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,11 @@ func HandleOverlaps(c Cluster, overlaps []*core.RegionInfo) {
func Collect(c Cluster, region *core.RegionInfo, hasRegionStats bool) {
if hasRegionStats {
// get region again from root tree. make sure the observed region is the latest.
region = c.GetBasicCluster().GetRegion(region.GetID())
bc := c.GetBasicCluster()
if bc == nil {
return
}
region = bc.GetRegion(region.GetID())
if region == nil {
return
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ const (
regionLabelGCInterval = time.Hour
requestTimeout = 3 * time.Second
collectWaitTime = time.Minute

// heartbeat relative const
hbAsyncRunner = "heartbeat-async-task-runner"
)

var syncRunner = ratelimit.NewSyncRunner()
Expand Down Expand Up @@ -90,7 +93,7 @@ func NewCluster(parentCtx context.Context, persistConfig *config.PersistConfig,
clusterID: clusterID,
checkMembershipCh: checkMembershipCh,

taskRunner: ratelimit.NewAsyncRunner("heartbeat-async-task-runner", time.Minute),
taskRunner: ratelimit.NewAsyncRunner(hbAsyncRunner, time.Minute),
hbConcurrencyLimiter: ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU() * 2)),
}
c.coordinator = schedule.NewCoordinator(ctx, c, hbStreams)
Expand Down
12 changes: 12 additions & 0 deletions pkg/statistics/region_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,18 @@ func (r *RegionStatistics) RegionStatsNeedUpdate(region *core.RegionInfo) bool {
region.IsOversized(int64(r.conf.GetRegionMaxSize()), int64(r.conf.GetRegionMaxKeys())) {
return true
}
// expected to be zero for below type
if r.IsRegionStatsType(regionID, PendingPeer) && len(region.GetPendingPeers()) == 0 {
return true
}
if r.IsRegionStatsType(regionID, DownPeer) && len(region.GetDownPeers()) == 0 {
return true
}
if r.IsRegionStatsType(regionID, LearnerPeer) && len(region.GetLearners()) == 0 {
return true
}

// merge
return r.IsRegionStatsType(regionID, UndersizedRegion) !=
region.NeedMerge(int64(r.conf.GetMaxMergeRegionSize()), int64(r.conf.GetMaxMergeRegionKeys()))
}
Expand Down
5 changes: 4 additions & 1 deletion server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ const (
// minSnapshotDurationSec is the minimum duration that a store can tolerate.
// It should enlarge the limiter if the snapshot's duration is less than this value.
minSnapshotDurationSec = 5

// heartbeat relative const
hbAsyncRunner = "heartbeat-async-task-runner"
)

// Server is the interface for cluster.
Expand Down Expand Up @@ -195,7 +198,7 @@ func NewRaftCluster(ctx context.Context, clusterID uint64, basicCluster *core.Ba
etcdClient: etcdClient,
core: basicCluster,
storage: storage,
taskRunner: ratelimit.NewAsyncRunner("heartbeat-async-task-runner", time.Minute),
taskRunner: ratelimit.NewAsyncRunner(hbAsyncRunner, time.Minute),
hbConcurrencyLimiter: ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU() * 2)),
}
}
Expand Down

0 comments on commit d22d9c4

Please sign in to comment.