Skip to content

Commit

Permalink
Bumps default Raft protocol to version 3. (#3477)
Browse files Browse the repository at this point in the history
* Changes default Raft protocol to 3.

* Changes numPeers() to report only voters.

This should have been there before, but it's more obvious that this
is incorrect now that we default the Raft protocol to 3, which puts
new servers in a read-only state while Autopilot waits for them to
become healthy.

* Fixes TestLeader_RollRaftServer.

* Fixes TestOperator_RaftRemovePeerByAddress.

* Fixes TestServer_*.

Relaxed the check for a given number of voter peers and instead do
a thorough check that all servers see each other in their Raft
configurations.

* Fixes TestACL_*.

These now just check for Raft replication to be set up, and don't
care about the number of voter peers.

* Fixes TestOperator_Raft_ListPeers.

* Fixes TestAutopilot_CleanupDeadServerPeriodic.

* Fixes TestCatalog_ListNodes_ConsistentRead_Fail.

* Fixes TestLeader_ChangeServerID and adjusts the conn pool to throw away
sockets when it sees io.EOF.

* Changes version to 1.0.0 in the options doc.

* Makes metrics test more deterministic now that autopilot metrics are possible.
  • Loading branch information
slackpad authored Sep 25, 2017
1 parent 8bc8c25 commit 45646ac
Show file tree
Hide file tree
Showing 14 changed files with 160 additions and 83 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

BREAKING CHANGES:

* **Raft Protocol Defaults to 3:** The [`-raft-protocol`](https://www.consul.io/docs/agent/options.html#_raft_protocol) default has been changed from 2 to 3, enabling all Autopilot features by default. Version 3 requires Consul running 0.8.0 or newer on all servers in order to work, so if you are upgrading with older servers in a cluster then you will need to set this back to 2 in order to upgrade. See [Raft Protocol Version Compatibility](https://www.consul.io/docs/upgrade-specific.html#raft-protocol-version-compatibility) for more details. Also the format of `peers.json` used for outage recovery is different when running with the latest Raft protocol. See [Manual Recovery Using peers.json](https://www.consul.io/docs/guides/outage.html#manual-recovery-using-peers-json) for a description of the required format.

FEATURES:

IMPROVEMENTS:
Expand All @@ -22,7 +24,7 @@ IMPROVEMENTS:
* agent: Switched to using a read lock for the agent's RPC dispatcher, which prevents RPC calls from getting serialized. [GH-3376]
* agent: When joining a cluster, Consul now skips the unique node ID constraint for Consul members running Consul older than 0.8.5. This makes it easier to upgrade to newer versions of Consul in an existing cluster with non-unique node IDs. [GH-3070]
* build: Upgraded Go version to 1.9. [GH-3428]
* server: Consul servers can re-establish quorum after all of them change their IP addresses upon a restart. [GH-1580]
* server: Consul servers can re-establish quorum after all of them change their IP addresses upon a restart. [GH-1580]

BUG FIXES:

Expand Down
12 changes: 6 additions & 6 deletions agent/consul/acl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func TestACL_NonAuthority_NotFound(t *testing.T) {

// Try to join
joinLAN(t, s2, s1)
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s1, 2)) })
retry.Run(t, func(r *retry.R) { r.Check(wantRaft([]*Server{s1, s2})) })

client := rpcClient(t, s1)
defer client.Close()
Expand Down Expand Up @@ -278,7 +278,7 @@ func TestACL_NonAuthority_Found(t *testing.T) {

// Try to join
joinLAN(t, s2, s1)
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s1, 2)) })
retry.Run(t, func(r *retry.R) { r.Check(wantRaft([]*Server{s1, s2})) })

testrpc.WaitForLeader(t, s1.RPC, "dc1")

Expand Down Expand Up @@ -346,7 +346,7 @@ func TestACL_NonAuthority_Management(t *testing.T) {

// Try to join
joinLAN(t, s2, s1)
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s1, 2)) })
retry.Run(t, func(r *retry.R) { r.Check(wantRaft([]*Server{s1, s2})) })

testrpc.WaitForLeader(t, s1.RPC, "dc1")

Expand Down Expand Up @@ -395,7 +395,7 @@ func TestACL_DownPolicy_Deny(t *testing.T) {

// Try to join
joinLAN(t, s2, s1)
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s1, 2)) })
retry.Run(t, func(r *retry.R) { r.Check(wantRaft([]*Server{s1, s2})) })

testrpc.WaitForLeader(t, s1.RPC, "dc1")

Expand Down Expand Up @@ -461,7 +461,7 @@ func TestACL_DownPolicy_Allow(t *testing.T) {

// Try to join
joinLAN(t, s2, s1)
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s1, 2)) })
retry.Run(t, func(r *retry.R) { r.Check(wantRaft([]*Server{s1, s2})) })

testrpc.WaitForLeader(t, s1.RPC, "dc1")

