Skip to content

Commit

Permalink
member: return err when meet frequently campaign leader (#7566)
Browse files Browse the repository at this point in the history
close #7562

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
HuSharp and ti-chi-bot[bot] authored Dec 20, 2023
1 parent 84e60be commit a8aaae9
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 11 deletions.
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,11 @@ error = '''
cannot set invalid configuration
'''

["PD:server:ErrLeaderFrequentlyChange"]
error = '''
leader %s frequently changed, leader-key is [%s]
'''

["PD:server:ErrLeaderNil"]
error = '''
leader is nil
Expand Down
11 changes: 11 additions & 0 deletions pkg/election/leadership.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
Expand Down Expand Up @@ -156,6 +157,16 @@ func (ls *Leadership) Campaign(leaseTimeout int64, leaderData string, cmps ...cl
lease: clientv3.NewLease(ls.client),
}
ls.setLease(newLease)

failpoint.Inject("skipGrantLeader", func(val failpoint.Value) {
var member pdpb.Member
member.Unmarshal([]byte(leaderData))
name, ok := val.(string)
if ok && member.Name == name {
failpoint.Return(errors.Errorf("failed to grant lease"))
}
})

if err := newLease.Grant(leaseTimeout); err != nil {
return err
}
Expand Down
17 changes: 9 additions & 8 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,14 +208,15 @@ var (

// server errors
var (
// NOTE(review): this is a diff view, so the eight pre-existing entries are
// listed twice (once as removed, once as added). Their text is identical in
// both copies, so the change to them is presumably whitespace re-alignment only.
ErrServiceRegistered = errors.Normalize("service with path [%s] already registered", errors.RFCCodeText("PD:server:ErrServiceRegistered"))
ErrAPIInformationInvalid = errors.Normalize("invalid api information, group %s version %s", errors.RFCCodeText("PD:server:ErrAPIInformationInvalid"))
ErrClientURLEmpty = errors.Normalize("client url empty", errors.RFCCodeText("PD:server:ErrClientEmpty"))
ErrLeaderNil = errors.Normalize("leader is nil", errors.RFCCodeText("PD:server:ErrLeaderNil"))
ErrCancelStartEtcd = errors.Normalize("etcd start canceled", errors.RFCCodeText("PD:server:ErrCancelStartEtcd"))
ErrConfigItem = errors.Normalize("cannot set invalid configuration", errors.RFCCodeText("PD:server:ErrConfiguration"))
ErrServerNotStarted = errors.Normalize("server not started", errors.RFCCodeText("PD:server:ErrServerNotStarted"))
ErrRateLimitExceeded = errors.Normalize("rate limit exceeded", errors.RFCCodeText("PD:server:ErrRateLimitExceeded"))
// Second copy of the same eight entries, as they stand after the change.
ErrServiceRegistered = errors.Normalize("service with path [%s] already registered", errors.RFCCodeText("PD:server:ErrServiceRegistered"))
ErrAPIInformationInvalid = errors.Normalize("invalid api information, group %s version %s", errors.RFCCodeText("PD:server:ErrAPIInformationInvalid"))
ErrClientURLEmpty = errors.Normalize("client url empty", errors.RFCCodeText("PD:server:ErrClientEmpty"))
ErrLeaderNil = errors.Normalize("leader is nil", errors.RFCCodeText("PD:server:ErrLeaderNil"))
ErrCancelStartEtcd = errors.Normalize("etcd start canceled", errors.RFCCodeText("PD:server:ErrCancelStartEtcd"))
ErrConfigItem = errors.Normalize("cannot set invalid configuration", errors.RFCCodeText("PD:server:ErrConfiguration"))
ErrServerNotStarted = errors.Normalize("server not started", errors.RFCCodeText("PD:server:ErrServerNotStarted"))
ErrRateLimitExceeded = errors.Normalize("rate limit exceeded", errors.RFCCodeText("PD:server:ErrRateLimitExceeded"))
// ErrLeaderFrequentlyChange is the only new entry. CampaignLeader returns it,
// with the member name and the leader key path as the two format arguments,
// after resigning the etcd leader because the campaign count reached
// campaignLeaderFrequencyTimes.
ErrLeaderFrequentlyChange = errors.Normalize("leader %s frequently changed, leader-key is [%s]", errors.RFCCodeText("PD:server:ErrLeaderFrequentlyChange"))
)

// logutil errors
Expand Down
9 changes: 6 additions & 3 deletions pkg/member/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,12 +185,15 @@ func (m *EmbeddedEtcdMember) CampaignLeader(ctx context.Context, leaseTimeout in
failpoint.Inject("skipCampaignLeaderCheck", func() {
failpoint.Return(m.leadership.Campaign(leaseTimeout, m.MemberValue()))
})

if m.leadership.GetCampaignTimesNum() >= campaignLeaderFrequencyTimes {
log.Warn("campaign times is too frequent, resign and campaign again",
zap.String("leader-name", m.Name()), zap.String("leader-key", m.GetLeaderPath()))
m.leadership.ResetCampaignTimes()
return m.ResignEtcdLeader(ctx, m.Name(), "")
if err := m.ResignEtcdLeader(ctx, m.Name(), ""); err != nil {
return err
}
return errs.ErrLeaderFrequentlyChange.FastGenByArgs(m.Name(), m.GetLeaderPath())
}

return m.leadership.Campaign(leaseTimeout, m.MemberValue())
}

Expand Down
25 changes: 25 additions & 0 deletions tests/server/member/member_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,31 @@ func TestCampaignLeaderFrequently(t *testing.T) {
re.NotEqual(leader, cluster.GetLeader())
}

// TestGrantLeaseFailed checks that PD leadership moves to a different member
// when the current leader's campaign keeps failing at the lease-grant step.
//
// It starts a five-member cluster, records the elected leader, and enables the
// "skipGrantLeader" failpoint with that leader's name so Campaign returns an
// error for that member only. After resetting the PD leader several times, the
// elected leader must be non-empty and differ from the recorded one.
func TestGrantLeaseFailed(t *testing.T) {
	re := require.New(t)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	cluster, err := tests.NewTestCluster(ctx, 5)
	// Check the error before registering Destroy: a failed require aborts the
	// test and runs deferred calls, and the cluster is presumably unusable
	// (likely nil) when construction fails.
	re.NoError(err)
	defer cluster.Destroy()

	err = cluster.RunInitialServers()
	re.NoError(err)
	cluster.WaitLeader()
	leader := cluster.GetLeader()
	// Assert on the captured value, since that is what the failpoint matches on.
	re.NotEmpty(leader)

	const skipGrantLeader = "github.com/tikv/pd/pkg/election/skipGrantLeader"
	re.NoError(failpoint.Enable(skipGrantLeader, fmt.Sprintf("return(\"%s\")", leader)))
	// Failpoints are process-wide. Disable in a defer so a failed assertion
	// below cannot leave this one active for tests that run afterwards.
	defer func() {
		if err := failpoint.Disable(skipGrantLeader); err != nil {
			t.Errorf("disable failpoint %s: %v", skipGrantLeader, err)
		}
	}()

	for i := 0; i < 3; i++ {
		cluster.GetLeaderServer().ResetPDLeader()
		cluster.WaitLeader()
	}
	// PD leader should be different from before because etcd leader changed.
	re.NotEmpty(cluster.GetLeader())
	re.NotEqual(leader, cluster.GetLeader())
}

func TestGetLeader(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
Expand Down

0 comments on commit a8aaae9

Please sign in to comment.