diff --git a/pkg/kv/kvclient/kvcoord/BUILD.bazel b/pkg/kv/kvclient/kvcoord/BUILD.bazel
index 8da9ad8d9fea..ac08d90983e2 100644
--- a/pkg/kv/kvclient/kvcoord/BUILD.bazel
+++ b/pkg/kv/kvclient/kvcoord/BUILD.bazel
@@ -188,6 +188,7 @@ go_test(
         "//pkg/kv/kvpb/kvpbmock",
         "//pkg/kv/kvserver",
         "//pkg/kv/kvserver/closedts",
+        "//pkg/kv/kvserver/concurrency",
         "//pkg/kv/kvserver/concurrency/isolation",
         "//pkg/kv/kvserver/concurrency/lock",
         "//pkg/kv/kvserver/kvserverbase",
diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
index 64918308e1bc..df79622558c4 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
@@ -13,6 +13,7 @@ import (
   "reflect"
   "sort"
   "strings"
+  "sync"
   "sync/atomic"
   "testing"
   "time"
@@ -25,7 +26,9 @@ import (
   "github.com/cockroachdb/cockroach/pkg/kv/kvpb"
   "github.com/cockroachdb/cockroach/pkg/kv/kvserver"
   "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
+  "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
   "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
+  "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
   "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
   "github.com/cockroachdb/cockroach/pkg/roachpb"
   "github.com/cockroachdb/cockroach/pkg/rpc"
@@ -4643,3 +4646,173 @@ func TestProxyTracing(t *testing.T) {
     t.Logf("found trace event; msg=%s, tag=%s, loc=%s", msg, tag, loc)
   })
 }
+
+// TestUnexpectedCommitOnTxnRecovery constructs a scenario where transaction
+// recovery could incorrectly determine that a transaction is committed. The
+// scenario is as follows:
+//
+// Txn1:
+// - Writes to keyA.
+// - Acquires an unreplicated exclusive lock on keyB.
+// - Acquires a replicated shared lock on keyB. This lock is pipelined, and
+//   replication for it fails.
+// - Attempts to commit, but fails because of the lost replicated Shared lock.
+//
+// The lease is then transferred to n3. This causes the unreplicated exclusive
+// lock on keyB to be replicated.
+//
+// Txn2:
+// - Attempts to read keyA, which kicks off transaction recovery for Txn1.
+// - Txn2 (incorrectly) concludes that Txn1 is committed at epoch=1 because it
+//   finds a (stronger than Shared) replicated lock on keyB.
+//
+// Txn1:
+// - Back here, we do a stateful retry. We should learn that someone (Txn2)
+//   aborted us when we go and try to commit. At the time of writing, we
+//   incorrectly learn that we've been (unexpectedly) committed.
+func TestUnexpectedCommitOnTxnRecovery(t *testing.T) {
+  defer leaktest.AfterTest(t)()
+  defer log.Scope(t).Close(t)
+
+  keyA := roachpb.Key("a")
+  keyB := roachpb.Key("b")
+
+  var (
+    targetTxnIDString atomic.Value
+    cmdID             atomic.Value
+  )
+  cmdID.Store(kvserverbase.CmdIDKey(""))
+  targetTxnIDString.Store("")
+  ctx := context.Background()
+  st := cluster.MakeTestingClusterSettings()
+  // This test relies on unreplicated locks being replicated on lease
+  // transfers.
+  concurrency.UnreplicatedLockReliabilityLeaseTransfer.Override(ctx, &st.SV, true)
+  tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
+    ServerArgs: base.TestServerArgs{
+      Settings: st,
+      Knobs: base.TestingKnobs{
+        Store: &kvserver.StoreTestingKnobs{
+          TestingProposalFilter: func(fArgs kvserverbase.ProposalFilterArgs) *kvpb.Error {
+            if fArgs.Req.Header.Txn == nil ||
+              fArgs.Req.Header.Txn.ID.String() != targetTxnIDString.Load().(string) {
+              return nil // not our txn
+            }
+            if !fArgs.Req.IsSingleRequest() {
+              // Not the request we care about.
+              return nil
+            }
+            getReq, ok := fArgs.Req.Requests[0].GetInner().(*kvpb.GetRequest)
+            // Only fail replication on the first attempt (epoch 0).
+            epoch := fArgs.Req.Header.Txn.Epoch
+            if ok && getReq.KeyLockingDurability == lock.Replicated && epoch == 0 {
+              t.Logf("will fail application for txn %s@epoch=%d; req: %+v; raft cmdID: %s",
+                fArgs.Req.Header.Txn.ID.String(), epoch, getReq, fArgs.CmdID)
+              cmdID.Store(fArgs.CmdID)
+            }
+            return nil
+          },
+          TestingApplyCalledTwiceFilter: func(fArgs kvserverbase.ApplyFilterArgs) (int, *kvpb.Error) {
+            if fArgs.CmdID == cmdID.Load().(kvserverbase.CmdIDKey) {
+              t.Logf("failing application for raft cmdID: %s", cmdID)
+              return 0, kvpb.NewErrorf("test injected error")
+            }
+            return 0, nil
+          },
+        },
+      },
+    },
+  })
+  defer tc.Stopper().Stop(ctx)
+
+  transferLease := func(idx int) {
+    desc := tc.LookupRangeOrFatal(t, keyB)
+    tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(idx))
+  }
+  // Make a db with transaction heartbeating disabled. This ensures that we
+  // don't mark Txn1 as PENDING after its first failed parallel commit attempt,
+  // which would otherwise prevent Txn2 from recovering Txn1.
+  s := tc.Server(0)
+  ambient := s.AmbientCtx()
+  tsf := kvcoord.NewTxnCoordSenderFactory(
+    kvcoord.TxnCoordSenderFactoryConfig{
+      AmbientCtx:        ambient,
+      HeartbeatInterval: -1, // disabled
+      Settings:          s.ClusterSettings(),
+      Clock:             s.Clock(),
+      Stopper:           s.Stopper(),
+    },
+    s.DistSenderI().(*kvcoord.DistSender),
+  )
+  db := kv.NewDB(ambient, tsf, s.Clock(), s.Stopper())
+
+  startTxn2 := make(chan struct{})
+  blockCh := make(chan struct{})
+  var wg sync.WaitGroup
+  wg.Add(1)
+
+  // Write to keyB so that we can later get a lock on it.
+  txn := db.NewTxn(ctx, "txn")
+  err := txn.Put(ctx, keyB, "valueB")
+  require.NoError(t, err)
+  require.NoError(t, txn.Commit(ctx))
+
+  go func() {
+    defer wg.Done()
+
+    err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
+      if txnID := targetTxnIDString.Load(); txnID == "" {
+        // Store the txnID for the testing knobs.
+        targetTxnIDString.Store(txn.ID().String())
+        t.Logf("txn1 ID is: %s", txn.ID())
+      } else if txnID != txn.ID().String() {
+        // Since txn recovery aborted us, we get retried again, but with an
+        // entirely new txnID. This time we just return, writing nothing.
+        return nil
+      }
+
+      switch txn.Epoch() {
+      case 0:
+        err := txn.Put(ctx, keyA, "value")
+        require.NoError(t, err)
+        res, err := txn.GetForUpdate(ctx, keyB, kvpb.BestEffort)
+        require.NoError(t, err)
+        require.Equal(t, res.ValueBytes(), []byte("valueB"))
+        res, err = txn.GetForShare(ctx, keyB, kvpb.GuaranteedDurability)
+        require.NoError(t, err)
+        require.Equal(t, res.ValueBytes(), []byte("valueB"))
+        err = txn.Commit(ctx)
+        require.Error(t, err)
+        require.ErrorContains(t, err, "RETRY_ASYNC_WRITE_FAILURE")
+        // Transfer the lease to n3.
+        transferLease(2)
+        close(startTxn2)
+        // Block until Txn2 recovers us.
+        <-blockCh
+        return err
+      case 1:
+        // When retrying the write failure we should discover that txn recovery
+        // has aborted this transaction.
+        err := txn.Put(ctx, keyA, "value")
+        require.Error(t, err)
+        require.ErrorContains(t, err, "ABORT_REASON_ABORT_SPAN")
+        return err
+      default:
+        t.Errorf("unexpected epoch: %d", txn.Epoch())
+      }
+      return nil
+    })
+    require.NoError(t, err)
+  }()
+  <-startTxn2
+
+  txn2 := db.NewTxn(ctx, "txn2")
+  res, err := txn2.Get(ctx, keyA)
+  require.NoError(t, err)
+  // NB: Nothing should exist on keyA, because txn1 didn't commit at epoch 1 (or
+  // any epoch, for that matter).
+  require.False(t, res.Exists())
+
+  close(blockCh)
+  wg.Wait()
+}
diff --git a/pkg/kv/kvnemesis/kvnemesis_test.go b/pkg/kv/kvnemesis/kvnemesis_test.go
index b1275e660f42..7d07148fc922 100644
--- a/pkg/kv/kvnemesis/kvnemesis_test.go
+++ b/pkg/kv/kvnemesis/kvnemesis_test.go
@@ -337,21 +337,14 @@ func TestKVNemesisMultiNode_BufferedWrites(t *testing.T) {
   defer log.Scope(t).Close(t)
 
   testKVNemesisImpl(t, kvnemesisTestCfg{
-    numNodes:     3,
-    numSteps:     defaultNumSteps,
-    concurrency:  5,
-    seedOverride: 0,
-    // TODO(#145458): Until #145458 is fixed reduce the
-    // rate of lost writes by avoiding lease transfers and
-    // merges and also turning off error injection.
-    invalidLeaseAppliedIndexProb: 0.0,
-    injectReproposalErrorProb:    0.0,
+    numNodes:                     3,
+    numSteps:                     defaultNumSteps,
+    concurrency:                  5,
+    seedOverride:                 0,
+    invalidLeaseAppliedIndexProb: 0.2,
+    injectReproposalErrorProb:    0.2,
     assertRaftApply:              true,
     bufferedWriteProb:            0.70,
-    testGeneratorConfig: func(g *GeneratorConfig) {
-      g.Ops.ChangeLease = ChangeLeaseConfig{}
-      g.Ops.Merge = MergeConfig{}
-    },
     testSettings: func(ctx context.Context, st *cluster.Settings) {
       kvcoord.BufferedWritesEnabled.Override(ctx, &st.SV, true)
       concurrency.UnreplicatedLockReliabilityLeaseTransfer.Override(ctx, &st.SV, true)
@@ -362,9 +355,7 @@ func TestKVNemesisMultiNode_BufferedWrites(t *testing.T) {
 }
 
 // TestKVNemesisMultiNode_BufferedWritesNoPipelining turns on buffered
-// writes and turns off write pipelining. Turning off write pipelining
-// allows us to test the lock reliability features even without a fix
-// for #145458.
+// writes and turns off write pipelining.
 func TestKVNemesisMultiNode_BufferedWritesNoPipelining(t *testing.T) {
   defer leaktest.AfterTest(t)()
   defer log.Scope(t).Close(t)
diff --git a/pkg/kv/kvserver/batcheval/cmd_query_intent.go b/pkg/kv/kvserver/batcheval/cmd_query_intent.go
index 582b9e9581c6..a2fbd75c8a7a 100644
--- a/pkg/kv/kvserver/batcheval/cmd_query_intent.go
+++ b/pkg/kv/kvserver/batcheval/cmd_query_intent.go
@@ -157,8 +157,11 @@ func QueryIntent(
     }
   }
 
+  res := result.Result{}
   if !reply.FoundIntent && args.ErrorIfMissing {
-    return result.Result{}, kvpb.NewIntentMissingError(args.Key, intent)
+    l := roachpb.MakeLockAcquisition(args.Txn, args.Key, lock.Replicated, args.Strength, args.IgnoredSeqNums)
+    res.Local.ReportedMissingLocks = []roachpb.LockAcquisition{l}
+    return res, kvpb.NewIntentMissingError(args.Key, intent)
   }
-  return result.Result{}, nil
+  return res, nil
 }
diff --git a/pkg/kv/kvserver/batcheval/result/result.go b/pkg/kv/kvserver/batcheval/result/result.go
index 2ef61199c060..08f2b8ddac7a 100644
--- a/pkg/kv/kvserver/batcheval/result/result.go
+++ b/pkg/kv/kvserver/batcheval/result/result.go
@@ -38,6 +38,10 @@ type LocalResult struct {
   // UpdatedTxns stores transaction records that have been updated by
   // calls to EndTxn, PushTxn, and RecoverTxn.
   UpdatedTxns []*roachpb.Transaction
+  // ReportedMissingLocks stores lock acquisition structs that represent locks
+  // that have been reported as missing via QueryIntent. Such locks must be
+  // reported to the concurrency manager.
+  ReportedMissingLocks []roachpb.LockAcquisition
   // EndTxns stores completed transactions. If the transaction
   // contains unresolved intents, they should be handed off for
   // asynchronous intent resolution. A bool in each EndTxnIntents
@@ -131,6 +135,17 @@ func (lResult *LocalResult) DetachEncounteredIntents() []roachpb.Intent {
   return r
 }
 
+// DetachMissingLocks returns (and removes) those locks that have been reported
+// missing during a QueryIntentRequest and must be handled.
+func (lResult *LocalResult) DetachMissingLocks() []roachpb.LockAcquisition {
+  if lResult == nil {
+    return nil
+  }
+  r := lResult.ReportedMissingLocks
+  lResult.ReportedMissingLocks = nil
+  return r
+}
+
 // DetachEndTxns returns (and removes) the EndTxnIntent objects from
 // the local result. If alwaysOnly is true, the slice is filtered to
 // include only those which have specified returnAlways=true, meaning
@@ -418,6 +433,13 @@ func (p *Result) MergeAndDestroy(q Result) error {
   }
   q.Local.ResolvedLocks = nil
 
+  if p.Local.ReportedMissingLocks == nil {
+    p.Local.ReportedMissingLocks = q.Local.ReportedMissingLocks
+  } else {
+    p.Local.ReportedMissingLocks = append(p.Local.ReportedMissingLocks, q.Local.ReportedMissingLocks...)
+  }
+  q.Local.ReportedMissingLocks = nil
+
   if p.Local.UpdatedTxns == nil {
     p.Local.UpdatedTxns = q.Local.UpdatedTxns
   } else {
diff --git a/pkg/kv/kvserver/concurrency/concurrency_control.go b/pkg/kv/kvserver/concurrency/concurrency_control.go
index dea3d2ae964f..8cf557fa6649 100644
--- a/pkg/kv/kvserver/concurrency/concurrency_control.go
+++ b/pkg/kv/kvserver/concurrency/concurrency_control.go
@@ -254,6 +254,11 @@ type LockManager interface {
   // acquired a new lock or re-acquired an existing lock that it already held.
   OnLockAcquired(context.Context, *roachpb.LockAcquisition)
 
+  // OnLockMissing informs the concurrency manager that a lock has been
+  // reported missing to a client via QueryIntent. Such locks cannot later be
+  // materialized via a lock table flush.
+  OnLockMissing(context.Context, *roachpb.LockAcquisition)
+
   // OnLockUpdated informs the concurrency manager that a transaction has
   // updated or released a lock or range of locks that it previously held.
   // The Durability field of the lock update struct is ignored.
@@ -697,6 +702,11 @@ type lockTable interface {
   // intent has been applied to the replicated state machine.
   AcquireLock(*roachpb.LockAcquisition) error
 
+  // MarkIneligibleForExport marks any locks held by this transaction on the
+  // same key as ineligible for export from the lock table for replication,
+  // since doing so could result in a transaction being erroneously committed.
+  MarkIneligibleForExport(*roachpb.LockAcquisition) error
+
   // UpdateLocks informs the lockTable that an existing lock or range of locks
   // was either updated or released.
   //
diff --git a/pkg/kv/kvserver/concurrency/concurrency_manager.go b/pkg/kv/kvserver/concurrency/concurrency_manager.go
index fca272b71099..f49abde9a78c 100644
--- a/pkg/kv/kvserver/concurrency/concurrency_manager.go
+++ b/pkg/kv/kvserver/concurrency/concurrency_manager.go
@@ -131,9 +131,7 @@ var UnreplicatedLockReliabilityLeaseTransfer = settings.RegisterBoolSetting(
   settings.SystemOnly,
   "kv.lock_table.unreplicated_lock_reliability.lease_transfer.enabled",
   "whether the replica should attempt to keep unreplicated locks during lease transfers",
-  // TODO(#145458): We've disabled this by default to avoid flakes until the underlying bug is fixed.
-  // metamorphic.ConstantWithTestBool("kv.lock_table.unreplicated_lock_reliability.lease_transfer.enabled", true),
-  false,
+  metamorphic.ConstantWithTestBool("kv.lock_table.unreplicated_lock_reliability.lease_transfer.enabled", false),
 )
 
 // UnreplicatedLockReliabilityMerge controls whether the replica will
@@ -142,9 +140,7 @@ var UnreplicatedLockReliabilityMerge = settings.RegisterBoolSetting(
   settings.SystemOnly,
   "kv.lock_table.unreplicated_lock_reliability.merge.enabled",
   "whether the replica should attempt to keep unreplicated locks during range merges",
-  // TODO(#145458): We've disabled this by default to avoid flakes until the underlying bug is fixed.
-  // metamorphic.ConstantWithTestBool("kv.lock_table.unreplicated_lock_reliability.merge.enabled", true),
-  false,
+  metamorphic.ConstantWithTestBool("kv.lock_table.unreplicated_lock_reliability.merge.enabled", false),
 )
 
 var MaxLockFlushSize = settings.RegisterByteSizeSetting(
@@ -599,6 +595,15 @@ func (m *managerImpl) OnLockAcquired(ctx context.Context, acq *roachpb.LockAcqui
   }
 }
 
+// OnLockMissing implements the LockManager interface.
+func (m *managerImpl) OnLockMissing(ctx context.Context, acq *roachpb.LockAcquisition) {
+  if err := m.lt.MarkIneligibleForExport(acq); err != nil {
+    // We don't currently expect any errors from this method other than
+    // assertion failures, which represent programming errors.
+    log.Fatalf(ctx, "%v", err)
+  }
+}
+
 // OnLockUpdated implements the LockManager interface.
 func (m *managerImpl) OnLockUpdated(ctx context.Context, up *roachpb.LockUpdate) {
   if err := m.lt.UpdateLocks(up); err != nil {
diff --git a/pkg/kv/kvserver/concurrency/lock_table.go b/pkg/kv/kvserver/concurrency/lock_table.go
index daa10239c2e8..5adc78b37901 100644
--- a/pkg/kv/kvserver/concurrency/lock_table.go
+++ b/pkg/kv/kvserver/concurrency/lock_table.go
@@ -1086,6 +1086,17 @@ type unreplicatedLockHolderInfo struct {
   // based on the sequence of the held unreplicated lock.
   ignoredSeqNums []enginepb.IgnoredSeqNumRange
 
+  // ineligibleForExport, if true, indicates that this lock must never be
+  // exported from the lock table for replication. It is set to true via the
+  // lock manager's OnLockMissing function when QueryIntent reports a lock as
+  // missing to a client.
+  //
+  // Writing out a lock that "covers" a lock that we previously reported as
+  // missing can result in transaction recovery erroneously committing an
+  // uncommitted transaction, because it finds a lock that the original
+  // transaction failed to find during its attempt to commit.
+  ineligibleForExport bool
+
   // The timestamp at which the unreplicated lock is held. Must not regress.
   ts hlc.Timestamp
 }
@@ -3062,6 +3073,47 @@ func (kl *keyLocks) acquireLock(
   return nil
 }
 
+// markLockIneligibleForExport marks any locks held by the same transaction on
+// the same key as ineligible for export from the in-memory lock table to
+// storage, since exporting them would erroneously re-materialize a lock that
+// was previously reported as missing.
+//
+// Acquires kl.mu.
+func (kl *keyLocks) markLockIneligibleForExport(acq *roachpb.LockAcquisition) {
+  assert(acq != nil, "unexpected nil LockAcquisition")
+
+  kl.mu.Lock()
+  defer kl.mu.Unlock()
+  if !kl.isLockedBy(acq.Txn.ID) {
+    return
+  }
+
+  for hl := kl.holders.Front(); hl != nil; hl = hl.Next() {
+    tl := hl.Value
+    if tl == nil || tl.unreplicatedInfo.isEmpty() {
+      continue
+    }
+    if tl.txn == nil {
+      continue
+    }
+    if tl.txn.ID != acq.Txn.ID {
+      continue
+    }
+
+    // NB: We might be able to be more conservative here with something like:
+    //
+    //   heldMode := tl.getLockMode()
+    //   if heldMode.Strength >= acq.Strength {
+    //     tl.unreplicatedInfo.ineligibleForExport = true
+    //   }
+    //
+    // But we avoid that for now, in case we are overlooking any cases where it
+    // would be possible for a transaction to acquire a new unreplicated lock
+    // on this key that would also be a problem to export.
+    tl.unreplicatedInfo.ineligibleForExport = true
+  }
+}
+
 // discoveredLock is called with a lock that is discovered by guard g when trying
 // to access this key with strength accessStrength.
 //
@@ -4388,6 +4440,36 @@ func (t *lockTableImpl) AcquireLock(acq *roachpb.LockAcquisition) error {
   return nil
 }
 
+// MarkIneligibleForExport implements the lockTable interface.
+func (t *lockTableImpl) MarkIneligibleForExport(acq *roachpb.LockAcquisition) error {
+  if acq == nil {
+    return errors.AssertionFailedf("unexpected nil lock acquisition")
+  }
+  if acq.Empty() {
+    return errors.AssertionFailedf("unexpected empty lock acquisition %s", acq)
+  }
+  if acq.Durability != lock.Replicated {
+    return errors.AssertionFailedf("unexpected lock acquisition durability %s", acq.Durability)
+  }
+
+  t.enabledMu.RLock()
+  defer t.enabledMu.RUnlock()
+  if !t.enabled {
+    // If not enabled, don't track any locks.
+    return nil
+  }
+
+  t.locks.mu.Lock()
+  defer t.locks.mu.Unlock()
+
+  iter := t.locks.MakeIter()
+  iter.FirstOverlap(&keyLocks{key: acq.Key})
+  if iter.Valid() {
+    iter.Cur().markLockIneligibleForExport(acq)
+  }
+  return nil
+}
+
 // checkMaxKeysLockedAndTryClear checks if the request is tracking more lock
 // information on keys in its lock table snapshot than it should. If it is, this
 // method relieves memory pressure by clearing as much per-key tracking as it
@@ -4680,6 +4762,10 @@ func (t *lockTableImpl) ExportUnreplicatedLocks(
       continue
     }
 
+    if tl.unreplicatedInfo.ineligibleForExport {
+      continue
+    }
+
     for _, str := range unreplicatedHolderStrengths {
       if tl.unreplicatedInfo.held(str) {
         exporter(&roachpb.LockAcquisition{
diff --git a/pkg/kv/kvserver/concurrency/verifiable_lock_table.go b/pkg/kv/kvserver/concurrency/verifiable_lock_table.go
index 95be8ea5d6f4..26f32619f921 100644
--- a/pkg/kv/kvserver/concurrency/verifiable_lock_table.go
+++ b/pkg/kv/kvserver/concurrency/verifiable_lock_table.go
@@ -94,6 +94,11 @@ func (v verifyingLockTable) AcquireLock(acq *roachpb.LockAcquisition) error {
   return v.lt.AcquireLock(acq)
 }
 
+// MarkIneligibleForExport implements the lockTable interface.
+func (v verifyingLockTable) MarkIneligibleForExport(acq *roachpb.LockAcquisition) error {
+  return v.lt.MarkIneligibleForExport(acq)
+}
+
 // UpdateLocks implements the lockTable interface.
 func (v verifyingLockTable) UpdateLocks(up *roachpb.LockUpdate) error {
   defer v.lt.verify()
diff --git a/pkg/kv/kvserver/replica_read.go b/pkg/kv/kvserver/replica_read.go
index c7bf5692fbbf..d41a7435b6b6 100644
--- a/pkg/kv/kvserver/replica_read.go
+++ b/pkg/kv/kvserver/replica_read.go
@@ -183,6 +183,14 @@ func (r *Replica) executeReadOnlyBatch(
   // conflicting intents so intent resolution could have been racing with this
   // request even if latches were held.
   intents := result.Local.DetachEncounteredIntents()
+
+  // If QueryIntent reports a lock as missing, we must report it to the lock
+  // manager.
+  missingLocks := result.Local.DetachMissingLocks()
+  for i := range missingLocks {
+    r.concMgr.OnLockMissing(ctx, &missingLocks[i])
+  }
+
   if pErr == nil {
     pErr = r.handleReadOnlyLocalEvalResult(ctx, ba, result.Local)
   }
@@ -503,8 +511,9 @@ func (r *Replica) executeReadOnlyBatchWithServersideRefreshes(
   // Failed read-only batches can't have any Result except for what's
   // allowlisted here.
   res.Local = result.LocalResult{
-    EncounteredIntents: res.Local.DetachEncounteredIntents(),
-    Metrics:            res.Local.Metrics,
+    ReportedMissingLocks: res.Local.ReportedMissingLocks,
+    EncounteredIntents:   res.Local.DetachEncounteredIntents(),
+    Metrics:              res.Local.Metrics,
   }
   return ba, nil, res, pErr
 }
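
For readers skimming the patch, the following is a minimal, self-contained Go sketch of the core idea (it is not part of the change; all type and function names here are hypothetical, not CockroachDB's real API): once QueryIntent has told a client that a lock is missing, that lock must be flagged so a later export of unreplicated locks (for example, on a lease transfer) can never re-materialize it.

package main

import "fmt"

type txnID string

// unreplicatedLock is a toy stand-in for a lock table entry.
type unreplicatedLock struct {
	txn                 txnID
	key                 string
	ineligibleForExport bool
}

type lockTable struct {
	locks []*unreplicatedLock
}

// markIneligibleForExport flags every lock the transaction holds on key so a
// later export cannot resurrect a lock already reported as missing.
func (lt *lockTable) markIneligibleForExport(txn txnID, key string) {
	for _, l := range lt.locks {
		if l.txn == txn && l.key == key {
			l.ineligibleForExport = true
		}
	}
}

// exportUnreplicatedLocks returns only the locks that are safe to write out.
func (lt *lockTable) exportUnreplicatedLocks() []*unreplicatedLock {
	var out []*unreplicatedLock
	for _, l := range lt.locks {
		if l.ineligibleForExport {
			continue
		}
		out = append(out, l)
	}
	return out
}

func main() {
	lt := &lockTable{locks: []*unreplicatedLock{
		{txn: "txn1", key: "b"},
		{txn: "txn2", key: "c"},
	}}
	// QueryIntent told a client that txn1's lock on "b" is missing; ensure a
	// lock table flush can never bring it back.
	lt.markIneligibleForExport("txn1", "b")
	for _, l := range lt.exportUnreplicatedLocks() {
		fmt.Printf("exporting lock: txn=%s key=%s\n", l.txn, l.key)
	}
}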