Skip to content

Commit

Permalink
server/schedule/labeler: support schedule-disabled for region (#4649)
Browse files Browse the repository at this point in the history
 

Signed-off-by: shirly <AndreMouche@126.com>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
AndreMouche and ti-chi-bot authored Mar 1, 2022
1 parent a4b1968 commit 22cef40
Show file tree
Hide file tree
Showing 9 changed files with 99 additions and 4 deletions.
29 changes: 29 additions & 0 deletions server/cluster/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/tikv/pd/server/core/storelimit"
"github.com/tikv/pd/server/schedule"
"github.com/tikv/pd/server/schedule/hbstream"
"github.com/tikv/pd/server/schedule/labeler"
"github.com/tikv/pd/server/schedule/operator"
"github.com/tikv/pd/server/schedulers"
"github.com/tikv/pd/server/statistics"
Expand Down Expand Up @@ -350,6 +351,34 @@ func (s *testCoordinatorSuite) TestCheckRegion(c *C) {
s.checkRegion(c, tc, co, 1, 0)
}

func (s *testCoordinatorSuite) TestCheckRegionWithScheduleDeny(c *C) {
tc, co, cleanup := prepare(nil, nil, nil, c)

defer cleanup()

c.Assert(tc.addRegionStore(4, 4), IsNil)
c.Assert(tc.addRegionStore(3, 3), IsNil)
c.Assert(tc.addRegionStore(2, 2), IsNil)
c.Assert(tc.addRegionStore(1, 1), IsNil)
c.Assert(tc.addLeaderRegion(1, 2, 3), IsNil)
region := tc.GetRegion(1)
c.Assert(region, NotNil)
// test with label schedule=deny
labelerManager := tc.GetRegionLabeler()
labelerManager.SetLabelRule(&labeler.LabelRule{
ID: "schedulelabel",
Labels: []labeler.RegionLabel{{Key: "schedule", Value: "deny"}},
RuleType: labeler.KeyRange,
Data: []interface{}{map[string]interface{}{"start_key": "", "end_key": ""}},
})

c.Assert(labelerManager.ScheduleDisabled(region), IsTrue)
s.checkRegion(c, tc, co, 1, 0)
labelerManager.DeleteLabelRule("schedulelabel")
c.Assert(labelerManager.ScheduleDisabled(region), IsFalse)
s.checkRegion(c, tc, co, 1, 1)
}

func (s *testCoordinatorSuite) TestCheckerIsBusy(c *C) {
tc, co, cleanup := prepare(func(cfg *config.ScheduleConfig) {
cfg.ReplicaScheduleLimit = 0 // ensure replica checker is busy
Expand Down
7 changes: 7 additions & 0 deletions server/schedule/checker/checker_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,13 @@ func (c *Controller) CheckRegion(region *core.RegionInfo) []*operator.Operator {
return []*operator.Operator{op}
}

if cl, ok := c.cluster.(interface{ GetRegionLabeler() *labeler.RegionLabeler }); ok {
l := cl.GetRegionLabeler()
if l.ScheduleDisabled(region) {
return nil
}
}

if op := c.splitChecker.Check(region); op != nil {
return []*operator.Operator{op}
}
Expand Down
2 changes: 1 addition & 1 deletion server/schedule/checker/joint_state_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (c *JointStateChecker) Check(region *core.RegionInfo) *operator.Operator {
if !core.IsInJointState(region.GetPeers()...) {
return nil
}
op, err := operator.CreateLeaveJointStateOperator("leave-joint-state", c.cluster, region)
op, err := operator.CreateLeaveJointStateOperator(operator.OpDescLeaveJointState, c.cluster, region)
if err != nil {
checkerCounter.WithLabelValues("joint_state_checker", "create-operator-fail").Inc()
log.Debug("fail to create leave joint state operator", errs.ZapError(err))
Expand Down
7 changes: 7 additions & 0 deletions server/schedule/labeler/labeler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"encoding/json"
"fmt"
"reflect"
"strings"
"sync"

"github.com/pingcap/log"
Expand Down Expand Up @@ -287,6 +288,12 @@ func (l *RegionLabeler) GetRegionLabel(region *core.RegionInfo, key string) stri
return value
}

// ScheduleDisabled returns true if the region is lablelld with schedule-disabled.
func (l *RegionLabeler) ScheduleDisabled(region *core.RegionInfo) bool {
v := l.GetRegionLabel(region, scheduleOptionLabel)
return strings.EqualFold(v, scheduleOptioonValueDeny)
}

// GetRegionLabels returns the labels of the region.
// For each key, the label with max rule index will be returned.
func (l *RegionLabeler) GetRegionLabels(region *core.RegionInfo) []*RegionLabel {
Expand Down
5 changes: 5 additions & 0 deletions server/schedule/labeler/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ const (
KeyRange = "key-range"
)

const (
scheduleOptionLabel = "schedule"
scheduleOptioonValueDeny = "deny"
)

// KeyRangeRule contains the start key and end key of the LabelRule.
// NOTE: This type is exported by HTTP API. Please pay more attention when modifying it.
type KeyRangeRule struct {
Expand Down
3 changes: 3 additions & 0 deletions server/schedule/operator/create_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,9 @@ func CreateScatterRegionOperator(desc string, ci ClusterInformer, origin *core.R
Build(0)
}

// OpDescLeaveJointState is the expected desc for LeaveJointStateOperator.
const OpDescLeaveJointState = "leave-joint-state"

// CreateLeaveJointStateOperator creates an operator that let region leave joint state.
func CreateLeaveJointStateOperator(desc string, ci ClusterInformer, origin *core.RegionInfo) (*Operator, error) {
b := NewBuilder(desc, ci, origin, SkipOriginJointStateCheck)
Expand Down
5 changes: 5 additions & 0 deletions server/schedule/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,11 @@ func (o *Operator) GetAdditionalInfo() string {
return ""
}

// IsLeaveJointStateOperator returns true if the desc is OpDescLeaveJointState.
func (o *Operator) IsLeaveJointStateOperator() bool {
return strings.EqualFold(o.desc, OpDescLeaveJointState)
}

// these values are used for unit test.
const (
// mock region default region size is 96MB.
Expand Down
13 changes: 13 additions & 0 deletions server/schedule/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/core/storelimit"
"github.com/tikv/pd/server/schedule/hbstream"
"github.com/tikv/pd/server/schedule/labeler"
"github.com/tikv/pd/server/schedule/operator"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -413,6 +414,18 @@ func (oc *OperatorController) checkAddOperator(ops ...*operator.Operator) bool {
operatorWaitCounter.WithLabelValues(op.Desc(), "exceed-max").Inc()
return false
}

if op.SchedulerKind() == operator.OpAdmin || op.IsLeaveJointStateOperator() {
continue
}
if cl, ok := oc.cluster.(interface{ GetRegionLabeler() *labeler.RegionLabeler }); ok {
l := cl.GetRegionLabeler()
if l.ScheduleDisabled(region) {
log.Debug("schedule disabled", zap.Uint64("region-id", op.RegionID()))
operatorWaitCounter.WithLabelValues(op.Desc(), "schedule-disabled").Inc()
return false
}
}
}
expired := false
for _, op := range ops {
Expand Down
32 changes: 29 additions & 3 deletions server/schedule/operator_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package schedule
import (
"container/heap"
"context"
"encoding/hex"
"fmt"
"sync"
"testing"
Expand All @@ -31,6 +32,7 @@ import (
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/core/storelimit"
"github.com/tikv/pd/server/schedule/hbstream"
"github.com/tikv/pd/server/schedule/labeler"
"github.com/tikv/pd/server/schedule/operator"
)

Expand Down Expand Up @@ -592,11 +594,13 @@ func newRegionInfo(id uint64, startKey, endKey string, size, keys int64, leader
for _, peer := range peers {
prs = append(prs, &metapb.Peer{Id: peer[0], StoreId: peer[1]})
}
start, _ := hex.DecodeString(startKey)
end, _ := hex.DecodeString(endKey)
return core.NewRegionInfo(
&metapb.Region{
Id: id,
StartKey: []byte(startKey),
EndKey: []byte(endKey),
StartKey: start,
EndKey: end,
Peers: prs,
},
&metapb.Peer{Id: leader[0], StoreId: leader[1]},
Expand Down Expand Up @@ -650,9 +654,31 @@ func (t *testOperatorControllerSuite) TestAddWaitingOperator(c *C) {
cluster.PutRegion(source)
target := newRegionInfo(6, "0a", "0b", 1, 1, []uint64{101, 1}, []uint64{101, 1})
cluster.PutRegion(target)

ops, err := operator.CreateMergeRegionOperator("merge-region", cluster, source, target, operator.OpMerge)
c.Assert(err, IsNil)
c.Assert(ops, HasLen, 2)

// test with label schedule=deny
labelerManager := cluster.GetRegionLabeler()
labelerManager.SetLabelRule(&labeler.LabelRule{
ID: "schedulelabel",
Labels: []labeler.RegionLabel{{Key: "schedule", Value: "deny"}},
RuleType: labeler.KeyRange,
Data: []interface{}{map[string]interface{}{"start_key": "1a", "end_key": "1b"}},
})

c.Assert(labelerManager.ScheduleDisabled(source), IsTrue)
// add operator should be failed since it is labeled with `schedule=deny`.
c.Assert(controller.AddWaitingOperator(ops...), Equals, 0)

// add operator should be success without `schedule=deny`
labelerManager.DeleteLabelRule("schedulelabel")
labelerManager.ScheduleDisabled(source)
c.Assert(labelerManager.ScheduleDisabled(source), IsFalse)
// now there is one operator being allowed to add, if it is a merge operator
// both of the pair are allowed
ops, err := operator.CreateMergeRegionOperator("merge-region", cluster, source, target, operator.OpMerge)
ops, err = operator.CreateMergeRegionOperator("merge-region", cluster, source, target, operator.OpMerge)
c.Assert(err, IsNil)
c.Assert(ops, HasLen, 2)
c.Assert(controller.AddWaitingOperator(ops...), Equals, 2)
Expand Down

0 comments on commit 22cef40

Please sign in to comment.