Skip to content

Commit

Permalink
*:Rollback config in store when kv.persist failed (#1476)
Browse files Browse the repository at this point in the history
  • Loading branch information
bradyjoestar authored and nolouch committed May 22, 2019
1 parent acf3e28 commit 1268d8b
Show file tree
Hide file tree
Showing 3 changed files with 191 additions and 28 deletions.
90 changes: 90 additions & 0 deletions server/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/pd/server/core"
"github.com/pkg/errors"
"google.golang.org/grpc"
)

Expand All @@ -44,6 +45,14 @@ type testClusterSuite struct {
baseCluster
}

// testErrorKV is a test double that embeds core.KVBase and overrides Save so
// that every save attempt reports an error.
type testErrorKV struct {
core.KVBase
}

// Save ignores its arguments and always reports a "save failed" error, which
// lets tests exercise the failure path of code that writes through the KV.
func (kv *testErrorKV) Save(key, value string) error {
	saveErr := errors.New("save failed")
	return saveErr
}

func mustNewGrpcClient(c *C, addr string) pdpb.PDClient {
conn, err := grpc.Dial(strings.TrimPrefix(addr, "http://"), grpc.WithInsecure())

Expand Down Expand Up @@ -597,3 +606,84 @@ func (s *testGetStoresSuite) BenchmarkGetStores(c *C) {
s.cluster.core.Stores.GetStores()
}
}

