Skip to content

Commit

Permalink
storage/concurrency: implement lockTableWaiter
Browse files Browse the repository at this point in the history
This commit builds upon the concurrency control interfaces introduced in #44787
(and is rebased on those 3 commits). It implements the counter-part to the
`lockTable` structure, the `lockTableWaiter`.

lockTableWaiter is concerned with waiting in lock wait-queues for locks held
by conflicting transactions. It ensures that waiting requests continue to
make forward progress even in the presence of faulty transaction coordinators
and transaction deadlocks.

The waiter implements logic for a request to wait on conflicting locks in the
lockTable until they are released. Similarly, it implements logic to wait on
conflicting requests ahead of the caller's request in any lock wait-queues
that it is a part of.

This waiting state responds to a set of state transitions in the lock table:
 - a conflicting lock is released
 - a conflicting lock is updated such that it no longer conflicts
 - a conflicting request in the lock wait-queue acquires the lock
 - a conflicting request in the lock wait-queue exits the lock wait-queue

These state transitions are typically reactive - the waiter can simply wait
for locks to be released or lock wait-queues to be exited by other actors.
Reacting to state transitions for conflicting locks is powered by the
lockManager and reacting to state transitions for conflicting lock
wait-queues is powered by the requestSequencer interface.

However, in the case of transaction coordinator failures or transaction
deadlocks, a state transition may never occur without intervention from the
waiter. To ensure forward-progress, the waiter may need to actively push
either a lock holder of a conflicting lock or the head of a conflicting lock
wait-queue. This active pushing requires an RPC to the leaseholder of the
conflicting transaction's record, and will typically result in the RPC
queuing in that leaseholder's txnWaitQueue. Because this can be expensive,
the push is not immediately performed. Instead, it is only performed after a
delay.

The semantics around how the lockTableWaiter interfaces with the intentresolver
are guided by the current approach in `Replica.handleWriteIntentError`. Once
we properly integrate the `concurrency.Manager` and remove calls into
`Replica.handleWriteIntentError`, we can clean up the intentresolver's
interface to be more easy to use. This will also let us delete the
`contentionQueue` entirely.
  • Loading branch information
nvanbenschoten committed Feb 12, 2020
1 parent 3ba08aa commit ce8901d
Show file tree
Hide file tree
Showing 8 changed files with 708 additions and 54 deletions.
39 changes: 27 additions & 12 deletions pkg/storage/concurrency/lock_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,12 @@ type waitingState struct {

// Populated for waitFor* and waitElsewhere type, and represents who the
// request is waiting for.
txn *enginepb.TxnMeta // always non-nil
ts hlc.Timestamp // the timestamp of the transaction that is causing the wait
access spanset.SpanAccess // Currently only SpanReadWrite.
txn *enginepb.TxnMeta // always non-nil
ts hlc.Timestamp // the timestamp of the transaction that is causing the wait
key roachpb.Key // the key of the lock that is causing the wait
held bool // is the lock currently held?
access spanset.SpanAccess // currently only SpanReadWrite
guardAccess spanset.SpanAccess // the access method of the guard
}

