diff --git a/pkg/basic_server/basic_server.go b/pkg/basic_server/basic_server.go index b5758485718..221b3091085 100644 --- a/pkg/basic_server/basic_server.go +++ b/pkg/basic_server/basic_server.go @@ -35,6 +35,6 @@ type Server interface { // GetClient returns builtin etcd client. GetClient() *clientv3.Client - // GetHTTPClient returns builtin etcd client. + // GetHTTPClient returns builtin http client. GetHTTPClient() *http.Client } diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go new file mode 100644 index 00000000000..a738ae8768c --- /dev/null +++ b/pkg/mcs/tso/server/server.go @@ -0,0 +1,123 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "context" + "flag" + "net/http" + "os" + + grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "github.com/pingcap/errors" + "github.com/pingcap/log" + basicsvr "github.com/tikv/pd/pkg/basic_server" + "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/tso" + "github.com/tikv/pd/pkg/utils/logutil" + "github.com/tikv/pd/pkg/utils/metricutil" + "go.etcd.io/etcd/clientv3" +) + +// If server doesn't implement all methods of basicsvr.Server, this line will result in a clear +// error message like "*Server does not implement basicsvr.Server (missing Method method)" +var _ basicsvr.Server = (*Server)(nil) + +// Server is the TSO server, and it implements basicsvr.Server. +// nolint +type Server struct { + ctx context.Context +} + +// TODO: Implement the following methods defined in basicsvr.Server + +// Name returns the unique etcd Name for this server in etcd cluster. +func (s *Server) Name() string { + return "" +} + +// Context returns the context of server. +func (s *Server) Context() context.Context { + return s.ctx +} + +// Run runs the pd server. +func (s *Server) Run() error { + return nil +} + +// Close closes the server. +func (s *Server) Close() { +} + +// GetClient returns builtin etcd client. +func (s *Server) GetClient() *clientv3.Client { + return nil +} + +// GetHTTPClient returns builtin http client. +func (s *Server) GetHTTPClient() *http.Client { + return nil +} + +// CreateServerWrapper encapsulates the configuration/log/metrics initialization and create the server +func CreateServerWrapper(args []string) (context.Context, context.CancelFunc, basicsvr.Server) { + cfg := tso.NewConfig() + err := cfg.Parse(os.Args[1:]) + + if cfg.Version { + printVersionInfo() + exit(0) + } + + defer logutil.LogPanic() + + switch errors.Cause(err) { + case nil: + case flag.ErrHelp: + exit(0) + default: + log.Fatal("parse cmd flags error", errs.ZapError(err)) + } + + if cfg.ConfigCheck { + printConfigCheckMsg(cfg) + exit(0) + } + + // TODO: Initialize logger + + // TODO: Make it configurable if it has big impact on performance. + grpcprometheus.EnableHandlingTimeHistogram() + + metricutil.Push(&cfg.Metric) + + // TODO: Create the server + + return nil, nil, nil +} + +// TODO: implement it +func printVersionInfo() { +} + +// TODO: implement it +func printConfigCheckMsg(cfg *tso.Config) { +} + +func exit(code int) { + log.Sync() + os.Exit(code) +} diff --git a/pkg/tso/config.go b/pkg/tso/config.go index a03c1f45e0b..baf1fdc0062 100644 --- a/pkg/tso/config.go +++ b/pkg/tso/config.go @@ -18,6 +18,8 @@ import ( "flag" "time" + "github.com/pingcap/errors" + "github.com/tikv/pd/pkg/utils/metricutil" "github.com/tikv/pd/pkg/utils/typeutil" ) @@ -30,7 +32,11 @@ const ( type Config struct { flagSet *flag.FlagSet - configFile string + Version bool `json:"-"` + + ConfigCheck bool `json:"-"` + configFile string + // EnableLocalTSO is used to enable the Local TSO Allocator feature, // which allows the PD server to generate Local TSO for certain DC-level transactions. // To make this feature meaningful, user has to set the "zone" label for the PD server @@ -46,6 +52,11 @@ type Config struct { // This config is only valid in 1ms to 10s. If it's configured too long or too short, it will // be automatically clamped to the range. TSOUpdatePhysicalInterval typeutil.Duration `toml:"tso-update-physical-interval" json:"tso-update-physical-interval"` + + // MaxResetTSGap is the max gap to reset the TSO. + MaxResetTSGap typeutil.Duration `toml:"max-gap-reset-ts" json:"max-gap-reset-ts"` + + Metric metricutil.MetricConfig `toml:"metric" json:"metric"` } // NewConfig creates a new config. @@ -58,3 +69,16 @@ func NewConfig() *Config { return cfg } + +// Parse parses flag definitions from the argument list. +func (c *Config) Parse(arguments []string) error { + // Parse first to get config file. + err := c.flagSet.Parse(arguments) + if err != nil { + return errors.WithStack(err) + } + + // TODO: Implement the main function body + + return nil +} diff --git a/pkg/utils/configutil/configutil.go b/pkg/utils/configutil/configutil.go new file mode 100644 index 00000000000..01a765bcf41 --- /dev/null +++ b/pkg/utils/configutil/configutil.go @@ -0,0 +1,68 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package configutil + +import ( + "errors" + + "github.com/BurntSushi/toml" +) + +// ConfigMetaData is an utility to test if a configuration is defined. +type ConfigMetaData struct { + meta *toml.MetaData + path []string +} + +// NewConfigMetadata is the a factory method to create a ConfigMetaData object +func NewConfigMetadata(meta *toml.MetaData) *ConfigMetaData { + return &ConfigMetaData{meta: meta} +} + +// IsDefined checks if the given key is defined in the configuration +func (m *ConfigMetaData) IsDefined(key string) bool { + if m.meta == nil { + return false + } + keys := append([]string(nil), m.path...) + keys = append(keys, key) + return m.meta.IsDefined(keys...) +} + +// Child gets the config metadata of the given path +func (m *ConfigMetaData) Child(path ...string) *ConfigMetaData { + newPath := append([]string(nil), m.path...) + newPath = append(newPath, path...) + return &ConfigMetaData{ + meta: m.meta, + path: newPath, + } +} + +// CheckUndecoded checks if the configuration contains undefined items +func (m *ConfigMetaData) CheckUndecoded() error { + if m.meta == nil { + return nil + } + undecoded := m.meta.Undecoded() + if len(undecoded) == 0 { + return nil + } + errInfo := "Config contains undefined item: " + for _, key := range undecoded { + errInfo += key.String() + ", " + } + return errors.New(errInfo[:len(errInfo)-2]) +} diff --git a/server/config/config.go b/server/config/config.go index bf3dc8b8772..14198f27789 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -30,6 +30,7 @@ import ( "github.com/tikv/pd/pkg/core/storelimit" "github.com/tikv/pd/pkg/encryption" "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/utils/configutil" "github.com/tikv/pd/pkg/utils/grpcutil" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/metricutil" @@ -459,7 +460,7 @@ func (c *Config) Validate() error { // Adjust is used to adjust the PD configurations. func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error { - configMetaData := newConfigMetadata(meta) + configMetaData := configutil.NewConfigMetadata(meta) if err := configMetaData.CheckUndecoded(); err != nil { c.WarningMsgs = append(c.WarningMsgs, err.Error()) } @@ -570,50 +571,7 @@ func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error { return nil } -// Utility to test if a configuration is defined. -type configMetaData struct { - meta *toml.MetaData - path []string -} - -func newConfigMetadata(meta *toml.MetaData) *configMetaData { - return &configMetaData{meta: meta} -} - -func (m *configMetaData) IsDefined(key string) bool { - if m.meta == nil { - return false - } - keys := append([]string(nil), m.path...) - keys = append(keys, key) - return m.meta.IsDefined(keys...) -} - -func (m *configMetaData) Child(path ...string) *configMetaData { - newPath := append([]string(nil), m.path...) - newPath = append(newPath, path...) - return &configMetaData{ - meta: m.meta, - path: newPath, - } -} - -func (m *configMetaData) CheckUndecoded() error { - if m.meta == nil { - return nil - } - undecoded := m.meta.Undecoded() - if len(undecoded) == 0 { - return nil - } - errInfo := "Config contains undefined item: " - for _, key := range undecoded { - errInfo += key.String() + ", " - } - return errors.New(errInfo[:len(errInfo)-2]) -} - -func (c *Config) adjustLog(meta *configMetaData) { +func (c *Config) adjustLog(meta *configutil.ConfigMetaData) { if !meta.IsDefined("disable-error-verbose") { c.Log.DisableErrorVerbose = defaultDisableErrorVerbose } @@ -840,7 +798,7 @@ const ( defaultSlowStoreEvictingAffectedStoreRatioThreshold = 0.3 ) -func (c *ScheduleConfig) adjust(meta *configMetaData, reloading bool) error { +func (c *ScheduleConfig) adjust(meta *configutil.ConfigMetaData, reloading bool) error { if !meta.IsDefined("max-snapshot-count") { adjustUint64(&c.MaxSnapshotCount, defaultMaxSnapshotCount) } @@ -961,7 +919,7 @@ func (c *ScheduleConfig) GetMaxMergeRegionKeys() uint64 { return c.MaxMergeRegionSize * 10000 } -func (c *ScheduleConfig) parseDeprecatedFlag(meta *configMetaData, name string, old, new bool) (bool, error) { +func (c *ScheduleConfig) parseDeprecatedFlag(meta *configutil.ConfigMetaData, name string, old, new bool) (bool, error) { oldName, newName := "disable-"+name, "enable-"+name defineOld, defineNew := meta.IsDefined(oldName), meta.IsDefined(newName) switch { @@ -1146,7 +1104,7 @@ func (c *ReplicationConfig) Validate() error { return nil } -func (c *ReplicationConfig) adjust(meta *configMetaData) error { +func (c *ReplicationConfig) adjust(meta *configutil.ConfigMetaData) error { adjustUint64(&c.MaxReplicas, defaultMaxReplicas) if !meta.IsDefined("enable-placement-rules") { c.EnablePlacementRules = defaultEnablePlacementRules @@ -1186,7 +1144,7 @@ type PDServerConfig struct { MinResolvedTSPersistenceInterval typeutil.Duration `toml:"min-resolved-ts-persistence-interval" json:"min-resolved-ts-persistence-interval"` } -func (c *PDServerConfig) adjust(meta *configMetaData) error { +func (c *PDServerConfig) adjust(meta *configutil.ConfigMetaData) error { adjustDuration(&c.MaxResetTSGap, defaultMaxResetTSGap) if !meta.IsDefined("use-region-storage") { c.UseRegionStorage = defaultUseRegionStorage @@ -1213,7 +1171,7 @@ func (c *PDServerConfig) adjust(meta *configMetaData) error { return c.Validate() } -func (c *PDServerConfig) migrateConfigurationFromFile(meta *configMetaData) error { +func (c *PDServerConfig) migrateConfigurationFromFile(meta *configutil.ConfigMetaData) error { oldName, newName := "trace-region-flow", "flow-round-by-digit" defineOld, defineNew := meta.IsDefined(oldName), meta.IsDefined(newName) switch { @@ -1431,7 +1389,7 @@ func (c *DashboardConfig) ToTiDBTLSConfig() (*tls.Config, error) { return nil, nil } -func (c *DashboardConfig) adjust(meta *configMetaData) { +func (c *DashboardConfig) adjust(meta *configutil.ConfigMetaData) { if !meta.IsDefined("enable-telemetry") { c.EnableTelemetry = defaultEnableTelemetry } @@ -1451,7 +1409,7 @@ func (c *ReplicationModeConfig) Clone() *ReplicationModeConfig { return &cfg } -func (c *ReplicationModeConfig) adjust(meta *configMetaData) { +func (c *ReplicationModeConfig) adjust(meta *configutil.ConfigMetaData) { if !meta.IsDefined("replication-mode") || NormalizeReplicationMode(c.ReplicationMode) == "" { c.ReplicationMode = "majority" } @@ -1479,7 +1437,7 @@ type DRAutoSyncReplicationConfig struct { PauseRegionSplit bool `toml:"pause-region-split" json:"pause-region-split,string"` } -func (c *DRAutoSyncReplicationConfig) adjust(meta *configMetaData) { +func (c *DRAutoSyncReplicationConfig) adjust(meta *configutil.ConfigMetaData) { if !meta.IsDefined("wait-store-timeout") { c.WaitStoreTimeout = typeutil.NewDuration(defaultDRWaitStoreTimeout) } diff --git a/server/config/config_test.go b/server/config/config_test.go index 81c937717cc..a767b71a443 100644 --- a/server/config/config_test.go +++ b/server/config/config_test.go @@ -26,6 +26,7 @@ import ( "github.com/BurntSushi/toml" "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/storage" + "github.com/tikv/pd/pkg/utils/configutil" ) func TestSecurity(t *testing.T) { @@ -494,7 +495,7 @@ func TestConfigClone(t *testing.T) { cfg.Adjust(nil, false) re.Equal(cfg, cfg.Clone()) - emptyConfigMetaData := newConfigMetadata(nil) + emptyConfigMetaData := configutil.NewConfigMetadata(nil) schedule := &ScheduleConfig{} schedule.adjust(emptyConfigMetaData, false) diff --git a/server/server.go b/server/server.go index 2272e69db1d..47e692c99ac 100644 --- a/server/server.go +++ b/server/server.go @@ -40,6 +40,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/sysutil" "github.com/tikv/pd/pkg/audit" + basicsvr "github.com/tikv/pd/pkg/basic_server" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/encryption" "github.com/tikv/pd/pkg/errs" @@ -99,7 +100,11 @@ var ( etcdCommittedIndexGauge = etcdStateGauge.WithLabelValues("committedIndex") ) -// Server is the pd server. +// if server doesn't implement all methods of basicsvr.Server, this line will result in +// clear error message "*Server does not implement basicsvr.Server (missing Method method)" +var _ basicsvr.Server = (*Server)(nil) + +// Server is the pd server. It implements basicsvr.Server // nolint type Server struct { diagnosticspb.DiagnosticsServer @@ -373,6 +378,7 @@ func (s *Server) startServer(ctx context.Context) error { Label: idAllocLabel, Member: s.member.MemberValue(), }) + s.tsoAllocatorManager = tso.NewAllocatorManager( s.member, s.rootPath, s.cfg.IsLocalTSOEnabled(), s.cfg.GetTSOSaveInterval(), s.cfg.GetTSOUpdatePhysicalInterval(), s.cfg.GetTLSConfig(), func() time.Duration { return s.persistOptions.GetMaxResetTSGap() }) @@ -731,7 +737,7 @@ func (s *Server) GetClient() *clientv3.Client { return s.client } -// GetHTTPClient returns builtin etcd client. +// GetHTTPClient returns builtin http client. func (s *Server) GetHTTPClient() *http.Client { return s.httpClient }