diff --git a/server/cluster_test.go b/server/cluster_test.go index 78c4dde1c58..d0801e72fca 100644 --- a/server/cluster_test.go +++ b/server/cluster_test.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/pd/server/core" + "github.com/pkg/errors" "google.golang.org/grpc" ) @@ -44,6 +45,14 @@ type testClusterSuite struct { baseCluster } +type testErrorKV struct { + core.KVBase +} + +func (kv *testErrorKV) Save(key, value string) error { + return errors.New("save failed") +} + func mustNewGrpcClient(c *C, addr string) pdpb.PDClient { conn, err := grpc.Dial(strings.TrimPrefix(addr, "http://"), grpc.WithInsecure()) @@ -597,3 +606,84 @@ func (s *testGetStoresSuite) BenchmarkGetStores(c *C) { s.cluster.core.Stores.GetStores() } } + +func (s *testClusterSuite) TestSetScheduleOpt(c *C) { + var err error + var cleanup func() + _, s.svr, cleanup, err = NewTestServer(c) + c.Assert(err, IsNil) + mustWaitLeader(c, []*Server{s.svr}) + s.grpcPDClient = mustNewGrpcClient(c, s.svr.GetAddr()) + defer cleanup() + clusterID := s.svr.clusterID + + storeAddr := "127.0.0.1:0" + _, err = s.svr.bootstrapCluster(s.newBootstrapRequest(c, clusterID, storeAddr)) + c.Assert(err, IsNil) + + _, opt, err := newTestScheduleConfig() + c.Assert(err, IsNil) + + scheduleCfg := opt.load() + replicateCfg := s.svr.GetReplicationConfig() + pdServerCfg := s.svr.scheduleOpt.loadPDServerConfig() + + //PUT GET DELETE successed + replicateCfg.MaxReplicas = 5 + scheduleCfg.MaxSnapshotCount = 10 + pdServerCfg.UseRegionStorage = true + typ, labelKey, labelValue := "testTyp", "testKey", "testValue" + nsConfig := NamespaceConfig{LeaderScheduleLimit: uint64(200)} + + c.Assert(s.svr.SetScheduleConfig(*scheduleCfg), IsNil) + c.Assert(s.svr.SetPDServerConfig(*pdServerCfg), IsNil) + c.Assert(s.svr.SetLabelProperty(typ, labelKey, labelValue), IsNil) + c.Assert(s.svr.SetNamespaceConfig("testNS", nsConfig), IsNil) + c.Assert(s.svr.SetReplicationConfig(*replicateCfg), IsNil) + + c.Assert(s.svr.GetReplicationConfig().MaxReplicas, Equals, uint64(5)) + c.Assert(s.svr.scheduleOpt.GetMaxSnapshotCount(), Equals, uint64(10)) + c.Assert(s.svr.scheduleOpt.loadPDServerConfig().UseRegionStorage, Equals, true) + c.Assert(s.svr.scheduleOpt.loadLabelPropertyConfig()[typ][0].Key, Equals, "testKey") + c.Assert(s.svr.scheduleOpt.loadLabelPropertyConfig()[typ][0].Value, Equals, "testValue") + c.Assert(s.svr.GetNamespaceConfig("testNS").LeaderScheduleLimit, Equals, uint64(200)) + + c.Assert(s.svr.DeleteNamespaceConfig("testNS"), IsNil) + c.Assert(s.svr.DeleteLabelProperty(typ, labelKey, labelValue), IsNil) + + c.Assert(s.svr.GetNamespaceConfig("testNS").LeaderScheduleLimit, Equals, uint64(0)) + c.Assert(len(s.svr.scheduleOpt.loadLabelPropertyConfig()[typ]), Equals, 0) + + //PUT GET failed + oldKV := s.svr.kv + s.svr.kv = core.NewKV(&testErrorKV{}) + replicateCfg.MaxReplicas = 7 + scheduleCfg.MaxSnapshotCount = 20 + pdServerCfg.UseRegionStorage = false + + c.Assert(s.svr.SetScheduleConfig(*scheduleCfg), NotNil) + c.Assert(s.svr.SetReplicationConfig(*replicateCfg), NotNil) + c.Assert(s.svr.SetPDServerConfig(*pdServerCfg), NotNil) + c.Assert(s.svr.SetLabelProperty(typ, labelKey, labelValue), NotNil) + c.Assert(s.svr.SetNamespaceConfig("testNS", nsConfig), NotNil) + + c.Assert(s.svr.GetReplicationConfig().MaxReplicas, Equals, uint64(5)) + c.Assert(s.svr.scheduleOpt.GetMaxSnapshotCount(), Equals, uint64(10)) + c.Assert(s.svr.scheduleOpt.loadPDServerConfig().UseRegionStorage, Equals, true) + c.Assert(s.svr.GetNamespaceConfig("testNS").LeaderScheduleLimit, Equals, uint64(0)) + c.Assert(len(s.svr.scheduleOpt.loadLabelPropertyConfig()[typ]), Equals, 0) + + //DELETE failed + s.svr.kv = oldKV + c.Assert(s.svr.SetNamespaceConfig("testNS", nsConfig), IsNil) + c.Assert(s.svr.SetReplicationConfig(*replicateCfg), IsNil) + + s.svr.kv = core.NewKV(&testErrorKV{}) + c.Assert(s.svr.DeleteLabelProperty(typ, labelKey, labelValue), NotNil) + c.Assert(s.svr.GetNamespaceConfig("testNS").LeaderScheduleLimit, Equals, uint64(200)) + c.Assert(s.svr.DeleteNamespaceConfig("testNS"), NotNil) + + c.Assert(s.svr.GetNamespaceConfig("testNS").LeaderScheduleLimit, Equals, uint64(200)) + c.Assert(s.svr.scheduleOpt.loadLabelPropertyConfig()[typ][0].Key, Equals, "testKey") + c.Assert(s.svr.scheduleOpt.loadLabelPropertyConfig()[typ][0].Value, Equals, "testValue") +} diff --git a/server/option.go b/server/option.go index e51626cc102..9e301db710d 100644 --- a/server/option.go +++ b/server/option.go @@ -15,6 +15,7 @@ package server import ( "reflect" + "sync" "sync/atomic" "time" @@ -28,7 +29,7 @@ import ( type scheduleOption struct { v atomic.Value rep *Replication - ns map[string]*namespaceOption + ns sync.Map // concurrent map[string]*namespaceOption labelProperty atomic.Value clusterVersion atomic.Value pdServerConfig atomic.Value @@ -37,10 +38,10 @@ type scheduleOption struct { func newScheduleOption(cfg *Config) *scheduleOption { o := &scheduleOption{} o.store(&cfg.Schedule) - o.ns = make(map[string]*namespaceOption) + o.ns = sync.Map{} for name, nsCfg := range cfg.Namespace { nsCfg := nsCfg - o.ns[name] = newNamespaceOption(&nsCfg) + o.ns.Store(name, newNamespaceOption(&nsCfg)) } o.rep = newReplication(&cfg.Replication) o.pdServerConfig.Store(&cfg.PDServerCfg) @@ -61,8 +62,36 @@ func (o *scheduleOption) GetReplication() *Replication { return o.rep } +func (o *scheduleOption) getNS(name string) (*namespaceOption, bool) { + if n, ok := o.ns.Load(name); ok { + if n, ok := n.(*namespaceOption); ok { + return n, true + } + } + return nil, false +} + +func (o *scheduleOption) loadNSConfig() map[string]NamespaceConfig { + namespaces := make(map[string]NamespaceConfig) + f := func(k, v interface{}) bool { + var kstr string + var ok bool + if kstr, ok = k.(string); !ok { + return false + } + if ns, ok := v.(*namespaceOption); ok { + namespaces[kstr] = *ns.load() + return true + } + return false + } + o.ns.Range(f) + + return namespaces +} + func (o *scheduleOption) GetMaxReplicas(name string) int { - if n, ok := o.ns[name]; ok { + if n, ok := o.getNS(name); ok { return n.GetMaxReplicas() } return o.rep.GetMaxReplicas() @@ -105,35 +134,35 @@ func (o *scheduleOption) GetMaxStoreDownTime() time.Duration { } func (o *scheduleOption) GetLeaderScheduleLimit(name string) uint64 { - if n, ok := o.ns[name]; ok { + if n, ok := o.getNS(name); ok { return n.GetLeaderScheduleLimit() } return o.load().LeaderScheduleLimit } func (o *scheduleOption) GetRegionScheduleLimit(name string) uint64 { - if n, ok := o.ns[name]; ok { + if n, ok := o.getNS(name); ok { return n.GetRegionScheduleLimit() } return o.load().RegionScheduleLimit } func (o *scheduleOption) GetReplicaScheduleLimit(name string) uint64 { - if n, ok := o.ns[name]; ok { + if n, ok := o.getNS(name); ok { return n.GetReplicaScheduleLimit() } return o.load().ReplicaScheduleLimit } func (o *scheduleOption) GetMergeScheduleLimit(name string) uint64 { - if n, ok := o.ns[name]; ok { + if n, ok := o.getNS(name); ok { return n.GetMergeScheduleLimit() } return o.load().MergeScheduleLimit } func (o *scheduleOption) GetHotRegionScheduleLimit(name string) uint64 { - if n, ok := o.ns[name]; ok { + if n, ok := o.getNS(name); ok { return n.GetHotRegionScheduleLimit() } return o.load().HotRegionScheduleLimit @@ -272,10 +301,8 @@ func (o *scheduleOption) loadPDServerConfig() *PDServerConfig { } func (o *scheduleOption) persist(kv *core.KV) error { - namespaces := make(map[string]NamespaceConfig) - for name, ns := range o.ns { - namespaces[name] = *ns.load() - } + namespaces := o.loadNSConfig() + cfg := &Config{ Schedule: *o.load(), Replication: *o.rep.load(), @@ -289,10 +316,8 @@ func (o *scheduleOption) persist(kv *core.KV) error { } func (o *scheduleOption) reload(kv *core.KV) error { - namespaces := make(map[string]NamespaceConfig) - for name, ns := range o.ns { - namespaces[name] = *ns.load() - } + namespaces := o.loadNSConfig() + cfg := &Config{ Schedule: *o.load().clone(), Replication: *o.rep.load(), @@ -311,7 +336,7 @@ func (o *scheduleOption) reload(kv *core.KV) error { o.rep.store(&cfg.Replication) for name, nsCfg := range cfg.Namespace { nsCfg := nsCfg - o.ns[name] = newNamespaceOption(&nsCfg) + o.ns.Store(name, newNamespaceOption(&nsCfg)) } o.labelProperty.Store(cfg.LabelProperty) o.clusterVersion.Store(cfg.ClusterVersion) diff --git a/server/server.go b/server/server.go index 57a4c5d9d63..f538a17f1de 100644 --- a/server/server.go +++ b/server/server.go @@ -510,10 +510,8 @@ func (s *Server) GetConfig() *Config { cfg := s.cfg.clone() cfg.Schedule = *s.scheduleOpt.load() cfg.Replication = *s.scheduleOpt.rep.load() - namespaces := make(map[string]NamespaceConfig) - for name, opt := range s.scheduleOpt.ns { - namespaces[name] = *opt.load() - } + namespaces := s.scheduleOpt.loadNSConfig() + cfg.Namespace = namespaces cfg.LabelProperty = s.scheduleOpt.loadLabelPropertyConfig().clone() cfg.ClusterVersion = s.scheduleOpt.loadClusterVersion() @@ -536,6 +534,11 @@ func (s *Server) SetScheduleConfig(cfg ScheduleConfig) error { old := s.scheduleOpt.load() s.scheduleOpt.store(&cfg) if err := s.scheduleOpt.persist(s.kv); err != nil { + s.scheduleOpt.store(old) + log.Error("failed to update schedule config", + zap.Reflect("new", cfg), + zap.Reflect("old", old), + zap.Error(err)) return err } log.Info("schedule config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old)) @@ -557,6 +560,11 @@ func (s *Server) SetReplicationConfig(cfg ReplicationConfig) error { old := s.scheduleOpt.rep.load() s.scheduleOpt.rep.store(&cfg) if err := s.scheduleOpt.persist(s.kv); err != nil { + s.scheduleOpt.rep.store(old) + log.Error("failed to update replication config", + zap.Reflect("new", cfg), + zap.Reflect("old", old), + zap.Error(err)) return err } log.Info("replication config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old)) @@ -568,6 +576,11 @@ func (s *Server) SetPDServerConfig(cfg PDServerConfig) error { old := s.scheduleOpt.loadPDServerConfig() s.scheduleOpt.pdServerConfig.Store(&cfg) if err := s.scheduleOpt.persist(s.kv); err != nil { + s.scheduleOpt.pdServerConfig.Store(old) + log.Error("failed to update PDServer config", + zap.Reflect("new", cfg), + zap.Reflect("old", old), + zap.Error(err)) return err } log.Info("PD server config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old)) @@ -576,7 +589,7 @@ func (s *Server) SetPDServerConfig(cfg PDServerConfig) error { // GetNamespaceConfig get the namespace config. func (s *Server) GetNamespaceConfig(name string) *NamespaceConfig { - if _, ok := s.scheduleOpt.ns[name]; !ok { + if _, ok := s.scheduleOpt.getNS(name); !ok { return &NamespaceConfig{} } @@ -601,16 +614,27 @@ func (s *Server) GetNamespaceConfigWithAdjust(name string) *NamespaceConfig { // SetNamespaceConfig sets the namespace config. func (s *Server) SetNamespaceConfig(name string, cfg NamespaceConfig) error { - if n, ok := s.scheduleOpt.ns[name]; ok { - old := s.scheduleOpt.ns[name].load() + if n, ok := s.scheduleOpt.getNS(name); ok { + old := n.load() n.store(&cfg) if err := s.scheduleOpt.persist(s.kv); err != nil { + s.scheduleOpt.ns.Store(name, newNamespaceOption(old)) + log.Error("failed to update namespace config", + zap.String("name", name), + zap.Reflect("new", cfg), + zap.Reflect("old", old), + zap.Error(err)) return err } log.Info("namespace config is updated", zap.String("name", name), zap.Reflect("new", cfg), zap.Reflect("old", old)) } else { - s.scheduleOpt.ns[name] = newNamespaceOption(&cfg) + s.scheduleOpt.ns.Store(name, newNamespaceOption(&cfg)) if err := s.scheduleOpt.persist(s.kv); err != nil { + s.scheduleOpt.ns.Delete(name) + log.Error("failed to add namespace config", + zap.String("name", name), + zap.Reflect("new", cfg), + zap.Error(err)) return err } log.Info("namespace config is added", zap.String("name", name), zap.Reflect("new", cfg)) @@ -620,10 +644,14 @@ func (s *Server) SetNamespaceConfig(name string, cfg NamespaceConfig) error { // DeleteNamespaceConfig deletes the namespace config. func (s *Server) DeleteNamespaceConfig(name string) error { - if n, ok := s.scheduleOpt.ns[name]; ok { + if n, ok := s.scheduleOpt.getNS(name); ok { cfg := n.load() - delete(s.scheduleOpt.ns, name) + s.scheduleOpt.ns.Delete(name) if err := s.scheduleOpt.persist(s.kv); err != nil { + s.scheduleOpt.ns.Store(name, newNamespaceOption(cfg)) + log.Error("failed to delete namespace config", + zap.String("name", name), + zap.Error(err)) return err } log.Info("namespace config is deleted", zap.String("name", name), zap.Reflect("config", *cfg)) @@ -636,6 +664,13 @@ func (s *Server) SetLabelProperty(typ, labelKey, labelValue string) error { s.scheduleOpt.SetLabelProperty(typ, labelKey, labelValue) err := s.scheduleOpt.persist(s.kv) if err != nil { + s.scheduleOpt.DeleteLabelProperty(typ, labelKey, labelValue) + log.Error("failed to update label property config", + zap.String("typ", typ), + zap.String("labelKey", labelKey), + zap.String("labelValue", labelValue), + zap.Reflect("config", s.scheduleOpt.loadLabelPropertyConfig()), + zap.Error(err)) return err } log.Info("label property config is updated", zap.Reflect("config", s.scheduleOpt.loadLabelPropertyConfig())) @@ -647,6 +682,13 @@ func (s *Server) DeleteLabelProperty(typ, labelKey, labelValue string) error { s.scheduleOpt.DeleteLabelProperty(typ, labelKey, labelValue) err := s.scheduleOpt.persist(s.kv) if err != nil { + s.scheduleOpt.SetLabelProperty(typ, labelKey, labelValue) + log.Error("failed to delete label property config", + zap.String("typ", typ), + zap.String("labelKey", labelKey), + zap.String("labelValue", labelValue), + zap.Reflect("config", s.scheduleOpt.loadLabelPropertyConfig()), + zap.Error(err)) return err } log.Info("label property config is deleted", zap.Reflect("config", s.scheduleOpt.loadLabelPropertyConfig())) @@ -664,9 +706,15 @@ func (s *Server) SetClusterVersion(v string) error { if err != nil { return err } + old := s.scheduleOpt.loadClusterVersion() s.scheduleOpt.SetClusterVersion(*version) err = s.scheduleOpt.persist(s.kv) if err != nil { + s.scheduleOpt.SetClusterVersion(old) + log.Error("failed to update cluster version", + zap.String("old-version", old.String()), + zap.String("new-version", v), + zap.Error(err)) return err } log.Info("cluster version is updated", zap.String("new-version", v))