diff --git a/pkg/storage/concurrency/lock_table.go b/pkg/storage/concurrency/lock_table.go index ddd7f213aef9..4d0b1ea579f1 100644 --- a/pkg/storage/concurrency/lock_table.go +++ b/pkg/storage/concurrency/lock_table.go @@ -47,9 +47,12 @@ type waitingState struct { // Populated for waitFor* and waitElsewhere type, and represents who the // request is waiting for. - txn *enginepb.TxnMeta // always non-nil - ts hlc.Timestamp // the timestamp of the transaction that is causing the wait - access spanset.SpanAccess // Currently only SpanReadWrite. + txn *enginepb.TxnMeta // always non-nil + ts hlc.Timestamp // the timestamp of the transaction that is causing the wait + key roachpb.Key // the key of the lock that is causing the wait + held bool // is the lock currently held? + access spanset.SpanAccess // currently only SpanReadWrite + guardAccess spanset.SpanAccess // the access method of the guard } // Implementation @@ -58,6 +61,8 @@ type waitingState struct { // - proper error strings and give better explanation to all panics. // - metrics about lockTable state to export to observability debug pages: // number of locks, number of waiting requests, wait time?, ... +// - update waitingState.held on each waiter when locks are acquired and +// released. // The btree for a particular SpanScope. type treeMu struct { @@ -567,11 +572,15 @@ func (l *lockState) informActiveWaiters() { stateKind: waitFor, txn: waitForTxn, ts: waitForTs, + key: l.key, + held: l.holder.locked, access: spanset.SpanReadWrite, } waitSelfState := waitingState{stateKind: waitSelf} for e := l.waitingReaders.Front(); e != nil; e = e.Next() { + state := waitForState + state.guardAccess = spanset.SpanReadOnly // Since there are waiting readers we could not have transitioned out of // or into a state with a reservation, since readers do not wait for // reservations. @@ -581,7 +590,7 @@ func (l *lockState) informActiveWaiters() { findDistinguished = false } g.mu.Lock() - g.mu.state = waitForState + g.mu.state = state if l.distinguishedWaiter == g { g.mu.state.stateKind = waitForDistinguished } @@ -599,6 +608,7 @@ func (l *lockState) informActiveWaiters() { state = waitSelfState } else { state = waitForState + state.guardAccess = spanset.SpanReadWrite if findDistinguished { l.distinguishedWaiter = g findDistinguished = false @@ -813,10 +823,13 @@ func (l *lockState) tryActiveWait(g *lockTableGuardImpl, sa spanset.SpanAccess, stateType = waitForDistinguished } g.mu.state = waitingState{ - stateKind: stateType, - txn: waitForTxn, - ts: waitForTs, - access: spanset.SpanReadWrite, + stateKind: stateType, + txn: waitForTxn, + ts: waitForTs, + key: l.key, + held: l.holder.locked, + access: spanset.SpanReadWrite, + guardAccess: sa, } } if notify { @@ -1340,10 +1353,12 @@ func (l *lockState) lockIsFree() (doneWaiting []*lockTableGuardImpl, gc bool) { // Need to tell the remaining active waiting writers who they are waiting // for. 
waitForState := waitingState{ - stateKind: waitFor, - txn: g.txn, - ts: g.ts, - access: spanset.SpanReadWrite, + stateKind: waitFor, + txn: g.txn, + ts: g.ts, + key: l.key, + access: spanset.SpanReadWrite, + guardAccess: spanset.SpanReadWrite, } waitSelfState := waitingState{stateKind: waitSelf} for e := l.queuedWriters.Front(); e != nil; e = e.Next() { diff --git a/pkg/storage/concurrency/lock_table_test.go b/pkg/storage/concurrency/lock_table_test.go index f929660b737b..67bcb43b672a 100644 --- a/pkg/storage/concurrency/lock_table_test.go +++ b/pkg/storage/concurrency/lock_table_test.go @@ -406,7 +406,8 @@ func TestLockTableBasic(t *testing.T) { if state.ts.Logical != 0 { tsS += fmt.Sprintf(",%d", state.ts.Logical) } - return fmt.Sprintf("%sstate=%s txn=%s ts=%s", str, typeStr, txnS, tsS) + return fmt.Sprintf("%sstate=%s txn=%s ts=%s key=%s held=%t guard-access=%s", + str, typeStr, txnS, tsS, state.key, state.held, state.guardAccess) case "print": return lt.(*lockTableImpl).String() diff --git a/pkg/storage/concurrency/lock_table_waiter.go b/pkg/storage/concurrency/lock_table_waiter.go new file mode 100644 index 000000000000..bb40e068cb1b --- /dev/null +++ b/pkg/storage/concurrency/lock_table_waiter.go @@ -0,0 +1,242 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package concurrency + +import ( + "context" + "time" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" + "github.com/cockroachdb/cockroach/pkg/storage/intentresolver" + "github.com/cockroachdb/cockroach/pkg/storage/spanset" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" +) + +// Default delay before pushing in order to detect dependency cycles. +const defaultDependencyCyclePushDelay = 100 * time.Millisecond + +// Silence unused warning. +var _ = defaultDependencyCyclePushDelay + +// lockTableWaiterImpl is an implementation of lockTableWaiter. +type lockTableWaiterImpl struct { + nodeID roachpb.NodeID + stopper *stop.Stopper + + // Used to push conflicting transactions and resolve conflicting intents. + ir intentResolver + + // How long to wait until pushing conflicting transactions to detect + // dependency cycles. + dependencyCyclePushDelay time.Duration +} + +// intentResolver is an interface used by lockTableWaiterImpl to push +// transactions and to resolve intents. It contains only the subset of the +// intentresolver.IntentResolver interface that lockTableWaiterImpl needs. +type intentResolver interface { + // PushTransaction pushes the provided transaction. The method will push the + // provided pushee transaction immediately, if possible. Otherwise, it will + // block until the pushee transaction is finalized or eventually can be + // pushed successfully. + PushTransaction( + context.Context, *enginepb.TxnMeta, roachpb.Header, roachpb.PushTxnType, + ) (roachpb.Transaction, *Error) + + // ResolveIntent resolves the provided intent according to the options. + ResolveIntent(context.Context, roachpb.Intent, intentresolver.ResolveOptions) *Error +} + +// WaitOn implements the lockTableWaiter interface. 
+func (w *lockTableWaiterImpl) WaitOn( + ctx context.Context, req Request, guard lockTableGuard, +) *Error { + newStateC := guard.NewStateChan() + ctxDoneC := ctx.Done() + shouldQuiesceC := w.stopper.ShouldQuiesce() + var timer *timeutil.Timer + var timerC <-chan time.Time + var timerWaitingState waitingState + for { + select { + case <-newStateC: + timerC = nil + state := guard.CurState() + switch state.stateKind { + case waitFor: + // waitFor indicates that the request is waiting on another + // transaction. This transaction may be the lock holder of a + // conflicting lock or the head of a lock-wait queue that the + // request is a part of. + + // For non-transactional requests, there's no need to perform + // deadlock detection and the other "distinguished" (see below) + // pusher will already push to detect coordinator failures and + // abandoned locks, so there's no need to do anything in this + // state. + if req.Txn == nil { + continue + } + + // For transactional requests, the request should push to + // resolve this conflict and detect deadlocks, but only after + // delay. This delay avoids unnecessary push traffic when the + // conflicting transaction is continuing to make forward + // progress. + delay := w.dependencyCyclePushDelay + if hasMinPriority(state.txn) || hasMaxPriority(&req.Txn.TxnMeta) { + // However, if the pushee has the minimum priority or if the + // pusher has the maximum priority, push immediately. + delay = 0 + } + if timer == nil { + timer = timeutil.NewTimer() + defer timer.Stop() + } + timer.Reset(delay) + timerC = timer.C + timerWaitingState = state + + case waitForDistinguished: + // waitForDistinguished is like waitFor, except it instructs the + // waiter to immediately push the conflicting transaction instead of + // first waiting out the dependencyCyclePushDelay. The lockTable + // guarantees that there is always at least one request in the + // waitForDistinguished state for each lock that has any waiters. + // + // The purpose of the waitForDistinguished state is to avoid adding + // a delay of dependencyCyclePushDelay to the process of recovering + // from the failure of a transaction coordinator for *each* of that + // transaction's previously written intents. If we had a cache of + // aborted transaction IDs that allowed us to notice and immediately + // resolve abandoned intents then we might be able to get rid of + // this state. + if err := w.pushTxn(ctx, req, state); err != nil { + return err + } + + case waitElsewhere: + // The lockTable has hit a memory limit and is no longer maintaining + // proper lock wait-queues. However, the waiting request is still + // not safe to proceed with evaluation because there is still a + // transaction holding the lock. It should push the transaction it + // is blocked on immediately to wait in that transaction's + // txnWaitQueue. Once this completes, the request should stop + // waiting on this lockTableGuard, as it will no longer observe + // lock-table state transitions. + return w.pushTxn(ctx, req, state) + + case waitSelf: + // Another request from the same transaction is the reservation + // holder of this lock wait-queue. This can only happen when the + // request's transaction is sending multiple requests concurrently. + // Proceed with waiting without pushing anyone. + + case doneWaiting: + // The request has waited for all conflicting locks to be released + // and is at the front of any lock wait-queues. 
It can now stop + // waiting, re-acquire latches, and check the lockTable again for + // any new conflicts. If it finds none, it can proceed with + // evaluation. + return nil + + default: + panic("unexpected waiting state") + } + + case <-timerC: + // If the transactional request was in the waitFor state and did not + // observe any update to its state for a dependencyCyclePushDelay, + // it should push. It may be the case that the transaction is part + // of a dependency cycle. + timerC = nil + timer.Read = true + if err := w.pushTxn(ctx, req, timerWaitingState); err != nil { + return err + } + + case <-ctxDoneC: + return roachpb.NewError(ctx.Err()) + + case <-shouldQuiesceC: + return roachpb.NewError(&roachpb.NodeUnavailableError{}) + } + } +} + +func (w *lockTableWaiterImpl) pushTxn(ctx context.Context, req Request, ws waitingState) *Error { + h := roachpb.Header{ + Timestamp: req.Timestamp, + UserPriority: req.Priority, + } + if req.Txn != nil { + // We are going to hand the header (and thus the transaction proto) + // to the RPC framework, after which it must not be changed (since + // that could race). Since the subsequent execution of the original + // request might mutate the transaction, make a copy here. + // + // See #9130. + h.Txn = req.Txn.Clone() + + // We must push at least to h.Timestamp, but in fact we want to + // go all the way up to a timestamp which was taken off the HLC + // after our operation started. This allows us to not have to + // restart for uncertainty as we come back and read. + obsTS, ok := h.Txn.GetObservedTimestamp(w.nodeID) + if !ok { + // This was set earlier, so it's completely unexpected to + // not be found now. + return roachpb.NewErrorf("missing observed timestamp: %+v", h.Txn) + } + h.Timestamp.Forward(obsTS) + } + + var pushType roachpb.PushTxnType + switch ws.guardAccess { + case spanset.SpanReadOnly: + pushType = roachpb.PUSH_TIMESTAMP + case spanset.SpanReadWrite: + pushType = roachpb.PUSH_ABORT + } + + pusheeTxn, err := w.ir.PushTransaction(ctx, ws.txn, h, pushType) + if err != nil { + return err + } + if !ws.held { + return nil + } + + // We always poison due to limitations of the API: not poisoning equals + // clearing the AbortSpan, and if our pushee transaction first got pushed + // for timestamp (by us), then (by someone else) aborted and poisoned, and + // then we run the below code, we're clearing the AbortSpan illegally. + // Furthermore, even if our pushType is not PUSH_ABORT, we may have ended + // up with the responsibility to abort the intents (for example if we find + // the transaction aborted). + // + // To do better here, we need per-intent information on whether we need to + // poison. + resolveIntent := roachpb.Intent{Span: roachpb.Span{Key: ws.key}} + resolveIntent.SetTxn(&pusheeTxn) + opts := intentresolver.ResolveOptions{Wait: false, Poison: true} + return w.ir.ResolveIntent(ctx, resolveIntent, opts) +} + +func hasMinPriority(txn *enginepb.TxnMeta) bool { + return txn.Priority == enginepb.MinTxnPriority +} + +func hasMaxPriority(txn *enginepb.TxnMeta) bool { + return txn.Priority == enginepb.MaxTxnPriority +} diff --git a/pkg/storage/concurrency/lock_table_waiter_test.go b/pkg/storage/concurrency/lock_table_waiter_test.go new file mode 100644 index 000000000000..f3988837c51a --- /dev/null +++ b/pkg/storage/concurrency/lock_table_waiter_test.go @@ -0,0 +1,376 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt.
+// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package concurrency + +import ( + "context" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" + "github.com/cockroachdb/cockroach/pkg/storage/intentresolver" + "github.com/cockroachdb/cockroach/pkg/storage/spanset" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/stretchr/testify/require" +) + +type mockIntentResolver struct { + pushTxn func(*enginepb.TxnMeta, roachpb.Header, roachpb.PushTxnType) (roachpb.Transaction, *Error) + resolveIntent func(roachpb.Intent) *Error +} + +func (m *mockIntentResolver) PushTransaction( + _ context.Context, txn *enginepb.TxnMeta, h roachpb.Header, pushType roachpb.PushTxnType, +) (roachpb.Transaction, *Error) { + return m.pushTxn(txn, h, pushType) +} + +func (m *mockIntentResolver) ResolveIntent( + _ context.Context, intent roachpb.Intent, _ intentresolver.ResolveOptions, +) *Error { + return m.resolveIntent(intent) +} + +type mockLockTableGuard struct { + state waitingState + signal chan struct{} + stateObserved chan struct{} +} + +func (g *mockLockTableGuard) ShouldWait() bool { return true } +func (g *mockLockTableGuard) NewStateChan() <-chan struct{} { return g.signal } +func (g *mockLockTableGuard) CurState() waitingState { + s := g.state + if g.stateObserved != nil { + g.stateObserved <- struct{}{} + } + return s +} +func (g *mockLockTableGuard) notify() { g.signal <- struct{}{} } + +func setupLockTableWaiterTest() (*lockTableWaiterImpl, *mockIntentResolver, *mockLockTableGuard) { + ir := &mockIntentResolver{} + w := &lockTableWaiterImpl{ + nodeID: 2, + stopper: stop.NewStopper(), + ir: ir, + dependencyCyclePushDelay: 5 * time.Millisecond, + } + guard := &mockLockTableGuard{ + signal: make(chan struct{}, 1), + } + return w, ir, guard +} + +func makeTxnProto(name string) roachpb.Transaction { + return roachpb.MakeTransaction(name, []byte("key"), 0, hlc.Timestamp{WallTime: 10}, 0) +} + +// TestLockTableWaiterWithTxn tests the lockTableWaiter's behavior under +// different waiting states while a transactional request is waiting. +func TestLockTableWaiterWithTxn(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + + observedTS := hlc.Timestamp{WallTime: 15} + makeReq := func() Request { + txn := makeTxnProto("request") + txn.UpdateObservedTimestamp(2, observedTS) + return Request{ + Txn: &txn, + Timestamp: txn.ReadTimestamp, + } + } + + t.Run("state", func(t *testing.T) { + t.Run("waitFor", func(t *testing.T) { + testWaitPush(t, waitFor, makeReq, observedTS) + }) + + t.Run("waitForDistinguished", func(t *testing.T) { + testWaitPush(t, waitForDistinguished, makeReq, observedTS) + }) + + t.Run("waitElsewhere", func(t *testing.T) { + testWaitPush(t, waitElsewhere, makeReq, observedTS) + }) + + t.Run("waitSelf", func(t *testing.T) { + w, _, g := setupLockTableWaiterTest() + defer w.stopper.Stop(ctx) + + // Set up an observer channel to detect when the current + // waiting state is observed. 
+ g.state = waitingState{stateKind: waitSelf} + g.stateObserved = make(chan struct{}) + go func() { + g.notify() + <-g.stateObserved + g.notify() + <-g.stateObserved + g.state = waitingState{stateKind: doneWaiting} + g.notify() + <-g.stateObserved + }() + + err := w.WaitOn(ctx, makeReq(), g) + require.Nil(t, err) + }) + + t.Run("doneWaiting", func(t *testing.T) { + w, _, g := setupLockTableWaiterTest() + defer w.stopper.Stop(ctx) + + g.state = waitingState{stateKind: doneWaiting} + g.notify() + + err := w.WaitOn(ctx, makeReq(), g) + require.Nil(t, err) + }) + }) + + t.Run("ctx done", func(t *testing.T) { + w, _, g := setupLockTableWaiterTest() + defer w.stopper.Stop(ctx) + + ctxWithCancel, cancel := context.WithCancel(ctx) + go cancel() + + err := w.WaitOn(ctxWithCancel, makeReq(), g) + require.NotNil(t, err) + require.Equal(t, context.Canceled.Error(), err.GoError().Error()) + }) + + t.Run("stopper quiesce", func(t *testing.T) { + w, _, g := setupLockTableWaiterTest() + defer w.stopper.Stop(ctx) + + go func() { + w.stopper.Quiesce(ctx) + }() + + err := w.WaitOn(ctx, makeReq(), g) + require.NotNil(t, err) + require.IsType(t, &roachpb.NodeUnavailableError{}, err.GetDetail()) + }) +} + +// TestLockTableWaiterWithNonTxn tests the lockTableWaiter's behavior under +// different waiting states while a non-transactional request is waiting. +func TestLockTableWaiterWithNonTxn(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + + reqHeaderTS := hlc.Timestamp{WallTime: 10} + makeReq := func() Request { + return Request{ + Timestamp: reqHeaderTS, + Priority: roachpb.NormalUserPriority, + } + } + + t.Run("state", func(t *testing.T) { + t.Run("waitFor", func(t *testing.T) { + t.Log("waitFor does not cause non-transactional requests to push") + + w, _, g := setupLockTableWaiterTest() + defer w.stopper.Stop(ctx) + + // Set up an observer channel to detect when the current + // waiting state is observed. 
+ g.state = waitingState{stateKind: waitFor} + g.stateObserved = make(chan struct{}) + go func() { + g.notify() + <-g.stateObserved + g.notify() + <-g.stateObserved + g.state = waitingState{stateKind: doneWaiting} + g.notify() + <-g.stateObserved + }() + + err := w.WaitOn(ctx, makeReq(), g) + require.Nil(t, err) + }) + + t.Run("waitForDistinguished", func(t *testing.T) { + testWaitPush(t, waitForDistinguished, makeReq, reqHeaderTS) + }) + + t.Run("waitElsewhere", func(t *testing.T) { + testWaitPush(t, waitElsewhere, makeReq, reqHeaderTS) + }) + + t.Run("waitSelf", func(t *testing.T) { + t.Log("waitSelf is not possible for a non-transactional request") + }) + + t.Run("doneWaiting", func(t *testing.T) { + w, _, g := setupLockTableWaiterTest() + defer w.stopper.Stop(ctx) + + g.state = waitingState{stateKind: doneWaiting} + g.notify() + + err := w.WaitOn(ctx, makeReq(), g) + require.Nil(t, err) + }) + }) + + t.Run("ctx done", func(t *testing.T) { + w, _, g := setupLockTableWaiterTest() + defer w.stopper.Stop(ctx) + + ctxWithCancel, cancel := context.WithCancel(ctx) + go cancel() + + err := w.WaitOn(ctxWithCancel, makeReq(), g) + require.NotNil(t, err) + require.Equal(t, context.Canceled.Error(), err.GoError().Error()) + }) + + t.Run("stopper quiesce", func(t *testing.T) { + w, _, g := setupLockTableWaiterTest() + defer w.stopper.Stop(ctx) + + go func() { + w.stopper.Quiesce(ctx) + }() + + err := w.WaitOn(ctx, makeReq(), g) + require.NotNil(t, err) + require.IsType(t, &roachpb.NodeUnavailableError{}, err.GetDetail()) + }) +} + +func testWaitPush(t *testing.T, k stateKind, makeReq func() Request, expPushTS hlc.Timestamp) { + ctx := context.Background() + keyA := roachpb.Key("keyA") + testutils.RunTrueAndFalse(t, "lockHeld", func(t *testing.T, lockHeld bool) { + testutils.RunTrueAndFalse(t, "waitAsWrite", func(t *testing.T, waitAsWrite bool) { + w, ir, g := setupLockTableWaiterTest() + defer w.stopper.Stop(ctx) + pusheeTxn := makeTxnProto("pushee") + + g.state = waitingState{ + stateKind: k, + txn: &pusheeTxn.TxnMeta, + ts: pusheeTxn.WriteTimestamp, + key: keyA, + held: false, + access: spanset.SpanReadWrite, + guardAccess: spanset.SpanReadOnly, + } + if lockHeld { + g.state.held = true + } + if waitAsWrite { + g.state.guardAccess = spanset.SpanReadWrite + } + g.notify() + + req := makeReq() + ir.pushTxn = func( + pusheeArg *enginepb.TxnMeta, h roachpb.Header, pushType roachpb.PushTxnType, + ) (roachpb.Transaction, *Error) { + require.Equal(t, &pusheeTxn.TxnMeta, pusheeArg) + require.Equal(t, req.Txn, h.Txn) + require.Equal(t, expPushTS, h.Timestamp) + if waitAsWrite { + require.Equal(t, roachpb.PUSH_ABORT, pushType) + } else { + require.Equal(t, roachpb.PUSH_TIMESTAMP, pushType) + } + + resp := roachpb.Transaction{TxnMeta: *pusheeArg, Status: roachpb.ABORTED} + + // If the lock is held, we'll try to resolve it now that + // we know the holder is ABORTED. Otherwise, immediately + // tell the request to stop waiting.
+ if lockHeld { + ir.resolveIntent = func(intent roachpb.Intent) *Error { + require.Equal(t, keyA, intent.Key) + require.Equal(t, pusheeTxn.ID, intent.Txn.ID) + require.Equal(t, roachpb.ABORTED, intent.Status) + g.state = waitingState{stateKind: doneWaiting} + g.notify() + return nil + } + } else { + g.state = waitingState{stateKind: doneWaiting} + g.notify() + } + return resp, nil + } + + err := w.WaitOn(ctx, req, g) + require.Nil(t, err) + }) + }) +} + +// TestLockTableWaiterIntentResolverError tests that the lockTableWaiter +// propagates errors from its intent resolver when it pushes transactions +// or resolves their intents. +func TestLockTableWaiterIntentResolverError(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + w, ir, g := setupLockTableWaiterTest() + defer w.stopper.Stop(ctx) + + err1 := roachpb.NewErrorf("error1") + err2 := roachpb.NewErrorf("error2") + + req := Request{ + Timestamp: hlc.Timestamp{WallTime: 10}, + Priority: roachpb.NormalUserPriority, + } + + keyA := roachpb.Key("keyA") + pusheeTxn := makeTxnProto("pushee") + g.state = waitingState{ + stateKind: waitForDistinguished, + txn: &pusheeTxn.TxnMeta, + ts: pusheeTxn.WriteTimestamp, + key: keyA, + held: true, + access: spanset.SpanReadWrite, + guardAccess: spanset.SpanReadWrite, + } + + // Errors are propagated when observed while pushing transactions. + g.notify() + ir.pushTxn = func( + _ *enginepb.TxnMeta, _ roachpb.Header, _ roachpb.PushTxnType, + ) (roachpb.Transaction, *Error) { + return roachpb.Transaction{}, err1 + } + err := w.WaitOn(ctx, req, g) + require.Equal(t, err1, err) + + // Errors are propagated when observed while resolving intents. + g.notify() + ir.pushTxn = func( + _ *enginepb.TxnMeta, _ roachpb.Header, _ roachpb.PushTxnType, + ) (roachpb.Transaction, *Error) { + return roachpb.Transaction{}, nil + } + ir.resolveIntent = func(intent roachpb.Intent) *Error { + return err2 + } + err = w.WaitOn(ctx, req, g) + require.Equal(t, err2, err) +} diff --git a/pkg/storage/concurrency/testdata/lock_table/basic b/pkg/storage/concurrency/testdata/lock_table/basic index b736d46ae14e..22a8aeafa0dd 100644 --- a/pkg/storage/concurrency/testdata/lock_table/basic +++ b/pkg/storage/concurrency/testdata/lock_table/basic @@ -113,7 +113,7 @@ start-waiting: true guard-state r=req4 ---- -new: state=waitForDistinguished txn=txn1 ts=10,2 +new: state=waitForDistinguished txn=txn1 ts=10,2 key="b" held=true guard-access=write # Release lock on b since epoch of txn1 has changed. update txn=txn1 ts=11,1 epoch=1 span=b @@ -131,7 +131,7 @@ local: num=0 guard-state r=req4 ---- -new: state=waitForDistinguished txn=txn1 ts=10,1 +new: state=waitForDistinguished txn=txn1 ts=10,1 key="c" held=true guard-access=write # Release lock on c since epoch of txn1 has changed. 
update txn=txn1 ts=11,1 epoch=1 span=c,e @@ -223,7 +223,7 @@ start-waiting: true guard-state r=req4 ---- -new: state=waitForDistinguished txn=txn3 ts=6 +new: state=waitForDistinguished txn=txn3 ts=6 key="f" held=true guard-access=read print ---- @@ -322,7 +322,7 @@ local: num=0 guard-state r=req6 ---- -new: state=waitFor txn=txn3 ts=6 +new: state=waitFor txn=txn3 ts=6 key="f" held=true guard-access=read # req7 from txn3 only wants to write to c @@ -351,7 +351,7 @@ start-waiting: true guard-state r=req7 ---- -new: state=waitForDistinguished txn=txn2 ts=8,12 +new: state=waitForDistinguished txn=txn2 ts=8,12 key="c" held=false guard-access=write print ---- @@ -398,11 +398,11 @@ local: num=0 guard-state r=req6 ---- -new: state=waitForDistinguished txn=txn2 ts=8,12 +new: state=waitForDistinguished txn=txn2 ts=8,12 key="b" held=false guard-access=write guard-state r=req4 ---- -new: state=waitForDistinguished txn=txn3 ts=6 +new: state=waitForDistinguished txn=txn3 ts=6 key="a" held=true guard-access=write # Locks: # a b c d e f g @@ -464,7 +464,7 @@ local: num=0 guard-state r=req6 ---- -old: state=waitForDistinguished txn=txn2 ts=8,12 +old: state=waitForDistinguished txn=txn2 ts=8,12 key="b" held=false guard-access=write guard-state r=req4 ---- @@ -530,11 +530,11 @@ local: num=0 guard-state r=req6 ---- -old: state=waitForDistinguished txn=txn2 ts=8,12 +old: state=waitForDistinguished txn=txn2 ts=8,12 key="b" held=false guard-access=write guard-state r=req7 ---- -old: state=waitForDistinguished txn=txn2 ts=8,12 +old: state=waitForDistinguished txn=txn2 ts=8,12 key="c" held=false guard-access=write print ---- @@ -572,14 +572,6 @@ global: num=3 holder: txn: 00000000-0000-0000-0000-000000000001, ts: 0.000000010,1 local: num=0 -guard-state r=req6 ----- -old: state=waitForDistinguished txn=txn2 ts=8,12 - -guard-state r=req7 ----- -old: state=waitForDistinguished txn=txn2 ts=8,12 - # Locks: # a b c d e f g # holder txn2 txn2 txn1 @@ -615,7 +607,7 @@ new: state=doneWaiting guard-state r=req6 ---- -old: state=waitForDistinguished txn=txn2 ts=8,12 +old: state=waitForDistinguished txn=txn2 ts=8,12 key="b" held=false guard-access=write print ---- @@ -688,7 +680,7 @@ start-waiting: true guard-state r=req7 ---- -new: state=waitForDistinguished txn=txn1 ts=11,1 +new: state=waitForDistinguished txn=txn1 ts=11,1 key="c" held=false guard-access=write scan r=req6 ---- @@ -730,7 +722,7 @@ start-waiting: true guard-state r=req8 ---- -new: state=waitForDistinguished txn=txn1 ts=10,1 +new: state=waitForDistinguished txn=txn1 ts=10,1 key="e" held=true guard-access=write done r=req8 ---- @@ -789,7 +781,7 @@ start-waiting: true guard-state r=req10 ---- -new: state=waitForDistinguished txn=txn1 ts=10,1 +new: state=waitForDistinguished txn=txn1 ts=10,1 key="c" held=true guard-access=write request r=req11 txn=txn3 ts=6 spans=w@c ---- @@ -800,7 +792,7 @@ start-waiting: true guard-state r=req11 ---- -new: state=waitFor txn=txn1 ts=10,1 +new: state=waitFor txn=txn1 ts=10,1 key="c" held=true guard-access=write request r=req12 txn=txn2 ts=8,12 spans=w@c ---- @@ -811,7 +803,7 @@ start-waiting: true guard-state r=req12 ---- -new: state=waitFor txn=txn1 ts=10,1 +new: state=waitFor txn=txn1 ts=10,1 key="c" held=true guard-access=write print ---- @@ -842,7 +834,7 @@ new: state=doneWaiting guard-state r=req11 ---- -new: state=waitForDistinguished txn=txn2 ts=8,12 +new: state=waitForDistinguished txn=txn2 ts=8,12 key="c" held=false guard-access=write guard-state r=req12 ---- @@ -875,7 +867,7 @@ local: num=0 guard-state r=req11 ---- 
-old: state=waitForDistinguished txn=txn2 ts=8,12 +old: state=waitForDistinguished txn=txn2 ts=8,12 key="c" held=false guard-access=write # Since req10 that is also txn2 has acquired the lock, req12 does not need to wait here anymore. guard-state r=req12 @@ -924,7 +916,7 @@ local: num=0 guard-state r=req11 ---- -old: state=waitForDistinguished txn=txn2 ts=8,12 +old: state=waitForDistinguished txn=txn2 ts=8,12 key="c" held=false guard-access=write release txn=txn2 span=b,d ---- @@ -1095,7 +1087,7 @@ local: num=0 guard-state r=req18 ---- -new: state=waitFor txn=txn1 ts=9 +new: state=waitFor txn=txn1 ts=9 key="d" held=true guard-access=write print ---- diff --git a/pkg/storage/concurrency/testdata/lock_table/non_txn_write b/pkg/storage/concurrency/testdata/lock_table/non_txn_write index d3c2aea22eae..b7fcce2642fa 100644 --- a/pkg/storage/concurrency/testdata/lock_table/non_txn_write +++ b/pkg/storage/concurrency/testdata/lock_table/non_txn_write @@ -134,7 +134,7 @@ new: state=doneWaiting guard-state r=req4 ---- -new: state=waitForDistinguished txn=txn2 ts=10 +new: state=waitForDistinguished txn=txn2 ts=10 key="a" held=false guard-access=write guard-state r=req5 ---- @@ -181,7 +181,7 @@ local: num=0 guard-state r=req4 ---- -new: state=waitForDistinguished txn=txn3 ts=10 +new: state=waitForDistinguished txn=txn3 ts=10 key="b" held=false guard-access=write guard-state r=req6 ---- @@ -258,11 +258,11 @@ start-waiting: true guard-state r=req4 ---- -new: state=waitForDistinguished txn=txn2 ts=10 +new: state=waitForDistinguished txn=txn2 ts=10 key="c" held=true guard-access=write guard-state r=req5 ---- -new: state=waitFor txn=txn2 ts=10 +new: state=waitFor txn=txn2 ts=10 key="c" held=true guard-access=write # Release the lock. The non-transactional request does not acquire the reservation. diff --git a/pkg/storage/intentresolver/intent_resolver.go b/pkg/storage/intentresolver/intent_resolver.go index 8819fa923b57..4e5af193f005 100644 --- a/pkg/storage/intentresolver/intent_resolver.go +++ b/pkg/storage/intentresolver/intent_resolver.go @@ -279,9 +279,9 @@ func (ir *IntentResolver) ProcessWriteIntentError( // // To do better here, we need per-intent information on whether we need to // poison. - if err := ir.ResolveIntents(ctx, resolveIntents, - ResolveOptions{Wait: false, Poison: true}); err != nil { - return cleanup, roachpb.NewError(err) + opts := ResolveOptions{Wait: false, Poison: true} + if pErr := ir.ResolveIntents(ctx, resolveIntents, opts); pErr != nil { + return cleanup, pErr } return cleanup, nil @@ -333,8 +333,9 @@ func (ir *IntentResolver) maybePushIntents( skipIfInFlight bool, ) ([]roachpb.Intent, *roachpb.Error) { // Attempt to push the transaction(s) which created the conflicting intent(s). - pushTxns := make(map[uuid.UUID]enginepb.TxnMeta) - for _, intent := range intents { + pushTxns := make(map[uuid.UUID]*enginepb.TxnMeta) + for i := range intents { + intent := &intents[i] if intent.Status != roachpb.PENDING { // The current intent does not need conflict resolution // because the transaction is already finalized. @@ -342,7 +343,7 @@ func (ir *IntentResolver) maybePushIntents( // the PENDING status. 
return nil, roachpb.NewErrorf("unexpected %s intent: %+v", intent.Status, intent) } - pushTxns[intent.Txn.ID] = intent.Txn + pushTxns[intent.Txn.ID] = &intent.Txn } pushedTxns, pErr := ir.MaybePushTransactions(ctx, pushTxns, h, pushType, skipIfInFlight) @@ -380,13 +381,32 @@ func updateIntentTxnStatus( return results } +// PushTransaction takes a transaction and pushes its record using the specified +// push type and request header. It returns the transaction proto corresponding +// to the pushed transaction. +func (ir *IntentResolver) PushTransaction( + ctx context.Context, pushTxn *enginepb.TxnMeta, h roachpb.Header, pushType roachpb.PushTxnType, +) (roachpb.Transaction, *roachpb.Error) { + pushTxns := make(map[uuid.UUID]*enginepb.TxnMeta, 1) + pushTxns[pushTxn.ID] = pushTxn + pushedTxns, pErr := ir.MaybePushTransactions(ctx, pushTxns, h, pushType, false /* skipIfInFlight */) + if pErr != nil { + return roachpb.Transaction{}, pErr + } + pushedTxn, ok := pushedTxns[pushTxn.ID] + if !ok { + log.Fatalf(ctx, "missing PushTxn responses for %s", pushTxn) + } + return pushedTxn, nil +} + // MaybePushTransactions is like maybePushIntents except it takes a set of // transactions to push instead of a set of intents. This set of provided // transactions may be modified by the method. It returns a set of transaction // protos corresponding to the pushed transactions. func (ir *IntentResolver) MaybePushTransactions( ctx context.Context, - pushTxns map[uuid.UUID]enginepb.TxnMeta, + pushTxns map[uuid.UUID]*enginepb.TxnMeta, h roachpb.Header, pushType roachpb.PushTxnType, skipIfInFlight bool, @@ -436,7 +456,7 @@ func (ir *IntentResolver) MaybePushTransactions( Key: pushTxn.Key, }, PusherTxn: pusherTxn, - PusheeTxn: pushTxn, + PusheeTxn: *pushTxn, PushTo: h.Timestamp.Next(), PushType: pushType, }) @@ -538,7 +558,7 @@ func (ir *IntentResolver) CleanupIntents( sort.Sort(intentsByTxn(intents)) resolved := 0 const skipIfInFlight = true - pushTxns := make(map[uuid.UUID]enginepb.TxnMeta) + pushTxns := make(map[uuid.UUID]*enginepb.TxnMeta) for unpushed := intents; len(unpushed) > 0; { for k := range pushTxns { // clear the pushTxns map delete(pushTxns, k) @@ -546,7 +566,7 @@ func (ir *IntentResolver) CleanupIntents( var prevTxnID uuid.UUID var i int for i = 0; i < len(unpushed); i++ { - if curTxn := unpushed[i].Txn; curTxn.ID != prevTxnID { + if curTxn := &unpushed[i].Txn; curTxn.ID != prevTxnID { if len(pushTxns) == cleanupIntentsTxnsPerBatch { break } @@ -580,10 +600,9 @@ func (ir *IntentResolver) CleanupIntents( // same situation as above. // // Thus, we must poison. - if err := ir.ResolveIntents( - ctx, resolveIntents, ResolveOptions{Wait: true, Poison: true}, - ); err != nil { - return 0, errors.Wrapf(err, "failed to resolve intents") + opts := ResolveOptions{Wait: true, Poison: true} + if pErr := ir.ResolveIntents(ctx, resolveIntents, opts); pErr != nil { + return 0, errors.Wrapf(pErr.GoError(), "failed to resolve intents") } resolved += len(resolveIntents) unpushed = unpushed[i:] @@ -803,8 +822,8 @@ func (ir *IntentResolver) cleanupFinishedTxnIntents( // Resolve intents. min, _ := txn.InclusiveTimeBounds() opts := ResolveOptions{Wait: true, Poison: poison, MinTimestamp: min} - if err := ir.ResolveIntents(ctx, intents, opts); err != nil { - return errors.Wrapf(err, "failed to resolve intents") + if pErr := ir.ResolveIntents(ctx, intents, opts); pErr != nil { + return errors.Wrapf(pErr.GoError(), "failed to resolve intents") } // Run transaction record GC outside of ir.sem. 
return ir.stopper.RunAsyncTask( @@ -862,17 +881,24 @@ func (ir *IntentResolver) lookupRangeID(ctx context.Context, key roachpb.Key) ro return rDesc.RangeID } -// ResolveIntents synchronously resolves intents accordings to opts. +// ResolveIntent synchronously resolves an intent according to opts. +func (ir *IntentResolver) ResolveIntent( + ctx context.Context, intent roachpb.Intent, opts ResolveOptions, +) *roachpb.Error { + return ir.ResolveIntents(ctx, []roachpb.Intent{intent}, opts) +} + +// ResolveIntents synchronously resolves intents according to opts. func (ir *IntentResolver) ResolveIntents( ctx context.Context, intents []roachpb.Intent, opts ResolveOptions, -) error { +) *roachpb.Error { if len(intents) == 0 { return nil } // Avoid doing any work on behalf of expired contexts. See // https://github.com/cockroachdb/cockroach/issues/15997. if err := ctx.Err(); err != nil { - return errors.Wrap(err, "aborted resolving intents") + return roachpb.NewError(err) } log.Eventf(ctx, "resolving intents [wait=%t]", opts.Wait) ctx, cancel := context.WithCancel(ctx) @@ -911,18 +937,18 @@ func (ir *IntentResolver) ResolveIntents( respChan := make(chan requestbatcher.Response, len(resolveReqs)) for _, req := range resolveReqs { if err := ir.irBatcher.SendWithChan(ctx, respChan, req.rangeID, req.req); err != nil { - return err + return roachpb.NewError(err) } } for seen := 0; seen < len(resolveReqs); seen++ { select { case resp := <-respChan: if resp.Err != nil { - return resp.Err + return roachpb.NewError(resp.Err) } _ = resp.Resp // ignore the response case <-ctx.Done(): - return ctx.Err() + return roachpb.NewError(ctx.Err()) } } @@ -935,7 +961,7 @@ func (ir *IntentResolver) ResolveIntents( b.Header.MaxSpanRequestKeys = intentResolverBatchSize b.AddRawRequest(req) if err := ir.db.Run(ctx, b); err != nil { - return err + return b.MustPErr() } // Check response to see if it must be resumed. resp := b.RawResponse().Responses[0].GetInner().(*roachpb.ResolveIntentRangeResponse) diff --git a/pkg/storage/replica_rangefeed.go b/pkg/storage/replica_rangefeed.go index 328f595f298c..54d1ac740629 100644 --- a/pkg/storage/replica_rangefeed.go +++ b/pkg/storage/replica_rangefeed.go @@ -72,8 +72,9 @@ type rangefeedTxnPusher struct { func (tp *rangefeedTxnPusher) PushTxns( ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp, ) ([]roachpb.Transaction, error) { - pushTxnMap := make(map[uuid.UUID]enginepb.TxnMeta, len(txns)) - for _, txn := range txns { + pushTxnMap := make(map[uuid.UUID]*enginepb.TxnMeta, len(txns)) + for i := range txns { + txn := &txns[i] pushTxnMap[txn.ID] = txn }
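The heart of the new lock_table_waiter.go is the mapping from a waiter's waitingState to a push strategy: a waiting reader pushes the conflicting transaction's timestamp, a waiting writer pushes to abort it, and the pushee's intent is only resolved when the lock is actually held (a conflict with a mere reservation leaves nothing to resolve). The sketch below restates that policy as a small helper in the concurrency package; the function name pushStrategyFor is hypothetical and not part of the patch, but the fields and constants it uses are the ones introduced above.

// Sketch only: restates the decision made inside lockTableWaiterImpl.pushTxn.
package concurrency

import (
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/storage/spanset"
)

// pushStrategyFor (hypothetical helper) returns how a waiter should push the
// transaction it is blocked on, and whether it should resolve the pushee's
// intent once the push succeeds.
func pushStrategyFor(ws waitingState) (pushType roachpb.PushTxnType, resolveAfterPush bool) {
	switch ws.guardAccess {
	case spanset.SpanReadOnly:
		// A waiting reader only needs the holder out of its way at the
		// reader's timestamp, so it bumps the holder's write timestamp.
		pushType = roachpb.PUSH_TIMESTAMP
	case spanset.SpanReadWrite:
		// A waiting writer needs the lock to go away entirely, so it
		// pushes to abort the holder.
		pushType = roachpb.PUSH_ABORT
	}
	// Only a held lock leaves an intent behind that is worth resolving; if
	// the conflict was with a reservation, a successful push is enough.
	return pushType, ws.held
}

The same waitingState also drives when the push is issued: waitForDistinguished and waitElsewhere push immediately, while waitFor waits out dependencyCyclePushDelay first (skipping the delay when the pushee has MinTxnPriority or the pusher has MaxTxnPriority), which keeps push traffic low while the conflicting transaction is still making progress.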