Skip to content

Commit

Permalink
Merge pull request #2222 from hashicorp/f-raft-v2
Browse files Browse the repository at this point in the history
Integrates Consul with "stage one" of HashiCorp Raft library v2.
  • Loading branch information
slackpad authored Aug 9, 2016
2 parents fbcb932 + 5586ca3 commit 66dcefc
Show file tree
Hide file tree
Showing 40 changed files with 3,348 additions and 1,899 deletions.
24 changes: 12 additions & 12 deletions consul/acl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,8 @@ func TestACL_NonAuthority_NotFound(t *testing.T) {
}

testutil.WaitForResult(func() (bool, error) {
p1, _ := s1.raftPeers.Peers()
return len(p1) == 2, errors.New(fmt.Sprintf("%v", p1))
p1, _ := s1.numPeers()
return p1 == 2, errors.New(fmt.Sprintf("%d", p1))
}, func(err error) {
t.Fatalf("should have 2 peers: %v", err)
})
Expand Down Expand Up @@ -275,8 +275,8 @@ func TestACL_NonAuthority_Found(t *testing.T) {
}

testutil.WaitForResult(func() (bool, error) {
p1, _ := s1.raftPeers.Peers()
return len(p1) == 2, errors.New(fmt.Sprintf("%v", p1))
p1, _ := s1.numPeers()
return p1 == 2, errors.New(fmt.Sprintf("%d", p1))
}, func(err error) {
t.Fatalf("should have 2 peers: %v", err)
})
Expand Down Expand Up @@ -351,8 +351,8 @@ func TestACL_NonAuthority_Management(t *testing.T) {
}

testutil.WaitForResult(func() (bool, error) {
p1, _ := s1.raftPeers.Peers()
return len(p1) == 2, errors.New(fmt.Sprintf("%v", p1))
p1, _ := s1.numPeers()
return p1 == 2, errors.New(fmt.Sprintf("%d", p1))
}, func(err error) {
t.Fatalf("should have 2 peers: %v", err)
})
Expand Down Expand Up @@ -408,8 +408,8 @@ func TestACL_DownPolicy_Deny(t *testing.T) {
}

testutil.WaitForResult(func() (bool, error) {
p1, _ := s1.raftPeers.Peers()
return len(p1) == 2, errors.New(fmt.Sprintf("%v", p1))
p1, _ := s1.numPeers()
return p1 == 2, errors.New(fmt.Sprintf("%d", p1))
}, func(err error) {
t.Fatalf("should have 2 peers: %v", err)
})
Expand Down Expand Up @@ -482,8 +482,8 @@ func TestACL_DownPolicy_Allow(t *testing.T) {
}

testutil.WaitForResult(func() (bool, error) {
p1, _ := s1.raftPeers.Peers()
return len(p1) == 2, errors.New(fmt.Sprintf("%v", p1))
p1, _ := s1.numPeers()
return p1 == 2, errors.New(fmt.Sprintf("%d", p1))
}, func(err error) {
t.Fatalf("should have 2 peers: %v", err)
})
Expand Down Expand Up @@ -558,8 +558,8 @@ func TestACL_DownPolicy_ExtendCache(t *testing.T) {
}

testutil.WaitForResult(func() (bool, error) {
p1, _ := s1.raftPeers.Peers()
return len(p1) == 2, errors.New(fmt.Sprintf("%v", p1))
p1, _ := s1.numPeers()
return p1 == 2, errors.New(fmt.Sprintf("%d", p1))
}, func(err error) {
t.Fatalf("should have 2 peers: %v", err)
})
Expand Down
4 changes: 4 additions & 0 deletions consul/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,10 @@ func DefaultConfig() *Config {
conf.SerfLANConfig.MemberlistConfig.BindPort = DefaultLANSerfPort
conf.SerfWANConfig.MemberlistConfig.BindPort = DefaultWANSerfPort

// Enable interoperability with unversioned Raft library, and don't
// start using new ID-based features yet.
conf.RaftConfig.ProtocolVersion = 1

// Disable shutdown on removal
conf.RaftConfig.ShutdownOnRemove = false

Expand Down
52 changes: 42 additions & 10 deletions consul/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,10 +543,26 @@ func (s *Server) joinConsulServer(m serf.Member, parts *agent.Server) error {
}
}

