storage/concurrency: push reservation holders to detect deadlocks #45567

Merged
2 changes: 1 addition & 1 deletion pkg/storage/concurrency/concurrency_control.go
@@ -561,7 +561,7 @@ type lockTableGuard interface {
// have an initial notification. Note that notifications are collapsed if
// not retrieved, since it is not necessary for the waiter to see every
// state transition.
NewStateChan() <-chan struct{}
NewStateChan() chan struct{}

// CurState returns the latest waiting state.
CurState() waitingState
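The interface comment above notes that notifications are collapsed if the waiter has not yet retrieved the previous one. A common way to get that behavior in Go is a signal channel with capacity 1 and a non-blocking send; the sketch below illustrates that idea under that assumption only, and is not the actual lock_table.go implementation.

package main

import "fmt"

// notifier collapses repeated notifications into at most one pending signal.
// Illustrative sketch of the "collapsed notifications" behavior described by
// the lockTableGuard comment; names here are hypothetical.
type notifier struct {
	sig chan struct{} // capacity 1: at most one signal is ever buffered
}

func newNotifier() *notifier { return &notifier{sig: make(chan struct{}, 1)} }

// notify wakes the waiter. If a signal is already pending, the send is
// dropped, so consecutive state transitions collapse into one wakeup.
func (n *notifier) notify() {
	select {
	case n.sig <- struct{}{}:
	default:
	}
}

func main() {
	n := newNotifier()
	n.notify()
	n.notify() // collapsed: still only one signal buffered
	<-n.sig
	select {
	case <-n.sig:
		fmt.Println("unexpected second signal")
	default:
		fmt.Println("second notification was collapsed")
	}
}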
177 changes: 149 additions & 28 deletions pkg/storage/concurrency/concurrency_manager_test.go
@@ -20,7 +20,6 @@ import (
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
@@ -40,7 +39,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uint128"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/datadriven"
"github.com/cockroachdb/errors"
@@ -189,7 +187,7 @@ func TestConcurrencyManagerBasic(t *testing.T) {
log.Event(ctx, "sequencing complete, returned no guard")
}
})
return mon.waitAndCollect(t)
return c.waitAndCollect(t, mon)

case "finish":
var reqName string
@@ -207,7 +205,7 @@ func TestConcurrencyManagerBasic(t *testing.T) {
delete(c.guardsByReqName, reqName)
c.mu.Unlock()
})
return mon.waitAndCollect(t)
return c.waitAndCollect(t, mon)

case "handle-write-intent-error":
var reqName string
@@ -235,7 +233,7 @@ func TestConcurrencyManagerBasic(t *testing.T) {
log.Eventf(ctx, "handling %v", err)
guard = m.HandleWriterIntentError(ctx, guard, err)
})
return mon.waitAndCollect(t)
return c.waitAndCollect(t, mon)

case "on-lock-acquired":
var txnName string
@@ -254,7 +252,7 @@ func TestConcurrencyManagerBasic(t *testing.T) {
up := roachpb.MakeLockUpdateWithDur(txn, span, lock.Unreplicated)
m.OnLockAcquired(ctx, &up)
})
return mon.waitAndCollect(t)
return c.waitAndCollect(t, mon)

case "on-txn-updated":
var txnName string
@@ -292,7 +290,7 @@ func TestConcurrencyManagerBasic(t *testing.T) {
d.Fatalf(t, err.Error())
}
})
return mon.waitAndCollect(t)
return c.waitAndCollect(t, mon)

case "debug-latch-manager":
global, local := m.LatchMetrics()
@@ -313,6 +311,10 @@ func TestConcurrencyManagerBasic(t *testing.T) {
if err := c.reset(); err != nil {
d.Fatalf(t, "could not reset cluster: %v", err)
}
// Reset request and txn namespace?
if d.HasArg("namespace") {
c.resetNamespace()
}
return ""

default:
@@ -333,24 +335,30 @@ type cluster struct {
m concurrency.Manager

// Definitions.
txnCounter uint128.Uint128
txnCounter uint32
txnsByName map[string]*roachpb.Transaction
requestsByName map[string]concurrency.Request

// Request state. Cleared on reset.
mu syncutil.Mutex
guardsByReqName map[string]*concurrency.Guard
txnRecords map[uuid.UUID]*txnRecord
txnPushes map[uuid.UUID]*txnPush
}

type txnRecord struct {
mu syncutil.Mutex
cond sync.Cond
sig chan struct{}
txn *roachpb.Transaction // immutable, modify fields below
updatedStatus roachpb.TransactionStatus
updatedTimestamp hlc.Timestamp
}

type txnPush struct {
ctx context.Context
pusher, pushee uuid.UUID
}

func newCluster() *cluster {
return &cluster{
nodeDesc: &roachpb.NodeDescriptor{NodeID: 1},
@@ -360,13 +368,14 @@ func newCluster() *cluster {
requestsByName: make(map[string]concurrency.Request),
guardsByReqName: make(map[string]*concurrency.Guard),
txnRecords: make(map[uuid.UUID]*txnRecord),
txnPushes: make(map[uuid.UUID]*txnPush),
}
}

func (c *cluster) makeConfig() concurrency.Config {
st := clustersettings.MakeTestingClusterSettings()
concurrency.LockTableLivenessPushDelay.Override(&st.SV, 1*time.Millisecond)
concurrency.LockTableDeadlockDetectionPushDelay.Override(&st.SV, 1*time.Millisecond)
concurrency.LockTableLivenessPushDelay.Override(&st.SV, 0*time.Millisecond)
concurrency.LockTableDeadlockDetectionPushDelay.Override(&st.SV, 0*time.Millisecond)
return concurrency.Config{
NodeDesc: c.nodeDesc,
RangeDesc: c.rangeDesc,
@@ -380,16 +389,28 @@ func (c *cluster) makeConfig() concurrency.Config {
func (c *cluster) PushTransaction(
ctx context.Context, pushee *enginepb.TxnMeta, h roachpb.Header, pushType roachpb.PushTxnType,
) (roachpb.Transaction, *roachpb.Error) {
log.Eventf(ctx, "pushing txn %s", pushee.ID)
log.Eventf(ctx, "pushing txn %s", pushee.ID.Short())
pusheeRecord, err := c.getTxnRecord(pushee.ID)
if err != nil {
return roachpb.Transaction{}, roachpb.NewError(err)
}
// Wait for the transaction to be pushed.
pusheeRecord.mu.Lock()
defer pusheeRecord.mu.Unlock()
var pusherRecord *txnRecord
if h.Txn != nil {
pusherID := h.Txn.ID
pusherRecord, err = c.getTxnRecord(pusherID)
if err != nil {
return roachpb.Transaction{}, roachpb.NewError(err)
}

push, err := c.registerPush(ctx, pusherID, pushee.ID)
if err != nil {
return roachpb.Transaction{}, roachpb.NewError(err)
}
defer c.unregisterPush(push)
}
for {
pusheeTxn := pusheeRecord.asTxnLocked()
// Is the pushee pushed?
pusheeTxn, pusheeRecordSig := pusheeRecord.asTxn()
var pushed bool
switch pushType {
case roachpb.PUSH_TIMESTAMP:
Expand All @@ -402,15 +423,32 @@ func (c *cluster) PushTransaction(
if pushed {
return pusheeTxn, nil
}
pusheeRecord.cond.Wait()
// Or the pusher aborted?
var pusherRecordSig chan struct{}
if pusherRecord != nil {
var pusherTxn roachpb.Transaction
pusherTxn, pusherRecordSig = pusherRecord.asTxn()
if pusherTxn.Status == roachpb.ABORTED {
log.Eventf(ctx, "detected pusher aborted")
err := roachpb.NewTransactionAbortedError(roachpb.ABORT_REASON_PUSHER_ABORTED)
return roachpb.Transaction{}, roachpb.NewError(err)
}
}
// Wait until either record is updated.
select {
case <-pusheeRecordSig:
case <-pusherRecordSig:
case <-ctx.Done():
return roachpb.Transaction{}, roachpb.NewError(ctx.Err())
}
}
}

// ResolveIntent implements the concurrency.IntentResolver interface.
func (c *cluster) ResolveIntent(
ctx context.Context, intent roachpb.LockUpdate, _ intentresolver.ResolveOptions,
) *roachpb.Error {
log.Eventf(ctx, "resolving intent %s for txn %s with %s status", intent.Key, intent.Txn.ID, intent.Status)
log.Eventf(ctx, "resolving intent %s for txn %s with %s status", intent.Key, intent.Txn.ID.Short(), intent.Status)
c.m.OnLockUpdated(ctx, &intent)
return nil
}
@@ -425,8 +463,7 @@ func (c *cluster) registerTxn(name string, txn *roachpb.Transaction) {
c.mu.Lock()
defer c.mu.Unlock()
c.txnsByName[name] = txn
r := &txnRecord{txn: txn}
r.cond.L = &r.mu
r := &txnRecord{txn: txn, sig: make(chan struct{})}
c.txnRecords[txn.ID] = r
}

@@ -453,17 +490,89 @@ func (c *cluster) updateTxnRecord(
defer r.mu.Unlock()
r.updatedStatus = status
r.updatedTimestamp = ts
r.cond.Broadcast()
// Notify all listeners. This is a poor man's composable cond var.
close(r.sig)
r.sig = make(chan struct{})
return nil
}

func (r *txnRecord) asTxnLocked() roachpb.Transaction {
func (r *txnRecord) asTxn() (roachpb.Transaction, chan struct{}) {
r.mu.Lock()
defer r.mu.Unlock()
txn := *r.txn
if r.updatedStatus > txn.Status {
txn.Status = r.updatedStatus
}
txn.WriteTimestamp.Forward(r.updatedTimestamp)
return txn
return txn, r.sig
}

func (c *cluster) registerPush(ctx context.Context, pusher, pushee uuid.UUID) (*txnPush, error) {
c.mu.Lock()
defer c.mu.Unlock()
if _, ok := c.txnPushes[pusher]; ok {
return nil, errors.Errorf("txn %v already pushing", pusher)
}
p := &txnPush{
ctx: ctx,
pusher: pusher,
pushee: pushee,
}
c.txnPushes[pusher] = p
return p, nil
}

func (c *cluster) unregisterPush(push *txnPush) {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.txnPushes, push.pusher)
}

// detectDeadlocks looks at all in-flight transaction pushes and determines
// whether any are blocked due to dependency cycles within transactions. If so,
// the method logs an event on the contexts of each of the members of the cycle.
func (c *cluster) detectDeadlocks() {
// This cycle detection algorithm is not particularly efficient - at worst
// it runs in O(n ^ 2) time. However, it's simple and effective at assigning
// each member of each cycle a unique view of the cycle that it's a part of.
// This works because we currently only allow a transaction to push a single
// other transaction at a time.
c.mu.Lock()
defer c.mu.Unlock()
var chain []uuid.UUID
seen := make(map[uuid.UUID]struct{})
for orig, origPush := range c.txnPushes {
pusher := orig
chain = append(chain[:0], orig)
for id := range seen {
delete(seen, id)
}
seen[pusher] = struct{}{}
for {
push, ok := c.txnPushes[pusher]
if !ok {
break
}
pusher = push.pushee
chain = append(chain, pusher)
if _, ok := seen[pusher]; ok {
// Cycle detected!
if pusher == orig {
// The cycle we were looking for (i.e. starting at orig).
var chainBuf strings.Builder
for i, id := range chain {
if i > 0 {
chainBuf.WriteString("->")
}
chainBuf.WriteString(id.Short())
}
log.Eventf(origPush.ctx, "dependency cycle detected %s", chainBuf.String())
}
break
}
seen[pusher] = struct{}{}
}
}
}

// reset clears all request state in the cluster. This avoids portions of tests
@@ -495,6 +604,17 @@ func (c *cluster) reset() error {
return nil
}

// resetNamespace resets the entire cluster namespace, clearing both request
// definitions and transaction definitions.
func (c *cluster) resetNamespace() {
c.mu.Lock()
defer c.mu.Unlock()
c.txnCounter = 0
c.txnsByName = make(map[string]*roachpb.Transaction)
c.requestsByName = make(map[string]concurrency.Request)
c.txnRecords = make(map[uuid.UUID]*txnRecord)
}

// collectSpans collects the declared spans for a set of requests.
// Its logic mirrors that in Replica.collectSpans.
func (c *cluster) collectSpans(
@@ -521,6 +641,12 @@ func (c *cluster) collectSpans(
return latchSpans, lockSpans
}

func (c *cluster) waitAndCollect(t *testing.T, m *monitor) string {
m.waitForAsyncGoroutinesToStall(t)
c.detectDeadlocks()
return m.collectRecordings()
}

// monitor tracks a set of running goroutines as they execute and captures
// tracing recordings from them. It is capable of watching its set of goroutines
// until they all mutually stall.
@@ -590,11 +716,6 @@ func (m *monitor) resetSeqNums() {
m.seq = 0
}

func (m *monitor) waitAndCollect(t *testing.T) string {
m.waitForAsyncGoroutinesToStall(t)
return m.collectRecordings()
}

func (m *monitor) collectRecordings() string {
// Collect trace recordings from each goroutine.
type logRecord struct {
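The txnRecord above swaps sync.Cond for a signal channel that updateTxnRecord closes and recreates on every update ("a poor man's composable cond var"). The point of the pattern is that PushTransaction can select on the pushee's and the pusher's channels together with ctx.Done(), which sync.Cond cannot do. Below is a distilled, standalone sketch of the pattern; record, update, and get are hypothetical names, not the test code itself.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// record demonstrates the close-and-recreate signal pattern used by txnRecord.
type record struct {
	mu  sync.Mutex
	val int
	sig chan struct{}
}

func newRecord() *record { return &record{sig: make(chan struct{})} }

// update mutates the state and broadcasts to every current waiter by closing
// the signal channel, then arms a fresh channel for the next round.
func (r *record) update(v int) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.val = v
	close(r.sig)
	r.sig = make(chan struct{})
}

// get returns the current state together with the channel that will be closed
// on the next update. Reading both under the lock avoids missed wakeups.
func (r *record) get() (int, chan struct{}) {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.val, r.sig
}

func main() {
	r := newRecord()
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	go func() {
		time.Sleep(10 * time.Millisecond)
		r.update(42)
	}()
	for {
		v, sig := r.get()
		if v == 42 {
			fmt.Println("observed update:", v)
			return
		}
		select { // composable: can also wait on other records or ctx.Done()
		case <-sig:
		case <-ctx.Done():
			fmt.Println("gave up:", ctx.Err())
			return
		}
	}
}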
7 changes: 4 additions & 3 deletions pkg/storage/concurrency/datadriven_util_test.go
@@ -22,9 +22,10 @@ import (
"github.com/cockroachdb/datadriven"
)

func nextUUID(counter *uint128.Uint128) uuid.UUID {
*counter = counter.Add(1)
return uuid.FromUint128(*counter)
func nextUUID(counter *uint32) uuid.UUID {
*counter = *counter + 1
hi := uint64(*counter) << 32
return uuid.FromUint128(uint128.Uint128{Hi: hi})
}

func scanTimestamp(t *testing.T, d *datadriven.TestData) hlc.Timestamp {
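nextUUID now derives IDs from a plain uint32 counter packed into the high word of a Uint128, which keeps the generated UUIDs small and readable and is presumably why the logging above switched to ID.Short(). Assuming uuid.FromUint128 writes the Hi word into the leading eight bytes big-endian, the first few IDs come out as shown by this sketch:

package main

import (
	"encoding/binary"
	"fmt"
)

// Hypothetical illustration of the IDs nextUUID would produce for counters
// 1..3 under the byte-layout assumption above; e.g. counter 1 prints
// 00000001-0000-0000-0000-000000000000, whose short form is "00000001".
func main() {
	for counter := uint32(1); counter <= 3; counter++ {
		var b [16]byte
		binary.BigEndian.PutUint64(b[:8], uint64(counter)<<32)
		fmt.Printf("%x-%x-%x-%x-%x\n", b[0:4], b[4:6], b[6:8], b[8:10], b[10:16])
	}
}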
2 changes: 1 addition & 1 deletion pkg/storage/concurrency/lock_table.go
@@ -294,7 +294,7 @@ func (g *lockTableGuardImpl) ShouldWait() bool {
return g.mu.startWait
}

func (g *lockTableGuardImpl) NewStateChan() <-chan struct{} {
func (g *lockTableGuardImpl) NewStateChan() chan struct{} {
g.mu.Lock()
defer g.mu.Unlock()
return g.mu.signal