Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changing interaction between Leave and RemovePeers #621

Merged
merged 4 commits into from
Jan 22, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions consul/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,9 +469,11 @@ func (s *Server) handleReapMember(member serf.Member) error {
// handleDeregisterMember is used to deregister a member of a given reason
func (s *Server) handleDeregisterMember(reason string, member serf.Member) error {
state := s.fsm.State()
// Check if the node does not exists
_, found, _ := state.GetNode(member.Name)
if !found {
// Do not deregister ourself. This can only happen if the current leader
// is leaving. Instead, we should allow a follower to take-over and
// deregister us later.
if member.Name == s.config.NodeName {
s.logger.Printf("[WARN] consul: deregistering self (%s) should be done by follower", s.config.NodeName)
return nil
}

Expand All @@ -483,11 +485,9 @@ func (s *Server) handleDeregisterMember(reason string, member serf.Member) error
}
}

// Do not deregister ourself. This can only happen if the current leader
// is leaving. Instead, we should allow a follower to take-over and
// deregister us later.
if member.Name == s.config.NodeName {
s.logger.Printf("[WARN] consul: deregistering self (%s) should be done by follower", s.config.NodeName)
// Check if the node does not exists
_, found, _ := state.GetNode(member.Name)
if !found {
return nil
}

Expand Down
61 changes: 61 additions & 0 deletions consul/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ const (
// raftLogCacheSize is the maximum number of logs to cache in-memory.
// This is used to reduce disk I/O for the recently commited entries.
raftLogCacheSize = 512

// raftRemoveGracePeriod is how long we wait to allow a RemovePeer
// to replicate to gracefully leave the cluster.
raftRemoveGracePeriod = 5 * time.Second
)

// Server is Consul server which manages the service discovery,
Expand Down Expand Up @@ -521,6 +525,25 @@ func (s *Server) Leave() error {
s.logger.Printf("[INFO] consul: server starting leave")
s.left = true

// Check the number of known peers
numPeers, err := s.numOtherPeers()
if err != nil {
s.logger.Printf("[ERR] consul: failed to check raft peers: %v", err)
return err
}

// If we are the current leader, and we have any other peers (cluster has multiple
// servers), we should do a RemovePeer to safely reduce the quorum size. If we are
// not the leader, then we should issue our leave intention and wait to be removed
// for some sane period of time.
isLeader := s.IsLeader()
if isLeader && numPeers > 0 {
future := s.raft.RemovePeer(s.raftTransport.LocalAddr())
if err := future.Error(); err != nil && err != raft.ErrUnknownPeer {
s.logger.Printf("[ERR] consul: failed to remove ourself as raft peer: %v", err)
}
}

// Leave the WAN pool
if s.serfWAN != nil {
if err := s.serfWAN.Leave(); err != nil {
Expand All @@ -534,9 +557,47 @@ func (s *Server) Leave() error {
s.logger.Printf("[ERR] consul: failed to leave LAN Serf cluster: %v", err)
}
}

// If we were not leader, wait to be safely removed from the cluster.
// We must wait to allow the raft replication to take place, otherwise
// an immediate shutdown could cause a loss of quorum.
if !isLeader {
limit := time.Now().Add(raftRemoveGracePeriod)
for numPeers > 0 && time.Now().Before(limit) {
// Update the number of peers
numPeers, err = s.numOtherPeers()
if err != nil {
s.logger.Printf("[ERR] consul: failed to check raft peers: %v", err)
break
}

// Avoid the sleep if we are done
if numPeers == 0 {
break
}

// Sleep a while and check again
time.Sleep(50 * time.Millisecond)
}
if numPeers != 0 {
s.logger.Printf("[WARN] consul: failed to leave raft peer set gracefully, timeout")
}
}

return nil
}

// numOtherPeers is used to check on the number of known peers
// excluding the local ndoe
func (s *Server) numOtherPeers() (int, error) {
peers, err := s.raftPeers.Peers()
if err != nil {
return 0, err
}
otherPeers := raft.ExcludePeer(peers, s.raftTransport.LocalAddr())
return len(otherPeers), nil
}

// JoinLAN is used to have Consul join the inner-DC pool
// The target address should be another node inside the DC
// listening on the Serf LAN address
Expand Down
74 changes: 68 additions & 6 deletions consul/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func TestServer_JoinWAN(t *testing.T) {
})
}

func TestServer_Leave(t *testing.T) {
func TestServer_LeaveLeader(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
Expand Down Expand Up @@ -250,18 +250,80 @@ func TestServer_Leave(t *testing.T) {
t.Fatalf("should have 2 peers: %v", err)
})

// Issue a leave
if err := s2.Leave(); err != nil {
t.Fatalf("err: %v", err)
// Issue a leave to the leader
for _, s := range []*Server{s1, s2} {
if !s.IsLeader() {
continue
}
if err := s.Leave(); err != nil {
t.Fatalf("err: %v", err)
}
}

// Should lose a peer
for _, s := range []*Server{s1, s2} {
testutil.WaitForResult(func() (bool, error) {
p1, _ = s.raftPeers.Peers()
return len(p1) == 1, nil
}, func(err error) {
t.Fatalf("should have 1 peer: %v", p1)
})
}
}

func TestServer_Leave(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()

// Second server not in bootstrap mode
dir2, s2 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir2)
defer s2.Shutdown()

// Try to join
addr := fmt.Sprintf("127.0.0.1:%d",
s1.config.SerfLANConfig.MemberlistConfig.BindPort)
if _, err := s2.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}

var p1 []net.Addr
var p2 []net.Addr

testutil.WaitForResult(func() (bool, error) {
p1, _ = s1.raftPeers.Peers()
return len(p1) == 1, nil
return len(p1) == 2, errors.New(fmt.Sprintf("%v", p1))
}, func(err error) {
t.Fatalf("should have 1 peer: %v", p1)
t.Fatalf("should have 2 peers: %v", err)
})

testutil.WaitForResult(func() (bool, error) {
p2, _ = s2.raftPeers.Peers()
return len(p2) == 2, errors.New(fmt.Sprintf("%v", p1))
}, func(err error) {
t.Fatalf("should have 2 peers: %v", err)
})

// Issue a leave to the non-leader
for _, s := range []*Server{s1, s2} {
if s.IsLeader() {
continue
}
if err := s.Leave(); err != nil {
t.Fatalf("err: %v", err)
}
}

// Should lose a peer
for _, s := range []*Server{s1, s2} {
testutil.WaitForResult(func() (bool, error) {
p1, _ = s.raftPeers.Peers()
return len(p1) == 1, nil
}, func(err error) {
t.Fatalf("should have 1 peer: %v", p1)
})
}
}

func TestServer_RPC(t *testing.T) {
Expand Down