Merge pull request #2198 from aaronlehmann/rejoin
raft: Allow Join to be called multiple times for the same cluster member
aaronlehmann authored Jul 20, 2017
2 parents 310b691 + 10e1838 commit 293bcd7
Showing 5 changed files with 114 additions and 44 deletions.
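
The heart of the change is in the Join RPC handler: a second Join from a node ID that is already a member no longer fails with AlreadyExists; instead the handler either returns the existing raft ID or records the member's new address. Below is a minimal, self-contained sketch of that decision logic. The member and cluster types and the join method are hypothetical stand-ins for swarmkit's api.RaftMember and membership list; the real implementation appears in the manager/state/raft/raft.go diff further down.

// Sketch only: idempotent join handling under assumed, simplified types.
package main

import (
	"fmt"
	"math/rand"
)

// member is a stand-in for swarmkit's api.RaftMember.
type member struct {
	RaftID uint64
	NodeID string
	Addr   string
}

// cluster is a stand-in for the raft membership list.
type cluster struct {
	members map[uint64]*member
}

// join mirrors the new behavior: it is idempotent for a known node ID.
func (c *cluster) join(nodeID, addr string) uint64 {
	for _, m := range c.members {
		if m.NodeID == nodeID {
			if m.Addr != addr {
				m.Addr = addr // known member, new address: just update it
			}
			return m.RaftID // never add a second entry, so quorum stays correct
		}
	}
	// Unknown node: pick an unused raft ID and add it as a new member.
	var raftID uint64
	for {
		raftID = uint64(rand.Int63()) + 1
		if _, ok := c.members[raftID]; !ok {
			break
		}
	}
	c.members[raftID] = &member{RaftID: raftID, NodeID: nodeID, Addr: addr}
	return raftID
}

func main() {
	c := &cluster{members: map[uint64]*member{}}
	first := c.join("node-3", "127.0.0.1:4242")
	second := c.join("node-3", "127.0.0.1:5000") // same node rejoins from a new address
	fmt.Println(first == second, len(c.members)) // true 1
}
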
7 changes: 6 additions & 1 deletion manager/manager.go
@@ -87,7 +87,11 @@ type Config struct {
// cluster to join.
JoinRaft string

// Top-level state directory
// ForceJoin causes us to invoke raft's Join RPC even if already part
// of a cluster.
ForceJoin bool

// StateDir is the top-level state directory
StateDir string

// ForceNewCluster defines if we have to force a new cluster
@@ -203,6 +207,7 @@ func New(config *Config) (*Manager, error) {
newNodeOpts := raft.NodeOptions{
ID: config.SecurityConfig.ClientTLSCreds.NodeID(),
JoinAddr: config.JoinRaft,
ForceJoin: config.ForceJoin,
Config: raftCfg,
StateDir: raftStateDir,
ForceNewCluster: config.ForceNewCluster,
96 changes: 65 additions & 31 deletions manager/state/raft/raft.go
@@ -166,6 +166,8 @@ type NodeOptions struct {
// JoinAddr is the cluster to join. May be an empty string to create
// a standalone cluster.
JoinAddr string
// ForceJoin tells us to join even if already part of a cluster.
ForceJoin bool
// Config is the raft config.
Config *raft.Config
// StateDir is the directory to store durable state.
@@ -393,16 +395,17 @@ func (n *Node) JoinAndStart(ctx context.Context) (err error) {

// restore from snapshot
if loadAndStartErr == nil {
if n.opts.JoinAddr != "" {
log.G(ctx).Warning("ignoring request to join cluster, because raft state already exists")
if n.opts.JoinAddr != "" && n.opts.ForceJoin {
if err := n.joinCluster(ctx); err != nil {
return errors.Wrap(err, "failed to rejoin cluster")
}
}
n.campaignWhenAble = true
n.initTransport()
n.raftNode = raft.RestartNode(n.Config)
return nil
}

// first member of cluster
if n.opts.JoinAddr == "" {
// First member in the cluster, self-assign ID
n.Config.ID = uint64(rand.Int63()) + 1
@@ -417,6 +420,22 @@ func (n *Node) JoinAndStart(ctx context.Context) (err error) {
}

// join to existing cluster

if err := n.joinCluster(ctx); err != nil {
return err
}

if _, err := n.newRaftLogs(n.opts.ID); err != nil {
return err
}

n.initTransport()
n.raftNode = raft.StartNode(n.Config, nil)

return nil
}

func (n *Node) joinCluster(ctx context.Context) error {
if n.opts.Addr == "" {
return errors.New("attempted to join raft cluster without knowing own address")
}
@@ -438,15 +457,7 @@ func (n *Node) JoinAndStart(ctx context.Context) (err error) {
}

n.Config.ID = resp.RaftID

if _, err := n.newRaftLogs(n.opts.ID); err != nil {
return err
}
n.bootstrapMembers = resp.Members

n.initTransport()
n.raftNode = raft.StartNode(n.Config, nil)

return nil
}

@@ -909,24 +920,6 @@ func (n *Node) Join(ctx context.Context, req *api.JoinRequest) (*api.JoinRespons
return nil, grpc.Errorf(codes.FailedPrecondition, "%s", ErrLostLeadership.Error())
}

// A single manager must not be able to join the raft cluster twice. If
// it did, that would cause the quorum to be computed incorrectly. This
// could happen if the WAL was deleted from an active manager.
for _, m := range n.cluster.Members() {
if m.NodeID == nodeInfo.NodeID {
return nil, grpc.Errorf(codes.AlreadyExists, "%s", "a raft member with this node ID already exists")
}
}

// Find a unique ID for the joining member.
var raftID uint64
for {
raftID = uint64(rand.Int63()) + 1
if n.cluster.GetMember(raftID) == nil && !n.cluster.IsIDRemoved(raftID) {
break
}
}

remoteAddr := req.Addr

// If the joining node sent an address like 0.0.0.0:4242, automatically
@@ -953,12 +946,54 @@ func (n *Node) Join(ctx context.Context, req *api.JoinRequest) (*api.JoinRespons
return nil, err
}

// If the peer is already a member of the cluster, we will only update
// its information, not add it as a new member. Adding it again would
// cause the quorum to be computed incorrectly.
for _, m := range n.cluster.Members() {
if m.NodeID == nodeInfo.NodeID {
if remoteAddr == m.Addr {
return n.joinResponse(m.RaftID), nil
}
updatedRaftMember := &api.RaftMember{
RaftID: m.RaftID,
NodeID: m.NodeID,
Addr: remoteAddr,
}
if err := n.cluster.UpdateMember(m.RaftID, updatedRaftMember); err != nil {
return nil, err
}

if err := n.updateNodeBlocking(ctx, m.RaftID, remoteAddr); err != nil {
log.WithError(err).Error("failed to update node address")
return nil, err
}

log.Info("updated node address")
return n.joinResponse(m.RaftID), nil
}
}

// Find a unique ID for the joining member.
var raftID uint64
for {
raftID = uint64(rand.Int63()) + 1
if n.cluster.GetMember(raftID) == nil && !n.cluster.IsIDRemoved(raftID) {
break
}
}

err = n.addMember(ctx, remoteAddr, raftID, nodeInfo.NodeID)
if err != nil {
log.WithError(err).Errorf("failed to add member %x", raftID)
return nil, err
}

log.Debug("node joined")

return n.joinResponse(raftID), nil
}

func (n *Node) joinResponse(raftID uint64) *api.JoinResponse {
var nodes []*api.RaftMember
for _, node := range n.cluster.Members() {
nodes = append(nodes, &api.RaftMember{
@@ -967,9 +1002,8 @@ func (n *Node) Join(ctx context.Context, req *api.JoinRequest) (*api.JoinRespons
Addr: node.Addr,
})
}
log.Debugf("node joined")

return &api.JoinResponse{Members: nodes, RaftID: raftID}, nil
return &api.JoinResponse{Members: nodes, RaftID: raftID}
}

// checkHealth tries to contact an aspiring member through its advertised address
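
On the client side of the change, JoinAndStart now delegates to the new joinCluster helper both when bootstrapping fresh state and, if ForceJoin is set, when local raft state already exists (previously a non-empty JoinAddr was simply ignored with a warning in that case). The following is a condensed, self-contained sketch of that startup decision; the strings returned stand in for the real calls in raft.go and are illustration only.

// Sketch only: the JoinAndStart decision tree, simplified.
package main

import "fmt"

type nodeOptions struct {
	JoinAddr  string // cluster to join; empty means standalone
	ForceJoin bool   // re-issue the Join RPC even if raft state already exists
}

func joinAndStart(opts nodeOptions, haveLocalState bool) string {
	if haveLocalState {
		// Existing WAL/snapshot on disk: rejoin only when explicitly forced,
		// so the leader can record this member's possibly changed address.
		if opts.JoinAddr != "" && opts.ForceJoin {
			return "rejoin cluster, then restart local raft node"
		}
		return "restart local raft node"
	}
	if opts.JoinAddr == "" {
		return "bootstrap a new single-member cluster"
	}
	return "join existing cluster, then start local raft node"
}

func main() {
	fmt.Println(joinAndStart(nodeOptions{JoinAddr: "10.0.0.1:4242", ForceJoin: true}, true))
	fmt.Println(joinAndStart(nodeOptions{JoinAddr: "10.0.0.1:4242"}, true))
}
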
35 changes: 29 additions & 6 deletions manager/state/raft/raft_test.go
@@ -6,14 +6,14 @@ import (
"io/ioutil"
"log"
"math/rand"
"net"
"os"
"reflect"
"strconv"
"testing"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/grpclog"

"golang.org/x/net/context"
@@ -88,9 +88,19 @@ func dial(n *raftutils.TestNode, addr string) (*grpc.ClientConn, error) {
func TestRaftJoinTwice(t *testing.T) {
t.Parallel()

nodes, _ := raftutils.NewRaftCluster(t, tc)
nodes, clockSource := raftutils.NewRaftCluster(t, tc)
defer raftutils.TeardownCluster(nodes)

// Node 3's address changes
nodes[3].Server.Stop()
nodes[3].ShutdownRaft()
nodes[3].Listener.CloseListener()

l, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err, "can't bind to raft service port")
nodes[3].Listener = raftutils.NewWrappedListener(l)
nodes[3] = raftutils.RestartNode(t, clockSource, nodes[3], false)

// Node 3 tries to join again
// Use gRPC instead of calling handler directly because of
// authorization check.
@@ -99,10 +109,23 @@ func TestRaftJoinTwice(t *testing.T) {
raftClient := api.NewRaftMembershipClient(cc)
defer cc.Close()
ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
_, err = raftClient.Join(ctx, &api.JoinRequest{})
assert.Error(t, err, "expected error on duplicate Join")
assert.Equal(t, grpc.Code(err), codes.AlreadyExists)
assert.Equal(t, grpc.ErrorDesc(err), "a raft member with this node ID already exists")
_, err = raftClient.Join(ctx, &api.JoinRequest{Addr: l.Addr().String()})
assert.NoError(t, err)

// Propose a value and wait for it to propagate
value, err := raftutils.ProposeValue(t, nodes[1], DefaultProposalTime)
assert.NoError(t, err, "failed to propose value")
raftutils.CheckValue(t, clockSource, nodes[2], value)

// Restart node 2
nodes[2].Server.Stop()
nodes[2].ShutdownRaft()
nodes[2] = raftutils.RestartNode(t, clockSource, nodes[2], false)
raftutils.WaitForCluster(t, clockSource, nodes)

// Node 2 should have the updated address for node 3 in its member list
require.NotNil(t, nodes[2].GetMemberlist()[nodes[3].Config.ID])
require.Equal(t, l.Addr().String(), nodes[2].GetMemberlist()[nodes[3].Config.ID].Addr)
}

func TestRaftLeader(t *testing.T) {
8 changes: 4 additions & 4 deletions manager/state/raft/testutils/testutils.go
@@ -159,8 +159,8 @@ func (l *WrappedListener) Close() error {
return nil
}

// CloseListener closes the listener
func (l *WrappedListener) close() error {
// CloseListener closes the underlying listener
func (l *WrappedListener) CloseListener() error {
return l.Listener.Close()
}

@@ -471,7 +471,7 @@ func ShutdownNode(node *TestNode) {
<-node.Done()
}
os.RemoveAll(node.StateDir)
node.Listener.close()
node.Listener.CloseListener()
}

// ShutdownRaft shuts down only the raft part of the node.
@@ -487,7 +487,7 @@ func (n *TestNode) ShutdownRaft() {
func CleanupNonRunningNode(node *TestNode) {
node.Server.Stop()
os.RemoveAll(node.StateDir)
node.Listener.close()
node.Listener.CloseListener()
}

// Leader determines who is the leader amongst a set of raft nodes
12 changes: 10 additions & 2 deletions node/node.go
@@ -828,14 +828,22 @@ func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig
}
}

remoteAddr, _ := n.remotes.Select(n.NodeID())
joinAddr := n.config.JoinAddr
if joinAddr == "" {
remoteAddr, err := n.remotes.Select(n.NodeID())
if err == nil {
joinAddr = remoteAddr.Addr
}
}

m, err := manager.New(&manager.Config{
ForceNewCluster: n.config.ForceNewCluster,
RemoteAPI: remoteAPI,
ControlAPI: n.config.ListenControlAPI,
SecurityConfig: securityConfig,
ExternalCAs: n.config.ExternalCAs,
JoinRaft: remoteAddr.Addr,
JoinRaft: joinAddr,
ForceJoin: n.config.JoinAddr != "",
StateDir: n.config.StateDir,
HeartbeatTick: n.config.HeartbeatTick,
ElectionTick: n.config.ElectionTick,
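
On the node side, ForceJoin is derived from whether an explicit join address was configured: only then should the manager re-issue the Join RPC despite already having raft state on disk, while a join address merely discovered from n.remotes does not force a rejoin. A small self-contained sketch of that rule follows; buildConfig, selectRemote and managerConfig are hypothetical stand-ins for the real node.go and manager.Config code.

// Sketch only: deriving ForceJoin from an explicitly configured join address.
package main

import (
	"errors"
	"fmt"
)

type managerConfig struct {
	JoinRaft  string
	ForceJoin bool
}

// selectRemote stands in for n.remotes.Select: it may fail if no peers are known.
func selectRemote() (string, error) {
	return "", errors.New("no remotes known")
}

func buildConfig(explicitJoinAddr string) managerConfig {
	joinAddr := explicitJoinAddr
	if joinAddr == "" {
		if addr, err := selectRemote(); err == nil {
			joinAddr = addr // fall back to a discovered peer, but do not force a re-Join
		}
	}
	return managerConfig{
		JoinRaft:  joinAddr,
		ForceJoin: explicitJoinAddr != "", // force a rejoin only when explicitly asked to join
	}
}

func main() {
	fmt.Printf("%+v\n", buildConfig("10.0.0.1:4242")) // {JoinRaft:10.0.0.1:4242 ForceJoin:true}
	fmt.Printf("%+v\n", buildConfig(""))              // {JoinRaft: ForceJoin:false}
}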