// TODO (slackpad) - This will need to be changed once we support node IDs.
addr := (&net.TCPAddr{IP: m.Addr, Port: parts.Port}).String()

// See if it's already in the configuration. It's harmless to re-add it
// but we want to avoid doing that if possible to prevent useless Raft
// log entries.
configFuture := s.raft.GetConfiguration()
if err := configFuture.Error(); err != nil {
s.logger.Printf("[ERR] consul: failed to get raft configuration: %v", err)
return err
}
for _, server := range configFuture.Configuration().Servers {
if server.Address == raft.ServerAddress(addr) {
return nil
}
}

// Attempt to add as a peer
var addr net.Addr = &net.TCPAddr{IP: m.Addr, Port: parts.Port}
future := s.raft.AddPeer(addr.String())
if err := future.Error(); err != nil && err != raft.ErrKnownPeer {
addFuture := s.raft.AddPeer(raft.ServerAddress(addr))
if err := addFuture.Error(); err != nil {
s.logger.Printf("[ERR] consul: failed to add raft peer: %v", err)
return err
}
Expand All @@ -555,15 +571,31 @@ func (s *Server) joinConsulServer(m serf.Member, parts *agent.Server) error {

// removeConsulServer is used to try to remove a consul server that has left
func (s *Server) removeConsulServer(m serf.Member, port int) error {
// Attempt to remove as peer
peer := &net.TCPAddr{IP: m.Addr, Port: port}
future := s.raft.RemovePeer(peer.String())
if err := future.Error(); err != nil && err != raft.ErrUnknownPeer {
// TODO (slackpad) - This will need to be changed once we support node IDs.
addr := (&net.TCPAddr{IP: m.Addr, Port: port}).String()

// See if it's already in the configuration. It's harmless to re-remove it
// but we want to avoid doing that if possible to prevent useless Raft
// log entries.
configFuture := s.raft.GetConfiguration()
if err := configFuture.Error(); err != nil {
s.logger.Printf("[ERR] consul: failed to get raft configuration: %v", err)
return err
}
for _, server := range configFuture.Configuration().Servers {
if server.Address == raft.ServerAddress(addr) {
goto REMOVE
}
}
return nil

REMOVE:
// Attempt to remove as a peer.
future := s.raft.RemovePeer(raft.ServerAddress(addr))
if err := future.Error(); err != nil {
s.logger.Printf("[ERR] consul: failed to remove raft peer '%v': %v",
peer, err)
addr, err)
return err
} else if err == nil {
s.logger.Printf("[INFO] consul: removed server '%s' as peer", m.Name)
}
return nil
}
Expand Down
24 changes: 12 additions & 12 deletions consul/leader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,8 +341,8 @@ func TestLeader_LeftServer(t *testing.T) {

for _, s := range servers {
testutil.WaitForResult(func() (bool, error) {
peers, _ := s.raftPeers.Peers()
return len(peers) == 3, nil
peers, _ := s.numPeers()
return peers == 3, nil
}, func(err error) {
t.Fatalf("should have 3 peers")
})
Expand All @@ -358,8 +358,8 @@ func TestLeader_LeftServer(t *testing.T) {
}

for _, s := range servers[1:] {
peers, _ := s.raftPeers.Peers()
return len(peers) == 2, errors.New(fmt.Sprintf("%v", peers))
peers, _ := s.numPeers()
return peers == 2, errors.New(fmt.Sprintf("%d", peers))
}

return true, nil
Expand Down Expand Up @@ -394,8 +394,8 @@ func TestLeader_LeftLeader(t *testing.T) {

for _, s := range servers {
testutil.WaitForResult(func() (bool, error) {
peers, _ := s.raftPeers.Peers()
return len(peers) == 3, nil
peers, _ := s.numPeers()
return peers == 3, nil
}, func(err error) {
t.Fatalf("should have 3 peers")
})
Expand Down Expand Up @@ -423,8 +423,8 @@ func TestLeader_LeftLeader(t *testing.T) {
}
remain = s
testutil.WaitForResult(func() (bool, error) {
peers, _ := s.raftPeers.Peers()
return len(peers) == 2, errors.New(fmt.Sprintf("%v", peers))
peers, _ := s.numPeers()
return peers == 2, errors.New(fmt.Sprintf("%d", peers))
}, func(err error) {
t.Fatalf("should have 2 peers: %v", err)
})
Expand Down Expand Up @@ -472,8 +472,8 @@ func TestLeader_MultiBootstrap(t *testing.T) {

// Ensure we don't have multiple raft peers
for _, s := range servers {
peers, _ := s.raftPeers.Peers()
if len(peers) != 1 {
peers, _ := s.numPeers()
if peers != 1 {
t.Fatalf("should only have 1 raft peer!")
}
}
Expand Down Expand Up @@ -505,8 +505,8 @@ func TestLeader_TombstoneGC_Reset(t *testing.T) {

for _, s := range servers {
testutil.WaitForResult(func() (bool, error) {
peers, _ := s.raftPeers.Peers()
return len(peers) == 3, nil
peers, _ := s.numPeers()
return peers == 3, nil
}, func(err error) {
t.Fatalf("should have 3 peers")
})
Expand Down
5 changes: 3 additions & 2 deletions consul/raft_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/raft"
)

// RaftLayer implements the raft.StreamLayer interface,
Expand Down Expand Up @@ -80,8 +81,8 @@ func (l *RaftLayer) Addr() net.Addr {
}

// Dial is used to create a new outgoing connection
func (l *RaftLayer) Dial(address string, timeout time.Duration) (net.Conn, error) {
conn, err := net.DialTimeout("tcp", address, timeout)
func (l *RaftLayer) Dial(address raft.ServerAddress, timeout time.Duration) (net.Conn, error) {
conn, err := net.DialTimeout("tcp", string(address), timeout)
if err != nil {
return nil, err
}
Expand Down
55 changes: 34 additions & 21 deletions consul/serf.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"strings"

"github.com/hashicorp/consul/consul/agent"
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
)

Expand Down Expand Up @@ -53,7 +54,7 @@ func (s *Server) lanEventHandler() {
case serf.EventMemberUpdate: // Ignore
case serf.EventQuery: // Ignore
default:
s.logger.Printf("[WARN] consul: unhandled LAN Serf Event: %#v", e)
s.logger.Printf("[WARN] consul: Unhandled LAN Serf Event: %#v", e)
}

case <-s.shutdownCh:
Expand All @@ -77,7 +78,7 @@ func (s *Server) wanEventHandler() {
case serf.EventUser:
case serf.EventQuery: // Ignore
default:
s.logger.Printf("[WARN] consul: unhandled WAN Serf Event: %#v", e)
s.logger.Printf("[WARN] consul: Unhandled WAN Serf Event: %#v", e)
}

case <-s.shutdownCh:
Expand Down Expand Up @@ -127,7 +128,7 @@ func (s *Server) localEvent(event serf.UserEvent) {
}
case isUserEvent(name):
event.Name = rawUserEventName(name)
s.logger.Printf("[DEBUG] consul: user event: %s", event.Name)
s.logger.Printf("[DEBUG] consul: User event: %s", event.Name)

// Trigger the callback
if s.config.UserEventHandler != nil {
Expand All @@ -145,12 +146,12 @@ func (s *Server) lanNodeJoin(me serf.MemberEvent) {
if !ok {
continue
}
s.logger.Printf("[INFO] consul: adding LAN server %s", parts)
s.logger.Printf("[INFO] consul: Adding LAN server %s", parts)

// See if it's configured as part of our DC.
if parts.Datacenter == s.config.Datacenter {
s.localLock.Lock()
s.localConsuls[parts.Addr.String()] = parts
s.localConsuls[raft.ServerAddress(parts.Addr.String())] = parts
s.localLock.Unlock()
}

Expand All @@ -166,10 +167,10 @@ func (s *Server) wanNodeJoin(me serf.MemberEvent) {
for _, m := range me.Members {
ok, parts := agent.IsConsulServer(m)
if !ok {
s.logger.Printf("[WARN] consul: non-server in WAN pool: %s", m.Name)
s.logger.Printf("[WARN] consul: Non-server in WAN pool: %s", m.Name)
continue
}
s.logger.Printf("[INFO] consul: adding WAN server %s", parts)
s.logger.Printf("[INFO] consul: Adding WAN server %s", parts)

// Search for this node in our existing remotes.
found := false
Expand All @@ -193,20 +194,20 @@ func (s *Server) wanNodeJoin(me serf.MemberEvent) {

// maybeBootstrap is used to handle bootstrapping when a new consul server joins
func (s *Server) maybeBootstrap() {
// Bootstrap can only be done if there are no committed logs, remove our
// expectations of bootstrapping. This is slightly cheaper than the full
// check that BootstrapCluster will do, so this is a good pre-filter.
index, err := s.raftStore.LastIndex()
if err != nil {
s.logger.Printf("[ERR] consul: failed to read last raft index: %v", err)
s.logger.Printf("[ERR] consul: Failed to read last raft index: %v", err)
return
}

// Bootstrap can only be done if there are no committed logs,
// remove our expectations of bootstrapping
if index != 0 {
s.config.BootstrapExpect = 0
return
}

// Scan for all the known servers
// Scan for all the known servers.
members := s.serfLAN.Members()
addrs := make([]string, 0)
for _, member := range members {
Expand All @@ -230,18 +231,30 @@ func (s *Server) maybeBootstrap() {
addrs = append(addrs, addr.String())
}

// Skip if we haven't met the minimum expect count
// Skip if we haven't met the minimum expect count.
if len(addrs) < s.config.BootstrapExpect {
return
}

// Update the peer set
s.logger.Printf("[INFO] consul: Attempting bootstrap with nodes: %v", addrs)
if err := s.raft.SetPeers(addrs).Error(); err != nil {
s.logger.Printf("[ERR] consul: failed to bootstrap peers: %v", err)
// Attempt a live bootstrap!
var configuration raft.Configuration
for _, addr := range addrs {
// TODO (slackpad) - This will need to be updated once we support
// node IDs.
server := raft.Server{
ID: raft.ServerID(addr),
Address: raft.ServerAddress(addr),
}
configuration.Servers = append(configuration.Servers, server)
}
s.logger.Printf("[INFO] consul: Found expected number of peers (%s), attempting to bootstrap cluster...",
strings.Join(addrs, ","))
future := s.raft.BootstrapCluster(configuration)
if err := future.Error(); err != nil {
s.logger.Printf("[ERR] consul: Failed to bootstrap cluster: %v", err)
}

// Bootstrapping complete, don't enter this again
// Bootstrapping complete, don't enter this again.
s.config.BootstrapExpect = 0
}

Expand All @@ -252,10 +265,10 @@ func (s *Server) lanNodeFailed(me serf.MemberEvent) {
if !ok {
continue
}
s.logger.Printf("[INFO] consul: removing LAN server %s", parts)
s.logger.Printf("[INFO] consul: Removing LAN server %s", parts)

s.localLock.Lock()
delete(s.localConsuls, parts.Addr.String())
delete(s.localConsuls, raft.ServerAddress(parts.Addr.String()))
s.localLock.Unlock()
}
}
Expand All @@ -267,7 +280,7 @@ func (s *Server) wanNodeFailed(me serf.MemberEvent) {
if !ok {
continue
}
s.logger.Printf("[INFO] consul: removing WAN server %s", parts)
s.logger.Printf("[INFO] consul: Removing WAN server %s", parts)

// Remove the server if known
s.remoteLock.Lock()
Expand Down
Loading

0 comments on commit 66dcefc

Please sign in to comment.