Commit 00a4144

more implementation

Signed-off-by: Bin Shi <binshi.bing@gmail.com>
binshi-bing committed Mar 23, 2023
1 parent 5c5b64b commit 00a4144
Showing 9 changed files with 110 additions and 67 deletions.
3 changes: 2 additions & 1 deletion pkg/mcs/tso/server/grpc_service.go
@@ -27,6 +27,7 @@ import (
 	bs "github.com/tikv/pd/pkg/basicserver"
 	"github.com/tikv/pd/pkg/errs"
 	"github.com/tikv/pd/pkg/mcs/registry"
+	"github.com/tikv/pd/pkg/mcs/utils"
 	"github.com/tikv/pd/pkg/utils/apiutil"
 	"github.com/tikv/pd/pkg/utils/grpcutil"
 	"github.com/tikv/pd/pkg/utils/logutil"
@@ -138,7 +139,7 @@ func (s *Service) Tso(stream tsopb.TSO_TsoServer) error {
 		return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", s.clusterID, request.GetHeader().GetClusterId())
 	}
 	count := request.GetCount()
-	ts, err := s.tsoAllocatorManager.HandleTSORequest(request.GetDcLocation(), count)
+	ts, err := s.keyspaceGroupManager.HandleTSORequest(utils.DefaultKeySpaceGroupID, request.GetDcLocation(), count)
 	if err != nil {
 		return status.Errorf(codes.Unknown, err.Error())
 	}
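The change above swaps the server-owned allocator manager for a lookup through the keyspace group manager, keyed by utils.DefaultKeySpaceGroupID. Below is a minimal, self-contained sketch of that delegation pattern; the type names and the simplified (physical, logical) return shape are illustrative stand-ins for the real KeyspaceGroupManager and pdpb.Timestamp, not the actual pd implementation.

package main

import (
	"fmt"
	"sync"
	"time"
)

// tsoProvider stands in for *tso.AllocatorManager: anything that can answer a
// TSO request for a dc-location.
type tsoProvider interface {
	// HandleTSORequest returns a simplified (physical, logical) pair instead
	// of the real pdpb.Timestamp.
	HandleTSORequest(dcLocation string, count uint32) (int64, int64, error)
}

// fakeAllocator hands out timestamps from a local counter, purely for the demo.
type fakeAllocator struct {
	mu      sync.Mutex
	logical int64
}

func (a *fakeAllocator) HandleTSORequest(dcLocation string, count uint32) (int64, int64, error) {
	a.mu.Lock()
	defer a.mu.Unlock()
	a.logical += int64(count)
	return time.Now().UnixMilli(), a.logical, nil
}

// keyspaceGroupManager is a hypothetical, pared-down stand-in for the real
// KeyspaceGroupManager: it serves one allocator manager per keyspace group.
type keyspaceGroupManager struct {
	mu     sync.RWMutex
	groups map[uint32]tsoProvider
}

// HandleTSORequest routes the request to the provider that serves the given
// keyspace group, the delegation the gRPC handler above now performs.
func (m *keyspaceGroupManager) HandleTSORequest(keyspaceGroupID uint32, dcLocation string, count uint32) (int64, int64, error) {
	m.mu.RLock()
	p, ok := m.groups[keyspaceGroupID]
	m.mu.RUnlock()
	if !ok {
		return 0, 0, fmt.Errorf("keyspace group %d is not served by this node", keyspaceGroupID)
	}
	return p.HandleTSORequest(dcLocation, count)
}

func main() {
	const defaultKeySpaceGroupID uint32 = 0
	mgr := &keyspaceGroupManager{groups: map[uint32]tsoProvider{
		defaultKeySpaceGroupID: &fakeAllocator{},
	}}
	physical, logical, err := mgr.HandleTSORequest(defaultKeySpaceGroupID, "global", 10)
	fmt.Println(physical, logical, err)
}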
2 changes: 1 addition & 1 deletion pkg/mcs/tso/server/handler.go
@@ -36,7 +36,7 @@ func (h *Handler) ResetTS(ts uint64, ignoreSmaller, skipUpperBoundCheck bool) er
 		zap.Uint64("new-ts", ts),
 		zap.Bool("ignore-smaller", ignoreSmaller),
 		zap.Bool("skip-upper-bound-check", skipUpperBoundCheck))
-	tsoAllocator, err := h.s.tsoAllocatorManager.GetAllocator(tso.GlobalDCLocation)
+	tsoAllocator, err := h.s.GetTSOAllocatorManager().GetAllocator(tso.GlobalDCLocation)
 	if err != nil {
 		return err
 	}
52 changes: 15 additions & 37 deletions pkg/mcs/tso/server/server.go
@@ -44,7 +44,6 @@ import (
 	"github.com/tikv/pd/pkg/mcs/discovery"
 	mcsutils "github.com/tikv/pd/pkg/mcs/utils"
 	"github.com/tikv/pd/pkg/member"
-	"github.com/tikv/pd/pkg/storage/endpoint"
 	"github.com/tikv/pd/pkg/systimemon"
 	"github.com/tikv/pd/pkg/tso"
 	"github.com/tikv/pd/pkg/utils/etcdutil"
@@ -70,7 +69,7 @@ const (
 )
 
 var _ bs.Server = (*Server)(nil)
-var _ tso.Member = (*member.Participant)(nil)
+var _ tso.ElectionMember = (*member.Participant)(nil)
 
 // Server is the TSO server, and it implements bs.Server.
 type Server struct {
@@ -88,12 +87,10 @@ type Server struct {
 
 	handler *Handler
 
-	cfg                  *tso.Config
-	clusterID            uint64
-	defaultGroupRootPath string
-	defaultGroupStorage  endpoint.TSOStorage
-	listenURL            *url.URL
-	backendUrls          []url.URL
+	cfg         *tso.Config
+	clusterID   uint64
+	listenURL   *url.URL
+	backendUrls []url.URL
 
 	// etcd client
 	etcdClient *clientv3.Client
@@ -151,30 +148,7 @@ func (s *Server) Run() error {
 	if err := s.initClient(); err != nil {
 		return err
 	}
-	if err := s.startServer(); err != nil {
-		return err
-	}
-
-	s.startServerLoop()
-
-	return nil
-}
-
-func (s *Server) startServerLoop() {
-	s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
-	s.serverLoopWg.Add(1)
-	go s.tsoAllocatorLoop()
-}
-
-// tsoAllocatorLoop is used to run the TSO Allocator updating daemon.
-func (s *Server) tsoAllocatorLoop() {
-	defer logutil.LogPanic()
-	defer s.serverLoopWg.Done()
-
-	ctx, cancel := context.WithCancel(s.serverLoopCtx)
-	defer cancel()
-	s.tsoAllocatorManager.AllocatorDaemon(ctx)
-	log.Info("tso server is closed, exit allocator loop")
+	return s.startServer()
 }
 
 // Close closes the server.
@@ -185,6 +159,8 @@ func (s *Server) Close() {
 	}
 
 	log.Info("closing tso server ...")
+	// close tso service loops in the keyspace group manager
+	s.keyspaceGroupManager.Close()
 	s.serviceRegister.Deregister()
 	s.muxListener.Close()
 	s.serverLoopCancel()
@@ -220,13 +196,13 @@ func (s *Server) AddStartCallback(callbacks ...func()) {
 // IsServing implements basicserver. It returns whether the server is the leader
 // if there is embedded etcd, or the primary otherwise.
 func (s *Server) IsServing() bool {
-	return atomic.LoadInt64(&s.isServing) == 1 && s.participant.IsLeader()
+	return atomic.LoadInt64(&s.isServing) == 1 && s.keyspaceGroupManager.GetElectionMember(mcsutils.DefaultKeySpaceGroupID).IsLeader()
 }
 
 // GetLeaderListenUrls gets service endpoints from the leader in election group.
 // The entry at the index 0 is the primary's service endpoint.
 func (s *Server) GetLeaderListenUrls() []string {
-	return s.participant.GetLeaderListenUrls()
+	return s.keyspaceGroupManager.GetElectionMember(mcsutils.DefaultKeySpaceGroupID).GetLeaderListenUrls()
 }
 
 // AddServiceReadyCallback implements basicserver.
@@ -250,7 +226,7 @@ func (s *Server) IsClosed() bool {
 
 // GetTSOAllocatorManager returns the manager of TSO Allocator.
 func (s *Server) GetTSOAllocatorManager() *tso.AllocatorManager {
-	return s.tsoAllocatorManager
+	return s.keyspaceGroupManager.GetAllocatorManager(mcsutils.DefaultKeySpaceGroupID)
 }
 
 // IsLocalRequest checks if the forwarded host is the current host
@@ -451,19 +427,21 @@ func (s *Server) startServer() (err error) {
 	// The independent TSO service still reuses PD version info since PD and TSO are just
 	// different service modes provided by the same pd-server binary
 	serverInfo.WithLabelValues(versioninfo.PDReleaseVersion, versioninfo.PDGitHash).Set(float64(time.Now().Unix()))
-	s.defaultGroupRootPath = path.Join(pdRootPath, strconv.FormatUint(s.clusterID, 10))
 
 	s.listenURL, err = url.Parse(s.cfg.ListenAddr)
 	if err != nil {
 		return err
 	}
 
+	s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
+	defaultKsgStorageTSRootPath := path.Join(pdRootPath, strconv.FormatUint(s.clusterID, 10))
 	tsoSvcRootPath := fmt.Sprintf(tsoSvcRootPathFormat, s.clusterID)
 	s.keyspaceGroupManager = tso.NewKeyspaceGroupManager(
-		s.ctx, s.etcdClient, s.listenURL.Host, s.defaultGroupRootPath, tsoSvcRootPath, s.cfg)
+		s.serverLoopCtx, s.etcdClient, s.listenURL.Host, defaultKsgStorageTSRootPath, tsoSvcRootPath, s.cfg)
+	s.keyspaceGroupManager.Initialize()
 
 	s.service = &Service{Server: s}
 
 	tlsConfig, err := s.cfg.Security.ToTLSConfig()
 	if err != nil {
 		return err
59 changes: 47 additions & 12 deletions pkg/tso/allocator_manager.go
@@ -95,8 +95,8 @@ func (info *DCLocationInfo) clone() DCLocationInfo {
 	return copiedInfo
 }
 
-// Member defines the interface for the election related logic.
-type Member interface {
+// ElectionMember defines the interface for the election related logic.
+type ElectionMember interface {
 	// ID returns the unique ID in the election group. For example, it can be unique
 	// server id of a cluster or the unique keyspace group replica id of the election
 	// group comprised of the replicas of a keyspace group.
@@ -162,11 +162,12 @@ type AllocatorManager struct {
 	}
 	wg sync.WaitGroup
 
-	ctx    context.Context
+	ctx    context.Context
+	cancel context.CancelFunc
 	// ksgID is the keyspace group id
 	ksgID uint32
 	// member is for election use
-	member Member
+	member ElectionMember
 	// TSO config
 	rootPath string
 	storage  endpoint.TSOStorage
@@ -191,7 +192,7 @@ func NewAllocatorManager(
 	ctx context.Context,
 	startGlobalLeaderLoop bool,
 	keyspaceGroupID uint32,
-	member Member,
+	member ElectionMember,
 	rootPath string,
 	storage endpoint.TSOStorage,
 	enableLocalTSO bool,
@@ -201,8 +202,10 @@ func NewAllocatorManager(
 	tlsConfig *grpcutil.TLSConfig,
 	maxResetTSGap func() time.Duration,
 ) *AllocatorManager {
+	ctx, cancel := context.WithCancel(ctx)
 	allocatorManager := &AllocatorManager{
 		ctx:    ctx,
+		cancel: cancel,
 		ksgID:  keyspaceGroupID,
 		member: member,
 		rootPath: rootPath,
@@ -219,28 +222,45 @@
 	allocatorManager.localAllocatorConn.clientConns = make(map[string]*grpc.ClientConn)
 
 	// Set up the Global TSO Allocator here, it will be initialized once the member campaigns leader successfully.
-	allocatorManager.setUpGlobalAllocator(ctx, startGlobalLeaderLoop)
+	allocatorManager.setUpGlobalAllocator(startGlobalLeaderLoop)
 
 	return allocatorManager
 }
 
 // setUpGlobalAllocator is used to set up the global allocator, which will initialize the allocator and put it into
 // an allocator daemon. A TSO Allocator should only be set once, and may be initialized and reset multiple times
 // depending on the election.
-func (am *AllocatorManager) setUpGlobalAllocator(parentCtx context.Context, startGlobalLeaderLoop bool) {
+func (am *AllocatorManager) setUpGlobalAllocator(startGlobalLeaderLoop bool) {
 	am.mu.Lock()
 	defer am.mu.Unlock()
 
-	allocator := NewGlobalTSOAllocator(parentCtx, am, startGlobalLeaderLoop)
+	allocator := NewGlobalTSOAllocator(am.ctx, am, startGlobalLeaderLoop)
 	// Create a new allocatorGroup
-	ctx, cancel := context.WithCancel(parentCtx)
+	ctx, cancel := context.WithCancel(am.ctx)
 	am.mu.allocatorGroups[GlobalDCLocation] = &allocatorGroup{
 		dcLocation: GlobalDCLocation,
 		ctx:        ctx,
 		cancel:     cancel,
 		leadership: am.member.GetLeadership(),
 		allocator:  allocator,
 	}
+
+	if startGlobalLeaderLoop {
+		am.wg.Add(1)
+		go am.tsoAllocatorLoop()
+	}
 }
+
+// tsoAllocatorLoop runs the TSO Allocator updating daemon.
+// The tso service starts the loop here, while pd starts its own loop.
+func (am *AllocatorManager) tsoAllocatorLoop() {
+	defer logutil.LogPanic()
+	defer am.wg.Done()
+
+	ctx, cancel := context.WithCancel(am.ctx)
+	defer cancel()
+	am.AllocatorDaemon(ctx)
+	log.Info("exit allocator loop", zap.Uint32("keyspace-group-id", am.ksgID))
+}
 
 // setUpLocalAllocator is used to set up an allocator, which will initialize the allocator and put it into allocator daemon.
@@ -267,6 +287,21 @@ func (am *AllocatorManager) setUpLocalAllocator(parentCtx context.Context, dcLoc
 	go am.allocatorLeaderLoop(parentCtx, localTSOAllocator)
 }
 
+// close shuts down the TSO Allocator updating daemon.
+// The tso service calls this function to stop the loop; pd manages its own loop.
+func (am *AllocatorManager) close() {
+	if allocatorGroup, exist := am.getAllocatorGroup(GlobalDCLocation); exist {
+		allocatorGroup.allocator.(*GlobalTSOAllocator).close()
+	}
+
+	am.cancel()
+	am.wg.Wait()
+}
+
+func (am *AllocatorManager) getMember() *ElectionMember {
+	return &am.member
+}
+
 // SetLocalTSOConfig receives the zone label of this PD server and write it into etcd as dc-location
 // to make the whole cluster know the DC-level topology for later Local TSO Allocator campaign.
 func (am *AllocatorManager) SetLocalTSOConfig(dcLocation string) error {
@@ -662,7 +697,7 @@ func (am *AllocatorManager) campaignAllocatorLeader(
 
 // AllocatorDaemon is used to update every allocator's TSO and check whether we have
 // any new local allocator that needs to be set up.
-func (am *AllocatorManager) AllocatorDaemon(serverCtx context.Context) {
+func (am *AllocatorManager) AllocatorDaemon(ctx context.Context) {
 	// allocatorPatroller should only work when enableLocalTSO is true to
 	// set up the new Local TSO Allocator in time.
 	var patrolTicker = &time.Ticker{}
@@ -679,7 +714,7 @@ func (am *AllocatorManager) AllocatorDaemon(serverCtx context.Context) {
 		select {
 		case <-patrolTicker.C:
 			// Inspect the cluster dc-location info and set up the new Local TSO Allocator in time.
-			am.allocatorPatroller(serverCtx)
+			am.allocatorPatroller(ctx)
 		case <-tsTicker.C:
 			// Update the initialized TSO Allocator to advance TSO.
 			am.allocatorUpdater()
@@ -693,7 +728,7 @@ func (am *AllocatorManager) AllocatorDaemon(serverCtx context.Context) {
 			}
 			// PS: ClusterDCLocationChecker and PriorityChecker are time-consuming and run at low frequency,
 			// so we should run them concurrently to speed up the progress.
-		case <-serverCtx.Done():
+		case <-ctx.Done():
 			return
 		}
 	}
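With these changes, the allocator manager owns a context/cancel pair created in NewAllocatorManager, runs the updating daemon on a WaitGroup-tracked goroutine, and tears both down in close(). A compact, self-contained sketch of that ticker-driven daemon pattern follows; the interval and the printed action are placeholders, not pd's real values:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// daemon is a minimal stand-in for AllocatorManager's updating loop: it owns
// its own context/cancel pair (as NewAllocatorManager now does) so close()
// can stop the loop without involving the caller's context.
type daemon struct {
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup
}

func newDaemon(parent context.Context) *daemon {
	ctx, cancel := context.WithCancel(parent)
	d := &daemon{ctx: ctx, cancel: cancel}
	d.wg.Add(1)
	go d.loop()
	return d
}

func (d *daemon) loop() {
	defer d.wg.Done()
	ticker := time.NewTicker(50 * time.Millisecond) // stands in for tsTicker
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			fmt.Println("advance TSO") // allocatorUpdater would run here
		case <-d.ctx.Done():
			return
		}
	}
}

// close cancels the daemon's context and waits for the loop to exit, the same
// shape as AllocatorManager.close above.
func (d *daemon) close() {
	d.cancel()
	d.wg.Wait()
}

func main() {
	d := newDaemon(context.Background())
	time.Sleep(120 * time.Millisecond)
	d.close()
}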
1 change: 1 addition & 0 deletions pkg/tso/config.go
@@ -102,6 +102,7 @@ func NewConfig() *Config {
 func (c *Config) GetLeaderLease() int64 {
 	return c.LeaderLease
 }
+
 // IsLocalTSOEnabled returns if the local TSO is enabled.
 func (c *Config) IsLocalTSOEnabled() bool {
 	return c.EnableLocalTSO
9 changes: 8 additions & 1 deletion pkg/tso/global_allocator.go
@@ -67,7 +67,7 @@ type GlobalTSOAllocator struct {
 	// for global TSO synchronization
 	am *AllocatorManager
 	// for election use
-	member Member
+	member ElectionMember
 	timestampOracle *timestampOracle
 	// syncRTT is the RTT duration a SyncMaxTS RPC call will cost,
 	// which is used to estimate the MaxTS in a Global TSO generation
@@ -108,6 +108,13 @@ func NewGlobalTSOAllocator(
 	return gta
 }
 
+// close shuts down the primary election loop.
+// The tso service calls this function to stop the loop; pd manages its own loop.
+func (gta *GlobalTSOAllocator) close() {
+	gta.cancel()
+	gta.wg.Wait()
+}
+
 func (gta *GlobalTSOAllocator) setSyncRTT(rtt int64) {
 	gta.syncRTT.Store(rtt)
 	tsoGauge.WithLabelValues("global_tso_sync_rtt", gta.timestampOracle.dcLocation).Set(float64(rtt))
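The Member-to-ElectionMember rename threads through server.go, allocator_manager.go, and global_allocator.go, and is kept honest at build time by the assertion in server.go: var _ tso.ElectionMember = (*member.Participant)(nil). A minimal illustration of that compile-time-assertion idiom with hypothetical toy types (this interface is not pd's actual ElectionMember):

package main

import "fmt"

// electionMember is a pared-down, hypothetical version of the interface:
// just enough surface to show the compile-time assertion idiom.
type electionMember interface {
	ID() uint64
	IsLeader() bool
}

// participant plays the role of member.Participant.
type participant struct{ id uint64 }

func (p *participant) ID() uint64     { return p.id }
func (p *participant) IsLeader() bool { return true }

// The assertion: if *participant ever stops satisfying electionMember, this
// line fails to compile instead of the breakage surfacing at a distant use site.
var _ electionMember = (*participant)(nil)

func main() {
	var m electionMember = &participant{id: 1}
	fmt.Println(m.ID(), m.IsLeader())
}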