Expand Down Expand Up @@ -529,7 +529,7 @@ func TestACL_DownPolicy_ExtendCache(t *testing.T) {

// Try to join
joinLAN(t, s2, s1)
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s1, 2)) })
retry.Run(t, func(r *retry.R) { r.Check(wantRaft([]*Server{s1, s2})) })

testrpc.WaitForLeader(t, s1.RPC, "dc1")

Expand Down
30 changes: 21 additions & 9 deletions agent/consul/autopilot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func TestAutopilot_CleanupDeadServerPeriodic(t *testing.T) {
c.Datacenter = "dc1"
c.Bootstrap = false
}

dir2, s2 := testServerWithConfig(t, conf)
defer os.RemoveAll(dir2)
defer s2.Shutdown()
Expand All @@ -101,24 +102,35 @@ func TestAutopilot_CleanupDeadServerPeriodic(t *testing.T) {
defer os.RemoveAll(dir4)
defer s4.Shutdown()

servers := []*Server{s1, s2, s3, s4}
dir5, s5 := testServerWithConfig(t, conf)
defer os.RemoveAll(dir5)
defer s5.Shutdown()

// Join the servers to s1
servers := []*Server{s1, s2, s3, s4, s5}

// Join the servers to s1, and wait until they are all promoted to
// voters.
for _, s := range servers[1:] {
joinLAN(t, s, s1)
}

for _, s := range servers {
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 4)) })
}
retry.Run(t, func(r *retry.R) {
r.Check(wantRaft(servers))
for _, s := range servers {
r.Check(wantPeers(s, 5))
}
})

// Kill a non-leader server
s4.Shutdown()

// Should be removed from the peers automatically
for _, s := range []*Server{s1, s2, s3} {
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) })
}
servers = []*Server{s1, s2, s3, s5}
retry.Run(t, func(r *retry.R) {
r.Check(wantRaft(servers))
for _, s := range servers {
r.Check(wantPeers(s, 4))
}
})
}

func TestAutopilot_CleanupStaleRaftServer(t *testing.T) {
Expand Down
40 changes: 24 additions & 16 deletions agent/consul/catalog_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -779,29 +779,38 @@ func TestCatalog_ListNodes_ConsistentRead_Fail(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec1 := rpcClient(t, s1)
defer codec1.Close()

dir2, s2 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir2)
defer s2.Shutdown()
codec2 := rpcClient(t, s2)
defer codec2.Close()

// Try to join
joinLAN(t, s2, s1)
dir3, s3 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir3)
defer s3.Shutdown()

testrpc.WaitForLeader(t, s1.RPC, "dc1")
testrpc.WaitForLeader(t, s2.RPC, "dc1")
// Try to join and wait for all servers to get promoted to voters.
joinLAN(t, s2, s1)
joinLAN(t, s3, s2)
servers := []*Server{s1, s2, s3}
retry.Run(t, func(r *retry.R) {
r.Check(wantRaft(servers))
for _, s := range servers {
r.Check(wantPeers(s, 3))
}
})

