Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: store balance weight. #713

Merged
merged 9 commits into from
Sep 1, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion pdctl/command/store_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,13 @@ var (
// NewStoreCommand return a store subcommand of rootCmd
func NewStoreCommand() *cobra.Command {
s := &cobra.Command{
Use: "store [delete|label] <store_id>",
Use: "store [delete|label|weight] <store_id>",
Short: "show the store status",
Run: showStoreCommandFunc,
}
s.AddCommand(NewDeleteStoreCommand())
s.AddCommand(NewLabelStoreCommand())
s.AddCommand(NewSetStoreWeightCommand())
return s
}

Expand All @@ -59,6 +60,15 @@ func NewLabelStoreCommand() *cobra.Command {
return l
}

// NewSetStoreWeightCommand returns a weight subcommand of storeCmd.
func NewSetStoreWeightCommand() *cobra.Command {
return &cobra.Command{
Use: "weight <store_id> <leader_weight> <region_weight>",
Short: "set a store's leader and region balance weight",
Run: setStoreWeightCommandFunc,
}
}

func showStoreCommandFunc(cmd *cobra.Command, args []string) {
var prefix string
prefix = storesPrefix
Expand Down Expand Up @@ -107,3 +117,25 @@ func labelStoreCommandFunc(cmd *cobra.Command, args []string) {
prefix := fmt.Sprintf(path.Join(storePrefix, "label"), args[0])
postJSON(cmd, prefix, map[string]interface{}{args[1]: args[2]})
}

func setStoreWeightCommandFunc(cmd *cobra.Command, args []string) {
if len(args) != 3 {
fmt.Println("Usage: store weight <store_id> <leader_weight> <region_weight>")
return
}
leader, err := strconv.ParseFloat(args[1], 64)
if err != nil || leader < 0 {
fmt.Println("leader_weight should be a number that >= 0.")
return
}
region, err := strconv.ParseFloat(args[2], 64)
if err != nil || region < 0 {
fmt.Println("region_weight should be a number that >= 0")
return
}
prefix := fmt.Sprintf(path.Join(storePrefix, "weight"), args[0])
postJSON(cmd, prefix, map[string]interface{}{
"leader": leader,
"region": region,
})
}
1 change: 1 addition & 0 deletions server/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
router.HandleFunc("/api/v1/store/{id}", storeHandler.Get).Methods("GET")
router.HandleFunc("/api/v1/store/{id}", storeHandler.Delete).Methods("DELETE")
router.HandleFunc("/api/v1/store/{id}/label", storeHandler.SetLabels).Methods("POST")
router.HandleFunc("/api/v1/store/{id}/weight", storeHandler.SetWeight).Methods("POST")
router.Handle("/api/v1/stores", newStoresHandler(svr, rd)).Methods("GET")

labelsHandler := newLabelsHandler(svr, rd)
Expand Down
58 changes: 58 additions & 0 deletions server/api/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ type storeStatus struct {
Capacity typeutil.ByteSize `json:"capacity,omitempty"`
Available typeutil.ByteSize `json:"available,omitempty"`
LeaderCount int `json:"leader_count,omitempty"`
LeaderWeight float64 `json:"leader_weight,omitempty"`
LeaderScore float64 `json:"leader_score,omitempty"`
RegionCount int `json:"region_count,omitempty"`
RegionWeight float64 `json:"region_weight,omitempty"`
RegionScore float64 `json:"region_score,omitempty"`
SendingSnapCount uint32 `json:"sending_snap_count,omitempty"`
ReceivingSnapCount uint32 `json:"receiving_snap_count,omitempty"`
ApplyingSnapCount uint32 `json:"applying_snap_count,omitempty"`
Expand All @@ -63,7 +67,11 @@ func newStoreInfo(store *server.StoreInfo) *storeInfo {
Capacity: typeutil.ByteSize(store.Stats.GetCapacity()),
Available: typeutil.ByteSize(store.Stats.GetAvailable()),
LeaderCount: store.LeaderCount,
LeaderWeight: store.LeaderWeight,
LeaderScore: store.LeaderScore(),
RegionCount: store.RegionCount,
RegionWeight: store.RegionWeight,
RegionScore: store.RegionScore(),
SendingSnapCount: store.Stats.GetSendingSnapCount(),
ReceivingSnapCount: store.Stats.GetReceivingSnapCount(),
ApplyingSnapCount: store.Stats.GetApplyingSnapCount(),
Expand Down Expand Up @@ -197,6 +205,56 @@ func (h *storeHandler) SetLabels(w http.ResponseWriter, r *http.Request) {
h.rd.JSON(w, http.StatusOK, nil)
}

func (h *storeHandler) SetWeight(w http.ResponseWriter, r *http.Request) {
cluster := h.svr.GetRaftCluster()
if cluster == nil {
h.rd.JSON(w, http.StatusInternalServerError, server.ErrNotBootstrapped.Error())
return
}

vars := mux.Vars(r)
storeIDStr := vars["id"]
storeID, err := strconv.ParseUint(storeIDStr, 10, 64)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}

var input map[string]interface{}
if err := readJSON(r.Body, &input); err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}

leaderVal, ok := input["leader"]
if !ok {
h.rd.JSON(w, http.StatusBadRequest, "leader weight unset")
return
}
regionVal, ok := input["region"]
if !ok {
h.rd.JSON(w, http.StatusBadRequest, "region weight unset")
return
}
leader, ok := leaderVal.(float64)
if !ok || leader < 0 {
h.rd.JSON(w, http.StatusBadRequest, "badformat leader weight")
return
}
region, ok := regionVal.(float64)
if !ok || region < 0 {
h.rd.JSON(w, http.StatusBadRequest, "badformat region weight")
return
}

if err := cluster.SetStoreWeight(storeID, leader, region); err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}

h.rd.JSON(w, http.StatusOK, nil)
}

type storesHandler struct {
svr *server.Server
rd *render.Render
Expand Down
12 changes: 12 additions & 0 deletions server/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,12 @@ func (l *balanceLeaderScheduler) Schedule(cluster *clusterInfo) Operator {
return nil
}

// Skip hot regions.
if cluster.isRegionHot(region.GetId()) {
schedulerCounter.WithLabelValues(l.GetName(), "region_hot").Inc()
return nil
}

source := cluster.getStore(region.Leader.GetStoreId())
target := cluster.getStore(newLeader.GetStoreId())
if !shouldBalance(source, target, l.GetResourceKind()) {
Expand Down Expand Up @@ -184,6 +190,12 @@ func (s *balanceRegionScheduler) Schedule(cluster *clusterInfo) Operator {
return nil
}

// Skip hot regions.
if cluster.isRegionHot(region.GetId()) {
schedulerCounter.WithLabelValues(s.GetName(), "region_hot").Inc()
return nil
}

op := s.transferPeer(cluster, region, oldPeer)
if op == nil {
// We can't transfer peer from this store now, so we add it to the cache
Expand Down
56 changes: 56 additions & 0 deletions server/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,18 @@ func (c *testClusterInfo) addRegionStore(storeID uint64, regionCount int) {
c.putStore(store)
}

func (c *testClusterInfo) updateStoreLeaderWeight(storeID uint64, weight float64) {
store := c.getStore(storeID)
store.LeaderWeight = weight
c.putStore(store)
}

func (c *testClusterInfo) updateStoreRegionWeight(storeID uint64, weight float64) {
store := c.getStore(storeID)
store.RegionWeight = weight
c.putStore(store)
}

func (c *testClusterInfo) addLabelsStore(storeID uint64, regionCount int, labels map[string]string) {
c.addRegionStore(storeID, regionCount)
store := c.getStore(storeID)
Expand Down Expand Up @@ -315,6 +327,26 @@ func (s *testBalanceLeaderSchedulerSuite) TestBalanceFilter(c *C) {
checkTransferLeader(c, s.schedule(), 4, 3)
}

func (s *testBalanceLeaderSchedulerSuite) TestLeaderWeight(c *C) {
// Stores: 1 2 3 4
// Leaders: 10 10 10 10
// Weight: 0.5 0.9 1 2
// Region1: L F F F

s.tc.addLeaderStore(1, 10)
s.tc.addLeaderStore(2, 10)
s.tc.addLeaderStore(3, 10)
s.tc.addLeaderStore(4, 10)
s.tc.updateStoreLeaderWeight(1, 0.5)
s.tc.updateStoreLeaderWeight(2, 0.9)
s.tc.updateStoreLeaderWeight(3, 1)
s.tc.updateStoreLeaderWeight(4, 2)
s.tc.addLeaderRegion(1, 1, 2, 3, 4)
checkTransferLeader(c, s.schedule(), 1, 4)
s.tc.updateLeaderCount(4, 30)
checkTransferLeader(c, s.schedule(), 1, 3)
}

func (s *testBalanceLeaderSchedulerSuite) TestBalanceSelector(c *C) {
// Stores: 1 2 3 4
// Leaders: 1 2 3 10
Expand Down Expand Up @@ -480,6 +512,30 @@ func (s *testBalanceRegionSchedulerSuite) TestReplicas5(c *C) {
checkTransferPeer(c, sb.Schedule(cluster), 11, 6)
}

func (s *testBalanceRegionSchedulerSuite) TestStoreWeight(c *C) {
cluster := newClusterInfo(newMockIDAllocator())
tc := newTestClusterInfo(cluster)

_, opt := newTestScheduleConfig()
sb := newBalanceRegionScheduler(opt)
opt.SetMaxReplicas(1)

tc.addRegionStore(1, 10)
tc.addRegionStore(2, 10)
tc.addRegionStore(3, 10)
tc.addRegionStore(4, 10)
tc.updateStoreRegionWeight(1, 0.5)
tc.updateStoreRegionWeight(2, 0.9)
tc.updateStoreRegionWeight(3, 1.0)
tc.updateStoreRegionWeight(4, 2.0)

tc.addLeaderRegion(1, 1)
checkTransferPeer(c, sb.Schedule(cluster), 1, 4)

tc.updateRegionCount(4, 30)
checkTransferPeer(c, sb.Schedule(cluster), 1, 3)
}

var _ = Suite(&testReplicaCheckerSuite{})

type testReplicaCheckerSuite struct{}
Expand Down
9 changes: 9 additions & 0 deletions server/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,15 @@ func (c *clusterInfo) updateWriteStatCache(region *RegionInfo, hotRegionThreshol
c.writeStatistics.add(key, newItem)
}

func (c *clusterInfo) isRegionHot(id uint64) bool {
c.RLock()
defer c.RUnlock()
if stat, ok := c.writeStatistics.peek(id); ok {
return stat.(*RegionStat).HotDegree >= hotRegionLowThreshold
}
return false
}

func (c *clusterInfo) searchRegion(regionKey []byte) *RegionInfo {
c.RLock()
defer c.RUnlock()
Expand Down
26 changes: 22 additions & 4 deletions server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,24 @@ func (c *RaftCluster) BuryStore(storeID uint64, force bool) error {
return cluster.putStore(store)
}

// SetStoreWeight sets up a store's leader/region balance weight.
func (c *RaftCluster) SetStoreWeight(storeID uint64, leader, region float64) error {
c.Lock()
defer c.Unlock()

store := c.cachedCluster.getStore(storeID)
if store == nil {
return errors.Trace(errStoreNotFound(storeID))
}

if err := c.s.kv.saveStoreWeight(storeID, leader, region); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider moving this inside clusterInfo.putStore.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clusterInfo.putStore has other callers, SetStoreLabel, DeleteStore, etc. They won't change the weight, so I think put saveStoreWeight outside is more suitable.

return errors.Trace(err)
}

store.LeaderWeight, store.RegionWeight = leader, region
return c.cachedCluster.putStore(store)
}

func (c *RaftCluster) checkStores() {
cluster := c.cachedCluster
for _, store := range cluster.getMetaStores() {
Expand Down Expand Up @@ -565,10 +583,10 @@ func (c *RaftCluster) collectMetrics() {
storageCapacity += s.Stats.GetCapacity()

// Balance score.
minLeaderScore = math.Min(minLeaderScore, s.leaderScore())
maxLeaderScore = math.Max(maxLeaderScore, s.leaderScore())
minRegionScore = math.Min(minRegionScore, s.regionScore())
maxRegionScore = math.Max(maxRegionScore, s.regionScore())
minLeaderScore = math.Min(minLeaderScore, s.LeaderScore())
maxLeaderScore = math.Max(maxLeaderScore, s.LeaderScore())
minRegionScore = math.Min(minRegionScore, s.RegionScore())
maxRegionScore = math.Max(maxRegionScore, s.RegionScore())
}

metrics := make(map[string]float64)
Expand Down
13 changes: 7 additions & 6 deletions server/cluster_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,11 +473,8 @@ func (s *testClusterWorkerSuite) TestHeartbeatSplit2(c *C) {
// Split.
r2ID, r2PeerIDs := s.askSplit(c, r1)
r2 := splitRegion(c, r1, []byte("m"), r2ID, r2PeerIDs)
c.Logf("r1: %v, r2: %v", r1, r2)
leaderPeer2 := s.chooseRegionLeader(c, r2)
resp := s.heartbeatRegion(c, s.clusterID, r2, leaderPeer2)
c.Logf("resp: %+v", resp)
c.Assert(resp, IsNil)
s.heartbeatRegion(c, s.clusterID, r2, leaderPeer2)
testutil.WaitUntil(c, s.checkSearchRegions(cluster, "", "m"))
}

Expand Down Expand Up @@ -577,6 +574,11 @@ func (s *testClusterWorkerSuite) TestHeartbeatChangePeer(c *C) {

func (s *testClusterWorkerSuite) TestHeartbeatSplitAddPeer(c *C) {
s.svr.scheduleOpt.SetMaxReplicas(2)
// Stop schedulers.
scheduleCfg := s.svr.GetScheduleConfig()
scheduleCfg.LeaderScheduleLimit = 0
scheduleCfg.RegionScheduleLimit = 0
s.svr.SetScheduleConfig(*scheduleCfg)

cluster := s.svr.GetRaftCluster()
c.Assert(cluster, NotNil)
Expand All @@ -591,8 +593,7 @@ func (s *testClusterWorkerSuite) TestHeartbeatSplitAddPeer(c *C) {
r2 := splitRegion(c, r1, []byte("m"), r2ID, r2PeerIDs)

// Sync r1 with both ConfVer and Version updated.
resp := s.heartbeatRegion(c, s.clusterID, r1, leaderPeer1)
c.Assert(resp, IsNil)
s.heartbeatRegion(c, s.clusterID, r1, leaderPeer1)
testutil.WaitUntil(c, s.checkSearchRegions(cluster, "m", ""))
mustGetRegion(c, cluster, []byte("z"), r1)
mustGetRegion(c, cluster, []byte("a"), nil)
Expand Down
Loading