diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go
index dcf91f71b59..46a525a3e09 100644
--- a/server/cluster/cluster.go
+++ b/server/cluster/cluster.go
@@ -114,6 +114,11 @@ const (
 	heartbeatTaskRunner = "heartbeat-async"
 	miscTaskRunner      = "misc-async"
 	logTaskRunner       = "log-async"
+
+	// IsTSODynamicSwitchingEnabled reports whether checkTSOService may switch the
+	// TSO provider between PD and an independent TSO service based on discovery.
+	// TODO: make it configurable
+	IsTSODynamicSwitchingEnabled = false
 )
 
 // Server is the interface for cluster.
@@ -409,11 +414,32 @@ func (c *RaftCluster) checkSchedulingService() {
 // checkTSOService checks the TSO service.
 func (c *RaftCluster) checkTSOService() {
 	if c.isAPIServiceMode {
+		if IsTSODynamicSwitchingEnabled {
+			servers, err := discovery.Discover(c.etcdClient, constant.TSOServiceName)
+			if err != nil || len(servers) == 0 {
+				// No TSO server can be discovered, so PD serves TSO itself.
+				if err := c.startTSOJobsIfNeeded(); err != nil {
+					log.Error("failed to start TSO jobs", errs.ZapError(err))
+					return
+				}
+				log.Info("TSO is provided by PD")
+				c.UnsetServiceIndependent(constant.TSOServiceName)
+			} else {
+				// A TSO server is registered, so the local TSO jobs must be stopped.
+				if err := c.stopTSOJobsIfNeeded(); err != nil {
+					log.Error("failed to stop TSO jobs", errs.ZapError(err))
+					return
+				}
+				log.Info("TSO is provided by TSO server")
+				if !c.IsServiceIndependent(constant.TSOServiceName) {
+					c.SetServiceIndependent(constant.TSOServiceName)
+				}
+			}
+		}
 		return
 	}
 
-	if err := c.startTSOJobs(); err != nil {
-		// If there is an error, need to wait for the next check.
+	if err := c.startTSOJobsIfNeeded(); err != nil {
 		log.Error("failed to start TSO jobs", errs.ZapError(err))
 		return
 	}
@@ -428,6 +454,8 @@ func (c *RaftCluster) runServiceCheckJob() {
 		schedulingTicker.Reset(time.Millisecond)
 	})
 	defer schedulingTicker.Stop()
+	tsoTicker := time.NewTicker(tsoServiceCheckInterval)
+	defer tsoTicker.Stop()
 
 	for {
 		select {
@@ -436,11 +464,13 @@ func (c *RaftCluster) runServiceCheckJob() {
 			return
 		case <-schedulingTicker.C:
 			c.checkSchedulingService()
+		case <-tsoTicker.C:
+			c.checkTSOService()
 		}
 	}
 }
 
-func (c *RaftCluster) startTSOJobs() error {
+func (c *RaftCluster) startTSOJobsIfNeeded() error {
 	allocator, err := c.tsoAllocator.GetAllocator(tso.GlobalDCLocation)
 	if err != nil {
 		log.Error("failed to get global TSO allocator", errs.ZapError(err))
@@ -456,7 +486,7 @@ func (c *RaftCluster) startTSOJobs() error {
 	return nil
 }
 
-func (c *RaftCluster) stopTSOJobs() error {
+func (c *RaftCluster) stopTSOJobsIfNeeded() error {
 	allocator, err := c.tsoAllocator.GetAllocator(tso.GlobalDCLocation)
 	if err != nil {
 		log.Error("failed to get global TSO allocator", errs.ZapError(err))
@@ -824,7 +854,7 @@ func (c *RaftCluster) Stop() {
 	if !c.IsServiceIndependent(constant.SchedulingServiceName) {
 		c.stopSchedulingJobs()
 	}
-	if err := c.stopTSOJobs(); err != nil {
+	if err := c.stopTSOJobsIfNeeded(); err != nil {
 		log.Error("failed to stop tso jobs", errs.ZapError(err))
 	}
 	c.heartbeatRunner.Stop()
diff --git a/server/grpc_service.go b/server/grpc_service.go
index 25d5d3ed8e7..d5fd8ae3e32 100644
--- a/server/grpc_service.go
+++ b/server/grpc_service.go
@@ -529,10 +529,29 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error {
 		return s.forwardTSO(stream)
 	}
 
+	tsDeadlineCh := make(chan *tsoutil.TSDeadline, 1)
+	go tsoutil.WatchTSDeadline(stream.Context(), tsDeadlineCh)
+
 	var (
 		doneCh chan struct{}
 		errCh  chan error
+		// The following are tso forward stream related variables.
+		forwardStream     tsopb.TSO_TsoClient
+		cancelForward     context.CancelFunc
+		forwardCtx        context.Context
+		tsoStreamErr      error
+		lastForwardedHost string
 	)
+
+	defer func() {
+		if cancelForward != nil {
+			cancelForward()
+		}
+		if grpcutil.NeedRebuildConnection(tsoStreamErr) {
+			s.closeDelegateClient(lastForwardedHost)
+		}
+	}()
+
 	ctx, cancel := context.WithCancel(stream.Context())
 	defer cancel()
 	for {
@@ -570,6 +589,22 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error {
 			continue
 		}
 
+		// TSO is marked independent: forward the request instead of allocating here.
+		if s.IsServiceIndependent(constant.TSOServiceName) {
+			if request.GetCount() == 0 {
+				err = errs.ErrGenerateTimestamp.FastGenByArgs("tso count should be positive")
+				return status.Error(codes.Unknown, err.Error())
+			}
+			forwardCtx, cancelForward, forwardStream, lastForwardedHost, tsoStreamErr, err = s.handleTSOForwarding(forwardCtx, forwardStream, stream, nil, request, tsDeadlineCh, lastForwardedHost, cancelForward)
+			if tsoStreamErr != nil {
+				return tsoStreamErr
+			}
+			if err != nil {
+				return err
+			}
+			continue
+		}
+
 		start := time.Now()
 		// TSO uses leader lease to determine validity. No need to check leader here.
 		if s.IsClosed() {
diff --git a/server/server.go b/server/server.go
index 760b185a6ff..c88871658dc 100644
--- a/server/server.go
+++ b/server/server.go
@@ -1411,8 +1411,7 @@ func (s *Server) GetRaftCluster() *cluster.RaftCluster {
 // IsServiceIndependent returns whether the service is independent.
 func (s *Server) IsServiceIndependent(name string) bool {
 	if s.mode == APIServiceMode && !s.IsClosed() {
-		// TODO: remove it after we support tso discovery
-		if name == constant.TSOServiceName {
+		if name == constant.TSOServiceName && !cluster.IsTSODynamicSwitchingEnabled {
 			return true
 		}
 		return s.cluster.IsServiceIndependent(name)