From 4963ae437522b6e91533e96538862d1cc85b6ae1 Mon Sep 17 00:00:00 2001 From: Aaron Lehmann Date: Mon, 22 May 2017 18:17:10 -0700 Subject: [PATCH 1/2] raft: Allow Join to be called multiple times for the same cluster member If Join is called and the member already exists in the cluster, instead of returning an error, it will update the node's address with the new one provided to the RPC. This will allow managers to update their addresses automatically on startup, if they were configured to autodetect the addresses. It will also be possible to manually repeat the "docker swarm join" command, to specify a different advertise address, or rejoin a cluster when all known manager IPs have changed. Signed-off-by: Aaron Lehmann --- manager/state/raft/raft.go | 63 ++++++++++++++++------- manager/state/raft/raft_test.go | 35 ++++++++++--- manager/state/raft/testutils/testutils.go | 8 +-- 3 files changed, 76 insertions(+), 30 deletions(-) diff --git a/manager/state/raft/raft.go b/manager/state/raft/raft.go index b793374095..89f19b5cdd 100644 --- a/manager/state/raft/raft.go +++ b/manager/state/raft/raft.go @@ -909,24 +909,6 @@ func (n *Node) Join(ctx context.Context, req *api.JoinRequest) (*api.JoinRespons return nil, grpc.Errorf(codes.FailedPrecondition, "%s", ErrLostLeadership.Error()) } - // A single manager must not be able to join the raft cluster twice. If - // it did, that would cause the quorum to be computed incorrectly. This - // could happen if the WAL was deleted from an active manager. - for _, m := range n.cluster.Members() { - if m.NodeID == nodeInfo.NodeID { - return nil, grpc.Errorf(codes.AlreadyExists, "%s", "a raft member with this node ID already exists") - } - } - - // Find a unique ID for the joining member. - var raftID uint64 - for { - raftID = uint64(rand.Int63()) + 1 - if n.cluster.GetMember(raftID) == nil && !n.cluster.IsIDRemoved(raftID) { - break - } - } - remoteAddr := req.Addr // If the joining node sent an address like 0.0.0.0:4242, automatically @@ -953,12 +935,54 @@ func (n *Node) Join(ctx context.Context, req *api.JoinRequest) (*api.JoinRespons return nil, err } + // If the peer is already a member of the cluster, we will only update + // its information, not add it as a new member. Adding it again would + // cause the quorum to be computed incorrectly. + for _, m := range n.cluster.Members() { + if m.NodeID == nodeInfo.NodeID { + if remoteAddr == m.Addr { + return n.joinResponse(m.RaftID), nil + } + updatedRaftMember := &api.RaftMember{ + RaftID: m.RaftID, + NodeID: m.NodeID, + Addr: remoteAddr, + } + if err := n.cluster.UpdateMember(m.RaftID, updatedRaftMember); err != nil { + return nil, err + } + + if err := n.updateNodeBlocking(ctx, m.RaftID, remoteAddr); err != nil { + log.WithError(err).Error("failed to update node address") + return nil, err + } + + log.Info("updated node address") + return n.joinResponse(m.RaftID), nil + } + } + + // Find a unique ID for the joining member. + var raftID uint64 + for { + raftID = uint64(rand.Int63()) + 1 + if n.cluster.GetMember(raftID) == nil && !n.cluster.IsIDRemoved(raftID) { + break + } + } + err = n.addMember(ctx, remoteAddr, raftID, nodeInfo.NodeID) if err != nil { log.WithError(err).Errorf("failed to add member %x", raftID) return nil, err } + log.Debug("node joined") + + return n.joinResponse(raftID), nil +} + +func (n *Node) joinResponse(raftID uint64) *api.JoinResponse { var nodes []*api.RaftMember for _, node := range n.cluster.Members() { nodes = append(nodes, &api.RaftMember{ @@ -967,9 +991,8 @@ func (n *Node) Join(ctx context.Context, req *api.JoinRequest) (*api.JoinRespons Addr: node.Addr, }) } - log.Debugf("node joined") - return &api.JoinResponse{Members: nodes, RaftID: raftID}, nil + return &api.JoinResponse{Members: nodes, RaftID: raftID} } // checkHealth tries to contact an aspiring member through its advertised address diff --git a/manager/state/raft/raft_test.go b/manager/state/raft/raft_test.go index d3f7c966e9..f85ed8d9e7 100644 --- a/manager/state/raft/raft_test.go +++ b/manager/state/raft/raft_test.go @@ -6,6 +6,7 @@ import ( "io/ioutil" "log" "math/rand" + "net" "os" "reflect" "strconv" @@ -13,7 +14,6 @@ import ( "time" "google.golang.org/grpc" - "google.golang.org/grpc/codes" "google.golang.org/grpc/grpclog" "golang.org/x/net/context" @@ -88,9 +88,19 @@ func dial(n *raftutils.TestNode, addr string) (*grpc.ClientConn, error) { func TestRaftJoinTwice(t *testing.T) { t.Parallel() - nodes, _ := raftutils.NewRaftCluster(t, tc) + nodes, clockSource := raftutils.NewRaftCluster(t, tc) defer raftutils.TeardownCluster(nodes) + // Node 3's address changes + nodes[3].Server.Stop() + nodes[3].ShutdownRaft() + nodes[3].Listener.CloseListener() + + l, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err, "can't bind to raft service port") + nodes[3].Listener = raftutils.NewWrappedListener(l) + nodes[3] = raftutils.RestartNode(t, clockSource, nodes[3], false) + // Node 3 tries to join again // Use gRPC instead of calling handler directly because of // authorization check. @@ -99,10 +109,23 @@ func TestRaftJoinTwice(t *testing.T) { raftClient := api.NewRaftMembershipClient(cc) defer cc.Close() ctx, _ := context.WithTimeout(context.Background(), 10*time.Second) - _, err = raftClient.Join(ctx, &api.JoinRequest{}) - assert.Error(t, err, "expected error on duplicate Join") - assert.Equal(t, grpc.Code(err), codes.AlreadyExists) - assert.Equal(t, grpc.ErrorDesc(err), "a raft member with this node ID already exists") + _, err = raftClient.Join(ctx, &api.JoinRequest{Addr: l.Addr().String()}) + assert.NoError(t, err) + + // Propose a value and wait for it to propagate + value, err := raftutils.ProposeValue(t, nodes[1], DefaultProposalTime) + assert.NoError(t, err, "failed to propose value") + raftutils.CheckValue(t, clockSource, nodes[2], value) + + // Restart node 2 + nodes[2].Server.Stop() + nodes[2].ShutdownRaft() + nodes[2] = raftutils.RestartNode(t, clockSource, nodes[2], false) + raftutils.WaitForCluster(t, clockSource, nodes) + + // Node 2 should have the updated address for node 3 in its member list + require.NotNil(t, nodes[2].GetMemberlist()[nodes[3].Config.ID]) + require.Equal(t, l.Addr().String(), nodes[2].GetMemberlist()[nodes[3].Config.ID].Addr) } func TestRaftLeader(t *testing.T) { diff --git a/manager/state/raft/testutils/testutils.go b/manager/state/raft/testutils/testutils.go index 6a63b6a1ae..5fe1d4ae23 100644 --- a/manager/state/raft/testutils/testutils.go +++ b/manager/state/raft/testutils/testutils.go @@ -159,8 +159,8 @@ func (l *WrappedListener) Close() error { return nil } -// CloseListener closes the listener -func (l *WrappedListener) close() error { +// CloseListener closes the underlying listener +func (l *WrappedListener) CloseListener() error { return l.Listener.Close() } @@ -471,7 +471,7 @@ func ShutdownNode(node *TestNode) { <-node.Done() } os.RemoveAll(node.StateDir) - node.Listener.close() + node.Listener.CloseListener() } // ShutdownRaft shutdowns only raft part of node. @@ -487,7 +487,7 @@ func (n *TestNode) ShutdownRaft() { func CleanupNonRunningNode(node *TestNode) { node.Server.Stop() os.RemoveAll(node.StateDir) - node.Listener.close() + node.Listener.CloseListener() } // Leader determines who is the leader amongst a set of raft nodes From 10e1838c570a1cef250df1b18e4a30aaf47cf9eb Mon Sep 17 00:00:00 2001 From: Aaron Lehmann Date: Tue, 23 May 2017 13:57:56 -0700 Subject: [PATCH 2/2] raft: Allow Join to be called when already part of a cluster Signed-off-by: Aaron Lehmann --- manager/manager.go | 7 ++++++- manager/state/raft/raft.go | 33 ++++++++++++++++++++++----------- node/node.go | 12 ++++++++++-- 3 files changed, 38 insertions(+), 14 deletions(-) diff --git a/manager/manager.go b/manager/manager.go index d17e8ec231..517b37817e 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -86,7 +86,11 @@ type Config struct { // cluster to join. JoinRaft string - // Top-level state directory + // ForceJoin causes us to invoke raft's Join RPC even if already part + // of a cluster. + ForceJoin bool + + // StateDir is the top-level state directory StateDir string // ForceNewCluster defines if we have to force a new cluster @@ -201,6 +205,7 @@ func New(config *Config) (*Manager, error) { newNodeOpts := raft.NodeOptions{ ID: config.SecurityConfig.ClientTLSCreds.NodeID(), JoinAddr: config.JoinRaft, + ForceJoin: config.ForceJoin, Config: raftCfg, StateDir: raftStateDir, ForceNewCluster: config.ForceNewCluster, diff --git a/manager/state/raft/raft.go b/manager/state/raft/raft.go index 89f19b5cdd..9422c0e3b4 100644 --- a/manager/state/raft/raft.go +++ b/manager/state/raft/raft.go @@ -166,6 +166,8 @@ type NodeOptions struct { // JoinAddr is the cluster to join. May be an empty string to create // a standalone cluster. JoinAddr string + // ForceJoin tells us to join even if already part of a cluster. + ForceJoin bool // Config is the raft config. Config *raft.Config // StateDir is the directory to store durable state. @@ -393,8 +395,10 @@ func (n *Node) JoinAndStart(ctx context.Context) (err error) { // restore from snapshot if loadAndStartErr == nil { - if n.opts.JoinAddr != "" { - log.G(ctx).Warning("ignoring request to join cluster, because raft state already exists") + if n.opts.JoinAddr != "" && n.opts.ForceJoin { + if err := n.joinCluster(ctx); err != nil { + return errors.Wrap(err, "failed to rejoin cluster") + } } n.campaignWhenAble = true n.initTransport() @@ -402,7 +406,6 @@ func (n *Node) JoinAndStart(ctx context.Context) (err error) { return nil } - // first member of cluster if n.opts.JoinAddr == "" { // First member in the cluster, self-assign ID n.Config.ID = uint64(rand.Int63()) + 1 @@ -417,6 +420,22 @@ func (n *Node) JoinAndStart(ctx context.Context) (err error) { } // join to existing cluster + + if err := n.joinCluster(ctx); err != nil { + return err + } + + if _, err := n.newRaftLogs(n.opts.ID); err != nil { + return err + } + + n.initTransport() + n.raftNode = raft.StartNode(n.Config, nil) + + return nil +} + +func (n *Node) joinCluster(ctx context.Context) error { if n.opts.Addr == "" { return errors.New("attempted to join raft cluster without knowing own address") } @@ -438,15 +457,7 @@ func (n *Node) JoinAndStart(ctx context.Context) (err error) { } n.Config.ID = resp.RaftID - - if _, err := n.newRaftLogs(n.opts.ID); err != nil { - return err - } n.bootstrapMembers = resp.Members - - n.initTransport() - n.raftNode = raft.StartNode(n.Config, nil) - return nil } diff --git a/node/node.go b/node/node.go index 77fe5b3d75..68f81f0b92 100644 --- a/node/node.go +++ b/node/node.go @@ -828,14 +828,22 @@ func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig } } - remoteAddr, _ := n.remotes.Select(n.NodeID()) + joinAddr := n.config.JoinAddr + if joinAddr == "" { + remoteAddr, err := n.remotes.Select(n.NodeID()) + if err == nil { + joinAddr = remoteAddr.Addr + } + } + m, err := manager.New(&manager.Config{ ForceNewCluster: n.config.ForceNewCluster, RemoteAPI: remoteAPI, ControlAPI: n.config.ListenControlAPI, SecurityConfig: securityConfig, ExternalCAs: n.config.ExternalCAs, - JoinRaft: remoteAddr.Addr, + JoinRaft: joinAddr, + ForceJoin: n.config.JoinAddr != "", StateDir: n.config.StateDir, HeartbeatTick: n.config.HeartbeatTick, ElectionTick: n.config.ElectionTick,