Skip to content

Commit

Permalink
*: support configuring tso switch (#8755)
Browse files Browse the repository at this point in the history
close #8477

Signed-off-by: Ryan Leung <rleungx@gmail.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
rleungx and ti-chi-bot[bot] authored Nov 6, 2024
1 parent cfd8f34 commit b27f021
Show file tree
Hide file tree
Showing 8 changed files with 136 additions and 14 deletions.
3 changes: 3 additions & 0 deletions client/pd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,9 @@ func (c *pdServiceDiscovery) updateServiceModeLoop() {
ctx, cancel := context.WithCancel(c.ctx)
defer cancel()
ticker := time.NewTicker(serviceModeUpdateInterval)
failpoint.Inject("fastUpdateServiceMode", func() {
ticker.Reset(10 * time.Millisecond)
})
defer ticker.Stop()

for {
Expand Down
15 changes: 7 additions & 8 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,6 @@ const (
heartbeatTaskRunner = "heartbeat-async"
miscTaskRunner = "misc-async"
logTaskRunner = "log-async"

// TODO: make it configurable
IsTSODynamicSwitchingEnabled = false
)

// Server is the interface for cluster.
Expand Down Expand Up @@ -412,22 +409,24 @@ func (c *RaftCluster) checkSchedulingService() {
// checkTSOService checks the TSO service.
func (c *RaftCluster) checkTSOService() {
if c.isAPIServiceMode {
if IsTSODynamicSwitchingEnabled {
if c.opt.GetMicroServiceConfig().IsTSODynamicSwitchingEnabled() {
servers, err := discovery.Discover(c.etcdClient, constant.TSOServiceName)
if err != nil || len(servers) == 0 {
if err := c.startTSOJobsIfNeeded(); err != nil {
log.Error("failed to start TSO jobs", errs.ZapError(err))
return
}
log.Info("TSO is provided by PD")
c.UnsetServiceIndependent(constant.TSOServiceName)
if c.IsServiceIndependent(constant.TSOServiceName) {
log.Info("TSO is provided by PD")
c.UnsetServiceIndependent(constant.TSOServiceName)
}
} else {
if err := c.startTSOJobsIfNeeded(); err != nil {
if err := c.stopTSOJobsIfNeeded(); err != nil {
log.Error("failed to stop TSO jobs", errs.ZapError(err))
return
}
log.Info("TSO is provided by TSO server")
if !c.IsServiceIndependent(constant.TSOServiceName) {
log.Info("TSO is provided by TSO server")
c.SetServiceIndependent(constant.TSOServiceName)
}
}
Expand Down
14 changes: 12 additions & 2 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,8 @@ const (
minCheckRegionSplitInterval = 1 * time.Millisecond
maxCheckRegionSplitInterval = 100 * time.Millisecond

defaultEnableSchedulingFallback = true
defaultEnableSchedulingFallback = true
defaultEnableTSODynamicSwitching = false
)

// Special keys for Labels
Expand Down Expand Up @@ -854,13 +855,17 @@ func (c *DRAutoSyncReplicationConfig) adjust(meta *configutil.ConfigMetaData) {

// MicroServiceConfig is the configuration for micro service.
type MicroServiceConfig struct {
EnableSchedulingFallback bool `toml:"enable-scheduling-fallback" json:"enable-scheduling-fallback,string"`
EnableSchedulingFallback bool `toml:"enable-scheduling-fallback" json:"enable-scheduling-fallback,string"`
EnableTSODynamicSwitching bool `toml:"enable-tso-dynamic-switching" json:"enable-tso-dynamic-switching,string"`
}

func (c *MicroServiceConfig) adjust(meta *configutil.ConfigMetaData) {
if !meta.IsDefined("enable-scheduling-fallback") {
c.EnableSchedulingFallback = defaultEnableSchedulingFallback
}
if !meta.IsDefined("enable-tso-dynamic-switching") {
c.EnableTSODynamicSwitching = defaultEnableTSODynamicSwitching
}
}

// Clone returns a copy of micro service config.
Expand All @@ -874,6 +879,11 @@ func (c *MicroServiceConfig) IsSchedulingFallbackEnabled() bool {
return c.EnableSchedulingFallback
}

// IsTSODynamicSwitchingEnabled returns whether to enable TSO dynamic switching.
func (c *MicroServiceConfig) IsTSODynamicSwitchingEnabled() bool {
return c.EnableTSODynamicSwitching
}

// KeyspaceConfig is the configuration for keyspace management.
type KeyspaceConfig struct {
// PreAlloc contains the keyspace to be allocated during keyspace manager initialization.
Expand Down
2 changes: 1 addition & 1 deletion server/forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ func (s *GrpcServer) isLocalRequest(host string) bool {
}

func (s *GrpcServer) getGlobalTSO(ctx context.Context) (pdpb.Timestamp, error) {
if !s.IsAPIServiceMode() {
if !s.IsServiceIndependent(constant.TSOServiceName) {
return s.tsoAllocatorManager.HandleRequest(ctx, tso.GlobalDCLocation, 1)
}
request := &tsopb.TsoRequest{
Expand Down
2 changes: 1 addition & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1411,7 +1411,7 @@ func (s *Server) GetRaftCluster() *cluster.RaftCluster {
// IsServiceIndependent returns whether the service is independent.
func (s *Server) IsServiceIndependent(name string) bool {
if s.mode == APIServiceMode && !s.IsClosed() {
if name == constant.TSOServiceName && !cluster.IsTSODynamicSwitchingEnabled {
if name == constant.TSOServiceName && !s.GetMicroServiceConfig().IsTSODynamicSwitchingEnabled() {
return true
}
return s.cluster.IsServiceIndependent(name)
Expand Down
3 changes: 3 additions & 0 deletions tests/integrations/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ func TestTSOFollowerProxy(t *testing.T) {

func TestTSOFollowerProxyWithTSOService(t *testing.T) {
re := require.New(t)
re.NoError(failpoint.Enable("github.com/tikv/pd/client/fastUpdateServiceMode", `return(true)`))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cluster, err := tests.NewTestAPICluster(ctx, 1)
Expand All @@ -405,12 +406,14 @@ func TestTSOFollowerProxyWithTSOService(t *testing.T) {
tsoCluster, err := tests.NewTestTSOCluster(ctx, 2, backendEndpoints)
re.NoError(err)
defer tsoCluster.Destroy()
time.Sleep(100 * time.Millisecond)
cli := mcs.SetupClientWithKeyspaceID(ctx, re, constant.DefaultKeyspaceID, strings.Split(backendEndpoints, ","))
re.NotNil(cli)
defer cli.Close()
// TSO service does not support the follower proxy, so enabling it should fail.
err = cli.UpdateOption(pd.EnableTSOFollowerProxy, true)
re.Error(err)
re.NoError(failpoint.Disable("github.com/tikv/pd/client/fastUpdateServiceMode"))
}

// TestUnavailableTimeAfterLeaderIsReady is used to test https://github.com/tikv/pd/issues/5207
Expand Down
106 changes: 105 additions & 1 deletion tests/integrations/mcs/tso/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@ import (
tsoapi "github.com/tikv/pd/pkg/mcs/tso/server/apis/v1"
"github.com/tikv/pd/pkg/mcs/utils/constant"
"github.com/tikv/pd/pkg/storage/endpoint"
tsopkg "github.com/tikv/pd/pkg/tso"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/keypath"
"github.com/tikv/pd/pkg/utils/tempurl"
"github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/pkg/utils/tsoutil"
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/tests"
"github.com/tikv/pd/tests/integrations/mcs"
clientv3 "go.etcd.io/etcd/client/v3"
Expand Down Expand Up @@ -163,7 +165,9 @@ func checkTSOPath(re *require.Assertions, isAPIServiceMode bool) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
if isAPIServiceMode {
cluster, err = tests.NewTestAPICluster(ctx, 1)
cluster, err = tests.NewTestAPICluster(ctx, 1, func(conf *config.Config, _ string) {
conf.MicroService.EnableTSODynamicSwitching = false
})
} else {
cluster, err = tests.NewTestCluster(ctx, 1)
}
Expand Down Expand Up @@ -267,6 +271,10 @@ func TestForwardTSORelated(t *testing.T) {
re := require.New(t)
suite := NewAPIServerForward(re)
defer suite.ShutDown()
leaderServer := suite.cluster.GetLeaderServer().GetServer()
cfg := leaderServer.GetMicroServiceConfig().Clone()
cfg.EnableTSODynamicSwitching = false
leaderServer.SetMicroServiceConfig(*cfg)
// Unable to use the tso-related interface without tso server
suite.checkUnavailableTSO(re)
tc, err := tests.NewTestTSOCluster(suite.ctx, 1, suite.backendEndpoints)
Expand Down Expand Up @@ -575,3 +583,99 @@ func (suite *CommonTestSuite) TestBootstrapDefaultKeyspaceGroup() {
suite.pdLeader.ResignLeader()
suite.pdLeader = suite.cluster.GetServer(suite.cluster.WaitLeader())
}

// TestTSOServiceSwitch tests the behavior of TSO service switching when `EnableTSODynamicSwitching` is enabled.
// Initially, the TSO service should be provided by PD. After starting a TSO server, the service should switch to the TSO server.
// When the TSO server is stopped, the PD should resume providing the TSO service if `EnableTSODynamicSwitching` is enabled.
// If `EnableTSODynamicSwitching` is disabled, the PD should not provide TSO service after the TSO server is stopped.
func TestTSOServiceSwitch(t *testing.T) {
re := require.New(t)
re.NoError(failpoint.Enable("github.com/tikv/pd/client/fastUpdateServiceMode", `return(true)`))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

tc, err := tests.NewTestAPICluster(ctx, 1,
func(conf *config.Config, _ string) {
conf.MicroService.EnableTSODynamicSwitching = true
},
)
re.NoError(err)
defer tc.Destroy()

err = tc.RunInitialServers()
re.NoError(err)
leaderName := tc.WaitLeader()
re.NotEmpty(leaderName)
pdLeader := tc.GetServer(leaderName)
backendEndpoints := pdLeader.GetAddr()
re.NoError(pdLeader.BootstrapCluster())
pdClient, err := pd.NewClientWithContext(ctx, []string{backendEndpoints}, pd.SecurityOption{})
re.NoError(err)
re.NotNil(pdClient)
defer pdClient.Close()

var globalLastTS uint64
// Initially, TSO service should be provided by PD
re.NoError(checkTSOMonotonic(ctx, pdClient, &globalLastTS, 10))

// Start TSO server
tsoCluster, err := tests.NewTestTSOCluster(ctx, 1, pdLeader.GetAddr())
re.NoError(err)
tsoCluster.WaitForDefaultPrimaryServing(re)

// Verify PD is not providing TSO service
testutil.Eventually(re, func() bool {
allocator, err := pdLeader.GetServer().GetTSOAllocatorManager().GetAllocator(tsopkg.GlobalDCLocation)
if err != nil {
return false
}
return !allocator.IsInitialize()
})

err = checkTSOMonotonic(ctx, pdClient, &globalLastTS, 10)
re.NoError(err)

// Disable TSO switching
cfg := pdLeader.GetServer().GetMicroServiceConfig().Clone()
cfg.EnableTSODynamicSwitching = false
pdLeader.GetServer().SetMicroServiceConfig(*cfg)

tsoCluster.Destroy()

// Wait for the configuration change to take effect
time.Sleep(300 * time.Millisecond)
// Verify PD is not providing TSO service multiple times
for range 10 {
err = checkTSOMonotonic(ctx, pdClient, &globalLastTS, 1)
re.Error(err, "TSO service should not be available")
time.Sleep(10 * time.Millisecond)
}

// Now enable TSO switching
cfg = pdLeader.GetServer().GetMicroServiceConfig().Clone()

cfg.EnableTSODynamicSwitching = true
pdLeader.GetServer().SetMicroServiceConfig(*cfg)

// Wait for PD to detect the change
time.Sleep(300 * time.Millisecond)

// Verify PD is now providing TSO service and timestamps are monotonically increasing
re.NoError(checkTSOMonotonic(ctx, pdClient, &globalLastTS, 10))
re.NoError(failpoint.Disable("github.com/tikv/pd/client/fastUpdateServiceMode"))
}

func checkTSOMonotonic(ctx context.Context, pdClient pd.Client, globalLastTS *uint64, count int) error {
for range count {
physical, logical, err := pdClient.GetTS(ctx)
if err != nil {
return err
}
ts := (uint64(physical) << 18) + uint64(logical)
if ts <= *globalLastTS {
return fmt.Errorf("TSO is not globally increasing: last %d, current %d", globalLastTS, ts)
}
*globalLastTS = ts
}
return nil
}
5 changes: 4 additions & 1 deletion tests/integrations/tso/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/tikv/pd/pkg/utils/tempurl"
"github.com/tikv/pd/pkg/utils/tsoutil"
"github.com/tikv/pd/server/apiv2/handlers"
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/tests"
"github.com/tikv/pd/tests/integrations/mcs"
handlersutil "github.com/tikv/pd/tests/server/apiv2/handlers"
Expand Down Expand Up @@ -91,7 +92,9 @@ func (suite *tsoClientTestSuite) SetupSuite() {
if suite.legacy {
suite.cluster, err = tests.NewTestCluster(suite.ctx, serverCount)
} else {
suite.cluster, err = tests.NewTestAPICluster(suite.ctx, serverCount)
suite.cluster, err = tests.NewTestAPICluster(suite.ctx, serverCount, func(conf *config.Config, _ string) {
conf.MicroService.EnableTSODynamicSwitching = false
})
}
re.NoError(err)
err = suite.cluster.RunInitialServers()
Expand Down

0 comments on commit b27f021

Please sign in to comment.