diff --git a/manager/manager.go b/manager/manager.go
index d17e8ec231..517b37817e 100644
--- a/manager/manager.go
+++ b/manager/manager.go
@@ -86,7 +86,11 @@ type Config struct {
 	// cluster to join.
 	JoinRaft string
 
-	// Top-level state directory
+	// ForceJoin causes us to invoke raft's Join RPC even if already part
+	// of a cluster.
+	ForceJoin bool
+
+	// StateDir is the top-level state directory
 	StateDir string
 
 	// ForceNewCluster defines if we have to force a new cluster
@@ -201,6 +205,7 @@ func New(config *Config) (*Manager, error) {
 	newNodeOpts := raft.NodeOptions{
 		ID:              config.SecurityConfig.ClientTLSCreds.NodeID(),
 		JoinAddr:        config.JoinRaft,
+		ForceJoin:       config.ForceJoin,
 		Config:          raftCfg,
 		StateDir:        raftStateDir,
 		ForceNewCluster: config.ForceNewCluster,
diff --git a/manager/state/raft/raft.go b/manager/state/raft/raft.go
index b793374095..9422c0e3b4 100644
--- a/manager/state/raft/raft.go
+++ b/manager/state/raft/raft.go
@@ -166,6 +166,8 @@ type NodeOptions struct {
 	// JoinAddr is the cluster to join. May be an empty string to create
 	// a standalone cluster.
 	JoinAddr string
+	// ForceJoin tells us to join even if already part of a cluster.
+	ForceJoin bool
 	// Config is the raft config.
 	Config *raft.Config
 	// StateDir is the directory to store durable state.
@@ -393,8 +395,10 @@ func (n *Node) JoinAndStart(ctx context.Context) (err error) {
 
 	// restore from snapshot
 	if loadAndStartErr == nil {
-		if n.opts.JoinAddr != "" {
-			log.G(ctx).Warning("ignoring request to join cluster, because raft state already exists")
+		if n.opts.JoinAddr != "" && n.opts.ForceJoin {
+			if err := n.joinCluster(ctx); err != nil {
+				return errors.Wrap(err, "failed to rejoin cluster")
+			}
 		}
 		n.campaignWhenAble = true
 		n.initTransport()
@@ -402,7 +406,6 @@
 		return nil
 	}
 
-	// first member of cluster
 	if n.opts.JoinAddr == "" {
 		// First member in the cluster, self-assign ID
 		n.Config.ID = uint64(rand.Int63()) + 1
@@ -417,6 +420,22 @@
 	}
 
 	// join to existing cluster
+
+	if err := n.joinCluster(ctx); err != nil {
+		return err
+	}
+
+	if _, err := n.newRaftLogs(n.opts.ID); err != nil {
+		return err
+	}
+
+	n.initTransport()
+	n.raftNode = raft.StartNode(n.Config, nil)
+
+	return nil
+}
+
+func (n *Node) joinCluster(ctx context.Context) error {
 	if n.opts.Addr == "" {
 		return errors.New("attempted to join raft cluster without knowing own address")
 	}
@@ -438,15 +457,7 @@
 	}
 
 	n.Config.ID = resp.RaftID
-
-	if _, err := n.newRaftLogs(n.opts.ID); err != nil {
-		return err
-	}
 	n.bootstrapMembers = resp.Members
-
-	n.initTransport()
-	n.raftNode = raft.StartNode(n.Config, nil)
-
 	return nil
 }
 
@@ -909,24 +920,6 @@ func (n *Node) Join(ctx context.Context, req *api.JoinRequest) (*api.JoinRespons
 		return nil, grpc.Errorf(codes.FailedPrecondition, "%s", ErrLostLeadership.Error())
 	}
 
-	// A single manager must not be able to join the raft cluster twice. If
-	// it did, that would cause the quorum to be computed incorrectly. This
-	// could happen if the WAL was deleted from an active manager.
-	for _, m := range n.cluster.Members() {
-		if m.NodeID == nodeInfo.NodeID {
-			return nil, grpc.Errorf(codes.AlreadyExists, "%s", "a raft member with this node ID already exists")
-		}
-	}
-
-	// Find a unique ID for the joining member.
-	var raftID uint64
-	for {
-		raftID = uint64(rand.Int63()) + 1
-		if n.cluster.GetMember(raftID) == nil && !n.cluster.IsIDRemoved(raftID) {
-			break
-		}
-	}
-
 	remoteAddr := req.Addr
 
 	// If the joining node sent an address like 0.0.0.0:4242, automatically
@@ -953,12 +946,54 @@ func (n *Node) Join(ctx context.Context, req *api.JoinRequest) (*api.JoinRespons
 		return nil, err
 	}
 
+	// If the peer is already a member of the cluster, we will only update
+	// its information, not add it as a new member. Adding it again would
+	// cause the quorum to be computed incorrectly.
+	for _, m := range n.cluster.Members() {
+		if m.NodeID == nodeInfo.NodeID {
+			if remoteAddr == m.Addr {
+				return n.joinResponse(m.RaftID), nil
+			}
+			updatedRaftMember := &api.RaftMember{
+				RaftID: m.RaftID,
+				NodeID: m.NodeID,
+				Addr:   remoteAddr,
+			}
+			if err := n.cluster.UpdateMember(m.RaftID, updatedRaftMember); err != nil {
+				return nil, err
+			}
+
+			if err := n.updateNodeBlocking(ctx, m.RaftID, remoteAddr); err != nil {
+				log.WithError(err).Error("failed to update node address")
+				return nil, err
+			}
+
+			log.Info("updated node address")
+			return n.joinResponse(m.RaftID), nil
+		}
+	}
+
+	// Find a unique ID for the joining member.
+	var raftID uint64
+	for {
+		raftID = uint64(rand.Int63()) + 1
+		if n.cluster.GetMember(raftID) == nil && !n.cluster.IsIDRemoved(raftID) {
+			break
+		}
+	}
+
 	err = n.addMember(ctx, remoteAddr, raftID, nodeInfo.NodeID)
 	if err != nil {
 		log.WithError(err).Errorf("failed to add member %x", raftID)
 		return nil, err
 	}
 
+	log.Debug("node joined")
+
+	return n.joinResponse(raftID), nil
+}
+
+func (n *Node) joinResponse(raftID uint64) *api.JoinResponse {
 	var nodes []*api.RaftMember
 	for _, node := range n.cluster.Members() {
 		nodes = append(nodes, &api.RaftMember{
@@ -967,9 +1002,8 @@ func (n *Node) Join(ctx context.Context, req *api.JoinRequest) (*api.JoinRespons
 			RaftID: node.RaftID,
 			NodeID: node.NodeID,
 			Addr:   node.Addr,
 		})
 	}
-	log.Debugf("node joined")
-	return &api.JoinResponse{Members: nodes, RaftID: raftID}, nil
+	return &api.JoinResponse{Members: nodes, RaftID: raftID}
 }
 // checkHealth tries to contact an aspiring member through its advertised address
diff --git a/manager/state/raft/raft_test.go b/manager/state/raft/raft_test.go
index d3f7c966e9..f85ed8d9e7 100644
--- a/manager/state/raft/raft_test.go
+++ b/manager/state/raft/raft_test.go
@@ -6,6 +6,7 @@ import (
 	"io/ioutil"
 	"log"
 	"math/rand"
+	"net"
 	"os"
 	"reflect"
 	"strconv"
@@ -13,7 +14,6 @@
 	"time"
 
 	"google.golang.org/grpc"
-	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/grpclog"
 
 	"golang.org/x/net/context"
@@ -88,9 +88,19 @@ func dial(n *raftutils.TestNode, addr string) (*grpc.ClientConn, error) {
 func TestRaftJoinTwice(t *testing.T) {
 	t.Parallel()
 
-	nodes, _ := raftutils.NewRaftCluster(t, tc)
+	nodes, clockSource := raftutils.NewRaftCluster(t, tc)
 	defer raftutils.TeardownCluster(nodes)
 
+	// Node 3's address changes
+	nodes[3].Server.Stop()
+	nodes[3].ShutdownRaft()
+	nodes[3].Listener.CloseListener()
+
+	l, err := net.Listen("tcp", "127.0.0.1:0")
+	require.NoError(t, err, "can't bind to raft service port")
+	nodes[3].Listener = raftutils.NewWrappedListener(l)
+	nodes[3] = raftutils.RestartNode(t, clockSource, nodes[3], false)
+
 	// Node 3 tries to join again
 	// Use gRPC instead of calling handler directly because of
 	// authorization check.
@@ -99,10 +109,23 @@ func TestRaftJoinTwice(t *testing.T) {
 	raftClient := api.NewRaftMembershipClient(cc)
 	defer cc.Close()
 	ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
-	_, err = raftClient.Join(ctx, &api.JoinRequest{})
-	assert.Error(t, err, "expected error on duplicate Join")
-	assert.Equal(t, grpc.Code(err), codes.AlreadyExists)
-	assert.Equal(t, grpc.ErrorDesc(err), "a raft member with this node ID already exists")
+	_, err = raftClient.Join(ctx, &api.JoinRequest{Addr: l.Addr().String()})
+	assert.NoError(t, err)
+
+	// Propose a value and wait for it to propagate
+	value, err := raftutils.ProposeValue(t, nodes[1], DefaultProposalTime)
+	assert.NoError(t, err, "failed to propose value")
+	raftutils.CheckValue(t, clockSource, nodes[2], value)
+
+	// Restart node 2
+	nodes[2].Server.Stop()
+	nodes[2].ShutdownRaft()
+	nodes[2] = raftutils.RestartNode(t, clockSource, nodes[2], false)
+	raftutils.WaitForCluster(t, clockSource, nodes)
+
+	// Node 2 should have the updated address for node 3 in its member list
+	require.NotNil(t, nodes[2].GetMemberlist()[nodes[3].Config.ID])
+	require.Equal(t, l.Addr().String(), nodes[2].GetMemberlist()[nodes[3].Config.ID].Addr)
 }
 
 func TestRaftLeader(t *testing.T) {
diff --git a/manager/state/raft/testutils/testutils.go b/manager/state/raft/testutils/testutils.go
index 6a63b6a1ae..5fe1d4ae23 100644
--- a/manager/state/raft/testutils/testutils.go
+++ b/manager/state/raft/testutils/testutils.go
@@ -159,8 +159,8 @@ func (l *WrappedListener) Close() error {
 	return nil
 }
 
-// CloseListener closes the listener
-func (l *WrappedListener) close() error {
+// CloseListener closes the underlying listener
+func (l *WrappedListener) CloseListener() error {
 	return l.Listener.Close()
 }
 
@@ -471,7 +471,7 @@ func ShutdownNode(node *TestNode) {
 		<-node.Done()
 	}
 	os.RemoveAll(node.StateDir)
-	node.Listener.close()
+	node.Listener.CloseListener()
 }
 
 // ShutdownRaft shutdowns only raft part of node.
@@ -487,7 +487,7 @@ func (n *TestNode) ShutdownRaft() {
 func CleanupNonRunningNode(node *TestNode) {
 	node.Server.Stop()
 	os.RemoveAll(node.StateDir)
-	node.Listener.close()
+	node.Listener.CloseListener()
 }
 
 // Leader determines who is the leader amongst a set of raft nodes
diff --git a/node/node.go b/node/node.go
index 77fe5b3d75..68f81f0b92 100644
--- a/node/node.go
+++ b/node/node.go
@@ -828,14 +828,22 @@ func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig
 		}
 	}
 
-	remoteAddr, _ := n.remotes.Select(n.NodeID())
+	joinAddr := n.config.JoinAddr
+	if joinAddr == "" {
+		remoteAddr, err := n.remotes.Select(n.NodeID())
+		if err == nil {
+			joinAddr = remoteAddr.Addr
+		}
+	}
+
 	m, err := manager.New(&manager.Config{
 		ForceNewCluster: n.config.ForceNewCluster,
 		RemoteAPI:       remoteAPI,
 		ControlAPI:      n.config.ListenControlAPI,
 		SecurityConfig:  securityConfig,
 		ExternalCAs:     n.config.ExternalCAs,
-		JoinRaft:        remoteAddr.Addr,
+		JoinRaft:        joinAddr,
+		ForceJoin:       n.config.JoinAddr != "",
 		StateDir:        n.config.StateDir,
 		HeartbeatTick:   n.config.HeartbeatTick,
 		ElectionTick:    n.config.ElectionTick,
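
For reference, the node.go hunk above is where the force-rejoin decision is made: an explicitly configured join address (n.config.JoinAddr) is always passed through as JoinRaft and sets ForceJoin, while an address discovered from n.remotes is only a fallback and never forces a join. The minimal sketch below restates that decision as a standalone function; chooseJoin and its parameters are illustrative names and example addresses, not part of SwarmKit.

package main

import "fmt"

// chooseJoin restates the selection logic from (*Node).runManager in the
// node.go hunk above. The function and variable names are illustrative only.
func chooseJoin(configuredJoinAddr, discoveredRemote string) (joinAddr string, forceJoin bool) {
	if configuredJoinAddr != "" {
		// An explicit join address always wins and forces a Join RPC, so a
		// manager whose address changed can re-register the new address with
		// the leader (which now updates the member record instead of
		// returning AlreadyExists).
		return configuredJoinAddr, true
	}
	// No explicit address: fall back to a discovered remote (best effort)
	// and do not force a join, preserving the previous behavior when local
	// raft state already exists.
	return discoveredRemote, false
}

func main() {
	fmt.Println(chooseJoin("10.0.0.1:2377", "10.0.0.2:2377")) // 10.0.0.1:2377 true
	fmt.Println(chooseJoin("", "10.0.0.2:2377"))              // 10.0.0.2:2377 false
}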