Skip to content

Commit

Permalink
schedulers: add role config to shuffleRegionScheduler (#2219) (#2235)
Browse files Browse the repository at this point in the history
Signed-off-by: disksing <i@disksing.com>
  • Loading branch information
disksing authored Mar 12, 2020
1 parent 2fd8085 commit aff95b9
Show file tree
Hide file tree
Showing 7 changed files with 313 additions and 46 deletions.
12 changes: 12 additions & 0 deletions pkg/testutil/operator_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,18 @@ func CheckTransferPeer(c *check.C, op *operator.Operator, kind operator.OpKind,
c.Assert(op.Kind()&kind, check.Equals, kind)
}

// CheckTransferLearner checks if the operator is to transfer learner between the specified source and target stores.
func CheckTransferLearner(c *check.C, op *operator.Operator, kind operator.OpKind, sourceID, targetID uint64) {
c.Assert(op, check.NotNil)

steps, _ := trimTransferLeaders(op)
c.Assert(steps, check.HasLen, 2)
c.Assert(steps[0].(operator.AddLearner).ToStore, check.Equals, targetID)
c.Assert(steps[1].(operator.RemovePeer).FromStore, check.Equals, sourceID)
kind |= operator.OpRegion
c.Assert(op.Kind()&kind, check.Equals, kind)
}

// CheckTransferPeerWithLeaderTransfer checks if the operator is to transfer
// peer between the specified source and target stores and it meanwhile
// transfers the leader out of source store.
Expand Down
6 changes: 4 additions & 2 deletions server/schedule/selector/selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,12 @@ func (s *RandomSelector) randStore(stores []*core.StoreInfo) *core.StoreInfo {
}

// SelectSource randomly selects a source store from those can pass all filters.
func (s *RandomSelector) SelectSource(opt opt.Options, stores []*core.StoreInfo) *core.StoreInfo {
func (s *RandomSelector) SelectSource(opt opt.Options, stores []*core.StoreInfo, filters ...filter.Filter) *core.StoreInfo {
filters = append(filters, s.filters...)

candidates := make([]*core.StoreInfo, 0, len(stores))
for _, store := range stores {
if filter.Source(opt, store, s.filters) {
if filter.Source(opt, store, filters) {
continue
}
candidates = append(candidates, store)
Expand Down
55 changes: 55 additions & 0 deletions server/schedulers/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/pd/v3/server/schedule"
"github.com/pingcap/pd/v3/server/schedule/operator"
"github.com/pingcap/pd/v3/server/schedule/opt"
"github.com/pingcap/pd/v3/server/schedule/placement"
"github.com/pingcap/pd/v3/server/statistics"
)

Expand Down Expand Up @@ -487,3 +488,57 @@ func (s *testShuffleRegionSuite) TestShuffle(c *C) {
c.Assert(op[0].Kind(), Equals, operator.OpRegion|operator.OpAdmin)
}
}

func (s *testShuffleRegionSuite) TestRole(c *C) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
opt := mockoption.NewScheduleOptions()
tc := mockcluster.NewCluster(opt)

// update rule to 1leader+1follower+1learner
opt.EnablePlacementRules = true
tc.RuleManager.SetRule(&placement.Rule{
GroupID: "pd",
ID: "default",
Role: placement.Voter,
Count: 2,
})
tc.RuleManager.SetRule(&placement.Rule{
GroupID: "pd",
ID: "learner",
Role: placement.Learner,
Count: 1,
})

// Add stores 1, 2, 3, 4
tc.AddRegionStore(1, 6)
tc.AddRegionStore(2, 7)
tc.AddRegionStore(3, 8)
tc.AddRegionStore(4, 9)

// Put a region with 1leader + 1follower + 1learner
peers := []*metapb.Peer{
{Id: 1, StoreId: 1},
{Id: 2, StoreId: 2},
{Id: 3, StoreId: 3, IsLearner: true},
}
region := core.NewRegionInfo(&metapb.Region{
Id: 1,
RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 1},
Peers: peers,
}, peers[0])
tc.PutRegion(region)

sl, err := schedule.CreateScheduler(ShuffleRegionType, schedule.NewOperatorController(ctx, nil, nil), core.NewStorage(kv.NewMemoryKV()), schedule.ConfigSliceDecoder(ShuffleRegionType, []string{"", ""}))
c.Assert(err, IsNil)

conf := sl.(*shuffleRegionScheduler).conf
conf.Roles = []string{"follower"}
ops := sl.Schedule(tc)
c.Assert(ops, HasLen, 1)
testutil.CheckTransferPeer(c, ops[0], operator.OpRegion, 2, 4) // transfer follower
conf.Roles = []string{"learner"}
ops = sl.Schedule(tc)
c.Assert(ops, HasLen, 1)
testutil.CheckTransferLearner(c, ops[0], operator.OpRegion, 3, 4) // transfer learner
}
75 changes: 46 additions & 29 deletions server/schedulers/shuffle_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package schedulers

import (
"net/http"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/pd/v3/server/core"
"github.com/pingcap/pd/v3/server/schedule"
Expand Down Expand Up @@ -43,24 +45,19 @@ func init() {
return errors.WithStack(err)
}
conf.Ranges = ranges
conf.Name = ShuffleRegionName
conf.Roles = allRoles
return nil
}
})
schedule.RegisterScheduler(ShuffleRegionType, func(opController *schedule.OperatorController, storage *core.Storage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) {
conf := &shuffleRegionSchedulerConfig{}
conf := &shuffleRegionSchedulerConfig{storage: storage}
if err := decoder(conf); err != nil {
return nil, err
}
return newShuffleRegionScheduler(opController, conf), nil
})
}

