Skip to content

Commit

Permalink
storage: process intents synchronously
Browse files Browse the repository at this point in the history
Process intents synchronously on the goroutine which generated them.
  • Loading branch information
petermattis committed Nov 23, 2016
1 parent aecff9f commit 6e31b79
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 84 deletions.
61 changes: 2 additions & 59 deletions pkg/storage/intent_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
Expand All @@ -39,11 +38,6 @@ import (
// TODO(bdarnell): how to determine best value?
const intentResolverTaskLimit = 100

type resolveTask struct {
r *Replica
intents []intentsWithArg
}

// intentResolver manages the process of pushing transactions and
// resolving intents.
type intentResolver struct {
Expand All @@ -56,48 +50,17 @@ type intentResolver struct {
// Maps transaction ids to a refcount.
inFlight map[uuid.UUID]int
}

resolveWorkCh chan struct{}
resolveWorkMu struct {
syncutil.Mutex
tasks []resolveTask
}
}

func newIntentResolver(store *Store) *intentResolver {
ir := &intentResolver{
store: store,
sem: make(chan struct{}, intentResolverTaskLimit),
resolveWorkCh: make(chan struct{}, 1),
store: store,
sem: make(chan struct{}, intentResolverTaskLimit),
}
ir.mu.inFlight = map[uuid.UUID]int{}
return ir
}

func (ir *intentResolver) Start(stopper *stop.Stopper) {
stopper.RunWorker(func() {
for {
select {
case <-ir.resolveWorkCh:
for {
ir.resolveWorkMu.Lock()
tasks := ir.resolveWorkMu.tasks
ir.resolveWorkMu.tasks = nil
ir.resolveWorkMu.Unlock()
if len(tasks) == 0 {
break
}
for _, task := range tasks {
ir.processIntents(task.r, task.intents)
}
}
case <-stopper.ShouldStop():
return
}
}
})
}

// processWriteIntentError tries to push the conflicting
// transaction(s) responsible for the given WriteIntentError, and to
// resolve those intents if possible. Returns a new error to be used
Expand Down Expand Up @@ -302,26 +265,6 @@ func (ir *intentResolver) maybePushTransactions(
// differently and would be better served by different entry points,
// but combining them simplifies the plumbing necessary in Replica.
func (ir *intentResolver) processIntentsAsync(r *Replica, intents []intentsWithArg) {
if len(intents) == 0 {
return
}
ir.resolveWorkMu.Lock()
ir.resolveWorkMu.tasks = append(ir.resolveWorkMu.tasks, resolveTask{r, intents})
ir.resolveWorkMu.Unlock()
select {
case ir.resolveWorkCh <- struct{}{}:
default:
}
}

// processIntents asynchronously processes intents which were
// encountered during another command but did not interfere with the
// execution of that command. This occurs in two cases: inconsistent
// reads and EndTransaction (which queues its own external intents for
// processing via this method). The two cases are handled somewhat
// differently and would be better served by different entry points,
// but combining them simplifies the plumbing necessary in Replica.
func (ir *intentResolver) processIntents(r *Replica, intents []intentsWithArg) {
now := r.store.Clock().Now()
ctx := context.TODO()
stopper := r.store.Stopper()
Expand Down
24 changes: 17 additions & 7 deletions pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ type proposalResult struct {
Reply *roachpb.BatchResponse
Err *roachpb.Error
ProposalRetry proposalRetryReason
Intents []intentsWithArg
}

type replicaChecksum struct {
Expand Down Expand Up @@ -1664,10 +1665,9 @@ func (r *Replica) addReadOnlyCmd(
// described in #2231.
pErr = r.checkIfTxnAborted(ctx, r.store.Engine(), *ba.Txn)
}
if result.Local.intents != nil && len(*result.Local.intents) > 0 {
log.Eventf(ctx, "submitting %d intents to asynchronous processing",
len(*result.Local.intents))
r.store.intentResolver.processIntentsAsync(r, *result.Local.intents)
if intents := result.Local.detachIntents(); len(intents) > 0 {
log.Eventf(ctx, "submitting %d intents to asynchronous processing", len(intents))
r.store.intentResolver.processIntentsAsync(r, intents)
}
if pErr != nil {
log.ErrEvent(ctx, pErr.String())
Expand Down Expand Up @@ -1869,6 +1869,14 @@ func (r *Replica) tryAddWriteCmd(
// Set endCmds to nil because they have already been invoked
// in processRaftCommand.
endCmds = nil
if len(propResult.Intents) > 0 {
// Semi-synchronously process any intents that need resolving here in
// order to apply back pressure on the client which generated them. The
// resolution is semi-synchronous in that there is a limited number of
// outstanding asynchronous resolution tasks allowed after which
// further calls will block.
r.store.intentResolver.processIntentsAsync(r, propResult.Intents)
}
return propResult.Reply, propResult.Err, propResult.ProposalRetry
case <-ctxDone:
// If our context was cancelled, return an AmbiguousResultError
Expand Down Expand Up @@ -2077,12 +2085,13 @@ func (r *Replica) propose(
// An error here corresponds to a failfast-proposal: The command resulted
// in an error and did not need to commit a batch (the common error case).
if pErr != nil {
intents := pCmd.Local.detachIntents()
r.handleEvalResult(ctx, repDesc, pCmd.Local, pCmd.Replicated)
if endCmds != nil {
endCmds.done(nil, pErr, proposalNoRetry)
}
ch := make(chan proposalResult, 1)
ch <- proposalResult{Err: pErr}
ch <- proposalResult{Err: pErr, Intents: intents}
close(ch)
return ch, func() bool { return false }, nil
}
Expand Down Expand Up @@ -3287,6 +3296,7 @@ func (r *Replica) processRaftCommand(
} else {
log.Fatalf(ctx, "proposal must return either a reply or an error: %+v", cmd)
}
response.Intents = cmd.Local.detachIntents()
lResult = cmd.Local
}

Expand Down Expand Up @@ -4224,11 +4234,11 @@ func (r *Replica) loadSystemConfigSpan() ([]roachpb.KeyValue, []byte, error) {
if pErr != nil {
return nil, nil, pErr.GoError()
}
if result.Local.intents != nil && len(*result.Local.intents) > 0 {
if intents := result.Local.detachIntents(); len(intents) > 0 {
// There were intents, so what we read may not be consistent. Attempt
// to nudge the intents in case they're expired; next time around we'll
// hopefully have more luck.
r.store.intentResolver.processIntentsAsync(r, *result.Local.intents)
r.store.intentResolver.processIntentsAsync(r, intents)
return nil, nil, errSystemConfigIntent
}
kvs := br.Responses[0].GetInner().(*roachpb.ScanResponse).Rows
Expand Down
25 changes: 9 additions & 16 deletions pkg/storage/replica_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,15 @@ func (lResult *LocalEvalResult) finish(pr proposalResult) {
close(lResult.doneCh)
}

func (lResult *LocalEvalResult) detachIntents() []intentsWithArg {
if lResult.intents == nil {
return nil
}
intents := *lResult.intents
lResult.intents = nil
return intents
}

// EvalResult is the result of evaluating a KV request. That is, the
// proposer (which holds the lease, at least in the case in which the command
// will complete successfully) has evaluated the request and is holding on to:
Expand Down Expand Up @@ -600,22 +609,6 @@ func (r *Replica) handleLocalEvalResult(
// Non-state updates and actions.
// ======================

if originReplica.StoreID == r.store.StoreID() {
// On the replica on which this command originated, resolve skipped
// intents asynchronously - even on failure.
//
// TODO(tschottdorf): EndTransaction will use this pathway to return
// intents which should immediately be resolved. However, there's
// a slight chance that an error between the origin of that intents
// slice and here still results in that intent slice arriving here
// without the EndTransaction having committed. We should clearly
// separate the part of the EvalResult which also applies on errors.
if lResult.intents != nil {
r.store.intentResolver.processIntentsAsync(r, *lResult.intents)
}
}
lResult.intents = nil

// The above are present too often, so we assert only if there are
// "nontrivial" actions below.
shouldAssert = (lResult != LocalEvalResult{})
Expand Down
2 changes: 0 additions & 2 deletions pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -992,8 +992,6 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {
// Add the bookie to the store.
s.bookie = newBookie(s.metrics)

s.intentResolver.Start(stopper)

// Read the store ident if not already initialized. "NodeID != 0" implies
// the store has already been initialized.
if s.Ident.NodeID == 0 {
Expand Down

0 comments on commit 6e31b79

Please sign in to comment.