diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go index 26d8888916e3..b23327c66582 100644 --- a/command/agent/agent_test.go +++ b/command/agent/agent_test.go @@ -48,12 +48,18 @@ func nextConfig() *Config { idx := int(atomic.AddUint64(&offset, numPortsPerIndex)) conf := DefaultConfig() + nodeID, err := uuid.GenerateUUID() + if err != nil { + panic(err) + } + conf.Version = version.Version conf.VersionPrerelease = "c.d" conf.AdvertiseAddr = "127.0.0.1" conf.Bootstrap = true conf.Datacenter = "dc1" conf.NodeName = fmt.Sprintf("Node %d", idx) + conf.NodeID = types.NodeID(nodeID) conf.BindAddr = "127.0.0.1" conf.Ports.DNS = basePortNumber + idx + portOffsetDNS conf.Ports.HTTP = basePortNumber + idx + portOffsetHTTP @@ -314,6 +320,7 @@ func TestAgent_ReconnectConfigSettings(t *testing.T) { func TestAgent_NodeID(t *testing.T) { c := nextConfig() + c.NodeID = "" dir, agent := makeAgent(t, c) defer os.RemoveAll(dir) defer agent.Shutdown() diff --git a/command/util_test.go b/command/util_test.go index 577b70d03566..5c4b60c068fc 100644 --- a/command/util_test.go +++ b/command/util_test.go @@ -15,7 +15,9 @@ import ( "github.com/hashicorp/consul/command/agent" "github.com/hashicorp/consul/consul" "github.com/hashicorp/consul/logger" + "github.com/hashicorp/consul/types" "github.com/hashicorp/consul/version" + "github.com/hashicorp/go-uuid" "github.com/mitchellh/cli" ) @@ -112,9 +114,15 @@ func nextConfig() *agent.Config { idx := int(atomic.AddUint64(&offset, 1)) conf := agent.DefaultConfig() + nodeID, err := uuid.GenerateUUID() + if err != nil { + panic(err) + } + conf.Bootstrap = true conf.Datacenter = "dc1" conf.NodeName = fmt.Sprintf("Node %d", idx) + conf.NodeID = types.NodeID(nodeID) conf.BindAddr = "127.0.0.1" conf.Server = true diff --git a/consul/catalog_endpoint.go b/consul/catalog_endpoint.go index 4abe025844fb..a5b67486951a 100644 --- a/consul/catalog_endpoint.go +++ b/consul/catalog_endpoint.go @@ -90,11 +90,13 @@ func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error } } - _, err = c.srv.raftApply(structs.RegisterRequestType, args) + resp, err := c.srv.raftApply(structs.RegisterRequestType, args) if err != nil { return err } - + if respErr, ok := resp.(error); ok { + return respErr + } return nil } diff --git a/consul/catalog_endpoint_test.go b/consul/catalog_endpoint_test.go index 6a0d36654d9d..306508ee9cf6 100644 --- a/consul/catalog_endpoint_test.go +++ b/consul/catalog_endpoint_test.go @@ -32,6 +32,7 @@ func TestCatalog_Register(t *testing.T) { Port: 8000, }, Check: &structs.HealthCheck{ + CheckID: types.CheckID("db-check"), ServiceID: "db", }, } @@ -61,6 +62,7 @@ func TestCatalog_Register_NodeID(t *testing.T) { Port: 8000, }, Check: &structs.HealthCheck{ + CheckID: types.CheckID("db-check"), ServiceID: "db", }, } diff --git a/consul/client.go b/consul/client.go index 959fde26b699..78c671289cf4 100644 --- a/consul/client.go +++ b/consul/client.go @@ -156,7 +156,11 @@ func (c *Client) setupSerf(conf *serf.Config, ch chan serf.Event, path string) ( conf.SnapshotPath = filepath.Join(c.config.DataDir, path) conf.ProtocolVersion = protocolVersionMap[c.config.ProtocolVersion] conf.RejoinAfterLeave = c.config.RejoinAfterLeave - conf.Merge = &lanMergeDelegate{dc: c.config.Datacenter} + conf.Merge = &lanMergeDelegate{ + dc: c.config.Datacenter, + nodeID: c.config.NodeID, + nodeName: c.config.NodeName, + } if err := lib.EnsurePath(conf.SnapshotPath, false); err != nil { return nil, err } diff --git a/consul/leader_test.go b/consul/leader_test.go index 427e07d709b2..931f99cd9e2f 100644 --- a/consul/leader_test.go +++ b/consul/leader_test.go @@ -848,87 +848,6 @@ func TestLeader_RollRaftServer(t *testing.T) { } } -func TestLeader_ChangeServerAddress(t *testing.T) { - conf := func(c *Config) { - c.Bootstrap = false - c.BootstrapExpect = 3 - c.Datacenter = "dc1" - c.RaftConfig.ProtocolVersion = 3 - } - dir1, s1 := testServerWithConfig(t, conf) - defer os.RemoveAll(dir1) - defer s1.Shutdown() - - dir2, s2 := testServerWithConfig(t, conf) - defer os.RemoveAll(dir2) - defer s2.Shutdown() - - dir3, s3 := testServerWithConfig(t, conf) - defer os.RemoveAll(dir3) - defer s3.Shutdown() - - servers := []*Server{s1, s2, s3} - - // Try to join - addr := fmt.Sprintf("127.0.0.1:%d", - s1.config.SerfLANConfig.MemberlistConfig.BindPort) - if _, err := s2.JoinLAN([]string{addr}); err != nil { - t.Fatalf("err: %v", err) - } - if _, err := s3.JoinLAN([]string{addr}); err != nil { - t.Fatalf("err: %v", err) - } - - for _, s := range servers { - if err := testutil.WaitForResult(func() (bool, error) { - peers, _ := s.numPeers() - return peers == 3, nil - }); err != nil { - t.Fatal("should have 3 peers") - } - } - - // Shut down a server, freeing up its address/port - s3.Shutdown() - - if err := testutil.WaitForResult(func() (bool, error) { - alive := 0 - for _, m := range s1.LANMembers() { - if m.Status == serf.StatusAlive { - alive++ - } - } - return alive == 2, nil - }); err != nil { - t.Fatal("should have 2 alive members") - } - - // Bring up a new server with s3's address that will get a different ID - dir4, s4 := testServerWithConfig(t, func(c *Config) { - c.Bootstrap = false - c.BootstrapExpect = 3 - c.Datacenter = "dc1" - c.RaftConfig.ProtocolVersion = 3 - c.NodeID = s3.config.NodeID - }) - defer os.RemoveAll(dir4) - defer s4.Shutdown() - if _, err := s4.JoinLAN([]string{addr}); err != nil { - t.Fatalf("err: %v", err) - } - servers[2] = s4 - - // Make sure the dead server is removed and we're back to 3 total peers - for _, s := range servers { - if err := testutil.WaitForResult(func() (bool, error) { - peers, _ := s.numPeers() - return peers == 3, nil - }); err != nil { - t.Fatal("should have 3 members") - } - } -} - func TestLeader_ChangeServerID(t *testing.T) { conf := func(c *Config) { c.Bootstrap = false diff --git a/consul/merge.go b/consul/merge.go index defa7ef10856..335d0280a40b 100644 --- a/consul/merge.go +++ b/consul/merge.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/hashicorp/consul/consul/agent" + "github.com/hashicorp/consul/types" "github.com/hashicorp/serf/serf" ) @@ -11,11 +12,33 @@ import ( // ring. We check that the peers are in the same datacenter and abort the // merge if there is a mis-match. type lanMergeDelegate struct { - dc string + dc string + nodeID types.NodeID + nodeName string } func (md *lanMergeDelegate) NotifyMerge(members []*serf.Member) error { + nodeMap := make(map[types.NodeID]string) for _, m := range members { + if rawID, ok := m.Tags["id"]; ok && rawID != "" { + nodeID := types.NodeID(rawID) + + // See if there's another node that conflicts with us. + if (nodeID == md.nodeID) && (m.Name != md.nodeName) { + return fmt.Errorf("Member '%s' has conflicting node ID '%s' with this agent's ID", + m.Name, nodeID) + } + + // See if there are any two nodes that conflict with each + // other. This lets us only do joins into a hygienic + // cluster now that node IDs are critical for operation. + if other, ok := nodeMap[nodeID]; ok { + return fmt.Errorf("Member '%s' has conflicting node ID '%s' with member '%s'", + m.Name, nodeID, other) + } + nodeMap[nodeID] = m.Name + } + ok, dc := isConsulNode(*m) if ok { if dc != md.dc { diff --git a/consul/merge_test.go b/consul/merge_test.go new file mode 100644 index 000000000000..034a99bc3c96 --- /dev/null +++ b/consul/merge_test.go @@ -0,0 +1,160 @@ +package consul + +import ( + "strings" + "testing" + + "github.com/hashicorp/consul/types" + "github.com/hashicorp/serf/serf" +) + +func makeNode(dc, name, id string, server bool) *serf.Member { + var role string + if server { + role = "consul" + } else { + role = "node" + } + + return &serf.Member{ + Name: name, + Tags: map[string]string{ + "role": role, + "dc": dc, + "id": id, + "port": "8300", + "build": "0.7.5", + "vsn": "2", + "vsn_max": "3", + "vsn_min": "2", + }, + } +} + +func TestMerge_LAN(t *testing.T) { + cases := []struct { + members []*serf.Member + expect string + }{ + // Client in the wrong datacenter. + { + members: []*serf.Member{ + makeNode("dc2", + "node1", + "96430788-246f-4379-94ce-257f7429e340", + false), + }, + expect: "wrong datacenter", + }, + // Server in the wrong datacenter. + { + members: []*serf.Member{ + makeNode("dc2", + "node1", + "96430788-246f-4379-94ce-257f7429e340", + true), + }, + expect: "wrong datacenter", + }, + // Node ID conflict with delegate's ID. + { + members: []*serf.Member{ + makeNode("dc1", + "node1", + "ee954a2f-80de-4b34-8780-97b942a50a99", + true), + }, + expect: "with this agent's ID", + }, + // Cluster with existing conflicting node IDs. + { + members: []*serf.Member{ + makeNode("dc1", + "node1", + "6185913b-98d7-4441-bd8f-f7f7d854a4af", + true), + makeNode("dc1", + "node2", + "6185913b-98d7-4441-bd8f-f7f7d854a4af", + true), + }, + expect: "with member", + }, + // Good cluster. + { + members: []*serf.Member{ + makeNode("dc1", + "node1", + "6185913b-98d7-4441-bd8f-f7f7d854a4af", + true), + makeNode("dc1", + "node2", + "cda916bc-a357-4a19-b886-59419fcee50c", + true), + }, + expect: "", + }, + } + + delegate := &lanMergeDelegate{ + dc: "dc1", + nodeID: types.NodeID("ee954a2f-80de-4b34-8780-97b942a50a99"), + nodeName: "node0", + } + for i, c := range cases { + if err := delegate.NotifyMerge(c.members); c.expect == "" { + if err != nil { + t.Fatalf("case %d: err: %v", i+1, err) + } + } else { + if err == nil || !strings.Contains(err.Error(), c.expect) { + t.Fatalf("case %d: err: %v", i+1, err) + } + } + } +} + +func TestMerge_WAN(t *testing.T) { + cases := []struct { + members []*serf.Member + expect string + }{ + // Not a server + { + members: []*serf.Member{ + makeNode("dc2", + "node1", + "96430788-246f-4379-94ce-257f7429e340", + false), + }, + expect: "not a server", + }, + // Good cluster. + { + members: []*serf.Member{ + makeNode("dc2", + "node1", + "6185913b-98d7-4441-bd8f-f7f7d854a4af", + true), + makeNode("dc3", + "node2", + "cda916bc-a357-4a19-b886-59419fcee50c", + true), + }, + expect: "", + }, + } + + delegate := &wanMergeDelegate{} + for i, c := range cases { + if err := delegate.NotifyMerge(c.members); c.expect == "" { + if err != nil { + t.Fatalf("case %d: err: %v", i+1, err) + } + } else { + if err == nil || !strings.Contains(err.Error(), c.expect) { + t.Fatalf("case %d: err: %v", i+1, err) + } + } + } +} diff --git a/consul/server.go b/consul/server.go index 5822ab9659f8..2e9fbce0a756 100644 --- a/consul/server.go +++ b/consul/server.go @@ -396,7 +396,11 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w if wan { conf.Merge = &wanMergeDelegate{} } else { - conf.Merge = &lanMergeDelegate{dc: s.config.Datacenter} + conf.Merge = &lanMergeDelegate{ + dc: s.config.Datacenter, + nodeID: s.config.NodeID, + nodeName: s.config.NodeName, + } } // Until Consul supports this fully, we disable automatic resolution. diff --git a/consul/state/catalog.go b/consul/state/catalog.go index ca5eb61103e7..217ad02ea56b 100644 --- a/consul/state/catalog.go +++ b/consul/state/catalog.go @@ -165,22 +165,44 @@ func (s *StateStore) EnsureNode(idx uint64, node *structs.Node) error { // registration or modify an existing one in the state store. It allows // passing in a memdb transaction so it may be part of a larger txn. func (s *StateStore) ensureNodeTxn(tx *memdb.Txn, idx uint64, node *structs.Node) error { - // Check for an existing node - existing, err := tx.First("nodes", "id", node.Node) - if err != nil { - return fmt.Errorf("node name lookup failed: %s", err) + // See if there's an existing node with this UUID, and make sure the + // name is the same. + var n *structs.Node + if node.ID != "" { + existing, err := tx.First("nodes", "uuid", string(node.ID)) + if err != nil { + return fmt.Errorf("node lookup failed: %s", err) + } + if existing != nil { + n = existing.(*structs.Node) + if n.Node != node.Node { + return fmt.Errorf("node ID %q for node %q aliases existing node %q", + node.ID, node.Node, n.Node) + } + } } - // Get the indexes - if existing != nil { - node.CreateIndex = existing.(*structs.Node).CreateIndex + // Check for an existing node by name to support nodes with no IDs. + if n == nil { + existing, err := tx.First("nodes", "id", node.Node) + if err != nil { + return fmt.Errorf("node name lookup failed: %s", err) + } + if existing != nil { + n = existing.(*structs.Node) + } + } + + // Get the indexes. + if n != nil { + node.CreateIndex = n.CreateIndex node.ModifyIndex = idx } else { node.CreateIndex = idx node.ModifyIndex = idx } - // Insert the node and update the index + // Insert the node and update the index. if err := tx.Insert("nodes", node); err != nil { return fmt.Errorf("failed inserting node: %s", err) } diff --git a/consul/state/catalog_test.go b/consul/state/catalog_test.go index 88dcff5a8f0f..21bf59ab6733 100644 --- a/consul/state/catalog_test.go +++ b/consul/state/catalog_test.go @@ -4,6 +4,7 @@ import ( "fmt" "reflect" "sort" + "strings" "testing" "github.com/hashicorp/consul/consul/structs" @@ -419,6 +420,23 @@ func TestStateStore_EnsureNode(t *testing.T) { if idx != 3 { t.Fatalf("bad index: %d", idx) } + + // Add an ID to the node + in.ID = types.NodeID("cda916bc-a357-4a19-b886-59419fcee50c") + if err := s.EnsureNode(4, in); err != nil { + t.Fatalf("err: %v", err) + } + + // Now try to add another node with the same ID + in = &structs.Node{ + Node: "nope", + ID: types.NodeID("cda916bc-a357-4a19-b886-59419fcee50c"), + Address: "1.2.3.4", + } + err = s.EnsureNode(5, in) + if err == nil || !strings.Contains(err.Error(), "aliases existing node") { + t.Fatalf("err: %v", err) + } } func TestStateStore_GetNodes(t *testing.T) { diff --git a/testutil/server.go b/testutil/server.go index e0a554acdcd1..aa20670f7f0c 100644 --- a/testutil/server.go +++ b/testutil/server.go @@ -25,6 +25,7 @@ import ( "strings" "github.com/hashicorp/go-cleanhttp" + "github.com/hashicorp/go-uuid" "github.com/pkg/errors" ) @@ -55,6 +56,7 @@ type TestAddressConfig struct { // TestServerConfig is the main server configuration struct. type TestServerConfig struct { NodeName string `json:"node_name"` + NodeID string `json:"node_id"` NodeMeta map[string]string `json:"node_meta,omitempty"` Performance *TestPerformanceConfig `json:"performance,omitempty"` Bootstrap bool `json:"bootstrap,omitempty"` @@ -83,8 +85,14 @@ type ServerConfigCallback func(c *TestServerConfig) // defaultServerConfig returns a new TestServerConfig struct // with all of the listen ports incremented by one. func defaultServerConfig() *TestServerConfig { + nodeID, err := uuid.GenerateUUID() + if err != nil { + panic(err) + } + return &TestServerConfig{ NodeName: fmt.Sprintf("node%d", randomPort()), + NodeID: nodeID, DisableCheckpoint: true, Performance: &TestPerformanceConfig{ RaftMultiplier: 1,