Skip to content

Commit

Permalink
Merge pull request #2832 from hashicorp/node-id-integrity
Browse files Browse the repository at this point in the history
Adds node ID integrity checking to the catalog and the LAN and WAN clusters.
  • Loading branch information
slackpad authored Mar 27, 2017
2 parents 523c0c5 + 59a599a commit 7360928
Show file tree
Hide file tree
Showing 12 changed files with 271 additions and 94 deletions.
7 changes: 7 additions & 0 deletions command/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,18 @@ func nextConfig() *Config {
idx := int(atomic.AddUint64(&offset, numPortsPerIndex))
conf := DefaultConfig()

nodeID, err := uuid.GenerateUUID()
if err != nil {
panic(err)
}

conf.Version = version.Version
conf.VersionPrerelease = "c.d"
conf.AdvertiseAddr = "127.0.0.1"
conf.Bootstrap = true
conf.Datacenter = "dc1"
conf.NodeName = fmt.Sprintf("Node %d", idx)
conf.NodeID = types.NodeID(nodeID)
conf.BindAddr = "127.0.0.1"
conf.Ports.DNS = basePortNumber + idx + portOffsetDNS
conf.Ports.HTTP = basePortNumber + idx + portOffsetHTTP
Expand Down Expand Up @@ -314,6 +320,7 @@ func TestAgent_ReconnectConfigSettings(t *testing.T) {

func TestAgent_NodeID(t *testing.T) {
c := nextConfig()
c.NodeID = ""
dir, agent := makeAgent(t, c)
defer os.RemoveAll(dir)
defer agent.Shutdown()
Expand Down
8 changes: 8 additions & 0 deletions command/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ import (
"github.com/hashicorp/consul/command/agent"
"github.com/hashicorp/consul/consul"
"github.com/hashicorp/consul/logger"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/consul/version"
"github.com/hashicorp/go-uuid"
"github.com/mitchellh/cli"
)

Expand Down Expand Up @@ -112,9 +114,15 @@ func nextConfig() *agent.Config {
idx := int(atomic.AddUint64(&offset, 1))
conf := agent.DefaultConfig()

nodeID, err := uuid.GenerateUUID()
if err != nil {
panic(err)
}

conf.Bootstrap = true
conf.Datacenter = "dc1"
conf.NodeName = fmt.Sprintf("Node %d", idx)
conf.NodeID = types.NodeID(nodeID)
conf.BindAddr = "127.0.0.1"
conf.Server = true

Expand Down
6 changes: 4 additions & 2 deletions consul/catalog_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,13 @@ func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error
}
}

_, err = c.srv.raftApply(structs.RegisterRequestType, args)
resp, err := c.srv.raftApply(structs.RegisterRequestType, args)
if err != nil {
return err
}

if respErr, ok := resp.(error); ok {
return respErr
}
return nil
}

Expand Down
2 changes: 2 additions & 0 deletions consul/catalog_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func TestCatalog_Register(t *testing.T) {
Port: 8000,
},
Check: &structs.HealthCheck{
CheckID: types.CheckID("db-check"),
ServiceID: "db",
},
}
Expand Down Expand Up @@ -61,6 +62,7 @@ func TestCatalog_Register_NodeID(t *testing.T) {
Port: 8000,
},
Check: &structs.HealthCheck{
CheckID: types.CheckID("db-check"),
ServiceID: "db",
},
}
Expand Down
6 changes: 5 additions & 1 deletion consul/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,11 @@ func (c *Client) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (
conf.SnapshotPath = filepath.Join(c.config.DataDir, path)
conf.ProtocolVersion = protocolVersionMap[c.config.ProtocolVersion]
conf.RejoinAfterLeave = c.config.RejoinAfterLeave
conf.Merge = &lanMergeDelegate{dc: c.config.Datacenter}
conf.Merge = &lanMergeDelegate{
dc: c.config.Datacenter,
nodeID: c.config.NodeID,
nodeName: c.config.NodeName,
}
if err := lib.EnsurePath(conf.SnapshotPath, false); err != nil {
return nil, err
}
Expand Down
81 changes: 0 additions & 81 deletions consul/leader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -848,87 +848,6 @@ func TestLeader_RollRaftServer(t *testing.T) {
}
}

func TestLeader_ChangeServerAddress(t *testing.T) {
conf := func(c *Config) {
c.Bootstrap = false
c.BootstrapExpect = 3
c.Datacenter = "dc1"
c.RaftConfig.ProtocolVersion = 3
}
dir1, s1 := testServerWithConfig(t, conf)
defer os.RemoveAll(dir1)
defer s1.Shutdown()

dir2, s2 := testServerWithConfig(t, conf)
defer os.RemoveAll(dir2)
defer s2.Shutdown()

dir3, s3 := testServerWithConfig(t, conf)
defer os.RemoveAll(dir3)
defer s3.Shutdown()

servers := []*Server{s1, s2, s3}

// Try to join
addr := fmt.Sprintf("127.0.0.1:%d",
s1.config.SerfLANConfig.MemberlistConfig.BindPort)
if _, err := s2.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
if _, err := s3.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}

for _, s := range servers {
if err := testutil.WaitForResult(func() (bool, error) {
peers, _ := s.numPeers()
return peers == 3, nil
}); err != nil {
t.Fatal("should have 3 peers")
}
}

// Shut down a server, freeing up its address/port
s3.Shutdown()

if err := testutil.WaitForResult(func() (bool, error) {
alive := 0
for _, m := range s1.LANMembers() {
if m.Status == serf.StatusAlive {
alive++
}
}
return alive == 2, nil
}); err != nil {
t.Fatal("should have 2 alive members")
}

