Skip to content

Commit

Permalink
Support weighted-election for TSO keyspace group primary election.
Browse files Browse the repository at this point in the history
Signed-off-by: Bin Shi <binshi.bing@gmail.com>
  • Loading branch information
binshi-bing committed Jun 20, 2023
1 parent 58d9208 commit ace1174
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 20 deletions.
3 changes: 2 additions & 1 deletion pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,8 @@ func (s *Server) startServer() (err error) {
tsoSvcRootPath := fmt.Sprintf(tsoSvcRootPathFormat, s.clusterID)
s.serviceID = &discovery.ServiceRegistryEntry{ServiceAddr: s.cfg.AdvertiseListenAddr}
s.keyspaceGroupManager = tso.NewKeyspaceGroupManager(
s.serverLoopCtx, s.serviceID, s.etcdClient, s.httpClient, s.cfg.AdvertiseListenAddr, legacySvcRootPath, tsoSvcRootPath, s.cfg)
s.serverLoopCtx, s.serviceID, s.etcdClient, s.httpClient, s.cfg.AdvertiseListenAddr,
s.clusterID, legacySvcRootPath, tsoSvcRootPath, s.cfg)
if err := s.keyspaceGroupManager.Initialize(); err != nil {
return err
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/mcs/utils/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,5 +76,8 @@ const (
DefaultKeyspaceGroupReplicaCount = 2

// DefaultKeyspaceGroupReplicaPriority is the default priority of a keyspace group replica.
// It's used in keyspace group primary weighted-election to balance primaries' distribution.
// Among multiple replicas of a keyspace group, the higher the priority, the more likely
// the replica is to be elected as primary.
DefaultKeyspaceGroupReplicaPriority = 0
)
118 changes: 100 additions & 18 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ type KeyspaceGroupManager struct {
// which participate in the election of its keyspace group's primary, in the format of
// "electionNamePrefix:keyspace-group-id"
electionNamePrefix string
clusterID uint64
// legacySvcRootPath defines the legacy root path for all etcd paths which derives from
// the PD/API service. It's in the format of "/pd/{cluster_id}".
// The main paths for different usages include:
Expand Down Expand Up @@ -233,14 +234,22 @@ type KeyspaceGroupManager struct {
loadKeyspaceGroupsBatchSize int64
loadFromEtcdMaxRetryTimes int

// compiledKGMembershipIDRegexp is the compiled regular expression for matching keyspace group id in the
// keyspace group membership path.
// compiledKGMembershipIDRegexp is the compiled regular expression for matching keyspace group id
// in the keyspace group membership path.
compiledKGMembershipIDRegexp *regexp.Regexp
// groupUpdateRetryList is the list of keyspace groups which failed to update and need to retry.
groupUpdateRetryList map[uint32]*endpoint.KeyspaceGroup
groupWatcher *etcdutil.LoopWatcher

primaryPathBuilder *kgPrimaryPathBuilder

// tsoNodes is the registered tso servers.
tsoNodes sync.Map // store as map[string]struct{}
// serviceRegistryMap stores the mapping from the service registry key to the service address.
// Note: it is only used in tsoNodesWatcher.
serviceRegistryMap map[string]string
// tsoNodesWatcher is the watcher for the registered tso servers.
tsoNodesWatcher *etcdutil.LoopWatcher
}

// NewKeyspaceGroupManager creates a new Keyspace Group Manager.
Expand All @@ -250,6 +259,7 @@ func NewKeyspaceGroupManager(
etcdClient *clientv3.Client,
httpClient *http.Client,
electionNamePrefix string,
clusterID uint64,
legacySvcRootPath string,
tsoSvcRootPath string,
cfg ServiceConfig,
Expand All @@ -268,10 +278,12 @@ func NewKeyspaceGroupManager(
etcdClient: etcdClient,
httpClient: httpClient,
electionNamePrefix: electionNamePrefix,
clusterID: clusterID,
legacySvcRootPath: legacySvcRootPath,
tsoSvcRootPath: tsoSvcRootPath,
cfg: cfg,
groupUpdateRetryList: make(map[uint32]*endpoint.KeyspaceGroup),
serviceRegistryMap: make(map[string]string),
}
kgm.legacySvcStorage = endpoint.NewStorageEndpoint(
kv.NewEtcdKVBase(kgm.etcdClient, kgm.legacySvcRootPath), nil)
Expand All @@ -288,6 +300,92 @@ func NewKeyspaceGroupManager(

// Initialize this KeyspaceGroupManager
func (kgm *KeyspaceGroupManager) Initialize() error {
if err := kgm.InitializeTSOServerWatchLoop(); err != nil {
log.Error("failed to initialize tso server watch loop", zap.Error(err))
kgm.Close() // Close the manager to clean up the allocated resources.
return errs.ErrLoadKeyspaceGroupsTerminated.Wrap(err)
}
if err := kgm.InitializeGroupWatchLoop(); err != nil {
log.Error("failed to initialize group watch loop", zap.Error(err))
kgm.Close() // Close the manager to clean up the loaded keyspace groups.
return errs.ErrLoadKeyspaceGroupsTerminated.Wrap(err)
}
return nil
}

// Close this KeyspaceGroupManager
func (kgm *KeyspaceGroupManager) Close() {
log.Info("closing keyspace group manager")

// Note: don't change the order. We need to cancel all service loops in the keyspace group manager
// before closing all keyspace groups. It's to prevent concurrent addition/removal of keyspace groups
// during critical periods such as service shutdown and online keyspace group, while the former requires
// snapshot isolation to ensure all keyspace groups are properly closed and no new keyspace group is
// added/initialized after that.
kgm.cancel()
kgm.wg.Wait()
kgm.state.deinitialize()

log.Info("keyspace group manager closed")
}

// InitializeTSOServerWatchLoop initializes the watch loop monitoring the path for storing the
// registered tso servers.
// Key: /ms/{cluster_id}/tso/registry/{tsoServerAddress}
// Value: discover.ServiceRegistryEntry
func (kgm *KeyspaceGroupManager) InitializeTSOServerWatchLoop() error {
tsoServiceKey := discovery.TSOPath(kgm.clusterID)
tsoServiceEndKey := clientv3.GetPrefixRangeEnd(tsoServiceKey) + "/"

putFn := func(kv *mvccpb.KeyValue) error {
s := &discovery.ServiceRegistryEntry{}
if err := json.Unmarshal(kv.Value, s); err != nil {
log.Warn("failed to unmarshal service registry entry",
zap.String("event-kv-key", string(kv.Key)), zap.Error(err))
return err
}
kgm.tsoNodes.Store(s.ServiceAddr, struct{}{})
kgm.serviceRegistryMap[string(kv.Key)] = s.ServiceAddr
return nil
}
deleteFn := func(kv *mvccpb.KeyValue) error {
key := string(kv.Key)
if serviceAddr, ok := kgm.serviceRegistryMap[key]; ok {
delete(kgm.serviceRegistryMap, key)
kgm.tsoNodes.Delete(serviceAddr)
return nil
}
return perrors.Errorf("failed to find the service address for key %s", key)
}

kgm.tsoNodesWatcher = etcdutil.NewLoopWatcher(
kgm.ctx,
&kgm.wg,
kgm.etcdClient,
"tso-nodes-watcher",
tsoServiceKey,
putFn,
deleteFn,
func() error { return nil },
clientv3.WithRange(tsoServiceEndKey),
)

kgm.wg.Add(1)
go kgm.tsoNodesWatcher.StartWatchLoop()

if err := kgm.tsoNodesWatcher.WaitLoad(); err != nil {
log.Error("failed to load the registered tso servers", errs.ZapError(err))
return err
}

return nil
}

// InitializeGroupWatchLoop initializes the watch loop monitoring the path for storing keyspace group
// membership/distribution metadata.
// Key: /pd/{cluster_id}/tso/keyspace_groups/membership/{group}
// Value: endpoint.KeyspaceGroup
func (kgm *KeyspaceGroupManager) InitializeGroupWatchLoop() error {
rootPath := kgm.legacySvcRootPath
startKey := strings.Join([]string{rootPath, endpoint.KeyspaceGroupIDPath(mcsutils.DefaultKeyspaceGroupID)}, "/")
endKey := strings.Join(
Expand Down Expand Up @@ -367,22 +465,6 @@ func (kgm *KeyspaceGroupManager) Initialize() error {
return nil
}

// Close this KeyspaceGroupManager
func (kgm *KeyspaceGroupManager) Close() {
log.Info("closing keyspace group manager")

// Note: don't change the order. We need to cancel all service loops in the keyspace group manager
// before closing all keyspace groups. It's to prevent concurrent addition/removal of keyspace groups
// during critical periods such as service shutdown and online keyspace group, while the former requires
// snapshot isolation to ensure all keyspace groups are properly closed and no new keyspace group is
// added/initialized after that.
kgm.cancel()
kgm.wg.Wait()
kgm.state.deinitialize()

log.Info("keyspace group manager closed")
}

func (kgm *KeyspaceGroupManager) isAssignedToMe(group *endpoint.KeyspaceGroup) bool {
for _, member := range group.Members {
if member.Address == kgm.tsoServiceID.ServiceAddr {
Expand Down
2 changes: 1 addition & 1 deletion pkg/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,7 @@ func (suite *keyspaceGroupManagerTestSuite) newKeyspaceGroupManager(
) *KeyspaceGroupManager {
return NewKeyspaceGroupManager(
suite.ctx, tsoServiceID, suite.etcdClient, nil, electionNamePrefix,
legacySvcRootPath, tsoSvcRootPath, suite.cfg)
0, legacySvcRootPath, tsoSvcRootPath, suite.cfg)
}

// runTestLoadMultipleKeyspaceGroupsAssignment tests the loading of multiple keyspace group assignment.
Expand Down

0 comments on commit ace1174

Please sign in to comment.