storage/concurrency: release reqs from same txn as discovered lock
This commit updates the AddDiscoveredLock state transition in lockTableImpl
to release all waiting writers that are part of the same transaction as the
discovered lock.

Failing to do so caused issues in cockroachdb#45482, where we saw txn
deadlocks in the `pkg/sql/tests.Bank` benchmark. The deadlocks were
triggered because we were forgetting to inform the lockTable of a lock
acquisition in a subtle case. That is now fixed, and given the current
policy on how wait-queues form in the lock-table, it's not clear that the
bug can still be hit here. Regardless, the transition is worth handling
correctly in case things ever change.
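
As a minimal sketch of the transition (hypothetical waiter/lockQueue types,
not the real lockTable API, which the diff below shows): once a lock is
discovered to be held by some txn, any writer from that same txn still
sitting in the lock's wait-queue would be waiting on its own transaction,
so it must be released instead.

package main

import (
	"container/list"
	"fmt"
)

// waiter models a queued writer: the transaction it belongs to, its
// request sequence number, and whether it is actively waiting.
type waiter struct {
	txnID  int
	seqNum int
	active bool
}

// lockQueue models the slice of lockState that matters here: a lock
// holder plus a FIFO queue of writers.
type lockQueue struct {
	holderTxnID   int
	queuedWriters *list.List // of *waiter
}

// releaseWritersFromTxn mirrors the transition this commit factors out:
// queued writers from the holder's own transaction are removed from the
// wait-queue rather than left to deadlock on themselves.
func (l *lockQueue) releaseWritersFromTxn() {
	for e := l.queuedWriters.Front(); e != nil; {
		w := e.Value.(*waiter)
		curr := e
		e = e.Next()
		if w.txnID == l.holderTxnID {
			fmt.Printf("released req %d (txn %d)\n", w.seqNum, w.txnID)
			l.queuedWriters.Remove(curr)
		}
	}
}

func main() {
	// txn 1 is discovered to hold the lock; req 11 belongs to txn 1.
	l := &lockQueue{holderTxnID: 1, queuedWriters: list.New()}
	l.queuedWriters.PushBack(&waiter{txnID: 2, seqNum: 10, active: true})
	l.queuedWriters.PushBack(&waiter{txnID: 1, seqNum: 11, active: true})
	l.releaseWritersFromTxn()
	fmt.Println("writers still waiting:", l.queuedWriters.Len())
}

Running the sketch releases req 11 and leaves req 10 (a different txn)
waiting, which is exactly the AddDiscoveredLock behavior this commit adds.
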
nvanbenschoten committed Mar 3, 2020
1 parent bbbb26b commit e8faf7c
Showing 3 changed files with 229 additions and 90 deletions.
181 changes: 92 additions & 89 deletions pkg/storage/concurrency/lock_table.go
@@ -632,12 +632,70 @@ type lockWaitQueue struct {
func (l *lockState) ID() uint64 { return l.id }
func (l *lockState) Key() []byte { return l.key }
func (l *lockState) EndKey() []byte { return l.endKey }
func (l *lockState) String() string { return string(l.key) }
func (l *lockState) New() *lockState { return new(lockState) }
func (l *lockState) SetID(v uint64) { l.id = v }
func (l *lockState) SetKey(v []byte) { l.key = v }
func (l *lockState) SetEndKey(v []byte) { l.endKey = v }

// REQUIRES: l.mu is locked.
func (l *lockState) String() string {
var buf strings.Builder
l.Format(&buf)
return buf.String()
}

// REQUIRES: l.mu is locked.
func (l *lockState) Format(buf *strings.Builder) {
fmt.Fprintf(buf, " lock: %s\n", l.key)
if l.isEmptyLock() {
fmt.Fprintln(buf, " empty")
return
}
waitingOnStr := func(txn *enginepb.TxnMeta, ts hlc.Timestamp) string {
// TODO(sbhola): strip the leading 0 bytes from the UUID string, since tests
// assign UUIDs using a counter; that would make this output more readable.
var seqStr string
if txn.Sequence != 0 {
seqStr = fmt.Sprintf(", seq: %v", txn.Sequence)
}
return fmt.Sprintf("txn: %v, ts: %v%s", txn.ID, ts, seqStr)
}
txn, ts, _ := l.getLockerInfo()
if txn == nil {
fmt.Fprintf(buf, " res: req: %d, %s\n",
l.reservation.seqNum, waitingOnStr(l.reservation.txn, l.reservation.writeTS))
} else {
fmt.Fprintf(buf, " holder: %s\n", waitingOnStr(txn, ts))
}
if l.waitingReaders.Len() > 0 {
fmt.Fprintln(buf, " waiting readers:")
for e := l.waitingReaders.Front(); e != nil; e = e.Next() {
g := e.Value.(*lockTableGuardImpl)
txnStr := "none"
if g.txn != nil {
txnStr = fmt.Sprintf("%v", g.txn.ID)
}
fmt.Fprintf(buf, " req: %d, txn: %s\n", g.seqNum, txnStr)
}
}
if l.queuedWriters.Len() > 0 {
fmt.Fprintln(buf, " queued writers:")
for e := l.queuedWriters.Front(); e != nil; e = e.Next() {
qg := e.Value.(*queuedGuard)
g := qg.guard
txnStr := "none"
if g.txn != nil {
txnStr = fmt.Sprintf("%v", g.txn.ID)
}
fmt.Fprintf(buf, " active: %t req: %d, txn: %s\n",
qg.active, qg.guard.seqNum, txnStr)
}
}
if l.distinguishedWaiter != nil {
fmt.Fprintf(buf, " distinguished req: %d\n", l.distinguishedWaiter.seqNum)
}
}

// Called for a write request when there is a reservation. Returns true iff it
// succeeds.
// REQUIRES: l.mu is locked.
@@ -728,6 +786,31 @@ func (l *lockState) informActiveWaiters() {
}
}

// releaseWritersFromTxn removes all waiting writers for the lockState that are
// part of the specified transaction.
// REQUIRES: l.mu is locked.
func (l *lockState) releaseWritersFromTxn(txn *enginepb.TxnMeta) {
for e := l.queuedWriters.Front(); e != nil; {
qg := e.Value.(*queuedGuard)
curr := e
e = e.Next()
g := qg.guard
if g.isTxn(txn) {
if qg.active {
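// The request is actively waiting at this lock, so wake it up. If it
// was the distinguished waiter, clear that role so a new one can be
// selected.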
if g == l.distinguishedWaiter {
l.distinguishedWaiter = nil
}
g.doneWaitingAtLock(false, l)
} else {
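// The request is queued but not actively waiting here, so just drop
// its reference to this lock.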
g.mu.Lock()
delete(g.mu.locks, l)
g.mu.Unlock()
}
l.queuedWriters.Remove(curr)
}
}
}

// When the active waiters have shrunk and the distinguished waiter has gone,
// try to make a new distinguished waiter if there is at least 1 active
// waiter.
@@ -1010,11 +1093,9 @@ func (l *lockState) acquireLock(
// possibility that some other request has broken this reservation because
// of a concurrent release but that is harmless since this request is
// holding latches and has proceeded to evaluation.
brokeReservation := false
if l.reservation != nil {
if l.reservation.txn.ID != txn.ID {
// Reservation is broken.
brokeReservation = true
qg := &queuedGuard{
guard: l.reservation,
active: false,
@@ -1046,29 +1127,7 @@ func (l *lockState) acquireLock(
l.holder.holder[durability].seqs = append([]enginepb.TxnSeq(nil), txn.Sequence)

// If there are waiting requests from the same txn, they no longer need to wait.
for e := l.queuedWriters.Front(); e != nil; {
qg := e.Value.(*queuedGuard)
curr := e
e = e.Next()
g := qg.guard
if g.isTxn(txn) {
if qg.active {
if g == l.distinguishedWaiter {
if brokeReservation {
l.distinguishedWaiter = nil
} else {
panic("lockTable bug")
}
}
g.doneWaitingAtLock(false, l)
} else {
g.mu.Lock()
delete(g.mu.locks, l)
g.mu.Unlock()
}
l.queuedWriters.Remove(curr)
}
}
l.releaseWritersFromTxn(txn)

// Inform active waiters since lock has transitioned to held.
l.informActiveWaiters()
@@ -1084,7 +1143,6 @@ func (l *lockState) discoveredLock(
l.mu.Lock()
defer l.mu.Unlock()

informWaiters := true
if l.holder.locked {
if !l.isLockedBy(txn.ID) {
panic("bug in caller or lockTable")
@@ -1093,7 +1151,6 @@
l.holder.holder[lock.Replicated].txn = txn
l.holder.holder[lock.Replicated].ts = ts
}
informWaiters = false
} else {
l.holder.locked = true
l.holder.holder[lock.Replicated].txn = txn
@@ -1112,11 +1169,6 @@
}
l.queuedWriters.PushFront(qg)
l.reservation = nil
} else {
// No reservation, so either the lock was already known to be held, in
// which case the active waiters know about the holder, or it was not held
// and so there are no active waiters.
informWaiters = false
}

switch sa {
@@ -1164,10 +1216,11 @@
}
}

// If there are waiting requests from the same txn, they no longer need to wait.
l.releaseWritersFromTxn(txn)

// Active waiters need to be told about who they are waiting for.
if informWaiters {
l.informActiveWaiters()
}
l.informActiveWaiters()
return nil
}

@@ -1779,57 +1832,6 @@ func (t *lockTableImpl) Clear() {
// For tests.
func (t *lockTableImpl) String() string {
var buf strings.Builder
waitingOnStr := func(txn *enginepb.TxnMeta, ts hlc.Timestamp) string {
// TODO(sbhola): strip the leading 0 bytes from the UUID string, since tests
// assign UUIDs using a counter; that would make this output more readable.
var seqStr string
if txn.Sequence != 0 {
seqStr = fmt.Sprintf(", seq: %v", txn.Sequence)
}
return fmt.Sprintf("txn: %v, ts: %v%s", txn.ID, ts, seqStr)
}
lockStateStrings := func(l *lockState) {
l.mu.Lock()
defer l.mu.Unlock()
if l.isEmptyLock() {
fmt.Fprintln(&buf, "empty")
return
}
txn, ts, _ := l.getLockerInfo()
if txn == nil {
fmt.Fprintf(&buf, " res: req: %d, %s\n",
l.reservation.seqNum, waitingOnStr(l.reservation.txn, l.reservation.writeTS))
} else {
fmt.Fprintf(&buf, " holder: %s\n", waitingOnStr(txn, ts))
}
if l.waitingReaders.Len() > 0 {
fmt.Fprintln(&buf, " waiting readers:")
for e := l.waitingReaders.Front(); e != nil; e = e.Next() {
g := e.Value.(*lockTableGuardImpl)
txnStr := "none"
if g.txn != nil {
txnStr = fmt.Sprintf("%v", g.txn.ID)
}
fmt.Fprintf(&buf, " req: %d, txn: %s\n", g.seqNum, txnStr)
}
}
if l.queuedWriters.Len() > 0 {
fmt.Fprintln(&buf, " queued writers:")
for e := l.queuedWriters.Front(); e != nil; e = e.Next() {
qg := e.Value.(*queuedGuard)
g := qg.guard
txnStr := "none"
if g.txn != nil {
txnStr = fmt.Sprintf("%v", g.txn.ID)
}
fmt.Fprintf(&buf, " active: %t req: %d, txn: %s\n",
qg.active, qg.guard.seqNum, txnStr)
}
}
if l.distinguishedWaiter != nil {
fmt.Fprintf(&buf, " distinguished req: %d\n", l.distinguishedWaiter.seqNum)
}
}
for i := 0; i < len(t.locks); i++ {
tree := &t.locks[i]
scope := spanset.SpanScope(i).String()
@@ -1838,8 +1840,9 @@ func (t *lockTableImpl) String() string {
iter := tree.MakeIter()
for iter.First(); iter.Valid(); iter.Next() {
l := iter.Cur()
fmt.Fprintf(&buf, " lock: %s\n", l.key)
lockStateStrings(l)
l.mu.Lock()
l.Format(&buf)
l.mu.Unlock()
}
tree.mu.RUnlock()
}
2 changes: 1 addition & 1 deletion pkg/storage/concurrency/lock_table_test.go
@@ -42,7 +42,7 @@ import (
Test needs to handle caller constraints wrt latches being held. The datadriven
test uses the following format:
new-locktable maxlocks=<int>
new-lock-table maxlocks=<int>
----
Creates a lockTable.
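
For illustration, a hypothetical directive in a test file would then look
like this (the maxlocks value here is arbitrary, and the expected output
under the separator is not shown in this diff):

new-lock-table maxlocks=10000
----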