Reap servers that have left Serf but are still registered in the catalog.

reconcileReaped builds the fake Serf member after scanning the node's
services, so a reaped server carries its address and ID in addition to its
port. TestLeader_ReapServer covers reaping a server through reconcileReaped.

NOTE(review): the blob ids on the "index" lines were recorded before the
review edits to the added lines below.

diff --git a/agent/consul/leader.go b/agent/consul/leader.go
index e20d5a876e79..c6047bb89dcb 100644
--- a/agent/consul/leader.go
+++ b/agent/consul/leader.go
@@ -329,9 +329,10 @@ func (s *Server) getOrCreateAutopilotConfig() (*structs.AutopilotConfig, bool) {
 }
 
 // reconcileReaped is used to reconcile nodes that have failed and been reaped
-// from Serf but remain in the catalog. This is done by looking for SerfCheckID
-// in a critical state that does not correspond to a known Serf member. We generate
-// a "reap" event to cause the node to be cleaned up.
+// from Serf but remain in the catalog. This is done by looking for nodes that
+// still have a serfHealth check registered but are absent from known, the set
+// of node names that are current Serf members. We generate a "reap" event to
+// cause each such node to be cleaned up.
 func (s *Server) reconcileReaped(known map[string]struct{}) error {
 	state := s.fsm.State()
 	_, checks, err := state.ChecksInState(nil, api.HealthAny)
@@ -349,32 +350,57 @@ func (s *Server) reconcileReaped(known map[string]struct{}) error {
 			continue
 		}
 
-		// Create a fake member
-		member := serf.Member{
-			Name: check.Node,
-			Tags: map[string]string{
-				"dc":   s.config.Datacenter,
-				"role": "node",
-			},
-		}
-
 		// Get the node services, look for ConsulServiceID
 		_, services, err := state.NodeServices(nil, check.Node)
 		if err != nil {
 			return err
 		}
 		serverPort := 0
+		serverAddr := ""
+		serverID := ""
+
 		for _, service := range services.Services {
 			if service.ID == structs.ConsulServiceID {
+				// Server tags need the node's address. If it cannot be
+				// looked up, serverPort stays 0 and the node is reaped
+				// below with the plain "node" role.
+				_, node, err := state.GetNode(check.Node)
+				if err != nil {
+					s.logger.Printf("[ERR] consul: Unable to look up node with name %q: %v", check.Node, err)
+					continue
+				}
+				// GetNode yields a nil node, not an error, when none exists.
+				if node == nil {
+					s.logger.Printf("[ERR] consul: Unable to look up node with name %q: node not found", check.Node)
+					continue
+				}
+
+				serverAddr = node.Address
 				serverPort = service.Port
+				lookupAddr := net.JoinHostPort(serverAddr, strconv.Itoa(serverPort))
+				svr := s.serverLookup.Server(raft.ServerAddress(lookupAddr))
+				if svr != nil {
+					serverID = svr.ID
+				}
 				break
 			}
 		}
 
+		// Create a fake member
+		member := serf.Member{
+			Name: check.Node,
+			Tags: map[string]string{
+				"dc":   s.config.Datacenter,
+				"role": "node",
+			},
+		}
+
 		// Create the appropriate tags if this was a server node
 		if serverPort > 0 {
 			member.Tags["role"] = "consul"
 			member.Tags["port"] = strconv.FormatUint(uint64(serverPort), 10)
+			member.Tags["id"] = serverID
+			member.Addr = net.ParseIP(serverAddr)
 		}
 
 		// Attempt to reap this member
diff --git a/agent/consul/leader_test.go b/agent/consul/leader_test.go
index 4de48dd83486..2e106c4636e7 100644
--- a/agent/consul/leader_test.go
+++ b/agent/consul/leader_test.go
@@ -250,6 +250,72 @@ func TestLeader_ReapMember(t *testing.T) {
 	}
 }
 
+func TestLeader_ReapServer(t *testing.T) {
+	t.Parallel()
+
+	// All three servers share the same ACL setup; only the first bootstraps.
+	conf := func(bootstrap bool) func(*Config) {
+		return func(c *Config) {
+			c.ACLDatacenter = "dc1"
+			c.ACLMasterToken = "root"
+			c.ACLDefaultPolicy = "allow"
+			c.ACLEnforceVersion8 = true
+			c.Bootstrap = bootstrap
+		}
+	}
+
+	dir1, s1 := testServerWithConfig(t, conf(true))
+	defer os.RemoveAll(dir1)
+	defer s1.Shutdown()
+
+	dir2, s2 := testServerWithConfig(t, conf(false))
+	defer os.RemoveAll(dir2)
+	defer s2.Shutdown()
+
+	dir3, s3 := testServerWithConfig(t, conf(false))
+	defer os.RemoveAll(dir3)
+	defer s3.Shutdown()
+
+	// Try to join
+	joinLAN(t, s1, s2)
+	joinLAN(t, s1, s3)
+
+	testrpc.WaitForLeader(t, s1.RPC, "dc1")
+	state := s1.fsm.State()
+
+	// s3 should be registered
+	retry.Run(t, func(r *retry.R) {
+		_, node, err := state.GetNode(s3.config.NodeName)
+		if err != nil {
+			r.Fatalf("err: %v", err)
+		}
+		if node == nil {
+			r.Fatal("server not registered")
+		}
+	})
+
+	// Call reconcileReaped with a member set that omits s3, as if Serf had
+	// already reaped it while it is still in the catalog.
+	knownMembers := map[string]struct{}{
+		s1.config.NodeName: {},
+		s2.config.NodeName: {},
+	}
+	if err := s1.reconcileReaped(knownMembers); err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+
+	// s3 should be deregistered
+	retry.Run(t, func(r *retry.R) {
+		_, node, err := state.GetNode(s3.config.NodeName)
+		if err != nil {
+			r.Fatalf("err: %v", err)
+		}
+		if node != nil {
+			r.Fatalf("server with id %v should not be registered", s3.config.NodeID)
+		}
+	})
+}
+
 func TestLeader_Reconcile_ReapMember(t *testing.T) {
 	t.Parallel()
 	dir1, s1 := testServerWithConfig(t, func(c *Config) {