Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update ForceLeave Prune #580

Merged
merged 4 commits into from
Oct 2, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 29 additions & 7 deletions serf/serf.go
Original file line number Diff line number Diff line change
Expand Up @@ -1102,6 +1102,10 @@ func (s *Serf) handleNodeLeaveIntent(leaveMsg *messageLeave) bool {
case StatusAlive:
member.Status = StatusLeaving
member.statusLTime = leaveMsg.LTime

if leaveMsg.Prune {
s.handlePrune(member)
}
return true
case StatusFailed:
member.Status = StatusLeft
Expand All @@ -1127,26 +1131,44 @@ func (s *Serf) handleNodeLeaveIntent(leaveMsg *messageLeave) bool {
}

if leaveMsg.Prune {
s.logger.Printf("[INFO] serf: EventMemberReap (forced): %s %s", member.Name, member.Member.Addr)
s.leftMembers = removeOldMember(s.leftMembers, member.Name)
s.eraseNode(member)
s.handlePrune(member)
}

return true

case StatusLeft:
schristoff marked this conversation as resolved.
Show resolved Hide resolved
case StatusLeaving:
schristoff marked this conversation as resolved.
Show resolved Hide resolved
if leaveMsg.Prune {
s.logger.Printf("[INFO] serf: EventMemberReap (forced): %s %s", member.Name, member.Member.Addr)
s.leftMembers = removeOldMember(s.leftMembers, member.Name)
s.eraseNode(member)
s.handlePrune(member)
}
return true

case StatusLeft:
if leaveMsg.Prune {
s.handlePrune(member)
}
return true
default:
return false
}
}

// handlePrune forcibly erases a member from the list of members. When
// the member is still in the leaving state, it first sleeps for
// BroadcastTimeout + LeavePropagateDelay before removing it.
func (s *Serf) handlePrune(member *memberState) {
	// Wait out a leave that is still in progress before reaping the node.
	if member.Status == StatusLeaving {
		wait := s.config.BroadcastTimeout + s.config.LeavePropagateDelay
		time.Sleep(wait)
	}

	s.logger.Printf("[INFO] serf: EventMemberReap (forced): %s %s", member.Name, member.Member.Addr)

	// A leaving or left member may also be recorded in leftMembers, so
	// drop that entry before erasing the node itself.
	switch member.Status {
	case StatusLeaving, StatusLeft:
		s.leftMembers = removeOldMember(s.leftMembers, member.Name)
	}
	s.eraseNode(member)
}

// handleNodeJoinIntent is called when a node broadcasts a
// join message to set the lamport time of its join
func (s *Serf) handleNodeJoinIntent(joinMsg *messageJoin) bool {
Expand Down
223 changes: 209 additions & 14 deletions serf/serf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/hashicorp/memberlist"
"github.com/hashicorp/serf/coordinate"
"github.com/hashicorp/serf/testutil"
"github.com/hashicorp/serf/testutil/retry"
)

