Support pausing scheduling on specific regions
Signed-off-by: lightmelodies <lightmelodies@outlook.com>
lightmelodies committed Jan 28, 2022
1 parent c1d1ecd commit e63348d
Showing 17 changed files with 235 additions and 14 deletions.
15 changes: 15 additions & 0 deletions pkg/mock/mockcluster/mockcluster.go
@@ -24,6 +24,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/cache"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mock/mockid"
"github.com/tikv/pd/server/config"
@@ -53,6 +54,7 @@ type Cluster struct {
*config.PersistOptions
ID uint64
suspectRegions map[uint64]struct{}
pinnedRegions *cache.TTLUint64
}

// NewCluster creates a new Cluster
@@ -63,6 +65,7 @@ func NewCluster(ctx context.Context, opts *config.PersistOptions) *Cluster {
HotStat: statistics.NewHotStat(ctx),
PersistOptions: opts,
suspectRegions: map[uint64]struct{}{},
pinnedRegions: cache.NewIDTTL(ctx, time.Minute, 3*time.Minute),
}
if clus.PersistOptions.GetReplicationConfig().EnablePlacementRules {
clus.initRuleManager()
@@ -730,6 +733,18 @@ func (mc *Cluster) SetStoreLabel(storeID uint64, labels map[string]string) {
mc.PutStore(newStore)
}

// PinRegions pauses scheduling on the given regions for a duration.
func (mc *Cluster) PinRegions(regionIDs []uint64, duration time.Duration) {
for _, regionID := range regionIDs {
mc.pinnedRegions.PutWithTTL(regionID, nil, duration)
}
}

// IsRegionPinned checks whether the given region is pinned.
func (mc *Cluster) IsRegionPinned(region *core.RegionInfo) bool {
return mc.pinnedRegions.Exists(region.GetID())
}

// AddSuspectRegions mock method
func (mc *Cluster) AddSuspectRegions(ids ...uint64) {
for _, id := range ids {
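
Note: the mock methods mirror the RaftCluster ones further below. A minimal sketch, not part of this commit, of exercising them in a test; it assumes the mock's existing AddLeaderRegion and GetRegion helpers:

package mockcluster_test

import (
	"context"
	"testing"
	"time"

	"github.com/tikv/pd/pkg/mock/mockcluster"
	"github.com/tikv/pd/server/config"
)

func TestPinRegionsSketch(t *testing.T) {
	mc := mockcluster.NewCluster(context.Background(), config.NewTestOptions())
	mc.AddLeaderRegion(1, 1, 2, 3) // region 1 with its leader on store 1

	mc.PinRegions([]uint64{1}, time.Minute)
	if !mc.IsRegionPinned(mc.GetRegion(1)) {
		t.Fatal("expected region 1 to be pinned")
	}
	// The pin simply expires once its TTL elapses; unlike RaftCluster.PinRegions,
	// the mock has no explicit unpin branch for a non-positive duration.
}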
65 changes: 65 additions & 0 deletions server/api/region.go
@@ -22,6 +22,7 @@ import (
"net/url"
"sort"
"strconv"
"time"

"github.com/gorilla/mux"
"github.com/pingcap/failpoint"
@@ -505,6 +506,18 @@ func (h *regionsHandler) GetEmptyRegion(w http.ResponseWriter, r *http.Request)
h.rd.JSON(w, http.StatusOK, regionsInfo)
}

// @Tags region
// @Summary List all pinned regions.
// @Produce json
// @Success 200 {object} RegionsInfo
// @Router /regions/check/pinned-region [get]
func (h *regionsHandler) GetPinnedRegions(w http.ResponseWriter, r *http.Request) {
rc := getCluster(r)
regions := rc.GetPinnedRegions()
regionsInfo := convertToAPIRegions(regions)
h.rd.JSON(w, http.StatusOK, regionsInfo)
}

type histItem struct {
Start int64 `json:"start"`
End int64 `json:"end"`
@@ -914,6 +927,58 @@ func (h *regionsHandler) SplitRegions(w http.ResponseWriter, r *http.Request) {
h.rd.JSON(w, http.StatusOK, &s)
}

// @Tags region
// @Summary Pin regions by a given key range or region IDs for a given duration.
// @Accept json
// @Param body body object true "json params"
// @Produce json
// @Success 200 {string} string "Pin regions successfully."
// @Failure 400 {string} string "The input is invalid."
// @Router /regions/pin [post]
func (h *regionsHandler) PinRegions(w http.ResponseWriter, r *http.Request) {
rc := getCluster(r)
var input struct {
StartKey *string `json:"start_key"`
EndKey *string `json:"end_key"`
RegionsID *[]uint64 `json:"regions_id"`
Duration *uint64 `json:"duration"`
}
if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &input); err != nil {
h.rd.JSON(w, http.StatusBadRequest, err.Error())
return
}
if input.Duration == nil {
h.rd.JSON(w, http.StatusBadRequest, "missing pin duration")
return
}
duration := time.Second * time.Duration(*input.Duration)
if input.StartKey != nil && input.EndKey != nil {
startKey, err := hex.DecodeString(*input.StartKey)
if err != nil {
h.rd.JSON(w, http.StatusBadRequest, "start_key is not in hex format")
return
}
endKey, err := hex.DecodeString(*input.EndKey)
if err != nil {
h.rd.JSON(w, http.StatusBadRequest, "end_key is not in hex format")
return
}
regions := rc.ScanRegions(startKey, endKey, -1)
regionsID := make([]uint64, len(regions))
for i, region := range regions {
regionsID[i] = region.GetID()
}
rc.PinRegions(regionsID, duration)
} else {
if input.RegionsID == nil {
h.rd.JSON(w, http.StatusBadRequest, "missing key range or regions id")
return
}
rc.PinRegions(*input.RegionsID, duration)
}
h.rd.JSON(w, http.StatusOK, "Pin regions successfully.")
}

// RegionHeap implements heap.Interface, used for selecting top n regions.
type RegionHeap struct {
regions []*core.RegionInfo
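
For reference, a sketch of driving the new endpoints over HTTP. It is not part of this commit; the PD address and the /pd/api/v1 prefix are assumptions based on PD's usual API layout:

package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Pin regions 2 and 3 for 600 seconds. Passing hex-encoded
	// "start_key"/"end_key" instead of "regions_id" pins every region in
	// the range, and a duration of 0 unpins the listed regions again.
	body := []byte(`{"regions_id": [2, 3], "duration": 600}`)
	resp, err := http.Post("http://127.0.0.1:2379/pd/api/v1/regions/pin",
		"application/json", bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	resp.Body.Close()

	// List the regions that are currently pinned.
	resp, err = http.Get("http://127.0.0.1:2379/pd/api/v1/regions/check/pinned-region")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	out, _ := io.ReadAll(resp.Body)
	fmt.Println(string(out))
}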
2 changes: 2 additions & 0 deletions server/api/router.go
@@ -266,13 +266,15 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
registerFunc(clusterRouter, "GetLearnerPeerRegions", "/regions/check/learner-peer", regionsHandler.GetLearnerPeerRegions, setMethods("GET"))
registerFunc(clusterRouter, "GetEmptyRegion", "/regions/check/empty-region", regionsHandler.GetEmptyRegion, setMethods("GET"))
registerFunc(clusterRouter, "GetOfflinePeer", "/regions/check/offline-peer", regionsHandler.GetOfflinePeer, setMethods("GET"))
registerFunc(clusterRouter, "GetPinnedRegions", "/regions/check/pinned-region", regionsHandler.GetPinnedRegions, setMethods("GET"))

registerFunc(clusterRouter, "GetSizeHistogram", "/regions/check/hist-size", regionsHandler.GetSizeHistogram, setMethods("GET"))
registerFunc(clusterRouter, "GetKeysHistogram", "/regions/check/hist-keys", regionsHandler.GetKeysHistogram, setMethods("GET"))
registerFunc(clusterRouter, "GetRegionSiblings", "/regions/sibling/{id}", regionsHandler.GetRegionSiblings, setMethods("GET"))
registerFunc(clusterRouter, "AccelerateRegionsSchedule", "/regions/accelerate-schedule", regionsHandler.AccelerateRegionsScheduleInRange, setMethods("POST"))
registerFunc(clusterRouter, "ScatterRegions", "/regions/scatter", regionsHandler.ScatterRegions, setMethods("POST"))
registerFunc(clusterRouter, "SplitRegions", "/regions/split", regionsHandler.SplitRegions, setMethods("POST"))
registerFunc(clusterRouter, "PinRegions", "/regions/pin", regionsHandler.PinRegions, setMethods("POST"))
registerFunc(clusterRouter, "GetRangeHoles", "/regions/range-holes", regionsHandler.GetRangeHoles, setMethods("GET"))
registerFunc(clusterRouter, "CheckRegionsReplicated", "/regions/replicated", regionsHandler.CheckRegionsReplicated, setMethods("GET"), setQueries("startKey", "{startKey}", "endKey", "{endKey}"))

37 changes: 37 additions & 0 deletions server/cluster/cluster.go
@@ -29,6 +29,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/cache"
"github.com/tikv/pd/pkg/component"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/etcdutil"
@@ -110,6 +111,8 @@ type RaftCluster struct {
regionStats *statistics.RegionStatistics
hotStat *statistics.HotStat

pinnedRegions *cache.TTLUint64

coordinator *coordinator

regionSyncer *syncer.RegionSyncer
@@ -209,6 +212,7 @@ func (c *RaftCluster) InitCluster(
c.labelLevelStats = statistics.NewLabelStatistics()
c.hotStat = statistics.NewHotStat(c.ctx)
c.changedRegions = make(chan *core.RegionInfo, defaultChangedRegionsLimit)
c.pinnedRegions = cache.NewIDTTL(c.ctx, time.Minute, 3*time.Minute)
}

// Start starts a cluster.
@@ -489,6 +493,39 @@ func (c *RaftCluster) GetOpts() *config.PersistOptions {
return c.opt
}

// PinRegions pauses scheduling on the given regions for a duration.
// A non-positive duration unpins the regions instead.
func (c *RaftCluster) PinRegions(regionIDs []uint64, duration time.Duration) {
c.Lock()
defer c.Unlock()
if duration > 0 {
for _, regionID := range regionIDs {
c.pinnedRegions.PutWithTTL(regionID, nil, duration)
}
} else {
for _, regionID := range regionIDs {
c.pinnedRegions.Remove(regionID)
}
}
}

// IsRegionPinned checks whether the given region is pinned.
func (c *RaftCluster) IsRegionPinned(region *core.RegionInfo) bool {
return c.pinnedRegions.Exists(region.GetID())
}

// GetPinnedRegions returns all pinned regions' information in detail.
func (c *RaftCluster) GetPinnedRegions() []*core.RegionInfo {
regionIDs := c.pinnedRegions.GetAllID()
regions := make([]*core.RegionInfo, 0, len(regionIDs))
for _, regionID := range regionIDs {
region := c.GetRegion(regionID)
if region != nil {
regions = append(regions, region)
}
}
return regions
}

// AddSuspectRegions adds regions to suspect list.
func (c *RaftCluster) AddSuspectRegions(regionIDs ...uint64) {
c.coordinator.checkers.AddSuspectRegions(regionIDs...)
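
The pin bookkeeping is a plain TTL cache, so entries expire on their own and the explicit Remove covers the unpin (duration <= 0) path. A standalone sketch, not part of this commit, of those semantics using the same pkg/cache calls the change relies on:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/tikv/pd/pkg/cache"
)

func main() {
	// Same constructor arguments as InitCluster: GC sweep every minute,
	// default TTL of three minutes.
	pinned := cache.NewIDTTL(context.Background(), time.Minute, 3*time.Minute)

	pinned.PutWithTTL(42, nil, 10*time.Second) // PinRegions with duration > 0
	fmt.Println(pinned.Exists(42))             // true until the TTL expires

	pinned.Remove(42) // PinRegions with duration <= 0 takes this path
	fmt.Println(pinned.Exists(42)) // false
}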
10 changes: 10 additions & 0 deletions server/schedule/checker/merge_checker.go
@@ -84,6 +84,11 @@ func (m *MergeChecker) Check(region *core.RegionInfo) []*operator.Operator {
return nil
}

if m.cluster.IsRegionPinned(region) {
checkerCounter.WithLabelValues("merge_checker", "pinned").Inc()
return nil
}

expireTime := m.startTime.Add(m.opts.GetSplitMergeInterval())
if time.Now().Before(expireTime) {
checkerCounter.WithLabelValues("merge_checker", "recently-start").Inc()
@@ -172,6 +177,11 @@ func (m *MergeChecker) checkTarget(region, adjacent *core.RegionInfo) bool {
return false
}

if m.cluster.IsRegionPinned(adjacent) {
checkerCounter.WithLabelValues("merge_checker", "adj-pinned").Inc()
return false
}

if m.splitCache.Exists(adjacent.GetID()) {
checkerCounter.WithLabelValues("merge_checker", "adj-recently-split").Inc()
return false
15 changes: 15 additions & 0 deletions server/schedule/checker/merge_checker_test.go
@@ -165,6 +165,12 @@ func (s *testMergeCheckerSuite) TestBasic(c *C) {
c.Assert(ops[0].RegionID(), Equals, s.regions[2].GetID())
c.Assert(ops[1].RegionID(), Equals, s.regions[1].GetID())

// Test pinning a region.
s.cluster.PinRegions([]uint64{s.regions[2].GetID()}, time.Minute)
ops = s.mc.Check(s.regions[2])
c.Assert(ops, IsNil)
s.cluster.PinRegions([]uint64{s.regions[2].GetID()}, 0)

// Enable one way merge
s.cluster.SetEnableOneWayMerge(true)
ops = s.mc.Check(s.regions[2])
@@ -180,6 +186,15 @@ func (s *testMergeCheckerSuite) TestBasic(c *C) {
c.Assert(ops[0].RegionID(), Equals, s.regions[2].GetID())
c.Assert(ops[1].RegionID(), Equals, s.regions[3].GetID())

// Test pinning the next region.
s.cluster.PinRegions([]uint64{s.regions[3].GetID()}, time.Minute)
ops = s.mc.Check(s.regions[2])
c.Assert(ops, NotNil)
// Now it merges with the previous region instead.
c.Assert(ops[0].RegionID(), Equals, s.regions[2].GetID())
c.Assert(ops[1].RegionID(), Equals, s.regions[1].GetID())
s.cluster.PinRegions([]uint64{s.regions[3].GetID()}, 0)

// Merge cannot cross a rule key.
s.cluster.SetEnablePlacementRules(true)
s.cluster.RuleManager.SetRule(&placement.Rule{
5 changes: 5 additions & 0 deletions server/schedule/checker/split_checker.go
@@ -56,6 +56,11 @@ func (c *SplitChecker) Check(region *core.RegionInfo) *operator.Operator {
return nil
}

if c.cluster.IsRegionPinned(region) {
checkerCounter.WithLabelValues("split_checker", "pinned").Inc()
return nil
}

start, end := region.GetStartKey(), region.GetEndKey()
// We may consider to merge labeler split keys and rule split keys together
// before creating operator. It can help to reduce operator count. However,
1 change: 1 addition & 0 deletions server/schedule/cluster.go
@@ -33,4 +33,5 @@ type Cluster interface {

RemoveScheduler(name string) error
AddSuspectRegions(ids ...uint64)
IsRegionPinned(region *core.RegionInfo) bool
}
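
Since schedule.Cluster now requires IsRegionPinned, any other implementation outside this diff must add the method as well. A minimal sketch of satisfying it on a hypothetical stub (the type name is illustrative, and the remaining interface methods are elided):

package stub

import "github.com/tikv/pd/server/core"

// pinStub is a hypothetical partial implementation of schedule.Cluster;
// only the new method is shown.
type pinStub struct {
	pinned map[uint64]struct{}
}

func (s *pinStub) IsRegionPinned(region *core.RegionInfo) bool {
	_, ok := s.pinned[region.GetID()]
	return ok
}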
5 changes: 5 additions & 0 deletions server/schedule/healthy.go
@@ -44,3 +44,8 @@ func IsRegionReplicated(cluster Cluster, region *core.RegionInfo) bool {
func ReplicatedRegion(cluster Cluster) func(*core.RegionInfo) bool {
return func(region *core.RegionInfo) bool { return IsRegionReplicated(cluster, region) }
}

// NonPinnedRegion returns a function that checks if a region is not pinned.
func NonPinnedRegion(cluster Cluster) func(*core.RegionInfo) bool {
return func(region *core.RegionInfo) bool { return !cluster.IsRegionPinned(region) }
}
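
NonPinnedRegion composes with the other region predicates whenever a scheduler samples a candidate, as the balance_leader.go and balance_region.go changes below show. A sketch of the pattern; the helper name is hypothetical, and it assumes RandLeaderRegion is reachable on schedule.Cluster as it is through the balance plan below:

package schedulers

import (
	"github.com/tikv/pd/server/core"
	"github.com/tikv/pd/server/schedule"
)

// pickNonPinnedLeader samples a leader region on the store that is both
// healthy and not pinned, or returns nil if every candidate fails a predicate.
func pickNonPinnedLeader(cluster schedule.Cluster, storeID uint64, ranges []core.KeyRange) *core.RegionInfo {
	return cluster.RandLeaderRegion(storeID, ranges,
		schedule.IsRegionHealthy, schedule.NonPinnedRegion(cluster))
}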
12 changes: 6 additions & 6 deletions server/schedulers/balance_leader.go
@@ -169,7 +169,7 @@ func (l *balanceLeaderScheduler) Schedule(cluster schedule.Cluster) []*operator.
l.counter.WithLabelValues("high-score", plan.SourceMetricLabel()).Inc()
for j := 0; j < retryLimit; j++ {
schedulerCounter.WithLabelValues(l.GetName(), "total").Inc()
if ops := l.transferLeaderOut(plan); len(ops) > 0 {
if ops := l.transferLeaderOut(cluster, plan); len(ops) > 0 {
l.retryQuota.ResetLimit(plan.source)
ops[0].Counters = append(ops[0].Counters, l.counter.WithLabelValues("transfer-out", plan.SourceMetricLabel()))
return ops
@@ -185,7 +185,7 @@ func (l *balanceLeaderScheduler) Schedule(cluster schedule.Cluster) []*operator.
l.counter.WithLabelValues("low-score", plan.TargetMetricLabel()).Inc()
for j := 0; j < retryLimit; j++ {
schedulerCounter.WithLabelValues(l.GetName(), "total").Inc()
if ops := l.transferLeaderIn(plan); len(ops) > 0 {
if ops := l.transferLeaderIn(cluster, plan); len(ops) > 0 {
l.retryQuota.ResetLimit(plan.target)
ops[0].Counters = append(ops[0].Counters, l.counter.WithLabelValues("transfer-in", plan.TargetMetricLabel()))
return ops
@@ -202,8 +202,8 @@ func (l *balanceLeaderScheduler) Schedule(cluster schedule.Cluster) []*operator.
// transferLeaderOut transfers leader from the source store.
// It randomly selects a healthy region from the source store, then picks
// the best follower peer and transfers the leader.
func (l *balanceLeaderScheduler) transferLeaderOut(plan *balancePlan) []*operator.Operator {
plan.region = plan.RandLeaderRegion(plan.SourceStoreID(), l.conf.Ranges, schedule.IsRegionHealthy)
func (l *balanceLeaderScheduler) transferLeaderOut(cluster schedule.Cluster, plan *balancePlan) []*operator.Operator {
plan.region = plan.RandLeaderRegion(plan.SourceStoreID(), l.conf.Ranges, schedule.IsRegionHealthy, schedule.NonPinnedRegion(cluster))
if plan.region == nil {
log.Debug("store has no leader", zap.String("scheduler", l.GetName()), zap.Uint64("store-id", plan.SourceStoreID()))
schedulerCounter.WithLabelValues(l.GetName(), "no-leader-region").Inc()
@@ -235,8 +235,8 @@ func (l *balanceLeaderScheduler) transferLeaderOut(plan *balancePlan) []*operato
// transferLeaderIn transfers leader to the target store.
// It randomly selects a healthy region from the target store, then picks
// the worst follower peer and transfers the leader.
func (l *balanceLeaderScheduler) transferLeaderIn(plan *balancePlan) []*operator.Operator {
plan.region = plan.RandFollowerRegion(plan.TargetStoreID(), l.conf.Ranges, schedule.IsRegionHealthy)
func (l *balanceLeaderScheduler) transferLeaderIn(cluster schedule.Cluster, plan *balancePlan) []*operator.Operator {
plan.region = plan.RandFollowerRegion(plan.TargetStoreID(), l.conf.Ranges, schedule.IsRegionHealthy, schedule.NonPinnedRegion(cluster))
if plan.region == nil {
log.Debug("store has no follower", zap.String("scheduler", l.GetName()), zap.Uint64("store-id", plan.TargetStoreID()))
schedulerCounter.WithLabelValues(l.GetName(), "no-follower-region").Inc()
Expand Down
12 changes: 8 additions & 4 deletions server/schedulers/balance_region.go
@@ -171,18 +171,22 @@ func (s *balanceRegionScheduler) Schedule(cluster schedule.Cluster) []*operator.
schedulerCounter.WithLabelValues(s.GetName(), "total").Inc()
// Prefer the region that has a pending peer.
// A pending region may mean the disk is overloaded, so remove the pending region first.
plan.region = cluster.RandPendingRegion(plan.SourceStoreID(), s.conf.Ranges, schedule.IsRegionHealthyAllowPending, schedule.ReplicatedRegion(cluster), allowBalanceEmptyRegion)
plan.region = cluster.RandPendingRegion(plan.SourceStoreID(), s.conf.Ranges, schedule.IsRegionHealthyAllowPending,
schedule.ReplicatedRegion(cluster), schedule.NonPinnedRegion(cluster), allowBalanceEmptyRegion)
if plan.region == nil {
// Then pick the region that has a follower in the source store.
plan.region = cluster.RandFollowerRegion(plan.SourceStoreID(), s.conf.Ranges, schedule.IsRegionHealthy, schedule.ReplicatedRegion(cluster), allowBalanceEmptyRegion)
plan.region = cluster.RandFollowerRegion(plan.SourceStoreID(), s.conf.Ranges, schedule.IsRegionHealthy,
schedule.ReplicatedRegion(cluster), schedule.NonPinnedRegion(cluster), allowBalanceEmptyRegion)
}
if plan.region == nil {
// Then pick the region that has the leader in the source store.
plan.region = cluster.RandLeaderRegion(plan.SourceStoreID(), s.conf.Ranges, schedule.IsRegionHealthy, schedule.ReplicatedRegion(cluster), allowBalanceEmptyRegion)
plan.region = cluster.RandLeaderRegion(plan.SourceStoreID(), s.conf.Ranges, schedule.IsRegionHealthy,
schedule.ReplicatedRegion(cluster), schedule.NonPinnedRegion(cluster), allowBalanceEmptyRegion)
}
if plan.region == nil {
// Finally, pick a learner region.
plan.region = cluster.RandLearnerRegion(plan.SourceStoreID(), s.conf.Ranges, schedule.IsRegionHealthy, schedule.ReplicatedRegion(cluster), allowBalanceEmptyRegion)
plan.region = cluster.RandLearnerRegion(plan.SourceStoreID(), s.conf.Ranges, schedule.IsRegionHealthy,
schedule.ReplicatedRegion(cluster), schedule.NonPinnedRegion(cluster), allowBalanceEmptyRegion)
}
if plan.region == nil {
schedulerCounter.WithLabelValues(s.GetName(), "no-region").Inc()
(The remaining 6 changed files are not shown.)