// Bring up a new server with s3's address that will get a different ID
dir4, s4 := testServerWithConfig(t, func(c *Config) {
c.Bootstrap = false
c.BootstrapExpect = 3
c.Datacenter = "dc1"
c.RaftConfig.ProtocolVersion = 3
c.NodeID = s3.config.NodeID
})
defer os.RemoveAll(dir4)
defer s4.Shutdown()
if _, err := s4.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
servers[2] = s4

// Make sure the dead server is removed and we're back to 3 total peers
for _, s := range servers {
if err := testutil.WaitForResult(func() (bool, error) {
peers, _ := s.numPeers()
return peers == 3, nil
}); err != nil {
t.Fatal("should have 3 members")
}
}
}

func TestLeader_ChangeServerID(t *testing.T) {
conf := func(c *Config) {
c.Bootstrap = false
Expand Down
25 changes: 24 additions & 1 deletion consul/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,41 @@ import (
"fmt"

"github.com/hashicorp/consul/consul/agent"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/serf/serf"
)

// lanMergeDelegate is used to handle a cluster merge on the LAN gossip
// ring. We check that the peers are in the same datacenter and abort the
// merge if there is a mis-match.
type lanMergeDelegate struct {
dc string
dc string
nodeID types.NodeID
nodeName string
}

func (md *lanMergeDelegate) NotifyMerge(members []*serf.Member) error {
nodeMap := make(map[types.NodeID]string)
for _, m := range members {
if rawID, ok := m.Tags["id"]; ok && rawID != "" {
nodeID := types.NodeID(rawID)

// See if there's another node that conflicts with us.
if (nodeID == md.nodeID) && (m.Name != md.nodeName) {
return fmt.Errorf("Member '%s' has conflicting node ID '%s' with this agent's ID",
m.Name, nodeID)
}

// See if there are any two nodes that conflict with each
// other. This lets us only do joins into a hygienic
// cluster now that node IDs are critical for operation.
if other, ok := nodeMap[nodeID]; ok {
return fmt.Errorf("Member '%s' has conflicting node ID '%s' with member '%s'",
m.Name, nodeID, other)
}
nodeMap[nodeID] = m.Name
}

ok, dc := isConsulNode(*m)
if ok {
if dc != md.dc {
Expand Down
160 changes: 160 additions & 0 deletions consul/merge_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package consul

import (
"strings"
"testing"

"github.com/hashicorp/consul/types"
"github.com/hashicorp/serf/serf"
)

func makeNode(dc, name, id string, server bool) *serf.Member {
var role string
if server {
role = "consul"
} else {
role = "node"
}

return &serf.Member{
Name: name,
Tags: map[string]string{
"role": role,
"dc": dc,
"id": id,
"port": "8300",
"build": "0.7.5",
"vsn": "2",
"vsn_max": "3",
"vsn_min": "2",
},
}
}

func TestMerge_LAN(t *testing.T) {
cases := []struct {
members []*serf.Member
expect string
}{
// Client in the wrong datacenter.
{
members: []*serf.Member{
makeNode("dc2",
"node1",
"96430788-246f-4379-94ce-257f7429e340",
false),
},
expect: "wrong datacenter",
},
// Server in the wrong datacenter.
{
members: []*serf.Member{
makeNode("dc2",
"node1",
"96430788-246f-4379-94ce-257f7429e340",
true),
},
expect: "wrong datacenter",
},
// Node ID conflict with delegate's ID.
{
members: []*serf.Member{
makeNode("dc1",
"node1",
"ee954a2f-80de-4b34-8780-97b942a50a99",
true),
},
expect: "with this agent's ID",
},
// Cluster with existing conflicting node IDs.
{
members: []*serf.Member{
makeNode("dc1",
"node1",
"6185913b-98d7-4441-bd8f-f7f7d854a4af",
true),
makeNode("dc1",
"node2",
"6185913b-98d7-4441-bd8f-f7f7d854a4af",
true),
},
expect: "with member",
},
// Good cluster.
{
members: []*serf.Member{
makeNode("dc1",
"node1",
"6185913b-98d7-4441-bd8f-f7f7d854a4af",
true),
makeNode("dc1",
"node2",
"cda916bc-a357-4a19-b886-59419fcee50c",
true),
},
expect: "",
},
}

delegate := &lanMergeDelegate{
dc: "dc1",
nodeID: types.NodeID("ee954a2f-80de-4b34-8780-97b942a50a99"),
nodeName: "node0",
}
for i, c := range cases {
if err := delegate.NotifyMerge(c.members); c.expect == "" {
if err != nil {
t.Fatalf("case %d: err: %v", i+1, err)
}
} else {
if err == nil || !strings.Contains(err.Error(), c.expect) {
t.Fatalf("case %d: err: %v", i+1, err)
}
}
}
}

func TestMerge_WAN(t *testing.T) {
cases := []struct {
members []*serf.Member
expect string
}{
// Not a server
{
members: []*serf.Member{
makeNode("dc2",
"node1",
"96430788-246f-4379-94ce-257f7429e340",
false),
},
expect: "not a server",
},
// Good cluster.
{
members: []*serf.Member{
makeNode("dc2",
"node1",
"6185913b-98d7-4441-bd8f-f7f7d854a4af",
true),
makeNode("dc3",
"node2",
"cda916bc-a357-4a19-b886-59419fcee50c",
true),
},
expect: "",
},
}

delegate := &wanMergeDelegate{}
for i, c := range cases {
if err := delegate.NotifyMerge(c.members); c.expect == "" {
if err != nil {
t.Fatalf("case %d: err: %v", i+1, err)
}
} else {
if err == nil || !strings.Contains(err.Error(), c.expect) {
t.Fatalf("case %d: err: %v", i+1, err)
}
}
}
}
Loading

0 comments on commit 7360928

Please sign in to comment.