diff --git a/pkg/roachpb/batch.go b/pkg/roachpb/batch.go
index cf543be43bb..b412e93ad6b 100644
--- a/pkg/roachpb/batch.go
+++ b/pkg/roachpb/batch.go
@@ -150,16 +150,6 @@ func (ba *BatchRequest) IsSinglePushTxnRequest() bool {
 	return false
 }
 
-// IsSingleQueryTxnRequest returns true iff the batch contains a single
-// request, and that request is for a QueryTxn.
-func (ba *BatchRequest) IsSingleQueryTxnRequest() bool {
-	if ba.IsSingleRequest() {
-		_, ok := ba.Requests[0].GetInner().(*QueryTxnRequest)
-		return ok
-	}
-	return false
-}
-
 // IsSingleHeartbeatTxnRequest returns true iff the batch contains a single
 // request, and that request is a HeartbeatTxn.
 func (ba *BatchRequest) IsSingleHeartbeatTxnRequest() bool {
diff --git a/pkg/sql/tests/monotonic_insert_test.go b/pkg/sql/tests/monotonic_insert_test.go
index 26819b5e734..6605f49e462 100644
--- a/pkg/sql/tests/monotonic_insert_test.go
+++ b/pkg/sql/tests/monotonic_insert_test.go
@@ -25,6 +25,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/sql"
 	"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
+	"github.com/cockroachdb/cockroach/pkg/storage/concurrency"
 	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -107,6 +108,10 @@ func testMonotonicInserts(t *testing.T, distSQLMode sessiondata.DistSQLExecMode)
 		st := server.ClusterSettings()
 		st.Manual.Store(true)
 		sql.DistSQLClusterExecMode.Override(&st.SV, int64(distSQLMode))
+		// Let transactions push immediately to detect deadlocks. The test creates a
+		// large amount of contention and dependency cycles, and could take a long
+		// time to complete without this.
+		concurrency.LockTableDeadlockDetectionPushDelay.Override(&st.SV, 0)
 	}
 
 	var clients []mtClient
diff --git a/pkg/storage/batcheval/cmd_query_txn.go b/pkg/storage/batcheval/cmd_query_txn.go
index cfb55bdcaeb..26a86dcfb63 100644
--- a/pkg/storage/batcheval/cmd_query_txn.go
+++ b/pkg/storage/batcheval/cmd_query_txn.go
@@ -75,6 +75,6 @@ func QueryTxn(
 	}
 
 	// Get the list of txns waiting on this txn.
-	reply.WaitingTxns = cArgs.EvalCtx.GetTxnWaitQueue().GetDependents(args.Txn.ID)
+	reply.WaitingTxns = cArgs.EvalCtx.GetConcurrencyManager().GetDependents(args.Txn.ID)
 	return result.Result{}, nil
 }
diff --git a/pkg/storage/batcheval/eval_context.go b/pkg/storage/batcheval/eval_context.go
index 99433915d64..50634e321e0 100644
--- a/pkg/storage/batcheval/eval_context.go
+++ b/pkg/storage/batcheval/eval_context.go
@@ -19,10 +19,10 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/storage/abortspan"
 	"github.com/cockroachdb/cockroach/pkg/storage/cloud"
+	"github.com/cockroachdb/cockroach/pkg/storage/concurrency"
 	"github.com/cockroachdb/cockroach/pkg/storage/engine"
 	"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
 	"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
-	"github.com/cockroachdb/cockroach/pkg/storage/txnwait"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/limit"
 	"github.com/cockroachdb/cockroach/pkg/util/uuid"
@@ -53,7 +53,7 @@ type EvalContext interface {
 	Clock() *hlc.Clock
 	DB() *client.DB
 	AbortSpan() *abortspan.AbortSpan
-	GetTxnWaitQueue() *txnwait.Queue
+	GetConcurrencyManager() concurrency.Manager
 	GetLimiters() *Limiters
 
 	NodeID() roachpb.NodeID
@@ -149,7 +149,7 @@ func (m *mockEvalCtxImpl) GetLimiters() *Limiters {
 func (m *mockEvalCtxImpl) AbortSpan() *abortspan.AbortSpan {
 	return m.MockEvalCtx.AbortSpan
 }
-func (m *mockEvalCtxImpl) GetTxnWaitQueue() *txnwait.Queue {
+func (m *mockEvalCtxImpl) GetConcurrencyManager() concurrency.Manager {
 	panic("unimplemented")
 }
 func (m *mockEvalCtxImpl) NodeID() roachpb.NodeID {
diff --git a/pkg/storage/client_merge_test.go b/pkg/storage/client_merge_test.go
index a2a9b6637b5..76f52f26898 100644
--- a/pkg/storage/client_merge_test.go
+++ b/pkg/storage/client_merge_test.go
@@ -1105,7 +1105,7 @@ func TestStoreRangeMergeInFlightTxns(t *testing.T) {
 			t.Fatal(err)
 		}
 		for {
-			if _, ok := repl.GetTxnWaitQueue().TrackedTxns()[txn1.ID()]; ok {
+			if _, ok := repl.GetConcurrencyManager().TxnWaitQueue().TrackedTxns()[txn1.ID()]; ok {
 				break
 			}
 			select {
diff --git a/pkg/storage/client_split_test.go b/pkg/storage/client_split_test.go
index 395f00cc9bb..923a08b2a14 100644
--- a/pkg/storage/client_split_test.go
+++ b/pkg/storage/client_split_test.go
@@ -2283,7 +2283,7 @@ func TestStoreTxnWaitQueueEnabledOnSplit(t *testing.T) {
 	}
 
 	rhsRepl := store.LookupReplica(roachpb.RKey(keys.UserTableDataMin))
-	if !rhsRepl.IsTxnWaitQueueEnabled() {
+	if !rhsRepl.GetConcurrencyManager().TxnWaitQueue().IsEnabled() {
 		t.Errorf("expected RHS replica's push txn queue to be enabled post-split")
 	}
 }
diff --git a/pkg/storage/concurrency/concurrency_control.go b/pkg/storage/concurrency/concurrency_control.go
index af4560d23e9..370146010d7 100644
--- a/pkg/storage/concurrency/concurrency_control.go
+++ b/pkg/storage/concurrency/concurrency_control.go
@@ -21,6 +21,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
 	"github.com/cockroachdb/cockroach/pkg/storage/spanset"
 	"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
+	"github.com/cockroachdb/cockroach/pkg/storage/txnwait"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/uuid"
 )
@@ -281,6 +282,12 @@ type MetricExporter interface {
 	// lockTable.
 	LockTableDebug() string
 
+	// TxnWaitQueue returns the concurrency manager's txnWaitQueue.
+	// TODO(nvanbenschoten): this doesn't really fit into this interface. It
+	// would be nice if the txnWaitQueue was hidden behind the concurrency
+	// manager abstraction entirely, but tests want to access it directly.
+	TxnWaitQueue() *txnwait.Queue
+
 	// TODO(nvanbenschoten): fill out this interface to provide observability
 	// into the state of the concurrency manager.
 	// LatchMetrics()
@@ -327,7 +334,7 @@ type Request struct {
 // Guard is returned from Manager.SequenceReq. The guard is passed back in to
 // Manager.FinishReq to release the request's resources when it has completed.
 type Guard struct {
-	req Request
+	Req Request
 	lg  latchGuard
 	ltg lockTableGuard
 }
diff --git a/pkg/storage/concurrency/concurrency_manager.go b/pkg/storage/concurrency/concurrency_manager.go
index 9eaf43b4d21..2298f6edf7c 100644
--- a/pkg/storage/concurrency/concurrency_manager.go
+++ b/pkg/storage/concurrency/concurrency_manager.go
@@ -18,6 +18,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/storage/concurrency/lock"
 	"github.com/cockroachdb/cockroach/pkg/storage/spanlatch"
+	"github.com/cockroachdb/cockroach/pkg/storage/spanset"
 	"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
 	"github.com/cockroachdb/cockroach/pkg/storage/txnwait"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -55,8 +56,9 @@ type Config struct {
 	TxnWaitMetrics *txnwait.Metrics
 	SlowLatchGauge *metric.Gauge
 	// Configs + Knobs.
-	MaxLockTableSize int64
-	TxnWaitKnobs     txnwait.TestingKnobs
+	MaxLockTableSize  int64
+	DisableTxnPushing bool
+	TxnWaitKnobs      txnwait.TestingKnobs
 }
 
 func (c *Config) initDefaults() {
@@ -82,10 +84,11 @@ func NewManager(cfg Config) Manager {
 			maxLocks: cfg.MaxLockTableSize,
 		},
 		ltw: &lockTableWaiterImpl{
-			nodeID:  cfg.NodeDesc.NodeID,
-			st:      cfg.Settings,
-			stopper: cfg.Stopper,
-			ir:      cfg.IntentResolver,
+			nodeID:            cfg.NodeDesc.NodeID,
+			st:                cfg.Settings,
+			stopper:           cfg.Stopper,
+			ir:                cfg.IntentResolver,
+			disableTxnPushing: cfg.DisableTxnPushing,
 		},
 		// TODO(nvanbenschoten): move pkg/storage/txnwait to a new
 		// pkg/storage/concurrency/txnwait package.
@@ -110,7 +113,7 @@ func (m *managerImpl) SequenceReq(
 		log.Event(ctx, "sequencing request")
 	} else {
 		g = prev
-		g.assertNoLatches()
+		g.AssertNoLatches()
 		log.Event(ctx, "re-sequencing request")
 	}
 
@@ -156,14 +159,14 @@ func (m *managerImpl) sequenceReqWithGuard(
 
 		// Scan for conflicting locks.
 		log.Event(ctx, "scanning lock table for conflicting locks")
-		g.ltg = m.lt.ScanAndEnqueue(g.req, g.ltg)
+		g.ltg = m.lt.ScanAndEnqueue(g.Req, g.ltg)
 
 		// Wait on conflicting locks, if necessary.
 		if g.ltg.ShouldWait() {
 			m.lm.Release(g.moveLatchGuard())
 
 			log.Event(ctx, "waiting in lock wait-queues")
-			if err := m.ltw.WaitOn(ctx, g.req, g.ltg); err != nil {
+			if err := m.ltw.WaitOn(ctx, g.Req, g.ltg); err != nil {
 				return nil, err
 			}
 			continue
@@ -240,6 +243,11 @@ func (m *managerImpl) FinishReq(g *Guard) {
 func (m *managerImpl) HandleWriterIntentError(
 	ctx context.Context, g *Guard, t *roachpb.WriteIntentError,
 ) *Guard {
+	if g.ltg == nil {
+		log.Fatalf(ctx, "cannot handle WriteIntentError %v for request without "+
+			"lockTableGuard; were lock spans declared for this request?", t)
+	}
+
 	// Add a discovered lock to lock-table for each intent and enter each lock's
 	// wait-queue.
 	for i := range t.Intents {
@@ -336,6 +344,11 @@ func (m *managerImpl) LockTableDebug() string {
 	return m.lt.String()
 }
 
+// TxnWaitQueue implements the MetricExporter interface.
+func (m *managerImpl) TxnWaitQueue() *txnwait.Queue {
+	return m.twq.(*txnwait.Queue)
+}
+
 func (r *Request) isSingle(m roachpb.Method) bool {
 	if len(r.Requests) != 1 {
 		return false
@@ -345,11 +358,29 @@ func (r *Request) isSingle(m roachpb.Method) bool {
 
 func newGuard(req Request) *Guard {
 	// TODO(nvanbenschoten): Pool these guard objects.
-	return &Guard{req: req}
+	return &Guard{Req: req}
+}
+
+// LatchSpans returns the maximal set of spans that the request will access.
+func (g *Guard) LatchSpans() *spanset.SpanSet {
+	return g.Req.LatchSpans
+}
+
+// HoldingLatches returns whether the guard is holding latches or not.
+func (g *Guard) HoldingLatches() bool {
+	return g != nil && g.lg != nil
+}
+
+// AssertLatches asserts that the guard is non-nil and holding latches.
+func (g *Guard) AssertLatches() {
+	if !g.HoldingLatches() {
+		panic("expected latches held, found none")
+	}
+}
 
-func (g *Guard) assertNoLatches() {
-	if g.lg != nil {
+// AssertNoLatches asserts that the guard is non-nil and not holding latches.
+func (g *Guard) AssertNoLatches() {
+	if g.HoldingLatches() {
 		panic("unexpected latches held")
 	}
 }
diff --git a/pkg/storage/concurrency/lock_table_waiter.go b/pkg/storage/concurrency/lock_table_waiter.go
index 6b6158a4859..124afba182b 100644
--- a/pkg/storage/concurrency/lock_table_waiter.go
+++ b/pkg/storage/concurrency/lock_table_waiter.go
@@ -46,6 +46,9 @@ type lockTableWaiterImpl struct {
 	st      *cluster.Settings
 	stopper *stop.Stopper
 	ir      IntentResolver
+
+	// When set, WriteIntentErrors are propagated instead of pushing.
+	disableTxnPushing bool
 }
 
 // IntentResolver is an interface used by lockTableWaiterImpl to push
@@ -159,7 +162,10 @@ func (w *lockTableWaiterImpl) WaitOn(
 			// had a cache of aborted transaction IDs that allowed us to notice
 			// and quickly resolve abandoned intents then we might be able to
 			// get rid of this state.
-			delay := LockTableLivenessPushDelay.Get(&w.st.SV)
+			delay := minDuration(
+				LockTableLivenessPushDelay.Get(&w.st.SV),
+				LockTableDeadlockDetectionPushDelay.Get(&w.st.SV),
+			)
 			if hasMinPriority(state.txn) || hasMaxPriority(req.Txn) {
 				// However, if the pushee has the minimum priority or if the
 				// pusher has the maximum priority, push immediately.
@@ -224,6 +230,12 @@ func (w *lockTableWaiterImpl) WaitOn(
 }
 
 func (w *lockTableWaiterImpl) pushTxn(ctx context.Context, req Request, ws waitingState) *Error {
+	if w.disableTxnPushing {
+		return roachpb.NewError(&roachpb.WriteIntentError{
+			Intents: []roachpb.Intent{roachpb.MakeIntent(ws.txn, ws.key)},
+		})
+	}
+
 	h := roachpb.Header{
 		Timestamp:    req.Timestamp,
 		UserPriority: req.Priority,
@@ -242,12 +254,9 @@ func (w *lockTableWaiterImpl) pushTxn(ctx context.Context, req Request, ws waiti
 		// after our operation started. This allows us to not have to
 		// restart for uncertainty as we come back and read.
 		obsTS, ok := h.Txn.GetObservedTimestamp(w.nodeID)
-		if !ok {
-			// This was set earlier, so it's completely unexpected to
-			// not be found now.
-			return roachpb.NewErrorf("missing observed timestamp: %+v", h.Txn)
+		if ok {
+			h.Timestamp.Forward(obsTS)
 		}
-		h.Timestamp.Forward(obsTS)
 	}
 
 	var pushType roachpb.PushTxnType
@@ -288,3 +297,10 @@ func hasMinPriority(txn *enginepb.TxnMeta) bool {
 func hasMaxPriority(txn *roachpb.Transaction) bool {
 	return txn != nil && txn.Priority == enginepb.MaxTxnPriority
 }
+
+func minDuration(a, b time.Duration) time.Duration {
+	if a < b {
+		return a
+	}
+	return b
+}
diff --git a/pkg/storage/helpers_test.go b/pkg/storage/helpers_test.go
index 24d45132eb5..2a68eb7f861 100644
--- a/pkg/storage/helpers_test.go
+++ b/pkg/storage/helpers_test.go
@@ -458,10 +458,6 @@ func (r *Replica) IsQuiescent() bool {
 	return r.mu.quiescent
 }
 
-func (r *Replica) IsTxnWaitQueueEnabled() bool {
-	return r.txnWaitQueue.IsEnabled()
-}
-
 // GetQueueLastProcessed returns the last processed timestamp for the
 // specified queue, or the zero timestamp if not available.
 func (r *Replica) GetQueueLastProcessed(ctx context.Context, queue string) (hlc.Timestamp, error) {
diff --git a/pkg/storage/intent_resolver_integration_test.go b/pkg/storage/intent_resolver_integration_test.go
index 7a4dc125b9e..a95540c0e3c 100644
--- a/pkg/storage/intent_resolver_integration_test.go
+++ b/pkg/storage/intent_resolver_integration_test.go
@@ -17,10 +17,8 @@ import (
 
 	"github.com/cockroachdb/cockroach/pkg/internal/client"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
-	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
-	"github.com/pkg/errors"
 )
 
 func beginTransaction(
@@ -44,17 +42,18 @@
 }
 
 // TestContendedIntentWithDependencyCycle verifies that a queue of
-// writers on a contended key, each pushing the prior writer, will
-// still notice a dependency cycle. In this case, txn3 writes "a",
-// then txn1 writes "b" and "a", then txn2 writes "b", then txn3
-// writes "b". The deadlock is broken by an aborted transaction.
+// writers on a contended key will still notice a dependency cycle.
+// In this case, txn3 writes "a", then txn1 writes "b" and "a", then
+// txn2 writes "b", then txn3 writes "b". The deadlock is broken by
+// an aborted transaction.
 //
 // Additional non-transactional reads on the same contended key are
 // inserted to verify they do not interfere with writing transactions
 // always pushing to ensure the dependency cycle can be detected.
 //
-// This test is something of an integration test which exercises both
-// the IntentResolver as well as the txnWaitQueue.
+// This test is something of an integration test which exercises the
+// IntentResolver as well as the concurrency Manager's lockTable and
+// txnWaitQueue.
 func TestContendedIntentWithDependencyCycle(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	ctx := context.Background()
@@ -160,346 +159,3 @@ func TestContendedIntentWithDependencyCycle(t *testing.T) {
 		t.Fatal(err)
 	}
 }
-
-// TestContendedIntentChangesOnRetry verifies that a batch which observes a
-// WriteIntentError for one key and then a WriteIntentError for a different
-// key doesn't pollute the old key's contentionQueue state.
-//
-// This also serves as a regression test for #32582. In that issue, we
-// saw a transaction wait in the contentionQueue without pushing the
-// transaction that it was deadlocked on. This was because of a bug in
-// how the queue handled WriteIntentErrors for different intents on
-// the re-evaluation of a batch.
-//
-// The scenario requires 5 unique transactions:
-// 1. txn1 writes to keyA.
-// 2. txn2 writes to keyB.
-// 3. txn4 writes to keyC and keyB in the same batch. The batch initially
-//    fails with a WriteIntentError on keyB. It enters the contentionQueue
-//    and becomes the front of a contendedKey list.
-// 4. txn5 writes to keyB. It enters the contentionQueue behind txn4.
-// 5. txn3 writes to keyC.
-// 6. txn2 is committed and the intent on keyB is resolved.
-// 7. txn4 exits the contentionQueue and re-evaluates. This time, it hits
-//    a WriteIntentError on the first request in its batch: keyC. HOWEVER,
-//    before it updates the contentionQueue, steps 8-10 occur.
-// 8. txn3 writes to keyB. It never enters the contentionQueue. txn3 then
-//    writes to keyA in a separate batch and gets stuck waiting on txn1.
-// 9. txn1 writes to keyB. It observes txn3's intent and informs the
-//    contentionQueue of the new txn upon its entrance.
-// 10. txn5, the new front of the contendedKey list, pushes txn3 and gets
-//     stuck in the txnWaitQueue.
-// 11. txn4 finally updates the contentionQueue. A bug previously existed
-//     where it would set the contendedKey's lastTxnMeta to nil because it
-//     saw a WriteIntentError for a different key.
-// 12. txn1 notices the nil lastTxnMeta and does not push txn3. This prevents
-//     cycle detection from succeeding and we observe a deadlock.
-//
-func TestContendedIntentChangesOnRetry(t *testing.T) {
-	defer leaktest.AfterTest(t)()
-	ctx := context.Background()
-	stopper := stop.NewStopper()
-	defer stopper.Stop(ctx)
-	store, _ := createTestStore(t, testStoreOpts{createSystemRanges: true}, stopper)
-
-	keyA := roachpb.Key("a")
-	keyB := roachpb.Key("b")
-	keyC := roachpb.Key("c")
-	keyD := roachpb.Key("d")
-	spanA := roachpb.Span{Key: keyA}
-	spanB := roachpb.Span{Key: keyB}
-	spanC := roachpb.Span{Key: keyC}
-
-	// Steps 1 and 2.
-	//
-	// Create the five transactions; at this point, none of them have
-	// conflicts. Txn1 has written "a", Txn2 has written "b".
-	txn1 := beginTransaction(t, store, -5, keyA, true /* putKey */)
-	txn2 := beginTransaction(t, store, -4, keyB, true /* putKey */)
-	txn3 := beginTransaction(t, store, -3, keyC, false /* putKey */)
-	txn4 := beginTransaction(t, store, -5, keyD, false /* putKey */)
-	txn5 := beginTransaction(t, store, -1, keyD, false /* putKey */)
-
-	t.Log(txn1.ID, txn2.ID, txn3.ID, txn4.ID, txn5.ID)
-
-	txnCh1 := make(chan error, 1)
-	txnCh3 := make(chan error, 1)
-	txnCh4 := make(chan error, 1)
-	txnCh5 := make(chan error, 1)
-
-	// waitForContended waits until the provided key has the specified
-	// number of pushers in the contentionQueue.
-	waitForContended := func(key roachpb.Key, count int) {
-		testutils.SucceedsSoon(t, func() error {
-			contentionCount := store.intentResolver.NumContended(key)
-			if contentionCount != count {
-				return errors.Errorf("expected len %d; got %d", count, contentionCount)
-			}
-			return nil
-		})
-	}
-
-	// Steps 3, 7, and 11.
-	//
-	// Send txn4's puts in a single batch, followed by an end transaction.
-	// This txn will hit a WriteIntentError on its second request during the
-	// first time that it evaluates the batch and will hit a WriteIntentError
-	// on its first request during the second time that it evaluates. This
-	// second WriteIntentError must not be entangled with the contentionQueue
-	// entry for the first key.
-	{
-		go func() {
-			putC := putArgs(keyC, []byte("value")) // will hit intent on 2nd iter
-			putB := putArgs(keyB, []byte("value")) // will hit intent on 1st iter
-			assignSeqNumsForReqs(txn4, &putC)
-			assignSeqNumsForReqs(txn4, &putB)
-			ba := roachpb.BatchRequest{}
-			ba.Header = roachpb.Header{Txn: txn4}
-			ba.Add(&putC, &putB)
-			br, pErr := store.TestSender().Send(ctx, ba)
-			if pErr != nil {
-				txnCh4 <- pErr.GoError()
-				return
-			}
-			txn4.Update(br.Txn)
-
-			et, _ := endTxnArgs(txn4, true)
-			et.IntentSpans = []roachpb.Span{spanB, spanC}
-			et.CanCommitAtHigherTimestamp = true
-			assignSeqNumsForReqs(txn4, &et)
-			_, pErr = client.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{Txn: txn4}, &et)
-			txnCh4 <- pErr.GoError()
-		}()
-
-		waitForContended(keyB, 1)
-		t.Log("txn4 in contentionQueue")
-	}
-
-	// Steps 4 and 10.
-	//
-	// Send txn5's put, followed by an end transaction. This request will
-	// wait at the head of the contention queue once txn2 is committed.
-	{
-		go func() {
-			// Write keyB to create a cycle with txn3.
-			putB := putArgs(keyB, []byte("value"))
-			assignSeqNumsForReqs(txn5, &putB)
-			repl, pErr := client.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{Txn: txn5}, &putB)
-			if pErr != nil {
-				txnCh5 <- pErr.GoError()
-				return
-			}
-			txn5.Update(repl.Header().Txn)
-
-			et, _ := endTxnArgs(txn5, true)
-			et.IntentSpans = []roachpb.Span{spanB}
-			et.CanCommitAtHigherTimestamp = true
-			assignSeqNumsForReqs(txn5, &et)
-			_, pErr = client.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{Txn: txn5}, &et)
-			txnCh5 <- pErr.GoError()
-		}()
-
-		waitForContended(keyB, 2)
-		t.Log("txn5 in contentionQueue")
-	}
-
-	// Step 5.
-	//
-	// Write to keyC, which will block txn4 on its re-evaluation.
-	{
-		putC := putArgs(keyC, []byte("value"))
-		assignSeqNumsForReqs(txn3, &putC)
-		if _, pErr := client.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{Txn: txn3}, &putC); pErr != nil {
-			t.Fatal(pErr)
-		}
-	}
-
-	// Step 6.
-	//
-	// Commit txn2, which should set off a chain reaction of movement. txn3 should
-	// write an intent at keyB and go on to write at keyA. In doing so, it should
-	// create a cycle with txn1. This cycle should be detected.
-	//
-	// In #32582 we saw a case where txn4 would hit a different WriteIntentError
-	// during its re-evaluation (keyC instead of keyB). This caused it to remove
-	// the lastTxnMeta from keyB's contendedKey record, which prevented txn1 from
-	// pushing txn3 and in turn prevented cycle detection from finding the deadlock.
-	{
-		// Sleeping for dependencyCyclePushDelay before committing txn2 makes the
-		// failure reproduce more easily.
-		time.Sleep(100 * time.Millisecond)
-
-		et, _ := endTxnArgs(txn2, true)
-		et.IntentSpans = []roachpb.Span{spanB}
-		et.CanCommitAtHigherTimestamp = true
-		assignSeqNumsForReqs(txn2, &et)
-		if _, pErr := client.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{Txn: txn2}, &et); pErr != nil {
-			t.Fatal(pErr)
-		}
-	}
-
-	// Step 8.
-	//
-	// Send txn3's two other put requests, followed by an end transaction. This
-	// txn will first write to keyB before writing to keyA. This will create a
-	// cycle between txn1 and txn3.
-	{
-		// Write to keyB, which will hit an intent from txn2.
-		putB := putArgs(keyB, []byte("value"))
-		assignSeqNumsForReqs(txn3, &putB)
-		repl, pErr := client.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{Txn: txn3}, &putB)
-		if pErr != nil {
-			txnCh3 <- pErr.GoError()
-			return
-		}
-		txn3.Update(repl.Header().Txn)
-
-		go func() {
-			// Write keyA, which will hit an intent from txn1 and create a cycle.
-			putA := putArgs(keyA, []byte("value"))
-			assignSeqNumsForReqs(txn3, &putA)
-			repl, pErr = client.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{Txn: txn3}, &putA)
-			if pErr != nil {
-				txnCh3 <- pErr.GoError()
-				return
-			}
-			txn3.Update(repl.Header().Txn)
-
-			et, _ := endTxnArgs(txn3, true)
-			et.IntentSpans = []roachpb.Span{spanA, spanB}
-			et.CanCommitAtHigherTimestamp = true
-			assignSeqNumsForReqs(txn3, &et)
-			_, pErr = client.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{Txn: txn3}, &et)
-			txnCh3 <- pErr.GoError()
-		}()
-	}
-
-	// Step 9.
-	//
-	// Send txn1's put request to keyB, which completes the cycle between txn1
-	// and txn3.
-	{
-		go func() {
-			// Write keyB to create a cycle with txn3.
-			putB := putArgs(keyB, []byte("value"))
-			assignSeqNumsForReqs(txn1, &putB)
-			repl, pErr := client.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{
-				Txn: txn1,
-			}, &putB)
-			if pErr != nil {
-				txnCh1 <- pErr.GoError()
-				return
-			}
-			txn1.Update(repl.Header().Txn)
-
-			et, _ := endTxnArgs(txn1, true)
-			et.IntentSpans = []roachpb.Span{spanB}
-			et.CanCommitAtHigherTimestamp = true
-			assignSeqNumsForReqs(txn1, &et)
-			_, pErr = client.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{Txn: txn1}, &et)
-			txnCh1 <- pErr.GoError()
-		}()
-	}
-
-	// The third transaction will always be aborted because it has
-	// a lower priority than the first.
-	err := <-txnCh3
-	if _, ok := err.(*roachpb.UnhandledRetryableError); !ok {
-		t.Fatalf("expected transaction aborted error; got %T", err)
-	}
-	if err := <-txnCh1; err != nil {
-		t.Fatal(err)
-	}
-	if err := <-txnCh4; err != nil {
-		// txn4 can end up being aborted due to a perceived deadlock. This
-		// is rare and isn't important to the test, so we allow it.
-		if _, ok := err.(*roachpb.UnhandledRetryableError); !ok {
-			t.Fatal(err)
-		}
-	}
-	if err := <-txnCh5; err != nil {
-		// txn5 can end up being aborted due to a perceived deadlock. This
-		// is rare and isn't important to the test, so we allow it.
-		if _, ok := err.(*roachpb.UnhandledRetryableError); !ok {
-			t.Fatal(err)
-		}
-	}
-}
-
-// TestContendedIntentPushedByHighPriorityScan verifies that a queue of readers
-// and writers for a contended key will not prevent a high priority scan from
-// pushing the head of the queue and reading a committed value.
-func TestContendedIntentPushedByHighPriorityScan(t *testing.T) {
-	defer leaktest.AfterTest(t)()
-	ctx := context.Background()
-	stopper := stop.NewStopper()
-	defer stopper.Stop(ctx)
-	store, _ := createTestStore(t, testStoreOpts{createSystemRanges: true}, stopper)
-
-	keyA := roachpb.Key("a")
-	keyB := roachpb.Key("b")
-	spanA := roachpb.Span{Key: keyA}
-	spanB := roachpb.Span{Key: keyB}
-
-	txn1 := beginTransaction(t, store, -3, keyA, true /* putKey */)
-	txn2 := beginTransaction(t, store, -2, keyB, true /* putKey */)
-
-	// txn1 already wrote an intent at keyA.
-	_ = txn1
-
-	// Send txn2 put, followed by an end transaction. This should add
-	// txn2 to the contentionQueue, blocking on the result of txn1.
-	txnCh2 := make(chan error, 1)
-	go func() {
-		put := putArgs(keyA, []byte("value"))
-		assignSeqNumsForReqs(txn2, &put)
-		if _, pErr := client.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{
-			Txn: txn2,
-		}, &put); pErr != nil {
-			txnCh2 <- pErr.GoError()
-			return
-		}
-		et, _ := endTxnArgs(txn2, true)
-		et.IntentSpans = []roachpb.Span{spanA, spanB}
-		et.CanCommitAtHigherTimestamp = true
-		assignSeqNumsForReqs(txn2, &et)
-		_, pErr := client.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{Txn: txn2}, &et)
-		txnCh2 <- pErr.GoError()
-	}()
-
-	// Wait for txn2 to enter the contentionQueue and begin pushing txn1.
-	testutils.SucceedsSoon(t, func() error {
-		contentionCount := store.intentResolver.NumContended(keyA)
-		if exp := 1; contentionCount != exp {
-			return errors.Errorf("expected len %d; got %d", exp, contentionCount)
-		}
-		return nil
-	})
-
-	// Perform a scan over the same keyspace with a high priority transaction.
-	txnHigh := newTransaction("high-priority", nil, roachpb.MaxUserPriority, store.Clock())
-	scan := scanArgs(keyA, keyB)
-	scanResp, pErr := client.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{Txn: txnHigh}, scan)
-	if pErr != nil {
-		t.Fatal(pErr)
-	}
-	if kvs := scanResp.(*roachpb.ScanResponse).Rows; len(kvs) > 0 {
-		t.Fatalf("expected no kvs returned from scan %v", kvs)
-	}
-
-	// Commit txn1. This succeeds even though the txn was pushed because the
-	// transaction has no refresh spans.
-	et, _ := endTxnArgs(txn1, true)
-	et.IntentSpans = []roachpb.Span{spanA}
-	et.CanCommitAtHigherTimestamp = true
-	assignSeqNumsForReqs(txn1, &et)
-	if _, pErr := client.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{Txn: txn1}, &et); pErr != nil {
-		t.Fatal(pErr)
-	}
-
-	// Wait for txn2 to commit. Again, this succeeds even though the txn was
-	// pushed because the transaction has no refresh spans.
-	if err := <-txnCh2; err != nil {
-		t.Fatal(err)
-	}
-}
diff --git a/pkg/storage/intentresolver/contention_queue.go b/pkg/storage/intentresolver/contention_queue.go
deleted file mode 100644
index ef67e656af8..00000000000
--- a/pkg/storage/intentresolver/contention_queue.go
+++ /dev/null
@@ -1,355 +0,0 @@
-// Copyright 2018 The Cockroach Authors.
-//
-// Use of this software is governed by the Business Source License
-// included in the file licenses/BSL.txt.
-//
-// As of the Change Date specified in that file, in accordance with
-// the Business Source License, use of this software will be governed
-// by the Apache License, Version 2.0, included in the file
-// licenses/APL.txt.
-
-package intentresolver
-
-import (
-	"container/list"
-	"context"
-	"time"
-
-	"github.com/cockroachdb/cockroach/pkg/internal/client"
-	"github.com/cockroachdb/cockroach/pkg/roachpb"
-	"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
-	"github.com/cockroachdb/cockroach/pkg/util/hlc"
-	"github.com/cockroachdb/cockroach/pkg/util/log"
-	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
-)
-
-type pusher struct {
-	txn      *roachpb.Transaction
-	waitCh   chan *enginepb.TxnMeta
-	detectCh chan struct{}
-}
-
-func newPusher(txn *roachpb.Transaction) *pusher {
-	p := &pusher{
-		txn:    txn,
-		waitCh: make(chan *enginepb.TxnMeta, 1),
-	}
-	if p.activeTxn() {
-		p.detectCh = make(chan struct{}, 1)
-		// Signal the channel in order to begin dependency cycle detection.
-		p.detectCh <- struct{}{}
-	}
-	return p
-}
-
-func (p *pusher) activeTxn() bool {
-	return p.txn != nil && p.txn.IsWriting()
-}
-
-type contendedKey struct {
-	ll *list.List
-	// lastTxnMeta is the most recent txn to own the intent. Active
-	// transactions in the contention queue for this intent must push
-	// this txn record in order to detect dependency cycles.
-	lastTxnMeta *enginepb.TxnMeta
-}
-
-func newContendedKey() *contendedKey {
-	return &contendedKey{
-		ll: list.New(),
-	}
-}
-
-// setLastTxnMeta sets the most recent txn meta and sends to all
-// pushers w/ active txns still in the queue to push, to guarantee
-// detection of dependency cycles.
-func (ck *contendedKey) setLastTxnMeta(txnMeta *enginepb.TxnMeta) {
-	ck.lastTxnMeta = txnMeta
-	for e := ck.ll.Front(); e != nil; e = e.Next() {
-		p := e.Value.(*pusher)
-		if p.detectCh != nil {
-			select {
-			case p.detectCh <- struct{}{}:
-			default:
-			}
-		}
-	}
-}
-
-// contentionQueue handles contention on keys with conflicting intents
-// by forming queues of "pushers" which are requests that experienced
-// a WriteIntentError. There is a queue for each key with one or more
-// pushers. Queues are complicated by the difference between pushers
-// with writing transactions (i.e. they may have a transaction record
-// which can be pushed) and non-writing transactions (e.g.,
-// non-transactional requests, and read-only transactions).
-//
-// Queues are linked lists, with each element containing a transaction
-// (can be nil), and a wait channel. The wait channel is closed when
-// the request is dequeued and run to completion, whether to success
-// or failure. Pushers wait on the most recent pusher in the queue to
-// complete. However, pushers with an active transaction (i.e., txns
-// with a non-nil key) must send a PushTxn RPC. This is necessary in
-// order to properly detect dependency cycles.
-type contentionQueue struct {
-	clock *hlc.Clock
-	db    *client.DB
-
-	// keys is a map from key to a linked list of pusher instances,
-	// ordered as a FIFO queue.
-	mu struct {
-		syncutil.Mutex
-		keys map[string]*contendedKey
-	}
-}
-
-func (cq *contentionQueue) numContended(key roachpb.Key) int {
-	cq.mu.Lock()
-	defer cq.mu.Unlock()
-	ck, ok := cq.mu.keys[string(key)]
-	if !ok {
-		return 0
-	}
-	return ck.ll.Len()
-}
-
-func newContentionQueue(clock *hlc.Clock, db *client.DB) *contentionQueue {
-	cq := &contentionQueue{
-		clock: clock,
-		db:    db,
-	}
-	cq.mu.keys = map[string]*contendedKey{}
-	return cq
-}
-
-func txnID(txn *roachpb.Transaction) string {
-	if txn == nil {
-		return "nil txn"
-	}
-	return txn.ID.Short()
-}
-
-// add adds the intent specified in the supplied wiErr to the
-// contention queue. This may block the current goroutine if the
-// pusher has no transaction or the transaction is not yet writing.
-//
-// Note that the supplied wiErr write intent error must have only a
-// single intent (len(wiErr.Intents) == 1).
-//
-// Returns a cleanup function to be invoked by the caller after the
-// original request completes, a possibly updated WriteIntentError,
-// a bool indicating whether the intent resolver should regard the
-// original push / resolve as no longer applicable and skip those
-// steps to retry the original request that generated the
-// WriteIntentError, and an error if one was encountered while
-// waiting in the queue. The cleanup function takes two arguments,
-// a newWIErr, non-nil in case the re-executed request experienced
-// another write intent error and could not complete; and
-// newIntentTxn, nil if the re-executed request left no intent, and
-// non-nil if it did. At most one of these two arguments should be
-// provided.
-func (cq *contentionQueue) add(
-	ctx context.Context, wiErr *roachpb.WriteIntentError, h roachpb.Header,
-) (CleanupFunc, *roachpb.WriteIntentError, bool, *roachpb.Error) {
-	if len(wiErr.Intents) != 1 {
-		log.Fatalf(ctx, "write intent error must contain only a single intent: %s", wiErr)
-	}
-	if hasExtremePriority(h) {
-		// Never queue maximum or minimum priority transactions.
-		return nil, wiErr, false, nil
-	}
-	intent := wiErr.Intents[0]
-	key := string(intent.Key)
-	curPusher := newPusher(h.Txn)
-	log.VEventf(ctx, 3, "adding %s to contention queue on intent %s @%s", txnID(h.Txn), intent.Key, intent.Txn.ID.Short())
-
-	// Consider prior pushers in reverse arrival order to build queue
-	// by waiting on the most recent overlapping pusher.
-	var waitCh chan *enginepb.TxnMeta
-	var curElement *list.Element
-
-	cq.mu.Lock()
-	contended, ok := cq.mu.keys[key]
-	if !ok {
-		contended = newContendedKey()
-		contended.setLastTxnMeta(&intent.Txn)
-		cq.mu.keys[key] = contended
-	} else if contended.lastTxnMeta.ID != intent.Txn.ID {
-		contended.setLastTxnMeta(&intent.Txn)
-	}
-
-	// Get the prior pusher to wait on.
-	if e := contended.ll.Back(); e != nil {
-		p := e.Value.(*pusher)
-		waitCh = p.waitCh
-		log.VEventf(ctx, 3, "%s waiting on %s", txnID(curPusher.txn), txnID(p.txn))
-	}
-
-	// Append the current pusher to the queue.
-	curElement = contended.ll.PushBack(curPusher)
-	cq.mu.Unlock()
-
-	// Delay before pushing in order to detect dependency cycles.
-	const dependencyCyclePushDelay = 100 * time.Millisecond
-
-	// Wait on prior pusher, if applicable.
-	var done bool
-	var pErr *roachpb.Error
-	if waitCh != nil {
-		var detectCh chan struct{}
-		var detectReady <-chan time.Time
-		// If the current pusher has an active txn, we need to push the
-		// transaction which owns the intent to detect dependency cycles.
-		// To avoid unnecessary push traffic, insert a delay by
-		// instantiating the detectReady, which sets detectCh when it
-		// fires. When detectCh receives, it sends a push to the most
-		// recent txn to own the intent.
-		if curPusher.detectCh != nil {
-			detectReady = time.After(dependencyCyclePushDelay)
-		}
-
-	Loop:
-		for {
-			select {
-			case txnMeta, ok := <-waitCh:
-				if !ok {
-					log.Fatalf(ctx, "the wait channel of a prior pusher was used twice (pusher=%s)", txnMeta)
-				}
-				// If the prior pusher wrote an intent, push it instead by
-				// creating a copy of the WriteIntentError with updated txn.
-				if txnMeta != nil {
-					log.VEventf(ctx, 3, "%s exiting contention queue to push %s", txnID(curPusher.txn), txnMeta.ID.Short())
-					wiErrCopy := *wiErr
-					wiErrCopy.Intents = []roachpb.Intent{
-						roachpb.MakeIntent(txnMeta, intent.Key),
-					}
-					wiErr = &wiErrCopy
-				} else {
-					// No intent was left by the prior pusher; don't push, go
-					// immediately to retrying the conflicted request.
-					log.VEventf(ctx, 3, "%s exiting contention queue to proceed", txnID(curPusher.txn))
-					done = true
-				}
-				break Loop
-
-			case <-ctx.Done():
-				// The pusher's context is done. Return without pushing.
-				pErr = roachpb.NewError(ctx.Err())
-				break Loop
-
-			case <-detectReady:
-				// When the detect timer fires, set detectCh and loop.
-				log.VEventf(ctx, 3, "%s cycle detection is ready", txnID(curPusher.txn))
-				detectCh = curPusher.detectCh
-
-			case <-detectCh:
-				cq.mu.Lock()
-				frontOfQueue := curElement == contended.ll.Front()
-				pusheeTxn := contended.lastTxnMeta
-				cq.mu.Unlock()
-				// If we're at the start of the queue loop and wait
-				// for the wait channel to signal.
-				if frontOfQueue {
-					log.VEventf(ctx, 3, "%s at front of queue; breaking from loop", txnID(curPusher.txn))
-					break Loop
-				}
-
-				b := &client.Batch{}
-				b.Header.Timestamp = cq.clock.Now()
-				b.AddRawRequest(&roachpb.PushTxnRequest{
-					RequestHeader: roachpb.RequestHeader{
-						Key: pusheeTxn.Key,
-					},
-					PusherTxn: getPusherTxn(h),
-					PusheeTxn: *pusheeTxn,
-					PushTo:    h.Timestamp.Next(),
-					PushType:  roachpb.PUSH_ABORT,
-				})
-				log.VEventf(ctx, 3, "%s pushing %s to detect dependency cycles", txnID(curPusher.txn), pusheeTxn.ID.Short())
-				if err := cq.db.Run(ctx, b); err != nil {
-					pErr = b.MustPErr()
-					log.VErrEventf(ctx, 2, "while waiting in push contention queue to push %s: %s", pusheeTxn.ID.Short(), pErr)
-					break Loop
-				}
-				// Note that this pusher may have aborted the pushee, but it
-				// should still wait on the previous pusher's wait channel.
-				detectCh = nil
-				detectReady = time.After(dependencyCyclePushDelay)
-			}
-		}
-	}
-
-	return func(newWIErr *roachpb.WriteIntentError, newIntentTxn *enginepb.TxnMeta) {
-		if newWIErr != nil && newIntentTxn != nil {
-			// The need for this implies that the function should be split
-			// into a more rigidly defined handle with multiple accessors.
-			// TODO(nvanbenschoten): clean this up and test better when we're
-			// not intending the change to be backported.
-			panic("newWIErr and newIntentTxn both provided")
-		}
-		if newWIErr == nil {
-			log.VEventf(ctx, 3, "%s finished, leaving intent? %t (owned by %s)", txnID(curPusher.txn), newIntentTxn != nil, newIntentTxn)
-		} else {
-			log.VEventf(ctx, 3, "%s encountered another write intent error %s", txnID(curPusher.txn), newWIErr)
-		}
-		cq.mu.Lock()
-		defer cq.mu.Unlock()
-		// Remove the current element from its list of pushers.
-		// If the current element isn't the front, it's being removed
-		// because its context was canceled. Swap the wait channel with
-		// the previous element.
-		if curElement != contended.ll.Front() {
-			prevPusher := curElement.Prev().Value.(*pusher)
-			if waitCh != nil && prevPusher.waitCh != waitCh {
-				log.Fatalf(ctx, "expected previous pusher's wait channel to be the one current pusher waited on")
-			}
-			prevPusher.waitCh, curPusher.waitCh = curPusher.waitCh, prevPusher.waitCh
-		}
-		contended.ll.Remove(curElement)
-
-		if contended.ll.Len() == 0 {
-			// If the contendedKey's list is now empty, remove it. We don't need
-			// to send on or close our waitCh because no one is or ever will wait
-			// on it.
-			delete(cq.mu.keys, key)
-		} else {
-			// If the pusher re-executed its request and encountered another
-			// write intent error, check if it's for the same intent; if so,
-			// we can set the newIntentTxn to match the new intent. If not,
-			// make sure that we don't pollute the old contendedKey with any
-			// new information.
-			if newWIErr != nil {
-				sameKey := len(newWIErr.Intents) == 1 && newWIErr.Intents[0].Key.Equal(intent.Key)
-				if sameKey {
-					newIntentTxn = &newWIErr.Intents[0].Txn
-				} else {
-					// If the pusher re-executed and found a different intent, make
-					// sure that we don't tell the old contendedKey anything about
-					// the new intent's transaction. This new intent could be from
-					// an earlier request in the batch than the one that previously
-					// hit the error, so we don't know anything about the state of
-					// the old intent.
-					newIntentTxn = nil
-				}
-			}
-			if newIntentTxn != nil {
-				// Shallow copy the TxnMeta. After this request returns (i.e.
-				// now), we might mutate it (DistSender and such), but the
-				// receiver of the channel will read it.
-				newIntentTxnCopy := *newIntentTxn
-				newIntentTxn = &newIntentTxnCopy
-				contended.setLastTxnMeta(newIntentTxn)
-			}
-			curPusher.waitCh <- newIntentTxn
-			close(curPusher.waitCh)
-		}
-	}, wiErr, done, pErr
-}
-
-func hasExtremePriority(h roachpb.Header) bool {
-	if h.Txn != nil {
-		p := h.Txn.Priority
-		return p == enginepb.MaxTxnPriority || p == enginepb.MinTxnPriority
-	}
-	return false
-}
diff --git a/pkg/storage/intentresolver/intent_resolver.go b/pkg/storage/intentresolver/intent_resolver.go
index ad3df1d0dad..728169ff408 100644
--- a/pkg/storage/intentresolver/intent_resolver.go
+++ b/pkg/storage/intentresolver/intent_resolver.go
@@ -36,14 +36,6 @@ import (
 	"github.com/pkg/errors"
 )
 
-// CleanupFunc is used to report the result of later writes in the face of
-// WriteIntentError. It should never be called with both a newWIErr and a
-// newIntentTxn.
-type CleanupFunc func(
-	newWIErr *roachpb.WriteIntentError,
-	newIntentTxn *enginepb.TxnMeta,
-)
-
 const (
 	// defaultTaskLimit is the maximum number of asynchronous tasks
 	// that may be started by intentResolver. When this limit is reached
@@ -125,8 +117,7 @@ type IntentResolver struct {
 	stopper      *stop.Stopper
 	testingKnobs storagebase.IntentResolverTestingKnobs
 	ambientCtx   log.AmbientContext
-	sem          *quotapool.IntPool // Semaphore to limit async goroutines.
-	contentionQ  *contentionQueue   // manages contention on individual keys
+	sem          *quotapool.IntPool // semaphore to limit async goroutines
 
 	rdc kvbase.RangeDescriptorCache
 
@@ -188,7 +179,6 @@ func New(c Config) *IntentResolver {
 		db:           c.DB,
 		stopper:      c.Stopper,
 		sem:          quotapool.NewIntPool("intent resolver", uint64(c.TaskLimit)),
-		contentionQ:  newContentionQueue(c.Clock, c.DB),
 		every:        log.Every(time.Minute),
 		Metrics:      makeMetrics(),
 		rdc:          c.RangeDescriptorCache,
@@ -224,72 +214,6 @@ func New(c Config) *IntentResolver {
 	return ir
 }
 
-// NumContended exists to ease writing tests at the storage level above which
-// want to verify that contention is occurring within the IntentResolver.
-func (ir *IntentResolver) NumContended(key roachpb.Key) int {
-	return ir.contentionQ.numContended(key)
-}
-
-// ProcessWriteIntentError tries to push the conflicting
-// transaction(s) responsible for the given WriteIntentError, and to
-// resolve those intents if possible. Returns a cleanup function and
-// potentially a new error to be used in place of the original. The
-// cleanup function should be invoked by the caller after the request
-// which experienced the conflict has completed with a parameter
-// specifying a transaction in the event that the request left its own
-// intent.
-func (ir *IntentResolver) ProcessWriteIntentError(
-	ctx context.Context, wiPErr *roachpb.Error, h roachpb.Header, pushType roachpb.PushTxnType,
-) (CleanupFunc, *roachpb.Error) {
-	wiErr, ok := wiPErr.GetDetail().(*roachpb.WriteIntentError)
-	if !ok {
-		return nil, roachpb.NewErrorf("not a WriteIntentError: %v", wiPErr)
-	}
-
-	if log.V(6) {
-		log.Infof(ctx, "resolving write intent %s", wiErr)
-	}
-
-	// Possibly queue this processing if the write intent error is for a
-	// single intent affecting a unitary key.
-	var cleanup func(*roachpb.WriteIntentError, *enginepb.TxnMeta)
-	if len(wiErr.Intents) == 1 {
-		var done bool
-		var pErr *roachpb.Error
-		// Note that the write intent error may be mutated here in the event
-		// that this pusher is queued to wait for a different transaction
-		// instead.
-		cleanup, wiErr, done, pErr = ir.contentionQ.add(ctx, wiErr, h)
-		if done || pErr != nil {
-			return cleanup, pErr
-		}
-	}
-
-	resolveIntents, pErr := ir.maybePushIntents(
-		ctx, wiErr.Intents, h, pushType, false, /* skipIfInFlight */
-	)
-	if pErr != nil {
-		return cleanup, pErr
-	}
-
-	// We always poison due to limitations of the API: not poisoning equals
-	// clearing the AbortSpan, and if our pushee transaction first got pushed
-	// for timestamp (by us), then (by someone else) aborted and poisoned, and
-	// then we run the below code, we're clearing the AbortSpan illegally.
-	// Furthermore, even if our pushType is not PUSH_ABORT, we may have ended
-	// up with the responsibility to abort the intents (for example if we find
-	// the transaction aborted).
-	//
-	// To do better here, we need per-intent information on whether we need to
-	// poison.
-	opts := ResolveOptions{Poison: true}
-	if pErr := ir.ResolveIntents(ctx, resolveIntents, opts); pErr != nil {
-		return cleanup, pErr
-	}
-
-	return cleanup, nil
-}
-
 func getPusherTxn(h roachpb.Header) roachpb.Transaction {
 	// If the txn is nil, we communicate a priority by sending an empty
 	// txn with only the priority set. This is official usage of PushTxn.
@@ -304,51 +228,6 @@ func getPusherTxn(h roachpb.Header) roachpb.Transaction {
 	return *txn
 }
 
-// maybePushIntents tries to push the conflicting transaction(s)
-// responsible for the given intents: either move its
-// timestamp forward on a read/write conflict, abort it on a
-// write/write conflict, or do nothing if the transaction is no longer
-// pending.
-//
-// Returns a slice of intents which can now be resolved, and an error.
-// The returned intents should be resolved via IntentResolver.ResolveIntents.
-//
-// If skipIfInFlight is true, then no PushTxns will be sent and no
-// intents will be returned for any transaction for which there is
-// another push in progress. This should only be used by callers who
-// are not relying on the side effect of a push (i.e. only
-// pushType==PUSH_TOUCH), and who also don't need to synchronize with
-// the resolution of those intents (e.g. asynchronous resolutions of
-// intents skipped on inconsistent reads).
-//
-// Callers are involved with
-// a) conflict resolution for commands being executed at the Store with the
-//    client waiting,
-// b) resolving intents encountered during inconsistent operations, and
-// c) resolving intents upon EndTxn which are not local to the given range.
-//    This is the only path in which the transaction is going to be in
-//    non-pending state and doesn't require a push.
-func (ir *IntentResolver) maybePushIntents(
-	ctx context.Context,
-	intents []roachpb.Intent,
-	h roachpb.Header,
-	pushType roachpb.PushTxnType,
-	skipIfInFlight bool,
-) ([]roachpb.LockUpdate, *roachpb.Error) {
-	// Attempt to push the transaction(s) which created the conflicting intent(s).
-	pushTxns := make(map[uuid.UUID]*enginepb.TxnMeta)
-	for i := range intents {
-		intent := &intents[i]
-		pushTxns[intent.Txn.ID] = &intent.Txn
-	}
-
-	pushedTxns, pErr := ir.MaybePushTransactions(ctx, pushTxns, h, pushType, skipIfInFlight)
-	if pErr != nil {
-		return nil, pErr
-	}
-	return updateIntentTxnStatus(ctx, pushedTxns, intents, skipIfInFlight, nil), nil
-}
-
 // updateIntentTxnStatus takes a slice of intents and a set of pushed
 // transactions (like returned from MaybePushTransactions) and updates
 // each intent with its corresponding TxnMeta and Status.
@@ -396,10 +275,28 @@ func (ir *IntentResolver) PushTransaction(
 	return pushedTxn, nil
 }
 
-// MaybePushTransactions is like maybePushIntents except it takes a set of
-// transactions to push instead of a set of intents. This set of provided
-// transactions may be modified by the method. It returns a set of transaction
-// protos corresponding to the pushed transactions.
+// MaybePushTransactions tries to push the conflicting transaction(s):
+// either moving their timestamp forward on a read/write conflict, aborting
+// them on a write/write conflict, or doing nothing if the transaction is no
+// longer pending.
+//
+// Returns a set of transaction protos that correspond to the pushed
+// transactions and whose intents can now be resolved, and an error.
+//
+// If skipIfInFlight is true, then no PushTxns will be sent and no intents
+// will be returned for any transaction for which there is another push in
+// progress. This should only be used by callers who are not relying on the
+// side effect of a push (i.e. only pushType==PUSH_TOUCH), and who also
+// don't need to synchronize with the resolution of those intents (e.g.
+// asynchronous resolutions of intents skipped on inconsistent reads).
+//
+// Callers are involved with
+// a) conflict resolution for commands being executed at the Store with the
+//    client waiting,
+// b) resolving intents encountered during inconsistent operations, and
+// c) resolving intents upon EndTxn which are not local to the given range.
+//    This is the only path in which the transaction is going to be in
+//    non-pending state and doesn't require a push.
 func (ir *IntentResolver) MaybePushTransactions(
 	ctx context.Context,
 	pushTxns map[uuid.UUID]*enginepb.TxnMeta,
@@ -544,13 +441,13 @@ func (ir *IntentResolver) CleanupIntents(
 ) (int, error) {
 	h := roachpb.Header{Timestamp: now}
 
-	// All transactions in MaybePushTxns (called by maybePushIntents) will be sent
-	// in a single batch. In order to ensure that progress is made, we want to
-	// ensure that this batch does not become too big as to time out due to a
-	// deadline set above this call. If the attempt to push intents times out
-	// before any intents have been resolved, no progress is made. Since batches
-	// are atomic, a batch that times out has no effect. Hence, we chunk the work
-	// to ensure progress even when a timeout is eventually hit.
+	// All transactions in MaybePushTransactions will be sent in a single batch.
+	// In order to ensure that progress is made, we want to ensure that this
+	// batch does not become too big as to time out due to a deadline set above
+	// this call. If the attempt to push intents times out before any intents
+	// have been resolved, no progress is made. Since batches are atomic, a
+	// batch that times out has no effect. Hence, we chunk the work to ensure
+	// progress even when a timeout is eventually hit.
 	sort.Sort(intentsByTxn(intents))
 	resolved := 0
 	const skipIfInFlight = true
diff --git a/pkg/storage/intentresolver/intent_resolver_test.go b/pkg/storage/intentresolver/intent_resolver_test.go
index 39dcd4f4cea..3bd78bc5441 100644
--- a/pkg/storage/intentresolver/intent_resolver_test.go
+++ b/pkg/storage/intentresolver/intent_resolver_test.go
@@ -242,267 +242,6 @@ func TestCleanupTxnIntentsOnGCAsync(t *testing.T) {
 	}
 }
 
-// TestContendedIntent verifies that multiple transactions, some actively
-// writing and others read-only, are queued if processing write intent
-// errors on a contended key. The test verifies the expected ordering in
-// the queue and then works to invoke the corresponding cleanup functions
-// to exercise the various transaction pushing and intent resolution which
-// should occur.
-func TestContendedIntent(t *testing.T) {
-	defer leaktest.AfterTest(t)()
-
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-	stopper := stop.NewStopper()
-	defer stopper.Stop(ctx)
-	clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
-
-	// The intent resolver's db uses a sender which always resolves intents
-	// and pushes transactions to aborted successfully. All requests are sent to
-	// reqChan so that the test can inspect all sent requests. Given the buffer
-	// size of 2, it is important that the test take care to consume from reqChan
-	// when it expects requests
-	reqChan := make(chan roachpb.BatchRequest, 2)
-	sender := client.NonTransactionalFactoryFunc(
-		func(_ context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
-			resp := &roachpb.BatchResponse{}
-			switch req := ba.Requests[0].GetInner().(type) {
-			case *roachpb.PushTxnRequest:
-				resp.Add(&roachpb.PushTxnResponse{
-					PusheeTxn: roachpb.Transaction{
-						TxnMeta: req.PusheeTxn,
-						Status:  roachpb.ABORTED,
-					},
-				})
-			case *roachpb.ResolveIntentRequest:
-				resp.Add(&roachpb.ResolveIntentRangeResponse{})
-			default:
-				t.Errorf("Unexpected request of type %T", req)
-			}
-			reqChan <- ba
-			return resp, nil
-		})
-	verifyPushTxn := func(ba roachpb.BatchRequest, pusher, pushee uuid.UUID) {
-		if req, ok := ba.Requests[0].GetInner().(*roachpb.PushTxnRequest); ok {
-			if req.PusherTxn.ID != pusher {
-				t.Fatalf("expected pusher to be %v, got %v", pusher, req.PusherTxn.ID)
-			} else if req.PusheeTxn.ID != pushee {
-				t.Fatalf("expected pushee to be %v, got %v", pushee, req.PusheeTxn.ID)
-			}
-		} else {
-			t.Fatalf("expected PushTxnRequest, got %T", ba.Requests[0].GetInner())
-		}
-	}
-	verifyResolveIntent := func(ba roachpb.BatchRequest, key roachpb.Key) {
-		if req, ok := ba.Requests[0].GetInner().(*roachpb.ResolveIntentRequest); ok {
-			key.Equal(req.Key)
-		} else {
-			t.Fatalf("expected ResolveIntentRequest, got %T", ba.Requests[0].GetInner())
-		}
-	}
-
-	db := client.NewDB(log.AmbientContext{
-		Tracer: tracing.NewTracer(),
-	}, sender, clock)
-	ir := New(Config{
-		Stopper: stopper,
-		DB:      db,
-		Clock:   clock,
-	})
-
-	// The below code constructs 7 transactions which will encounter
-	// WriteIntentErrors for origTxn. These transactions all contend with each
-	// other. The test begins by creating goroutines for each of the 7 txns to
-	// record a WriteIntentError for an intent for keyA. We verify that these
-	// transactions queue up behind each other in the contention queue.
-	// We then proceed to process their returned CleanupFuncs and verify that
-	// the corresponding requests are sent.
-	//
-	// The expectation which is verified below is that the roTxn1 will lead to
-	// a PushTxn request for origTxn followed by a ResolveIntents request for
-	// origTxn's intent. The rest of the read-only transactions when rapidly
-	// cleaned up should send no requests. Note that roTxn4 gets canceled after
-	// roTxn1 is cleaned up in order to ensure that the cancellation code path
-	// does not interfere with correct code behavior.
-	//
-	// Interesting nuances in resolution begin with rwTxn1 for which we call
-	// the CleanupFunc with now a new WriteIntentError with the unrelatedRWTxn
-	// as the associated transaction. This leads all future push requests to
-	// push the unrelatedRWTxn rather than the origTxn though intent resolution
-	// requests will be for the original key.
-	//
-	// For rwTxn2 we wait for the dependencyCyclePushDelay to trigger the sending
-	// of another Push request for the unrelatedRWTxn and then we call cleanup
-	// for rwTxn2 with a new WriteIntentError on a different intent key which
-	// should not result in any new requests.
-	// Lastly we rapidly resolve the last rwTxn3 with (nil, nil) which also should
-	// lead to no requests being sent.
-
-	keyA := roachpb.Key("a")
-	keyB := roachpb.Key("b")
-	keyC := roachpb.Key("c")
-	origTxn := newTransaction("orig", keyA, 1, clock)
-	unrelatedRWTxn := newTransaction("unrel", keyB, 1, clock)
-
-	roTxn1 := newTransaction("ro-txn1", nil, 1, clock)
-	roTxn2 := newTransaction("ro-txn2", nil, 1, clock)
-	roTxn3 := newTransaction("ro-txn3", nil, 1, clock)
-	roTxn4 := newTransaction("ro-txn4", nil, 1, clock) // this one gets canceled
-	rwTxn1 := newTransaction("rw-txn1", keyB, 1, clock)
-	rwTxn2 := newTransaction("rw-txn2", keyC, 1, clock)
-	rwTxn3 := newTransaction("rw-txn3", keyB, 1, clock)
-
-	testCases := []struct {
-		pusher     *roachpb.Transaction
-		expTxns    []*roachpb.Transaction
-		cancelFunc context.CancelFunc
-	}{
-		// First establish a chain of four read-only txns.
-		{pusher: roTxn1, expTxns: []*roachpb.Transaction{roTxn1}},
-		{pusher: roTxn2, expTxns: []*roachpb.Transaction{roTxn1, roTxn2}},
-		{pusher: roTxn3, expTxns: []*roachpb.Transaction{roTxn1, roTxn2, roTxn3}},
-		// The fourth txn will be canceled before its predecessor is cleaned up to
-		// exercise the cancellation code path.
-		{pusher: roTxn4, expTxns: []*roachpb.Transaction{roTxn1, roTxn2, roTxn3, roTxn4}},
-		// Now, verify that a writing txn is inserted at the end of the queue.
-		{pusher: rwTxn1, expTxns: []*roachpb.Transaction{roTxn1, roTxn2, roTxn3, roTxn4, rwTxn1}},
-		// And other writing txns are inserted after it.
-		{pusher: rwTxn2, expTxns: []*roachpb.Transaction{roTxn1, roTxn2, roTxn3, roTxn4, rwTxn1, rwTxn2}},
-		{pusher: rwTxn3, expTxns: []*roachpb.Transaction{roTxn1, roTxn2, roTxn3, roTxn4, rwTxn1, rwTxn2, rwTxn3}},
-	}
-	var wg sync.WaitGroup
-	type intentResolverResp struct {
-		idx  int
-		fn   CleanupFunc
-		pErr *roachpb.Error
-	}
-	resps := make(chan intentResolverResp, 1)
-	for i, tc := range testCases {
-		testCtx, cancel := context.WithCancel(ctx)
-		defer cancel()
-		testCases[i].cancelFunc = cancel
-		t.Run(tc.pusher.ID.String(), func(t *testing.T) {
-			wiErr := &roachpb.WriteIntentError{Intents: []roachpb.Intent{
-				roachpb.MakeIntent(&origTxn.TxnMeta, keyA)}}
-			h := roachpb.Header{Txn: tc.pusher}
-			wg.Add(1)
-			go func(idx int) {
-				defer wg.Done()
-				cleanupFunc, pErr := ir.ProcessWriteIntentError(testCtx, roachpb.NewError(wiErr), h, roachpb.PUSH_ABORT)
-				resps <- intentResolverResp{idx: idx, fn: cleanupFunc, pErr: pErr}
-			}(i)
-			testutils.SucceedsSoon(t, func() error {
-				if lc, let := ir.NumContended(keyA), len(tc.expTxns); lc != let {
-					return errors.Errorf("expected len %d; got %d", let, lc)
-				}
-				ir.contentionQ.mu.Lock()
-				defer ir.contentionQ.mu.Unlock()
-				contended, ok := ir.contentionQ.mu.keys[string(keyA)]
-				if !ok {
-					return errors.Errorf("key not contended")
-				}
-				var idx int
-				for e := contended.ll.Front(); e != nil; e = e.Next() {
-					p := e.Value.(*pusher)
-					if p.txn != tc.expTxns[idx] {
-						return errors.Errorf("expected txn %s at index %d; got %s", tc.expTxns[idx], idx, p.txn)
-					}
-					idx++
-				}
-				return nil
-			})
-		})
-	}
-
-	// Wait until all of the WriteIntentErrors have been processed to stop
-	// processing the resps.
-	go func() { wg.Wait(); close(resps) }()
-	expIdx := 0
-	for resp := range resps {
-		// Check index.
-		idx := resp.idx
-		if idx != expIdx {
-			t.Errorf("expected response from request %d, found %d", expIdx, idx)
-		}
-		expIdx++
-
-		// Check error.
-		pErr := resp.pErr
-		switch idx {
-		case 3:
-			// Expected to be canceled.
-			if !testutils.IsPError(pErr, context.Canceled.Error()) {
-				t.Errorf("expected context canceled error; got %v", pErr)
-			}
-		default:
-			if pErr != nil {
-				t.Errorf("unexpected error from ProcessWriteIntentError: %v", pErr)
-			}
-		}
-
-		// Call cleanup function.
-		f := resp.fn
-		switch idx {
-		// The read only transactions should be cleaned up with nil, nil.
-		case 0:
-			// There should be a push of orig and then a resolve of intent.
-			verifyPushTxn(<-reqChan, roTxn1.ID, origTxn.ID)
-			verifyResolveIntent(<-reqChan, keyA)
-
-			// Minimum and maximum priority txns pass through without queuing.
-			// This is a convenient place to perform this check because roTxn1
-			// push is still blocking all other txn pushes, but the min and max
-			// priority txn's requests won't mix with requests.
-			for name, pusher := range map[string]*roachpb.Transaction{
-				"min priority": newTransaction("min-txn", keyA, roachpb.MinUserPriority, clock),
-				"max priority": newTransaction("max-txn", keyA, roachpb.MaxUserPriority, clock),
-			} {
-				t.Run(name, func(t *testing.T) {
-					wiErr := &roachpb.WriteIntentError{Intents: []roachpb.Intent{
-						roachpb.MakeIntent(&origTxn.TxnMeta, keyA)}}
-					h := roachpb.Header{Txn: pusher}
-					cleanupFunc, pErr := ir.ProcessWriteIntentError(ctx, roachpb.NewError(wiErr), h, roachpb.PUSH_ABORT)
-					if pErr != nil {
-						panic(pErr)
-					}
-					if cleanupFunc != nil {
-						t.Fatal("unexpected cleanup func; should not have entered contentionQueue")
-					}
-					verifyPushTxn(<-reqChan, pusher.ID, origTxn.ID)
-					verifyResolveIntent(<-reqChan, keyA)
-				})
-			}
-			f(nil, nil)
-		case 1, 2, 3:
-			// The remaining roTxns should not do anything upon cleanup.
-			if idx == 2 {
-				// Cancel request 3 before request 2 cleans up. Wait for
-				// it to return an error before continuing.
-				testCases[3].cancelFunc()
-				cf3 := <-resps
-				resps <- cf3
-			}
-			f(nil, nil)
-		case 4:
-			// Call the CleanupFunc with a new WriteIntentError with a different
-			// transaction. This should lead to a new push on the new transaction and
-			// an intent resolution of the original intent.
-			f(&roachpb.WriteIntentError{Intents: []roachpb.Intent{
-				roachpb.MakeIntent(&unrelatedRWTxn.TxnMeta, keyA)}}, nil)
-			verifyPushTxn(<-reqChan, rwTxn2.ID, unrelatedRWTxn.ID)
-			verifyResolveIntent(<-reqChan, rwTxn1.Key)
-		case 5:
-			verifyPushTxn(<-reqChan, rwTxn3.ID, unrelatedRWTxn.ID)
-			f(&roachpb.WriteIntentError{Intents: []roachpb.Intent{
-				roachpb.MakeIntent(&rwTxn1.TxnMeta, keyB)}}, nil)
-		case 6:
-			f(nil, &testCases[idx].pusher.TxnMeta)
-		default:
-			t.Fatalf("unexpected response %d", idx)
-		}
-	}
-}
-
 // TestCleanupIntentsAsync verifies that CleanupIntentsAsync either runs
 // synchronously or returns an error when there are too many concurrently
 // running tasks.
diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go
index 9a219cec6ae..531fa9984fc 100644
--- a/pkg/storage/replica.go
+++ b/pkg/storage/replica.go
@@ -30,16 +30,14 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/storage/batcheval"
 	"github.com/cockroachdb/cockroach/pkg/storage/closedts/ctpb"
 	"github.com/cockroachdb/cockroach/pkg/storage/cloud"
+	"github.com/cockroachdb/cockroach/pkg/storage/concurrency"
 	"github.com/cockroachdb/cockroach/pkg/storage/engine"
 	"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
 	"github.com/cockroachdb/cockroach/pkg/storage/rangefeed"
-	"github.com/cockroachdb/cockroach/pkg/storage/spanlatch"
-	"github.com/cockroachdb/cockroach/pkg/storage/spanset"
 	"github.com/cockroachdb/cockroach/pkg/storage/split"
 	"github.com/cockroachdb/cockroach/pkg/storage/stateloader"
 	"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
 	"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
-	"github.com/cockroachdb/cockroach/pkg/storage/txnwait"
 	"github.com/cockroachdb/cockroach/pkg/util"
 	"github.com/cockroachdb/cockroach/pkg/util/envutil"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -176,9 +174,8 @@ type Replica struct {
 	// TODO(tschottdorf): Duplicates r.mu.state.desc.RangeID; revisit that.
RangeID roachpb.RangeID // Only set by the constructor - store *Store - abortSpan *abortspan.AbortSpan // Avoids anomalous reads after abort - txnWaitQueue *txnwait.Queue // Queues push txn attempts by txn ID + store *Store + abortSpan *abortspan.AbortSpan // Avoids anomalous reads after abort // leaseholderStats tracks all incoming BatchRequests to the replica and which // localities they come from in order to aid in lease rebalancing decisions. @@ -224,11 +221,10 @@ type Replica struct { // Contains the lease history when enabled. leaseHistory *leaseHistory - // Enforces at most one command is running per key(s) within each span - // scope. The globally-scoped component tracks user writes (i.e. all - // keys for which keys.Addr is the identity), the locally-scoped component - // the rest (e.g. RangeDescriptor, transaction record, Lease, ...). - latchMgr spanlatch.Manager + // concMgr sequences incoming requests and provides isolation between + // requests that intend to perform conflicting operations. It is the + // centerpiece of transaction contention handling. + concMgr concurrency.Manager mu struct { // Protects all fields in the mu struct. @@ -664,9 +660,9 @@ func (r *Replica) GetLimiters() *batcheval.Limiters { return &r.store.limiters } -// GetTxnWaitQueue returns the Replica's txnwait.Queue. -func (r *Replica) GetTxnWaitQueue() *txnwait.Queue { - return r.txnWaitQueue +// GetConcurrencyManager returns the Replica's concurrency.Manager. +func (r *Replica) GetConcurrencyManager() concurrency.Manager { + return r.concMgr } // GetTerm returns the term of the given index in the raft log. @@ -927,14 +923,15 @@ func (r *Replica) assertStateLocked(ctx context.Context, reader engine.Reader) { // able to serve traffic or that the request is not compatible with the state of // the Range. // -// The method accepts a spanlatch.Guard and a LeaseStatus parameter. These are +// The method accepts a concurrency Guard and a LeaseStatus parameter. These are // used to indicate whether the caller has acquired latches and checked the // Range lease. The method will only check for a pending merge if both of these -// conditions are true. If either lg == nil or st == nil then the method will -// not check for a pending merge. Callers might be ok with this if they know -// that they will end up checking for a pending merge at some later time. +// conditions are true. If either !g.HoldingLatches() or st == nil then the +// method will not check for a pending merge. Callers might be ok with this if +// they know that they will end up checking for a pending merge at some later +// time. func (r *Replica) checkExecutionCanProceed( - ba *roachpb.BatchRequest, lg *spanlatch.Guard, st *storagepb.LeaseStatus, + ba *roachpb.BatchRequest, g *concurrency.Guard, st *storagepb.LeaseStatus, ) error { rSpan, err := keys.Range(ba.Requests) if err != nil { @@ -949,7 +946,7 @@ func (r *Replica) checkExecutionCanProceed( return err } else if err := r.checkTSAboveGCThresholdRLocked(ba.Timestamp); err != nil { return err - } else if lg != nil && st != nil { + } else if g.HoldingLatches() && st != nil { // Only check for a pending merge if latches are held and the Range // lease is held by this Replica. Without both of these conditions, // checkForPendingMergeRLocked could return false negatives. @@ -1121,7 +1118,7 @@ func (r *Replica) isNewerThanSplitRLocked(split *roachpb.SplitTrigger) bool { // command processing. 
type endCmds struct { repl *Replica - lg *spanlatch.Guard + g *concurrency.Guard } // move moves the endCmds into the return value, clearing and making @@ -1148,66 +1145,15 @@ func (ec *endCmds) done( // Update the timestamp cache if the request is not being re-evaluated. Each // request is considered in turn; only those marked as affecting the cache are - // processed. Inconsistent reads are excluded. - if ba.ReadConsistency == roachpb.CONSISTENT { - ec.repl.updateTimestampCache(ctx, ba, br, pErr) + // processed. + ec.repl.updateTimestampCache(ctx, ba, br, pErr) + + // Release the latches acquired by the request and exit lock wait-queues. + // Must be done AFTER the timestamp cache is updated. ec.g is only set when + // the Raft proposal has assumed responsibility for the request. + if ec.g != nil { + ec.repl.concMgr.FinishReq(ec.g) } - - // Release the latches acquired by the request back to the spanlatch - // manager. Must be done AFTER the timestamp cache is updated. - if ec.lg != nil { - ec.repl.latchMgr.Release(ec.lg) - } -} - -// beginCmds waits for any in-flight, conflicting commands to complete. More -// specifically, beginCmds acquires latches for the request based on keys -// affected by the batched commands. This gates subsequent commands with -// overlapping keys or key ranges. It returns a cleanup function to be called -// when the commands are done and can release their latches. -func (r *Replica) beginCmds( - ctx context.Context, ba *roachpb.BatchRequest, spans *spanset.SpanSet, -) (*spanlatch.Guard, error) { - // Only acquire latches for consistent operations. - if ba.ReadConsistency != roachpb.CONSISTENT { - log.Event(ctx, "operation accepts inconsistent results") - return nil, nil - } - - // Don't acquire latches for lease requests. These are run on replicas that - // do not hold the lease, so acquiring latches wouldn't help synchronize - // with other requests. - if ba.IsLeaseRequest() { - return nil, nil - } - - var beforeLatch time.Time - if log.ExpensiveLogEnabled(ctx, 2) { - beforeLatch = timeutil.Now() - } - - // Acquire latches for all the request's declared spans to ensure - // protected access and to avoid interacting requests from operating at - // the same time. The latches will be held for the duration of request. 
- log.Event(ctx, "acquire latches") - lg, err := r.latchMgr.Acquire(ctx, spans) - if err != nil { - return nil, err - } - - if !beforeLatch.IsZero() { - dur := timeutil.Since(beforeLatch) - log.VEventf(ctx, 2, "waited %s to acquire latches", dur) - } - - if filter := r.store.cfg.TestingKnobs.TestingLatchFilter; filter != nil { - if pErr := filter(ctx, *ba); pErr != nil { - r.latchMgr.Release(lg) - return nil, pErr.GoError() - } - } - - return lg, nil } // maybeWatchForMerge checks whether a merge of this replica into its left diff --git a/pkg/storage/replica_eval_context_span.go b/pkg/storage/replica_eval_context_span.go index d91e7f3cc07..33b77b0fcbe 100644 --- a/pkg/storage/replica_eval_context_span.go +++ b/pkg/storage/replica_eval_context_span.go @@ -20,11 +20,11 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/abortspan" "github.com/cockroachdb/cockroach/pkg/storage/batcheval" "github.com/cockroachdb/cockroach/pkg/storage/cloud" + "github.com/cockroachdb/cockroach/pkg/storage/concurrency" "github.com/cockroachdb/cockroach/pkg/storage/engine" "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" "github.com/cockroachdb/cockroach/pkg/storage/spanset" "github.com/cockroachdb/cockroach/pkg/storage/storagebase" - "github.com/cockroachdb/cockroach/pkg/storage/txnwait" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/uuid" ) @@ -74,9 +74,9 @@ func (rec *SpanSetReplicaEvalContext) DB() *client.DB { return rec.i.DB() } -// GetTxnWaitQueue returns the txnwait.Queue. -func (rec *SpanSetReplicaEvalContext) GetTxnWaitQueue() *txnwait.Queue { - return rec.i.GetTxnWaitQueue() +// GetConcurrencyManager returns the concurrency.Manager. +func (rec *SpanSetReplicaEvalContext) GetConcurrencyManager() concurrency.Manager { + return rec.i.GetConcurrencyManager() } // NodeID returns the NodeID. 
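For orientation, a minimal sketch of the access path these interface changes establish: evaluation code now reaches the txn wait-queue through the concurrency.Manager handed out by the EvalContext, rather than through a *txnwait.Queue field on the Replica. The helper below is hypothetical and exists only to illustrate the accessors introduced in this diff:

package storage

import (
	"github.com/cockroachdb/cockroach/pkg/storage/batcheval"
	"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

// waitingTxnIDs is a hypothetical helper: it asks the EvalContext for the
// concurrency.Manager and drills down to the txn wait-queue to find the
// transactions waiting on the given transaction, mirroring what command
// evaluation now does instead of touching a *txnwait.Queue directly.
func waitingTxnIDs(rec batcheval.EvalContext, txnID uuid.UUID) []uuid.UUID {
	return rec.GetConcurrencyManager().TxnWaitQueue().GetDependents(txnID)
}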
diff --git a/pkg/storage/replica_init.go b/pkg/storage/replica_init.go index 20bf5df4d31..8e2d15a261b 100644 --- a/pkg/storage/replica_init.go +++ b/pkg/storage/replica_init.go @@ -18,11 +18,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/storage/abortspan" - "github.com/cockroachdb/cockroach/pkg/storage/spanlatch" + "github.com/cockroachdb/cockroach/pkg/storage/concurrency" "github.com/cockroachdb/cockroach/pkg/storage/split" "github.com/cockroachdb/cockroach/pkg/storage/stateloader" "github.com/cockroachdb/cockroach/pkg/storage/storagebase" - "github.com/cockroachdb/cockroach/pkg/storage/txnwait" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -70,15 +69,20 @@ func newUnloadedReplica( RangeID: desc.RangeID, store: store, abortSpan: abortspan.New(desc.RangeID), + concMgr: concurrency.NewManager(concurrency.Config{ + NodeDesc: store.nodeDesc, + RangeDesc: desc, + Settings: store.ClusterSettings(), + DB: store.DB(), + Clock: store.Clock(), + Stopper: store.Stopper(), + IntentResolver: store.intentResolver, + TxnWaitMetrics: store.txnWaitMetrics, + SlowLatchGauge: store.metrics.SlowLatchRequests, + DisableTxnPushing: store.TestingKnobs().DontPushOnWriteIntentError, + TxnWaitKnobs: store.TestingKnobs().TxnWaitKnobs, + }), } - r.txnWaitQueue = txnwait.NewQueue(txnwait.Config{ - RangeDesc: desc, - DB: store.DB(), - Clock: store.Clock(), - Stopper: store.Stopper(), - Metrics: store.txnWaitMetrics, - Knobs: store.TestingKnobs().TxnWaitKnobs, - }) r.mu.pendingLeaseRequest = makePendingLeaseRequest(r) r.mu.stateLoader = stateloader.Make(desc.RangeID) r.mu.quiescent = true @@ -87,7 +91,6 @@ func newUnloadedReplica( split.Init(&r.loadBasedSplitter, rand.Intn, func() float64 { return float64(SplitByLoadQPSThreshold.Get(&store.cfg.Settings.SV)) }) - r.latchMgr = spanlatch.Make(r.store.stopper, r.store.metrics.SlowLatchRequests) r.mu.proposals = map[storagebase.CmdIDKey]*ProposalData{} r.mu.checksums = map[uuid.UUID]ReplicaChecksum{} r.mu.proposalBuf.Init((*replicaProposer)(r)) @@ -307,6 +310,6 @@ func (r *Replica) setDescLockedRaftMuLocked(ctx context.Context, desc *roachpb.R r.rangeStr.store(r.mu.replicaID, desc) r.connectionClass.set(rpc.ConnectionClassForKey(desc.StartKey)) - r.txnWaitQueue.OnRangeDescUpdated(desc) + r.concMgr.OnRangeDescUpdated(desc) r.mu.state.Desc = desc } diff --git a/pkg/storage/replica_init_test.go b/pkg/storage/replica_init_test.go index 9c45dff8a02..4fe3012b9f0 100644 --- a/pkg/storage/replica_init_test.go +++ b/pkg/storage/replica_init_test.go @@ -63,7 +63,7 @@ func TestReplicaUpdateLastReplicaAdded(t *testing.T) { r.mu.state.Desc = &c.oldDesc r.mu.lastReplicaAdded = c.lastReplicaAdded r.store = tc.store - r.txnWaitQueue = tc.repl.txnWaitQueue + r.concMgr = tc.repl.concMgr r.setDescRaftMuLocked(context.Background(), &c.newDesc) if c.expectedLastReplicaAdded != r.mu.lastReplicaAdded { t.Fatalf("expected %d, but found %d", diff --git a/pkg/storage/replica_metrics.go b/pkg/storage/replica_metrics.go index 21d21452d61..88c10a03b92 100644 --- a/pkg/storage/replica_metrics.go +++ b/pkg/storage/replica_metrics.go @@ -64,7 +64,7 @@ func (r *Replica) Metrics( _, ticking := r.store.unquiescedReplicas.m[r.RangeID] r.store.unquiescedReplicas.Unlock() - latchInfoGlobal, latchInfoLocal := r.latchMgr.Info() + latchInfoGlobal, latchInfoLocal := r.concMgr.LatchMetrics() return calcReplicaMetrics( 
 	ctx,
diff --git a/pkg/storage/replica_proposal.go b/pkg/storage/replica_proposal.go
index 608c4fb3d7f..aaed9128508 100644
--- a/pkg/storage/replica_proposal.go
+++ b/pkg/storage/replica_proposal.go
@@ -87,7 +87,7 @@ type ProposalData struct {
 	tmpFooter storagepb.RaftCommandFooter

 	// ec.done is called after command application to update the timestamp
-	// cache and release latches.
+	// cache and optionally release latches and exit lock wait-queues.
 	ec endCmds

 	// applied is set when the a command finishes application. It is used to
@@ -413,12 +413,6 @@ func (r *Replica) leasePostApply(ctx context.Context, newLease roachpb.Lease, pe
 		}
 	}

-	if leaseChangingHands && !iAmTheLeaseHolder {
-		// Also clear and disable the push transaction queue. Any waiters
-		// must be redirected to the new lease holder.
-		r.txnWaitQueue.Clear(true /* disable */)
-	}
-
 	// If we're the current raft leader, may want to transfer the leadership to
 	// the new leaseholder. Note that this condition is also checked periodically
 	// when ticking the replica.
@@ -439,6 +433,9 @@ func (r *Replica) leasePostApply(ctx context.Context, newLease roachpb.Lease, pe
 		}
 	}

+	// Inform the concurrency manager that the lease holder has been updated.
+	r.concMgr.OnRangeLeaseUpdated(iAmTheLeaseHolder)
+
 	// Potentially re-gossip if the range contains system data (e.g. system
 	// config or node liveness). We need to perform this gossip at startup as
 	// soon as possible. Trying to minimize how often we gossip is a fool's
@@ -453,8 +450,6 @@ func (r *Replica) leasePostApply(ctx context.Context, newLease roachpb.Lease, pe
 		if err := r.MaybeGossipNodeLiveness(ctx, keys.NodeLivenessSpan); err != nil {
 			log.Error(ctx, err)
 		}
-		// Make sure the push transaction queue is enabled.
-		r.txnWaitQueue.Enable()

 	// Emit an MLAI on the leaseholder replica, as follower will be looking
 	// for one and if we went on to quiesce, they wouldn't necessarily get
@@ -610,18 +605,22 @@ func (r *Replica) handleReadWriteLocalEvalResult(ctx context.Context, lResult re
 	}

 	if lResult.WrittenIntents != nil {
-		// TODO(nvanbenschoten): handle WrittenIntents.
+		for i := range lResult.WrittenIntents {
+			r.concMgr.OnLockAcquired(ctx, &lResult.WrittenIntents[i])
+		}
 		lResult.WrittenIntents = nil
 	}

 	if lResult.ResolvedIntents != nil {
-		// TODO(nvanbenschoten): handle ResolvedIntents.
+		for i := range lResult.ResolvedIntents {
+			r.concMgr.OnLockUpdated(ctx, &lResult.ResolvedIntents[i])
+		}
 		lResult.ResolvedIntents = nil
 	}

 	if lResult.UpdatedTxns != nil {
 		for _, txn := range lResult.UpdatedTxns {
-			r.txnWaitQueue.UpdateTxn(ctx, txn)
+			r.concMgr.OnTransactionUpdated(ctx, txn)
 		}
 		lResult.UpdatedTxns = nil
 	}
diff --git a/pkg/storage/replica_raft.go b/pkg/storage/replica_raft.go
index 1e7bc7701d2..00e96638a1d 100644
--- a/pkg/storage/replica_raft.go
+++ b/pkg/storage/replica_raft.go
@@ -20,8 +20,8 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/storage/apply"
+	"github.com/cockroachdb/cockroach/pkg/storage/concurrency"
 	"github.com/cockroachdb/cockroach/pkg/storage/engine"
-	"github.com/cockroachdb/cockroach/pkg/storage/spanset"
 	"github.com/cockroachdb/cockroach/pkg/storage/stateloader"
 	"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
 	"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
@@ -64,25 +64,14 @@ func makeIDKey() storagebase.CmdIDKey {
 // - any error obtained during the creation or proposal of the command, in
 //   which case the other returned values are zero.
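+//
+// Hypothetical usage sketch (illustration only; variable names assumed) of
+// the guard hand-off this signature implies:
+//
+//	ch, abandon, mli, g, pErr := r.evalAndPropose(ctx, ba, g, &lease)
+//	// If the proposal reaches Raft, the returned guard is nil because the
+//	// proposal's endCmds has assumed ownership and will release it below
+//	// Raft. On error paths, and on paths that complete without proposing,
+//	// the guard is handed back and the caller must release it via
+//	// concMgr.FinishReq.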
func (r *Replica) evalAndPropose( - ctx context.Context, - lease *roachpb.Lease, - ba *roachpb.BatchRequest, - spans *spanset.SpanSet, - ec endCmds, -) (_ chan proposalResult, _ func(), _ int64, pErr *roachpb.Error) { + ctx context.Context, ba *roachpb.BatchRequest, g *concurrency.Guard, lease *roachpb.Lease, +) (chan proposalResult, func(), int64, *concurrency.Guard, *roachpb.Error) { idKey := makeIDKey() - proposal, pErr := r.requestToProposal(ctx, idKey, ba, spans) + proposal, pErr := r.requestToProposal(ctx, idKey, ba, g.LatchSpans()) log.Event(proposal.ctx, "evaluated request") - // Attach the endCmds to the proposal. This moves responsibility of - // releasing latches to "below Raft" machinery. However, we make sure - // we clean up this resource if the proposal doesn't make it to Raft. - proposal.ec = ec.move() - defer func() { - if pErr != nil { - proposal.ec.done(ctx, ba, nil /* br */, pErr) - } - }() + // Attach the endCmds to the proposal. + proposal.ec = endCmds{repl: r} // Pull out proposal channel to return. proposal.doneCh may be set to // nil if it is signaled in this function. @@ -106,7 +95,7 @@ func (r *Replica) evalAndPropose( EndTxns: endTxns, } proposal.finishApplication(ctx, pr) - return proposalCh, func() {}, 0, nil + return proposalCh, func() {}, 0, g, nil } // If the request requested that Raft consensus be performed asynchronously, @@ -117,7 +106,7 @@ func (r *Replica) evalAndPropose( // Disallow async consensus for commands with EndTxnIntents because // any !Always EndTxnIntent can't be cleaned up until after the // command succeeds. - return nil, nil, 0, roachpb.NewErrorf("cannot perform consensus asynchronously for "+ + return nil, nil, 0, g, roachpb.NewErrorf("cannot perform consensus asynchronously for "+ "proposal with EndTxnIntents=%v; %v", ets, ba) } @@ -137,6 +126,10 @@ func (r *Replica) evalAndPropose( // Continue with proposal... } + // Assume responsibility for releasing the concurrency guard + // if the proposal makes it to Raft. + proposal.ec.g = g + // Attach information about the proposer to the command. proposal.command.ProposerLeaseSequence = lease.Sequence @@ -154,14 +147,14 @@ func (r *Replica) evalAndPropose( // behavior. quotaSize := uint64(proposal.command.Size()) if maxSize := uint64(MaxCommandSize.Get(&r.store.cfg.Settings.SV)); quotaSize > maxSize { - return nil, nil, 0, roachpb.NewError(errors.Errorf( + return nil, nil, 0, g, roachpb.NewError(errors.Errorf( "command is too large: %d bytes (max: %d)", quotaSize, maxSize, )) } var err error proposal.quotaAlloc, err = r.maybeAcquireProposalQuota(ctx, quotaSize) if err != nil { - return nil, nil, 0, roachpb.NewError(err) + return nil, nil, 0, g, roachpb.NewError(err) } // Make sure we clean up the proposal if we fail to insert it into the // proposal buffer successfully. This ensures that we always release any @@ -180,13 +173,13 @@ func (r *Replica) evalAndPropose( Req: *ba, } if pErr := filter(filterArgs); pErr != nil { - return nil, nil, 0, pErr + return nil, nil, 0, g, pErr } } maxLeaseIndex, pErr := r.propose(ctx, proposal) if pErr != nil { - return nil, nil, 0, pErr + return nil, nil, 0, g, pErr } // Abandoning a proposal unbinds its context so that the proposal's client // is free to terminate execution. However, it does nothing to try to @@ -208,7 +201,7 @@ func (r *Replica) evalAndPropose( // We'd need to make sure the span is finished eventually. 
proposal.ctx = r.AnnotateCtx(context.TODO()) } - return proposalCh, abandon, maxLeaseIndex, nil + return proposalCh, abandon, maxLeaseIndex, nil, nil } // propose encodes a command, starts tracking it, and proposes it to raft. The diff --git a/pkg/storage/replica_read.go b/pkg/storage/replica_read.go index 6b8d4b037fb..918d5f10e2e 100644 --- a/pkg/storage/replica_read.go +++ b/pkg/storage/replica_read.go @@ -15,7 +15,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/batcheval/result" - "github.com/cockroachdb/cockroach/pkg/storage/spanlatch" + "github.com/cockroachdb/cockroach/pkg/storage/concurrency" "github.com/cockroachdb/cockroach/pkg/storage/spanset" "github.com/cockroachdb/cockroach/pkg/storage/storagebase" "github.com/cockroachdb/cockroach/pkg/storage/storagepb" @@ -29,22 +29,15 @@ import ( // iterator to evaluate the batch and then updates the timestamp cache to // reflect the key spans that it read. func (r *Replica) executeReadOnlyBatch( - ctx context.Context, ba *roachpb.BatchRequest, spans *spanset.SpanSet, lg *spanlatch.Guard, -) (br *roachpb.BatchResponse, pErr *roachpb.Error) { - // Guarantee we release the provided latches. This is wrapped to delay pErr - // evaluation to its value when returning. - ec := endCmds{repl: r, lg: lg} - defer func() { - ec.done(ctx, ba, br, pErr) - }() - + ctx context.Context, ba *roachpb.BatchRequest, g *concurrency.Guard, +) (br *roachpb.BatchResponse, _ *concurrency.Guard, pErr *roachpb.Error) { // If the read is not inconsistent, the read requires the range lease or // permission to serve via follower reads. var status storagepb.LeaseStatus if ba.ReadConsistency.RequiresReadLease() { if status, pErr = r.redirectOnOrAcquireLease(ctx); pErr != nil { if nErr := r.canServeFollowerRead(ctx, ba, pErr); nErr != nil { - return nil, nErr + return nil, g, nErr } r.store.metrics.FollowerReadsCount.Inc(1) } @@ -56,12 +49,12 @@ func (r *Replica) executeReadOnlyBatch( defer r.readOnlyCmdMu.RUnlock() // Verify that the batch can be executed. - if err := r.checkExecutionCanProceed(ba, ec.lg, &status); err != nil { - return nil, roachpb.NewError(err) + if err := r.checkExecutionCanProceed(ba, g, &status); err != nil { + return nil, g, roachpb.NewError(err) } // Evaluate read-only batch command. - var result result.Result + spans := g.LatchSpans() rec := NewReplicaEvalContext(r, spans) // TODO(irfansharif): It's unfortunate that in this read-only code path, @@ -72,17 +65,28 @@ func (r *Replica) executeReadOnlyBatch( rw = spanset.NewReadWriterAt(rw, spans, ba.Timestamp) } defer rw.Close() + + // TODO(nvanbenschoten): once all replicated intents are pulled into the + // concurrency manager's lock-table, we can be sure that if we reached this + // point, we will not conflict with any of them during evaluation. This in + // turn means that we can bump the timestamp cache *before* evaluation + // without risk of starving writes. Once we start doing that, we're free to + // release latches immediately after we acquire an engine iterator as long + // as we're performing a non-locking read. 
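+	//
+	// A hypothetical sketch of that future shape (not active in this change;
+	// the predicate name and exact ordering are assumptions, not real API):
+	//
+	//   r.updateTimestampCache(ctx, ba, nil /* br */, nil /* pErr */)
+	//   if !isLockingRead(ba) {      // hypothetical predicate
+	//       r.concMgr.FinishReq(g)   // drop latches before evaluation
+	//       g = nil
+	//   }
+	//   // Evaluation then proceeds on the already-acquired engine iterator
+	//   // without latches, relying on the lock-table for isolation.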
+
+	var result result.Result
 	br, result, pErr = evaluateBatch(ctx, storagebase.CmdIDKey(""), rw, rec, nil, ba, true /* readOnly */)
 	if err := r.handleReadOnlyLocalEvalResult(ctx, ba, result.Local); err != nil {
 		pErr = roachpb.NewError(err)
 	}
+	r.updateTimestampCache(ctx, ba, br, pErr)

 	if pErr != nil {
 		log.VErrEvent(ctx, 3, pErr.String())
 	} else {
 		log.Event(ctx, "read completed")
 	}
-	return br, pErr
+	return br, g, pErr
 }

 func (r *Replica) handleReadOnlyLocalEvalResult(
@@ -105,6 +109,14 @@ func (r *Replica) handleReadOnlyLocalEvalResult(
 		lResult.MaybeWatchForMerge = false
 	}

+	if lResult.WrittenIntents != nil {
+		// These will all be unreplicated locks.
+		for i := range lResult.WrittenIntents {
+			r.concMgr.OnLockAcquired(ctx, &lResult.WrittenIntents[i])
+		}
+		lResult.WrittenIntents = nil
+	}
+
 	if intents := lResult.DetachEncounteredIntents(); len(intents) > 0 {
 		log.Eventf(ctx, "submitting %d intents to asynchronous processing", len(intents))
 		// We only allow synchronous intent resolution for consistent requests.
diff --git a/pkg/storage/replica_send.go b/pkg/storage/replica_send.go
index 7c9bd186910..96d8fbe78ef 100644
--- a/pkg/storage/replica_send.go
+++ b/pkg/storage/replica_send.go
@@ -16,8 +16,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/storage/batcheval"
-	"github.com/cockroachdb/cockroach/pkg/storage/intentresolver"
-	"github.com/cockroachdb/cockroach/pkg/storage/spanlatch"
+	"github.com/cockroachdb/cockroach/pkg/storage/concurrency"
 	"github.com/cockroachdb/cockroach/pkg/storage/spanset"
 	"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
 	"github.com/cockroachdb/cockroach/pkg/storage/txnwait"
@@ -121,10 +120,26 @@ func (r *Replica) sendWithRangeID(
 // batchExecutionFn is a method on Replica that is able to execute a
 // BatchRequest. It is called with the batch, along with the span bounds that
 // the batch will operate over and a guard for the latches protecting the span
-// bounds. The function must ensure that the latch guard is eventually released.
+// bounds.
+//
+// The function will return either a batch response or an error. The function
+// also has the option to pass ownership of the concurrency guard back to the
+// caller. However, it does not need to. Instead, it can assume responsibility
+// for releasing the concurrency guard it was provided by returning nil. This is
+// useful in cases where the function:
+// 1. eagerly releases the concurrency guard after it determines that isolation
+//    from conflicting requests is no longer needed.
+// 2. is continuing to execute asynchronously and needs to maintain isolation
+//    from conflicting requests throughout the lifetime of its asynchronous
+//    processing. The most prominent example of asynchronous processing is
+//    with requests that have the "async consensus" flag set. A more subtle
+//    case is with requests that are acknowledged by the Raft machinery after
+//    their Raft entry has been committed but before it has been applied to
+//    the replicated state machine. In all of these cases, responsibility
+//    for releasing the concurrency guard is handed to Raft.
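+//
+// As a hypothetical illustration of this contract (not part of the change
+// itself), a synchronous implementation returns the guard to the caller:
+//
+//	func (r *Replica) executeNoopBatch(
+//		ctx context.Context, ba *roachpb.BatchRequest, g *concurrency.Guard,
+//	) (*roachpb.BatchResponse, *concurrency.Guard, *roachpb.Error) {
+//		return &roachpb.BatchResponse{}, g, nil // caller releases g
+//	}
+//
+// whereas a function that hands its guard to Raft or to an asynchronous
+// task returns a nil guard, leaving that machinery responsible for
+// eventually calling concMgr.FinishReq.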
type batchExecutionFn func( - *Replica, context.Context, *roachpb.BatchRequest, *spanset.SpanSet, *spanlatch.Guard, -) (*roachpb.BatchResponse, *roachpb.Error) + *Replica, context.Context, *roachpb.BatchRequest, *concurrency.Guard, +) (*roachpb.BatchResponse, *concurrency.Guard, *roachpb.Error) var _ batchExecutionFn = (*Replica).executeWriteBatch var _ batchExecutionFn = (*Replica).executeReadOnlyBatch @@ -147,76 +162,91 @@ func (r *Replica) executeBatchWithConcurrencyRetries( ctx context.Context, ba *roachpb.BatchRequest, fn batchExecutionFn, ) (br *roachpb.BatchResponse, pErr *roachpb.Error) { // Determine the maximal set of key spans that the batch will operate on. - spans, err := r.collectSpans(ba) + latchSpans, lockSpans, err := r.collectSpans(ba) if err != nil { return nil, roachpb.NewError(err) } // Handle load-based splitting. - r.recordBatchForLoadBasedSplitting(ctx, ba, spans) + r.recordBatchForLoadBasedSplitting(ctx, ba, latchSpans) - // TODO(nvanbenschoten): Clean this up once it's pulled inside the - // concurrency manager. - var cleanup intentresolver.CleanupFunc + // Try to execute command; exit retry loop on success. + var g *concurrency.Guard defer func() { - if cleanup != nil { - // This request wrote an intent only if there was no error, the - // request is transactional, the transaction is not yet finalized, - // and the request wasn't read-only. - if pErr == nil && ba.Txn != nil && !br.Txn.Status.IsFinalized() && !ba.IsReadOnly() { - cleanup(nil, &br.Txn.TxnMeta) - } else { - cleanup(nil, nil) - } + // NB: wrapped to delay g evaluation to its value when returning. + if g != nil { + r.concMgr.FinishReq(g) } }() - - // Try to execute command; exit retry loop on success. for { // Exit loop if context has been canceled or timed out. if err := ctx.Err(); err != nil { return nil, roachpb.NewError(errors.Wrap(err, "aborted during Replica.Send")) } - // If necessary, the request may need to wait in the txn wait queue, - // pending updates to the target transaction for either PushTxn or - // QueryTxn requests. - // TODO(nvanbenschoten): Push this into the concurrency package. - br, pErr = r.maybeWaitForPushee(ctx, ba) - if br != nil || pErr != nil { - return br, pErr + // Acquire latches to prevent overlapping requests from executing until + // this request completes. After latching, wait on any conflicting locks + // to ensure that the request has full isolation during evaluation. This + // returns a request guard that must be eventually released. + var resp []roachpb.ResponseUnion + g, resp, pErr = r.concMgr.SequenceReq(ctx, g, concurrency.Request{ + Txn: ba.Txn, + Timestamp: ba.Timestamp, + Priority: ba.UserPriority, + ReadConsistency: ba.ReadConsistency, + Requests: ba.Requests, + LatchSpans: latchSpans, + LockSpans: lockSpans, + }) + if pErr != nil { + return nil, pErr + } else if resp != nil { + br = new(roachpb.BatchResponse) + br.Responses = resp + return br, nil } - // Acquire latches to prevent overlapping commands from executing until - // this command completes. - // TODO(nvanbenschoten): Replace this with a call into the upcoming - // concurrency package when it is introduced. - lg, err := r.beginCmds(ctx, ba, spans) - if err != nil { - return nil, roachpb.NewError(err) + if filter := r.store.cfg.TestingKnobs.TestingLatchFilter; filter != nil { + if pErr := filter(ctx, *ba); pErr != nil { + return nil, pErr + } } - br, pErr = fn(r, ctx, ba, spans, lg) + br, g, pErr = fn(r, ctx, ba, g) switch t := pErr.GetDetail().(type) { case nil: // Success. 
return br, nil case *roachpb.WriteIntentError: - if cleanup, pErr = r.handleWriteIntentError(ctx, ba, pErr, t, cleanup); pErr != nil { + // Drop latches, but retain lock wait-queues. + g.AssertLatches() + if g, pErr = r.handleWriteIntentError(ctx, ba, g, pErr, t); pErr != nil { return nil, pErr } // Retry... case *roachpb.TransactionPushError: - if pErr = r.handleTransactionPushError(ctx, ba, pErr, t); pErr != nil { + // Drop latches, but retain lock wait-queues. + g.AssertLatches() + if g, pErr = r.handleTransactionPushError(ctx, ba, g, pErr, t); pErr != nil { return nil, pErr } // Retry... case *roachpb.IndeterminateCommitError: + // Drop latches and lock wait-queues. + g.AssertLatches() + r.concMgr.FinishReq(g) + g = nil + // Then launch a task to handle the indeterminate commit error. if pErr = r.handleIndeterminateCommitError(ctx, ba, pErr, t); pErr != nil { return nil, pErr } // Retry... case *roachpb.MergeInProgressError: + // Drop latches and lock wait-queues. + g.AssertLatches() + r.concMgr.FinishReq(g) + g = nil + // Then listen for the merge to complete. if pErr = r.handleMergeInProgressError(ctx, ba, pErr, t); pErr != nil { return nil, pErr } @@ -231,73 +261,24 @@ func (r *Replica) executeBatchWithConcurrencyRetries( func (r *Replica) handleWriteIntentError( ctx context.Context, ba *roachpb.BatchRequest, + g *concurrency.Guard, pErr *roachpb.Error, t *roachpb.WriteIntentError, - cleanup intentresolver.CleanupFunc, -) (intentresolver.CleanupFunc, *roachpb.Error) { +) (*concurrency.Guard, *roachpb.Error) { if r.store.cfg.TestingKnobs.DontPushOnWriteIntentError { - return cleanup, pErr - } - - // Process and resolve write intent error. - var pushType roachpb.PushTxnType - if ba.IsWrite() { - pushType = roachpb.PUSH_ABORT - } else { - pushType = roachpb.PUSH_TIMESTAMP - } - - index := pErr.Index - // Make a copy of the header for the upcoming push; we will update the - // timestamp. - h := ba.Header - if h.Txn != nil { - // We must push at least to h.Timestamp, but in fact we want to - // go all the way up to a timestamp which was taken off the HLC - // after our operation started. This allows us to not have to - // restart for uncertainty as we come back and read. - obsTS, ok := h.Txn.GetObservedTimestamp(ba.Replica.NodeID) - if !ok { - // This was set earlier in this method, so it's - // completely unexpected to not be found now. - log.Fatalf(ctx, "missing observed timestamp: %+v", h.Txn) - } - h.Timestamp.Forward(obsTS) - // We are going to hand the header (and thus the transaction proto) - // to the RPC framework, after which it must not be changed (since - // that could race). Since the subsequent execution of the original - // request might mutate the transaction, make a copy here. - // - // See #9130. - h.Txn = h.Txn.Clone() - } - - // Handle the case where we get more than one write intent error; - // we need to cleanup the previous attempt to handle it to allow - // any other pusher queued up behind this RPC to proceed. - if cleanup != nil { - cleanup(t, nil) - } - cleanup, pErr = r.store.intentResolver.ProcessWriteIntentError(ctx, pErr, h, pushType) - if pErr != nil { - // Do not propagate ambiguous results; assume success and retry original op. - if _, ok := pErr.GetDetail().(*roachpb.AmbiguousResultError); ok { - return cleanup, nil - } - // Propagate new error. Preserve the error index. - pErr.Index = index - return cleanup, pErr + return g, pErr } - // We've resolved the write intent; retry command. 
- return cleanup, nil + // g's latches will be dropped, but it retains its spot in lock wait-queues. + return r.concMgr.HandleWriterIntentError(ctx, g, t), nil } func (r *Replica) handleTransactionPushError( ctx context.Context, ba *roachpb.BatchRequest, + g *concurrency.Guard, pErr *roachpb.Error, t *roachpb.TransactionPushError, -) *roachpb.Error { +) (*concurrency.Guard, *roachpb.Error) { // On a transaction push error, retry immediately if doing so will enqueue // into the txnWaitQueue in order to await further updates to the unpushed // txn's status. We check ShouldPushImmediately to avoid retrying @@ -308,11 +289,11 @@ func (r *Replica) handleTransactionPushError( dontRetry = txnwait.ShouldPushImmediately(pushReq) } if dontRetry { - return pErr + return g, pErr } - // Enqueue unsuccessfully pushed transaction on the txnWaitQueue and retry. - r.txnWaitQueue.EnqueueTxn(&t.PusheeTxn) - return nil + // g's latches will be dropped, but it retains its spot in lock wait-queues + // (though a PushTxn shouldn't be in any lock wait-queues). + return r.concMgr.HandleTransactionPushError(ctx, g, t), nil } func (r *Replica) handleIndeterminateCommitError( @@ -506,8 +487,10 @@ func (r *Replica) checkBatchRequest(ba *roachpb.BatchRequest, isReadOnly bool) e return nil } -func (r *Replica) collectSpans(ba *roachpb.BatchRequest) (*spanset.SpanSet, error) { - var latchSpans, lockSpans spanset.SpanSet +func (r *Replica) collectSpans( + ba *roachpb.BatchRequest, +) (latchSpans, lockSpans *spanset.SpanSet, _ error) { + latchSpans, lockSpans = new(spanset.SpanSet), new(spanset.SpanSet) // TODO(bdarnell): need to make this less global when local // latches are used more heavily. For example, a split will // have a large read-only span but also a write (see #10084). @@ -537,31 +520,29 @@ func (r *Replica) collectSpans(ba *roachpb.BatchRequest) (*spanset.SpanSet, erro // This is still safe as we're only ever writing at timestamps higher than the // timestamp any write latch would be declared at. desc := r.Desc() - batcheval.DeclareKeysForBatch(desc, ba.Header, &latchSpans) + batcheval.DeclareKeysForBatch(desc, ba.Header, latchSpans) for _, union := range ba.Requests { inner := union.GetInner() if cmd, ok := batcheval.LookupCommand(inner.Method()); ok { - cmd.DeclareKeys(desc, ba.Header, inner, &latchSpans, &lockSpans) + cmd.DeclareKeys(desc, ba.Header, inner, latchSpans, lockSpans) } else { - return nil, errors.Errorf("unrecognized command %s", inner.Method()) + return nil, nil, errors.Errorf("unrecognized command %s", inner.Method()) } } // Commands may create a large number of duplicate spans. De-duplicate // them to reduce the number of spans we pass to the spanlatch manager. - for _, s := range [...]*spanset.SpanSet{&latchSpans, &lockSpans} { + for _, s := range [...]*spanset.SpanSet{latchSpans, lockSpans} { s.SortAndDedup() // If any command gave us spans that are invalid, bail out early // (before passing them to the spanlatch manager, which may panic). if err := s.Validate(); err != nil { - return nil, err + return nil, nil, err } } - // NB: not used yet. 
- _ = lockSpans - return &latchSpans, nil + return latchSpans, lockSpans, nil } // limitTxnMaxTimestamp limits the batch transaction's max timestamp diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index 553d481e2a3..2a7d6d042ee 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -40,6 +40,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/apply" "github.com/cockroachdb/cockroach/pkg/storage/batcheval" "github.com/cockroachdb/cockroach/pkg/storage/batcheval/result" + "github.com/cockroachdb/cockroach/pkg/storage/concurrency" "github.com/cockroachdb/cockroach/pkg/storage/engine" "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" "github.com/cockroachdb/cockroach/pkg/storage/intentresolver" @@ -90,6 +91,15 @@ var allSpans = func() spanset.SpanSet { return ss }() +// allSpansGuard is a concurrency guard that indicates that it provides +// isolation across all key spans for use in tests that don't care about +// properly declaring their spans or sequencing with the concurrency manager. +var allSpansGuard = concurrency.Guard{ + Req: concurrency.Request{ + LatchSpans: &allSpans, + }, +} + func testRangeDescriptor() *roachpb.RangeDescriptor { return &roachpb.RangeDescriptor{ RangeID: 1, @@ -592,7 +602,7 @@ func sendLeaseRequest(r *Replica, l *roachpb.Lease) error { ba.Timestamp = r.store.Clock().Now() ba.Add(&roachpb.RequestLeaseRequest{Lease: *l}) exLease, _ := r.GetLease() - ch, _, _, pErr := r.evalAndPropose(context.TODO(), &exLease, &ba, &allSpans, endCmds{}) + ch, _, _, _, pErr := r.evalAndPropose(context.TODO(), &ba, &allSpansGuard, &exLease) if pErr == nil { // Next if the command was committed, wait for the range to apply it. // TODO(bdarnell): refactor this to a more conventional error-handling pattern. @@ -1373,7 +1383,7 @@ func TestReplicaLeaseRejectUnknownRaftNodeID(t *testing.T) { ba := roachpb.BatchRequest{} ba.Timestamp = tc.repl.store.Clock().Now() ba.Add(&roachpb.RequestLeaseRequest{Lease: *lease}) - ch, _, _, pErr := tc.repl.evalAndPropose(context.Background(), &exLease, &ba, &allSpans, endCmds{}) + ch, _, _, _, pErr := tc.repl.evalAndPropose(context.Background(), &ba, &allSpansGuard, &exLease) if pErr == nil { // Next if the command was committed, wait for the range to apply it. // TODO(bdarnell): refactor to a more conventional error-handling pattern. @@ -7317,7 +7327,7 @@ func TestReplicaAbandonProposal(t *testing.T) { } // The request should still be holding its latches. - latchInfoGlobal, _ := tc.repl.latchMgr.Info() + latchInfoGlobal, _ := tc.repl.concMgr.LatchMetrics() if w := latchInfoGlobal.WriteCount; w == 0 { t.Fatal("expected non-empty latch manager") } @@ -7328,7 +7338,7 @@ func TestReplicaAbandonProposal(t *testing.T) { // Even though we canceled the command it will still get executed and its // latches cleaned up. 
testutils.SucceedsSoon(t, func() error { - latchInfoGlobal, _ := tc.repl.latchMgr.Info() + latchInfoGlobal, _ := tc.repl.concMgr.LatchMetrics() if w := latchInfoGlobal.WriteCount; w != 0 { return errors.Errorf("expected empty latch manager") } @@ -7633,7 +7643,7 @@ func TestReplicaCancelRaftCommandProgress(t *testing.T) { Key: roachpb.Key(fmt.Sprintf("k%d", i)), }, }) - ch, _, idx, err := repl.evalAndPropose(ctx, &lease, &ba, &allSpans, endCmds{}) + ch, _, idx, _, err := repl.evalAndPropose(ctx, &ba, &allSpansGuard, &lease) if err != nil { t.Fatal(err) } @@ -7702,7 +7712,7 @@ func TestReplicaBurstPendingCommandsAndRepropose(t *testing.T) { Key: roachpb.Key(fmt.Sprintf("k%d", i)), }, }) - ch, _, idx, err := tc.repl.evalAndPropose(ctx, &lease, &ba, &allSpans, endCmds{}) + ch, _, idx, _, err := tc.repl.evalAndPropose(ctx, &ba, &allSpansGuard, &lease) if err != nil { t.Fatal(err) } @@ -9020,9 +9030,7 @@ func TestErrorInRaftApplicationClearsIntents(t *testing.T) { } exLease, _ := repl.GetLease() - ch, _, _, pErr := repl.evalAndPropose( - context.Background(), &exLease, &ba, &allSpans, endCmds{}, - ) + ch, _, _, _, pErr := repl.evalAndPropose(context.Background(), &ba, &allSpansGuard, &exLease) if pErr != nil { t.Fatal(pErr) } @@ -9067,9 +9075,7 @@ func TestProposeWithAsyncConsensus(t *testing.T) { atomic.StoreInt32(&filterActive, 1) exLease, _ := repl.GetLease() - ch, _, _, pErr := repl.evalAndPropose( - context.Background(), &exLease, &ba, &allSpans, endCmds{}, - ) + ch, _, _, _, pErr := repl.evalAndPropose(context.Background(), &ba, &allSpansGuard, &exLease) if pErr != nil { t.Fatal(pErr) } @@ -9132,7 +9138,7 @@ func TestApplyPaginatedCommittedEntries(t *testing.T) { atomic.StoreInt32(&filterActive, 1) exLease, _ := repl.GetLease() - _, _, _, pErr := repl.evalAndPropose(ctx, &exLease, &ba, &allSpans, endCmds{}) + _, _, _, _, pErr := repl.evalAndPropose(ctx, &ba, &allSpansGuard, &exLease) if pErr != nil { t.Fatal(pErr) } @@ -9150,7 +9156,7 @@ func TestApplyPaginatedCommittedEntries(t *testing.T) { ba2.Timestamp = tc.Clock().Now() var pErr *roachpb.Error - ch, _, _, pErr = repl.evalAndPropose(ctx, &exLease, &ba, &allSpans, endCmds{}) + ch, _, _, _, pErr = repl.evalAndPropose(ctx, &ba, &allSpansGuard, &exLease) if pErr != nil { t.Fatal(pErr) } @@ -11820,7 +11826,7 @@ func TestProposalNotAcknowledgedOrReproposedAfterApplication(t *testing.T) { // the proposal map. Entries are only removed from that map underneath raft. tc.repl.RaftLock() tracedCtx, cleanup := tracing.EnsureContext(ctx, cfg.AmbientCtx.Tracer, "replica send") - ch, _, _, pErr := tc.repl.evalAndPropose(tracedCtx, &lease, &ba, &allSpans, endCmds{}) + ch, _, _, _, pErr := tc.repl.evalAndPropose(tracedCtx, &ba, &allSpansGuard, &lease) if pErr != nil { t.Fatal(pErr) } @@ -11916,7 +11922,7 @@ func TestLaterReproposalsDoNotReuseContext(t *testing.T) { // Go out of our way to enable recording so that expensive logging is enabled // for this context. 
tracing.StartRecording(sp, tracing.SingleNodeRecording) - ch, _, _, pErr := tc.repl.evalAndPropose(tracedCtx, &lease, &ba, &allSpans, endCmds{}) + ch, _, _, _, pErr := tc.repl.evalAndPropose(tracedCtx, &ba, &allSpansGuard, &lease) if pErr != nil { t.Fatal(pErr) } diff --git a/pkg/storage/replica_tscache.go b/pkg/storage/replica_tscache.go index 69f35a6d583..ab0bd4adf29 100644 --- a/pkg/storage/replica_tscache.go +++ b/pkg/storage/replica_tscache.go @@ -43,6 +43,10 @@ func setTimestampCacheLowWaterMark( func (r *Replica) updateTimestampCache( ctx context.Context, ba *roachpb.BatchRequest, br *roachpb.BatchResponse, pErr *roachpb.Error, ) { + if ba.ReadConsistency != roachpb.CONSISTENT { + // Inconsistent reads are excluded from the timestamp cache. + return + } addToTSCache := r.store.tsCache.Add if util.RaceEnabled { addToTSCache = checkedTSCacheUpdate(r.store.Clock().Now(), r.store.tsCache, ba, br, pErr) diff --git a/pkg/storage/replica_write.go b/pkg/storage/replica_write.go index 17b4d97ed94..9dcec60cdbd 100644 --- a/pkg/storage/replica_write.go +++ b/pkg/storage/replica_write.go @@ -18,9 +18,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/batcheval" "github.com/cockroachdb/cockroach/pkg/storage/batcheval/result" "github.com/cockroachdb/cockroach/pkg/storage/closedts/ctpb" + "github.com/cockroachdb/cockroach/pkg/storage/concurrency" "github.com/cockroachdb/cockroach/pkg/storage/engine" "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" - "github.com/cockroachdb/cockroach/pkg/storage/spanlatch" "github.com/cockroachdb/cockroach/pkg/storage/spanset" "github.com/cockroachdb/cockroach/pkg/storage/storagebase" "github.com/cockroachdb/cockroach/pkg/storage/storagepb" @@ -40,9 +40,6 @@ import ( // // Concretely, // -// - Latches for the keys affected by the command are acquired (i.e. -// tracked as in-flight mutations). -// - In doing so, we wait until no overlapping mutations are in flight. // - The timestamp cache is checked to determine if the command's affected keys // were accessed with a timestamp exceeding that of the command; if so, the // command's timestamp is incremented accordingly. @@ -56,25 +53,17 @@ import ( // registered with the timestamp cache, its latches are released, and // its result (which could be an error) is returned to the client. // -// Returns exactly one of a response, an error or re-evaluation reason. +// Returns either a response or an error, along with the provided concurrency +// guard if it is passing ownership back to the caller of the function. // // NB: changing BatchRequest to a pointer here would have to be done cautiously // as this method makes the assumption that it operates on a shallow copy (see // call to applyTimestampCache). func (r *Replica) executeWriteBatch( - ctx context.Context, ba *roachpb.BatchRequest, spans *spanset.SpanSet, lg *spanlatch.Guard, -) (br *roachpb.BatchResponse, pErr *roachpb.Error) { + ctx context.Context, ba *roachpb.BatchRequest, g *concurrency.Guard, +) (br *roachpb.BatchResponse, _ *concurrency.Guard, pErr *roachpb.Error) { startTime := timeutil.Now() - // Guarantee we release the provided latches if we never make it to - // passing responsibility to evalAndPropose. This is wrapped to delay - // pErr evaluation to its value when returning. - ec := endCmds{repl: r, lg: lg} - defer func() { - // No-op if we move ec into evalAndPropose. - ec.done(ctx, ba, br, pErr) - }() - // Determine the lease under which to evaluate the write. 
 	var lease roachpb.Lease
 	var status storagepb.LeaseStatus
@@ -85,7 +74,7 @@ func (r *Replica) executeWriteBatch(
 		// Other write commands require that this replica has the range
 		// lease.
 		if status, pErr = r.redirectOnOrAcquireLease(ctx); pErr != nil {
-			return nil, pErr
+			return nil, g, pErr
 		}
 		lease = status.Lease
 	}
@@ -96,8 +85,8 @@ func (r *Replica) executeWriteBatch(
 	// at proposal time, not at application time, because the spanlatch manager
 	// will synchronize all requests (notably EndTxn with SplitTrigger) that may
 	// cause this condition to change.
-	if err := r.checkExecutionCanProceed(ba, ec.lg, &status); err != nil {
-		return nil, roachpb.NewError(err)
+	if err := r.checkExecutionCanProceed(ba, g, &status); err != nil {
+		return nil, g, roachpb.NewError(err)
 	}

 	minTS, untrack := r.store.cfg.ClosedTimestamp.Tracker.Track(ctx)
@@ -129,12 +118,13 @@ func (r *Replica) executeWriteBatch(
 	// Checking the context just before proposing can help avoid ambiguous errors.
 	if err := ctx.Err(); err != nil {
 		log.VEventf(ctx, 2, "%s before proposing: %s", err, ba.Summary())
-		return nil, roachpb.NewError(errors.Wrap(err, "aborted before proposing"))
+		return nil, g, roachpb.NewError(errors.Wrap(err, "aborted before proposing"))
 	}

-	// After the command is proposed to Raft, invoking endCmds.done is the
-	// responsibility of Raft, so move the endCmds into evalAndPropose.
-	ch, abandon, maxLeaseIndex, pErr := r.evalAndPropose(ctx, &lease, ba, spans, ec.move())
+	// If the command is proposed to Raft, ownership of and responsibility for
+	// the concurrency guard will be assumed by Raft, so provide the guard to
+	// evalAndPropose.
+	ch, abandon, maxLeaseIndex, g, pErr := r.evalAndPropose(ctx, ba, g, &lease)
 	if pErr != nil {
 		if maxLeaseIndex != 0 {
 			log.Fatalf(
@@ -142,7 +132,7 @@ func (r *Replica) executeWriteBatch(
 				maxLeaseIndex, ba, pErr,
 			)
 		}
-		return nil, pErr
+		return nil, g, pErr
 	}
 	// A max lease index of zero is returned when no proposal was made or a lease was proposed.
 	// In both cases, we don't need to communicate a MLAI. Furthermore, for lease proposals we
@@ -202,7 +192,7 @@ func (r *Replica) executeWriteBatch(
 				log.Warning(ctx, err)
 			}
 		}
-			return propResult.Reply, propResult.Err
+			return propResult.Reply, g, propResult.Err
 		case <-slowTimer.C:
 			slowTimer.Read = true
 			r.store.metrics.SlowRaftRequests.Inc(1)
@@ -228,14 +218,14 @@ and the following Raft status: %+v`,
 			abandon()
 			log.VEventf(ctx, 2, "context cancellation after %0.1fs of attempting command %s",
 				timeutil.Since(startTime).Seconds(), ba)
-			return nil, roachpb.NewError(roachpb.NewAmbiguousResultError(ctx.Err().Error()))
+			return nil, g, roachpb.NewError(roachpb.NewAmbiguousResultError(ctx.Err().Error()))
		case <-shouldQuiesce:
 			// If shutting down, return an AmbiguousResultError, which indicates
 			// to the caller that the command may have executed.
 			abandon()
 			log.VEventf(ctx, 2, "shutdown cancellation after %0.1fs of attempting command %s",
 				timeutil.Since(startTime).Seconds(), ba)
-			return nil, roachpb.NewError(roachpb.NewAmbiguousResultError("server shutdown"))
+			return nil, g, roachpb.NewError(roachpb.NewAmbiguousResultError("server shutdown"))
 	}
 }
@@ -431,6 +421,24 @@ func (r *Replica) evaluate1PC(
 		}
 	}

+	// Even though the transaction is 1PC and hasn't written any intents, it may
+	// have acquired unreplicated locks, so inform the concurrency manager that
+	// it is finalized and that any unreplicated locks that it has acquired can
+	// be released.
+ res.Local.UpdatedTxns = []*roachpb.Transaction{clonedTxn} + // TODO(nvanbenschoten): do something like the following once unreplicated + // lock spans are added to EndTxn request. + // res.Local.ResolvedIntents = make([]roachpb.LockUpdate, len(etArg.IntentSpans)) + // for i, sp := range etArg.IntentSpans { + // res.Local.ResolvedIntents[i] = roachpb.LockUpdate{ + // Span: sp, + // Txn: clonedTxn.TxnMeta, + // Status: clonedTxn.Status, + // IgnoredSeqNums: clonedTxn.IgnoredSeqNums, + // Durability: lock.Unreplicated, + // } + // } + // Add placeholder responses for end transaction requests. br.Add(&roachpb.EndTxnResponse{OnePhaseCommit: true}) br.Txn = clonedTxn diff --git a/pkg/storage/store_merge.go b/pkg/storage/store_merge.go index 5a75dc86dc1..0a5ad37d630 100644 --- a/pkg/storage/store_merge.go +++ b/pkg/storage/store_merge.go @@ -85,9 +85,9 @@ func (s *Store) MergeRange( leftRepl.writeStats.resetRequestCounts() } - // Clear the wait queue to redirect the queued transactions to the - // left-hand replica, if necessary. - rightRepl.txnWaitQueue.Clear(true /* disable */) + // Clear the concurrency manager's lock and txn wait-queues to redirect the + // queued transactions to the left-hand replica, if necessary. + rightRepl.concMgr.OnRangeMerge() leftLease, _ := leftRepl.GetLease() rightLease, _ := rightRepl.GetLease() diff --git a/pkg/storage/store_send.go b/pkg/storage/store_send.go index ce36e965dbe..0b0d7f9b7b0 100644 --- a/pkg/storage/store_send.go +++ b/pkg/storage/store_send.go @@ -234,39 +234,3 @@ func (s *Store) Send( } return nil, pErr } - -// maybeWaitForPushee potentially diverts the incoming request to -// the txnwait.Queue, where it will wait for updates to the target -// transaction. -// TODO(nvanbenschoten): Move this method. -func (r *Replica) maybeWaitForPushee( - ctx context.Context, ba *roachpb.BatchRequest, -) (*roachpb.BatchResponse, *roachpb.Error) { - // If this is a push txn request, check the push queue first, which - // may cause this request to wait and either return a successful push - // txn response or else allow this request to proceed. - if ba.IsSinglePushTxnRequest() { - if r.store.cfg.TestingKnobs.DontRetryPushTxnFailures { - return nil, nil - } - pushReq := ba.Requests[0].GetInner().(*roachpb.PushTxnRequest) - pushResp, pErr := r.txnWaitQueue.MaybeWaitForPush(ctx, pushReq) - if pErr != nil { - return nil, pErr - } else if pushResp != nil { - br := &roachpb.BatchResponse{} - br.Add(pushResp) - return br, nil - } - } else if ba.IsSingleQueryTxnRequest() { - // For query txn requests, wait in the txn wait queue either for - // transaction update or for dependent transactions to change. - queryReq := ba.Requests[0].GetInner().(*roachpb.QueryTxnRequest) - pErr := r.txnWaitQueue.MaybeWaitForQuery(ctx, queryReq) - if pErr != nil { - return nil, pErr - } - } - - return nil, nil -} diff --git a/pkg/storage/store_split.go b/pkg/storage/store_split.go index d57a03fcc31..3c163788d4b 100644 --- a/pkg/storage/store_split.go +++ b/pkg/storage/store_split.go @@ -170,7 +170,7 @@ func prepareRightReplicaForSplit( // The right hand side of the split was already created (and its raftMu // acquired) in Replica.acquireSplitLock. It must be present here. rightRng, err := r.store.GetReplica(split.RightDesc.RangeID) - // If the RHS replica at the point of the split was known to be removed when + // If the RHS replica at the point of the split was known to be removed // during the application of the split then we may not find it here. That's // fine, carry on. 
See also: _, _ = r.acquireSplitLock, splitPostApply @@ -277,11 +277,11 @@ func (s *Store) SplitRange( leftRepl.setDescRaftMuLocked(ctx, newLeftDesc) - // Clear the LHS txn wait queue, to redirect to the RHS if - // appropriate. We do this after setDescWithoutProcessUpdate - // to ensure that no pre-split commands are inserted into the - // txnWaitQueue after we clear it. - leftRepl.txnWaitQueue.Clear(false /* disable */) + // Clear the LHS lock and txn wait-queues, to redirect to the RHS if + // appropriate. We do this after setDescWithoutProcessUpdate to ensure + // that no pre-split commands are inserted into the wait-queues after we + // clear them. + leftRepl.concMgr.OnRangeSplit() // The rangefeed processor will no longer be provided logical ops for // its entire range, so it needs to be shut down and all registrations diff --git a/pkg/storage/txn_wait_queue_test.go b/pkg/storage/txn_wait_queue_test.go index 5a4e5b12a1b..2011fc72aec 100644 --- a/pkg/storage/txn_wait_queue_test.go +++ b/pkg/storage/txn_wait_queue_test.go @@ -82,7 +82,7 @@ func TestTxnWaitQueueEnableDisable(t *testing.T) { } // Queue starts enabled. - q := tc.repl.txnWaitQueue + q := tc.repl.concMgr.TxnWaitQueue() if !q.IsEnabled() { t.Errorf("expected push txn queue is enabled") } @@ -185,7 +185,7 @@ func TestTxnWaitQueueCancel(t *testing.T) { PusheeTxn: txn.TxnMeta, } - q := tc.repl.txnWaitQueue + q := tc.repl.concMgr.TxnWaitQueue() q.Enable() if err := checkAllGaugesZero(tc); err != nil { t.Fatal(err.Error()) @@ -251,7 +251,7 @@ func TestTxnWaitQueueUpdateTxn(t *testing.T) { req2 := req1 req2.PusherTxn = *pusher2 - q := tc.repl.txnWaitQueue + q := tc.repl.concMgr.TxnWaitQueue() q.Enable() q.EnqueueTxn(txn) m := tc.store.txnWaitMetrics @@ -369,7 +369,7 @@ func TestTxnWaitQueueTxnSilentlyCompletes(t *testing.T) { PusheeTxn: txn.TxnMeta, } - q := tc.repl.txnWaitQueue + q := tc.repl.concMgr.TxnWaitQueue() q.Enable() q.EnqueueTxn(txn) @@ -444,7 +444,7 @@ func TestTxnWaitQueueUpdateNotPushedTxn(t *testing.T) { PusheeTxn: txn.TxnMeta, } - q := tc.repl.txnWaitQueue + q := tc.repl.concMgr.TxnWaitQueue() q.Enable() q.EnqueueTxn(txn) @@ -519,7 +519,7 @@ func TestTxnWaitQueuePusheeExpires(t *testing.T) { t.Fatal(err) } - q := tc.repl.txnWaitQueue + q := tc.repl.concMgr.TxnWaitQueue() q.Enable() q.EnqueueTxn(txn) @@ -607,7 +607,7 @@ func TestTxnWaitQueuePusherUpdate(t *testing.T) { PusheeTxn: txn.TxnMeta, } - q := tc.repl.txnWaitQueue + q := tc.repl.concMgr.TxnWaitQueue() q.Enable() q.EnqueueTxn(txn) @@ -718,7 +718,7 @@ func TestTxnWaitQueueDependencyCycle(t *testing.T) { PusheeTxn: txnA.TxnMeta, } - q := tc.repl.txnWaitQueue + q := tc.repl.concMgr.TxnWaitQueue() q.Enable() ctx, cancel := context.WithCancel(context.Background()) @@ -809,7 +809,7 @@ func TestTxnWaitQueueDependencyCycleWithPriorityInversion(t *testing.T) { PusheeTxn: updatedTxnA.TxnMeta, } - q := tc.repl.txnWaitQueue + q := tc.repl.concMgr.TxnWaitQueue() q.Enable() for _, txn := range []*roachpb.Transaction{txnA, txnB} {