Skip to content

Commit

Permalink
only trigger by updating
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <jinhao.hu@pingcap.com>
  • Loading branch information
HuSharp committed May 9, 2024
1 parent af995cc commit 8d36be5
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 61 deletions.
6 changes: 3 additions & 3 deletions pkg/election/leadership.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ func (ls *Leadership) SetLeaderWatch(val bool) {
ls.leaderWatch.Store(val)
}

// GetLeaderWatch gets the leader watch flag.
func (ls *Leadership) GetLeaderWatch() bool {
// IsLeader gets the leader watch flag.
func (ls *Leadership) IsLeader() bool {
return ls.leaderWatch.Load()
}

Expand Down Expand Up @@ -392,7 +392,7 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {
return
}
// only API update the leader key to transfer the leader will meet
if ev.Type == mvccpb.PUT && ls.GetLeaderWatch() {
if ev.Type == mvccpb.PUT && ls.IsLeader() {
log.Info("[LeaderWatch] current leadership is updated",
zap.Int64("revision", wresp.Header.Revision), zap.String("leader-key", ls.leaderKey), zap.String("purpose", ls.purpose))
return
Expand Down
39 changes: 18 additions & 21 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func (s *Server) primaryElectionLoop() {
}

// To make sure the expected leader(if exist) and primary are on the same server.
expectedPrimary := utils.GetExpectedPrimary(s.participant.GetLeaderPath(), s.GetClient())
expectedPrimary := utils.GetExpectedPrimary(s.GetClient(), s.participant.GetLeaderPath())
if expectedPrimary != "" && expectedPrimary != s.participant.MemberValue() {
log.Info("skip campaigning of scheduling primary and check later",
zap.String("server-name", s.Name()),
Expand Down Expand Up @@ -332,39 +332,36 @@ func (s *Server) campaignLeader() {
}

func (s *Server) primaryWatch(ctx context.Context, exitPrimary chan struct{}) {
_, revision, err := s.participant.GetPersistentLeader()
if err != nil {
resp, err := etcdutil.EtcdKVGet(s.participant.GetLeadership().GetClient(), s.participant.GetLeaderPath())
if err != nil || resp == nil || len(resp.Kvs) == 0 {
log.Error("[primary] getting the leader meets error", errs.ZapError(err))
return

Check warning on line 338 in pkg/mcs/scheduling/server/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/server.go#L337-L338

Added lines #L337 - L338 were not covered by tests
}
log.Info("[primary] start to watch the primary", zap.Stringer("scheduling-primary", s.participant.GetLeader()))
// Watch will keep looping and never return unless the primary has changed.
s.participant.GetLeadership().SetLeaderWatch(true)
s.participant.GetLeadership().Watch(s.serverLoopCtx, revision+1)
s.participant.GetLeadership().Watch(s.serverLoopCtx, resp.Kvs[0].ModRevision+1)
s.participant.GetLeadership().SetLeaderWatch(false)

// only API update primary will set the expected leader
// check leader key whether deleted
leaderRaw, err := etcdutil.GetValue(s.participant.Client(), s.participant.GetLeaderPath())
curPrimary, err := etcdutil.GetValue(s.participant.Client(), s.participant.GetLeaderPath())
if err != nil {
log.Error("[primary] get primary key error", zap.Error(err))
return
}
if leaderRaw == nil {
log.Info("[primary] leader key is deleted, the primary will step down")
log.Error("[primary] getting the leader meets error", errs.ZapError(err))
return
}
// only trigger by updating primary
if curPrimary != nil && resp.Kvs[0].Value != nil && string(curPrimary) != string(resp.Kvs[0].Value) {
utils.SetExpectedPrimary(s.participant.Client(), s.participant.GetLeaderPath())

utils.SetExpectedPrimary(s.participant.Client(), s.participant.GetLeaderPath())

s.participant.UnsetLeader()
defer log.Info("[primary] exit the primary watch loop")
for {
select {
case <-ctx.Done():
return
case exitPrimary <- struct{}{}:
return
s.participant.UnsetLeader()
defer log.Info("[primary] exit the primary watch loop")
for {
select {
case <-ctx.Done():
return

Check warning on line 361 in pkg/mcs/scheduling/server/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/server.go#L360-L361

Added lines #L360 - L361 were not covered by tests
case exitPrimary <- struct{}{}:
return
}
}
}
}
Expand Down
8 changes: 6 additions & 2 deletions pkg/mcs/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ func InitClusterID(ctx context.Context, client *clientv3.Client) (id uint64, err
}

// GetExpectedPrimary indicates API has changed the primary, ONLY SET VALUE BY API.
func GetExpectedPrimary(keyPath string, client *clientv3.Client) string {
leader, err := etcdutil.GetValue(client, strings.Join([]string{keyPath, ExpectedPrimary}, "/"))
func GetExpectedPrimary(client *clientv3.Client, leaderPath string) string {
leader, err := etcdutil.GetValue(client, strings.Join([]string{leaderPath, ExpectedPrimary}, "/"))
if err != nil {
log.Error("get expected primary key error", errs.ZapError(err))
return ""

Check warning on line 82 in pkg/mcs/utils/util.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/utils/util.go#L81-L82

Added lines #L81 - L82 were not covered by tests
Expand All @@ -86,7 +86,10 @@ func GetExpectedPrimary(keyPath string, client *clientv3.Client) string {
}

// RemoveExpectedPrimary removes the expected primary key.
// - removed when campaign success
// - removed when server is closed
func RemoveExpectedPrimary(client *clientv3.Client, leaderPath string) {
log.Info("remove expected primary key", zap.String("leaderPath", leaderPath))
// remove expected leader key
resp, err := kv.NewSlowLogTxn(client).
Then(clientv3.OpDelete(strings.Join([]string{leaderPath, ExpectedPrimary}, "/"))).
Expand All @@ -99,6 +102,7 @@ func RemoveExpectedPrimary(client *clientv3.Client, leaderPath string) {

// SetExpectedPrimary sets the expected primary key when the current primary has exited.
func SetExpectedPrimary(client *clientv3.Client, leaderPath string) {
log.Info("set expected primary key", zap.String("leaderPath", leaderPath))
leaderRaw, err := etcdutil.GetValue(client, leaderPath)
if err != nil {
log.Error("[primary] get primary key error", zap.Error(err))
Expand Down
10 changes: 5 additions & 5 deletions pkg/member/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,8 @@ func (m *EmbeddedEtcdMember) PreCheckLeader() error {
return nil
}

// GetPersistentLeader gets the corresponding leader from etcd by given leaderPath (as the key).
func (m *EmbeddedEtcdMember) GetPersistentLeader() (any, int64, error) {
// getPersistentLeader gets the corresponding leader from etcd by given leaderPath (as the key).
func (m *EmbeddedEtcdMember) getPersistentLeader() (*pdpb.Member, int64, error) {
leader := &pdpb.Member{}
ok, rev, err := etcdutil.GetProtoMsgWithModRev(m.client, m.GetLeaderPath(), leader)
if err != nil {
Expand All @@ -233,17 +233,17 @@ func (m *EmbeddedEtcdMember) CheckLeader() (ElectionLeader, bool) {
return nil, true
}

leaderRaw, revision, err := m.GetPersistentLeader()
leader, revision, err := m.getPersistentLeader()
if err != nil {
log.Error("getting pd leader meets error", errs.ZapError(err))
time.Sleep(200 * time.Millisecond)
return nil, true
}
if leaderRaw == nil {
if leader == nil {
// no leader yet
return nil, false
}
leader := leaderRaw.(*pdpb.Member)

if m.IsSameLeader(leader) {
// oh, we are already a PD leader, which indicates we may meet something wrong
// in previous CampaignLeader. We should delete the leadership and campaign again.
Expand Down
9 changes: 4 additions & 5 deletions pkg/member/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,8 @@ func (*Participant) PreCheckLeader() error {
return nil
}

// GetPersistentLeader gets the corresponding leader from etcd by given leaderPath (as the key).
func (m *Participant) GetPersistentLeader() (any, int64, error) {
// getPersistentLeader gets the corresponding leader from etcd by given leaderPath (as the key).
func (m *Participant) getPersistentLeader() (participant, int64, error) {
leader := NewParticipantByService(m.serviceName)
ok, rev, err := etcdutil.GetProtoMsgWithModRev(m.client, m.GetLeaderPath(), leader)
if err != nil {
Expand All @@ -229,18 +229,17 @@ func (m *Participant) CheckLeader() (ElectionLeader, bool) {
return nil, true
}

leaderRaw, revision, err := m.GetPersistentLeader()
leader, revision, err := m.getPersistentLeader()
if err != nil {
log.Error("getting the leader meets error", errs.ZapError(err))
time.Sleep(200 * time.Millisecond)
return nil, true
}
if leaderRaw == nil {
if leader == nil {
// no leader yet
return nil, false
}

leader := leaderRaw.(participant)
if m.IsSameLeader(leader) {
// oh, we are already the leader, which indicates we may meet something wrong
// in previous CampaignLeader. We should delete the leadership and campaign again.
Expand Down
2 changes: 0 additions & 2 deletions pkg/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,6 @@ type ElectionMember interface {
GetDCLocationPath(id uint64) string
// PreCheckLeader does some pre-check before checking whether it's the leader.
PreCheckLeader() error
// GetPersistentLeader returns the persistent leader.
GetPersistentLeader() (any, int64, error)
// UnsetLeader unsets the member's leader.
UnsetLeader()
}
Expand Down
39 changes: 18 additions & 21 deletions pkg/tso/global_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"errors"
"fmt"
"github.com/tikv/pd/pkg/utils/etcdutil"
"runtime/trace"
"sync"
"sync/atomic"
Expand All @@ -33,6 +32,7 @@ import (
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/tsoutil"
"github.com/tikv/pd/pkg/utils/typeutil"
Expand Down Expand Up @@ -562,7 +562,7 @@ func (gta *GlobalTSOAllocator) primaryElectionLoop() {
}

// To make sure the expected leader(if exist) and primary are on the same server.
targetPrimary := mcsutils.GetExpectedPrimary(gta.member.GetLeaderPath(), gta.member.Client())
targetPrimary := mcsutils.GetExpectedPrimary(gta.member.Client(), gta.member.GetLeaderPath())
if targetPrimary != "" && targetPrimary != gta.member.MemberValue() {
log.Info("skip campaigning of scheduling primary and check later",
zap.String("server-name", gta.member.Name()),
Expand Down Expand Up @@ -675,8 +675,8 @@ func (gta *GlobalTSOAllocator) campaignLeader() {
}

func (gta *GlobalTSOAllocator) primaryWatch(ctx context.Context, exitPrimary chan struct{}) {
_, revision, err := gta.member.GetPersistentLeader()
if err != nil {
resp, err := etcdutil.EtcdKVGet(gta.member.GetLeadership().GetClient(), gta.member.GetLeaderPath())
if err != nil || resp == nil || len(resp.Kvs) == 0 {
log.Error("[primary] getting the leader meets error", errs.ZapError(err))
return
}
Expand All @@ -685,31 +685,28 @@ func (gta *GlobalTSOAllocator) primaryWatch(ctx context.Context, exitPrimary cha
zap.String("campaign-tso-primary-name", gta.member.Name()))
// Watch will keep looping and never return unless the primary has changed.
gta.member.GetLeadership().SetLeaderWatch(true)
gta.member.GetLeadership().Watch(gta.ctx, revision+1)
gta.member.GetLeadership().Watch(gta.ctx, resp.Kvs[0].ModRevision+1)
gta.member.GetLeadership().SetLeaderWatch(false)

// only API update primary will set the expected leader
// check leader key whether deleted
leaderRaw, err := etcdutil.GetValue(gta.member.Client(), gta.member.GetLeaderPath())
curPrimary, err := etcdutil.GetValue(gta.member.Client(), gta.member.GetLeaderPath())
if err != nil {
log.Error("[primary] get primary key error", zap.Error(err))
return
}
if leaderRaw == nil {
log.Info("[primary] leader key is deleted, the primary will step down")
log.Error("[primary] getting the leader meets error", errs.ZapError(err))
return
}

mcsutils.SetExpectedPrimary(gta.member.Client(), gta.member.GetLeaderPath())
if curPrimary != nil && resp.Kvs[0].Value != nil && string(curPrimary) != string(resp.Kvs[0].Value) {
mcsutils.SetExpectedPrimary(gta.member.Client(), gta.member.GetLeaderPath())

gta.member.UnsetLeader()
for {
select {
case <-ctx.Done():
log.Info("[primary] exit the primary watch loop")
return
case exitPrimary <- struct{}{}:
return
gta.member.UnsetLeader()
for {
select {
case <-ctx.Done():
log.Info("[primary] exit the primary watch loop")
return
case exitPrimary <- struct{}{}:
return
}
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions server/apiv2/handlers/micro_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ func GetPrimary(c *gin.Context) {
// @Summary Transfer the primary member of the specified service.
// @Produce json
// @Param service path string true "service name"
// @Param new_primary query string false "new primary address"
// @Success 200 {object} string
// @Param new_primary body string false "new primary address"
// @Success 200 string string
// @Router /ms/primary/transfer/{service} [post]
func TransferPrimary(c *gin.Context) {
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
Expand Down

0 comments on commit 8d36be5

Please sign in to comment.