Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: store balance weight. #713

Merged
merged 9 commits into from
Sep 1, 2017
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion pdctl/command/store_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,13 @@ var (
// NewStoreCommand return a store subcommand of rootCmd
func NewStoreCommand() *cobra.Command {
s := &cobra.Command{
Use: "store [delete|label] <store_id>",
Use: "store [delete|label|weight] <store_id>",
Short: "show the store status",
Run: showStoreCommandFunc,
}
s.AddCommand(NewDeleteStoreCommand())
s.AddCommand(NewLabelStoreCommand())
s.AddCommand(NewSetStoreWeightCommand())
return s
}

Expand All @@ -59,6 +60,15 @@ func NewLabelStoreCommand() *cobra.Command {
return l
}

// NewSetStoreWeightCommand returns a weight subcommand of storeCmd.
func NewSetStoreWeightCommand() *cobra.Command {
return &cobra.Command{
Use: "weight <store_id> <leader_weight> <region_weight>",
Short: "set a store's leader and region balance weight",
Run: setStoreWeightCommandFunc,
}
}

func showStoreCommandFunc(cmd *cobra.Command, args []string) {
var prefix string
prefix = storesPrefix
Expand Down Expand Up @@ -107,3 +117,25 @@ func labelStoreCommandFunc(cmd *cobra.Command, args []string) {
prefix := fmt.Sprintf(path.Join(storePrefix, "label"), args[0])
postJSON(cmd, prefix, map[string]interface{}{args[1]: args[2]})
}

func setStoreWeightCommandFunc(cmd *cobra.Command, args []string) {
if len(args) != 3 {
fmt.Println("Usage: store weight <store_id> <leader_weight> <region_weight>")
return
}
leader, err := strconv.ParseFloat(args[1], 64)
if err != nil || leader < 0 {
fmt.Println("leader_weight should be a number that >= 0.")
return
}
region, err := strconv.ParseFloat(args[2], 64)
if err != nil || region < 0 {
fmt.Println("region_weight should be a number that >= 0")
return
}
prefix := fmt.Sprintf(path.Join(storePrefix, "weight"), args[0])
postJSON(cmd, prefix, map[string]interface{}{
"leader": leader,
"region": region,
})
}
1 change: 1 addition & 0 deletions server/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
router.HandleFunc("/api/v1/store/{id}", storeHandler.Get).Methods("GET")
router.HandleFunc("/api/v1/store/{id}", storeHandler.Delete).Methods("DELETE")
router.HandleFunc("/api/v1/store/{id}/label", storeHandler.SetLabels).Methods("POST")
router.HandleFunc("/api/v1/store/{id}/weight", storeHandler.SetWeight).Methods("POST")
router.Handle("/api/v1/stores", newStoresHandler(svr, rd)).Methods("GET")

labelsHandler := newLabelsHandler(svr, rd)
Expand Down
58 changes: 58 additions & 0 deletions server/api/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ type storeStatus struct {
Capacity typeutil.ByteSize `json:"capacity,omitempty"`
Available typeutil.ByteSize `json:"available,omitempty"`
LeaderCount int `json:"leader_count,omitempty"`
LeaderWeight float64 `json:"leader_weight,omitempty"`
LeaderScore float64 `json:"leader_score,omitempty"`
RegionCount int `json:"region_count,omitempty"`
RegionWeight float64 `json:"region_weight,omitempty"`
RegionScore float64 `json:"region_score,omitempty"`
SendingSnapCount uint32 `json:"sending_snap_count,omitempty"`
ReceivingSnapCount uint32 `json:"receiving_snap_count,omitempty"`
ApplyingSnapCount uint32 `json:"applying_snap_count,omitempty"`
Expand All @@ -63,7 +67,11 @@ func newStoreInfo(store *server.StoreInfo) *storeInfo {
Capacity: typeutil.ByteSize(store.Stats.GetCapacity()),
Available: typeutil.ByteSize(store.Stats.GetAvailable()),
LeaderCount: store.LeaderCount,
LeaderWeight: store.LeaderWeight,
LeaderScore: store.LeaderScore(),
RegionCount: store.RegionCount,
RegionWeight: store.RegionWeight,
RegionScore: store.RegionScore(),
SendingSnapCount: store.Stats.GetSendingSnapCount(),
ReceivingSnapCount: store.Stats.GetReceivingSnapCount(),
ApplyingSnapCount: store.Stats.GetApplyingSnapCount(),
Expand Down Expand Up @@ -197,6 +205,56 @@ func (h *storeHandler) SetLabels(w http.ResponseWriter, r *http.Request) {
h.rd.JSON(w, http.StatusOK, nil)
}

func (h *storeHandler) SetWeight(w http.ResponseWriter, r *http.Request) {
cluster := h.svr.GetRaftCluster()
if cluster == nil {
h.rd.JSON(w, http.StatusInternalServerError, server.ErrNotBootstrapped.Error())
return
}

vars := mux.Vars(r)
storeIDStr := vars["id"]
storeID, err := strconv.ParseUint(storeIDStr, 10, 64)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}

var input map[string]interface{}
if err := readJSON(r.Body, &input); err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}

leaderVal, ok := input["leader"]
if !ok {
h.rd.JSON(w, http.StatusBadRequest, "leader weight unset")
return
}
regionVal, ok := input["region"]
if !ok {
h.rd.JSON(w, http.StatusBadRequest, "region weight unset")
return
}
leader, ok := leaderVal.(float64)
if !ok || leader < 0 {
h.rd.JSON(w, http.StatusBadRequest, "badformat leader weight")
return
}
region, ok := regionVal.(float64)
if !ok || region < 0 {
h.rd.JSON(w, http.StatusBadRequest, "badformat region weight")
return
}

if err := cluster.SetStoreWeight(storeID, leader, region); err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}

h.rd.JSON(w, http.StatusOK, nil)
}

type storesHandler struct {
svr *server.Server
rd *render.Render
Expand Down
56 changes: 56 additions & 0 deletions server/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,18 @@ func (c *testClusterInfo) addRegionStore(storeID uint64, regionCount int) {
c.putStore(store)
}

func (c *testClusterInfo) updateStoreLeaderWeight(storeID uint64, weight float64) {
store := c.getStore(storeID)
store.LeaderWeight = weight
c.putStore(store)
}

func (c *testClusterInfo) updateStoreRegionWeight(storeID uint64, weight float64) {
store := c.getStore(storeID)
store.RegionWeight = weight
c.putStore(store)
}

func (c *testClusterInfo) addLabelsStore(storeID uint64, regionCount int, labels map[string]string) {
c.addRegionStore(storeID, regionCount)
store := c.getStore(storeID)
Expand Down Expand Up @@ -315,6 +327,26 @@ func (s *testBalanceLeaderSchedulerSuite) TestBalanceFilter(c *C) {
checkTransferLeader(c, s.schedule(), 4, 3)
}

func (s *testBalanceLeaderSchedulerSuite) TestWeight(c *C) {
// Stores: 1 2 3 4
// Leaders: 10 10 10 10
// Weight: 0.5 0.5 1 2
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/0.5 0.5/0.5 0.9/

// Region1: L F F F

s.tc.addLeaderStore(1, 10)
s.tc.addLeaderStore(2, 10)
s.tc.addLeaderStore(3, 10)
s.tc.addLeaderStore(4, 10)
s.tc.updateStoreLeaderWeight(1, 0.5)
s.tc.updateStoreLeaderWeight(2, 0.5)
s.tc.updateStoreLeaderWeight(3, 1)
s.tc.updateStoreLeaderWeight(4, 2)
s.tc.addLeaderRegion(1, 1, 2, 3, 4)
checkTransferLeader(c, s.schedule(), 1, 4)
s.tc.updateLeaderCount(4, 30)
checkTransferLeader(c, s.schedule(), 1, 3)
}

func (s *testBalanceLeaderSchedulerSuite) TestBalanceSelector(c *C) {
// Stores: 1 2 3 4
// Leaders: 1 2 3 10
Expand Down Expand Up @@ -480,6 +512,30 @@ func (s *testBalanceRegionSchedulerSuite) TestReplicas5(c *C) {
checkTransferPeer(c, sb.Schedule(cluster), 11, 6)
}

func (s *testBalanceRegionSchedulerSuite) TestStoreWeight(c *C) {
cluster := newClusterInfo(newMockIDAllocator())
tc := newTestClusterInfo(cluster)

_, opt := newTestScheduleConfig()
sb := newBalanceRegionScheduler(opt)
opt.SetMaxReplicas(1)

tc.addRegionStore(1, 10)
tc.addRegionStore(2, 10)
tc.addRegionStore(3, 10)
tc.addRegionStore(4, 10)
tc.updateStoreRegionWeight(1, 0.5)
tc.updateStoreRegionWeight(2, 0.5)
tc.updateStoreRegionWeight(3, 1.0)
tc.updateStoreRegionWeight(4, 2.0)

tc.addLeaderRegion(1, 1)
checkTransferPeer(c, sb.Schedule(cluster), 1, 4)

tc.updateRegionCount(4, 30)
checkTransferPeer(c, sb.Schedule(cluster), 1, 3)
}

var _ = Suite(&testReplicaCheckerSuite{})

type testReplicaCheckerSuite struct{}
Expand Down
26 changes: 22 additions & 4 deletions server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,24 @@ func (c *RaftCluster) BuryStore(storeID uint64, force bool) error {
return cluster.putStore(store)
}

// SetStoreWeight sets up a store's leader/region balance weight.
func (c *RaftCluster) SetStoreWeight(storeID uint64, leader, region float64) error {
c.Lock()
defer c.Unlock()

store := c.cachedCluster.getStore(storeID)
if store == nil {
return errors.Trace(errStoreNotFound(storeID))
}

if err := c.s.kv.saveStoreWeight(storeID, leader, region); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider moving this inside clusterInfo.putStore.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clusterInfo.putStore has other callers, SetStoreLabel, DeleteStore, etc. They won't change the weight, so I think put saveStoreWeight outside is more suitable.

return errors.Trace(err)
}

store.LeaderWeight, store.RegionWeight = leader, region
return c.cachedCluster.putStore(store)
}

func (c *RaftCluster) checkStores() {
cluster := c.cachedCluster
for _, store := range cluster.getMetaStores() {
Expand Down Expand Up @@ -565,10 +583,10 @@ func (c *RaftCluster) collectMetrics() {
storageCapacity += s.Stats.GetCapacity()

// Balance score.
minLeaderScore = math.Min(minLeaderScore, s.leaderScore())
maxLeaderScore = math.Max(maxLeaderScore, s.leaderScore())
minRegionScore = math.Min(minRegionScore, s.regionScore())
maxRegionScore = math.Max(maxRegionScore, s.regionScore())
minLeaderScore = math.Min(minLeaderScore, s.LeaderScore())
maxLeaderScore = math.Max(maxLeaderScore, s.LeaderScore())
minRegionScore = math.Min(minRegionScore, s.RegionScore())
maxRegionScore = math.Max(maxRegionScore, s.RegionScore())
}

metrics := make(map[string]float64)
Expand Down
68 changes: 59 additions & 9 deletions server/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"math"
"path"
"strconv"
"time"

log "github.com/Sirupsen/logrus"
Expand All @@ -40,18 +41,20 @@ var (

// kv wraps all kv operations, keep it stateless.
type kv struct {
s *Server
client *clientv3.Client
clusterPath string
configPath string
s *Server
client *clientv3.Client
clusterPath string
configPath string
schedulePath string
}

func newKV(s *Server) *kv {
return &kv{
s: s,
client: s.client,
clusterPath: path.Join(s.rootPath, "raft"),
configPath: path.Join(s.rootPath, "config"),
s: s,
client: s.client,
clusterPath: path.Join(s.rootPath, "raft"),
configPath: path.Join(s.rootPath, "config"),
schedulePath: path.Join(s.rootPath, "schedule"),
}
}

Expand All @@ -69,6 +72,14 @@ func (kv *kv) clusterStatePath(option string) string {
return path.Join(kv.clusterPath, "status", option)
}

func (kv *kv) storeLeaderWeightPath(storeID uint64) string {
return path.Join(kv.schedulePath, "store_weight", fmt.Sprintf("%020d", storeID), "leader")
}

func (kv *kv) storeRegionWeightPath(storeID uint64) string {
return path.Join(kv.schedulePath, "store_weight", fmt.Sprintf("%020d", storeID), "region")
}

func (kv *kv) getRaftClusterBootstrapTime() (time.Time, error) {
data, err := kv.load(kv.clusterStatePath("raft_bootstrap_time"))
if err != nil {
Expand Down Expand Up @@ -169,8 +180,20 @@ func (kv *kv) loadStores(stores *storesInfo, rangeLimit int64) error {
return errors.Trace(err)
}

storeInfo := newStoreInfo(store)
leaderWeight, err := kv.loadFloatWithDefaultValue(kv.storeLeaderWeightPath(storeInfo.GetId()), 1.0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider extracting a loadStoreWeight and test them.

if err != nil {
return errors.Trace(err)
}
storeInfo.LeaderWeight = leaderWeight
regionWeight, err := kv.loadFloatWithDefaultValue(kv.storeRegionWeightPath(storeInfo.GetId()), 1.0)
if err != nil {
return errors.Trace(err)
}
storeInfo.RegionWeight = regionWeight

nextID = store.GetId() + 1
stores.setStore(newStoreInfo(store))
stores.setStore(storeInfo)
}

if len(resp.Kvs) < int(rangeLimit) {
Expand All @@ -179,6 +202,33 @@ func (kv *kv) loadStores(stores *storesInfo, rangeLimit int64) error {
}
}

func (kv *kv) saveStoreWeight(storeID uint64, leader, region float64) error {
leaderValue := strconv.FormatFloat(leader, 'f', -1, 64)
if err := kv.save(kv.storeLeaderWeightPath(storeID), leaderValue); err != nil {
return errors.Trace(err)
}
regionValue := strconv.FormatFloat(region, 'f', -1, 64)
if err := kv.save(kv.storeRegionWeightPath(storeID), regionValue); err != nil {
return errors.Trace(err)
}
return nil
}

func (kv *kv) loadFloatWithDefaultValue(path string, def float64) (float64, error) {
res, err := kvGet(kv.client, path)
if err != nil {
return 0, errors.Trace(err)
}
if len(res.Kvs) == 0 {
return def, nil
}
val, err := strconv.ParseFloat(string(res.Kvs[0].Value), 64)
if err != nil {
return 0, errors.Trace(err)
}
return val, nil
}

func (kv *kv) loadRegions(regions *regionsInfo, rangeLimit int64) error {
nextID := uint64(0)
endRegion := kv.regionPath(math.MaxUint64)
Expand Down
4 changes: 2 additions & 2 deletions server/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,10 @@ func compareStoreScore(storeA *StoreInfo, scoreA float64, storeB *StoreInfo, sco
return -1
}
// The store with lower region score is better.
if storeA.regionScore() < storeB.regionScore() {
if storeA.RegionScore() < storeB.RegionScore() {
return 1
}
if storeA.regionScore() > storeB.regionScore() {
if storeA.RegionScore() > storeB.RegionScore() {
return -1
}
return 0
Expand Down
Loading