// TestSetScheduleOpt checks that the server's config setters and deleters
// update the in-memory options when the KV write succeeds, and that the
// in-memory options keep their previous values when the KV write fails.
func (s *testClusterSuite) TestSetScheduleOpt(c *C) {
	var err error
	var cleanup func()
	_, s.svr, cleanup, err = NewTestServer(c)
	c.Assert(err, IsNil)
	// Register cleanup as soon as the server is known to exist so that a
	// fatal assertion in the setup helpers below still tears it down.
	defer cleanup()
	mustWaitLeader(c, []*Server{s.svr})
	s.grpcPDClient = mustNewGrpcClient(c, s.svr.GetAddr())
	clusterID := s.svr.clusterID

	storeAddr := "127.0.0.1:0"
	_, err = s.svr.bootstrapCluster(s.newBootstrapRequest(c, clusterID, storeAddr))
	c.Assert(err, IsNil)

	_, opt, err := newTestScheduleConfig()
	c.Assert(err, IsNil)

	scheduleCfg := opt.load()
	replicateCfg := s.svr.GetReplicationConfig()
	pdServerCfg := s.svr.scheduleOpt.loadPDServerConfig()

	// PUT, GET and DELETE succeed with the real KV.
	replicateCfg.MaxReplicas = 5
	scheduleCfg.MaxSnapshotCount = 10
	pdServerCfg.UseRegionStorage = true
	typ, labelKey, labelValue := "testTyp", "testKey", "testValue"
	nsConfig := NamespaceConfig{LeaderScheduleLimit: uint64(200)}

	c.Assert(s.svr.SetScheduleConfig(*scheduleCfg), IsNil)
	c.Assert(s.svr.SetPDServerConfig(*pdServerCfg), IsNil)
	c.Assert(s.svr.SetLabelProperty(typ, labelKey, labelValue), IsNil)
	c.Assert(s.svr.SetNamespaceConfig("testNS", nsConfig), IsNil)
	c.Assert(s.svr.SetReplicationConfig(*replicateCfg), IsNil)

	c.Assert(s.svr.GetReplicationConfig().MaxReplicas, Equals, uint64(5))
	c.Assert(s.svr.scheduleOpt.GetMaxSnapshotCount(), Equals, uint64(10))
	c.Assert(s.svr.scheduleOpt.loadPDServerConfig().UseRegionStorage, Equals, true)
	c.Assert(s.svr.scheduleOpt.loadLabelPropertyConfig()[typ][0].Key, Equals, "testKey")
	c.Assert(s.svr.scheduleOpt.loadLabelPropertyConfig()[typ][0].Value, Equals, "testValue")
	c.Assert(s.svr.GetNamespaceConfig("testNS").LeaderScheduleLimit, Equals, uint64(200))

	c.Assert(s.svr.DeleteNamespaceConfig("testNS"), IsNil)
	c.Assert(s.svr.DeleteLabelProperty(typ, labelKey, labelValue), IsNil)

	c.Assert(s.svr.GetNamespaceConfig("testNS").LeaderScheduleLimit, Equals, uint64(0))
	c.Assert(len(s.svr.scheduleOpt.loadLabelPropertyConfig()[typ]), Equals, 0)

	// PUT fails: swap in a KV whose Save always errors; every setter must
	// return an error and the values read back must be the previous ones.
	oldKV := s.svr.kv
	s.svr.kv = core.NewKV(&testErrorKV{})
	replicateCfg.MaxReplicas = 7
	scheduleCfg.MaxSnapshotCount = 20
	pdServerCfg.UseRegionStorage = false

	c.Assert(s.svr.SetScheduleConfig(*scheduleCfg), NotNil)
	c.Assert(s.svr.SetReplicationConfig(*replicateCfg), NotNil)
	c.Assert(s.svr.SetPDServerConfig(*pdServerCfg), NotNil)
	c.Assert(s.svr.SetLabelProperty(typ, labelKey, labelValue), NotNil)
	c.Assert(s.svr.SetNamespaceConfig("testNS", nsConfig), NotNil)

	c.Assert(s.svr.GetReplicationConfig().MaxReplicas, Equals, uint64(5))
	c.Assert(s.svr.scheduleOpt.GetMaxSnapshotCount(), Equals, uint64(10))
	c.Assert(s.svr.scheduleOpt.loadPDServerConfig().UseRegionStorage, Equals, true)
	c.Assert(s.svr.GetNamespaceConfig("testNS").LeaderScheduleLimit, Equals, uint64(0))
	c.Assert(len(s.svr.scheduleOpt.loadLabelPropertyConfig()[typ]), Equals, 0)

	// DELETE fails: first restore the working KV to recreate the namespace
	// config, then switch back to the failing KV and verify that failed
	// deletes leave the existing entries in place.
	s.svr.kv = oldKV
	c.Assert(s.svr.SetNamespaceConfig("testNS", nsConfig), IsNil)
	c.Assert(s.svr.SetReplicationConfig(*replicateCfg), IsNil)

	s.svr.kv = core.NewKV(&testErrorKV{})
	c.Assert(s.svr.DeleteLabelProperty(typ, labelKey, labelValue), NotNil)
	c.Assert(s.svr.GetNamespaceConfig("testNS").LeaderScheduleLimit, Equals, uint64(200))
	c.Assert(s.svr.DeleteNamespaceConfig("testNS"), NotNil)

	c.Assert(s.svr.GetNamespaceConfig("testNS").LeaderScheduleLimit, Equals, uint64(200))
	c.Assert(s.svr.scheduleOpt.loadLabelPropertyConfig()[typ][0].Key, Equals, "testKey")
	c.Assert(s.svr.scheduleOpt.loadLabelPropertyConfig()[typ][0].Value, Equals, "testValue")
}
61 changes: 43 additions & 18 deletions server/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package server

