Skip to content

Commit

Permalink
server: schedule option persistence (tikv#574)
Browse files Browse the repository at this point in the history
  • Loading branch information
nolouch authored and siddontang committed Mar 22, 2017
1 parent 0d76dac commit ce861a6
Show file tree
Hide file tree
Showing 9 changed files with 172 additions and 13 deletions.
27 changes: 27 additions & 0 deletions pd-client/leader_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,33 @@ func (s *testLeaderChangeSuite) prepareClusterN(c *C, n int) (svrs map[string]*s
return
}

func (s *testLeaderChangeSuite) TestLeaderConfigChange(c *C) {
svrs, endpoints, closeFunc := s.prepareClusterN(c, 3)
defer closeFunc()

leader, err := getLeader(endpoints)
c.Assert(err, IsNil)
mustConnectLeader(c, endpoints, leader.GetAddr())

r := server.ReplicationConfig{MaxReplicas: 5}
svrs[leader.GetAddr()].SetReplication(r)
svrs[leader.GetAddr()].Close()
// wait leader changes
changed := false
for i := 0; i < 20; i++ {
newLeader, _ := getLeader(endpoints)
if newLeader != nil && newLeader.GetAddr() != leader.GetAddr() {
mustConnectLeader(c, endpoints, newLeader.GetAddr())
changed = true
nr := svrs[newLeader.GetAddr()].GetConfig().Replication.MaxReplicas
c.Assert(nr, Equals, uint64(5))
break
}
time.Sleep(500 * time.Millisecond)
}
c.Assert(changed, IsTrue)
}

func (s *testLeaderChangeSuite) TestLeaderChange(c *C) {
svrs, endpoints, closeFunc := s.prepareClusterN(c, 3)
defer closeFunc()
Expand Down
18 changes: 17 additions & 1 deletion server/api/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (h *confHandler) GetSchedule(w http.ResponseWriter, r *http.Request) {
h.rd.JSON(w, http.StatusOK, &h.svr.GetConfig().Schedule)
}

func (h *confHandler) Post(w http.ResponseWriter, r *http.Request) {
func (h *confHandler) SetSchedule(w http.ResponseWriter, r *http.Request) {
config := &server.ScheduleConfig{}
err := readJSON(r.Body, config)
if err != nil {
Expand All @@ -51,3 +51,19 @@ func (h *confHandler) Post(w http.ResponseWriter, r *http.Request) {
h.svr.SetScheduleConfig(*config)
h.rd.JSON(w, http.StatusOK, nil)
}

func (h *confHandler) GetReplication(w http.ResponseWriter, r *http.Request) {
h.rd.JSON(w, http.StatusOK, &h.svr.GetConfig().Replication)
}

func (h *confHandler) SetReplication(w http.ResponseWriter, r *http.Request) {
config := &server.ReplicationConfig{}
err := readJSON(r.Body, config)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}

h.svr.SetReplication(*config)
h.rd.JSON(w, http.StatusOK, nil)
}
41 changes: 33 additions & 8 deletions server/api/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package api

import (
"bytes"
"encoding/json"
"io/ioutil"
"math/rand"
Expand Down Expand Up @@ -68,24 +67,50 @@ func (s *testConfigSuite) TestConfigSchedule(c *C) {
addr := mustUnixAddrToHTTPAddr(c, strings.Join(parts, ""))
resp, err := s.hc.Get(addr)
c.Assert(err, IsNil)
buf, err := ioutil.ReadAll(resp.Body)
c.Assert(err, IsNil)

sc := &server.ScheduleConfig{}
err = json.Unmarshal(buf, sc)
c.Assert(err, IsNil)
readJSON(resp.Body, sc)

sc.MaxStoreDownTime.Duration = time.Second
postData, err := json.Marshal(sc)
postURL := []string{cfgs[rand.Intn(len(cfgs))].ClientUrls, apiPrefix, "/api/v1/config"}
postAddr := mustUnixAddrToHTTPAddr(c, strings.Join(postURL, ""))
resp, err = s.hc.Post(postAddr, "application/json", bytes.NewBuffer(postData))
err = postJSON(s.hc, postAddr, postData)
c.Assert(err, IsNil)

resp, err = s.hc.Get(addr)
sc1 := &server.ScheduleConfig{}
json.NewDecoder(resp.Body).Decode(sc1)
readJSON(resp.Body, sc1)

c.Assert(*sc, Equals, *sc1)
}
}

func (s *testConfigSuite) TestConfigReplication(c *C) {
numbers := []int{1, 3}
for _, num := range numbers {
cfgs, _, clean := mustNewCluster(c, num)
defer clean()

parts := []string{cfgs[rand.Intn(len(cfgs))].ClientUrls, apiPrefix, "/api/v1/config/replicate"}
addr := mustUnixAddrToHTTPAddr(c, strings.Join(parts, ""))
resp, err := s.hc.Get(addr)
c.Assert(err, IsNil)

rc := &server.ReplicationConfig{}
err = readJSON(resp.Body, rc)
c.Assert(err, IsNil)

rc.MaxReplicas = 5
postData, err := json.Marshal(rc)
postURL := []string{cfgs[rand.Intn(len(cfgs))].ClientUrls, apiPrefix, "/api/v1/config/replicate"}
postAddr := mustUnixAddrToHTTPAddr(c, strings.Join(postURL, ""))
err = postJSON(s.hc, postAddr, postData)
c.Assert(err, IsNil)

resp, err = s.hc.Get(addr)
rc1 := &server.ReplicationConfig{}
err = readJSON(resp.Body, rc1)

c.Assert(*rc, DeepEquals, *rc1)
}
}
4 changes: 3 additions & 1 deletion server/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {

confHandler := newConfHandler(svr, rd)
router.HandleFunc("/api/v1/config", confHandler.Get).Methods("GET")
router.HandleFunc("/api/v1/config", confHandler.Post).Methods("POST")
router.HandleFunc("/api/v1/config", confHandler.SetSchedule).Methods("POST")
router.HandleFunc("/api/v1/config/schedule", confHandler.GetSchedule).Methods("GET")
router.HandleFunc("/api/v1/config/replicate", confHandler.SetReplication).Methods("POST")
router.HandleFunc("/api/v1/config/replicate", confHandler.GetReplication).Methods("GET")

storeHandler := newStoreHandler(svr, rd)
router.HandleFunc("/api/v1/store/{id}", storeHandler.Get).Methods("GET")
Expand Down
12 changes: 12 additions & 0 deletions server/api/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@
package api

import (
"bytes"
"encoding/json"
"io"
"io/ioutil"
"net"
"net/http"

"github.com/juju/errors"
)
Expand All @@ -38,6 +40,16 @@ func readJSON(r io.ReadCloser, data interface{}) error {
return nil
}

func postJSON(cli *http.Client, url string, data []byte) error {
resp, err := cli.Post(url, "application/json", bytes.NewBuffer(data))
if err != nil {
return errors.Trace(err)
}
ioutil.ReadAll(resp.Body)
resp.Body.Close()
return nil
}

func unixDial(_, addr string) (net.Conn, error) {
return net.Dial("unix", addr)
}
14 changes: 12 additions & 2 deletions server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,16 +122,26 @@ func (c *RaftCluster) isRunning() bool {

// GetConfig gets config information.
func (s *Server) GetConfig() *Config {
return s.cfg.clone()
cfg := s.cfg.clone()
cfg.Schedule = *s.scheduleOpt.load()
cfg.Replication = *s.scheduleOpt.rep.load()
return cfg
}

// SetScheduleConfig sets the balance config information.
func (s *Server) SetScheduleConfig(cfg ScheduleConfig) {
s.cfg.Schedule = cfg
s.scheduleOpt.store(&cfg)
s.scheduleOpt.persist(s.kv)
log.Infof("schedule config is updated: %+v, old: %+v", cfg, s.cfg.Schedule)
}

// SetReplication sets the replication config
func (s *Server) SetReplication(cfg ReplicationConfig) {
s.scheduleOpt.rep.store(&cfg)
s.scheduleOpt.persist(s.kv)
log.Infof("replication is updated: %+v, old: %+v", cfg, s.cfg.Replication)
}

func (s *Server) getClusterRootPath() string {
return path.Join(s.rootPath, "raft")
}
Expand Down
4 changes: 4 additions & 0 deletions server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,10 @@ func (o *scheduleOption) GetReplicaScheduleLimit() uint64 {
return o.load().ReplicaScheduleLimit
}

func (o *scheduleOption) persist(kv *kv) error {
return kv.saveScheduleOption(o)
}

// ParseUrls parse a string into multiple urls.
// Export for api.
func ParseUrls(s string) ([]url.URL, error) {
Expand Down
49 changes: 49 additions & 0 deletions server/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package server

import (
"encoding/json"
"fmt"
"math"
"path"
Expand Down Expand Up @@ -42,13 +43,15 @@ type kv struct {
s *Server
client *clientv3.Client
clusterPath string
configPath string
}

func newKV(s *Server) *kv {
return &kv{
s: s,
client: s.client,
clusterPath: path.Join(s.rootPath, "raft"),
configPath: path.Join(s.rootPath, "config"),
}
}

Expand Down Expand Up @@ -86,6 +89,52 @@ func (kv *kv) saveRegion(region *metapb.Region) error {
return kv.saveProto(kv.regionPath(region.GetId()), region)
}

func (kv *kv) loadScheduleOption(opt *scheduleOption) (bool, error) {
cfg := &Config{}
cfg.Schedule = *opt.load()
cfg.Replication = *opt.rep.load()
isExist, err := kv.loadConfig(cfg)
if err != nil {
return false, errors.Trace(err)
}
if !isExist {
return false, nil
}
opt.store(&cfg.Schedule)
opt.rep.store(&cfg.Replication)
return true, nil
}

func (kv *kv) saveScheduleOption(opt *scheduleOption) error {
cfg := &Config{}
cfg.Schedule = *opt.load()
cfg.Replication = *opt.rep.load()
return kv.saveConfig(cfg)
}

func (kv *kv) saveConfig(cfg *Config) error {
value, err := json.Marshal(cfg)
if err != nil {
return errors.Trace(err)
}
return kv.save(kv.configPath, string(value))
}

func (kv *kv) loadConfig(cfg *Config) (bool, error) {
value, err := kv.load(kv.configPath)
if err != nil {
return false, errors.Trace(err)
}
if value == nil {
return false, nil
}
err = json.Unmarshal(value, cfg)
if err != nil {
return false, errors.Trace(err)
}
return true, nil
}

func (kv *kv) loadStores(stores *storesInfo, rangeLimit int64) error {
nextID := uint64(0)
endStore := kv.storePath(math.MaxUint64)
Expand Down
16 changes: 15 additions & 1 deletion server/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,12 @@ func (s *Server) campaignLeader() error {
if err != nil {
return errors.Trace(err)
}

log.Debugf("campaign leader ok %s", s.Name())

err = s.reloadScheduleOption()
if err != nil {
return errors.Trace(err)
}
// Try to create raft cluster.
err = s.createRaftCluster()
if err != nil {
Expand Down Expand Up @@ -260,3 +263,14 @@ func (s *Server) resignLeader() error {
func (s *Server) leaderCmp() clientv3.Cmp {
return clientv3.Compare(clientv3.Value(s.getLeaderPath()), "=", s.leaderValue)
}

func (s *Server) reloadScheduleOption() error {
isExist, err := s.kv.loadScheduleOption(s.scheduleOpt)
if err != nil {
return errors.Trace(err)
}
if isExist {
return nil
}
return s.kv.saveScheduleOption(s.scheduleOpt)
}

0 comments on commit ce861a6

Please sign in to comment.