diff --git a/pkg/storage/concurrency/concurrency_control.go b/pkg/storage/concurrency/concurrency_control.go index 689f93e3d403..fdfb9abeb3c5 100644 --- a/pkg/storage/concurrency/concurrency_control.go +++ b/pkg/storage/concurrency/concurrency_control.go @@ -288,6 +288,7 @@ type MetricExporter interface { // other in-flight requests it conflicts with. type Request struct { // The (optional) transaction that sent the request. + // Non-transactional requests do not acquire locks. Txn *roachpb.Transaction // The timestamp that the request should evaluate at. @@ -364,7 +365,8 @@ type latchGuard interface{} // based on multiple versions requires some form of mutual exclusion to ensure // that a read and a conflicting lock acquisition do not happen concurrently. // The lock table provides both locking and sequencing of requests (in concert -// with the use of latches). +// with the use of latches). The lock table sequences both transactional and +// non-transactional requests, but the latter cannot acquire locks. // // Locks outlive the requests themselves and thereby extend the duration of the // isolation provided over specific keys to the lifetime of the lock-holder diff --git a/pkg/storage/concurrency/lock_table.go b/pkg/storage/concurrency/lock_table.go index c759d81e567a..ddd7f213aef9 100644 --- a/pkg/storage/concurrency/lock_table.go +++ b/pkg/storage/concurrency/lock_table.go @@ -301,6 +301,10 @@ func (g *lockTableGuardImpl) notify() { } } +func (g *lockTableGuardImpl) isTxn(txn *enginepb.TxnMeta) bool { + return g.txn != nil && g.txn.ID == txn.ID +} + // Waiting writers in a lockState are wrapped in a queuedGuard. A waiting // writer is typically waiting in an active state, i.e., the // lockTableGuardImpl.key refers to this lockState. However, breaking of @@ -395,6 +399,25 @@ type lockWaitQueue struct { // the behavior will extend when we start supporting Shared and Upgrade // locks. 
// + // Non-transactional requests can do both reads and writes but cannot be + // depended on since they don't have a transaction that can be pushed. + // Therefore they neither acquire locks nor make reservations. + // The non-reservation for reads is already covered in the previous + // paragraph. For non-transactional writes, the request waits in the queue + // with other writers. Its behavior differs in two cases: + // - when it gets to the front of the queue and there is no lock holder + // or reservation: instead of acquiring the reservation it removes + // itself from the lockState and proceeds to the next lock. If it + // does not need to wait for any more locks and manages to acquire + // latches before those locks are acquired by some other request, it + // will evaluate. + // - when deciding to wait at a lock: if the lock has a reservation with + // a sequence number higher than its own, it will ignore that + // reservation. Note that ignoring such reservations is safe since, + // while this non-transactional request is holding latches, those + // reservation holders cannot be holding latches, so they cannot + // conflict. 
+ // // Multiple requests from the same transaction wait independently, including // the situation where one of the requests has a reservation and the other // is waiting (currently this can only happen if both requests are doing @@ -534,7 +557,8 @@ func (l *lockState) informActiveWaiters() { checkForWaitSelf = true waitForTxn = l.reservation.txn waitForTs = l.reservation.ts - if !findDistinguished && l.distinguishedWaiter.txn.ID == waitForTxn.ID { + if !findDistinguished && l.distinguishedWaiter.txn != nil && + l.distinguishedWaiter.txn.ID == waitForTxn.ID { findDistinguished = true l.distinguishedWaiter = nil } @@ -571,7 +595,7 @@ func (l *lockState) informActiveWaiters() { } g := qg.guard var state waitingState - if checkForWaitSelf && waitForTxn.ID == g.txn.ID { + if checkForWaitSelf && g.isTxn(waitForTxn) { state = waitSelfState } else { state = waitForState @@ -601,7 +625,8 @@ func (l *lockState) tryMakeNewDistinguished() { } else if l.queuedWriters.Len() > 0 { for e := l.queuedWriters.Front(); e != nil; e = e.Next() { qg := e.Value.(*queuedGuard) - if qg.active && (l.reservation == nil || l.reservation.txn.ID != qg.guard.txn.ID) { + if qg.active && + (l.reservation == nil || qg.guard.txn == nil || l.reservation.txn.ID != qg.guard.txn.ID) { g = qg.guard break } @@ -678,7 +703,7 @@ func (l *lockState) tryActiveWait(g *lockTableGuardImpl, sa spanset.SpanAccess, // Lock is not empty. waitForTxn, waitForTs := l.getLockerInfo() - if waitForTxn != nil && g.txn != nil && g.txn.ID == waitForTxn.ID { + if waitForTxn != nil && g.isTxn(waitForTxn) { // Already locked by this txn. 
return false } @@ -691,7 +716,16 @@ func (l *lockState) tryActiveWait(g *lockTableGuardImpl, sa spanset.SpanAccess, } waitForTxn = l.reservation.txn waitForTs = l.reservation.ts - reservedBySelfTxn = g.txn != nil && g.txn.ID == waitForTxn.ID + reservedBySelfTxn = g.isTxn(waitForTxn) + // A non-transactional write request never makes or breaks reservations, + // and only waits for a reservation if the reservation has a lower seqNum. + // For reads, the non-transactional and transactional behavior is + // equivalent and handled later in this function. + if g.txn == nil && sa == spanset.SpanReadWrite && l.reservation.seqNum > g.seqNum { + // Reservation is held by a request with a higher seqNum and g is a + // non-transactional request. Ignore the reservation. + return false + } } if sa == spanset.SpanReadOnly { @@ -889,7 +923,7 @@ func (l *lockState) acquireLock( curr := e e = e.Next() g := qg.guard - if g.txn.ID == txn.ID { + if g.isTxn(txn) { if qg.active { doneWaiting = append(doneWaiting, g) if g == l.distinguishedWaiter { @@ -936,16 +970,20 @@ func (l *lockState) discoveredLock( l.holder.holder[lock.Replicated].txn = txn l.holder.holder[lock.Replicated].ts = ts } - // Queue the existing reservation holder. - var hadReservation bool + + g.mu.Lock() + _, presentHere := g.mu.locks[l] + if !presentHere { + g.mu.locks[l] = struct{}{} + } + g.mu.Unlock() + + // Queue the existing reservation holder. Note that this reservation + // holder may not be equal to g due to two reasons (a) the reservation + // of g could have been broken even though g is holding latches (see + // the comment in acquireLock()), (b) g may be a non-transactional + // request (read or write) that can ignore the reservation. if l.reservation != nil { - if l.reservation == g { - hadReservation = true - } else if sa == spanset.SpanReadWrite { - // There was a reservation and it was not this request -- this is wrong - // since this request should not have evaluated and discovered this lock. 
- return errors.Errorf("caller violated contract") - } qg := &queuedGuard{ guard: l.reservation, active: false, @@ -959,22 +997,25 @@ func (l *lockState) discoveredLock( informWaiters = false } - if !hadReservation && sa == spanset.SpanReadWrite { - // Put self in queue as inactive waiter. Since did not have the - // reservation the lock must not have been known to be held so the queue - // must be empty. - if l.queuedWriters.Len() > 0 { - panic("lockTable bug") - } + if !presentHere && sa == spanset.SpanReadWrite { + // Put self in queue as inactive waiter. qg := &queuedGuard{ guard: g, active: false, } - l.queuedWriters.PushFront(qg) - g.mu.Lock() - g.mu.locks[l] = struct{}{} - g.mu.Unlock() - informWaiters = false + // g is not necessarily first in the queue in the (rare) case (a) above. + var e *list.Element + for e = l.queuedWriters.Front(); e != nil; e = e.Next() { + qqg := e.Value.(*queuedGuard) + if qqg.guard.seqNum > g.seqNum { + break + } + } + if e == nil { + l.queuedWriters.PushBack(qg) + } else { + l.queuedWriters.InsertBefore(qg, e) + } } // Active waiters need to be told about who they are waiting for. @@ -1251,10 +1292,34 @@ func (l *lockState) lockIsFree() (doneWaiting []*lockTableGuardImpl, gc bool) { g.mu.Unlock() doneWaiting = append(doneWaiting, g) } + + // The prefix of the queue that is non-transactional writers is done + // waiting. + for e := l.queuedWriters.Front(); e != nil; { + qg := e.Value.(*queuedGuard) + g := qg.guard + if g.txn == nil { + curr := e + e = e.Next() + l.queuedWriters.Remove(curr) + if g == l.distinguishedWaiter { + findDistinguished = true + l.distinguishedWaiter = nil + } + g.mu.Lock() + delete(g.mu.locks, l) + g.mu.Unlock() + doneWaiting = append(doneWaiting, g) + } else { + break + } + } + if l.queuedWriters.Len() == 0 { return doneWaiting, true } - // First waiting writer gets the reservation. + + // First waiting writer (it must be transactional) gets the reservation. 
e := l.queuedWriters.Front() qg := e.Value.(*queuedGuard) g := qg.guard @@ -1267,7 +1332,7 @@ func (l *lockState) lockIsFree() (doneWaiting []*lockTableGuardImpl, gc bool) { // Need to find a new distinguished waiter if the current distinguished is // from the same transaction (possibly it is the request that has reserved). - if l.distinguishedWaiter != nil && l.distinguishedWaiter.txn.ID == g.txn.ID { + if l.distinguishedWaiter != nil && l.distinguishedWaiter.isTxn(l.reservation.txn) { findDistinguished = true l.distinguishedWaiter = nil } @@ -1286,7 +1351,7 @@ func (l *lockState) lockIsFree() (doneWaiting []*lockTableGuardImpl, gc bool) { if qg.active { g := qg.guard var state waitingState - if g.txn.ID == l.reservation.txn.ID { + if g.isTxn(l.reservation.txn) { state = waitSelfState } else { state = waitForState @@ -1699,8 +1764,13 @@ func (t *lockTableImpl) String() string { fmt.Fprintln(&buf, " queued writers:") for e := l.queuedWriters.Front(); e != nil; e = e.Next() { qg := e.Value.(*queuedGuard) - fmt.Fprintf(&buf, " active: %t req: %d, txn: %v\n", - qg.active, qg.guard.seqNum, qg.guard.txn.ID) + g := qg.guard + txnStr := "none" + if g.txn != nil { + txnStr = fmt.Sprintf("%v", g.txn.ID) + } + fmt.Fprintf(&buf, " active: %t req: %d, txn: %s\n", + qg.active, qg.guard.seqNum, txnStr) } } if l.distinguishedWaiter != nil { diff --git a/pkg/storage/concurrency/lock_table_test.go b/pkg/storage/concurrency/lock_table_test.go index 5e3ee1100823..f929660b737b 100644 --- a/pkg/storage/concurrency/lock_table_test.go +++ b/pkg/storage/concurrency/lock_table_test.go @@ -810,22 +810,18 @@ func TestLockTableConcurrentSingleRequests(t *testing.T) { var items []workloadItem var startedTxnIDs []uuid.UUID // inefficient queue, but ok for a test. 
const maxStartedTxns = 10 - const numRequests = 5000 + const numRequests = 10000 for i := 0; i < numRequests; i++ { ts := timestamps[rng.Intn(len(timestamps))] keysPerm := rng.Perm(len(keys)) spans := &spanset.SpanSet{} - onlyReads := true for i := 0; i < numKeys; i++ { span := roachpb.Span{Key: keys[keysPerm[i]]} acc := spanset.SpanAccess(rng.Intn(int(spanset.NumSpanAccess))) spans.AddMVCC(acc, span, ts) - if acc != spanset.SpanReadOnly { - onlyReads = false - } } var txn *roachpb.Transaction - if !onlyReads || rng.Intn(2) == 0 { + if rng.Intn(2) == 0 { txn = &roachpb.Transaction{ TxnMeta: enginepb.TxnMeta{ ID: nextUUID(&txnCounter), @@ -882,7 +878,7 @@ func TestLockTableConcurrentRequests(t *testing.T) { const numActiveTxns = 8 var activeTxns [numActiveTxns]*enginepb.TxnMeta var items []workloadItem - const numRequests = 5000 + const numRequests = 20000 for i := 0; i < numRequests; i++ { var txnMeta *enginepb.TxnMeta var ts hlc.Timestamp @@ -909,7 +905,7 @@ func TestLockTableConcurrentRequests(t *testing.T) { } keysPerm := rng.Perm(len(keys)) spans := &spanset.SpanSet{} - onlyReads := txnMeta == nil + onlyReads := txnMeta == nil && rng.Intn(2) != 0 numKeys := rng.Intn(len(keys)-1) + 1 request := &Request{ Timestamp: ts, @@ -924,7 +920,7 @@ func TestLockTableConcurrentRequests(t *testing.T) { acc := spanset.SpanReadOnly if !onlyReads { acc = spanset.SpanAccess(rng.Intn(int(spanset.NumSpanAccess))) - if acc == spanset.SpanReadWrite && rng.Intn(2) == 0 { + if acc == spanset.SpanReadWrite && txnMeta != nil && rng.Intn(2) == 0 { wi.locksToAcquire = append(wi.locksToAcquire, span.Key) } } diff --git a/pkg/storage/concurrency/testdata/lock_table/non_txn_write b/pkg/storage/concurrency/testdata/lock_table/non_txn_write new file mode 100644 index 000000000000..d3c2aea22eae --- /dev/null +++ b/pkg/storage/concurrency/testdata/lock_table/non_txn_write @@ -0,0 +1,307 @@ +txn txn=txn1 ts=10 epoch=0 +---- + +txn txn=txn2 ts=10 epoch=0 +---- + +txn txn=txn3 ts=10 epoch=0 +---- 
+ +# First locks at a, b, c are acquired by txn1 +request r=req1 txn=txn1 ts=10 spans=w@a+w@b+w@c +---- + +scan r=req1 +---- +start-waiting: false + +acquire r=req1 k=a durability=u +---- +global: num=1 + lock: "a" + holder: txn: 00000000-0000-0000-0000-000000000001, ts: 0.000000010,0 +local: num=0 + +acquire r=req1 k=b durability=u +---- +global: num=2 + lock: "a" + holder: txn: 00000000-0000-0000-0000-000000000001, ts: 0.000000010,0 + lock: "b" + holder: txn: 00000000-0000-0000-0000-000000000001, ts: 0.000000010,0 +local: num=0 + +acquire r=req1 k=c durability=u +---- +global: num=3 + lock: "a" + holder: txn: 00000000-0000-0000-0000-000000000001, ts: 0.000000010,0 + lock: "b" + holder: txn: 00000000-0000-0000-0000-000000000001, ts: 0.000000010,0 + lock: "c" + holder: txn: 00000000-0000-0000-0000-000000000001, ts: 0.000000010,0 +local: num=0 + +done r=req1 +---- +global: num=3 + lock: "a" + holder: txn: 00000000-0000-0000-0000-000000000001, ts: 0.000000010,0 + lock: "b" + holder: txn: 00000000-0000-0000-0000-000000000001, ts: 0.000000010,0 + lock: "c" + holder: txn: 00000000-0000-0000-0000-000000000001, ts: 0.000000010,0 +local: num=0 + +# Next, two different transactional requests wait at a and b. +request r=req2 txn=txn2 ts=10 spans=w@a +---- + +scan r=req2 +---- +start-waiting: true + +request r=req3 txn=txn3 ts=10 spans=w@b +---- + +scan r=req3 +---- +start-waiting: true + +# Next, a non-transactional request that wants to write a, b, c waits at a. 
+ +request r=req4 txn=none ts=10 spans=w@a+w@b+w@c +---- + +scan r=req4 +---- +start-waiting: true + +# Next, a transactional request that arrives later than the non-transactional request waits at c + +request r=req5 txn=txn3 ts=10 spans=w@c +---- + +scan r=req5 +---- +start-waiting: true + +print +---- +global: num=3 + lock: "a" + holder: txn: 00000000-0000-0000-0000-000000000001, ts: 0.000000010,0 + queued writers: + active: true req: 2, txn: 00000000-0000-0000-0000-000000000002 + active: true req: 4, txn: none + distinguished req: 2 + lock: "b" + holder: txn: 00000000-0000-0000-0000-000000000001, ts: 0.000000010,0 + queued writers: + active: true req: 3, txn: 00000000-0000-0000-0000-000000000003 + distinguished req: 3 + lock: "c" + holder: txn: 00000000-0000-0000-0000-000000000001, ts: 0.000000010,0 + queued writers: + active: true req: 5, txn: 00000000-0000-0000-0000-000000000003 + distinguished req: 5 +local: num=0 + +# The locks at a, b, c are released. The non-transactional request waits behind +# the reservation holder at a. + +release txn=txn1 span=a,d +---- +global: num=3 + lock: "a" + res: req: 2, txn: 00000000-0000-0000-0000-000000000002, ts: 0.000000010,0 + queued writers: + active: true req: 4, txn: none + distinguished req: 4 + lock: "b" + res: req: 3, txn: 00000000-0000-0000-0000-000000000003, ts: 0.000000010,0 + lock: "c" + res: req: 5, txn: 00000000-0000-0000-0000-000000000003, ts: 0.000000010,0 +local: num=0 + +guard-state r=req2 +---- +new: state=doneWaiting + +guard-state r=req3 +---- +new: state=doneWaiting + +guard-state r=req4 +---- +new: state=waitForDistinguished txn=txn2 ts=10 + +guard-state r=req5 +---- +new: state=doneWaiting + +# Add another transactional request at a. It will wait behind the non-transactional request. 
+ +request r=req6 txn=txn1 ts=10 spans=w@a +---- + +scan r=req6 +---- +start-waiting: true + +print +---- +global: num=3 + lock: "a" + res: req: 2, txn: 00000000-0000-0000-0000-000000000002, ts: 0.000000010,0 + queued writers: + active: true req: 4, txn: none + active: true req: 6, txn: 00000000-0000-0000-0000-000000000001 + distinguished req: 4 + lock: "b" + res: req: 3, txn: 00000000-0000-0000-0000-000000000003, ts: 0.000000010,0 + lock: "c" + res: req: 5, txn: 00000000-0000-0000-0000-000000000003, ts: 0.000000010,0 +local: num=0 + +# Release the reservation at a. The first waiter is non-transactional so it will not acquire the +# reservation. The second waiter will acquire the reservation. The non-transactional request will +# wait behind the reservation holder at b. + +done r=req2 +---- +global: num=3 + lock: "a" + res: req: 6, txn: 00000000-0000-0000-0000-000000000001, ts: 0.000000010,0 + lock: "b" + res: req: 3, txn: 00000000-0000-0000-0000-000000000003, ts: 0.000000010,0 + lock: "c" + res: req: 5, txn: 00000000-0000-0000-0000-000000000003, ts: 0.000000010,0 +local: num=0 + +guard-state r=req4 +---- +new: state=waitForDistinguished txn=txn3 ts=10 + +guard-state r=req6 +---- +new: state=doneWaiting + +print +---- +global: num=3 + lock: "a" + res: req: 6, txn: 00000000-0000-0000-0000-000000000001, ts: 0.000000010,0 + lock: "b" + res: req: 3, txn: 00000000-0000-0000-0000-000000000003, ts: 0.000000010,0 + queued writers: + active: true req: 4, txn: none + distinguished req: 4 + lock: "c" + res: req: 5, txn: 00000000-0000-0000-0000-000000000003, ts: 0.000000010,0 +local: num=0 + +# Release the reservation at b. The non-transactional waiter will be done at b, and when it gets +# to c it will see a reservation holder with a higher sequence num and ignore it. 
+ +done r=req3 +---- +global: num=2 + lock: "a" + res: req: 6, txn: 00000000-0000-0000-0000-000000000001, ts: 0.000000010,0 + lock: "c" + res: req: 5, txn: 00000000-0000-0000-0000-000000000003, ts: 0.000000010,0 +local: num=0 + +guard-state r=req4 +---- +new: state=doneWaiting + +guard-state r=req5 +---- +old: state=doneWaiting + +print +---- +global: num=2 + lock: "a" + res: req: 6, txn: 00000000-0000-0000-0000-000000000001, ts: 0.000000010,0 + lock: "c" + res: req: 5, txn: 00000000-0000-0000-0000-000000000003, ts: 0.000000010,0 +local: num=0 + +# Non-transactional request scans again and proceeds to evaluation and discovers a lock at c + +scan r=req4 +---- +start-waiting: false + +add-discovered r=req4 k=c txn=txn2 +---- +global: num=2 + lock: "a" + res: req: 6, txn: 00000000-0000-0000-0000-000000000001, ts: 0.000000010,0 + lock: "c" + holder: txn: 00000000-0000-0000-0000-000000000002, ts: 0.000000010,0 + queued writers: + active: false req: 4, txn: none + active: false req: 5, txn: 00000000-0000-0000-0000-000000000003 +local: num=0 + +scan r=req4 +---- +start-waiting: true + +scan r=req5 +---- +start-waiting: true + +guard-state r=req4 +---- +new: state=waitForDistinguished txn=txn2 ts=10 + +guard-state r=req5 +---- +new: state=waitFor txn=txn2 ts=10 + +# Release the lock. The non-transactional request does not acquire the reservation. + +release txn=txn2 span=c +---- +global: num=2 + lock: "a" + res: req: 6, txn: 00000000-0000-0000-0000-000000000001, ts: 0.000000010,0 + lock: "c" + res: req: 5, txn: 00000000-0000-0000-0000-000000000003, ts: 0.000000010,0 +local: num=0 + +guard-state r=req4 +---- +new: state=doneWaiting + +guard-state r=req5 +---- +new: state=doneWaiting + +# Make all requests done. 
+ +done r=req4 +---- +global: num=2 + lock: "a" + res: req: 6, txn: 00000000-0000-0000-0000-000000000001, ts: 0.000000010,0 + lock: "c" + res: req: 5, txn: 00000000-0000-0000-0000-000000000003, ts: 0.000000010,0 +local: num=0 + +done r=req5 +---- +global: num=1 + lock: "a" + res: req: 6, txn: 00000000-0000-0000-0000-000000000001, ts: 0.000000010,0 +local: num=0 + +done r=req6 +---- +global: num=0 +local: num=0