Skip to content

Commit

Permalink
keypath: unify leader/primary path (#8859)
Browse files Browse the repository at this point in the history
ref #8582

Signed-off-by: okJiang <819421878@qq.com>
  • Loading branch information
okJiang authored Dec 2, 2024
1 parent 29dfaf9 commit 3cfd66f
Show file tree
Hide file tree
Showing 11 changed files with 103 additions and 106 deletions.
6 changes: 4 additions & 2 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -1151,8 +1151,10 @@ func (m *GroupManager) GetKeyspaceGroupPrimaryByID(id uint32) (string, error) {
return "", ErrKeyspaceGroupNotExists(id)
}

rootPath := keypath.TSOSvcRootPath()
primaryPath := keypath.KeyspaceGroupPrimaryPath(rootPath, id)
primaryPath := keypath.LeaderPath(&keypath.MsParam{
ServiceName: constant.TSOServiceName,
GroupID: id,
})
leader := &tsopb.Participant{}
ok, _, err := etcdutil.GetProtoMsgWithModRev(m.client, primaryPath, leader)
if err != nil {
Expand Down
8 changes: 5 additions & 3 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func (s *Server) primaryElectionLoop() {
}

// To make sure the expected primary(if existed) and new primary are on the same server.
expectedPrimary := utils.GetExpectedPrimaryFlag(s.GetClient(), s.participant.GetLeaderPath())
expectedPrimary := utils.GetExpectedPrimaryFlag(s.GetClient(), &s.participant.MsParam)
// skip campaign the primary if the expected primary is not empty and not equal to the current memberValue.
// expected primary ONLY SET BY `{service}/primary/transfer` API.
if len(expectedPrimary) > 0 && !strings.Contains(s.participant.MemberValue(), expectedPrimary) {
Expand Down Expand Up @@ -313,7 +313,9 @@ func (s *Server) campaignLeader() {
// check expected primary and watch the primary.
exitPrimary := make(chan struct{})
lease, err := utils.KeepExpectedPrimaryAlive(ctx, s.GetClient(), exitPrimary,
s.cfg.LeaderLease, s.participant.GetLeaderPath(), s.participant.MemberValue(), constant.SchedulingServiceName)
s.cfg.LeaderLease, &keypath.MsParam{
ServiceName: constant.SchedulingServiceName,
}, s.participant.MemberValue())
if err != nil {
log.Error("prepare scheduling primary watch error", errs.ZapError(err))
return
Expand Down Expand Up @@ -460,7 +462,7 @@ func (s *Server) startServer() (err error) {
Id: uniqueID, // id is unique among all participants
ListenUrls: []string{s.cfg.GetAdvertiseListenAddr()},
}
s.participant.InitInfo(p, keypath.SchedulingSvcRootPath(), "primary election")
s.participant.InitInfo(p, "primary election")

s.service = &Service{Server: s}
s.AddServiceReadyCallback(s.startCluster)
Expand Down
52 changes: 21 additions & 31 deletions pkg/mcs/utils/expected_primary.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,9 @@ import (
"go.uber.org/zap"
)

// expectedPrimaryFlag is the flag to indicate the expected primary.
// 1. When the primary was campaigned successfully, it will set the `expected_primary` flag.
// 2. Using `{service}/primary/transfer` API will revoke the previous lease and set a new `expected_primary` flag.
// This flag used to help new primary to campaign successfully while other secondaries can skip the campaign.
const expectedPrimaryFlag = "expected_primary"

// expectedPrimaryPath formats the primary path with the expected primary flag.
func expectedPrimaryPath(primaryPath string) string {
return fmt.Sprintf("%s/%s", primaryPath, expectedPrimaryFlag)
}

// GetExpectedPrimaryFlag gets the expected primary flag.
func GetExpectedPrimaryFlag(client *clientv3.Client, primaryPath string) string {
path := expectedPrimaryPath(primaryPath)
func GetExpectedPrimaryFlag(client *clientv3.Client, msParam *keypath.MsParam) string {
path := keypath.ExpectedPrimaryPath(msParam)
primary, err := etcdutil.GetValue(client, path)
if err != nil {
log.Error("get expected primary flag error", errs.ZapError(err), zap.String("primary-path", path))
Expand All @@ -57,12 +46,12 @@ func GetExpectedPrimaryFlag(client *clientv3.Client, primaryPath string) string
}

// markExpectedPrimaryFlag marks the expected primary flag when the primary is specified.
func markExpectedPrimaryFlag(client *clientv3.Client, primaryPath string, leaderRaw string, leaseID clientv3.LeaseID) (int64, error) {
path := expectedPrimaryPath(primaryPath)
func markExpectedPrimaryFlag(client *clientv3.Client, msParam *keypath.MsParam, leaderRaw string, leaseID clientv3.LeaseID) (int64, error) {
path := keypath.ExpectedPrimaryPath(msParam)
log.Info("set expected primary flag", zap.String("primary-path", path), zap.String("leader-raw", leaderRaw))
// write a flag to indicate the expected primary.
resp, err := kv.NewSlowLogTxn(client).
Then(clientv3.OpPut(expectedPrimaryPath(primaryPath), leaderRaw, clientv3.WithLease(leaseID))).
Then(clientv3.OpPut(path, leaderRaw, clientv3.WithLease(leaseID))).
Commit()
if err != nil || !resp.Succeeded {
log.Error("mark expected primary error", errs.ZapError(err), zap.String("primary-path", path))
Expand All @@ -77,23 +66,29 @@ func markExpectedPrimaryFlag(client *clientv3.Client, primaryPath string, leader
// - changed by `{service}/primary/transfer` API.
// - leader lease expired.
// ONLY primary called this function.
func KeepExpectedPrimaryAlive(ctx context.Context, cli *clientv3.Client, exitPrimary chan<- struct{},
leaseTimeout int64, leaderPath, memberValue, service string) (*election.Lease, error) {
log.Info("primary start to watch the expected primary", zap.String("service", service), zap.String("primary-value", memberValue))
service = fmt.Sprintf("%s expected primary", service)
func KeepExpectedPrimaryAlive(
ctx context.Context,
cli *clientv3.Client,
exitPrimary chan<- struct{},
leaseTimeout int64,
msParam *keypath.MsParam,
memberValue string) (*election.Lease, error) {
log.Info("primary start to watch the expected primary",
zap.String("service", msParam.ServiceName), zap.String("primary-value", memberValue))
service := fmt.Sprintf("%s expected primary", msParam.ServiceName)
lease := election.NewLease(cli, service)
if err := lease.Grant(leaseTimeout); err != nil {
return nil, err
}

revision, err := markExpectedPrimaryFlag(cli, leaderPath, memberValue, lease.ID.Load().(clientv3.LeaseID))
revision, err := markExpectedPrimaryFlag(cli, msParam, memberValue, lease.ID.Load().(clientv3.LeaseID))
if err != nil {
log.Error("mark expected primary error", errs.ZapError(err))
return nil, err
}
// Keep alive the current expected primary leadership to indicate that the server is still alive.
// Watch the expected primary path to check whether the expected primary has changed by `{service}/primary/transfer` API.
expectedPrimary := election.NewLeadership(cli, expectedPrimaryPath(leaderPath), service)
expectedPrimary := election.NewLeadership(cli, keypath.ExpectedPrimaryPath(msParam), service)
expectedPrimary.SetLease(lease)
expectedPrimary.Keep(ctx)

Expand Down Expand Up @@ -165,15 +160,10 @@ func TransferPrimary(client *clientv3.Client, lease *election.Lease, serviceName
return errors.Errorf("failed to revoke current primary's lease: %v", err)
}

var primaryPath string
switch serviceName {
case constant.SchedulingServiceName:
primaryPath = keypath.SchedulingPrimaryPath()
case constant.TSOServiceName:
tsoRootPath := keypath.TSOSvcRootPath()
primaryPath = keypath.KeyspaceGroupPrimaryPath(tsoRootPath, keyspaceGroupID)
}
_, err = markExpectedPrimaryFlag(client, primaryPath, primaryIDs[nextPrimaryID], grantResp.ID)
_, err = markExpectedPrimaryFlag(client, &keypath.MsParam{
ServiceName: serviceName,
GroupID: keyspaceGroupID,
}, primaryIDs[nextPrimaryID], grantResp.ID)
if err != nil {
return errors.Errorf("failed to mark expected primary flag for %s, err: %v", serviceName, err)
}
Expand Down
26 changes: 9 additions & 17 deletions pkg/member/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@ package member

import (
"context"
"fmt"
"math/rand"
"os"
"path"
"path/filepath"
"strconv"
"strings"
Expand Down Expand Up @@ -52,11 +50,10 @@ type EmbeddedEtcdMember struct {
leadership *election.Leadership
leader atomic.Value // stored as *pdpb.Member
// etcd and cluster information.
etcd *embed.Etcd
client *clientv3.Client
id uint64 // etcd server id.
member *pdpb.Member // current PD's info.
rootPath string
etcd *embed.Etcd
client *clientv3.Client
id uint64 // etcd server id.
member *pdpb.Member // current PD's info.
// memberValue is the serialized string of `member`. It will be saved in
// etcd leader key when the PD node is successfully elected as the PD leader
// of the cluster. Every write will use it to check PD leadership.
Expand Down Expand Up @@ -330,7 +327,7 @@ func (m *EmbeddedEtcdMember) IsSameLeader(leader any) bool {
}

// InitMemberInfo initializes the member info.
func (m *EmbeddedEtcdMember) InitMemberInfo(advertiseClientUrls, advertisePeerUrls, name string, rootPath string) {
func (m *EmbeddedEtcdMember) InitMemberInfo(advertiseClientUrls, advertisePeerUrls, name string) {
leader := &pdpb.Member{
Name: name,
MemberId: m.ID(),
Expand All @@ -345,9 +342,8 @@ func (m *EmbeddedEtcdMember) InitMemberInfo(advertiseClientUrls, advertisePeerUr
}
m.member = leader
m.memberValue = string(data)
m.rootPath = rootPath
m.leadership = election.NewLeadership(m.client, m.GetLeaderPath(), "leader election")
log.Info("member joining election", zap.Stringer("member-info", m.member), zap.String("root-path", m.rootPath))
log.Info("member joining election", zap.Stringer("member-info", m.member))
}

// ResignEtcdLeader resigns current PD's etcd leadership. If nextLeader is empty, all
Expand Down Expand Up @@ -379,13 +375,9 @@ func (m *EmbeddedEtcdMember) ResignEtcdLeader(ctx context.Context, from string,
return m.MoveEtcdLeader(ctx, m.ID(), nextEtcdLeaderID)
}

func (m *EmbeddedEtcdMember) getMemberLeaderPriorityPath(id uint64) string {
return path.Join(m.rootPath, fmt.Sprintf("member/%d/leader_priority", id))
}

// SetMemberLeaderPriority saves a member's priority to be elected as the etcd leader.
func (m *EmbeddedEtcdMember) SetMemberLeaderPriority(id uint64, priority int) error {
key := m.getMemberLeaderPriorityPath(id)
key := keypath.MemberLeaderPriorityPath(id)
res, err := m.leadership.LeaderTxn().Then(clientv3.OpPut(key, strconv.Itoa(priority))).Commit()
if err != nil {
return errs.ErrEtcdTxnInternal.Wrap(err).GenWithStackByCause()
Expand All @@ -399,7 +391,7 @@ func (m *EmbeddedEtcdMember) SetMemberLeaderPriority(id uint64, priority int) er

// DeleteMemberLeaderPriority removes a member's etcd leader priority config.
func (m *EmbeddedEtcdMember) DeleteMemberLeaderPriority(id uint64) error {
key := m.getMemberLeaderPriorityPath(id)
key := keypath.MemberLeaderPriorityPath(id)
res, err := m.leadership.LeaderTxn().Then(clientv3.OpDelete(key)).Commit()
if err != nil {
return errs.ErrEtcdTxnInternal.Wrap(err).GenWithStackByCause()
Expand All @@ -413,7 +405,7 @@ func (m *EmbeddedEtcdMember) DeleteMemberLeaderPriority(id uint64) error {

// GetMemberLeaderPriority loads a member's priority to be elected as the etcd leader.
func (m *EmbeddedEtcdMember) GetMemberLeaderPriority(id uint64) (int, error) {
key := m.getMemberLeaderPriorityPath(id)
key := keypath.MemberLeaderPriorityPath(id)
res, err := etcdutil.EtcdKVGet(m.client, key)
if err != nil {
return 0, err
Expand Down
10 changes: 4 additions & 6 deletions pkg/member/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,9 @@ type Participant struct {
keypath.MsParam
leadership *election.Leadership
// stored as member type
leader atomic.Value
client *clientv3.Client
rootPath string
member participant
leader atomic.Value
client *clientv3.Client
member participant
// memberValue is the serialized string of `member`. It will be saved in the
// leader key when this participant is successfully elected as the leader of
// the group. Every write will use it to check the leadership.
Expand All @@ -76,15 +75,14 @@ func NewParticipant(client *clientv3.Client, msParam keypath.MsParam) *Participa
}

// InitInfo initializes the member info.
func (m *Participant) InitInfo(p participant, rootPath string, purpose string) {
func (m *Participant) InitInfo(p participant, purpose string) {
data, err := p.Marshal()
if err != nil {
// can't fail, so panic here.
log.Fatal("marshal leader meet error", zap.String("member-name", p.String()), errs.ZapError(errs.ErrMarshalLeader, err))
}
m.member = p
m.memberValue = string(data)
m.rootPath = rootPath
m.leadership = election.NewLeadership(m.client, m.GetLeaderPath(), purpose)
m.lastLeaderUpdatedTime.Store(time.Now())
log.Info("participant joining election", zap.String("participant-info", p.String()), zap.String("leader-path", m.GetLeaderPath()))
Expand Down
10 changes: 8 additions & 2 deletions pkg/tso/global_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,10 @@ func (gta *GlobalTSOAllocator) primaryElectionLoop() {
}

// To make sure the expected primary(if existed) and new primary are on the same server.
expectedPrimary := mcsutils.GetExpectedPrimaryFlag(gta.member.Client(), gta.member.GetLeaderPath())
expectedPrimary := mcsutils.GetExpectedPrimaryFlag(gta.member.Client(), &keypath.MsParam{
ServiceName: constant.TSOServiceName,
GroupID: gta.getGroupID(),
})
// skip campaign the primary if the expected primary is not empty and not equal to the current memberValue.
// expected primary ONLY SET BY `{service}/primary/transfer` API.
if len(expectedPrimary) > 0 && !strings.Contains(gta.member.MemberValue(), expectedPrimary) {
Expand Down Expand Up @@ -284,7 +287,10 @@ func (gta *GlobalTSOAllocator) campaignLeader() {
// check expected primary and watch the primary.
exitPrimary := make(chan struct{})
lease, err := mcsutils.KeepExpectedPrimaryAlive(ctx, gta.member.Client(), exitPrimary,
gta.am.leaderLease, gta.member.GetLeaderPath(), gta.member.MemberValue(), constant.TSOServiceName)
gta.am.leaderLease, &keypath.MsParam{
ServiceName: constant.TSOServiceName,
GroupID: gta.getGroupID(),
}, gta.member.MemberValue())
if err != nil {
log.Error("prepare tso primary watch error", errs.ZapError(err))
return
Expand Down
7 changes: 5 additions & 2 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,7 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro
Id: uniqueID, // id is unique among all participants
ListenUrls: []string{kgm.cfg.GetAdvertiseListenAddr()},
}
participant.InitInfo(p, keypath.KeyspaceGroupsElectionPath(kgm.tsoSvcRootPath, group.ID), "keyspace group primary election")
participant.InitInfo(p, "keyspace group primary election")
// If the keyspace group is in split, we should ensure that the primary elected by the new keyspace group
// is always on the same TSO Server node as the primary of the old keyspace group, and this constraint cannot
// be broken until the entire split process is completed.
Expand Down Expand Up @@ -1341,7 +1341,10 @@ mergeLoop:
// Check if the keyspace group primaries in the merge map are all gone.
if len(mergeMap) != 0 {
for id := range mergeMap {
leaderPath := keypath.KeyspaceGroupPrimaryPath(kgm.tsoSvcRootPath, id)
leaderPath := keypath.LeaderPath(&keypath.MsParam{
ServiceName: constant.TSOServiceName,
GroupID: id,
})
val, err := kgm.tsoSvcStorage.Load(leaderPath)
if err != nil {
log.Error("failed to check if the keyspace group primary in the merge list has gone",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,31 @@ package keypath
import (
"fmt"
"path"

"github.com/tikv/pd/pkg/mcs/utils/constant"
)

// Leader and primary are the same thing in this context.
const (
leaderPathFormat = "/pd/%d/leader" // "/pd/{cluster_id}/leader"
memberBinaryDeployPathFormat = "/pd/%d/member/%d/deploy_path" // "/pd/{cluster_id}/member/{member_id}/deploy_path"
memberGitHashPath = "/pd/%d/member/%d/git_hash" // "/pd/{cluster_id}/member/{member_id}/git_hash"
memberBinaryVersionPathFormat = "/pd/%d/member/%d/binary_version" // "/pd/{cluster_id}/member/{member_id}/binary_version"
allocIDPathFormat = "/pd/%d/alloc_id" // "/pd/{cluster_id}/alloc_id"
keyspaceAllocIDPathFormat = "/pd/%d/keyspaces/alloc_id" // "/pd/{cluster_id}/keyspaces/alloc_id"
leaderPathFormat = "/pd/%d/leader" // "/pd/{cluster_id}/leader"
memberBinaryDeployPathFormat = "/pd/%d/member/%d/deploy_path" // "/pd/{cluster_id}/member/{member_id}/deploy_path"
memberGitHashPath = "/pd/%d/member/%d/git_hash" // "/pd/{cluster_id}/member/{member_id}/git_hash"
memberBinaryVersionPathFormat = "/pd/%d/member/%d/binary_version" // "/pd/{cluster_id}/member/{member_id}/binary_version"
allocIDPathFormat = "/pd/%d/alloc_id" // "/pd/{cluster_id}/alloc_id"
keyspaceAllocIDPathFormat = "/pd/%d/keyspaces/alloc_id" // "/pd/{cluster_id}/keyspaces/alloc_id"
kemberLeaderPriorityPathFormat = "/pd/%d/member/%d/leader_priority" // "/pd/{cluster_id}/member/{member_id}/leader_priority"

msLeaderPathFormat = "/ms/%d/%s/primary" // "/ms/{cluster_id}/{service_name}/primary"
msTsoDefaultLeaderPathFormat = "/ms/%d/tso/00000/primary" // "/ms/{cluster_id}/tso/00000/primary"
msTsoKespaceLeaderPathFormat = "/ms/%d/tso/keyspace_groups/election/%05d/primary" // "/ms/{cluster_id}/tso/keyspace_groups/election/{group_id}/primary"

// `expected_primary` is the flag to indicate the expected primary/leader.
// 1. When the leader was campaigned successfully, it will set the `expected_primary` flag.
// 2. Using `{service}/primary/transfer` API will revoke the previous lease and set a new `expected_primary` flag.
// This flag used to help new primary to campaign successfully while other secondaries can skip the campaign.
msExpectedLeaderPathFormat = "/ms/%d/%s/primary/expected_primary" // "/ms/{cluster_id}/{service_name}/primary/expected_primary"
msTsoDefaultExpectedLeaderPathFormat = "/ms/%d/tso/00000/primary/expected_primary" // "/ms/{cluster_id}/tso/00000/primary"
msTsoKespaceExpectedLeaderPathFormat = "/ms/%d/tso/keyspace_groups/election/%05d/primary/expected_primary" // "/ms/{cluster_id}/tso/keyspace_groups/election/{group_id}/primary"
)

// MsParam is the parameter of micro service.
Expand All @@ -48,7 +60,7 @@ func LeaderPath(p *MsParam) string {
if p == nil || p.ServiceName == "" {
return fmt.Sprintf(leaderPathFormat, ClusterID())
}
if p.ServiceName == "tso" {
if p.ServiceName == constant.TSOServiceName {
if p.GroupID == 0 {
return fmt.Sprintf(msTsoDefaultLeaderPathFormat, ClusterID())
}
Expand All @@ -57,6 +69,17 @@ func LeaderPath(p *MsParam) string {
return fmt.Sprintf(msLeaderPathFormat, ClusterID(), p.ServiceName)
}

// ExpectedPrimaryPath returns the expected_primary path.
func ExpectedPrimaryPath(p *MsParam) string {
if p.ServiceName == constant.TSOServiceName {
if p.GroupID == 0 {
return fmt.Sprintf(msTsoDefaultExpectedLeaderPathFormat, ClusterID())
}
return fmt.Sprintf(msTsoKespaceExpectedLeaderPathFormat, ClusterID(), p.GroupID)
}
return fmt.Sprintf(msExpectedLeaderPathFormat, ClusterID(), p.ServiceName)
}

// MemberBinaryDeployPath returns the member binary deploy path.
func MemberBinaryDeployPath(id uint64) string {
return fmt.Sprintf(memberBinaryDeployPathFormat, ClusterID(), id)
Expand All @@ -81,3 +104,8 @@ func AllocIDPath() string {
func KeyspaceAllocIDPath() string {
return fmt.Sprintf(keyspaceAllocIDPathFormat, ClusterID())
}

// MemberLeaderPriorityPath returns the member leader priority path.
func MemberLeaderPriorityPath(id uint64) string {
return fmt.Sprintf(kemberLeaderPriorityPathFormat, ClusterID(), id)
}
Loading

0 comments on commit 3cfd66f

Please sign in to comment.