From 3ba08aa650672fa3b72f182d4d6745eca12614d6 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Fri, 7 Feb 2020 17:06:31 -0500 Subject: [PATCH 1/2] storage/intentresolver: switch ResolveIntents to return roachpb.Error This will be useful to provide a consistent interface for the next commit. --- pkg/storage/intentresolver/intent_resolver.go | 31 +++++++++---------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/pkg/storage/intentresolver/intent_resolver.go b/pkg/storage/intentresolver/intent_resolver.go index 8819fa923b57..59a1c771606e 100644 --- a/pkg/storage/intentresolver/intent_resolver.go +++ b/pkg/storage/intentresolver/intent_resolver.go @@ -279,9 +279,9 @@ func (ir *IntentResolver) ProcessWriteIntentError( // // To do better here, we need per-intent information on whether we need to // poison. - if err := ir.ResolveIntents(ctx, resolveIntents, - ResolveOptions{Wait: false, Poison: true}); err != nil { - return cleanup, roachpb.NewError(err) + opts := ResolveOptions{Wait: false, Poison: true} + if pErr := ir.ResolveIntents(ctx, resolveIntents, opts); pErr != nil { + return cleanup, pErr } return cleanup, nil @@ -580,10 +580,9 @@ func (ir *IntentResolver) CleanupIntents( // same situation as above. // // Thus, we must poison. - if err := ir.ResolveIntents( - ctx, resolveIntents, ResolveOptions{Wait: true, Poison: true}, - ); err != nil { - return 0, errors.Wrapf(err, "failed to resolve intents") + opts := ResolveOptions{Wait: true, Poison: true} + if pErr := ir.ResolveIntents(ctx, resolveIntents, opts); pErr != nil { + return 0, errors.Wrapf(pErr.GoError(), "failed to resolve intents") } resolved += len(resolveIntents) unpushed = unpushed[i:] @@ -803,8 +802,8 @@ func (ir *IntentResolver) cleanupFinishedTxnIntents( // Resolve intents. 
min, _ := txn.InclusiveTimeBounds() opts := ResolveOptions{Wait: true, Poison: poison, MinTimestamp: min} - if err := ir.ResolveIntents(ctx, intents, opts); err != nil { - return errors.Wrapf(err, "failed to resolve intents") + if pErr := ir.ResolveIntents(ctx, intents, opts); pErr != nil { + return errors.Wrapf(pErr.GoError(), "failed to resolve intents") } // Run transaction record GC outside of ir.sem. return ir.stopper.RunAsyncTask( @@ -862,17 +861,17 @@ func (ir *IntentResolver) lookupRangeID(ctx context.Context, key roachpb.Key) ro return rDesc.RangeID } -// ResolveIntents synchronously resolves intents accordings to opts. +// ResolveIntents synchronously resolves intents according to opts. func (ir *IntentResolver) ResolveIntents( ctx context.Context, intents []roachpb.Intent, opts ResolveOptions, -) error { +) *roachpb.Error { if len(intents) == 0 { return nil } // Avoid doing any work on behalf of expired contexts. See // https://github.com/cockroachdb/cockroach/issues/15997. if err := ctx.Err(); err != nil { - return errors.Wrap(err, "aborted resolving intents") + return roachpb.NewError(err) } log.Eventf(ctx, "resolving intents [wait=%t]", opts.Wait) ctx, cancel := context.WithCancel(ctx) @@ -911,18 +910,18 @@ func (ir *IntentResolver) ResolveIntents( respChan := make(chan requestbatcher.Response, len(resolveReqs)) for _, req := range resolveReqs { if err := ir.irBatcher.SendWithChan(ctx, respChan, req.rangeID, req.req); err != nil { - return err + return roachpb.NewError(err) } } for seen := 0; seen < len(resolveReqs); seen++ { select { case resp := <-respChan: if resp.Err != nil { - return resp.Err + return roachpb.NewError(resp.Err) } _ = resp.Resp // ignore the response case <-ctx.Done(): - return ctx.Err() + return roachpb.NewError(ctx.Err()) } } @@ -935,7 +934,7 @@ func (ir *IntentResolver) ResolveIntents( b.Header.MaxSpanRequestKeys = intentResolverBatchSize b.AddRawRequest(req) if err := ir.db.Run(ctx, b); err != nil { - return err + return 
b.MustPErr() } // Check response to see if it must be resumed. resp := b.RawResponse().Responses[0].GetInner().(*roachpb.ResolveIntentRangeResponse) From ce8901db91b6c2dbcae30f4836d61297f1781c87 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Fri, 7 Feb 2020 21:02:45 -0500 Subject: [PATCH 2/2] storage/concurrency: implement lockTableWaiter This commit builds upon the concurrency control interfaces introduced in #44787 (and is rebased on those 3 commits). It implements the counter-part to the `lockTable` structure, the `lockTableWaiter`. lockTableWaiter is concerned with waiting in lock wait-queues for locks held by conflicting transactions. It ensures that waiting requests continue to make forward progress even in the presence of faulty transaction coordinators and transaction deadlocks. The waiter implements logic for a request to wait on conflicting locks in the lockTable until they are released. Similarly, it implements logic to wait on conflicting requests ahead of the caller's request in any lock wait-queues that it is a part of. This waiting state responds to a set of state transitions in the lock table: - a conflicting lock is released - a conflicting lock is updated such that it no longer conflicts - a conflicting request in the lock wait-queue acquires the lock - a conflicting request in the lock wait-queue exits the lock wait-queue These state transitions are typically reactive - the waiter can simply wait for locks to be released or lock wait-queues to be exited by other actors. Reacting to state transitions for conflicting locks is powered by the lockManager and reacting to state transitions for conflicting lock wait-queues is powered by the requestSequencer interface. However, in the case of transaction coordinator failures or transaction deadlocks, a state transition may never occur without intervention from the waiter. 
To ensure forward-progress, the waiter may need to actively push either a lock holder of a conflicting lock or the head of a conflicting lock wait-queue. This active pushing requires an RPC to the leaseholder of the conflicting transaction's record, and will typically result in the RPC queuing in that leaseholder's txnWaitQueue. Because this can be expensive, the push is not immediately performed. Instead, it is only performed after a delay. The semantics around how the lockTableWaiter interfaces with the intentresolver are guided by the current approach in `Replica.handleWriteIntentError`. Once we properly integrate the `concurrency.Manager` and remove calls into `Replica.handleWriteIntentError`, we can clean up the intentresolver's interface to be easier to use. This will also let us delete the `contentionQueue` entirely. --- pkg/storage/concurrency/lock_table.go | 39 +- pkg/storage/concurrency/lock_table_test.go | 3 +- pkg/storage/concurrency/lock_table_waiter.go | 242 +++++++++++ .../concurrency/lock_table_waiter_test.go | 376 ++++++++++++++++++ .../concurrency/testdata/lock_table/basic | 48 +-- .../testdata/lock_table/non_txn_write | 8 +- pkg/storage/intentresolver/intent_resolver.go | 41 +- pkg/storage/replica_rangefeed.go | 5 +- 8 files changed, 708 insertions(+), 54 deletions(-) create mode 100644 pkg/storage/concurrency/lock_table_waiter.go create mode 100644 pkg/storage/concurrency/lock_table_waiter_test.go diff --git a/pkg/storage/concurrency/lock_table.go b/pkg/storage/concurrency/lock_table.go index ddd7f213aef9..4d0b1ea579f1 100644 --- a/pkg/storage/concurrency/lock_table.go +++ b/pkg/storage/concurrency/lock_table.go @@ -47,9 +47,12 @@ type waitingState struct { // Populated for waitFor* and waitElsewhere type, and represents who the // request is waiting for. - txn *enginepb.TxnMeta // always non-nil - ts hlc.Timestamp // the timestamp of the transaction that is causing the wait - access spanset.SpanAccess // Currently only SpanReadWrite. 
+ txn *enginepb.TxnMeta // always non-nil + ts hlc.Timestamp // the timestamp of the transaction that is causing the wait + key roachpb.Key // the key of the lock that is causing the wait + held bool // is the lock currently held? + access spanset.SpanAccess // currently only SpanReadWrite + guardAccess spanset.SpanAccess // the access method of the guard } // Implementation @@ -58,6 +61,8 @@ type waitingState struct { // - proper error strings and give better explanation to all panics. // - metrics about lockTable state to export to observability debug pages: // number of locks, number of waiting requests, wait time?, ... +// - update waitingState.held on each waiter when locks are acquired and +// released. // The btree for a particular SpanScope. type treeMu struct { @@ -567,11 +572,15 @@ func (l *lockState) informActiveWaiters() { stateKind: waitFor, txn: waitForTxn, ts: waitForTs, + key: l.key, + held: l.holder.locked, access: spanset.SpanReadWrite, } waitSelfState := waitingState{stateKind: waitSelf} for e := l.waitingReaders.Front(); e != nil; e = e.Next() { + state := waitForState + state.guardAccess = spanset.SpanReadOnly // Since there are waiting readers we could not have transitioned out of // or into a state with a reservation, since readers do not wait for // reservations. 
@@ -581,7 +590,7 @@ func (l *lockState) informActiveWaiters() { findDistinguished = false } g.mu.Lock() - g.mu.state = waitForState + g.mu.state = state if l.distinguishedWaiter == g { g.mu.state.stateKind = waitForDistinguished } @@ -599,6 +608,7 @@ func (l *lockState) informActiveWaiters() { state = waitSelfState } else { state = waitForState + state.guardAccess = spanset.SpanReadWrite if findDistinguished { l.distinguishedWaiter = g findDistinguished = false @@ -813,10 +823,13 @@ func (l *lockState) tryActiveWait(g *lockTableGuardImpl, sa spanset.SpanAccess, stateType = waitForDistinguished } g.mu.state = waitingState{ - stateKind: stateType, - txn: waitForTxn, - ts: waitForTs, - access: spanset.SpanReadWrite, + stateKind: stateType, + txn: waitForTxn, + ts: waitForTs, + key: l.key, + held: l.holder.locked, + access: spanset.SpanReadWrite, + guardAccess: sa, } } if notify { @@ -1340,10 +1353,12 @@ func (l *lockState) lockIsFree() (doneWaiting []*lockTableGuardImpl, gc bool) { // Need to tell the remaining active waiting writers who they are waiting // for. 
waitForState := waitingState{ - stateKind: waitFor, - txn: g.txn, - ts: g.ts, - access: spanset.SpanReadWrite, + stateKind: waitFor, + txn: g.txn, + ts: g.ts, + key: l.key, + access: spanset.SpanReadWrite, + guardAccess: spanset.SpanReadWrite, } waitSelfState := waitingState{stateKind: waitSelf} for e := l.queuedWriters.Front(); e != nil; e = e.Next() { diff --git a/pkg/storage/concurrency/lock_table_test.go b/pkg/storage/concurrency/lock_table_test.go index f929660b737b..67bcb43b672a 100644 --- a/pkg/storage/concurrency/lock_table_test.go +++ b/pkg/storage/concurrency/lock_table_test.go @@ -406,7 +406,8 @@ func TestLockTableBasic(t *testing.T) { if state.ts.Logical != 0 { tsS += fmt.Sprintf(",%d", state.ts.Logical) } - return fmt.Sprintf("%sstate=%s txn=%s ts=%s", str, typeStr, txnS, tsS) + return fmt.Sprintf("%sstate=%s txn=%s ts=%s key=%s held=%t guard-access=%s", + str, typeStr, txnS, tsS, state.key, state.held, state.guardAccess) case "print": return lt.(*lockTableImpl).String() diff --git a/pkg/storage/concurrency/lock_table_waiter.go b/pkg/storage/concurrency/lock_table_waiter.go new file mode 100644 index 000000000000..bb40e068cb1b --- /dev/null +++ b/pkg/storage/concurrency/lock_table_waiter.go @@ -0,0 +1,242 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package concurrency + +import ( + "context" + "time" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" + "github.com/cockroachdb/cockroach/pkg/storage/intentresolver" + "github.com/cockroachdb/cockroach/pkg/storage/spanset" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" +) + +// Default delay before pushing in order to detect dependency cycles. +const defaultDependencyCyclePushDelay = 100 * time.Millisecond + +// Silence unused warning. +var _ = defaultDependencyCyclePushDelay + +// lockTableWaiterImpl is an implementation of lockTableWaiter. +type lockTableWaiterImpl struct { + nodeID roachpb.NodeID + stopper *stop.Stopper + + // Used to push conflicting transactions and resolve conflicting intents. + ir intentResolver + + // How long to wait until pushing conflicting transactions to detect + // dependency cycles. + dependencyCyclePushDelay time.Duration +} + +// intentResolver is an interface used by lockTableWaiterImpl to push +// transactions and to resolve intents. It contains only the subset of the +// intentresolver.IntentResolver interface that lockTableWaiterImpl needs. +type intentResolver interface { + // PushTransaction pushes the provided transaction. The method will push the + // provided pushee transaction immediately, if possible. Otherwise, it will + // block until the pushee transaction is finalized or eventually can be + // pushed successfully. + PushTransaction( + context.Context, *enginepb.TxnMeta, roachpb.Header, roachpb.PushTxnType, + ) (roachpb.Transaction, *Error) + + // ResolveIntent resolves the provided intent according to the options. + ResolveIntent(context.Context, roachpb.Intent, intentresolver.ResolveOptions) *Error +} + +// WaitOn implements the lockTableWaiter interface. 
+func (w *lockTableWaiterImpl) WaitOn( + ctx context.Context, req Request, guard lockTableGuard, +) *Error { + newStateC := guard.NewStateChan() + ctxDoneC := ctx.Done() + shouldQuiesceC := w.stopper.ShouldQuiesce() + var timer *timeutil.Timer + var timerC <-chan time.Time + var timerWaitingState waitingState + for { + select { + case <-newStateC: + timerC = nil + state := guard.CurState() + switch state.stateKind { + case waitFor: + // waitFor indicates that the request is waiting on another + // transaction. This transaction may be the lock holder of a + // conflicting lock or the head of a lock-wait queue that the + // request is a part of. + + // For non-transactional requests, there's no need to perform + // deadlock detection and the other "distinguished" (see below) + // pusher will already push to detect coordinator failures and + // abandoned locks, so there's no need to do anything in this + // state. + if req.Txn == nil { + continue + } + + // For transactional requests, the request should push to + // resolve this conflict and detect deadlocks, but only after + // a delay. This delay avoids unnecessary push traffic when the + // conflicting transaction is continuing to make forward + // progress. + delay := w.dependencyCyclePushDelay + if hasMinPriority(state.txn) || hasMaxPriority(&req.Txn.TxnMeta) { + // However, if the pushee has the minimum priority or if the + // pusher has the maximum priority, push immediately. + delay = 0 + } + if timer == nil { + timer = timeutil.NewTimer() + defer timer.Stop() + } + timer.Reset(delay) + timerC = timer.C + timerWaitingState = state + + case waitForDistinguished: + // waitForDistinguished is like waitFor, except it instructs the + // waiter to immediately push the conflicting transaction instead of + // first waiting out the dependencyCyclePushDelay. The lockTable + // guarantees that there is always at least one request in the + // waitForDistinguished state for each lock that has any waiters. 
+ // + // The purpose of the waitForDistinguished state is to avoid adding + // a delay of dependencyCyclePushDelay to the process of recovering + // from the failure of a transaction coordinator for *each* of that + // transaction's previously written intents. If we had a cache of + // aborted transaction IDs that allowed us to notice and immediately + // resolve abandoned intents then we might be able to get rid of + // this state. + if err := w.pushTxn(ctx, req, state); err != nil { + return err + } + + case waitElsewhere: + // The lockTable has hit a memory limit and is no longer maintaining + // proper lock wait-queues. However, the waiting request is still + // not safe to proceed with evaluation because there is still a + // transaction holding the lock. It should push the transaction it + // is blocked on immediately to wait in that transaction's + // txnWaitQueue. Once this completes, the request should stop + // waiting on this lockTableGuard, as it will no longer observe + // lock-table state transitions. + return w.pushTxn(ctx, req, state) + + case waitSelf: + // Another request from the same transaction is the reservation + // holder of this lock wait-queue. This can only happen when the + // request's transaction is sending multiple requests concurrently. + // Proceed with waiting without pushing anyone. + + case doneWaiting: + // The request has waited for all conflicting locks to be released + // and is at the front of any lock wait-queues. It can now stop + // waiting, re-acquire latches, and check the lockTable again for + // any new conflicts. If it finds none, it can proceed with + // evaluation. + return nil + + default: + panic("unexpected waiting state") + } + + case <-timerC: + // If the transactional request was in the waitFor state and did not + // observe any update to its state for a dependencyCyclePushDelay, + // it should push. It may be the case that the transaction is part + // of a dependency cycle. 
+ timerC = nil + timer.Read = true + if err := w.pushTxn(ctx, req, timerWaitingState); err != nil { + return err + } + + case <-ctxDoneC: + return roachpb.NewError(ctx.Err()) + + case <-shouldQuiesceC: + return roachpb.NewError(&roachpb.NodeUnavailableError{}) + } + } +} + +func (w *lockTableWaiterImpl) pushTxn(ctx context.Context, req Request, ws waitingState) *Error { + h := roachpb.Header{ + Timestamp: req.Timestamp, + UserPriority: req.Priority, + } + if req.Txn != nil { + // We are going to hand the header (and thus the transaction proto) + // to the RPC framework, after which it must not be changed (since + // that could race). Since the subsequent execution of the original + // request might mutate the transaction, make a copy here. + // + // See #9130. + h.Txn = req.Txn.Clone() + + // We must push at least to h.Timestamp, but in fact we want to + // go all the way up to a timestamp which was taken off the HLC + // after our operation started. This allows us to not have to + // restart for uncertainty as we come back and read. + obsTS, ok := h.Txn.GetObservedTimestamp(w.nodeID) + if !ok { + // This was set earlier, so it's completely unexpected to + // not be found now. + return roachpb.NewErrorf("missing observed timestamp: %+v", h.Txn) + } + h.Timestamp.Forward(obsTS) + } + + var pushType roachpb.PushTxnType + switch ws.guardAccess { + case spanset.SpanReadOnly: + pushType = roachpb.PUSH_TIMESTAMP + case spanset.SpanReadWrite: + pushType = roachpb.PUSH_ABORT + } + + pusheeTxn, err := w.ir.PushTransaction(ctx, ws.txn, h, pushType) + if err != nil { + return err + } + if !ws.held { + return nil + } + + // We always poison due to limitations of the API: not poisoning equals + // clearing the AbortSpan, and if our pushee transaction first got pushed + // for timestamp (by us), then (by someone else) aborted and poisoned, and + // then we run the below code, we're clearing the AbortSpan illegally. 
+ // Furthermore, even if our pushType is not PUSH_ABORT, we may have ended + // up with the responsibility to abort the intents (for example if we find + // the transaction aborted). + // + // To do better here, we need per-intent information on whether we need to + // poison. + resolveIntent := roachpb.Intent{Span: roachpb.Span{Key: ws.key}} + resolveIntent.SetTxn(&pusheeTxn) + opts := intentresolver.ResolveOptions{Wait: false, Poison: true} + return w.ir.ResolveIntent(ctx, resolveIntent, opts) +} + +func hasMinPriority(txn *enginepb.TxnMeta) bool { + return txn.Priority == enginepb.MinTxnPriority +} + +func hasMaxPriority(txn *enginepb.TxnMeta) bool { + return txn.Priority == enginepb.MaxTxnPriority +} diff --git a/pkg/storage/concurrency/lock_table_waiter_test.go b/pkg/storage/concurrency/lock_table_waiter_test.go new file mode 100644 index 000000000000..f3988837c51a --- /dev/null +++ b/pkg/storage/concurrency/lock_table_waiter_test.go @@ -0,0 +1,376 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package concurrency + +import ( + "context" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" + "github.com/cockroachdb/cockroach/pkg/storage/intentresolver" + "github.com/cockroachdb/cockroach/pkg/storage/spanset" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/stretchr/testify/require" +) + +type mockIntentResolver struct { + pushTxn func(*enginepb.TxnMeta, roachpb.Header, roachpb.PushTxnType) (roachpb.Transaction, *Error) + resolveIntent func(roachpb.Intent) *Error +} + +func (m *mockIntentResolver) PushTransaction( + _ context.Context, txn *enginepb.TxnMeta, h roachpb.Header, pushType roachpb.PushTxnType, +) (roachpb.Transaction, *Error) { + return m.pushTxn(txn, h, pushType) +} + +func (m *mockIntentResolver) ResolveIntent( + _ context.Context, intent roachpb.Intent, _ intentresolver.ResolveOptions, +) *Error { + return m.resolveIntent(intent) +} + +type mockLockTableGuard struct { + state waitingState + signal chan struct{} + stateObserved chan struct{} +} + +func (g *mockLockTableGuard) ShouldWait() bool { return true } +func (g *mockLockTableGuard) NewStateChan() <-chan struct{} { return g.signal } +func (g *mockLockTableGuard) CurState() waitingState { + s := g.state + if g.stateObserved != nil { + g.stateObserved <- struct{}{} + } + return s +} +func (g *mockLockTableGuard) notify() { g.signal <- struct{}{} } + +func setupLockTableWaiterTest() (*lockTableWaiterImpl, *mockIntentResolver, *mockLockTableGuard) { + ir := &mockIntentResolver{} + w := &lockTableWaiterImpl{ + nodeID: 2, + stopper: stop.NewStopper(), + ir: ir, + dependencyCyclePushDelay: 5 * time.Millisecond, + } + guard := &mockLockTableGuard{ + signal: make(chan struct{}, 1), + } + return w, ir, guard +} + +func makeTxnProto(name 
string) roachpb.Transaction { + return roachpb.MakeTransaction(name, []byte("key"), 0, hlc.Timestamp{WallTime: 10}, 0) +} + +// TestLockTableWaiterWithTxn tests the lockTableWaiter's behavior under +// different waiting states while a transactional request is waiting. +func TestLockTableWaiterWithTxn(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + + observedTS := hlc.Timestamp{WallTime: 15} + makeReq := func() Request { + txn := makeTxnProto("request") + txn.UpdateObservedTimestamp(2, observedTS) + return Request{ + Txn: &txn, + Timestamp: txn.ReadTimestamp, + } + } + + t.Run("state", func(t *testing.T) { + t.Run("waitFor", func(t *testing.T) { + testWaitPush(t, waitFor, makeReq, observedTS) + }) + + t.Run("waitForDistinguished", func(t *testing.T) { + testWaitPush(t, waitForDistinguished, makeReq, observedTS) + }) + + t.Run("waitElsewhere", func(t *testing.T) { + testWaitPush(t, waitElsewhere, makeReq, observedTS) + }) + + t.Run("waitSelf", func(t *testing.T) { + w, _, g := setupLockTableWaiterTest() + defer w.stopper.Stop(ctx) + + // Set up an observer channel to detect when the current + // waiting state is observed. 
+ g.state = waitingState{stateKind: waitSelf} + g.stateObserved = make(chan struct{}) + go func() { + g.notify() + <-g.stateObserved + g.notify() + <-g.stateObserved + g.state = waitingState{stateKind: doneWaiting} + g.notify() + <-g.stateObserved + }() + + err := w.WaitOn(ctx, makeReq(), g) + require.Nil(t, err) + }) + + t.Run("doneWaiting", func(t *testing.T) { + w, _, g := setupLockTableWaiterTest() + defer w.stopper.Stop(ctx) + + g.state = waitingState{stateKind: doneWaiting} + g.notify() + + err := w.WaitOn(ctx, makeReq(), g) + require.Nil(t, err) + }) + }) + + t.Run("ctx done", func(t *testing.T) { + w, _, g := setupLockTableWaiterTest() + defer w.stopper.Stop(ctx) + + ctxWithCancel, cancel := context.WithCancel(ctx) + go cancel() + + err := w.WaitOn(ctxWithCancel, makeReq(), g) + require.NotNil(t, err) + require.Equal(t, context.Canceled.Error(), err.GoError().Error()) + }) + + t.Run("stopper quiesce", func(t *testing.T) { + w, _, g := setupLockTableWaiterTest() + defer w.stopper.Stop(ctx) + + go func() { + w.stopper.Quiesce(ctx) + }() + + err := w.WaitOn(ctx, makeReq(), g) + require.NotNil(t, err) + require.IsType(t, &roachpb.NodeUnavailableError{}, err.GetDetail()) + }) +} + +// TestLockTableWaiterWithNonTxn tests the lockTableWaiter's behavior under +// different waiting states while a non-transactional request is waiting. +func TestLockTableWaiterWithNonTxn(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + + reqHeaderTS := hlc.Timestamp{WallTime: 10} + makeReq := func() Request { + return Request{ + Timestamp: reqHeaderTS, + Priority: roachpb.NormalUserPriority, + } + } + + t.Run("state", func(t *testing.T) { + t.Run("waitFor", func(t *testing.T) { + t.Log("waitFor does not cause non-transactional requests to push") + + w, _, g := setupLockTableWaiterTest() + defer w.stopper.Stop(ctx) + + // Set up an observer channel to detect when the current + // waiting state is observed. 
+ g.state = waitingState{stateKind: waitFor} + g.stateObserved = make(chan struct{}) + go func() { + g.notify() + <-g.stateObserved + g.notify() + <-g.stateObserved + g.state = waitingState{stateKind: doneWaiting} + g.notify() + <-g.stateObserved + }() + + err := w.WaitOn(ctx, makeReq(), g) + require.Nil(t, err) + }) + + t.Run("waitForDistinguished", func(t *testing.T) { + testWaitPush(t, waitForDistinguished, makeReq, reqHeaderTS) + }) + + t.Run("waitElsewhere", func(t *testing.T) { + testWaitPush(t, waitElsewhere, makeReq, reqHeaderTS) + }) + + t.Run("waitSelf", func(t *testing.T) { + t.Log("waitSelf is not possible for non-transactional request") + }) + + t.Run("doneWaiting", func(t *testing.T) { + w, _, g := setupLockTableWaiterTest() + defer w.stopper.Stop(ctx) + + g.state = waitingState{stateKind: doneWaiting} + g.notify() + + err := w.WaitOn(ctx, makeReq(), g) + require.Nil(t, err) + }) + }) + + t.Run("ctx done", func(t *testing.T) { + w, _, g := setupLockTableWaiterTest() + defer w.stopper.Stop(ctx) + + ctxWithCancel, cancel := context.WithCancel(ctx) + go cancel() + + err := w.WaitOn(ctxWithCancel, makeReq(), g) + require.NotNil(t, err) + require.Equal(t, context.Canceled.Error(), err.GoError().Error()) + }) + + t.Run("stopper quiesce", func(t *testing.T) { + w, _, g := setupLockTableWaiterTest() + defer w.stopper.Stop(ctx) + + go func() { + w.stopper.Quiesce(ctx) + }() + + err := w.WaitOn(ctx, makeReq(), g) + require.NotNil(t, err) + require.IsType(t, &roachpb.NodeUnavailableError{}, err.GetDetail()) + }) +} + +func testWaitPush(t *testing.T, k stateKind, makeReq func() Request, expPushTS hlc.Timestamp) { + ctx := context.Background() + keyA := roachpb.Key("keyA") + testutils.RunTrueAndFalse(t, "lockHeld", func(t *testing.T, lockHeld bool) { + testutils.RunTrueAndFalse(t, "waitAsWrite", func(t *testing.T, waitAsWrite bool) { + w, ir, g := setupLockTableWaiterTest() + defer w.stopper.Stop(ctx) + pusheeTxn := makeTxnProto("pushee") + + g.state = 
waitingState{ + stateKind: k, + txn: &pusheeTxn.TxnMeta, + ts: pusheeTxn.WriteTimestamp, + key: keyA, + held: false, + access: spanset.SpanReadWrite, + guardAccess: spanset.SpanReadOnly, + } + if lockHeld { + g.state.held = true + } + if waitAsWrite { + g.state.guardAccess = spanset.SpanReadWrite + } + g.notify() + + req := makeReq() + ir.pushTxn = func( + pusheeArg *enginepb.TxnMeta, h roachpb.Header, pushType roachpb.PushTxnType, + ) (roachpb.Transaction, *Error) { + require.Equal(t, &pusheeTxn.TxnMeta, pusheeArg) + require.Equal(t, req.Txn, h.Txn) + require.Equal(t, expPushTS, h.Timestamp) + if waitAsWrite { + require.Equal(t, roachpb.PUSH_ABORT, pushType) + } else { + require.Equal(t, roachpb.PUSH_TIMESTAMP, pushType) + } + + resp := roachpb.Transaction{TxnMeta: *pusheeArg, Status: roachpb.ABORTED} + + // If the lock is held, we'll try to resolve it now that + // we know the holder is ABORTED. Otherwise, immediately + // tell the request to stop waiting. + if lockHeld { + ir.resolveIntent = func(intent roachpb.Intent) *Error { + require.Equal(t, keyA, intent.Key) + require.Equal(t, pusheeTxn.ID, intent.Txn.ID) + require.Equal(t, roachpb.ABORTED, intent.Status) + g.state = waitingState{stateKind: doneWaiting} + g.notify() + return nil + } + } else { + g.state = waitingState{stateKind: doneWaiting} + g.notify() + } + return resp, nil + } + + err := w.WaitOn(ctx, req, g) + require.Nil(t, err) + }) + }) +} + +// TestLockTableWaiterIntentResolverError tests that the lockTableWaiter +// propagates errors from its intent resolver when it pushes transactions +// or resolves their intents. 
+func TestLockTableWaiterIntentResolverError(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + w, ir, g := setupLockTableWaiterTest() + defer w.stopper.Stop(ctx) + + err1 := roachpb.NewErrorf("error1") + err2 := roachpb.NewErrorf("error2") + + req := Request{ + Timestamp: hlc.Timestamp{WallTime: 10}, + Priority: roachpb.NormalUserPriority, + } + + keyA := roachpb.Key("keyA") + pusheeTxn := makeTxnProto("pushee") + g.state = waitingState{ + stateKind: waitForDistinguished, + txn: &pusheeTxn.TxnMeta, + ts: pusheeTxn.WriteTimestamp, + key: keyA, + held: true, + access: spanset.SpanReadWrite, + guardAccess: spanset.SpanReadWrite, + } + + // Errors are propagated when observed while pushing transactions. + g.notify() + ir.pushTxn = func( + _ *enginepb.TxnMeta, _ roachpb.Header, _ roachpb.PushTxnType, + ) (roachpb.Transaction, *Error) { + return roachpb.Transaction{}, err1 + } + err := w.WaitOn(ctx, req, g) + require.Equal(t, err1, err) + + // Errors are propagated when observed while resolving intents. + g.notify() + ir.pushTxn = func( + _ *enginepb.TxnMeta, _ roachpb.Header, _ roachpb.PushTxnType, + ) (roachpb.Transaction, *Error) { + return roachpb.Transaction{}, nil + } + ir.resolveIntent = func(intent roachpb.Intent) *Error { + return err2 + } + err = w.WaitOn(ctx, req, g) + require.Equal(t, err2, err) +} diff --git a/pkg/storage/concurrency/testdata/lock_table/basic b/pkg/storage/concurrency/testdata/lock_table/basic index b736d46ae14e..22a8aeafa0dd 100644 --- a/pkg/storage/concurrency/testdata/lock_table/basic +++ b/pkg/storage/concurrency/testdata/lock_table/basic @@ -113,7 +113,7 @@ start-waiting: true guard-state r=req4 ---- -new: state=waitForDistinguished txn=txn1 ts=10,2 +new: state=waitForDistinguished txn=txn1 ts=10,2 key="b" held=true guard-access=write # Release lock on b since epoch of txn1 has changed. 
update txn=txn1 ts=11,1 epoch=1 span=b @@ -131,7 +131,7 @@ local: num=0 guard-state r=req4 ---- -new: state=waitForDistinguished txn=txn1 ts=10,1 +new: state=waitForDistinguished txn=txn1 ts=10,1 key="c" held=true guard-access=write # Release lock on c since epoch of txn1 has changed. update txn=txn1 ts=11,1 epoch=1 span=c,e @@ -223,7 +223,7 @@ start-waiting: true guard-state r=req4 ---- -new: state=waitForDistinguished txn=txn3 ts=6 +new: state=waitForDistinguished txn=txn3 ts=6 key="f" held=true guard-access=read print ---- @@ -322,7 +322,7 @@ local: num=0 guard-state r=req6 ---- -new: state=waitFor txn=txn3 ts=6 +new: state=waitFor txn=txn3 ts=6 key="f" held=true guard-access=read # req7 from txn3 only wants to write to c @@ -351,7 +351,7 @@ start-waiting: true guard-state r=req7 ---- -new: state=waitForDistinguished txn=txn2 ts=8,12 +new: state=waitForDistinguished txn=txn2 ts=8,12 key="c" held=false guard-access=write print ---- @@ -398,11 +398,11 @@ local: num=0 guard-state r=req6 ---- -new: state=waitForDistinguished txn=txn2 ts=8,12 +new: state=waitForDistinguished txn=txn2 ts=8,12 key="b" held=false guard-access=write guard-state r=req4 ---- -new: state=waitForDistinguished txn=txn3 ts=6 +new: state=waitForDistinguished txn=txn3 ts=6 key="a" held=true guard-access=write # Locks: # a b c d e f g @@ -464,7 +464,7 @@ local: num=0 guard-state r=req6 ---- -old: state=waitForDistinguished txn=txn2 ts=8,12 +old: state=waitForDistinguished txn=txn2 ts=8,12 key="b" held=false guard-access=write guard-state r=req4 ---- @@ -530,11 +530,11 @@ local: num=0 guard-state r=req6 ---- -old: state=waitForDistinguished txn=txn2 ts=8,12 +old: state=waitForDistinguished txn=txn2 ts=8,12 key="b" held=false guard-access=write guard-state r=req7 ---- -old: state=waitForDistinguished txn=txn2 ts=8,12 +old: state=waitForDistinguished txn=txn2 ts=8,12 key="c" held=false guard-access=write print ---- @@ -572,14 +572,6 @@ global: num=3 holder: txn: 00000000-0000-0000-0000-000000000001, 
ts: 0.000000010,1 local: num=0 -guard-state r=req6 ----- -old: state=waitForDistinguished txn=txn2 ts=8,12 - -guard-state r=req7 ----- -old: state=waitForDistinguished txn=txn2 ts=8,12 - # Locks: # a b c d e f g # holder txn2 txn2 txn1 @@ -615,7 +607,7 @@ new: state=doneWaiting guard-state r=req6 ---- -old: state=waitForDistinguished txn=txn2 ts=8,12 +old: state=waitForDistinguished txn=txn2 ts=8,12 key="b" held=false guard-access=write print ---- @@ -688,7 +680,7 @@ start-waiting: true guard-state r=req7 ---- -new: state=waitForDistinguished txn=txn1 ts=11,1 +new: state=waitForDistinguished txn=txn1 ts=11,1 key="c" held=false guard-access=write scan r=req6 ---- @@ -730,7 +722,7 @@ start-waiting: true guard-state r=req8 ---- -new: state=waitForDistinguished txn=txn1 ts=10,1 +new: state=waitForDistinguished txn=txn1 ts=10,1 key="e" held=true guard-access=write done r=req8 ---- @@ -789,7 +781,7 @@ start-waiting: true guard-state r=req10 ---- -new: state=waitForDistinguished txn=txn1 ts=10,1 +new: state=waitForDistinguished txn=txn1 ts=10,1 key="c" held=true guard-access=write request r=req11 txn=txn3 ts=6 spans=w@c ---- @@ -800,7 +792,7 @@ start-waiting: true guard-state r=req11 ---- -new: state=waitFor txn=txn1 ts=10,1 +new: state=waitFor txn=txn1 ts=10,1 key="c" held=true guard-access=write request r=req12 txn=txn2 ts=8,12 spans=w@c ---- @@ -811,7 +803,7 @@ start-waiting: true guard-state r=req12 ---- -new: state=waitFor txn=txn1 ts=10,1 +new: state=waitFor txn=txn1 ts=10,1 key="c" held=true guard-access=write print ---- @@ -842,7 +834,7 @@ new: state=doneWaiting guard-state r=req11 ---- -new: state=waitForDistinguished txn=txn2 ts=8,12 +new: state=waitForDistinguished txn=txn2 ts=8,12 key="c" held=false guard-access=write guard-state r=req12 ---- @@ -875,7 +867,7 @@ local: num=0 guard-state r=req11 ---- -old: state=waitForDistinguished txn=txn2 ts=8,12 +old: state=waitForDistinguished txn=txn2 ts=8,12 key="c" held=false guard-access=write # Since req10 that is 
also txn2 has acquired the lock, req12 does not need to wait here anymore. guard-state r=req12 @@ -924,7 +916,7 @@ local: num=0 guard-state r=req11 ---- -old: state=waitForDistinguished txn=txn2 ts=8,12 +old: state=waitForDistinguished txn=txn2 ts=8,12 key="c" held=false guard-access=write release txn=txn2 span=b,d ---- @@ -1095,7 +1087,7 @@ local: num=0 guard-state r=req18 ---- -new: state=waitFor txn=txn1 ts=9 +new: state=waitFor txn=txn1 ts=9 key="d" held=true guard-access=write print ---- diff --git a/pkg/storage/concurrency/testdata/lock_table/non_txn_write b/pkg/storage/concurrency/testdata/lock_table/non_txn_write index d3c2aea22eae..b7fcce2642fa 100644 --- a/pkg/storage/concurrency/testdata/lock_table/non_txn_write +++ b/pkg/storage/concurrency/testdata/lock_table/non_txn_write @@ -134,7 +134,7 @@ new: state=doneWaiting guard-state r=req4 ---- -new: state=waitForDistinguished txn=txn2 ts=10 +new: state=waitForDistinguished txn=txn2 ts=10 key="a" held=false guard-access=write guard-state r=req5 ---- @@ -181,7 +181,7 @@ local: num=0 guard-state r=req4 ---- -new: state=waitForDistinguished txn=txn3 ts=10 +new: state=waitForDistinguished txn=txn3 ts=10 key="b" held=false guard-access=write guard-state r=req6 ---- @@ -258,11 +258,11 @@ start-waiting: true guard-state r=req4 ---- -new: state=waitForDistinguished txn=txn2 ts=10 +new: state=waitForDistinguished txn=txn2 ts=10 key="c" held=true guard-access=write guard-state r=req5 ---- -new: state=waitFor txn=txn2 ts=10 +new: state=waitFor txn=txn2 ts=10 key="c" held=true guard-access=write # Release the lock. The non-transactional request does not acquire the reservation. 
diff --git a/pkg/storage/intentresolver/intent_resolver.go b/pkg/storage/intentresolver/intent_resolver.go index 59a1c771606e..4e5af193f005 100644 --- a/pkg/storage/intentresolver/intent_resolver.go +++ b/pkg/storage/intentresolver/intent_resolver.go @@ -333,8 +333,9 @@ func (ir *IntentResolver) maybePushIntents( skipIfInFlight bool, ) ([]roachpb.Intent, *roachpb.Error) { // Attempt to push the transaction(s) which created the conflicting intent(s). - pushTxns := make(map[uuid.UUID]enginepb.TxnMeta) - for _, intent := range intents { + pushTxns := make(map[uuid.UUID]*enginepb.TxnMeta) + for i := range intents { + intent := &intents[i] if intent.Status != roachpb.PENDING { // The current intent does not need conflict resolution // because the transaction is already finalized. @@ -342,7 +343,7 @@ func (ir *IntentResolver) maybePushIntents( // the PENDING status. return nil, roachpb.NewErrorf("unexpected %s intent: %+v", intent.Status, intent) } - pushTxns[intent.Txn.ID] = intent.Txn + pushTxns[intent.Txn.ID] = &intent.Txn } pushedTxns, pErr := ir.MaybePushTransactions(ctx, pushTxns, h, pushType, skipIfInFlight) @@ -380,13 +381,32 @@ func updateIntentTxnStatus( return results } +// PushTransaction takes a transaction and pushes its record using the specified +// push type and request header. It returns the transaction proto corresponding +// to the pushed transaction. 
+func (ir *IntentResolver) PushTransaction( + ctx context.Context, pushTxn *enginepb.TxnMeta, h roachpb.Header, pushType roachpb.PushTxnType, +) (roachpb.Transaction, *roachpb.Error) { + pushTxns := make(map[uuid.UUID]*enginepb.TxnMeta, 1) + pushTxns[pushTxn.ID] = pushTxn + pushedTxns, pErr := ir.MaybePushTransactions(ctx, pushTxns, h, pushType, false /* skipIfInFlight */) + if pErr != nil { + return roachpb.Transaction{}, pErr + } + pushedTxn, ok := pushedTxns[pushTxn.ID] + if !ok { + log.Fatalf(ctx, "missing PushTxn responses for %s", pushTxn) + } + return pushedTxn, nil +} + // MaybePushTransactions is like maybePushIntents except it takes a set of // transactions to push instead of a set of intents. This set of provided // transactions may be modified by the method. It returns a set of transaction // protos corresponding to the pushed transactions. func (ir *IntentResolver) MaybePushTransactions( ctx context.Context, - pushTxns map[uuid.UUID]enginepb.TxnMeta, + pushTxns map[uuid.UUID]*enginepb.TxnMeta, h roachpb.Header, pushType roachpb.PushTxnType, skipIfInFlight bool, @@ -436,7 +456,7 @@ func (ir *IntentResolver) MaybePushTransactions( Key: pushTxn.Key, }, PusherTxn: pusherTxn, - PusheeTxn: pushTxn, + PusheeTxn: *pushTxn, PushTo: h.Timestamp.Next(), PushType: pushType, }) @@ -538,7 +558,7 @@ func (ir *IntentResolver) CleanupIntents( sort.Sort(intentsByTxn(intents)) resolved := 0 const skipIfInFlight = true - pushTxns := make(map[uuid.UUID]enginepb.TxnMeta) + pushTxns := make(map[uuid.UUID]*enginepb.TxnMeta) for unpushed := intents; len(unpushed) > 0; { for k := range pushTxns { // clear the pushTxns map delete(pushTxns, k) @@ -546,7 +566,7 @@ func (ir *IntentResolver) CleanupIntents( var prevTxnID uuid.UUID var i int for i = 0; i < len(unpushed); i++ { - if curTxn := unpushed[i].Txn; curTxn.ID != prevTxnID { + if curTxn := &unpushed[i].Txn; curTxn.ID != prevTxnID { if len(pushTxns) == cleanupIntentsTxnsPerBatch { break } @@ -861,6 +881,13 @@ func (ir 
*IntentResolver) lookupRangeID(ctx context.Context, key roachpb.Key) ro return rDesc.RangeID } +// ResolveIntent synchronously resolves an intent according to opts. +func (ir *IntentResolver) ResolveIntent( + ctx context.Context, intent roachpb.Intent, opts ResolveOptions, +) *roachpb.Error { + return ir.ResolveIntents(ctx, []roachpb.Intent{intent}, opts) +} + // ResolveIntents synchronously resolves intents according to opts. func (ir *IntentResolver) ResolveIntents( ctx context.Context, intents []roachpb.Intent, opts ResolveOptions, diff --git a/pkg/storage/replica_rangefeed.go b/pkg/storage/replica_rangefeed.go index 328f595f298c..54d1ac740629 100644 --- a/pkg/storage/replica_rangefeed.go +++ b/pkg/storage/replica_rangefeed.go @@ -72,8 +72,9 @@ type rangefeedTxnPusher struct { func (tp *rangefeedTxnPusher) PushTxns( ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp, ) ([]roachpb.Transaction, error) { - pushTxnMap := make(map[uuid.UUID]enginepb.TxnMeta, len(txns)) - for _, txn := range txns { + pushTxnMap := make(map[uuid.UUID]*enginepb.TxnMeta, len(txns)) + for i := range txns { + txn := &txns[i] pushTxnMap[txn.ID] = txn }