Skip to content

Commit

Permalink
add a config to manage tso switch
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx committed Oct 25, 2024
1 parent 1474864 commit 57e3218
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 5 deletions.
25 changes: 24 additions & 1 deletion server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,11 +403,30 @@ func (c *RaftCluster) checkSchedulingService() {
// checkTSOService checks the TSO service.
func (c *RaftCluster) checkTSOService() {
if c.isAPIServiceMode {
if c.opt.GetMicroServiceConfig().IsTSODynamicSwitchingEnabled() {
servers, err := discovery.Discover(c.etcdClient, strconv.FormatUint(c.clusterID, 10), constant.TSOServiceName)
if err != nil || len(servers) == 0 {
if err := c.startTSOJobs(); err != nil {
log.Error("failed to start TSO jobs", errs.ZapError(err))
return
}
log.Info("TSO is provided by PD")
c.UnsetServiceIndependent(constant.TSOServiceName)
} else {
if err := c.stopTSOJobs(); err != nil {
log.Error("failed to stop TSO jobs", errs.ZapError(err))
return
}
log.Info("TSO is provided by TSO server")
if !c.IsServiceIndependent(constant.TSOServiceName) {
c.SetServiceIndependent(constant.TSOServiceName)
}
}
}
return
}

if err := c.startTSOJobs(); err != nil {
// If there is an error, need to wait for the next check.
log.Error("failed to start TSO jobs", errs.ZapError(err))
return
}
Expand All @@ -422,6 +441,8 @@ func (c *RaftCluster) runServiceCheckJob() {
schedulingTicker.Reset(time.Millisecond)
})
defer schedulingTicker.Stop()
tsoTicker := time.NewTicker(tsoServiceCheckInterval)
defer tsoTicker.Stop()

for {
select {
Expand All @@ -430,6 +451,8 @@ func (c *RaftCluster) runServiceCheckJob() {
return
case <-schedulingTicker.C:
c.checkSchedulingService()
case <-tsoTicker.C:
c.checkTSOService()
}
}
}
Expand Down
14 changes: 12 additions & 2 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,8 @@ const (
minCheckRegionSplitInterval = 1 * time.Millisecond
maxCheckRegionSplitInterval = 100 * time.Millisecond

defaultEnableSchedulingFallback = true
defaultEnableSchedulingFallback = true
defaultEnableTSODynamicSwitching = false
)

// Special keys for Labels
Expand Down Expand Up @@ -854,13 +855,17 @@ func (c *DRAutoSyncReplicationConfig) adjust(meta *configutil.ConfigMetaData) {

// MicroServiceConfig is the configuration for micro service.
type MicroServiceConfig struct {
EnableSchedulingFallback bool `toml:"enable-scheduling-fallback" json:"enable-scheduling-fallback,string"`
EnableSchedulingFallback bool `toml:"enable-scheduling-fallback" json:"enable-scheduling-fallback,string"`
EnableTSODynamicSwitching bool `toml:"enable-tso-dynamic-switching" json:"enable-tso-dynamic-switching,string"`
}

func (c *MicroServiceConfig) adjust(meta *configutil.ConfigMetaData) {
if !meta.IsDefined("enable-scheduling-fallback") {
c.EnableSchedulingFallback = defaultEnableSchedulingFallback
}
if !meta.IsDefined("enable-dynamic-tso") {
c.EnableTSODynamicSwitching = defaultEnableTSODynamicSwitching
}
}

// Clone returns a copy of micro service config.
Expand All @@ -874,6 +879,11 @@ func (c *MicroServiceConfig) IsSchedulingFallbackEnabled() bool {
return c.EnableSchedulingFallback
}

// IsTSODynamicSwitchingEnabled returns whether to enable TSO dynamic switching.
func (c *MicroServiceConfig) IsTSODynamicSwitchingEnabled() bool {
return c.EnableTSODynamicSwitching
}

// KeyspaceConfig is the configuration for keyspace management.
type KeyspaceConfig struct {
// PreAlloc contains the keyspace to be allocated during keyspace manager initialization.
Expand Down
3 changes: 1 addition & 2 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1420,8 +1420,7 @@ func (s *Server) GetRaftCluster() *cluster.RaftCluster {
// IsServiceIndependent returns whether the service is independent.
func (s *Server) IsServiceIndependent(name string) bool {
if s.mode == APIServiceMode && !s.IsClosed() {
// TODO: remove it after we support tso discovery
if name == constant.TSOServiceName {
if name == constant.TSOServiceName && !s.GetMicroServiceConfig().IsTSODynamicSwitchingEnabled() {
return true
}
return s.cluster.IsServiceIndependent(name)
Expand Down

0 comments on commit 57e3218

Please sign in to comment.