Skip to content

Commit

Permalink
set n.Node and n.isMember to correct value in stop/removeNode
Browse files Browse the repository at this point in the history
Signed-off-by: Runshen Zhu <runshen.zhu@gmail.com>
  • Loading branch information
runshenzhu committed Aug 5, 2016
1 parent af23e13 commit 2b5aca5
Showing 1 changed file with 41 additions and 16 deletions.
57 changes: 41 additions & 16 deletions manager/state/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ type Node struct {
doneCh chan struct{}
// removeRaftCh notifies about node deletion from raft cluster
removeRaftCh chan struct{}
removeRaftOnce sync.Once
removeRaftFunc func()
leadershipBroadcast *events.Broadcaster

// used to coordinate shutdown
Expand Down Expand Up @@ -215,6 +215,15 @@ func NewNode(ctx context.Context, opts NewNodeOptions) *Node {
n.reqIDGen = idutil.NewGenerator(uint16(n.Config.ID), time.Now())
n.wait = newWait()

n.removeRaftFunc = func(n *Node) func() {
var removeRaftOnce sync.Once
return func() {
removeRaftOnce.Do(func() {
close(n.removeRaftCh)
})
}
}(n)

return n
}

Expand Down Expand Up @@ -491,16 +500,18 @@ func (n *Node) stop() {
_ = member.Conn.Close()
}
}

n.Stop()
n.ticker.Stop()
if err := n.wal.Close(); err != nil {
n.Config.Logger.Errorf("raft: error closing WAL: %v", err)
}
atomic.StoreUint32(&n.isMember, 0)
// TODO(stevvooe): Handle ctx.Done()
}

// IsLeader checks if we are the leader or not
func (n *Node) IsLeader() bool {
// isLeader checks if we are the leader or not, without the protection of lock
func (n *Node) isLeader() bool {
if !n.IsMember() {
return false
}
Expand All @@ -511,8 +522,16 @@ func (n *Node) IsLeader() bool {
return false
}

// Leader returns the id of the leader
func (n *Node) Leader() uint64 {
// IsLeader checks if we are the leader or not, with the protection of lock
func (n *Node) IsLeader() bool {
n.stopMu.RLock()
defer n.stopMu.RUnlock()

return n.isLeader()
}

// leader returns the id of the leader, without the protection of lock
func (n *Node) leader() uint64 {
if !n.IsMember() {
return 0
}
Expand All @@ -532,6 +551,14 @@ func (n *Node) caughtUp() bool {
return n.appliedIndex >= lastIndex
}

// Leader returns the id of the leader, with the protection of lock
func (n *Node) Leader() uint64 {
n.stopMu.RLock()
defer n.stopMu.RUnlock()

return n.leader()
}

// Join asks to a member of the raft to propose
// a configuration change and add us as a member thus
// beginning the log replication process. This method
Expand Down Expand Up @@ -567,7 +594,7 @@ func (n *Node) Join(ctx context.Context, req *api.JoinRequest) (*api.JoinRespons
return nil, ErrStopped
}

if !n.IsLeader() {
if !n.isLeader() {
return nil, ErrLostLeadership
}

Expand Down Expand Up @@ -702,7 +729,7 @@ func (n *Node) Leave(ctx context.Context, req *api.LeaveRequest) (*api.LeaveResp
return nil, ErrStopped
}

if !n.IsLeader() {
if !n.isLeader() {
return nil, ErrLostLeadership
}

Expand Down Expand Up @@ -826,7 +853,7 @@ func (n *Node) LeaderAddr() (string, error) {
return "", ErrStopped
}
ms := n.cluster.Members()
l := ms[n.Leader()]
l := ms[n.leader()]
if l == nil {
return "", ErrNoClusterLeader
}
Expand Down Expand Up @@ -963,10 +990,8 @@ func (n *Node) IsMember() bool {

// IsStopped checks if the raft node is stopped or not
func (n *Node) IsStopped() bool {
if n.Node == nil {
return true
}
return false
// if a node is stopped, it's no longer the member of cluster
return !n.IsMember()
}

// canSubmitProposal defines if any more proposals
Expand Down Expand Up @@ -1092,9 +1117,7 @@ func (n *Node) sendToMember(members map[uint64]*membership.Member, m raftpb.Mess
_, err := conn.ProcessRaftMessage(ctx, &api.ProcessRaftMessageRequest{Message: &m})
if err != nil {
if grpc.ErrorDesc(err) == ErrMemberRemoved.Error() {
n.removeRaftOnce.Do(func() {
close(n.removeRaftCh)
})
n.removeRaftFunc()
}
if m.Type == raftpb.MsgSnap {
n.ReportSnapshot(m.To, raft.SnapshotFailure)
Expand Down Expand Up @@ -1310,13 +1333,15 @@ func (n *Node) applyRemoveNode(cc raftpb.ConfChange) (err error) {
// a follower and the leader steps down, Campaign
// to be the leader.

if cc.NodeID == n.Leader() && !n.IsLeader() {
if cc.NodeID == n.leader() && !n.isLeader() {
if err = n.Campaign(n.Ctx); err != nil {
return err
}
}

if cc.NodeID == n.Config.ID {
n.removeRaftFunc()

// wait the commit ack to be sent before closing connection
n.asyncTasks.Wait()

Expand Down

0 comments on commit 2b5aca5

Please sign in to comment.