type shuffleRegionSchedulerConfig struct {
Name string `json:"name"`
Ranges []core.KeyRange `json:"ranges"`
}

type shuffleRegionScheduler struct {
*BaseScheduler
conf *shuffleRegionSchedulerConfig
Expand All @@ -71,7 +68,7 @@ type shuffleRegionScheduler struct {
// between stores.
func newShuffleRegionScheduler(opController *schedule.OperatorController, conf *shuffleRegionSchedulerConfig) schedule.Scheduler {
filters := []filter.Filter{
filter.StoreStateFilter{ActionScope: conf.Name, MoveRegion: true},
filter.StoreStateFilter{ActionScope: ShuffleRegionName, MoveRegion: true},
}
base := NewBaseScheduler(opController)
return &shuffleRegionScheduler{
Expand All @@ -81,16 +78,20 @@ func newShuffleRegionScheduler(opController *schedule.OperatorController, conf *
}
}

func (s *shuffleRegionScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.conf.ServeHTTP(w, r)
}

func (s *shuffleRegionScheduler) GetName() string {
return s.conf.Name
return ShuffleRegionName
}

func (s *shuffleRegionScheduler) GetType() string {
return ShuffleRegionType
}

func (s *shuffleRegionScheduler) EncodeConfig() ([]byte, error) {
return schedule.EncodeConfig(s.conf)
return s.conf.EncodeConfig()
}

func (s *shuffleRegionScheduler) IsScheduleAllowed(cluster opt.Cluster) bool {
Expand All @@ -105,8 +106,7 @@ func (s *shuffleRegionScheduler) Schedule(cluster opt.Cluster) []*operator.Opera
return nil
}

excludedFilter := filter.NewExcludedFilter(s.GetName(), nil, region.GetStoreIds())
newPeer := s.scheduleAddPeer(cluster, excludedFilter)
newPeer := s.scheduleAddPeer(cluster, region, oldPeer)
if newPeer == nil {
schedulerCounter.WithLabelValues(s.GetName(), "no-new-peer").Inc()
return nil
Expand All @@ -124,31 +124,48 @@ func (s *shuffleRegionScheduler) Schedule(cluster opt.Cluster) []*operator.Opera

func (s *shuffleRegionScheduler) scheduleRemovePeer(cluster opt.Cluster) (*core.RegionInfo, *metapb.Peer) {
stores := cluster.GetStores()
exclude := make(map[uint64]struct{})
excludeFilter := filter.NewExcludedFilter(s.GetType(), exclude, nil)

for {
source := s.selector.SelectSource(cluster, stores, excludeFilter)
if source == nil {
schedulerCounter.WithLabelValues(s.GetName(), "no-source-store").Inc()
return nil, nil
}

source := s.selector.SelectSource(cluster, stores)
if source == nil {
schedulerCounter.WithLabelValues(s.GetName(), "no-source-store").Inc()
return nil, nil
}
var region *core.RegionInfo
if s.conf.IsRoleAllow(roleFollower) {
region = cluster.RandFollowerRegion(source.GetID(), s.conf.GetRanges(), opt.HealthRegion(cluster), opt.ReplicatedRegion(cluster))
}
if region == nil && s.conf.IsRoleAllow(roleLeader) {
region = cluster.RandLeaderRegion(source.GetID(), s.conf.GetRanges(), opt.HealthRegion(cluster), opt.ReplicatedRegion(cluster))
}
if region == nil && s.conf.IsRoleAllow(roleLearner) {
region = cluster.RandLearnerRegion(source.GetID(), s.conf.GetRanges(), opt.HealthRegion(cluster), opt.ReplicatedRegion(cluster))
}
if region != nil {
return region, region.GetStorePeer(source.GetID())
}

region := cluster.RandFollowerRegion(source.GetID(), s.conf.Ranges, opt.HealthRegion(cluster))
if region == nil {
region = cluster.RandLeaderRegion(source.GetID(), s.conf.Ranges, opt.HealthRegion(cluster))
}
if region == nil {
exclude[source.GetID()] = struct{}{}
schedulerCounter.WithLabelValues(s.GetName(), "no-region").Inc()
return nil, nil
}

return region, region.GetStorePeer(source.GetID())
}

func (s *shuffleRegionScheduler) scheduleAddPeer(cluster opt.Cluster, filter filter.Filter) *metapb.Peer {
stores := cluster.GetStores()
func (s *shuffleRegionScheduler) scheduleAddPeer(cluster opt.Cluster, region *core.RegionInfo, oldPeer *metapb.Peer) *metapb.Peer {
var scoreGuard filter.Filter
if cluster.IsPlacementRulesEnabled() {
scoreGuard = filter.NewRuleFitFilter(s.GetName(), cluster, region, oldPeer.GetStoreId())
} else {
scoreGuard = filter.NewDistinctScoreFilter(s.GetName(), cluster.GetLocationLabels(), cluster.GetRegionStores(region), cluster.GetStore(oldPeer.GetStoreId()))
}
excludedFilter := filter.NewExcludedFilter(s.GetName(), nil, region.GetStoreIds())

target := s.selector.SelectTarget(cluster, stores, filter)
stores := cluster.GetStores()
target := s.selector.SelectTarget(cluster, stores, scoreGuard, excludedFilter)
if target == nil {
return nil
}
return &metapb.Peer{StoreId: target.GetID()}
return &metapb.Peer{StoreId: target.GetID(), IsLearner: oldPeer.GetIsLearner()}
}
112 changes: 112 additions & 0 deletions server/schedulers/shuffle_region_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package schedulers

import (
"net/http"
"sync"

"github.com/gorilla/mux"
"github.com/pingcap/pd/v3/pkg/apiutil"
"github.com/pingcap/pd/v3/pkg/slice"
"github.com/pingcap/pd/v3/server/core"
"github.com/pingcap/pd/v3/server/schedule"
"github.com/pingcap/pd/v3/server/schedule/placement"
"github.com/unrolled/render"
)

const (
roleLeader = string(placement.Leader)
roleFollower = string(placement.Follower)
roleLearner = string(placement.Learner)
)

var allRoles = []string{roleLeader, roleFollower, roleLearner}

type shuffleRegionSchedulerConfig struct {
sync.RWMutex
storage *core.Storage

Ranges []core.KeyRange `json:"ranges"`
Roles []string `json:"roles"` // can include `leader`, `follower`, `learner`.
}

func (conf *shuffleRegionSchedulerConfig) EncodeConfig() ([]byte, error) {
conf.RLock()
defer conf.RUnlock()
return schedule.EncodeConfig(conf)
}

func (conf *shuffleRegionSchedulerConfig) GetRoles() []string {
conf.RLock()
defer conf.RUnlock()
return conf.Roles
}

func (conf *shuffleRegionSchedulerConfig) GetRanges() []core.KeyRange {
conf.RLock()
defer conf.RUnlock()
return conf.Ranges
}

func (conf *shuffleRegionSchedulerConfig) IsRoleAllow(role string) bool {
conf.RLock()
defer conf.RUnlock()
return slice.AnyOf(conf.Roles, func(i int) bool { return conf.Roles[i] == role })
}

func (conf *shuffleRegionSchedulerConfig) ServeHTTP(w http.ResponseWriter, r *http.Request) {
router := mux.NewRouter()
router.HandleFunc("/roles", conf.handleGetRoles).Methods("GET")
router.HandleFunc("/roles", conf.handleSetRoles).Methods("POST")
router.ServeHTTP(w, r)
}

func (conf *shuffleRegionSchedulerConfig) handleGetRoles(w http.ResponseWriter, r *http.Request) {
rd := render.New(render.Options{IndentJSON: true})
rd.JSON(w, http.StatusOK, conf.GetRoles())
}

func (conf *shuffleRegionSchedulerConfig) handleSetRoles(w http.ResponseWriter, r *http.Request) {
rd := render.New(render.Options{IndentJSON: true})
var roles []string
if err := apiutil.ReadJSONRespondError(rd, w, r.Body, &roles); err != nil {
return
}
for _, r := range roles {
if slice.NoneOf(allRoles, func(i int) bool { return allRoles[i] == r }) {
rd.Text(w, http.StatusBadRequest, "invalid role:"+r)
return
}
}

conf.Lock()
defer conf.Unlock()
old := conf.Roles
conf.Roles = roles
if err := conf.persist(); err != nil {
conf.Roles = old // revert
rd.Text(w, http.StatusInternalServerError, err.Error())
return
}
rd.Text(w, http.StatusOK, "")
}

func (conf *shuffleRegionSchedulerConfig) persist() error {
data, err := schedule.EncodeConfig(conf)
if err != nil {
return err
}
return conf.storage.SaveScheduleConfig(ShuffleRegionName, data)
}
Loading

0 comments on commit aff95b9

Please sign in to comment.