func testConfig() *Config {
Expand Down Expand Up @@ -65,6 +66,27 @@ func testMember(t *testing.T, members []Member, name string, status MemberStatus
panic(fmt.Sprintf("node not found: %s", name))
}

// testMemberStatus is the error-returning counterpart of testMember:
// instead of failing the test it returns an error when the named member
// has a status other than the expected one, or when the member is
// missing and the expected status is anything but StatusNone.
func testMemberStatus(members []Member, name string, status MemberStatus) error {
	for i := range members {
		if members[i].Name != name {
			continue
		}
		// Only the first member with a matching name is inspected.
		if got := members[i].Status; got != status {
			return fmt.Errorf("bad state for %s: %d", name, got)
		}
		return nil
	}

	// The node was not found; that is fine only if we did not expect
	// to find it.
	if status != StatusNone {
		return fmt.Errorf("node not found: %s", name)
	}
	return nil
}

func TestCreate_badProtocolVersion(t *testing.T) {
cases := []struct {
version uint8
Expand Down Expand Up @@ -500,7 +522,7 @@ func TestSerf_leaveRejoinDifferentRole(t *testing.T) {
t.Fatalf("s1 members: %d", len(s1.Members()))
}

var member *Member = nil
var member *Member
for _, m := range members {
if m.Name == s3Config.NodeName {
member = &m
Expand All @@ -517,6 +539,179 @@ func TestSerf_leaveRejoinDifferentRole(t *testing.T) {
}
}

func TestSerf_forceLeaveAlive(t *testing.T) {
s1Config := testConfig()
s2Config := testConfig()

s1, err := Create(s1Config)
if err != nil {
t.Fatalf("err: %s", err)
}
defer s1.Shutdown()

s2, err := Create(s2Config)
if err != nil {
t.Fatalf("err: %s", err)
}
defer s2.Shutdown()

_, err = s1.Join([]string{s2Config.MemberlistConfig.BindAddr}, false)
if err != nil {
t.Fatalf("err: %s", err)
}

// s1 should be alive when we call the force leave
// but not refute it
s2.forceLeave(s1.config.NodeName, true)
schristoff marked this conversation as resolved.
Show resolved Hide resolved

//double check we are alive
retry.Run(t, func(r *retry.R) {

if err := testMemberStatus(s1.Members(), s1Config.NodeName, StatusAlive); err != nil {
r.Fatal(err)
}
})
s1.Leave()

memberlen := len(s2.Members())
if memberlen != 1 {
t.Fatalf("wanted 1, got %v", s2.Members())
}

}

// TestSerf_forceLeaveFailed shuts the second node down, waits until the
// first node reports it as failed, force-leaves it with prune, and then
// expects the first node to list exactly one member.
func TestSerf_forceLeaveFailed(t *testing.T) {
	confA := testConfig()
	confB := testConfig()

	nodeA, err := Create(confA)
	if err != nil {
		t.Fatalf("err: %s", err)
	}
	defer nodeA.Shutdown()

	nodeB, err := Create(confB)
	if err != nil {
		t.Fatalf("err: %s", err)
	}
	defer nodeB.Shutdown()

	if _, err = nodeA.Join([]string{confB.MemberlistConfig.BindAddr}, false); err != nil {
		t.Fatalf("err: %s", err)
	}

	// Put the second node in failed state by shutting it down.
	nodeB.Shutdown()

	retry.Run(t, func(r *retry.R) {
		statusErr := testMemberStatus(nodeA.Members(), confB.NodeName, StatusFailed)
		if statusErr != nil {
			r.Fatal(statusErr)
		}
	})
	nodeA.forceLeave(nodeB.config.NodeName, true)

	if remaining := len(nodeA.Members()); remaining != 1 {
		t.Fatalf("wanted 1 alive member, got %v", nodeA.Members())
	}
}

// TestSerf_forceLeaveLeaving has s2 leave the cluster with a long
// LeavePropagateDelay and TombstoneTimeout configured on both nodes,
// waits until s1 reports s2 with the expected status, force-leaves s2
// with prune, and then expects s1 to list exactly one member.
//
// Note: despite the test name, the status awaited before the force-leave
// is StatusLeft.
func TestSerf_forceLeaveLeaving(t *testing.T) {
	s1Config := testConfig()
	s2Config := testConfig()

	// Make it so it doesn't get reaped, and
	// allow for us to see the leaving state
	s1Config.TombstoneTimeout = 1 * time.Hour
	s1Config.LeavePropagateDelay = 5 * time.Second

	s2Config.TombstoneTimeout = 1 * time.Hour
	s2Config.LeavePropagateDelay = 5 * time.Second

	s1, err := Create(s1Config)
	if err != nil {
		t.Fatalf("err: %s", err)
	}
	defer s1.Shutdown()

	s2, err := Create(s2Config)
	if err != nil {
		t.Fatalf("err: %s", err)
	}
	defer s2.Shutdown()

	_, err = s1.Join([]string{s2Config.MemberlistConfig.BindAddr}, true)
	if err != nil {
		t.Fatalf("err: %s", err)
	}
	testutil.Yield()

	// Put s2 in left state
	if err := s2.Leave(); err != nil {
		t.Fatal(err)
	}

	// Use the named constant rather than its raw numeric value so the
	// expectation stays readable and survives any renumbering.
	retry.Run(t, func(r *retry.R) {
		if err := testMemberStatus(s1.Members(), s2Config.NodeName, StatusLeft); err != nil {
			r.Fatal(err)
		}
	})
	s1.forceLeave(s2.config.NodeName, true)

	memberlen := len(s1.Members())
	if memberlen != 1 {
		t.Fatalf("wanted 1 alive member, got %v", s1.Members())
	}
}

// TestSerf_forceLeaveLeft has the second node leave the cluster, waits
// until the first node reports it as left, force-leaves it with prune,
// and then expects the first node to list exactly one member.
func TestSerf_forceLeaveLeft(t *testing.T) {
	localConf := testConfig()
	peerConf := testConfig()

	// A long tombstone timeout keeps the left node from being reaped
	// before the force-leave is issued.
	localConf.TombstoneTimeout = 1 * time.Hour
	peerConf.TombstoneTimeout = 1 * time.Hour

	local, err := Create(localConf)
	if err != nil {
		t.Fatalf("err: %s", err)
	}
	defer local.Shutdown()

	peer, err := Create(peerConf)
	if err != nil {
		t.Fatalf("err: %s", err)
	}
	defer peer.Shutdown()

	if _, err = local.Join([]string{peerConf.MemberlistConfig.BindAddr}, true); err != nil {
		t.Fatalf("err: %s", err)
	}
	testutil.Yield()

	// Put the peer in left state.
	if leaveErr := peer.Leave(); leaveErr != nil {
		t.Fatal(leaveErr)
	}

	retry.Run(t, func(r *retry.R) {
		statusErr := testMemberStatus(local.Members(), peerConf.NodeName, StatusLeft)
		if statusErr != nil {
			r.Fatal(statusErr)
		}
	})
	local.forceLeave(peer.config.NodeName, true)

	if remaining := len(local.Members()); remaining != 1 {
		t.Fatalf("wanted 1 alive member, got %v", local.Members())
	}
}

func TestSerf_reconnect(t *testing.T) {
eventCh := make(chan Event, 64)
s1Config := testConfig()
Expand Down Expand Up @@ -672,7 +867,7 @@ func TestSerf_update(t *testing.T) {

// Add a tag to force an update event, and add a version downgrade as
// well (that alone won't trigger an update).
s2Config.ProtocolVersion -= 1
s2Config.ProtocolVersion--
s2Config.Tags["foo"] = "bar"

// We try for a little while to wait for s2 to fully shutdown since the
Expand Down Expand Up @@ -1475,31 +1670,31 @@ func TestSerf_SetTags(t *testing.T) {

// Verify the new tags
m1m := s1.Members()
m1m_tags := make(map[string]map[string]string)
m1mTags := make(map[string]map[string]string)
for _, m := range m1m {
m1m_tags[m.Name] = m.Tags
m1mTags[m.Name] = m.Tags
}

if m := m1m_tags[s1.config.NodeName]; m["port"] != "8000" {
t.Fatalf("bad: %v", m1m_tags)
if m := m1mTags[s1.config.NodeName]; m["port"] != "8000" {
t.Fatalf("bad: %v", m1mTags)
}

if m := m1m_tags[s2.config.NodeName]; m["datacenter"] != "east-aws" {
t.Fatalf("bad: %v", m1m_tags)
if m := m1mTags[s2.config.NodeName]; m["datacenter"] != "east-aws" {
t.Fatalf("bad: %v", m1mTags)
}

m2m := s2.Members()
m2m_tags := make(map[string]map[string]string)
m2mTags := make(map[string]map[string]string)
for _, m := range m2m {
m2m_tags[m.Name] = m.Tags
m2mTags[m.Name] = m.Tags
}

if m := m2m_tags[s1.config.NodeName]; m["port"] != "8000" {
t.Fatalf("bad: %v", m1m_tags)
if m := m2mTags[s1.config.NodeName]; m["port"] != "8000" {
t.Fatalf("bad: %v", m1mTags)
}

if m := m2m_tags[s2.config.NodeName]; m["datacenter"] != "east-aws" {
t.Fatalf("bad: %v", m1m_tags)
if m := m2mTags[s2.config.NodeName]; m["datacenter"] != "east-aws" {
t.Fatalf("bad: %v", m1mTags)
}
}

Expand Down
Loading