import (
"reflect"
"sync"
"sync/atomic"
"time"

Expand All @@ -28,7 +29,7 @@ import (
type scheduleOption struct {
v atomic.Value
rep *Replication
ns map[string]*namespaceOption
ns sync.Map // concurrent map[string]*namespaceOption
labelProperty atomic.Value
clusterVersion atomic.Value
pdServerConfig atomic.Value
Expand All @@ -37,10 +38,10 @@ type scheduleOption struct {
func newScheduleOption(cfg *Config) *scheduleOption {
o := &scheduleOption{}
o.store(&cfg.Schedule)
o.ns = make(map[string]*namespaceOption)
o.ns = sync.Map{}
for name, nsCfg := range cfg.Namespace {
nsCfg := nsCfg
o.ns[name] = newNamespaceOption(&nsCfg)
o.ns.Store(name, newNamespaceOption(&nsCfg))
}
o.rep = newReplication(&cfg.Replication)
o.pdServerConfig.Store(&cfg.PDServerCfg)
Expand All @@ -61,8 +62,36 @@ func (o *scheduleOption) GetReplication() *Replication {
return o.rep
}

// getNS looks up the namespace option stored under name in o.ns. The second
// result is false when no entry exists or when the stored value is not a
// *namespaceOption.
func (o *scheduleOption) getNS(name string) (*namespaceOption, bool) {
	raw, found := o.ns.Load(name)
	if !found {
		return nil, false
	}
	nsOpt, isNS := raw.(*namespaceOption)
	if !isNS {
		return nil, false
	}
	return nsOpt, true
}

// loadNSConfig copies the current config of each namespace option in o.ns
// into a freshly allocated map keyed by namespace name. Iteration stops at
// the first entry whose key is not a string or whose value is not a
// *namespaceOption; entries visited before that point are still returned.
func (o *scheduleOption) loadNSConfig() map[string]NamespaceConfig {
	result := make(map[string]NamespaceConfig)
	o.ns.Range(func(key, value interface{}) bool {
		name, isString := key.(string)
		if !isString {
			return false
		}
		nsOpt, isNS := value.(*namespaceOption)
		if !isNS {
			return false
		}
		result[name] = *nsOpt.load()
		return true
	})
	return result
}

func (o *scheduleOption) GetMaxReplicas(name string) int {
if n, ok := o.ns[name]; ok {
if n, ok := o.getNS(name); ok {
return n.GetMaxReplicas()
}
return o.rep.GetMaxReplicas()
Expand Down Expand Up @@ -105,35 +134,35 @@ func (o *scheduleOption) GetMaxStoreDownTime() time.Duration {
}

func (o *scheduleOption) GetLeaderScheduleLimit(name string) uint64 {
if n, ok := o.ns[name]; ok {
if n, ok := o.getNS(name); ok {
return n.GetLeaderScheduleLimit()
}
return o.load().LeaderScheduleLimit
}

func (o *scheduleOption) GetRegionScheduleLimit(name string) uint64 {
if n, ok := o.ns[name]; ok {
if n, ok := o.getNS(name); ok {
return n.GetRegionScheduleLimit()
}
return o.load().RegionScheduleLimit
}

func (o *scheduleOption) GetReplicaScheduleLimit(name string) uint64 {
if n, ok := o.ns[name]; ok {
if n, ok := o.getNS(name); ok {
return n.GetReplicaScheduleLimit()
}
return o.load().ReplicaScheduleLimit
}

func (o *scheduleOption) GetMergeScheduleLimit(name string) uint64 {
if n, ok := o.ns[name]; ok {
if n, ok := o.getNS(name); ok {
return n.GetMergeScheduleLimit()
}
return o.load().MergeScheduleLimit
}

func (o *scheduleOption) GetHotRegionScheduleLimit(name string) uint64 {
if n, ok := o.ns[name]; ok {
if n, ok := o.getNS(name); ok {
return n.GetHotRegionScheduleLimit()
}
return o.load().HotRegionScheduleLimit
Expand Down Expand Up @@ -272,10 +301,8 @@ func (o *scheduleOption) loadPDServerConfig() *PDServerConfig {
}

func (o *scheduleOption) persist(kv *core.KV) error {
namespaces := make(map[string]NamespaceConfig)
for name, ns := range o.ns {
namespaces[name] = *ns.load()
}
namespaces := o.loadNSConfig()

cfg := &Config{
Schedule: *o.load(),
Replication: *o.rep.load(),
Expand All @@ -289,10 +316,8 @@ func (o *scheduleOption) persist(kv *core.KV) error {
}

func (o *scheduleOption) reload(kv *core.KV) error {
namespaces := make(map[string]NamespaceConfig)
for name, ns := range o.ns {
namespaces[name] = *ns.load()
}
namespaces := o.loadNSConfig()

cfg := &Config{
Schedule: *o.load().clone(),
Replication: *o.rep.load(),
Expand All @@ -311,7 +336,7 @@ func (o *scheduleOption) reload(kv *core.KV) error {
o.rep.store(&cfg.Replication)
for name, nsCfg := range cfg.Namespace {
nsCfg := nsCfg
o.ns[name] = newNamespaceOption(&nsCfg)
o.ns.Store(name, newNamespaceOption(&nsCfg))
}
o.labelProperty.Store(cfg.LabelProperty)
o.clusterVersion.Store(cfg.ClusterVersion)
Expand Down
Loading

0 comments on commit 1268d8b

Please sign in to comment.