// Use the leader as the client, kill the follower
// Use the leader as the client, kill the followers.
var codec rpc.ClientCodec
if s1.IsLeader() {
codec = codec1
s2.Shutdown()
} else {
codec = codec2
s1.Shutdown()
for _, s := range servers {
if s.IsLeader() {
codec = rpcClient(t, s)
defer codec.Close()
} else {
s.Shutdown()
}
}
if codec == nil {
t.Fatalf("no leader")
}

args := structs.DCSpecificRequest{
Expand All @@ -813,7 +822,6 @@ func TestCatalog_ListNodes_ConsistentRead_Fail(t *testing.T) {
if err == nil || !strings.HasPrefix(err.Error(), "leadership lost") {
t.Fatalf("err: %v", err)
}

if out.QueryMeta.LastContact != 0 {
t.Fatalf("should not have a last contact time")
}
Expand Down
7 changes: 3 additions & 4 deletions agent/consul/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,10 +434,9 @@ func DefaultConfig() *Config {
conf.SerfLANConfig.MemberlistConfig.BindPort = DefaultLANSerfPort
conf.SerfWANConfig.MemberlistConfig.BindPort = DefaultWANSerfPort

// TODO: default to 3 in Consul 0.9
// Use a transitional version of the raft protocol to interoperate with
// versions 1 and 3
conf.RaftConfig.ProtocolVersion = 2
// Raft protocol version 3 only works with other Consul servers running
// 0.8.0 or later.
conf.RaftConfig.ProtocolVersion = 3

// Disable shutdown on removal
conf.RaftConfig.ShutdownOnRemove = false
Expand Down
37 changes: 37 additions & 0 deletions agent/consul/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"

"github.com/hashicorp/consul/testutil/retry"
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
)

Expand Down Expand Up @@ -41,6 +42,42 @@ func wantPeers(s *Server, peers int) error {
return nil
}

// wantRaft verifies that every server in the given list appears in the
// Raft configuration reported by every other server, with no extra or
// missing entries.
func wantRaft(servers []*Server) error {
	// checkConfig compares a single server's view of the Raft
	// configuration against the expected member set, which is the
	// LocalID of each supplied server.
	checkConfig := func(config raft.Configuration) error {
		expected := make(map[raft.ServerID]bool, len(servers))
		for _, srv := range servers {
			expected[srv.config.RaftConfig.LocalID] = true
		}

		for _, member := range config.Servers {
			if !expected[member.ID] {
				return fmt.Errorf("don't want %q", member.ID)
			}
			delete(expected, member.ID)
		}

		if len(expected) > 0 {
			return fmt.Errorf("didn't find %v", expected)
		}
		return nil
	}

	// Ask each server for its current configuration and validate it.
	for _, srv := range servers {
		future := srv.raft.GetConfiguration()
		if err := future.Error(); err != nil {
			return err
		}
		if err := checkConfig(future.Configuration()); err != nil {
			return err
		}
	}
	return nil
}

// joinAddrLAN returns the address other servers can
// use to join the cluster on the LAN interface.
func joinAddrLAN(s *Server) string {
Expand Down
27 changes: 21 additions & 6 deletions agent/consul/leader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,7 @@ func TestLeader_RollRaftServer(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.Bootstrap = true
c.Datacenter = "dc1"
c.RaftConfig.ProtocolVersion = 2
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
Expand All @@ -715,7 +716,11 @@ func TestLeader_RollRaftServer(t *testing.T) {
defer os.RemoveAll(dir2)
defer s2.Shutdown()

dir3, s3 := testServerDCBootstrap(t, "dc1", false)
dir3, s3 := testServerWithConfig(t, func(c *Config) {
c.Bootstrap = false
c.Datacenter = "dc1"
c.RaftConfig.ProtocolVersion = 2
})
defer os.RemoveAll(dir3)
defer s3.Shutdown()

Expand Down Expand Up @@ -803,10 +808,9 @@ func TestLeader_ChangeServerID(t *testing.T) {

servers := []*Server{s1, s2, s3}

// Try to join
// Try to join and wait for all servers to get promoted
joinLAN(t, s2, s1)
joinLAN(t, s3, s1)

for _, s := range servers {
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) })
}
Expand Down Expand Up @@ -841,10 +845,21 @@ func TestLeader_ChangeServerID(t *testing.T) {
joinLAN(t, s4, s1)
servers[2] = s4

// While integrating #3327 it uncovered that this test was flaky. The
// connection pool would use the same TCP connection to the old server
// which would give EOF errors to the autopilot health check RPC call.
// To make this more reliable we changed the connection pool to throw
// away the connection if it sees an EOF error, since there's no way
// that connection is going to work again. This made this test reliable
// since it will make a new connection to s4.

// Make sure the dead server is removed and we're back to 3 total peers
for _, s := range servers {
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) })
}
retry.Run(t, func(r *retry.R) {
r.Check(wantRaft(servers))
for _, s := range servers {
r.Check(wantPeers(s, 3))
}
})
}

func TestLeader_ACL_Initialization(t *testing.T) {
Expand Down
4 changes: 3 additions & 1 deletion agent/consul/operator_raft_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"reflect"
"strings"
"testing"
"time"

"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs"
Expand Down Expand Up @@ -157,7 +158,8 @@ func TestOperator_RaftRemovePeerByAddress(t *testing.T) {

// Add it manually to Raft.
{
future := s1.raft.AddPeer(arg.Address)
id := raft.ServerID("fake-node-id")
future := s1.raft.AddVoter(id, arg.Address, 0, time.Second)
if err := future.Error(); err != nil {
t.Fatalf("err: %v", err)
}
Expand Down
17 changes: 11 additions & 6 deletions agent/consul/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,8 +588,6 @@ func (s *Server) setupRaft() error {
return err
}
if !hasState {
// TODO (slackpad) - This will need to be updated when
// we add support for node IDs.
configuration := raft.Configuration{
Servers: []raft.Server{
raft.Server{
Expand Down Expand Up @@ -835,15 +833,22 @@ func (s *Server) Leave() error {
return nil
}

// numPeers is used to check on the number of known peers, including the local
// node.
// numPeers reports the number of voting servers in the local Raft
// configuration, potentially including this node. Non-voters are
// excluded because they can't actually become leader, so they aren't
// considered peers.
func (s *Server) numPeers() (int, error) {
	future := s.raft.GetConfiguration()
	if err := future.Error(); err != nil {
		return 0, err
	}

	count := 0
	for _, srv := range future.Configuration().Servers {
		if srv.Suffrage == raft.Voter {
			count++
		}
	}
	return count, nil
}

// JoinLAN is used to have Consul join the inner-DC pool
Expand Down
Loading

0 comments on commit 45646ac

Please sign in to comment.