// Implementation
Expand All @@ -58,6 +61,8 @@ type waitingState struct {
// - proper error strings and give better explanation to all panics.
// - metrics about lockTable state to export to observability debug pages:
// number of locks, number of waiting requests, wait time?, ...
// - update waitingState.held on each waiter when locks are acquired and
// released.

// The btree for a particular SpanScope.
type treeMu struct {
Expand Down Expand Up @@ -567,11 +572,15 @@ func (l *lockState) informActiveWaiters() {
stateKind: waitFor,
txn: waitForTxn,
ts: waitForTs,
key: l.key,
held: l.holder.locked,
access: spanset.SpanReadWrite,
}
waitSelfState := waitingState{stateKind: waitSelf}

for e := l.waitingReaders.Front(); e != nil; e = e.Next() {
state := waitForState
state.guardAccess = spanset.SpanReadOnly
// Since there are waiting readers we could not have transitioned out of
// or into a state with a reservation, since readers do not wait for
// reservations.
Expand All @@ -581,7 +590,7 @@ func (l *lockState) informActiveWaiters() {
findDistinguished = false
}
g.mu.Lock()
g.mu.state = waitForState
g.mu.state = state
if l.distinguishedWaiter == g {
g.mu.state.stateKind = waitForDistinguished
}
Expand All @@ -599,6 +608,7 @@ func (l *lockState) informActiveWaiters() {
state = waitSelfState
} else {
state = waitForState
state.guardAccess = spanset.SpanReadWrite
if findDistinguished {
l.distinguishedWaiter = g
findDistinguished = false
Expand Down Expand Up @@ -813,10 +823,13 @@ func (l *lockState) tryActiveWait(g *lockTableGuardImpl, sa spanset.SpanAccess,
stateType = waitForDistinguished
}
g.mu.state = waitingState{
stateKind: stateType,
txn: waitForTxn,
ts: waitForTs,
access: spanset.SpanReadWrite,
stateKind: stateType,
txn: waitForTxn,
ts: waitForTs,
key: l.key,
held: l.holder.locked,
access: spanset.SpanReadWrite,
guardAccess: sa,
}
}
if notify {
Expand Down Expand Up @@ -1340,10 +1353,12 @@ func (l *lockState) lockIsFree() (doneWaiting []*lockTableGuardImpl, gc bool) {
// Need to tell the remaining active waiting writers who they are waiting
// for.
waitForState := waitingState{
stateKind: waitFor,
txn: g.txn,
ts: g.ts,
access: spanset.SpanReadWrite,
stateKind: waitFor,
txn: g.txn,
ts: g.ts,
key: l.key,
access: spanset.SpanReadWrite,
guardAccess: spanset.SpanReadWrite,
}
waitSelfState := waitingState{stateKind: waitSelf}
for e := l.queuedWriters.Front(); e != nil; e = e.Next() {
Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/concurrency/lock_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,8 @@ func TestLockTableBasic(t *testing.T) {
if state.ts.Logical != 0 {
tsS += fmt.Sprintf(",%d", state.ts.Logical)
}
return fmt.Sprintf("%sstate=%s txn=%s ts=%s", str, typeStr, txnS, tsS)
return fmt.Sprintf("%sstate=%s txn=%s ts=%s key=%s held=%t guard-access=%s",
str, typeStr, txnS, tsS, state.key, state.held, state.guardAccess)

case "print":
return lt.(*lockTableImpl).String()
Expand Down
242 changes: 242 additions & 0 deletions pkg/storage/concurrency/lock_table_waiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package concurrency

import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/storage/intentresolver"
"github.com/cockroachdb/cockroach/pkg/storage/spanset"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

// Default delay before pushing in order to detect dependency cycles.
const defaultDependencyCyclePushDelay = 100 * time.Millisecond

// Silence unused warning.
var _ = defaultDependencyCyclePushDelay

// lockTableWaiterImpl is an implementation of lockTableWaiter.
type lockTableWaiterImpl struct {
nodeID roachpb.NodeID
stopper *stop.Stopper

// Used to push conflicting transactions and resolve conflicting intents.
ir intentResolver

// How long to wait until pushing conflicting transactions to detect
// dependency cycles.
dependencyCyclePushDelay time.Duration
}

// intentResolver is an interface used by lockTableWaiterImpl to push
// transactions and to resolve intents. It contains only the subset of the
// intentresolver.IntentResolver interface that lockTableWaiterImpl needs.
type intentResolver interface {
// PushTransaction pushes the provided transaction. The method will push the
// provided pushee transaction immediately, if possible. Otherwise, it will
// block until the pushee transaction is finalized or eventually can be
// pushed successfully.
PushTransaction(
context.Context, *enginepb.TxnMeta, roachpb.Header, roachpb.PushTxnType,
) (roachpb.Transaction, *Error)

// ResolveIntent resolves the provided intent according to the options.
ResolveIntent(context.Context, roachpb.Intent, intentresolver.ResolveOptions) *Error
}

// WaitOn implements the lockTableWaiter interface.
func (w *lockTableWaiterImpl) WaitOn(
ctx context.Context, req Request, guard lockTableGuard,
) *Error {
newStateC := guard.NewStateChan()
ctxDoneC := ctx.Done()
shouldQuiesceC := w.stopper.ShouldQuiesce()
var timer *timeutil.Timer
var timerC <-chan time.Time
var timerWaitingState waitingState
for {
select {
case <-newStateC:
timerC = nil
state := guard.CurState()
switch state.stateKind {
case waitFor:
// waitFor indicates that the request is waiting on another
// transaction. This transaction may be the lock holder of a
// conflicting lock or the head of a lock-wait queue that the
// request is a part of.

// For non-transactional requests, there's no need to perform
// deadlock detection and the other "distinguished" (see below)
// pusher will already push to detect coordinator failures and
// abandoned locks, so there's no need to do anything in this
// state.
if req.Txn == nil {
continue
}

// For transactional requests, the request should push to
// resolve this conflict and detect deadlocks, but only after
// delay. This delay avoids unnecessary push traffic when the
// conflicting transaction is continuing to make forward
// progress.
delay := w.dependencyCyclePushDelay
if hasMinPriority(state.txn) || hasMaxPriority(&req.Txn.TxnMeta) {
// However, if the pushee has the minimum priority or if the
// pusher has the maximum priority, push immediately.
delay = 0
}
if timer == nil {
timer = timeutil.NewTimer()
defer timer.Stop()
}
timer.Reset(delay)
timerC = timer.C
timerWaitingState = state

case waitForDistinguished:
// waitForDistinguished is like waitFor, except it instructs the
// waiter to immediately push the conflicting transaction instead of
// first waiting out the dependencyCyclePushDelay. The lockTable
// guarantees that there is always at least one request in the
// waitForDistinguished state for each lock that has any waiters.
//
// The purpose of the waitForDistinguished state is to avoid adding
// a delay of dependencyCyclePushDelay to the process of recovering
// from the failure of a transaction coordinator for *each* of that
// transaction's previously written intents. If we had a cache of
// aborted transaction IDs that allowed us to notice and immediately
// resolve abandoned intents then we might be able to get rid of
// this state.
if err := w.pushTxn(ctx, req, state); err != nil {
return err
}

case waitElsewhere:
// The lockTable has hit a memory limit and is no longer maintaining
// proper lock wait-queues. However, the waiting request is still
// not safe to proceed with evaluation because there is still a
// transaction holding the lock. It should push the transaction it
// is blocked on immediately to wait in that transaction's
// txnWaitQueue. Once this completes, the request should stop
// waiting on this lockTableGuard, as it will no longer observe
// lock-table state transitions.
return w.pushTxn(ctx, req, state)

case waitSelf:
// Another request from the same transaction is the reservation
// holder of this lock wait-queue. This can only happen when the
// request's transaction is sending multiple requests concurrently.
// Proceed with waiting without pushing anyone.

case doneWaiting:
// The request has waited for all conflicting locks to be released
// and is at the front of any lock wait-queues. It can now stop
// waiting, re-acquire latches, and check the lockTable again for
// any new conflicts. If it find none, it can proceed with
// evaluation.
return nil

default:
panic("unexpected waiting state")
}

case <-timerC:
// If the transactional request was in the waitFor state and did not
// observe any update to its state for a dependencyCyclePushDelay,
// it should push. It may be the case that the transaction is part
// of a dependency cycle.
timerC = nil
timer.Read = true
if err := w.pushTxn(ctx, req, timerWaitingState); err != nil {
return err
}

case <-ctxDoneC:
return roachpb.NewError(ctx.Err())

case <-shouldQuiesceC:
return roachpb.NewError(&roachpb.NodeUnavailableError{})
}
}
}

func (w *lockTableWaiterImpl) pushTxn(ctx context.Context, req Request, ws waitingState) *Error {
h := roachpb.Header{
Timestamp: req.Timestamp,
UserPriority: req.Priority,
}
if req.Txn != nil {
// We are going to hand the header (and thus the transaction proto)
// to the RPC framework, after which it must not be changed (since
// that could race). Since the subsequent execution of the original
// request might mutate the transaction, make a copy here.
//
// See #9130.
h.Txn = req.Txn.Clone()

// We must push at least to h.Timestamp, but in fact we want to
// go all the way up to a timestamp which was taken off the HLC
// after our operation started. This allows us to not have to
// restart for uncertainty as we come back and read.
obsTS, ok := h.Txn.GetObservedTimestamp(w.nodeID)
if !ok {
// This was set earlier, so it's completely unexpected to
// not be found now.
return roachpb.NewErrorf("missing observed timestamp: %+v", h.Txn)
}
h.Timestamp.Forward(obsTS)
}

var pushType roachpb.PushTxnType
switch ws.guardAccess {
case spanset.SpanReadOnly:
pushType = roachpb.PUSH_TIMESTAMP
case spanset.SpanReadWrite:
pushType = roachpb.PUSH_ABORT
}

pusheeTxn, err := w.ir.PushTransaction(ctx, ws.txn, h, pushType)
if err != nil {
return err
}
if !ws.held {
return nil
}

// We always poison due to limitations of the API: not poisoning equals
// clearing the AbortSpan, and if our pushee transaction first got pushed
// for timestamp (by us), then (by someone else) aborted and poisoned, and
// then we run the below code, we're clearing the AbortSpan illegaly.
// Furthermore, even if our pushType is not PUSH_ABORT, we may have ended
// up with the responsibility to abort the intents (for example if we find
// the transaction aborted).
//
// To do better here, we need per-intent information on whether we need to
// poison.
resolveIntent := roachpb.Intent{Span: roachpb.Span{Key: ws.key}}
resolveIntent.SetTxn(&pusheeTxn)
opts := intentresolver.ResolveOptions{Wait: false, Poison: true}
return w.ir.ResolveIntent(ctx, resolveIntent, opts)
}

func hasMinPriority(txn *enginepb.TxnMeta) bool {
return txn.Priority == enginepb.MinTxnPriority
}

func hasMaxPriority(txn *enginepb.TxnMeta) bool {
return txn.Priority == enginepb.MaxTxnPriority
}
Loading

0 comments on commit ce8901d

Please sign in to comment.