Skip to content

Commit

Permalink
Merge #44975
Browse files Browse the repository at this point in the history
44975: storage/concurrency: add support for non-transactional writes to r=sumeerbhola a=sumeerbhola

lockTable.
And updated the datadriven and randomized tests to exercise
non-transactional writes.

Release note: None

Co-authored-by: sumeerbhola <sumeer@cockroachlabs.com>
  • Loading branch information
craig[bot] and sumeerbhola committed Feb 12, 2020
2 parents f24a014 + fff3e0c commit 79bf7ec
Show file tree
Hide file tree
Showing 4 changed files with 417 additions and 42 deletions.
4 changes: 3 additions & 1 deletion pkg/storage/concurrency/concurrency_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ type MetricExporter interface {
// other in-flight requests it conflicts with.
type Request struct {
// The (optional) transaction that sent the request.
// Non-transactional requests do not acquire locks.
Txn *roachpb.Transaction

// The timestamp that the request should evaluate at.
Expand Down Expand Up @@ -364,7 +365,8 @@ type latchGuard interface{}
// based on multiple versions requires some form of mutual exclusion to ensure
// that a read and a conflicting lock acquisition do not happen concurrently.
// The lock table provides both locking and sequencing of requests (in concert
// with the use of latches).
// with the use of latches). The lock table sequences both transactional and
// non-transactional requests, but the latter cannot acquire locks.
//
// Locks outlive the requests themselves and thereby extend the duration of the
// isolation provided over specific keys to the lifetime of the lock-holder
Expand Down
134 changes: 102 additions & 32 deletions pkg/storage/concurrency/lock_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,10 @@ func (g *lockTableGuardImpl) notify() {
}
}

func (g *lockTableGuardImpl) isTxn(txn *enginepb.TxnMeta) bool {
return g.txn != nil && g.txn.ID == txn.ID
}

// Waiting writers in a lockState are wrapped in a queuedGuard. A waiting
// writer is typically waiting in an active state, i.e., the
// lockTableGuardImpl.key refers to this lockState. However, breaking of
Expand Down Expand Up @@ -395,6 +399,25 @@ type lockWaitQueue struct {
// the behavior will extend when we start supporting Shared and Upgrade
// locks.
//
// Non-transactional requests can do both reads and writes but cannot be
// depended on since they don't have a transaction that can be pushed.
// Therefore they not only do not acquire locks, but cannot make reservations.
// The non-reservation for reads is already covered in the previous
// paragraph. For non-transactional writes, the request waits in the queue
// with other writers. The difference occurs:
// - when it gets to the front of the queue and there is no lock holder
// or reservation: instead of acquiring the reservation it removes
// itself from the lockState and proceeds to the next lock. If it
// does not need to wait for any more locks and manages to acquire
// latches before those locks are acquired by some other request, it
// will evaluate.
// - when deciding to wait at a lock: if the lock has a reservation with
// a sequence num higher than this non-transactional request it will
// ignore that reservation. Note that ignoring such reservations is
// safe since when this non-transactional request is holding latches
// those reservation holders cannot be holding latches, so they cannot
// conflict.
//
// Multiple requests from the same transaction wait independently, including
// the situation where one of the requests has a reservation and the other
// is waiting (currently this can only happen if both requests are doing
Expand Down Expand Up @@ -534,7 +557,8 @@ func (l *lockState) informActiveWaiters() {
checkForWaitSelf = true
waitForTxn = l.reservation.txn
waitForTs = l.reservation.ts
if !findDistinguished && l.distinguishedWaiter.txn.ID == waitForTxn.ID {
if !findDistinguished && l.distinguishedWaiter.txn != nil &&
l.distinguishedWaiter.txn.ID == waitForTxn.ID {
findDistinguished = true
l.distinguishedWaiter = nil
}
Expand Down Expand Up @@ -571,7 +595,7 @@ func (l *lockState) informActiveWaiters() {
}
g := qg.guard
var state waitingState
if checkForWaitSelf && waitForTxn.ID == g.txn.ID {
if checkForWaitSelf && g.isTxn(waitForTxn) {
state = waitSelfState
} else {
state = waitForState
Expand Down Expand Up @@ -601,7 +625,8 @@ func (l *lockState) tryMakeNewDistinguished() {
} else if l.queuedWriters.Len() > 0 {
for e := l.queuedWriters.Front(); e != nil; e = e.Next() {
qg := e.Value.(*queuedGuard)
if qg.active && (l.reservation == nil || l.reservation.txn.ID != qg.guard.txn.ID) {
if qg.active &&
(l.reservation == nil || qg.guard.txn == nil || l.reservation.txn.ID != qg.guard.txn.ID) {
g = qg.guard
break
}
Expand Down Expand Up @@ -678,7 +703,7 @@ func (l *lockState) tryActiveWait(g *lockTableGuardImpl, sa spanset.SpanAccess,

// Lock is not empty.
waitForTxn, waitForTs := l.getLockerInfo()
if waitForTxn != nil && g.txn != nil && g.txn.ID == waitForTxn.ID {
if waitForTxn != nil && g.isTxn(waitForTxn) {
// Already locked by this txn.
return false
}
Expand All @@ -691,7 +716,16 @@ func (l *lockState) tryActiveWait(g *lockTableGuardImpl, sa spanset.SpanAccess,
}
waitForTxn = l.reservation.txn
waitForTs = l.reservation.ts
reservedBySelfTxn = g.txn != nil && g.txn.ID == waitForTxn.ID
reservedBySelfTxn = g.isTxn(waitForTxn)
// A non-transactional write request never makes or breaks reservations,
// and only waits for a reservation if the reservation has a lower seqNum.
// For reads, the non-transactional and transactional behavior is
// equivalent and handled later in this function.
if g.txn == nil && sa == spanset.SpanReadWrite && l.reservation.seqNum > g.seqNum {
// Reservation is held by a request with a higher seqNum and g is a
// non-transactional request. Ignore the reservation.
return false
}
}

if sa == spanset.SpanReadOnly {
Expand Down Expand Up @@ -889,7 +923,7 @@ func (l *lockState) acquireLock(
curr := e
e = e.Next()
g := qg.guard
if g.txn.ID == txn.ID {
if g.isTxn(txn) {
if qg.active {
doneWaiting = append(doneWaiting, g)
if g == l.distinguishedWaiter {
Expand Down Expand Up @@ -936,16 +970,20 @@ func (l *lockState) discoveredLock(
l.holder.holder[lock.Replicated].txn = txn
l.holder.holder[lock.Replicated].ts = ts
}
// Queue the existing reservation holder.
var hadReservation bool

g.mu.Lock()
_, presentHere := g.mu.locks[l]
if !presentHere {
g.mu.locks[l] = struct{}{}
}
g.mu.Unlock()

// Queue the existing reservation holder. Note that this reservation
// holder may not be equal to g due to two reasons (a) the reservation
// of g could have been broken even though g is holding latches (see
// the comment in acquireLock()), (b) g may be a non-transactional
// request (read or write) that can ignore the reservation.
if l.reservation != nil {
if l.reservation == g {
hadReservation = true
} else if sa == spanset.SpanReadWrite {
// There was a reservation and it was not this request -- this is wrong
// since this request should not have evaluated and discovered this lock.
return errors.Errorf("caller violated contract")
}
qg := &queuedGuard{
guard: l.reservation,
active: false,
Expand All @@ -959,22 +997,25 @@ func (l *lockState) discoveredLock(
informWaiters = false
}

if !hadReservation && sa == spanset.SpanReadWrite {
// Put self in queue as inactive waiter. Since did not have the
// reservation the lock must not have been known to be held so the queue
// must be empty.
if l.queuedWriters.Len() > 0 {
panic("lockTable bug")
}
if !presentHere && sa == spanset.SpanReadWrite {
// Put self in queue as inactive waiter.
qg := &queuedGuard{
guard: g,
active: false,
}
l.queuedWriters.PushFront(qg)
g.mu.Lock()
g.mu.locks[l] = struct{}{}
g.mu.Unlock()
informWaiters = false
// g is not necessarily first in the queue in the (rare) case (a) above.
var e *list.Element
for e = l.queuedWriters.Front(); e != nil; e = e.Next() {
qqg := e.Value.(*queuedGuard)
if qqg.guard.seqNum > g.seqNum {
break
}
}
if e == nil {
l.queuedWriters.PushBack(qg)
} else {
l.queuedWriters.InsertBefore(qg, e)
}
}

// Active waiters need to be told about who they are waiting for.
Expand Down Expand Up @@ -1251,10 +1292,34 @@ func (l *lockState) lockIsFree() (doneWaiting []*lockTableGuardImpl, gc bool) {
g.mu.Unlock()
doneWaiting = append(doneWaiting, g)
}

// The prefix of the queue that is non-transactional writers is done
// waiting.
for e := l.queuedWriters.Front(); e != nil; {
qg := e.Value.(*queuedGuard)
g := qg.guard
if g.txn == nil {
curr := e
e = e.Next()
l.queuedWriters.Remove(curr)
if g == l.distinguishedWaiter {
findDistinguished = true
l.distinguishedWaiter = nil
}
g.mu.Lock()
delete(g.mu.locks, l)
g.mu.Unlock()
doneWaiting = append(doneWaiting, g)
} else {
break
}
}

if l.queuedWriters.Len() == 0 {
return doneWaiting, true
}
// First waiting writer gets the reservation.

// First waiting writer (it must be transactional) gets the reservation.
e := l.queuedWriters.Front()
qg := e.Value.(*queuedGuard)
g := qg.guard
Expand All @@ -1267,7 +1332,7 @@ func (l *lockState) lockIsFree() (doneWaiting []*lockTableGuardImpl, gc bool) {

// Need to find a new distinguished waiter if the current distinguished is
// from the same transaction (possibly it is the request that has reserved).
if l.distinguishedWaiter != nil && l.distinguishedWaiter.txn.ID == g.txn.ID {
if l.distinguishedWaiter != nil && l.distinguishedWaiter.isTxn(l.reservation.txn) {
findDistinguished = true
l.distinguishedWaiter = nil
}
Expand All @@ -1286,7 +1351,7 @@ func (l *lockState) lockIsFree() (doneWaiting []*lockTableGuardImpl, gc bool) {
if qg.active {
g := qg.guard
var state waitingState
if g.txn.ID == l.reservation.txn.ID {
if g.isTxn(l.reservation.txn) {
state = waitSelfState
} else {
state = waitForState
Expand Down Expand Up @@ -1699,8 +1764,13 @@ func (t *lockTableImpl) String() string {
fmt.Fprintln(&buf, " queued writers:")
for e := l.queuedWriters.Front(); e != nil; e = e.Next() {
qg := e.Value.(*queuedGuard)
fmt.Fprintf(&buf, " active: %t req: %d, txn: %v\n",
qg.active, qg.guard.seqNum, qg.guard.txn.ID)
g := qg.guard
txnStr := "none"
if g.txn != nil {
txnStr = fmt.Sprintf("%v", g.txn.ID)
}
fmt.Fprintf(&buf, " active: %t req: %d, txn: %s\n",
qg.active, qg.guard.seqNum, txnStr)
}
}
if l.distinguishedWaiter != nil {
Expand Down
14 changes: 5 additions & 9 deletions pkg/storage/concurrency/lock_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -810,22 +810,18 @@ func TestLockTableConcurrentSingleRequests(t *testing.T) {
var items []workloadItem
var startedTxnIDs []uuid.UUID // inefficient queue, but ok for a test.
const maxStartedTxns = 10
const numRequests = 5000
const numRequests = 10000
for i := 0; i < numRequests; i++ {
ts := timestamps[rng.Intn(len(timestamps))]
keysPerm := rng.Perm(len(keys))
spans := &spanset.SpanSet{}
onlyReads := true
for i := 0; i < numKeys; i++ {
span := roachpb.Span{Key: keys[keysPerm[i]]}
acc := spanset.SpanAccess(rng.Intn(int(spanset.NumSpanAccess)))
spans.AddMVCC(acc, span, ts)
if acc != spanset.SpanReadOnly {
onlyReads = false
}
}
var txn *roachpb.Transaction
if !onlyReads || rng.Intn(2) == 0 {
if rng.Intn(2) == 0 {
txn = &roachpb.Transaction{
TxnMeta: enginepb.TxnMeta{
ID: nextUUID(&txnCounter),
Expand Down Expand Up @@ -882,7 +878,7 @@ func TestLockTableConcurrentRequests(t *testing.T) {
const numActiveTxns = 8
var activeTxns [numActiveTxns]*enginepb.TxnMeta
var items []workloadItem
const numRequests = 5000
const numRequests = 20000
for i := 0; i < numRequests; i++ {
var txnMeta *enginepb.TxnMeta
var ts hlc.Timestamp
Expand All @@ -909,7 +905,7 @@ func TestLockTableConcurrentRequests(t *testing.T) {
}
keysPerm := rng.Perm(len(keys))
spans := &spanset.SpanSet{}
onlyReads := txnMeta == nil
onlyReads := txnMeta == nil && rng.Intn(2) != 0
numKeys := rng.Intn(len(keys)-1) + 1
request := &Request{
Timestamp: ts,
Expand All @@ -924,7 +920,7 @@ func TestLockTableConcurrentRequests(t *testing.T) {
acc := spanset.SpanReadOnly
if !onlyReads {
acc = spanset.SpanAccess(rng.Intn(int(spanset.NumSpanAccess)))
if acc == spanset.SpanReadWrite && rng.Intn(2) == 0 {
if acc == spanset.SpanReadWrite && txnMeta != nil && rng.Intn(2) == 0 {
wi.locksToAcquire = append(wi.locksToAcquire, span.Key)
}
}
Expand Down
Loading

0 comments on commit 79bf7ec

Please sign in to comment.