Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage/concurrency: don't push reservation transactions #45420

Merged
merged 2 commits into from
Feb 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/storage/concurrency/lock_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -900,7 +900,7 @@ func TestLockTableConcurrentRequests(t *testing.T) {
const numActiveTxns = 8
var activeTxns [numActiveTxns]*enginepb.TxnMeta
var items []workloadItem
const numRequests = 20000
const numRequests = 1000
for i := 0; i < numRequests; i++ {
var txnMeta *enginepb.TxnMeta
var ts hlc.Timestamp
Expand Down Expand Up @@ -968,7 +968,7 @@ func TestLockTableConcurrentRequests(t *testing.T) {
for _, c := range concurrency {
t.Run(fmt.Sprintf("concurrency %d", c), func(t *testing.T) {
exec := newWorkLoadExecutor(items, c)
if err := exec.execute(false, 2000); err != nil {
if err := exec.execute(false, 200); err != nil {
t.Fatal(err)
}
})
Expand Down
30 changes: 30 additions & 0 deletions pkg/storage/concurrency/lock_table_waiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,36 @@ func (w *lockTableWaiterImpl) WaitOn(
case <-newStateC:
timerC = nil
state := guard.CurState()
if !state.held {
// If the lock is not held and instead has a reservation, we don't
// want to push the reservation transaction. A transaction push will
// block until the pushee transaction has either committed, aborted,
// pushed, or rolled back savepoints, i.e., there is some state
// change that has happened to the transaction record that unblocks
// the pusher. It will not unblock merely because a request issued
// by the pushee transaction has completed and released a
// reservation. Note that:
// - reservations are not a guarantee that the lock will be acquired.
// - the following two reasons to push do not apply to requests
// holding reservations:
// 1. competing requests compete at exactly one lock table, so there
// is no possibility of distributed deadlock due to reservations.
// 2. the lock table can prioritize requests based on transaction
// priorities.
//
// TODO(sbhola): remove the need for this by only notifying waiters
// for held locks and never for reservations.
// TODO(sbhola): now that we never push reservation holders, we
// should stop special-casing non-transactional writes and let them
// acquire reservations.
switch state.stateKind {
case waitFor, waitForDistinguished:
continue
case waitElsewhere:
return nil
}
}

switch state.stateKind {
case waitFor:
// waitFor indicates that the request is waiting on another
Expand Down
85 changes: 52 additions & 33 deletions pkg/storage/concurrency/lock_table_waiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,53 +270,72 @@ func testWaitPush(t *testing.T, k stateKind, makeReq func() Request, expPushTS h
txn: &pusheeTxn.TxnMeta,
ts: pusheeTxn.WriteTimestamp,
key: keyA,
held: false,
held: lockHeld,
access: spanset.SpanReadWrite,
guardAccess: spanset.SpanReadOnly,
}
if lockHeld {
g.state.held = true
}
if waitAsWrite {
g.state.guardAccess = spanset.SpanReadWrite
}
g.notify()

req := makeReq()
ir.pushTxn = func(
_ context.Context,
pusheeArg *enginepb.TxnMeta,
h roachpb.Header,
pushType roachpb.PushTxnType,
) (roachpb.Transaction, *Error) {
require.Equal(t, &pusheeTxn.TxnMeta, pusheeArg)
require.Equal(t, req.Txn, h.Txn)
require.Equal(t, expPushTS, h.Timestamp)
if waitAsWrite {
require.Equal(t, roachpb.PUSH_ABORT, pushType)
} else {
require.Equal(t, roachpb.PUSH_TIMESTAMP, pushType)
}

resp := roachpb.Transaction{TxnMeta: *pusheeArg, Status: roachpb.ABORTED}
if lockHeld {
// We expect the holder to be pushed.
ir.pushTxn = func(
_ context.Context,
pusheeArg *enginepb.TxnMeta,
h roachpb.Header,
pushType roachpb.PushTxnType,
) (roachpb.Transaction, *Error) {
require.Equal(t, &pusheeTxn.TxnMeta, pusheeArg)
require.Equal(t, req.Txn, h.Txn)
require.Equal(t, expPushTS, h.Timestamp)
if waitAsWrite {
require.Equal(t, roachpb.PUSH_ABORT, pushType)
} else {
require.Equal(t, roachpb.PUSH_TIMESTAMP, pushType)
}

// If the lock is held, we'll try to resolve it now that
// we know the holder is ABORTED. Otherwide, immediately
// tell the request to stop waiting.
if lockHeld {
ir.resolveIntent = func(_ context.Context, intent roachpb.LockUpdate) *Error {
require.Equal(t, keyA, intent.Key)
require.Equal(t, pusheeTxn.ID, intent.Txn.ID)
require.Equal(t, roachpb.ABORTED, intent.Status)
resp := roachpb.Transaction{TxnMeta: *pusheeArg, Status: roachpb.ABORTED}

// If the lock is held, we'll try to resolve it now that
// we know the holder is ABORTED. Otherwide, immediately
// tell the request to stop waiting.
if lockHeld {
ir.resolveIntent = func(_ context.Context, intent roachpb.LockUpdate) *Error {
require.Equal(t, keyA, intent.Key)
require.Equal(t, pusheeTxn.ID, intent.Txn.ID)
require.Equal(t, roachpb.ABORTED, intent.Status)
g.state = waitingState{stateKind: doneWaiting}
g.notify()
return nil
}
} else {
g.state = waitingState{stateKind: doneWaiting}
g.notify()
return nil
}
} else {
g.state = waitingState{stateKind: doneWaiting}
g.notify()
return resp, nil
}
} else {
switch k {
case waitFor, waitForDistinguished:
// We don't expect the holder to be pushed. Set up an observer
// channel to detect when the current waiting state is observed.
g.stateObserved = make(chan struct{})
go func() {
<-g.stateObserved
g.notify()
<-g.stateObserved
g.state = waitingState{stateKind: doneWaiting}
g.notify()
<-g.stateObserved
}()
case waitElsewhere:
// Expect an immediate return.
default:
t.Fatalf("unexpected state: %v", k)
}
return resp, nil
}

err := w.WaitOn(ctx, req, g)
Expand Down