diff --git a/pkg/mcs/scheduling/server/config/config.go b/pkg/mcs/scheduling/server/config/config.go index 48ec0917e25..ac693f38e49 100644 --- a/pkg/mcs/scheduling/server/config/config.go +++ b/pkg/mcs/scheduling/server/config/config.go @@ -36,6 +36,7 @@ import ( "github.com/tikv/pd/pkg/utils/configutil" "github.com/tikv/pd/pkg/utils/grpcutil" "github.com/tikv/pd/pkg/utils/metricutil" + "github.com/tikv/pd/server/config" "go.uber.org/zap" ) @@ -193,6 +194,7 @@ type PersistConfig struct { clusterVersion unsafe.Pointer schedule atomic.Value replication atomic.Value + storeConfig atomic.Value } // NewPersistConfig creates a new PersistConfig instance. @@ -201,6 +203,9 @@ func NewPersistConfig(cfg *Config) *PersistConfig { o.SetClusterVersion(&cfg.ClusterVersion) o.schedule.Store(&cfg.Schedule) o.replication.Store(&cfg.Replication) + // storeConfig will be fetched from TiKV by PD API server, + // so we just set an empty value here first. + o.storeConfig.Store(&config.StoreConfig{}) return o } @@ -234,6 +239,19 @@ func (o *PersistConfig) SetReplicationConfig(cfg *sc.ReplicationConfig) { o.replication.Store(cfg) } +// SetStoreConfig sets the TiKV store configuration. +func (o *PersistConfig) SetStoreConfig(cfg *config.StoreConfig) { + // Some of the fields won't be persisted and watched, + // so we need to adjust it here before storing it. + cfg.Adjust() + o.storeConfig.Store(cfg) +} + +// GetStoreConfig returns the TiKV store configuration. +func (o *PersistConfig) GetStoreConfig() *config.StoreConfig { + return o.storeConfig.Load().(*config.StoreConfig) +} + // GetMaxReplicas returns the max replicas. func (o *PersistConfig) GetMaxReplicas() int { return int(o.GetReplicationConfig().MaxReplicas) diff --git a/pkg/mcs/scheduling/server/config/watcher.go b/pkg/mcs/scheduling/server/config/watcher.go index fdf957b72c5..32b8c6cc508 100644 --- a/pkg/mcs/scheduling/server/config/watcher.go +++ b/pkg/mcs/scheduling/server/config/watcher.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/log" sc "github.com/tikv/pd/pkg/schedule/config" "github.com/tikv/pd/pkg/utils/etcdutil" + "github.com/tikv/pd/server/config" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/mvcc/mvccpb" "go.uber.org/zap" @@ -42,9 +43,10 @@ type Watcher struct { } type persistedConfig struct { + ClusterVersion semver.Version `json:"cluster-version"` Schedule sc.ScheduleConfig `json:"schedule"` Replication sc.ReplicationConfig `json:"replication"` - ClusterVersion semver.Version `json:"cluster-version"` + Store config.StoreConfig `json:"store"` } // NewWatcher creates a new watcher to watch the config meta change from PD API server. @@ -71,9 +73,10 @@ func NewWatcher( zap.String("event-kv-key", string(kv.Key)), zap.Error(err)) return err } + cw.SetClusterVersion(&cfg.ClusterVersion) cw.SetScheduleConfig(&cfg.Schedule) cw.SetReplicationConfig(&cfg.Replication) - cw.SetClusterVersion(&cfg.ClusterVersion) + cw.SetStoreConfig(&cfg.Store) return nil } deleteFn := func(kv *mvccpb.KeyValue) error { diff --git a/tests/integrations/mcs/scheduling/config_test.go b/tests/integrations/mcs/scheduling/config_test.go index 29f99c66f4c..17cb422f0ae 100644 --- a/tests/integrations/mcs/scheduling/config_test.go +++ b/tests/integrations/mcs/scheduling/config_test.go @@ -25,6 +25,7 @@ import ( "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/pkg/versioninfo" + severcfg "github.com/tikv/pd/server/config" "github.com/tikv/pd/tests" ) @@ -81,17 +82,31 @@ func (suite *configTestSuite) TestConfigWatch() { re.Equal(sc.DefaultSplitMergeInterval, watcher.GetScheduleConfig().SplitMergeInterval.Duration) re.Equal("0.0.0", watcher.GetClusterVersion().String()) // Update the config and check if the scheduling config watcher can get the latest value. - suite.pdLeaderServer.GetPersistOptions().SetMaxReplicas(5) + persistOpts := suite.pdLeaderServer.GetPersistOptions() + persistOpts.SetMaxReplicas(5) persistConfig(re, suite.pdLeaderServer) testutil.Eventually(re, func() bool { return watcher.GetReplicationConfig().MaxReplicas == 5 }) - suite.pdLeaderServer.GetPersistOptions().SetSplitMergeInterval(2 * sc.DefaultSplitMergeInterval) + persistOpts.SetSplitMergeInterval(2 * sc.DefaultSplitMergeInterval) persistConfig(re, suite.pdLeaderServer) testutil.Eventually(re, func() bool { return watcher.GetScheduleConfig().SplitMergeInterval.Duration == 2*sc.DefaultSplitMergeInterval }) - suite.pdLeaderServer.GetPersistOptions().SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) + persistOpts.SetStoreConfig(&severcfg.StoreConfig{ + Coprocessor: severcfg.Coprocessor{ + RegionMaxSize: "144MiB", + }, + Storage: severcfg.Storage{ + Engine: severcfg.RaftstoreV2, + }, + }) + persistConfig(re, suite.pdLeaderServer) + testutil.Eventually(re, func() bool { + return watcher.GetStoreConfig().GetRegionMaxSize() == 144 && + watcher.GetStoreConfig().IsRaftKV2() + }) + persistOpts.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) persistConfig(re, suite.pdLeaderServer) testutil.Eventually(re, func() bool { return watcher.GetClusterVersion().String() == "4.0.0"