From 6e31b791079dd847506bf90982cc44346363785c Mon Sep 17 00:00:00 2001 From: Peter Mattis Date: Wed, 23 Nov 2016 10:08:40 -0500 Subject: [PATCH] storage: process intents synchronously Process intents synchronously on the goroutine which generated them. --- pkg/storage/intent_resolver.go | 61 ++------------------------------- pkg/storage/replica.go | 24 +++++++++---- pkg/storage/replica_proposal.go | 25 +++++--------- pkg/storage/store.go | 2 -- 4 files changed, 28 insertions(+), 84 deletions(-) diff --git a/pkg/storage/intent_resolver.go b/pkg/storage/intent_resolver.go index 2460ba62c48d..d0ac1a03e795 100644 --- a/pkg/storage/intent_resolver.go +++ b/pkg/storage/intent_resolver.go @@ -24,7 +24,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/cockroach/pkg/util/uuid" @@ -39,11 +38,6 @@ import ( // TODO(bdarnell): how to determine best value? const intentResolverTaskLimit = 100 -type resolveTask struct { - r *Replica - intents []intentsWithArg -} - // intentResolver manages the process of pushing transactions and // resolving intents. type intentResolver struct { @@ -56,48 +50,17 @@ type intentResolver struct { // Maps transaction ids to a refcount. inFlight map[uuid.UUID]int } - - resolveWorkCh chan struct{} - resolveWorkMu struct { - syncutil.Mutex - tasks []resolveTask - } } func newIntentResolver(store *Store) *intentResolver { ir := &intentResolver{ - store: store, - sem: make(chan struct{}, intentResolverTaskLimit), - resolveWorkCh: make(chan struct{}, 1), + store: store, + sem: make(chan struct{}, intentResolverTaskLimit), } ir.mu.inFlight = map[uuid.UUID]int{} return ir } -func (ir *intentResolver) Start(stopper *stop.Stopper) { - stopper.RunWorker(func() { - for { - select { - case <-ir.resolveWorkCh: - for { - ir.resolveWorkMu.Lock() - tasks := ir.resolveWorkMu.tasks - ir.resolveWorkMu.tasks = nil - ir.resolveWorkMu.Unlock() - if len(tasks) == 0 { - break - } - for _, task := range tasks { - ir.processIntents(task.r, task.intents) - } - } - case <-stopper.ShouldStop(): - return - } - } - }) -} - // processWriteIntentError tries to push the conflicting // transaction(s) responsible for the given WriteIntentError, and to // resolve those intents if possible. Returns a new error to be used @@ -302,26 +265,6 @@ func (ir *intentResolver) maybePushTransactions( // differently and would be better served by different entry points, // but combining them simplifies the plumbing necessary in Replica. func (ir *intentResolver) processIntentsAsync(r *Replica, intents []intentsWithArg) { - if len(intents) == 0 { - return - } - ir.resolveWorkMu.Lock() - ir.resolveWorkMu.tasks = append(ir.resolveWorkMu.tasks, resolveTask{r, intents}) - ir.resolveWorkMu.Unlock() - select { - case ir.resolveWorkCh <- struct{}{}: - default: - } -} - -// processIntents asynchronously processes intents which were -// encountered during another command but did not interfere with the -// execution of that command. This occurs in two cases: inconsistent -// reads and EndTransaction (which queues its own external intents for -// processing via this method). The two cases are handled somewhat -// differently and would be better served by different entry points, -// but combining them simplifies the plumbing necessary in Replica. -func (ir *intentResolver) processIntents(r *Replica, intents []intentsWithArg) { now := r.store.Clock().Now() ctx := context.TODO() stopper := r.store.Stopper() diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go index 70cfee255651..1949abddc34c 100644 --- a/pkg/storage/replica.go +++ b/pkg/storage/replica.go @@ -183,6 +183,7 @@ type proposalResult struct { Reply *roachpb.BatchResponse Err *roachpb.Error ProposalRetry proposalRetryReason + Intents []intentsWithArg } type replicaChecksum struct { @@ -1664,10 +1665,9 @@ func (r *Replica) addReadOnlyCmd( // described in #2231. pErr = r.checkIfTxnAborted(ctx, r.store.Engine(), *ba.Txn) } - if result.Local.intents != nil && len(*result.Local.intents) > 0 { - log.Eventf(ctx, "submitting %d intents to asynchronous processing", - len(*result.Local.intents)) - r.store.intentResolver.processIntentsAsync(r, *result.Local.intents) + if intents := result.Local.detachIntents(); len(intents) > 0 { + log.Eventf(ctx, "submitting %d intents to asynchronous processing", len(intents)) + r.store.intentResolver.processIntentsAsync(r, intents) } if pErr != nil { log.ErrEvent(ctx, pErr.String()) @@ -1869,6 +1869,14 @@ func (r *Replica) tryAddWriteCmd( // Set endCmds to nil because they have already been invoked // in processRaftCommand. endCmds = nil + if len(propResult.Intents) > 0 { + // Semi-synchronously process any intents that need resolving here in + // order to apply back pressure on the client which generated them. The + // resolution is semi-synchronous in that there is a limited number of + // outstanding asynchronous resolution tasks allowed after which + // further calls will block. + r.store.intentResolver.processIntentsAsync(r, propResult.Intents) + } return propResult.Reply, propResult.Err, propResult.ProposalRetry case <-ctxDone: // If our context was cancelled, return an AmbiguousResultError @@ -2077,12 +2085,13 @@ func (r *Replica) propose( // An error here corresponds to a failfast-proposal: The command resulted // in an error and did not need to commit a batch (the common error case). if pErr != nil { + intents := pCmd.Local.detachIntents() r.handleEvalResult(ctx, repDesc, pCmd.Local, pCmd.Replicated) if endCmds != nil { endCmds.done(nil, pErr, proposalNoRetry) } ch := make(chan proposalResult, 1) - ch <- proposalResult{Err: pErr} + ch <- proposalResult{Err: pErr, Intents: intents} close(ch) return ch, func() bool { return false }, nil } @@ -3287,6 +3296,7 @@ func (r *Replica) processRaftCommand( } else { log.Fatalf(ctx, "proposal must return either a reply or an error: %+v", cmd) } + response.Intents = cmd.Local.detachIntents() lResult = cmd.Local } @@ -4224,11 +4234,11 @@ func (r *Replica) loadSystemConfigSpan() ([]roachpb.KeyValue, []byte, error) { if pErr != nil { return nil, nil, pErr.GoError() } - if result.Local.intents != nil && len(*result.Local.intents) > 0 { + if intents := result.Local.detachIntents(); len(intents) > 0 { // There were intents, so what we read may not be consistent. Attempt // to nudge the intents in case they're expired; next time around we'll // hopefully have more luck. - r.store.intentResolver.processIntentsAsync(r, *result.Local.intents) + r.store.intentResolver.processIntentsAsync(r, intents) return nil, nil, errSystemConfigIntent } kvs := br.Responses[0].GetInner().(*roachpb.ScanResponse).Rows diff --git a/pkg/storage/replica_proposal.go b/pkg/storage/replica_proposal.go index 231ed3fe3dce..4bc26bbb9426 100644 --- a/pkg/storage/replica_proposal.go +++ b/pkg/storage/replica_proposal.go @@ -110,6 +110,15 @@ func (lResult *LocalEvalResult) finish(pr proposalResult) { close(lResult.doneCh) } +func (lResult *LocalEvalResult) detachIntents() []intentsWithArg { + if lResult.intents == nil { + return nil + } + intents := *lResult.intents + lResult.intents = nil + return intents +} + // EvalResult is the result of evaluating a KV request. That is, the // proposer (which holds the lease, at least in the case in which the command // will complete successfully) has evaluated the request and is holding on to: @@ -600,22 +609,6 @@ func (r *Replica) handleLocalEvalResult( // Non-state updates and actions. // ====================== - if originReplica.StoreID == r.store.StoreID() { - // On the replica on which this command originated, resolve skipped - // intents asynchronously - even on failure. - // - // TODO(tschottdorf): EndTransaction will use this pathway to return - // intents which should immediately be resolved. However, there's - // a slight chance that an error between the origin of that intents - // slice and here still results in that intent slice arriving here - // without the EndTransaction having committed. We should clearly - // separate the part of the EvalResult which also applies on errors. - if lResult.intents != nil { - r.store.intentResolver.processIntentsAsync(r, *lResult.intents) - } - } - lResult.intents = nil - // The above are present too often, so we assert only if there are // "nontrivial" actions below. shouldAssert = (lResult != LocalEvalResult{}) diff --git a/pkg/storage/store.go b/pkg/storage/store.go index a93b976887a1..28f096d0509b 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -992,8 +992,6 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error { // Add the bookie to the store. s.bookie = newBookie(s.metrics) - s.intentResolver.Start(stopper) - // Read the store ident if not already initialized. "NodeID != 0" implies // the store has already been initialized. if s.Ident.NodeID == 0 {