Skip to content

Commit

Permalink
consul: Adding merge delegate to prevent mixing clusters
Browse files Browse the repository at this point in the history
  • Loading branch information
armon committed Jan 6, 2015
1 parent 3ec5e6f commit 2dcadff
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 2 deletions.
6 changes: 4 additions & 2 deletions consul/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package consul
import (
"crypto/tls"
"fmt"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/serf/serf"
"log"
"math/rand"
"os"
Expand All @@ -13,6 +11,9 @@ import (
"strings"
"sync"
"time"

"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/serf/serf"
)

const (
Expand Down Expand Up @@ -138,6 +139,7 @@ func (c *Client) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (
conf.SnapshotPath = filepath.Join(c.config.DataDir, path)
conf.ProtocolVersion = protocolVersionMap[c.config.ProtocolVersion]
conf.RejoinAfterLeave = c.config.RejoinAfterLeave
conf.Merge = &lanMergeDelegate{logger: c.logger, dc: c.config.Datacenter}
if err := ensurePath(conf.SnapshotPath, false); err != nil {
return nil, err
}
Expand Down
50 changes: 50 additions & 0 deletions consul/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,56 @@ func TestClient_JoinLAN(t *testing.T) {
})
}

func TestClient_JoinLAN_Invalid(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()

dir2, c1 := testClientDC(t, "other")
defer os.RemoveAll(dir2)
defer c1.Shutdown()

// Try to join
addr := fmt.Sprintf("127.0.0.1:%d",
s1.config.SerfLANConfig.MemberlistConfig.BindPort)
if _, err := c1.JoinLAN([]string{addr}); err == nil {
t.Fatalf("should error")
}

time.Sleep(50 * time.Millisecond)
if len(s1.LANMembers()) != 1 {
t.Fatalf("should not join")
}
if len(c1.LANMembers()) != 1 {
t.Fatalf("should not join")
}
}

func TestClient_JoinWAN_Invalid(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()

dir2, c1 := testClientDC(t, "dc2")
defer os.RemoveAll(dir2)
defer c1.Shutdown()

// Try to join
addr := fmt.Sprintf("127.0.0.1:%d",
s1.config.SerfWANConfig.MemberlistConfig.BindPort)
if _, err := c1.JoinLAN([]string{addr}); err == nil {
t.Fatalf("should error")
}

time.Sleep(50 * time.Millisecond)
if len(s1.WANMembers()) != 1 {
t.Fatalf("should not join")
}
if len(c1.LANMembers()) != 1 {
t.Fatalf("should not join")
}
}

func TestClient_RPC(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
Expand Down
56 changes: 56 additions & 0 deletions consul/merge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package consul

import (
"log"

"github.com/hashicorp/serf/serf"
)

// lanMergeDelegate is used to handle a cluster merge on the LAN gossip
// ring. We check that the peers are in the same datacenter and abort the
// merge if there is a mis-match.
type lanMergeDelegate struct {
logger *log.Logger
dc string
}

func (md *lanMergeDelegate) NotifyMerge(members []*serf.Member) (cancel bool) {
for _, m := range members {
ok, dc := isConsulNode(*m)
if ok {
if dc != md.dc {
md.logger.Printf("[WARN] consul: Canceling cluster merge, member '%s' part of wrong datacenter '%s'",
m.Name, dc)
return true
}
continue
}

ok, parts := isConsulServer(*m)
if ok && parts.Datacenter != md.dc {
md.logger.Printf("[WARN] consul: Canceling cluster merge, member '%s' part of wrong datacenter '%s'",
m.Name, parts.Datacenter)
return true
}
}
return false
}

// wanMergeDelegate is used to handle a cluster merge on the WAN gossip
// ring. We check that the peers are server nodes and abort the merge
// otherwise.
type wanMergeDelegate struct {
logger *log.Logger
}

func (md *wanMergeDelegate) NotifyMerge(members []*serf.Member) (cancel bool) {
for _, m := range members {
ok, _ := isConsulServer(*m)
if !ok {
md.logger.Printf("[WARN] consul: Canceling cluster merge, member '%s' is not a server",
m.Name)
return true
}
}
return false
}
5 changes: 5 additions & 0 deletions consul/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,11 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w
conf.SnapshotPath = filepath.Join(s.config.DataDir, path)
conf.ProtocolVersion = protocolVersionMap[s.config.ProtocolVersion]
conf.RejoinAfterLeave = s.config.RejoinAfterLeave
if wan {
conf.Merge = &wanMergeDelegate{logger: s.logger}
} else {
conf.Merge = &lanMergeDelegate{logger: s.logger, dc: s.config.Datacenter}
}

// Until Consul supports this fully, we disable automatic resolution.
// When enabled, the Serf gossip may just turn off if we are the minority
Expand Down

0 comments on commit 2dcadff

Please sign in to comment.