Skip to content

Commit

Permalink
raft: Allow Join to be called multiple times for the same cluster member
Browse files Browse the repository at this point in the history
If Join is called and the member already exists in the cluster, instead
of returning an error, it will update the node's address with the new
one provided to the RPC.

This will allow managers to update their addresses automatically on
startup, if they were configured to autodetect the addresses.

It will also be possible to manually repeat the "docker swarm join"
command, to specify a different advertise address, or rejoin a cluster
when all known manager IPs have changed.

Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
  • Loading branch information
aaronlehmann committed May 23, 2017
1 parent eb07af5 commit ba3a0ec
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 30 deletions.
61 changes: 41 additions & 20 deletions manager/state/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -909,24 +909,6 @@ func (n *Node) Join(ctx context.Context, req *api.JoinRequest) (*api.JoinRespons
return nil, grpc.Errorf(codes.FailedPrecondition, "%s", ErrLostLeadership.Error())
}

// A single manager must not be able to join the raft cluster twice. If
// it did, that would cause the quorum to be computed incorrectly. This
// could happen if the WAL was deleted from an active manager.
for _, m := range n.cluster.Members() {
if m.NodeID == nodeInfo.NodeID {
return nil, grpc.Errorf(codes.AlreadyExists, "%s", "a raft member with this node ID already exists")
}
}

// Find a unique ID for the joining member.
var raftID uint64
for {
raftID = uint64(rand.Int63()) + 1
if n.cluster.GetMember(raftID) == nil && !n.cluster.IsIDRemoved(raftID) {
break
}
}

remoteAddr := req.Addr

// If the joining node sent an address like 0.0.0.0:4242, automatically
Expand All @@ -953,12 +935,52 @@ func (n *Node) Join(ctx context.Context, req *api.JoinRequest) (*api.JoinRespons
return nil, err
}

// If the peer is already a member of the cluster, we will only update
// its information, not add it as a new member. Adding it again would
// cause the quorum to be computed incorrectly.
for _, m := range n.cluster.Members() {
if m.NodeID == nodeInfo.NodeID {
if remoteAddr == m.Addr {
return n.joinResponse(m.RaftID), nil
}
updatedRaftMember := &api.RaftMember{
RaftID: m.RaftID,
NodeID: m.NodeID,
Addr: remoteAddr,
}
if err := n.cluster.UpdateMember(m.RaftID, updatedRaftMember); err != nil {
return nil, err
}

if err := n.updateNodeBlocking(ctx, m.RaftID, remoteAddr); err != nil {
return nil, err
}

return n.joinResponse(m.RaftID), nil
}
}

// Find a unique ID for the joining member.
var raftID uint64
for {
raftID = uint64(rand.Int63()) + 1
if n.cluster.GetMember(raftID) == nil && !n.cluster.IsIDRemoved(raftID) {
break
}
}

err = n.addMember(ctx, remoteAddr, raftID, nodeInfo.NodeID)
if err != nil {
log.WithError(err).Errorf("failed to add member %x", raftID)
return nil, err
}

log.Debugf("node joined")

return n.joinResponse(raftID), nil
}

func (n *Node) joinResponse(raftID uint64) *api.JoinResponse {
var nodes []*api.RaftMember
for _, node := range n.cluster.Members() {
nodes = append(nodes, &api.RaftMember{
Expand All @@ -967,9 +989,8 @@ func (n *Node) Join(ctx context.Context, req *api.JoinRequest) (*api.JoinRespons
Addr: node.Addr,
})
}
log.Debugf("node joined")

return &api.JoinResponse{Members: nodes, RaftID: raftID}, nil
return &api.JoinResponse{Members: nodes, RaftID: raftID}
}

// checkHealth tries to contact an aspiring member through its advertised address
Expand Down
35 changes: 29 additions & 6 deletions manager/state/raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ import (
"io/ioutil"
"log"
"math/rand"
"net"
"os"
"reflect"
"strconv"
"testing"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/grpclog"

"golang.org/x/net/context"
Expand Down Expand Up @@ -88,9 +88,19 @@ func dial(n *raftutils.TestNode, addr string) (*grpc.ClientConn, error) {
func TestRaftJoinTwice(t *testing.T) {
t.Parallel()

nodes, _ := raftutils.NewRaftCluster(t, tc)
nodes, clockSource := raftutils.NewRaftCluster(t, tc)
defer raftutils.TeardownCluster(nodes)

// Node 3's address changes
nodes[3].Server.Stop()
nodes[3].ShutdownRaft()
nodes[3].Listener.CloseListener()

l, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err, "can't bind to raft service port")
nodes[3].Listener = raftutils.NewWrappedListener(l)
nodes[3] = raftutils.RestartNode(t, clockSource, nodes[3], false)

// Node 3 tries to join again
// Use gRPC instead of calling handler directly because of
// authorization check.
Expand All @@ -99,10 +109,23 @@ func TestRaftJoinTwice(t *testing.T) {
raftClient := api.NewRaftMembershipClient(cc)
defer cc.Close()
ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
_, err = raftClient.Join(ctx, &api.JoinRequest{})
assert.Error(t, err, "expected error on duplicate Join")
assert.Equal(t, grpc.Code(err), codes.AlreadyExists)
assert.Equal(t, grpc.ErrorDesc(err), "a raft member with this node ID already exists")
_, err = raftClient.Join(ctx, &api.JoinRequest{Addr: l.Addr().String()})
assert.NoError(t, err)

// Propose a value and wait for it to propagate
value, err := raftutils.ProposeValue(t, nodes[1], DefaultProposalTime)
assert.NoError(t, err, "failed to propose value")
raftutils.CheckValue(t, clockSource, nodes[2], value)

// Restart node 2
nodes[2].Server.Stop()
nodes[2].ShutdownRaft()
nodes[2] = raftutils.RestartNode(t, clockSource, nodes[2], false)
raftutils.WaitForCluster(t, clockSource, nodes)

// Node 2 should have the updated address for node 3 in its member list
require.NotNil(t, nodes[2].GetMemberlist()[nodes[3].Config.ID])
require.Equal(t, l.Addr().String(), nodes[2].GetMemberlist()[nodes[3].Config.ID].Addr)
}

func TestRaftLeader(t *testing.T) {
Expand Down
8 changes: 4 additions & 4 deletions manager/state/raft/testutils/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,8 @@ func (l *WrappedListener) Close() error {
return nil
}

// CloseListener closes the listener
func (l *WrappedListener) close() error {
// CloseListener closes the underlying listener
func (l *WrappedListener) CloseListener() error {
return l.Listener.Close()
}

Expand Down Expand Up @@ -471,7 +471,7 @@ func ShutdownNode(node *TestNode) {
<-node.Done()
}
os.RemoveAll(node.StateDir)
node.Listener.close()
node.Listener.CloseListener()
}

// ShutdownRaft shuts down only the raft part of the node.
Expand All @@ -487,7 +487,7 @@ func (n *TestNode) ShutdownRaft() {
func CleanupNonRunningNode(node *TestNode) {
node.Server.Stop()
os.RemoveAll(node.StateDir)
node.Listener.close()
node.Listener.CloseListener()
}

// Leader determines who is the leader amongst a set of raft nodes
Expand Down

0 comments on commit ba3a0ec

Please sign in to comment.