Skip to content

Commit

Permalink
Support weighted-election for TSO keyspace group primary election.
Browse files Browse the repository at this point in the history
Signed-off-by: Bin Shi <binshi.bing@gmail.com>

Implement weighted-primary election

Signed-off-by: Bin Shi <binshi.bing@gmail.com>

Add test

Signed-off-by: Bin Shi <binshi.bing@gmail.com>

Finalize test and fix bugs.

Signed-off-by: Bin Shi <binshi.bing@gmail.com>
  • Loading branch information
binshi-bing committed Jun 22, 2023
1 parent ac31f87 commit 09e6e45
Show file tree
Hide file tree
Showing 9 changed files with 493 additions and 101 deletions.
2 changes: 1 addition & 1 deletion pkg/mcs/discovery/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (

// Discover is used to get all the service instances of the specified service name.
func Discover(cli *clientv3.Client, clusterID, serviceName string) ([]string, error) {
key := discoveryPath(clusterID, serviceName) + "/"
key := ServicePath(clusterID, serviceName) + "/"
endKey := clientv3.GetPrefixRangeEnd(key)

withRange := clientv3.WithRange(endKey)
Expand Down
8 changes: 5 additions & 3 deletions pkg/mcs/discovery/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,17 @@ const (
registryKey = "registry"
)

func registryPath(clusterID, serviceName, serviceAddr string) string {
// RegistryPath returns the full path to store microservice addresses.
func RegistryPath(clusterID, serviceName, serviceAddr string) string {
return strings.Join([]string{registryPrefix, clusterID, serviceName, registryKey, serviceAddr}, "/")
}

func discoveryPath(clusterID, serviceName string) string {
// ServicePath returns the path to store microservice addresses.
func ServicePath(clusterID, serviceName string) string {
return strings.Join([]string{registryPrefix, clusterID, serviceName, registryKey}, "/")
}

// TSOPath returns the path to store TSO addresses.
func TSOPath(clusterID uint64) string {
return discoveryPath(strconv.FormatUint(clusterID, 10), "tso") + "/"
return ServicePath(strconv.FormatUint(clusterID, 10), "tso") + "/"
}
2 changes: 1 addition & 1 deletion pkg/mcs/discovery/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type ServiceRegister struct {
// NewServiceRegister creates a new ServiceRegister.
func NewServiceRegister(ctx context.Context, cli *clientv3.Client, clusterID, serviceName, serviceAddr, serializedValue string, ttl int64) *ServiceRegister {
cctx, cancel := context.WithCancel(ctx)
serviceKey := registryPath(clusterID, serviceName, serviceAddr)
serviceKey := RegistryPath(clusterID, serviceName, serviceAddr)
return &ServiceRegister{
ctx: cctx,
cancel: cancel,
Expand Down
3 changes: 2 additions & 1 deletion pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,8 @@ func (s *Server) startServer() (err error) {
tsoSvcRootPath := fmt.Sprintf(tsoSvcRootPathFormat, s.clusterID)
s.serviceID = &discovery.ServiceRegistryEntry{ServiceAddr: s.cfg.AdvertiseListenAddr}
s.keyspaceGroupManager = tso.NewKeyspaceGroupManager(
s.serverLoopCtx, s.serviceID, s.etcdClient, s.httpClient, s.cfg.AdvertiseListenAddr, legacySvcRootPath, tsoSvcRootPath, s.cfg)
s.serverLoopCtx, s.serviceID, s.etcdClient, s.httpClient, s.cfg.AdvertiseListenAddr,
discovery.TSOPath(s.clusterID), legacySvcRootPath, tsoSvcRootPath, s.cfg)
if err := s.keyspaceGroupManager.Initialize(); err != nil {
return err
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/mcs/utils/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,5 +76,8 @@ const (
DefaultKeyspaceGroupReplicaCount = 2

// DefaultKeyspaceGroupReplicaPriority is the default priority of a keyspace group replica.
// It's used in keyspace group primary weighted-election to balance primaries' distribution.
// Among multiple replicas of a keyspace group, the higher the priority, the more likely
// the replica is to be elected as primary.
DefaultKeyspaceGroupReplicaPriority = 0
)
244 changes: 219 additions & 25 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"encoding/json"
"fmt"
"math"
"net/http"
"path"
"regexp"
Expand All @@ -27,6 +28,7 @@ import (
"time"

perrors "github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/election"
Expand All @@ -50,6 +52,10 @@ const (
keyspaceGroupsElectionPath = mcsutils.KeyspaceGroupsKey + "/election"
// primaryKey is the key for keyspace group primary election.
primaryKey = "primary"
// defaultPrimaryPriorityCheckInterval is the default interval for checking if the priorities
// of the primaries on this TSO server/pod have changed. A goroutine will periodically check
// do this check and re-distribute the primaries if necessary.
defaultPrimaryPriorityCheckInterval = 10 * time.Second
)

type state struct {
Expand Down Expand Up @@ -148,6 +154,40 @@ func (s *state) getKeyspaceGroupMetaWithCheck(
mcsutils.DefaultKeyspaceGroupID, nil
}

func (s *state) getNextPrimaryToReset(
groupID int, localAddress string,
) (member ElectionMember, kg *endpoint.KeyspaceGroup, localPriority, nextGroupID int) {
s.RLock()
defer s.RUnlock()

groupSize := len(s.ams)
groupID %= groupSize
for j := 0; j < groupSize; groupID, j = (groupID+1)%groupSize, j+1 {
am := s.ams[groupID]
kg := s.kgs[groupID]
if am != nil && kg != nil && am.GetMember().IsLeader() {
maxPriority := math.MinInt32
localPriority := math.MaxInt32
for _, member := range kg.Members {
if member.Priority > maxPriority {
maxPriority = member.Priority
}
if member.Address == localAddress {
localPriority = member.Priority
}
}

if localPriority < maxPriority {
// return here and reset the primary outside of the critical section
// as resetting the primary may take some time.
return am.GetMember(), kg, localPriority, (groupID + 1) % groupSize
}
}
}

return nil, nil, 0, groupID
}

// kgPrimaryPathBuilder builds the path for keyspace group primary election.
// default keyspace group: "/ms/{cluster_id}/tso/00000/primary".
// non-default keyspace group: "/ms/{cluster_id}/tso/keyspace_groups/election/{group}/primary".
Expand Down Expand Up @@ -193,6 +233,10 @@ type KeyspaceGroupManager struct {
// which participate in the election of its keyspace group's primary, in the format of
// "electionNamePrefix:keyspace-group-id"
electionNamePrefix string
// tsoServiceKey is the path for storing the registered tso servers.
// Key: /ms/{cluster_id}/tso/registry/{tsoServerAddress}
// Value: discover.ServiceRegistryEntry
tsoServiceKey string
// legacySvcRootPath defines the legacy root path for all etcd paths which derives from
// the PD/API service. It's in the format of "/pd/{cluster_id}".
// The main paths for different usages include:
Expand Down Expand Up @@ -233,14 +277,23 @@ type KeyspaceGroupManager struct {
loadKeyspaceGroupsBatchSize int64
loadFromEtcdMaxRetryTimes int

// compiledKGMembershipIDRegexp is the compiled regular expression for matching keyspace group id in the
// keyspace group membership path.
// compiledKGMembershipIDRegexp is the compiled regular expression for matching keyspace group id
// in the keyspace group membership path.
compiledKGMembershipIDRegexp *regexp.Regexp
// groupUpdateRetryList is the list of keyspace groups which failed to update and need to retry.
groupUpdateRetryList map[uint32]*endpoint.KeyspaceGroup
groupWatcher *etcdutil.LoopWatcher

primaryPathBuilder *kgPrimaryPathBuilder
primaryPathBuilder *kgPrimaryPathBuilder
primaryPriorityCheckInterval time.Duration

// tsoNodes is the registered tso servers.
tsoNodes sync.Map // store as map[string]struct{}
// serviceRegistryMap stores the mapping from the service registry key to the service address.
// Note: it is only used in tsoNodesWatcher.
serviceRegistryMap map[string]string
// tsoNodesWatcher is the watcher for the registered tso servers.
tsoNodesWatcher *etcdutil.LoopWatcher
}

// NewKeyspaceGroupManager creates a new Keyspace Group Manager.
Expand All @@ -250,6 +303,7 @@ func NewKeyspaceGroupManager(
etcdClient *clientv3.Client,
httpClient *http.Client,
electionNamePrefix string,
tsoServiceKey string,
legacySvcRootPath string,
tsoSvcRootPath string,
cfg ServiceConfig,
Expand All @@ -262,16 +316,19 @@ func NewKeyspaceGroupManager(

ctx, cancel := context.WithCancel(ctx)
kgm := &KeyspaceGroupManager{
ctx: ctx,
cancel: cancel,
tsoServiceID: tsoServiceID,
etcdClient: etcdClient,
httpClient: httpClient,
electionNamePrefix: electionNamePrefix,
legacySvcRootPath: legacySvcRootPath,
tsoSvcRootPath: tsoSvcRootPath,
cfg: cfg,
groupUpdateRetryList: make(map[uint32]*endpoint.KeyspaceGroup),
ctx: ctx,
cancel: cancel,
tsoServiceID: tsoServiceID,
etcdClient: etcdClient,
httpClient: httpClient,
electionNamePrefix: electionNamePrefix,
tsoServiceKey: tsoServiceKey,
legacySvcRootPath: legacySvcRootPath,
tsoSvcRootPath: tsoSvcRootPath,
primaryPriorityCheckInterval: defaultPrimaryPriorityCheckInterval,
cfg: cfg,
groupUpdateRetryList: make(map[uint32]*endpoint.KeyspaceGroup),
serviceRegistryMap: make(map[string]string),
}
kgm.legacySvcStorage = endpoint.NewStorageEndpoint(
kv.NewEtcdKVBase(kgm.etcdClient, kgm.legacySvcRootPath), nil)
Expand All @@ -288,6 +345,100 @@ func NewKeyspaceGroupManager(

// Initialize this KeyspaceGroupManager
func (kgm *KeyspaceGroupManager) Initialize() error {
if err := kgm.InitializeTSOServerWatchLoop(); err != nil {
log.Error("failed to initialize tso server watch loop", zap.Error(err))
kgm.Close() // Close the manager to clean up the allocated resources.
return errs.ErrLoadKeyspaceGroupsTerminated.Wrap(err)
}
if err := kgm.InitializeGroupWatchLoop(); err != nil {
log.Error("failed to initialize group watch loop", zap.Error(err))
kgm.Close() // Close the manager to clean up the loaded keyspace groups.
return errs.ErrLoadKeyspaceGroupsTerminated.Wrap(err)
}

kgm.wg.Add(1)
go kgm.primaryPriorityCheckLoop()

return nil
}

// Close this KeyspaceGroupManager
func (kgm *KeyspaceGroupManager) Close() {
log.Info("closing keyspace group manager")

// Note: don't change the order. We need to cancel all service loops in the keyspace group manager
// before closing all keyspace groups. It's to prevent concurrent addition/removal of keyspace groups
// during critical periods such as service shutdown and online keyspace group, while the former requires
// snapshot isolation to ensure all keyspace groups are properly closed and no new keyspace group is
// added/initialized after that.
kgm.cancel()
kgm.wg.Wait()
kgm.state.deinitialize()

log.Info("keyspace group manager closed")
}

// GetServiceConfig returns the service config.
func (kgm *KeyspaceGroupManager) GetServiceConfig() ServiceConfig {
return kgm.cfg
}

// InitializeTSOServerWatchLoop initializes the watch loop monitoring the path for storing the
// registered tso servers.
// Key: /ms/{cluster_id}/tso/registry/{tsoServerAddress}
// Value: discover.ServiceRegistryEntry
func (kgm *KeyspaceGroupManager) InitializeTSOServerWatchLoop() error {
tsoServiceEndKey := clientv3.GetPrefixRangeEnd(kgm.tsoServiceKey) + "/"

putFn := func(kv *mvccpb.KeyValue) error {
s := &discovery.ServiceRegistryEntry{}
if err := json.Unmarshal(kv.Value, s); err != nil {
log.Warn("failed to unmarshal service registry entry",
zap.String("event-kv-key", string(kv.Key)), zap.Error(err))
return err
}
kgm.tsoNodes.Store(s.ServiceAddr, struct{}{})
kgm.serviceRegistryMap[string(kv.Key)] = s.ServiceAddr
return nil
}
deleteFn := func(kv *mvccpb.KeyValue) error {
key := string(kv.Key)
if serviceAddr, ok := kgm.serviceRegistryMap[key]; ok {
delete(kgm.serviceRegistryMap, key)
kgm.tsoNodes.Delete(serviceAddr)
return nil
}
return perrors.Errorf("failed to find the service address for key %s", key)
}

kgm.tsoNodesWatcher = etcdutil.NewLoopWatcher(
kgm.ctx,
&kgm.wg,
kgm.etcdClient,
"tso-nodes-watcher",
kgm.tsoServiceKey,
putFn,
deleteFn,
func() error { return nil },
clientv3.WithRange(tsoServiceEndKey),
)

kgm.wg.Add(1)
go kgm.tsoNodesWatcher.StartWatchLoop()

if err := kgm.tsoNodesWatcher.WaitLoad(); err != nil {
log.Error("failed to load the registered tso servers", errs.ZapError(err))
return err
}

return nil
}

// InitializeGroupWatchLoop initializes the watch loop monitoring the path for storing keyspace group
// membership/distribution metadata.
// Key: /pd/{cluster_id}/tso/keyspace_groups/membership/{group}
// Value: endpoint.KeyspaceGroup
func (kgm *KeyspaceGroupManager) InitializeGroupWatchLoop() error {
rootPath := kgm.legacySvcRootPath
startKey := strings.Join([]string{rootPath, endpoint.KeyspaceGroupIDPath(mcsutils.DefaultKeyspaceGroupID)}, "/")
endKey := strings.Join(
Expand Down Expand Up @@ -367,20 +518,63 @@ func (kgm *KeyspaceGroupManager) Initialize() error {
return nil
}

// Close this KeyspaceGroupManager
func (kgm *KeyspaceGroupManager) Close() {
log.Info("closing keyspace group manager")
func (kgm *KeyspaceGroupManager) primaryPriorityCheckLoop() {
defer logutil.LogPanic()
defer kgm.wg.Done()

// Note: don't change the order. We need to cancel all service loops in the keyspace group manager
// before closing all keyspace groups. It's to prevent concurrent addition/removal of keyspace groups
// during critical periods such as service shutdown and online keyspace group, while the former requires
// snapshot isolation to ensure all keyspace groups are properly closed and no new keyspace group is
// added/initialized after that.
kgm.cancel()
kgm.wg.Wait()
kgm.state.deinitialize()
failpoint.Inject("fastPrimaryPriorityCheck", func() {
kgm.primaryPriorityCheckInterval = 200 * time.Millisecond
})

log.Info("keyspace group manager closed")
ctx, cancel := context.WithCancel(kgm.ctx)
defer cancel()
groupID := 0
for {
select {
case <-ctx.Done():
log.Info("exit primary priority check loop")
return
case <-time.After(kgm.primaryPriorityCheckInterval):
// Every primaryPriorityCheckInterval, we only reset the primary of one keyspace group
member, kg, localPriority, nextGroupID := kgm.getNextPrimaryToReset(groupID, kgm.tsoServiceID.ServiceAddr)
if member != nil {
aliveTSONodes := make(map[string]struct{})
kgm.tsoNodes.Range(func(key, _ interface{}) bool {
aliveTSONodes[key.(string)] = struct{}{}
return true
})
if len(aliveTSONodes) == 0 {
log.Warn("no alive tso node", zap.String("local-address", kgm.tsoServiceID.ServiceAddr))
continue
}
// If there is a alive member with higher priority, reset the leader.
resetLeader := false
for _, member := range kg.Members {
if member.Priority <= localPriority {
continue
}
if _, ok := aliveTSONodes[member.Address]; ok {
resetLeader = true
break
}
}
if resetLeader {
select {
case <-ctx.Done():
default:
member.ResetLeader()
log.Info("reset primary", zap.Uint32("keyspace-group-id", kg.ID))
}
} else {
log.Warn("no need to reset primary as the replicas with higher priority are offline",
zap.String("local-address", kgm.tsoServiceID.ServiceAddr),
zap.Uint32("keyspace-group-id", kg.ID),
zap.Int("local-priority", localPriority))
}
}
groupID = nextGroupID
}
}
}

func (kgm *KeyspaceGroupManager) isAssignedToMe(group *endpoint.KeyspaceGroup) bool {
Expand Down
Loading

0 comments on commit 09e6e45

Please sign in to comment.