Skip to content

Commit

Permalink
scheduler: use move-hot-write-leader operator (#7852)
Browse files Browse the repository at this point in the history
close #7848

Signed-off-by: lhy1024 <admin@liudos.us>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
lhy1024 and ti-chi-bot[bot] authored Apr 12, 2024
1 parent 1cd99f1 commit 33ae3b6
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 87 deletions.
15 changes: 15 additions & 0 deletions metrics/grafana/pd.json
Original file line number Diff line number Diff line change
Expand Up @@ -5881,6 +5881,21 @@
"intervalFactor": 1,
"legendFormat": "store-{{store}}-in",
"refId": "B"
},
{
"expr": "- sum(delta(pd_scheduler_hot_region_direction{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", store=~\"$store\",type=\"move-leader\",direction=\"out\",rw=\"write\"}[1m]))by (store)",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "store-{{store}}-out",
"refId": "C",
"step": 4
},
{
"expr": "sum(delta(pd_scheduler_hot_region_direction{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", store=~\"$store\",type=\"move-leader\",direction=\"in\",rw=\"write\"}[1m]))by (store)",
"format": "time_series",
"intervalFactor": 1,
"legendFormat": "store-{{store}}-in",
"refId": "D"
}
],
"thresholds": [],
Expand Down
45 changes: 6 additions & 39 deletions pkg/schedule/schedulers/hot_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -1546,20 +1546,12 @@ func (bs *balanceSolver) buildOperators() (ops []*operator.Operator) {
targetLabel := strconv.FormatUint(dstStoreID, 10)
dim := bs.rankToDimString()

var createOperator func(region *core.RegionInfo, srcStoreID, dstStoreID uint64) (op *operator.Operator, typ string, err error)
switch bs.rwTy {
case utils.Read:
createOperator = bs.createReadOperator
case utils.Write:
createOperator = bs.createWriteOperator
}

currentOp, typ, err := createOperator(bs.cur.region, srcStoreID, dstStoreID)
currentOp, typ, err := bs.createOperator(bs.cur.region, srcStoreID, dstStoreID)
if err == nil {
bs.decorateOperator(currentOp, false, sourceLabel, targetLabel, typ, dim)
ops = []*operator.Operator{currentOp}
if bs.cur.revertRegion != nil {
currentOp, typ, err = createOperator(bs.cur.revertRegion, dstStoreID, srcStoreID)
currentOp, typ, err = bs.createOperator(bs.cur.revertRegion, dstStoreID, srcStoreID)
if err == nil {
bs.decorateOperator(currentOp, true, targetLabel, sourceLabel, typ, dim)
ops = append(ops, currentOp)
Expand Down Expand Up @@ -1725,11 +1717,11 @@ func (bs *balanceSolver) createSplitOperator(regions []*core.RegionInfo, strateg
return operators
}

func (bs *balanceSolver) createReadOperator(region *core.RegionInfo, srcStoreID, dstStoreID uint64) (op *operator.Operator, typ string, err error) {
func (bs *balanceSolver) createOperator(region *core.RegionInfo, srcStoreID, dstStoreID uint64) (op *operator.Operator, typ string, err error) {
if region.GetStorePeer(dstStoreID) != nil {
typ = "transfer-leader"
op, err = operator.CreateTransferLeaderOperator(
"transfer-hot-read-leader",
"transfer-hot-"+bs.rwTy.String()+"-leader",
bs,
region,
dstStoreID,
Expand All @@ -1741,7 +1733,7 @@ func (bs *balanceSolver) createReadOperator(region *core.RegionInfo, srcStoreID,
if region.GetLeader().GetStoreId() == srcStoreID {
typ = "move-leader"
op, err = operator.CreateMoveLeaderOperator(
"move-hot-read-leader",
"move-hot-"+bs.rwTy.String()+"-leader",
bs,
region,
operator.OpHotRegion,
Expand All @@ -1750,7 +1742,7 @@ func (bs *balanceSolver) createReadOperator(region *core.RegionInfo, srcStoreID,
} else {
typ = "move-peer"
op, err = operator.CreateMovePeerOperator(
"move-hot-read-peer",
"move-hot-"+bs.rwTy.String()+"-peer",
bs,
region,
operator.OpHotRegion,
Expand All @@ -1761,31 +1753,6 @@ func (bs *balanceSolver) createReadOperator(region *core.RegionInfo, srcStoreID,
return
}

func (bs *balanceSolver) createWriteOperator(region *core.RegionInfo, srcStoreID, dstStoreID uint64) (op *operator.Operator, typ string, err error) {
if region.GetStorePeer(dstStoreID) != nil {
typ = "transfer-leader"
op, err = operator.CreateTransferLeaderOperator(
"transfer-hot-write-leader",
bs,
region,
dstStoreID,
[]uint64{},
operator.OpHotRegion)
} else {
srcPeer := region.GetStorePeer(srcStoreID) // checked in `filterHotPeers`
dstPeer := &metapb.Peer{StoreId: dstStoreID, Role: srcPeer.Role}
typ = "move-peer"
op, err = operator.CreateMovePeerOperator(
"move-hot-write-peer",
bs,
region,
operator.OpHotRegion,
srcStoreID,
dstPeer)
}
return
}

func (bs *balanceSolver) decorateOperator(op *operator.Operator, isRevert bool, sourceLabel, targetLabel, typ, dim string) {
op.SetPriorityLevel(constant.High)
op.FinishedCounters = append(op.FinishedCounters,
Expand Down
57 changes: 32 additions & 25 deletions pkg/schedule/schedulers/hot_region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ func clearPendingInfluence(h *hotScheduler) {
h.regionPendings = make(map[uint64]*pendingInfluence)
}

func newTestRegion(id uint64) *core.RegionInfo {
peers := []*metapb.Peer{{Id: id*100 + 1, StoreId: 1}, {Id: id*100 + 2, StoreId: 2}, {Id: id*100 + 3, StoreId: 3}}
return core.NewRegionInfo(&metapb.Region{Id: id, Peers: peers}, peers[0])
}

func TestUpgrade(t *testing.T) {
re := require.New(t)
cancel, _, _, oc := prepareSchedulersTest()
Expand Down Expand Up @@ -193,20 +198,6 @@ func checkGCPendingOpInfos(re *require.Assertions, enablePlacementRules bool) {
}
}

func newTestRegion(id uint64) *core.RegionInfo {
peers := []*metapb.Peer{{Id: id*100 + 1, StoreId: 1}, {Id: id*100 + 2, StoreId: 2}, {Id: id*100 + 3, StoreId: 3}}
return core.NewRegionInfo(&metapb.Region{Id: id, Peers: peers}, peers[0])
}

func TestHotWriteRegionScheduleByteRateOnly(t *testing.T) {
re := require.New(t)
statistics.Denoising = false
statisticsInterval = 0
checkHotWriteRegionScheduleByteRateOnly(re, false /* disable placement rules */)
checkHotWriteRegionScheduleByteRateOnly(re, true /* enable placement rules */)
checkHotWriteRegionPlacement(re, true)
}

func TestSplitIfRegionTooHot(t *testing.T) {
re := require.New(t)
statistics.Denoising = false
Expand Down Expand Up @@ -395,6 +386,15 @@ func TestSplitBucketsByLoad(t *testing.T) {
}
}

func TestHotWriteRegionScheduleByteRateOnly(t *testing.T) {
re := require.New(t)
statistics.Denoising = false
statisticsInterval = 0
checkHotWriteRegionScheduleByteRateOnly(re, false /* disable placement rules */)
checkHotWriteRegionScheduleByteRateOnly(re, true /* enable placement rules */)
checkHotWriteRegionPlacement(re, true)
}

func checkHotWriteRegionPlacement(re *require.Assertions, enablePlacementRules bool) {
cancel, _, tc, oc := prepareSchedulersTest()
defer cancel()
Expand Down Expand Up @@ -446,14 +446,13 @@ func checkHotWriteRegionPlacement(re *require.Assertions, enablePlacementRules b
tc.RuleManager.DeleteRule("pd", "follower")
ops, _ = hb.Schedule(tc, false)
re.NotEmpty(ops)
// TODO: fix the test
// re.NotContains(ops[0].Step(1).String(), "transfer leader")
re.NotContains(ops[0].Step(1).String(), "transfer leader")
}

func checkHotWriteRegionScheduleByteRateOnly(re *require.Assertions, enablePlacementRules bool) {
cancel, opt, tc, oc := prepareSchedulersTest()
defer cancel()
tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.ConfChangeV2))
tc.SetEnablePlacementRules(enablePlacementRules)
labels := []string{"zone", "host"}
tc.SetMaxReplicasWithLabel(enablePlacementRules, 3, labels...)
Expand Down Expand Up @@ -511,12 +510,14 @@ func checkHotWriteRegionScheduleByteRateOnly(re *require.Assertions, enablePlace
switch op.Len() {
case 1:
// balance by leader selected
re.Equal("transfer-hot-write-leader", op.Desc())
operatorutil.CheckTransferLeaderFrom(re, op, operator.OpHotRegion, 1)
case 4:
case 5:
// balance by peer selected
re.Equal("move-hot-write-leader", op.Desc())
if op.RegionID() == 2 {
// peer in store 1 of the region 2 can transfer to store 5 or store 6 because of the label
operatorutil.CheckTransferPeerWithLeaderTransferFrom(re, op, operator.OpHotRegion, 1)
operatorutil.CheckTransferPeerWithLeaderTransfer(re, op, operator.OpHotRegion, 1, 0)
} else {
// peer in store 1 of the region 1,3 can only transfer to store 6
operatorutil.CheckTransferPeerWithLeaderTransfer(re, op, operator.OpHotRegion, 1, 6)
Expand All @@ -536,10 +537,10 @@ func checkHotWriteRegionScheduleByteRateOnly(re *require.Assertions, enablePlace
ops, _ := hb.Schedule(tc, false)
op := ops[0]
clearPendingInfluence(hb.(*hotScheduler))
re.Equal(4, op.Len())
re.Equal(5, op.Len())
if op.RegionID() == 2 {
// peer in store 1 of the region 2 can transfer to store 5 or store 6 because of the label
operatorutil.CheckTransferPeerWithLeaderTransferFrom(re, op, operator.OpHotRegion, 1)
operatorutil.CheckTransferPeerWithLeaderTransfer(re, op, operator.OpHotRegion, 1, 0)
} else {
// peer in store 1 of the region 1,3 can only transfer to store 6
operatorutil.CheckTransferPeerWithLeaderTransfer(re, op, operator.OpHotRegion, 1, 6)
Expand Down Expand Up @@ -636,7 +637,7 @@ func TestHotWriteRegionScheduleByteRateOnlyWithTiFlash(t *testing.T) {
statisticsInterval = 0
cancel, _, tc, oc := prepareSchedulersTest()
defer cancel()
tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.ConfChangeV2))
tc.SetHotRegionCacheHitsThreshold(0)
re.NoError(tc.RuleManager.SetRules([]*placement.Rule{
{
Expand Down Expand Up @@ -730,9 +731,11 @@ func TestHotWriteRegionScheduleByteRateOnlyWithTiFlash(t *testing.T) {
switch op.Len() {
case 1:
// balance by leader selected
re.Equal("transfer-hot-write-leader", op.Desc())
operatorutil.CheckTransferLeaderFrom(re, op, operator.OpHotRegion, 1)
case 2:
// balance by peer selected
re.Equal("move-hot-write-leader", op.Desc())
operatorutil.CheckTransferLearner(re, op, operator.OpHotRegion, 8, 10)
default:
re.FailNow("wrong op: " + op.String())
Expand Down Expand Up @@ -823,12 +826,14 @@ func TestHotWriteRegionScheduleByteRateOnlyWithTiFlash(t *testing.T) {
switch op.Len() {
case 1:
// balance by leader selected
re.Equal("transfer-hot-write-leader", op.Desc())
operatorutil.CheckTransferLeaderFrom(re, op, operator.OpHotRegion, 1)
case 4:
case 5:
// balance by peer selected
re.Equal("move-hot-write-leader", op.Desc())
if op.RegionID() == 2 {
// peer in store 1 of the region 2 can transfer to store 5 or store 6 because of the label
operatorutil.CheckTransferPeerWithLeaderTransferFrom(re, op, operator.OpHotRegion, 1)
operatorutil.CheckTransferPeerWithLeaderTransfer(re, op, operator.OpHotRegion, 1, 0)
} else {
// peer in store 1 of the region 1,3 can only transfer to store 6
operatorutil.CheckTransferPeerWithLeaderTransfer(re, op, operator.OpHotRegion, 1, 6)
Expand Down Expand Up @@ -1164,9 +1169,11 @@ func checkHotWriteRegionScheduleWithPendingInfluence(re *require.Assertions, dim
switch op.Len() {
case 1:
// balance by leader selected
re.Equal("transfer-hot-write-leader", op.Desc())
operatorutil.CheckTransferLeaderFrom(re, op, operator.OpHotRegion, 1)
case 4:
case 5:
// balance by peer selected
re.Equal("move-hot-write-leader", op.Desc())
operatorutil.CheckTransferPeerWithLeaderTransfer(re, op, operator.OpHotRegion, 1, 4)
cnt++
if cnt == 3 {
Expand Down
66 changes: 43 additions & 23 deletions pkg/utils/operatorutil/operator_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,27 @@ func trimTransferLeaders(op *operator.Operator) (steps []operator.OpStep, lastLe
// CheckTransferPeer checks if the operator is to transfer peer between the specified source and target stores.
func CheckTransferPeer(re *require.Assertions, op *operator.Operator, kind operator.OpKind, sourceID, targetID uint64) {
re.NotNil(op)
var addLearnerTo, removePeerFrom uint64
steps, _ := trimTransferLeaders(op)
re.Len(steps, 3)
re.Equal(targetID, steps[0].(operator.AddLearner).ToStore)
re.IsType(operator.PromoteLearner{}, steps[1])
re.Equal(sourceID, steps[2].(operator.RemovePeer).FromStore)
switch len(steps) {
case 3: // without joint consensus
re.IsType(operator.AddLearner{}, steps[0])
re.IsType(operator.PromoteLearner{}, steps[1])
re.IsType(operator.RemovePeer{}, steps[2])
addLearnerTo = steps[0].(operator.AddLearner).ToStore
removePeerFrom = steps[2].(operator.RemovePeer).FromStore
case 4: // with joint consensus
re.IsType(operator.AddLearner{}, steps[0])
re.IsType(operator.ChangePeerV2Enter{}, steps[1])
re.IsType(operator.ChangePeerV2Leave{}, steps[2])
re.IsType(operator.RemovePeer{}, steps[3])
addLearnerTo = steps[0].(operator.AddLearner).ToStore
removePeerFrom = steps[3].(operator.RemovePeer).FromStore
default:
re.FailNow("unexpected operator steps")
}
re.Equal(sourceID, removePeerFrom)
re.Equal(targetID, addLearnerTo)
kind |= operator.OpRegion
re.Equal(kind, op.Kind()&kind)
}
Expand All @@ -88,32 +104,36 @@ func CheckTransferLearner(re *require.Assertions, op *operator.Operator, kind op
// CheckTransferPeerWithLeaderTransfer checks if the operator is to transfer
// peer between the specified source and target stores and it meanwhile
// transfers the leader out of source store.
// If targetID is 0, it means the operator is to transfer peer to any store.
func CheckTransferPeerWithLeaderTransfer(re *require.Assertions, op *operator.Operator, kind operator.OpKind, sourceID, targetID uint64) {
re.NotNil(op)
var addLearnerTo, removePeerFrom uint64
steps, lastLeader := trimTransferLeaders(op)
re.Len(steps, 3)
re.Equal(targetID, steps[0].(operator.AddLearner).ToStore)
re.IsType(operator.PromoteLearner{}, steps[1])
re.Equal(sourceID, steps[2].(operator.RemovePeer).FromStore)
switch len(steps) {
case 3: // without joint consensus
re.IsType(operator.AddLearner{}, steps[0])
re.IsType(operator.PromoteLearner{}, steps[1])
re.IsType(operator.RemovePeer{}, steps[2])
addLearnerTo = steps[0].(operator.AddLearner).ToStore
removePeerFrom = steps[2].(operator.RemovePeer).FromStore
case 4: // with joint consensus
re.IsType(operator.AddLearner{}, steps[0])
re.IsType(operator.ChangePeerV2Enter{}, steps[1])
re.IsType(operator.ChangePeerV2Leave{}, steps[2])
re.IsType(operator.RemovePeer{}, steps[3])
addLearnerTo = steps[0].(operator.AddLearner).ToStore
removePeerFrom = steps[3].(operator.RemovePeer).FromStore
default:
re.FailNow("unexpected operator steps")
}
re.NotZero(lastLeader)
re.NotEqual(sourceID, lastLeader)
kind |= operator.OpRegion
re.Equal(kind, op.Kind()&kind)
}

// CheckTransferPeerWithLeaderTransferFrom checks if the operator is to transfer
// peer out of the specified store and it meanwhile transfers the leader out of
// the store.
func CheckTransferPeerWithLeaderTransferFrom(re *require.Assertions, op *operator.Operator, kind operator.OpKind, sourceID uint64) {
re.NotNil(op)
steps, lastLeader := trimTransferLeaders(op)
re.IsType(operator.AddLearner{}, steps[0])
re.IsType(operator.PromoteLearner{}, steps[1])
re.Equal(sourceID, steps[2].(operator.RemovePeer).FromStore)
re.NotZero(lastLeader)
re.NotEqual(sourceID, lastLeader)
kind |= operator.OpRegion | operator.OpLeader
re.Equal(kind, op.Kind()&kind)
re.Equal(sourceID, removePeerFrom)
if targetID != 0 {
re.Equal(targetID, addLearnerTo)
}
}

// CheckAddPeer checks if the operator is to add peer on specified store.
Expand Down

0 comments on commit 33ae3b6

Please sign in to comment.