diff --git a/pkg/base/config.go b/pkg/base/config.go
index 0dedf6e7c21f..129b771fc47f 100644
--- a/pkg/base/config.go
+++ b/pkg/base/config.go
@@ -57,9 +57,9 @@ const (
 	// leader lease active duration should be of the raft election timeout.
 	defaultRangeLeaseRaftElectionTimeoutMultiplier = 3
 
-	// defaultHeartbeatInterval is the default value of HeartbeatInterval used
-	// by the rpc context.
-	defaultHeartbeatInterval = 3 * time.Second
+	// defaultRPCHeartbeatInterval is the default value of RPCHeartbeatInterval
+	// used by the rpc context.
+	defaultRPCHeartbeatInterval = 3 * time.Second
 
 	// rangeLeaseRenewalFraction specifies what fraction the range lease
 	// renewal duration should be of the range lease active time. For example,
@@ -181,10 +181,10 @@ type Config struct {
 	// See the comment in server.Config for more details.
 	HistogramWindowInterval time.Duration
 
-	// HeartbeatInterval controls how often a Ping request is sent on peer
+	// RPCHeartbeatInterval controls how often a Ping request is sent on peer
 	// connections to determine connection health and update the local view
 	// of remote clocks.
-	HeartbeatInterval time.Duration
+	RPCHeartbeatInterval time.Duration
 }
 
 func wrapError(err error) error {
@@ -207,7 +207,7 @@ func (cfg *Config) InitDefaults() {
 	cfg.HTTPAddr = defaultHTTPAddr
 	cfg.SSLCertsDir = DefaultCertsDirectory
 	cfg.certificateManager = lazyCertificateManager{}
-	cfg.HeartbeatInterval = defaultHeartbeatInterval
+	cfg.RPCHeartbeatInterval = defaultRPCHeartbeatInterval
 }
 
 // HTTPRequestScheme returns "http" or "https" based on the value of Insecure.
diff --git a/pkg/base/constants.go b/pkg/base/constants.go
index af1963db4faa..4ad1bc7947ce 100644
--- a/pkg/base/constants.go
+++ b/pkg/base/constants.go
@@ -19,12 +19,12 @@ const (
 	// for more on this setting.
 	DefaultMaxClockOffset = 500 * time.Millisecond
 
-	// DefaultHeartbeatInterval is how often heartbeats are sent from the
+	// DefaultTxnHeartbeatInterval is how often heartbeats are sent from the
 	// transaction coordinator to a live transaction. These keep it from
 	// being preempted by other transactions writing the same keys. If a
-	// transaction fails to be heartbeat within 2x the heartbeat interval,
+	// transaction fails to be heartbeat within 5x the heartbeat interval,
 	// it may be aborted by conflicting txns.
-	DefaultHeartbeatInterval = 1 * time.Second
+	DefaultTxnHeartbeatInterval = 1 * time.Second
 
 	// SlowRequestThreshold is the amount of time to wait before considering a
 	// request to be "slow".
diff --git a/pkg/cmd/roachtest/kv.go b/pkg/cmd/roachtest/kv.go
index 84ab90fd1115..7cd32b7544f8 100644
--- a/pkg/cmd/roachtest/kv.go
+++ b/pkg/cmd/roachtest/kv.go
@@ -152,12 +152,19 @@ func registerKV(r *testRegistry) {
 func registerKVContention(r *testRegistry) {
 	const nodes = 4
 	r.Add(testSpec{
+		Skip:    "https://github.com/cockroachdb/cockroach/issues/36089",
 		Name:    fmt.Sprintf("kv/contention/nodes=%d", nodes),
 		Cluster: makeClusterSpec(nodes + 1),
 		Run: func(ctx context.Context, t *test, c *cluster) {
 			c.Put(ctx, cockroach, "./cockroach", c.Range(1, nodes))
 			c.Put(ctx, workload, "./workload", c.Node(nodes+1))
-			c.Start(ctx, t, c.Range(1, nodes))
+
+			// Start the cluster with an extremely high txn liveness threshold.
+			// If requests ever get stuck on a transaction that was abandoned
+			// then it will take 10m for them to get unstuck, at which point the
+			// QPS threshold check in the test is guaranteed to fail.
+			args := startArgs("--env=COCKROACH_TXN_LIVENESS_HEARTBEAT_MULTIPLIER=600")
+			c.Start(ctx, t, args, c.Range(1, nodes))
 
 			// Enable request tracing, which is a good tool for understanding
 			// how different transactions are interacting.
diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go
index 1cf20fe55833..1f2d623b9919 100644
--- a/pkg/jobs/registry.go
+++ b/pkg/jobs/registry.go
@@ -265,7 +265,7 @@ func (r *Registry) LoadJobWithTxn(ctx context.Context, jobID int64, txn *client.
 
 // DefaultCancelInterval is a reasonable interval at which to poll this node
 // for liveness failures and cancel running jobs.
-var DefaultCancelInterval = base.DefaultHeartbeatInterval
+var DefaultCancelInterval = base.DefaultTxnHeartbeatInterval
 
 // DefaultAdoptInterval is a reasonable interval at which to poll system.jobs
 // for jobs with expired leases.
diff --git a/pkg/kv/txn_coord_sender.go b/pkg/kv/txn_coord_sender.go
index 00d8822d877a..837a58e52acb 100644
--- a/pkg/kv/txn_coord_sender.go
+++ b/pkg/kv/txn_coord_sender.go
@@ -455,7 +455,7 @@ func NewTxnCoordSenderFactory(
 		tcf.st = cluster.MakeTestingClusterSettings()
 	}
 	if tcf.heartbeatInterval == 0 {
-		tcf.heartbeatInterval = base.DefaultHeartbeatInterval
+		tcf.heartbeatInterval = base.DefaultTxnHeartbeatInterval
 	}
 	if tcf.metrics == (TxnMetrics{}) {
 		tcf.metrics = MakeTxnMetrics(metric.TestSampleInterval)
diff --git a/pkg/rpc/context.go b/pkg/rpc/context.go
index f1cd05c42e64..0a9bad33fcec 100644
--- a/pkg/rpc/context.go
+++ b/pkg/rpc/context.go
@@ -437,7 +437,7 @@ func NewContext(
 	var cancel context.CancelFunc
 	ctx.masterCtx, cancel = context.WithCancel(ambient.AnnotateCtx(context.Background()))
 	ctx.Stopper = stopper
-	ctx.heartbeatInterval = baseCtx.HeartbeatInterval
+	ctx.heartbeatInterval = baseCtx.RPCHeartbeatInterval
 	ctx.RemoteClocks = newRemoteClockMonitor(
 		ctx.LocalClock, 10*ctx.heartbeatInterval, baseCtx.HistogramWindowInterval)
 	ctx.heartbeatTimeout = 2 * ctx.heartbeatInterval
diff --git a/pkg/rpc/nodedialer/nodedialer_test.go b/pkg/rpc/nodedialer/nodedialer_test.go
index eaebc2ca937c..5f67cc16ac0d 100644
--- a/pkg/rpc/nodedialer/nodedialer_test.go
+++ b/pkg/rpc/nodedialer/nodedialer_test.go
@@ -232,7 +232,7 @@ func newTestServer(
 func newTestContext(clock *hlc.Clock, stopper *stop.Stopper) *rpc.Context {
 	cfg := testutils.NewNodeTestBaseContext()
 	cfg.Insecure = true
-	cfg.HeartbeatInterval = 10 * time.Millisecond
+	cfg.RPCHeartbeatInterval = 10 * time.Millisecond
 	rctx := rpc.NewContext(
 		log.AmbientContext{Tracer: tracing.NewTracer()},
 		cfg,
diff --git a/pkg/storage/batcheval/cmd_scan.go b/pkg/storage/batcheval/cmd_scan.go
index ffc28fe6f00e..9ccfe6f254b8 100644
--- a/pkg/storage/batcheval/cmd_scan.go
+++ b/pkg/storage/batcheval/cmd_scan.go
@@ -80,5 +80,4 @@ func Scan(
 		reply.IntentRows, err = CollectIntentRows(ctx, batch, cArgs, intents)
 	}
 	return result.FromIntents(intents, args), err
-
 }
diff --git a/pkg/storage/client_merge_test.go b/pkg/storage/client_merge_test.go
index 3a4a2baabf8c..959b07c5654c 100644
--- a/pkg/storage/client_merge_test.go
+++ b/pkg/storage/client_merge_test.go
@@ -1072,8 +1072,7 @@ func TestStoreRangeMergeInFlightTxns(t *testing.T) {
 		var cancel func()
 		ctx, cancel = context.WithTimeout(ctx, testutils.DefaultSucceedsSoonDuration)
 		defer cancel()
-		defer func(old time.Duration) { txnwait.TxnLivenessThreshold = old }(txnwait.TxnLivenessThreshold)
-		txnwait.TxnLivenessThreshold = 2 * testutils.DefaultSucceedsSoonDuration
+		defer txnwait.TestingOverrideTxnLivenessThreshold(2 * testutils.DefaultSucceedsSoonDuration)()
 
 		// Create a transaction that won't complete until after the merge.
 		txn1 := client.NewTxn(ctx, store.DB(), 0 /* gatewayNodeID */, client.RootTxn)
diff --git a/pkg/storage/client_raft_test.go b/pkg/storage/client_raft_test.go
index 061aea9d9a05..0950195831af 100644
--- a/pkg/storage/client_raft_test.go
+++ b/pkg/storage/client_raft_test.go
@@ -485,7 +485,7 @@ func TestFailedReplicaChange(t *testing.T) {
 
 	// The first failed replica change has laid down intents. Make sure those
 	// are pushable by making the transaction abandoned.
-	mtc.manualClock.Increment(10 * base.DefaultHeartbeatInterval.Nanoseconds())
+	mtc.manualClock.Increment(10 * base.DefaultTxnHeartbeatInterval.Nanoseconds())
 
 	if _, err := repl.ChangeReplicas(
 		context.Background(),
diff --git a/pkg/storage/intentresolver/contention_queue.go b/pkg/storage/intentresolver/contention_queue.go
index ebe0b90e8ff9..bee1170919ea 100644
--- a/pkg/storage/intentresolver/contention_queue.go
+++ b/pkg/storage/intentresolver/contention_queue.go
@@ -138,25 +138,26 @@ func txnID(txn *roachpb.Transaction) string {
 // single intent (len(wiErr.Intents) == 1).
 //
 // Returns a cleanup function to be invoked by the caller after the
-// original request completes, a possibly updated WriteIntentError and
+// original request completes, a possibly updated WriteIntentError,
 // a bool indicating whether the intent resolver should regard the
 // original push / resolve as no longer applicable and skip those
 // steps to retry the original request that generated the
-// WriteIntentError. The cleanup function takes two arguments, a
-// newWIErr, non-nil in case the re-executed request experienced
+// WriteIntentError, and an error if one was encountered while
+// waiting in the queue. The cleanup function takes two arguments,
+// a newWIErr, non-nil in case the re-executed request experienced
 // another write intent error and could not complete; and
 // newIntentTxn, nil if the re-executed request left no intent, and
 // non-nil if it did. At most one of these two arguments should be
 // provided.
 func (cq *contentionQueue) add(
 	ctx context.Context, wiErr *roachpb.WriteIntentError, h roachpb.Header,
-) (CleanupFunc, *roachpb.WriteIntentError, bool) {
+) (CleanupFunc, *roachpb.WriteIntentError, bool, *roachpb.Error) {
 	if len(wiErr.Intents) != 1 {
 		log.Fatalf(ctx, "write intent error must contain only a single intent: %s", wiErr)
 	}
 	if hasExtremePriority(h) {
 		// Never queue maximum or minimum priority transactions.
-		return nil, wiErr, false
+		return nil, wiErr, false, nil
 	}
 	intent := wiErr.Intents[0]
 	key := string(intent.Span.Key)
@@ -194,6 +195,7 @@ func (cq *contentionQueue) add(
 
 	// Wait on prior pusher, if applicable.
 	var done bool
+	var pErr *roachpb.Error
 	if waitCh != nil {
 		var detectCh chan struct{}
 		var detectReady <-chan time.Time
@@ -237,7 +239,7 @@ func (cq *contentionQueue) add(
 
 		case <-ctx.Done():
 			// The pusher's context is done. Return without pushing.
-			done = true
+			pErr = roachpb.NewError(ctx.Err())
 			break Loop
 
 		case <-detectReady:
@@ -271,8 +273,8 @@ func (cq *contentionQueue) add(
 			})
 			log.VEventf(ctx, 3, "%s pushing %s to detect dependency cycles", txnID(curPusher.txn), pusheeTxn.ID.Short())
 			if err := cq.db.Run(ctx, b); err != nil {
-				log.VErrEventf(ctx, 2, "while waiting in push contention queue to push %s: %s", pusheeTxn.ID.Short(), b.MustPErr())
-				done = true // done=true to avoid uselessly trying to push and resolve
+				pErr = b.MustPErr()
+				log.VErrEventf(ctx, 2, "while waiting in push contention queue to push %s: %s", pusheeTxn.ID.Short(), pErr)
 				break Loop
 			}
 			// Note that this pusher may have aborted the pushee, but it
@@ -347,7 +349,7 @@ func (cq *contentionQueue) add(
 			curPusher.waitCh <- newIntentTxn
 			close(curPusher.waitCh)
 		}
-	}, wiErr, done
+	}, wiErr, done, pErr
 }
 
 func hasExtremePriority(h roachpb.Header) bool {
diff --git a/pkg/storage/intentresolver/intent_resolver.go b/pkg/storage/intentresolver/intent_resolver.go
index 5bf2c367b1a4..a296ba128455 100644
--- a/pkg/storage/intentresolver/intent_resolver.go
+++ b/pkg/storage/intentresolver/intent_resolver.go
@@ -245,11 +245,13 @@ func (ir *IntentResolver) ProcessWriteIntentError(
 	var cleanup func(*roachpb.WriteIntentError, *enginepb.TxnMeta)
 	if len(wiErr.Intents) == 1 && len(wiErr.Intents[0].Span.EndKey) == 0 {
 		var done bool
+		var pErr *roachpb.Error
 		// Note that the write intent error may be mutated here in the event
 		// that this pusher is queued to wait for a different transaction
 		// instead.
-		if cleanup, wiErr, done = ir.contentionQ.add(ctx, wiErr, h); done {
-			return cleanup, nil
+		cleanup, wiErr, done, pErr = ir.contentionQ.add(ctx, wiErr, h)
+		if done || pErr != nil {
+			return cleanup, pErr
 		}
 	}
 
diff --git a/pkg/storage/intentresolver/intent_resolver_test.go b/pkg/storage/intentresolver/intent_resolver_test.go
index d6281cd899b0..52b8146f50a5 100644
--- a/pkg/storage/intentresolver/intent_resolver_test.go
+++ b/pkg/storage/intentresolver/intent_resolver_test.go
@@ -408,7 +408,7 @@ func TestContendedIntent(t *testing.T) {
 		{pusher: roTxn2, expTxns: []*roachpb.Transaction{roTxn1, roTxn2}},
 		{pusher: roTxn3, expTxns: []*roachpb.Transaction{roTxn1, roTxn2, roTxn3}},
 		// The fourth txn will be canceled before its predecessor is cleaned up to
-		// excersize the cancellation code path.
+		// exercise the cancellation code path.
 		{pusher: roTxn4, expTxns: []*roachpb.Transaction{roTxn1, roTxn2, roTxn3, roTxn4}},
 		// Now, verify that a writing txn is inserted at the end of the queue.
 		{pusher: rwTxn1, expTxns: []*roachpb.Transaction{roTxn1, roTxn2, roTxn3, roTxn4, rwTxn1}},
@@ -417,7 +417,12 @@
 		{pusher: rwTxn3, expTxns: []*roachpb.Transaction{roTxn1, roTxn2, roTxn3, roTxn4, rwTxn1, rwTxn2, rwTxn3}},
 	}
 	var wg sync.WaitGroup
-	cleanupFuncs := make(chan CleanupFunc)
+	type intentResolverResp struct {
+		idx  int
+		fn   CleanupFunc
+		pErr *roachpb.Error
+	}
+	resps := make(chan intentResolverResp, 1)
 	for i, tc := range testCases {
 		testCtx, cancel := context.WithCancel(ctx)
 		defer cancel()
@@ -429,14 +434,11 @@
 		}}}
 		h := roachpb.Header{Txn: tc.pusher}
 		wg.Add(1)
-		go func() {
+		go func(idx int) {
+			defer wg.Done()
 			cleanupFunc, pErr := ir.ProcessWriteIntentError(testCtx, roachpb.NewError(wiErr), nil, h, roachpb.PUSH_ABORT)
-			if pErr != nil {
-				t.Errorf("unexpected error from ProcessWriteIntentError: %v", pErr)
-			}
-			cleanupFuncs <- cleanupFunc
-			wg.Done()
-		}()
+			resps <- intentResolverResp{idx: idx, fn: cleanupFunc, pErr: pErr}
+		}(i)
 		testutils.SucceedsSoon(t, func() error {
 			if lc, let := ir.NumContended(keyA), len(tc.expTxns); lc != let {
 				return errors.Errorf("expected len %d; got %d", let, lc)
@@ -461,11 +463,34 @@
 	}
 
 	// Wait until all of the WriteIntentErrors have been processed to stop
-	// processing the cleanupFuncs.
-	go func() { wg.Wait(); close(cleanupFuncs) }()
-	i := 0
-	for f := range cleanupFuncs {
-		switch i {
+	// processing the resps.
+	go func() { wg.Wait(); close(resps) }()
+	expIdx := 0
+	for resp := range resps {
+		// Check index.
+		idx := resp.idx
+		if idx != expIdx {
+			t.Errorf("expected response from request %d, found %d", expIdx, idx)
+		}
+		expIdx++
+
+		// Check error.
+		pErr := resp.pErr
+		switch idx {
+		case 3:
+			// Expected to be canceled.
+			if !testutils.IsPError(pErr, context.Canceled.Error()) {
+				t.Errorf("expected context canceled error; got %v", pErr)
+			}
+		default:
+			if pErr != nil {
+				t.Errorf("unexpected error from ProcessWriteIntentError: %v", pErr)
+			}
+		}
+
+		// Call cleanup function.
+		f := resp.fn
+		switch idx {
 		// The read only transactions should be cleaned up with nil, nil.
 		case 0:
 			// There should be a push of orig and then a resolve of intent.
@@ -497,13 +522,17 @@
 				verifyResolveIntent(<-reqChan, keyA)
 			})
 			}
-			fallthrough
+			f(nil, nil)
 		case 1, 2, 3:
 			// The remaining roTxns should not do anything upon cleanup.
-			f(nil, nil)
-			if i == 1 {
+			if idx == 2 {
+				// Cancel request 3 before request 2 cleans up. Wait for
+				// it to return an error before continuing.
 				testCases[3].cancelFunc()
+				cf3 := <-resps
+				resps <- cf3
 			}
+			f(nil, nil)
 		case 4:
 			// Call the CleanupFunc with a new WriteIntentError with a different
 			// transaction. This should lead to a new push on the new transaction and
@@ -521,9 +550,10 @@
 				Txn: rwTxn1.TxnMeta,
 			}}}, nil)
 		case 6:
-			f(nil, &testCases[i].pusher.TxnMeta)
+			f(nil, &testCases[idx].pusher.TxnMeta)
+		default:
+			t.Fatalf("unexpected response %d", idx)
 		}
-		i++
 	}
 }
 
diff --git a/pkg/storage/replica_proposal.go b/pkg/storage/replica_proposal.go
index bac303278ff4..c1d749e9ce00 100644
--- a/pkg/storage/replica_proposal.go
+++ b/pkg/storage/replica_proposal.go
@@ -791,13 +791,16 @@ func (r *Replica) handleLocalEvalResult(ctx context.Context, lResult result.Loca
 	// Non-state updates and actions.
 	// ======================
 
-	// The caller is required to detach and handle intents.
+	// The caller is required to detach and handle the following three fields.
 	if lResult.Intents != nil {
 		log.Fatalf(ctx, "LocalEvalResult.Intents should be nil: %+v", lResult.Intents)
 	}
 	if lResult.EndTxns != nil {
 		log.Fatalf(ctx, "LocalEvalResult.EndTxns should be nil: %+v", lResult.EndTxns)
 	}
+	if lResult.MaybeWatchForMerge {
+		log.Fatalf(ctx, "LocalEvalResult.MaybeWatchForMerge should be false")
+	}
 
 	if lResult.GossipFirstRange {
 		// We need to run the gossip in an async task because gossiping requires
diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go
index 99d9c9b9a230..6bdb797be59c 100644
--- a/pkg/storage/replica_test.go
+++ b/pkg/storage/replica_test.go
@@ -5043,7 +5043,7 @@ func TestPushTxnHeartbeatTimeout(t *testing.T) {
 	const indetCommitError = "txn in indeterminate STAGING state"
 
 	m := int64(txnwait.TxnLivenessHeartbeatMultiplier)
-	ns := base.DefaultHeartbeatInterval.Nanoseconds()
+	ns := base.DefaultTxnHeartbeatInterval.Nanoseconds()
 	testCases := []struct {
 		status          roachpb.TransactionStatus // -1 for no record
 		heartbeatOffset int64                     // nanoseconds from original timestamp, 0 for no heartbeat
diff --git a/pkg/storage/store.go b/pkg/storage/store.go
index ff779a6ba451..d6d68a76b65a 100644
--- a/pkg/storage/store.go
+++ b/pkg/storage/store.go
@@ -74,8 +74,8 @@ import (
 
 const (
 	// rangeIDAllocCount is the number of Range IDs to allocate per allocation.
-	rangeIDAllocCount             = 10
-	defaultHeartbeatIntervalTicks = 5
+	rangeIDAllocCount                 = 10
+	defaultRaftHeartbeatIntervalTicks = 5
 
 	// defaultRaftEntryCacheSize is the default size in bytes for a
 	// store's Raft log entry cache.
@@ -748,7 +748,7 @@ func (sc *StoreConfig) SetDefaults() {
 		sc.CoalescedHeartbeatsInterval = sc.RaftTickInterval / 2
 	}
 	if sc.RaftHeartbeatIntervalTicks == 0 {
-		sc.RaftHeartbeatIntervalTicks = defaultHeartbeatIntervalTicks
+		sc.RaftHeartbeatIntervalTicks = defaultRaftHeartbeatIntervalTicks
 	}
 	if sc.RaftEntryCacheSize == 0 {
 		sc.RaftEntryCacheSize = defaultRaftEntryCacheSize
diff --git a/pkg/storage/txnwait/txnqueue.go b/pkg/storage/txnwait/queue.go
similarity index 93%
rename from pkg/storage/txnwait/txnqueue.go
rename to pkg/storage/txnwait/queue.go
index caaaee9820f2..0ab061189397 100644
--- a/pkg/storage/txnwait/txnqueue.go
+++ b/pkg/storage/txnwait/queue.go
@@ -20,6 +20,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/internal/client"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
+	"github.com/cockroachdb/cockroach/pkg/util/envutil"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/retry"
@@ -33,12 +34,26 @@ const maxWaitForQueryTxn = 50 * time.Millisecond
 
 // TxnLivenessHeartbeatMultiplier specifies what multiple the transaction
 // liveness threshold should be of the transaction heartbeat interval.
-const TxnLivenessHeartbeatMultiplier = 5
+var TxnLivenessHeartbeatMultiplier = envutil.EnvOrDefaultInt(
+	"COCKROACH_TXN_LIVENESS_HEARTBEAT_MULTIPLIER", 5)
 
 // TxnLivenessThreshold is the maximum duration between transaction heartbeats
 // before the transaction is considered expired by Queue. It is exposed and
 // mutable to allow tests to override it.
-var TxnLivenessThreshold = TxnLivenessHeartbeatMultiplier * base.DefaultHeartbeatInterval
+//
+// Use TestingOverrideTxnLivenessThreshold to override the value in tests.
+var TxnLivenessThreshold = time.Duration(TxnLivenessHeartbeatMultiplier) * base.DefaultTxnHeartbeatInterval
+
+// TestingOverrideTxnLivenessThreshold allows tests to override the transaction
+// liveness threshold. The function returns a closure that should be called to
+// reset the value.
+func TestingOverrideTxnLivenessThreshold(t time.Duration) func() {
+	old := TxnLivenessThreshold
+	TxnLivenessThreshold = t
+	return func() {
+		TxnLivenessThreshold = old
+	}
+}
 
 // ShouldPushImmediately returns whether the PushTxn request should
 // proceed without queueing. This is true for pushes which are neither
@@ -552,6 +567,29 @@ func (q *Queue) MaybeWaitForPush(
 			pending.txn.Store(updatedPushee)
 			if updatedPushee.Status.IsFinalized() {
 				log.VEvent(ctx, 2, "push request is satisfied")
+				if updatedPushee.Status == roachpb.ABORTED {
+					// Inform any other waiting pushers that the transaction is now
+					// finalized. Intuitively we would expect that if any pusher was
+					// stuck waiting for the transaction to be finalized then it would
+					// have heard about the update when the transaction record moved
+					// into its finalized state. This is correct for cases where a
+					// command explicitly wrote the transaction record with a finalized
+					// status.
+					//
+					// However, this does not account for the case where a transaction
+					// becomes uncommittable due to a loss of resolution in the store's
+					// timestamp cache. In that case, a transaction may suddenly become
+					// uncommittable without an associated write to its record. When
+					// this happens, no one else will immediately inform the other
+					// pushers about the uncommittable transaction. Eventually the
+					// pushee's coordinator will come along and roll back its record,
+					// but that's only if the pushee isn't itself waiting on the result
+					// of one of the pushers here. If there is such a dependency cycle
+					// then the other pushers may have to wait for up to the transaction
+					// expiration to query the pushee again and notice that the pushee
+					// is now uncommittable.
+					q.UpdateTxn(ctx, updatedPushee)
+				}
 				return createPushTxnResponse(updatedPushee), nil
 			}
 			if IsExpired(q.store.Clock().Now(), updatedPushee) {
diff --git a/pkg/storage/txnwait/queue_test.go b/pkg/storage/txnwait/queue_test.go
index 8dfa46e8f9e5..a2bb286a9306 100644
--- a/pkg/storage/txnwait/queue_test.go
+++ b/pkg/storage/txnwait/queue_test.go
@@ -12,14 +12,19 @@ package txnwait
 
 import (
 	"context"
+	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 
+	"github.com/cockroachdb/cockroach/pkg/internal/client"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/cockroachdb/cockroach/pkg/util/stop"
+	"github.com/stretchr/testify/require"
 )
 
 func TestShouldPushImmediately(t *testing.T) {
@@ -125,16 +130,38 @@ func TestIsPushed(t *testing.T) {
 	}
 }
 
+// mockRepl implements the ReplicaInterface interface.
 type mockRepl struct{}
 
 func (mockRepl) ContainsKey(_ roachpb.Key) bool { return true }
 
+// mockStore implements the StoreInterface interface.
 type mockStore struct {
-	StoreInterface
+	manual  *hlc.ManualClock
+	clock   *hlc.Clock
+	stopper *stop.Stopper
+	db      *client.DB
 	metrics *Metrics
 }
 
-func (s mockStore) GetTxnWaitMetrics() *Metrics { return s.metrics }
+func newMockStore(s client.SenderFunc) StoreInterface {
+	var ms mockStore
+	ms.manual = hlc.NewManualClock(123)
+	ms.clock = hlc.NewClock(ms.manual.UnixNano, time.Nanosecond)
+	ms.stopper = stop.NewStopper()
+	ms.metrics = NewMetrics(time.Minute)
+	if s != nil {
+		factory := client.NonTransactionalFactoryFunc(s)
+		ms.db = client.NewDB(testutils.MakeAmbientCtx(), factory, ms.clock)
+	}
+	return ms
+}
+
+func (s mockStore) Clock() *hlc.Clock { return s.clock }
+func (s mockStore) Stopper() *stop.Stopper { return s.stopper }
+func (s mockStore) DB() *client.DB { return s.db }
+func (s mockStore) GetTxnWaitKnobs() TestingKnobs { return TestingKnobs{} }
+func (s mockStore) GetTxnWaitMetrics() *Metrics { return s.metrics }
 
 // TestMaybeWaitForQueryWithContextCancellation adds a new waiting query to the
 // queue and cancels its context. It then verifies that the query was cleaned
@@ -142,8 +169,9 @@ func (s mockStore) GetTxnWaitMetrics() *Metrics { return s.metrics }
 // leak.
 func TestMaybeWaitForQueryWithContextCancellation(t *testing.T) {
 	defer leaktest.AfterTest(t)()
-	metrics := NewMetrics(time.Minute)
-	q := NewQueue(mockStore{metrics: metrics})
+	ms := newMockStore(nil)
+	defer ms.Stopper().Stop(context.Background())
+	q := NewQueue(ms)
 	q.Enable()
 
 	ctx, cancel := context.WithCancel(context.Background())
@@ -161,6 +189,7 @@ func TestMaybeWaitForQueryWithContextCancellation(t *testing.T) {
 		t.Errorf("expected no waiting queries, found %v", q.mu.queries)
 	}
 
+	metrics := ms.GetTxnWaitMetrics()
 	allMetricsAreZero := metrics.PusheeWaiting.Value() == 0 &&
 		metrics.PusherWaiting.Value() == 0 &&
 		metrics.QueryWaiting.Value() == 0 &&
@@ -170,3 +199,63 @@ func TestMaybeWaitForQueryWithContextCancellation(t *testing.T) {
 		t.Errorf("expected all metric gauges to be zero, got some that aren't")
 	}
 }
+
+// TestPushersReleasedAfterAnyQueryTxnFindsAbortedTxn tests that if any
+// QueryTxn on a pushee txn returns an aborted transaction status, all
+// pushers of that transaction are informed of the aborted status and
+// released.
+func TestPushersReleasedAfterAnyQueryTxnFindsAbortedTxn(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	var mockSender client.SenderFunc
+	ms := newMockStore(func(
+		ctx context.Context, ba roachpb.BatchRequest,
+	) (*roachpb.BatchResponse, *roachpb.Error) {
+		return mockSender(ctx, ba)
+	})
+	defer ms.Stopper().Stop(context.Background())
+	q := NewQueue(ms)
+	q.Enable()
+
+	// Set an extremely high transaction liveness threshold so that the pushee
+	// is only queried once per pusher.
+	defer TestingOverrideTxnLivenessThreshold(time.Hour)()
+
+	// Enqueue pushee transaction in the queue.
+	txn := roachpb.MakeTransaction("test", nil, 0, ms.Clock().Now(), 0)
+	q.Enqueue(&txn)
+
+	const numPushees = 3
+	var queryTxnCount int32
+	mockSender = func(
+		ctx context.Context, ba roachpb.BatchRequest,
+	) (*roachpb.BatchResponse, *roachpb.Error) {
+		br := ba.CreateReply()
+		resp := br.Responses[0].GetInner().(*roachpb.QueryTxnResponse)
+		resp.QueriedTxn = txn
+		if atomic.AddInt32(&queryTxnCount, 1) == numPushees {
+			// Only the last pusher's query observes an ABORTED transaction. As
+			// mentioned in the corresponding comment in MaybeWaitForPush, this
+			// isn't expected without an associated update to the pushee's
+			// transaction record. However, it is possible if the pushee hasn't
+			// written a transaction record yet and the timestamp cache loses
+			// resolution due to memory pressure. While rare, we need to handle
+			// this case correctly.
+			resp.QueriedTxn.Status = roachpb.ABORTED
+		}
+		return br, nil
+	}
+	var wg sync.WaitGroup
+	for i := 0; i < numPushees; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			ctx := context.Background()
+			req := roachpb.PushTxnRequest{PusheeTxn: txn.TxnMeta, PushType: roachpb.PUSH_ABORT}
+			res, err := q.MaybeWaitForPush(ctx, mockRepl{}, &req)
+			require.Nil(t, err)
+			require.NotNil(t, res)
+			require.Equal(t, roachpb.ABORTED, res.PusheeTxn.Status)
+		}()
+	}
+	wg.Wait()
+}