From 18869b9686a5dbce529aa6c2b4351c6d2f4ee4dc Mon Sep 17 00:00:00 2001
From: Aaron Lehmann
Date: Thu, 5 Jan 2017 12:30:09 -0800
Subject: [PATCH] raft: Fix race when proposal is cancelled and triggered at the same time

The demotion changes revealed an interesting race. If a proposal is
triggered at the same moment that something cancels it, the callback can
be called (which commits the transaction to memory), but an error can be
returned from ProposeValue, which causes the transaction to be aborted.
ProposeValue should return an error if and only if the callback was not
called.

Add a return value to cancel(). If cancel fails because the proposal was
already triggered, avoid returning an error.

Also, remove the applyResult type, since it isn't useful.

Signed-off-by: Aaron Lehmann
---
 manager/state/raft/raft.go | 20 ++++++++++----------
 manager/state/raft/wait.go |  3 ++-
 2 files changed, 12 insertions(+), 11 deletions(-)

diff --git a/manager/state/raft/raft.go b/manager/state/raft/raft.go
index 4228d7eced..bedf3a0f88 100644
--- a/manager/state/raft/raft.go
+++ b/manager/state/raft/raft.go
@@ -1467,11 +1467,6 @@ func (n *Node) handleAddressChange(ctx context.Context, member *membership.Membe
 	return nil
 }
 
-type applyResult struct {
-	resp proto.Message
-	err  error
-}
-
 // processInternalRaftRequest sends a message to nodes participating
 // in the raft to apply a log entry and then waits for it to be applied
 // on the server. It will block until the update is performed, there is
@@ -1520,13 +1515,18 @@ func (n *Node) processInternalRaftRequest(ctx context.Context, r *api.InternalRa
 
 	select {
 	case x := <-ch:
-		res := x.(*applyResult)
-		return res.resp, res.err
+		return x.(proto.Message), nil
 	case <-waitCtx.Done():
-		n.wait.cancel(r.ID)
+		if !n.wait.cancel(r.ID) {
+			// wait already triggered
+			return (<-ch).(proto.Message), nil
+		}
 		return nil, ErrLostLeadership
 	case <-ctx.Done():
-		n.wait.cancel(r.ID)
+		if !n.wait.cancel(r.ID) {
+			// wait already triggered
+			return (<-ch).(proto.Message), nil
+		}
 		return nil, ctx.Err()
 	}
 }
@@ -1588,7 +1588,7 @@ func (n *Node) processEntry(ctx context.Context, entry raftpb.Entry) error {
 		return nil
 	}
 
-	if !n.wait.trigger(r.ID, &applyResult{resp: r, err: nil}) {
+	if !n.wait.trigger(r.ID, r) {
 		// There was no wait on this ID, meaning we don't have a
 		// transaction in progress that would be committed to the
 		// memory store by the "trigger" call. Either a different node
diff --git a/manager/state/raft/wait.go b/manager/state/raft/wait.go
index ecd39284c4..792082f910 100644
--- a/manager/state/raft/wait.go
+++ b/manager/state/raft/wait.go
@@ -50,7 +50,7 @@ func (w *wait) trigger(id uint64, x interface{}) bool {
 	return false
 }
 
-func (w *wait) cancel(id uint64) {
+func (w *wait) cancel(id uint64) bool {
 	w.l.Lock()
 	waitItem, ok := w.m[id]
 	delete(w.m, id)
@@ -58,6 +58,7 @@ func (w *wait) cancel(id uint64) {
 	if ok && waitItem.cancel != nil {
 		waitItem.cancel()
 	}
+	return ok
 }
 
 func (w *wait) cancelAll() {