diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go index 1b4b5c363746..ee94cb95dd40 100644 --- a/pkg/mcs/tso/server/server.go +++ b/pkg/mcs/tso/server/server.go @@ -539,7 +539,8 @@ func (s *Server) startServer() (err error) { tsoSvcRootPath := fmt.Sprintf(tsoSvcRootPathFormat, s.clusterID) s.serviceID = &discovery.ServiceRegistryEntry{ServiceAddr: s.cfg.AdvertiseListenAddr} s.keyspaceGroupManager = tso.NewKeyspaceGroupManager( - s.serverLoopCtx, s.serviceID, s.etcdClient, s.httpClient, s.cfg.AdvertiseListenAddr, legacySvcRootPath, tsoSvcRootPath, s.cfg) + s.serverLoopCtx, s.serviceID, s.etcdClient, s.httpClient, s.cfg.AdvertiseListenAddr, + s.clusterID, legacySvcRootPath, tsoSvcRootPath, s.cfg) if err := s.keyspaceGroupManager.Initialize(); err != nil { return err } diff --git a/pkg/mcs/utils/constant.go b/pkg/mcs/utils/constant.go index 21a4a655afe6..c87cec16a64c 100644 --- a/pkg/mcs/utils/constant.go +++ b/pkg/mcs/utils/constant.go @@ -76,5 +76,8 @@ const ( DefaultKeyspaceGroupReplicaCount = 2 // DefaultKeyspaceGroupReplicaPriority is the default priority of a keyspace group replica. + // It's used in keyspace group primary weighted-election to balance primaries' distribution. + // Among multiple replicas of a keyspace group, the higher the priority, the more likely + // the replica is to be elected as primary. DefaultKeyspaceGroupReplicaPriority = 0 ) diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 62a6986422c6..5e69da935575 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -193,6 +193,7 @@ type KeyspaceGroupManager struct { // which participate in the election of its keyspace group's primary, in the format of // "electionNamePrefix:keyspace-group-id" electionNamePrefix string + clusterID uint64 // legacySvcRootPath defines the legacy root path for all etcd paths which derives from // the PD/API service. It's in the format of "/pd/{cluster_id}". 
// The main paths for different usages include: @@ -233,14 +234,22 @@ type KeyspaceGroupManager struct { loadKeyspaceGroupsBatchSize int64 loadFromEtcdMaxRetryTimes int - // compiledKGMembershipIDRegexp is the compiled regular expression for matching keyspace group id in the - // keyspace group membership path. + // compiledKGMembershipIDRegexp is the compiled regular expression for matching keyspace group id + // in the keyspace group membership path. compiledKGMembershipIDRegexp *regexp.Regexp // groupUpdateRetryList is the list of keyspace groups which failed to update and need to retry. groupUpdateRetryList map[uint32]*endpoint.KeyspaceGroup groupWatcher *etcdutil.LoopWatcher primaryPathBuilder *kgPrimaryPathBuilder + + // tsoNodes is the registered tso servers. + tsoNodes sync.Map // store as map[string]struct{} + // serviceRegistryMap stores the mapping from the service registry key to the service address. + // Note: it is only used in tsoNodesWatcher. + serviceRegistryMap map[string]string + // tsoNodesWatcher is the watcher for the registered tso servers. + tsoNodesWatcher *etcdutil.LoopWatcher } // NewKeyspaceGroupManager creates a new Keyspace Group Manager. 
@@ -250,6 +259,7 @@ func NewKeyspaceGroupManager( etcdClient *clientv3.Client, httpClient *http.Client, electionNamePrefix string, + clusterID uint64, legacySvcRootPath string, tsoSvcRootPath string, cfg ServiceConfig, @@ -268,10 +278,12 @@ func NewKeyspaceGroupManager( etcdClient: etcdClient, httpClient: httpClient, electionNamePrefix: electionNamePrefix, + clusterID: clusterID, legacySvcRootPath: legacySvcRootPath, tsoSvcRootPath: tsoSvcRootPath, cfg: cfg, groupUpdateRetryList: make(map[uint32]*endpoint.KeyspaceGroup), + serviceRegistryMap: make(map[string]string), } kgm.legacySvcStorage = endpoint.NewStorageEndpoint( kv.NewEtcdKVBase(kgm.etcdClient, kgm.legacySvcRootPath), nil) @@ -288,6 +300,92 @@ func NewKeyspaceGroupManager( // Initialize this KeyspaceGroupManager func (kgm *KeyspaceGroupManager) Initialize() error { + if err := kgm.InitializeTSOServerWatchLoop(); err != nil { + log.Error("failed to initialize tso server watch loop", zap.Error(err)) + kgm.Close() // Close the manager to clean up the allocated resources. + return errs.ErrLoadKeyspaceGroupsTerminated.Wrap(err) + } + if err := kgm.InitializeGroupWatchLoop(); err != nil { + log.Error("failed to initialize group watch loop", zap.Error(err)) + kgm.Close() // Close the manager to clean up the loaded keyspace groups. + return errs.ErrLoadKeyspaceGroupsTerminated.Wrap(err) + } + return nil +} + +// Close this KeyspaceGroupManager +func (kgm *KeyspaceGroupManager) Close() { + log.Info("closing keyspace group manager") + + // Note: don't change the order. We need to cancel all service loops in the keyspace group manager + // before closing all keyspace groups. It's to prevent concurrent addition/removal of keyspace groups + // during critical periods such as service shutdown and online keyspace group, while the former requires + // snapshot isolation to ensure all keyspace groups are properly closed and no new keyspace group is + // added/initialized after that. 
+ kgm.cancel() + kgm.wg.Wait() + kgm.state.deinitialize() + + log.Info("keyspace group manager closed") +} + +// InitializeTSOServerWatchLoop initializes the watch loop monitoring the path for storing the +// registered tso servers. +// Key: /ms/{cluster_id}/tso/registry/{tsoServerAddress} +// Value: discovery.ServiceRegistryEntry +func (kgm *KeyspaceGroupManager) InitializeTSOServerWatchLoop() error { + tsoServiceKey := discovery.TSOPath(kgm.clusterID) + tsoServiceEndKey := clientv3.GetPrefixRangeEnd(tsoServiceKey) + "/" + + putFn := func(kv *mvccpb.KeyValue) error { + s := &discovery.ServiceRegistryEntry{} + if err := json.Unmarshal(kv.Value, s); err != nil { + log.Warn("failed to unmarshal service registry entry", + zap.String("event-kv-key", string(kv.Key)), zap.Error(err)) + return err + } + kgm.tsoNodes.Store(s.ServiceAddr, struct{}{}) + kgm.serviceRegistryMap[string(kv.Key)] = s.ServiceAddr + return nil + } + deleteFn := func(kv *mvccpb.KeyValue) error { + key := string(kv.Key) + if serviceAddr, ok := kgm.serviceRegistryMap[key]; ok { + delete(kgm.serviceRegistryMap, key) + kgm.tsoNodes.Delete(serviceAddr) + return nil + } + return perrors.Errorf("failed to find the service address for key %s", key) + } + + kgm.tsoNodesWatcher = etcdutil.NewLoopWatcher( + kgm.ctx, + &kgm.wg, + kgm.etcdClient, + "tso-nodes-watcher", + tsoServiceKey, + putFn, + deleteFn, + func() error { return nil }, + clientv3.WithRange(tsoServiceEndKey), + ) + + kgm.wg.Add(1) + go kgm.tsoNodesWatcher.StartWatchLoop() + + if err := kgm.tsoNodesWatcher.WaitLoad(); err != nil { + log.Error("failed to load the registered tso servers", errs.ZapError(err)) + return err + } + + return nil +} + +// InitializeGroupWatchLoop initializes the watch loop monitoring the path for storing keyspace group +// membership/distribution metadata. 
+// Key: /pd/{cluster_id}/tso/keyspace_groups/membership/{group} +// Value: endpoint.KeyspaceGroup +func (kgm *KeyspaceGroupManager) InitializeGroupWatchLoop() error { rootPath := kgm.legacySvcRootPath startKey := strings.Join([]string{rootPath, endpoint.KeyspaceGroupIDPath(mcsutils.DefaultKeyspaceGroupID)}, "/") endKey := strings.Join( @@ -367,22 +465,6 @@ func (kgm *KeyspaceGroupManager) Initialize() error { return nil } -// Close this KeyspaceGroupManager -func (kgm *KeyspaceGroupManager) Close() { - log.Info("closing keyspace group manager") - - // Note: don't change the order. We need to cancel all service loops in the keyspace group manager - // before closing all keyspace groups. It's to prevent concurrent addition/removal of keyspace groups - // during critical periods such as service shutdown and online keyspace group, while the former requires - // snapshot isolation to ensure all keyspace groups are properly closed and no new keyspace group is - // added/initialized after that. - kgm.cancel() - kgm.wg.Wait() - kgm.state.deinitialize() - - log.Info("keyspace group manager closed") -} - func (kgm *KeyspaceGroupManager) isAssignedToMe(group *endpoint.KeyspaceGroup) bool { for _, member := range group.Members { if member.Address == kgm.tsoServiceID.ServiceAddr { diff --git a/pkg/tso/keyspace_group_manager_test.go b/pkg/tso/keyspace_group_manager_test.go index 1e7d072ade32..831c22ec81cb 100644 --- a/pkg/tso/keyspace_group_manager_test.go +++ b/pkg/tso/keyspace_group_manager_test.go @@ -687,7 +687,7 @@ func (suite *keyspaceGroupManagerTestSuite) newKeyspaceGroupManager( ) *KeyspaceGroupManager { return NewKeyspaceGroupManager( suite.ctx, tsoServiceID, suite.etcdClient, nil, electionNamePrefix, - legacySvcRootPath, tsoSvcRootPath, suite.cfg) + 0, legacySvcRootPath, tsoSvcRootPath, suite.cfg) } // runTestLoadMultipleKeyspaceGroupsAssignment tests the loading of multiple keyspace group assignment.