From 80b2a633fdf7800e99684beaf9b46237d2413c91 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Mon, 27 Mar 2017 00:15:21 -0700 Subject: [PATCH 1/3] Adds node ID integrity checking for the catalog. --- consul/catalog_endpoint.go | 6 ++++-- consul/state/catalog.go | 39 ++++++++++++++++++++++++++++-------- consul/state/catalog_test.go | 18 +++++++++++++++++ 3 files changed, 53 insertions(+), 10 deletions(-) diff --git a/consul/catalog_endpoint.go b/consul/catalog_endpoint.go index 4abe025844fb..a5b67486951a 100644 --- a/consul/catalog_endpoint.go +++ b/consul/catalog_endpoint.go @@ -90,11 +90,13 @@ func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error } } - _, err = c.srv.raftApply(structs.RegisterRequestType, args) + resp, err := c.srv.raftApply(structs.RegisterRequestType, args) if err != nil { return err } - + if respErr, ok := resp.(error); ok { + return respErr + } return nil } diff --git a/consul/state/catalog.go b/consul/state/catalog.go index ca5eb61103e7..bbfddb23b372 100644 --- a/consul/state/catalog.go +++ b/consul/state/catalog.go @@ -165,22 +165,45 @@ func (s *StateStore) EnsureNode(idx uint64, node *structs.Node) error { // registration or modify an existing one in the state store. It allows // passing in a memdb transaction so it may be part of a larger txn. func (s *StateStore) ensureNodeTxn(tx *memdb.Txn, idx uint64, node *structs.Node) error { - // Check for an existing node - existing, err := tx.First("nodes", "id", node.Node) - if err != nil { - return fmt.Errorf("node name lookup failed: %s", err) + // See if there's an existing node with this UUID, and make sure the + // name is the same. 
+ var n *structs.Node + if node.ID != "" { + existing, err := tx.First("nodes", "uuid", string(node.ID)) + if err != nil { + return fmt.Errorf("node lookup failed: %s", err) + } + if existing != nil { + n = existing.(*structs.Node) + fmt.Printf("XXX %#v\n", *n) + if n.Node != node.Node { + return fmt.Errorf("node ID %q for node %q aliases existing node %q", + node.ID, node.Node, n.Node) + } + } } - // Get the indexes - if existing != nil { - node.CreateIndex = existing.(*structs.Node).CreateIndex + // Check for an existing node by name to support nodes with no IDs. + if n == nil { + existing, err := tx.First("nodes", "id", node.Node) + if err != nil { + return fmt.Errorf("node name lookup failed: %s", err) + } + if existing != nil { + n = existing.(*structs.Node) + } + } + + // Get the indexes. + if n != nil { + node.CreateIndex = n.CreateIndex node.ModifyIndex = idx } else { node.CreateIndex = idx node.ModifyIndex = idx } - // Insert the node and update the index + // Insert the node and update the index. 
if err := tx.Insert("nodes", node); err != nil { return fmt.Errorf("failed inserting node: %s", err) } diff --git a/consul/state/catalog_test.go b/consul/state/catalog_test.go index 88dcff5a8f0f..21bf59ab6733 100644 --- a/consul/state/catalog_test.go +++ b/consul/state/catalog_test.go @@ -4,6 +4,7 @@ import ( "fmt" "reflect" "sort" + "strings" "testing" "github.com/hashicorp/consul/consul/structs" @@ -419,6 +420,23 @@ func TestStateStore_EnsureNode(t *testing.T) { if idx != 3 { t.Fatalf("bad index: %d", idx) } + + // Add an ID to the node + in.ID = types.NodeID("cda916bc-a357-4a19-b886-59419fcee50c") + if err := s.EnsureNode(4, in); err != nil { + t.Fatalf("err: %v", err) + } + + // Now try to add another node with the same ID + in = &structs.Node{ + Node: "nope", + ID: types.NodeID("cda916bc-a357-4a19-b886-59419fcee50c"), + Address: "1.2.3.4", + } + err = s.EnsureNode(5, in) + if err == nil || !strings.Contains(err.Error(), "aliases existing node") { + t.Fatalf("err: %v", err) + } } func TestStateStore_GetNodes(t *testing.T) { From 7f58576042ce0cfb3a68c9e3bdcfb65de64b2b24 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Mon, 27 Mar 2017 00:15:42 -0700 Subject: [PATCH 2/3] Adds node ID integrity checking for cluster merges. 
--- consul/client.go | 6 +- consul/merge.go | 25 ++++++- consul/merge_test.go | 160 +++++++++++++++++++++++++++++++++++++++++++ consul/server.go | 6 +- 4 files changed, 194 insertions(+), 3 deletions(-) create mode 100644 consul/merge_test.go diff --git a/consul/client.go b/consul/client.go index 959fde26b699..78c671289cf4 100644 --- a/consul/client.go +++ b/consul/client.go @@ -156,7 +156,11 @@ func (c *Client) setupSerf(conf *serf.Config, ch chan serf.Event, path string) ( conf.SnapshotPath = filepath.Join(c.config.DataDir, path) conf.ProtocolVersion = protocolVersionMap[c.config.ProtocolVersion] conf.RejoinAfterLeave = c.config.RejoinAfterLeave - conf.Merge = &lanMergeDelegate{dc: c.config.Datacenter} + conf.Merge = &lanMergeDelegate{ + dc: c.config.Datacenter, + nodeID: c.config.NodeID, + nodeName: c.config.NodeName, + } if err := lib.EnsurePath(conf.SnapshotPath, false); err != nil { return nil, err } diff --git a/consul/merge.go b/consul/merge.go index defa7ef10856..335d0280a40b 100644 --- a/consul/merge.go +++ b/consul/merge.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/hashicorp/consul/consul/agent" + "github.com/hashicorp/consul/types" "github.com/hashicorp/serf/serf" ) @@ -11,11 +12,33 @@ import ( // ring. We check that the peers are in the same datacenter and abort the // merge if there is a mis-match. type lanMergeDelegate struct { - dc string + dc string + nodeID types.NodeID + nodeName string } func (md *lanMergeDelegate) NotifyMerge(members []*serf.Member) error { + nodeMap := make(map[types.NodeID]string) for _, m := range members { + if rawID, ok := m.Tags["id"]; ok && rawID != "" { + nodeID := types.NodeID(rawID) + + // See if there's another node that conflicts with us. + if (nodeID == md.nodeID) && (m.Name != md.nodeName) { + return fmt.Errorf("Member '%s' has conflicting node ID '%s' with this agent's ID", + m.Name, nodeID) + } + + // See if there are any two nodes that conflict with each + // other. 
This lets us only do joins into a hygienic + // cluster now that node IDs are critical for operation. + if other, ok := nodeMap[nodeID]; ok { + return fmt.Errorf("Member '%s' has conflicting node ID '%s' with member '%s'", + m.Name, nodeID, other) + } + nodeMap[nodeID] = m.Name + } + ok, dc := isConsulNode(*m) if ok { if dc != md.dc { diff --git a/consul/merge_test.go b/consul/merge_test.go new file mode 100644 index 000000000000..034a99bc3c96 --- /dev/null +++ b/consul/merge_test.go @@ -0,0 +1,160 @@ +package consul + +import ( + "strings" + "testing" + + "github.com/hashicorp/consul/types" + "github.com/hashicorp/serf/serf" +) + +func makeNode(dc, name, id string, server bool) *serf.Member { + var role string + if server { + role = "consul" + } else { + role = "node" + } + + return &serf.Member{ + Name: name, + Tags: map[string]string{ + "role": role, + "dc": dc, + "id": id, + "port": "8300", + "build": "0.7.5", + "vsn": "2", + "vsn_max": "3", + "vsn_min": "2", + }, + } +} + +func TestMerge_LAN(t *testing.T) { + cases := []struct { + members []*serf.Member + expect string + }{ + // Client in the wrong datacenter. + { + members: []*serf.Member{ + makeNode("dc2", + "node1", + "96430788-246f-4379-94ce-257f7429e340", + false), + }, + expect: "wrong datacenter", + }, + // Server in the wrong datacenter. + { + members: []*serf.Member{ + makeNode("dc2", + "node1", + "96430788-246f-4379-94ce-257f7429e340", + true), + }, + expect: "wrong datacenter", + }, + // Node ID conflict with delegate's ID. + { + members: []*serf.Member{ + makeNode("dc1", + "node1", + "ee954a2f-80de-4b34-8780-97b942a50a99", + true), + }, + expect: "with this agent's ID", + }, + // Cluster with existing conflicting node IDs. + { + members: []*serf.Member{ + makeNode("dc1", + "node1", + "6185913b-98d7-4441-bd8f-f7f7d854a4af", + true), + makeNode("dc1", + "node2", + "6185913b-98d7-4441-bd8f-f7f7d854a4af", + true), + }, + expect: "with member", + }, + // Good cluster. 
+ { + members: []*serf.Member{ + makeNode("dc1", + "node1", + "6185913b-98d7-4441-bd8f-f7f7d854a4af", + true), + makeNode("dc1", + "node2", + "cda916bc-a357-4a19-b886-59419fcee50c", + true), + }, + expect: "", + }, + } + + delegate := &lanMergeDelegate{ + dc: "dc1", + nodeID: types.NodeID("ee954a2f-80de-4b34-8780-97b942a50a99"), + nodeName: "node0", + } + for i, c := range cases { + if err := delegate.NotifyMerge(c.members); c.expect == "" { + if err != nil { + t.Fatalf("case %d: err: %v", i+1, err) + } + } else { + if err == nil || !strings.Contains(err.Error(), c.expect) { + t.Fatalf("case %d: err: %v", i+1, err) + } + } + } +} + +func TestMerge_WAN(t *testing.T) { + cases := []struct { + members []*serf.Member + expect string + }{ + // Not a server + { + members: []*serf.Member{ + makeNode("dc2", + "node1", + "96430788-246f-4379-94ce-257f7429e340", + false), + }, + expect: "not a server", + }, + // Good cluster. + { + members: []*serf.Member{ + makeNode("dc2", + "node1", + "6185913b-98d7-4441-bd8f-f7f7d854a4af", + true), + makeNode("dc3", + "node2", + "cda916bc-a357-4a19-b886-59419fcee50c", + true), + }, + expect: "", + }, + } + + delegate := &wanMergeDelegate{} + for i, c := range cases { + if err := delegate.NotifyMerge(c.members); c.expect == "" { + if err != nil { + t.Fatalf("case %d: err: %v", i+1, err) + } + } else { + if err == nil || !strings.Contains(err.Error(), c.expect) { + t.Fatalf("case %d: err: %v", i+1, err) + } + } + } +} diff --git a/consul/server.go b/consul/server.go index 5822ab9659f8..2e9fbce0a756 100644 --- a/consul/server.go +++ b/consul/server.go @@ -396,7 +396,11 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w if wan { conf.Merge = &wanMergeDelegate{} } else { - conf.Merge = &lanMergeDelegate{dc: s.config.Datacenter} + conf.Merge = &lanMergeDelegate{ + dc: s.config.Datacenter, + nodeID: s.config.NodeID, + nodeName: s.config.NodeName, + } } // Until Consul supports this fully, we disable automatic 
resolution. From 59a599a14b894cb2bd08d9bd5802d591fbc791c3 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Mon, 27 Mar 2017 01:28:54 -0700 Subject: [PATCH 3/3] Cleans up a stray mark and fixes unit tests. Ended up removing the leader_test.go server address change test as part of this. The join was failing because we were using a new node name with the new logic here, but realized this was hitting some of the memberlist conflict logic and not working as we expected. We need some additional work to fully support address changes, so removed the test for now. --- command/agent/agent_test.go | 7 +++ command/util_test.go | 8 ++++ consul/catalog_endpoint_test.go | 2 + consul/leader_test.go | 81 --------------------------------- consul/state/catalog.go | 1 - testutil/server.go | 8 ++++ 6 files changed, 25 insertions(+), 82 deletions(-) diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go index 26d8888916e3..b23327c66582 100644 --- a/command/agent/agent_test.go +++ b/command/agent/agent_test.go @@ -48,12 +48,18 @@ func nextConfig() *Config { idx := int(atomic.AddUint64(&offset, numPortsPerIndex)) conf := DefaultConfig() + nodeID, err := uuid.GenerateUUID() + if err != nil { + panic(err) + } + conf.Version = version.Version conf.VersionPrerelease = "c.d" conf.AdvertiseAddr = "127.0.0.1" conf.Bootstrap = true conf.Datacenter = "dc1" conf.NodeName = fmt.Sprintf("Node %d", idx) + conf.NodeID = types.NodeID(nodeID) conf.BindAddr = "127.0.0.1" conf.Ports.DNS = basePortNumber + idx + portOffsetDNS conf.Ports.HTTP = basePortNumber + idx + portOffsetHTTP @@ -314,6 +320,7 @@ func TestAgent_ReconnectConfigSettings(t *testing.T) { func TestAgent_NodeID(t *testing.T) { c := nextConfig() + c.NodeID = "" dir, agent := makeAgent(t, c) defer os.RemoveAll(dir) defer agent.Shutdown() diff --git a/command/util_test.go b/command/util_test.go index 577b70d03566..5c4b60c068fc 100644 --- a/command/util_test.go +++ b/command/util_test.go @@ -15,7 +15,9 @@ import ( 
"github.com/hashicorp/consul/command/agent" "github.com/hashicorp/consul/consul" "github.com/hashicorp/consul/logger" + "github.com/hashicorp/consul/types" "github.com/hashicorp/consul/version" + "github.com/hashicorp/go-uuid" "github.com/mitchellh/cli" ) @@ -112,9 +114,15 @@ func nextConfig() *agent.Config { idx := int(atomic.AddUint64(&offset, 1)) conf := agent.DefaultConfig() + nodeID, err := uuid.GenerateUUID() + if err != nil { + panic(err) + } + conf.Bootstrap = true conf.Datacenter = "dc1" conf.NodeName = fmt.Sprintf("Node %d", idx) + conf.NodeID = types.NodeID(nodeID) conf.BindAddr = "127.0.0.1" conf.Server = true diff --git a/consul/catalog_endpoint_test.go b/consul/catalog_endpoint_test.go index 6a0d36654d9d..306508ee9cf6 100644 --- a/consul/catalog_endpoint_test.go +++ b/consul/catalog_endpoint_test.go @@ -32,6 +32,7 @@ func TestCatalog_Register(t *testing.T) { Port: 8000, }, Check: &structs.HealthCheck{ + CheckID: types.CheckID("db-check"), ServiceID: "db", }, } @@ -61,6 +62,7 @@ func TestCatalog_Register_NodeID(t *testing.T) { Port: 8000, }, Check: &structs.HealthCheck{ + CheckID: types.CheckID("db-check"), ServiceID: "db", }, } diff --git a/consul/leader_test.go b/consul/leader_test.go index 427e07d709b2..931f99cd9e2f 100644 --- a/consul/leader_test.go +++ b/consul/leader_test.go @@ -848,87 +848,6 @@ func TestLeader_RollRaftServer(t *testing.T) { } } -func TestLeader_ChangeServerAddress(t *testing.T) { - conf := func(c *Config) { - c.Bootstrap = false - c.BootstrapExpect = 3 - c.Datacenter = "dc1" - c.RaftConfig.ProtocolVersion = 3 - } - dir1, s1 := testServerWithConfig(t, conf) - defer os.RemoveAll(dir1) - defer s1.Shutdown() - - dir2, s2 := testServerWithConfig(t, conf) - defer os.RemoveAll(dir2) - defer s2.Shutdown() - - dir3, s3 := testServerWithConfig(t, conf) - defer os.RemoveAll(dir3) - defer s3.Shutdown() - - servers := []*Server{s1, s2, s3} - - // Try to join - addr := fmt.Sprintf("127.0.0.1:%d", - 
s1.config.SerfLANConfig.MemberlistConfig.BindPort) - if _, err := s2.JoinLAN([]string{addr}); err != nil { - t.Fatalf("err: %v", err) - } - if _, err := s3.JoinLAN([]string{addr}); err != nil { - t.Fatalf("err: %v", err) - } - - for _, s := range servers { - if err := testutil.WaitForResult(func() (bool, error) { - peers, _ := s.numPeers() - return peers == 3, nil - }); err != nil { - t.Fatal("should have 3 peers") - } - } - - // Shut down a server, freeing up its address/port - s3.Shutdown() - - if err := testutil.WaitForResult(func() (bool, error) { - alive := 0 - for _, m := range s1.LANMembers() { - if m.Status == serf.StatusAlive { - alive++ - } - } - return alive == 2, nil - }); err != nil { - t.Fatal("should have 2 alive members") - } - - // Bring up a new server with s3's address that will get a different ID - dir4, s4 := testServerWithConfig(t, func(c *Config) { - c.Bootstrap = false - c.BootstrapExpect = 3 - c.Datacenter = "dc1" - c.RaftConfig.ProtocolVersion = 3 - c.NodeID = s3.config.NodeID - }) - defer os.RemoveAll(dir4) - defer s4.Shutdown() - if _, err := s4.JoinLAN([]string{addr}); err != nil { - t.Fatalf("err: %v", err) - } - servers[2] = s4 - - // Make sure the dead server is removed and we're back to 3 total peers - for _, s := range servers { - if err := testutil.WaitForResult(func() (bool, error) { - peers, _ := s.numPeers() - return peers == 3, nil - }); err != nil { - t.Fatal("should have 3 members") - } - } -} - func TestLeader_ChangeServerID(t *testing.T) { conf := func(c *Config) { c.Bootstrap = false diff --git a/consul/state/catalog.go b/consul/state/catalog.go index bbfddb23b372..217ad02ea56b 100644 --- a/consul/state/catalog.go +++ b/consul/state/catalog.go @@ -175,7 +175,6 @@ func (s *StateStore) ensureNodeTxn(tx *memdb.Txn, idx uint64, node *structs.Node } if existing != nil { n = existing.(*structs.Node) - fmt.Printf("XXX %#v\n", *n) if n.Node != node.Node { return fmt.Errorf("node ID %q for node %q aliases existing node %q", 
node.ID, node.Node, n.Node) diff --git a/testutil/server.go b/testutil/server.go index e0a554acdcd1..aa20670f7f0c 100644 --- a/testutil/server.go +++ b/testutil/server.go @@ -25,6 +25,7 @@ import ( "strings" "github.com/hashicorp/go-cleanhttp" + "github.com/hashicorp/go-uuid" "github.com/pkg/errors" ) @@ -55,6 +56,7 @@ type TestAddressConfig struct { // TestServerConfig is the main server configuration struct. type TestServerConfig struct { NodeName string `json:"node_name"` + NodeID string `json:"node_id"` NodeMeta map[string]string `json:"node_meta,omitempty"` Performance *TestPerformanceConfig `json:"performance,omitempty"` Bootstrap bool `json:"bootstrap,omitempty"` @@ -83,8 +85,14 @@ type ServerConfigCallback func(c *TestServerConfig) // defaultServerConfig returns a new TestServerConfig struct // with all of the listen ports incremented by one. func defaultServerConfig() *TestServerConfig { + nodeID, err := uuid.GenerateUUID() + if err != nil { + panic(err) + } + return &TestServerConfig{ NodeName: fmt.Sprintf("node%d", randomPort()), + NodeID: nodeID, DisableCheckpoint: true, Performance: &TestPerformanceConfig{ RaftMultiplier: 1,