Skip to content

Commit

Permalink
set n.Node and n.isMember to correct value in stop/removeNode
Browse files Browse the repository at this point in the history
Signed-off-by: Runshen Zhu <runshen.zhu@gmail.com>
  • Loading branch information
runshenzhu committed Aug 5, 2016
1 parent af23e13 commit e7a941e
Showing 1 changed file with 51 additions and 37 deletions.
88 changes: 51 additions & 37 deletions manager/state/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ type Node struct {
doneCh chan struct{}
// removeRaftCh notifies about node deletion from raft cluster
removeRaftCh chan struct{}
removeRaftOnce sync.Once
removeRaftFunc func()
leadershipBroadcast *events.Broadcaster

// used to coordinate shutdown
Expand Down Expand Up @@ -215,6 +215,15 @@ func NewNode(ctx context.Context, opts NewNodeOptions) *Node {
n.reqIDGen = idutil.NewGenerator(uint16(n.Config.ID), time.Now())
n.wait = newWait()

n.removeRaftFunc = func(n *Node) func() {
var removeRaftOnce sync.Once
return func() {
removeRaftOnce.Do(func() {
close(n.removeRaftCh)
})
}
}(n)

return n
}

Expand Down Expand Up @@ -491,16 +500,18 @@ func (n *Node) stop() {
_ = member.Conn.Close()
}
}

n.Stop()
n.ticker.Stop()
if err := n.wal.Close(); err != nil {
n.Config.Logger.Errorf("raft: error closing WAL: %v", err)
}
atomic.StoreUint32(&n.isMember, 0)
// TODO(stevvooe): Handle ctx.Done()
}

// IsLeader checks if we are the leader or not
func (n *Node) IsLeader() bool {
// isLeader checks if we are the leader or not, without the protection of lock
func (n *Node) isLeader() bool {
if !n.IsMember() {
return false
}
Expand All @@ -511,14 +522,30 @@ func (n *Node) IsLeader() bool {
return false
}

// Leader returns the id of the leader
func (n *Node) Leader() uint64 {
// IsLeader checks if we are the leader or not, with the protection of lock
func (n *Node) IsLeader() bool {
n.stopMu.RLock()
defer n.stopMu.RUnlock()

return n.isLeader()
}

// leader returns the id of the leader, without the protection of lock
func (n *Node) leader() uint64 {
if !n.IsMember() {
return 0
}
return n.Node.Status().Lead
}

// Leader returns the id of the leader, with the protection of lock
func (n *Node) Leader() uint64 {
n.stopMu.RLock()
defer n.stopMu.RUnlock()

return n.leader()
}

// ReadyForProposals returns true if the node has broadcasted a message
// saying that it has become the leader. This means it is ready to accept
// proposals.
Expand Down Expand Up @@ -562,12 +589,7 @@ func (n *Node) Join(ctx context.Context, req *api.JoinRequest) (*api.JoinRespons
return nil, ErrNoRaftMember
}

if n.IsStopped() {
log.WithError(ErrStopped).Errorf(ErrStopped.Error())
return nil, ErrStopped
}

if !n.IsLeader() {
if !n.isLeader() {
return nil, ErrLostLeadership
}

Expand Down Expand Up @@ -698,11 +720,7 @@ func (n *Node) Leave(ctx context.Context, req *api.LeaveRequest) (*api.LeaveResp
return nil, ErrNoRaftMember
}

if n.IsStopped() {
return nil, ErrStopped
}

if !n.IsLeader() {
if !n.isLeader() {
return nil, ErrLostLeadership
}

Expand Down Expand Up @@ -771,10 +789,6 @@ func (n *Node) ProcessRaftMessage(ctx context.Context, msg *api.ProcessRaftMessa
return nil, ErrNoRaftMember
}

if n.IsStopped() {
return nil, ErrStopped
}

if err := n.Step(n.Ctx, *msg.Message); err != nil {
return nil, err
}
Expand Down Expand Up @@ -822,11 +836,11 @@ func (n *Node) LeaderAddr() (string, error) {
if err := WaitForLeader(ctx, n); err != nil {
return "", ErrNoClusterLeader
}
if n.IsStopped() {
return "", ErrStopped
if !n.IsMember() {
return "", ErrNoRaftMember
}
ms := n.cluster.Members()
l := ms[n.Leader()]
l := ms[n.leader()]
if l == nil {
return "", ErrNoClusterLeader
}
Expand Down Expand Up @@ -904,6 +918,13 @@ func (n *Node) ProposeValue(ctx context.Context, storeAction []*api.StoreAction,

// GetVersion returns the sequence information for the current raft round.
func (n *Node) GetVersion() *api.Version {
n.stopMu.RLock()
defer n.stopMu.RUnlock()

if !n.IsMember() {
return nil
}

status := n.Node.Status()
return &api.Version{Index: status.Commit}
}
Expand Down Expand Up @@ -961,14 +982,6 @@ func (n *Node) IsMember() bool {
return atomic.LoadUint32(&n.isMember) == 1
}

// IsStopped checks if the raft node is stopped or not
func (n *Node) IsStopped() bool {
if n.Node == nil {
return true
}
return false
}

// canSubmitProposal defines if any more proposals
// could be submitted and processed.
func (n *Node) canSubmitProposal() bool {
Expand Down Expand Up @@ -1092,15 +1105,14 @@ func (n *Node) sendToMember(members map[uint64]*membership.Member, m raftpb.Mess
_, err := conn.ProcessRaftMessage(ctx, &api.ProcessRaftMessageRequest{Message: &m})
if err != nil {
if grpc.ErrorDesc(err) == ErrMemberRemoved.Error() {
n.removeRaftOnce.Do(func() {
close(n.removeRaftCh)
})
n.removeRaftFunc()
}
if m.Type == raftpb.MsgSnap {
n.ReportSnapshot(m.To, raft.SnapshotFailure)
}
if n.IsStopped() {
panic("node is nil")
if !n.IsMember() {
// node is removed from cluster or stopped
return
}
n.ReportUnreachable(m.To)

Expand Down Expand Up @@ -1310,13 +1322,15 @@ func (n *Node) applyRemoveNode(cc raftpb.ConfChange) (err error) {
// a follower and the leader steps down, Campaign
// to be the leader.

if cc.NodeID == n.Leader() && !n.IsLeader() {
if cc.NodeID == n.leader() && !n.isLeader() {
if err = n.Campaign(n.Ctx); err != nil {
return err
}
}

if cc.NodeID == n.Config.ID {
n.removeRaftFunc()

// wait the commit ack to be sent before closing connection
n.asyncTasks.Wait()

Expand Down

0 comments on commit e7a941e

Please sign in to comment.