Let a PD leader give up leadership even while stopping the raft cluster is slow.

campaignLeader now routes its deferred leadership cleanup through a sync.Once,
and the cleanup registered last cancels the lease keepalive and closes the lease
itself. A "raftclusterIsBusy" failpoint in stopRaftCluster lets
TestLeaderResignWithBlock stall that step and check that another member still
becomes leader.

diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go
index 17e6967c5ae..16dec9426a9 100644
--- a/server/cluster/cluster.go
+++ b/server/cluster/cluster.go
@@ -358,6 +358,7 @@ func (c *RaftCluster) Stop() {
 	c.coordinator.stop()
 	c.Unlock()
 	c.wg.Wait()
+	log.Info("raftcluster is stopped")
 }
 
 // IsRunning return if the cluster is running.
diff --git a/server/server.go b/server/server.go
index 86c09576080..73fc6c5c660 100644
--- a/server/server.go
+++ b/server/server.go
@@ -603,6 +603,8 @@ func (s *Server) createRaftCluster() error {
 }
 
 func (s *Server) stopRaftCluster() {
+	// Failpoint hook so tests can stall here before the raft cluster is stopped.
+	failpoint.Inject("raftclusterIsBusy", func() {})
 	s.cluster.Stop()
 }
 
@@ -1114,8 +1116,14 @@ func (s *Server) leaderLoop() {
 
 func (s *Server) campaignLeader() {
 	log.Info("start to campaign leader", zap.String("campaign-leader-name", s.Name()))
+	// resetLeaderOnce guards the leadership-release cleanups deferred below.
+	// Deferred calls run last-in-first-out, so only the most recently registered
+	// cleanup executes and the earlier ones become no-ops.
+	var resetLeaderOnce sync.Once
 	lease := member.NewLeaderLease(s.client)
-	defer lease.Close()
+	defer resetLeaderOnce.Do(func() {
+		lease.Close()
+	})
 	if err := s.member.CampaignLeader(lease, s.cfg.LeaderLease); err != nil {
 		log.Error("campaign leader meet error", errs.ZapError(err))
 		return
@@ -1127,7 +1135,11 @@ func (s *Server) campaignLeader() {
 	// 2. load region could be slow. Based on lease we can recover TSO service faster.
 
 	ctx, cancel := context.WithCancel(s.serverLoopCtx)
-	defer cancel()
+
+	defer resetLeaderOnce.Do(func() {
+		cancel()
+		lease.Close()
+	})
 	go lease.KeepAlive(ctx)
 	s.SetLease(lease)
 	defer s.SetLease(nil)
@@ -1160,8 +1172,15 @@ func (s *Server) campaignLeader() {
 		log.Error("failed to sync id from etcd", errs.ZapError(err))
 		return
 	}
+	// EnableLeader to accept the remaining services, such as GetStore, GetRegion.
 	s.member.EnableLeader()
-	defer s.member.DisableLeader()
+	defer resetLeaderOnce.Do(func() {
+		// Cancel the lease keepalive as early as possible so that other members
+		// get a chance to become the new leader.
+		cancel()
+		lease.Close()
+		s.member.DisableLeader()
+	})
 	CheckPDVersion(s.persistOptions)
 	log.Info("PD cluster leader is ready to serve", zap.String("leader-name", s.Name()))
 
diff --git a/tests/server/member/member_test.go b/tests/server/member/member_test.go
index 2e5462ee9ae..eba25cc95a4 100644
--- a/tests/server/member/member_test.go
+++ b/tests/server/member/member_test.go
@@ -26,6 +26,7 @@ import (
 
 	. "github.com/pingcap/check"
 	"github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/kvproto/pkg/pdpb"
 	"github.com/tikv/pd/pkg/testutil"
 	"github.com/tikv/pd/server"
@@ -221,12 +222,34 @@ func (s *serverTestSuite) TestLeaderResign(c *C) {
 	c.Assert(leader3, Equals, leader1)
 }
 
+// TestLeaderResignWithBlock stalls stopRaftCluster through the raftclusterIsBusy
+// failpoint, asks the current leader to resign, and checks that a different
+// member becomes leader.
+func (s *serverTestSuite) TestLeaderResignWithBlock(c *C) {
+	cluster, err := tests.NewTestCluster(s.ctx, 3)
+	c.Assert(err, IsNil)
+	defer cluster.Destroy()
+
+	err = cluster.RunInitialServers()
+	c.Assert(err, IsNil)
+
+	leader1 := cluster.WaitLeader()
+	addr1 := cluster.GetServer(leader1).GetConfig().ClientUrls
+
+	err = failpoint.Enable("github.com/tikv/pd/server/raftclusterIsBusy", `pause`)
+	c.Assert(err, IsNil)
+	defer failpoint.Disable("github.com/tikv/pd/server/raftclusterIsBusy")
+	s.post(c, addr1+"/pd/api/v1/leader/resign", "")
+	leader2 := s.waitLeaderChange(c, cluster, leader1)
+	c.Log("leader2:", leader2)
+	c.Assert(leader2, Not(Equals), leader1)
+}
+
 func (s *serverTestSuite) waitLeaderChange(c *C, cluster *tests.TestCluster, old string) string {
 	var leader string
 	testutil.WaitUntil(c, func(c *C) bool {
 		leader = cluster.GetLeader()
 		if leader == old || leader == "" {
-			time.Sleep(time.Second)
 			return false
 		}
 		return true