diff --git a/pkg/storage/concurrency/lock_table_test.go b/pkg/storage/concurrency/lock_table_test.go index 67228fc47ad6..17aaf4b6e7c3 100644 --- a/pkg/storage/concurrency/lock_table_test.go +++ b/pkg/storage/concurrency/lock_table_test.go @@ -900,7 +900,7 @@ func TestLockTableConcurrentRequests(t *testing.T) { const numActiveTxns = 8 var activeTxns [numActiveTxns]*enginepb.TxnMeta var items []workloadItem - const numRequests = 20000 + const numRequests = 1000 for i := 0; i < numRequests; i++ { var txnMeta *enginepb.TxnMeta var ts hlc.Timestamp @@ -968,7 +968,7 @@ func TestLockTableConcurrentRequests(t *testing.T) { for _, c := range concurrency { t.Run(fmt.Sprintf("concurrency %d", c), func(t *testing.T) { exec := newWorkLoadExecutor(items, c) - if err := exec.execute(false, 2000); err != nil { + if err := exec.execute(false, 200); err != nil { t.Fatal(err) } }) diff --git a/pkg/storage/concurrency/lock_table_waiter.go b/pkg/storage/concurrency/lock_table_waiter.go index 31e2f7efb740..b7af90ff4507 100644 --- a/pkg/storage/concurrency/lock_table_waiter.go +++ b/pkg/storage/concurrency/lock_table_waiter.go @@ -72,6 +72,36 @@ func (w *lockTableWaiterImpl) WaitOn( case <-newStateC: timerC = nil state := guard.CurState() + if !state.held { + // If the lock is not held and instead has a reservation, we don't + // want to push the reservation transaction. A transaction push will + // block until the pushee transaction has either committed, aborted, + // pushed, or rolled back savepoints, i.e., there is some state + // change that has happened to the transaction record that unblocks + // the pusher. It will not unblock merely because a request issued + // by the pushee transaction has completed and released a + // reservation. Note that: + // - reservations are not a guarantee that the lock will be acquired. + // - the following two reasons to push do not apply to requests + // holding reservations: + // 1. competing requests compete at exactly one lock table, so there + // is no possibility of distributed deadlock due to reservations. + // 2. the lock table can prioritize requests based on transaction + // priorities. + // + // TODO(sbhola): remove the need for this by only notifying waiters + // for held locks and never for reservations. + // TODO(sbhola): now that we never push reservation holders, we + // should stop special-casing non-transactional writes and let them + // acquire reservations. + switch state.stateKind { + case waitFor, waitForDistinguished: + continue + case waitElsewhere: + return nil + } + } + switch state.stateKind { case waitFor: // waitFor indicates that the request is waiting on another diff --git a/pkg/storage/concurrency/lock_table_waiter_test.go b/pkg/storage/concurrency/lock_table_waiter_test.go index 18110c701be5..cfed5735e8c8 100644 --- a/pkg/storage/concurrency/lock_table_waiter_test.go +++ b/pkg/storage/concurrency/lock_table_waiter_test.go @@ -270,53 +270,72 @@ func testWaitPush(t *testing.T, k stateKind, makeReq func() Request, expPushTS h txn: &pusheeTxn.TxnMeta, ts: pusheeTxn.WriteTimestamp, key: keyA, - held: false, + held: lockHeld, access: spanset.SpanReadWrite, guardAccess: spanset.SpanReadOnly, } - if lockHeld { - g.state.held = true - } if waitAsWrite { g.state.guardAccess = spanset.SpanReadWrite } g.notify() req := makeReq() - ir.pushTxn = func( - _ context.Context, - pusheeArg *enginepb.TxnMeta, - h roachpb.Header, - pushType roachpb.PushTxnType, - ) (roachpb.Transaction, *Error) { - require.Equal(t, &pusheeTxn.TxnMeta, pusheeArg) - require.Equal(t, req.Txn, h.Txn) - require.Equal(t, expPushTS, h.Timestamp) - if waitAsWrite { - require.Equal(t, roachpb.PUSH_ABORT, pushType) - } else { - require.Equal(t, roachpb.PUSH_TIMESTAMP, pushType) - } - - resp := roachpb.Transaction{TxnMeta: *pusheeArg, Status: roachpb.ABORTED} + if lockHeld { + // We expect the holder to be pushed. + ir.pushTxn = func( + _ context.Context, + pusheeArg *enginepb.TxnMeta, + h roachpb.Header, + pushType roachpb.PushTxnType, + ) (roachpb.Transaction, *Error) { + require.Equal(t, &pusheeTxn.TxnMeta, pusheeArg) + require.Equal(t, req.Txn, h.Txn) + require.Equal(t, expPushTS, h.Timestamp) + if waitAsWrite { + require.Equal(t, roachpb.PUSH_ABORT, pushType) + } else { + require.Equal(t, roachpb.PUSH_TIMESTAMP, pushType) + } - // If the lock is held, we'll try to resolve it now that - // we know the holder is ABORTED. Otherwide, immediately - // tell the request to stop waiting. - if lockHeld { - ir.resolveIntent = func(_ context.Context, intent roachpb.LockUpdate) *Error { - require.Equal(t, keyA, intent.Key) - require.Equal(t, pusheeTxn.ID, intent.Txn.ID) - require.Equal(t, roachpb.ABORTED, intent.Status) + resp := roachpb.Transaction{TxnMeta: *pusheeArg, Status: roachpb.ABORTED} + + // If the lock is held, we'll try to resolve it now that + // we know the holder is ABORTED. Otherwide, immediately + // tell the request to stop waiting. + if lockHeld { + ir.resolveIntent = func(_ context.Context, intent roachpb.LockUpdate) *Error { + require.Equal(t, keyA, intent.Key) + require.Equal(t, pusheeTxn.ID, intent.Txn.ID) + require.Equal(t, roachpb.ABORTED, intent.Status) + g.state = waitingState{stateKind: doneWaiting} + g.notify() + return nil + } + } else { g.state = waitingState{stateKind: doneWaiting} g.notify() - return nil } - } else { - g.state = waitingState{stateKind: doneWaiting} - g.notify() + return resp, nil + } + } else { + switch k { + case waitFor, waitForDistinguished: + // We don't expect the holder to be pushed. Set up an observer + // channel to detect when the current waiting state is observed. + g.stateObserved = make(chan struct{}) + go func() { + <-g.stateObserved + g.notify() + <-g.stateObserved + g.state = waitingState{stateKind: doneWaiting} + g.notify() + <-g.stateObserved + }() + case waitElsewhere: + // Expect an immediate return. + default: + t.Fatalf("unexpected state: %v", k) } - return resp, nil } err := w.WaitOn(ctx, req, g)