Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/kv/kvclient/kvcoord/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ go_test(
"//pkg/kv/kvpb/kvpbmock",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/closedts",
"//pkg/kv/kvserver/concurrency",
"//pkg/kv/kvserver/concurrency/isolation",
"//pkg/kv/kvserver/concurrency/lock",
"//pkg/kv/kvserver/kvserverbase",
Expand Down
173 changes: 173 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"reflect"
"sort"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
Expand All @@ -25,7 +26,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
Expand Down Expand Up @@ -4643,3 +4646,173 @@ func TestProxyTracing(t *testing.T) {
t.Logf("found trace event; msg=%s, tag=%s, loc=%s", msg, tag, loc)
})
}

// TestUnexpectedCommitOnTxnRecovery constructs a scenario where transaction
// recovery could incorrectly determine that a transaction is committed. The
// scenario is as follows:
//
// Txn1:
//   - Writes to keyA.
//   - Acquires an unreplicated exclusive lock on keyB.
//   - Acquires a replicated shared lock on keyB. This lock is pipelined, and
//     replication for it fails.
//   - Attempts to commit, but fails because of the lost replicated Shared lock.
//
// Lease is then transferred to n3. This causes the unreplicated exclusive lock
// on keyB to be replicated.
//
// Txn2:
//   - Attempts to read keyA, which kicks off transaction recovery for Txn1.
//   - Txn2 (incorrectly) concludes that Txn1 is committed at epoch=1 because it
//     finds a (stronger than Shared) replicated lock on keyB.
//
// Txn1:
//   - Back here, we do a stateful retry. We should learn that someone (Txn2)
//     aborted us when we go and try to commit. At the time of writing, we
//     incorrectly learn that we've been (unexpectedly) committed.
func TestUnexpectedCommitOnTxnRecovery(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	keyA := roachpb.Key("a")
	keyB := roachpb.Key("b")

	// Shared between the test body and the store testing knobs below:
	//  - targetTxnIDString holds Txn1's ID (as a string) once it is known.
	//  - cmdID holds the raft command ID whose application should be failed.
	var (
		targetTxnIDString atomic.Value
		cmdID             atomic.Value
	)
	cmdID.Store(kvserverbase.CmdIDKey(""))
	targetTxnIDString.Store("")
	ctx := context.Background()
	st := cluster.MakeTestingClusterSettings()
	// This test relies on unreplicated locks to be replicated on lease transfers.
	concurrency.UnreplicatedLockReliabilityLeaseTransfer.Override(ctx, &st.SV, true)
	tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
		ServerArgs: base.TestServerArgs{
			Settings: st,
			Knobs: base.TestingKnobs{
				Store: &kvserver.StoreTestingKnobs{
					TestingProposalFilter: func(fArgs kvserverbase.ProposalFilterArgs) *kvpb.Error {
						if fArgs.Req.Header.Txn == nil ||
							fArgs.Req.Header.Txn.ID.String() != targetTxnIDString.Load().(string) {
							return nil // not our txn
						}
						if !fArgs.Req.IsSingleRequest() {
							// Not the request we care about.
							return nil
						}
						getReq, ok := fArgs.Req.Requests[0].GetInner().(*kvpb.GetRequest)
						// Only fail replication on the first attempt (epoch 0).
						epoch := fArgs.Req.Header.Txn.Epoch
						if ok && getReq.KeyLockingDurability == lock.Replicated && epoch == 0 {
							t.Logf("will fail application for txn %s@epoch=%d; req: %+v; raft cmdID: %s",
								fArgs.Req.Header.Txn.ID.String(), epoch, getReq, fArgs.CmdID)
							// Remember the command so the apply filter below can reject it.
							cmdID.Store(fArgs.CmdID)
						}
						return nil
					},
					TestingApplyCalledTwiceFilter: func(fArgs kvserverbase.ApplyFilterArgs) (int, *kvpb.Error) {
						if fArgs.CmdID == cmdID.Load().(kvserverbase.CmdIDKey) {
							// Log the command ID itself, not the atomic.Value wrapper.
							t.Logf("failing application for raft cmdID: %s", fArgs.CmdID)

							return 0, kvpb.NewErrorf("test injected error")
						}
						return 0, nil
					},
				},
			},
		},
	})
	defer tc.Stopper().Stop(ctx)

	transferLease := func(idx int) {
		desc := tc.LookupRangeOrFatal(t, keyB)
		tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(idx))
	}
	// Make a db with transaction heartbeating disabled. This ensures that we
	// don't mark Txn1 as PENDING after its first failed parallel commit attempt,
	// which would otherwise prevent Txn2 from recovering Txn1.
	s := tc.Server(0)
	ambient := s.AmbientCtx()
	tsf := kvcoord.NewTxnCoordSenderFactory(
		kvcoord.TxnCoordSenderFactoryConfig{
			AmbientCtx:        ambient,
			HeartbeatInterval: -1, // disabled
			Settings:          s.ClusterSettings(),
			Clock:             s.Clock(),
			Stopper:           s.Stopper(),
		},
		s.DistSenderI().(*kvcoord.DistSender),
	)
	db := kv.NewDB(ambient, tsf, s.Clock(), s.Stopper())

	// startTxn2 is closed by Txn1 once it is ready to be recovered; blockCh is
	// closed by the main goroutine once Txn2 has finished its read.
	startTxn2 := make(chan struct{})
	blockCh := make(chan struct{})
	var wg sync.WaitGroup
	wg.Add(1)

	// Write to keyB so that we can later get a lock on it.
	txn := db.NewTxn(ctx, "txn")
	err := txn.Put(ctx, keyB, "valueB")
	require.NoError(t, err)
	require.NoError(t, txn.Commit(ctx))

	go func() {
		defer wg.Done()

		err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
			// Compare IDs as strings: the stored value is a string, so comparing
			// it directly against txn.ID() would never be equal and the epoch 1
			// branch below would never run.
			if txnID := targetTxnIDString.Load().(string); txnID == "" {
				// Store the txnID for the testing knobs.
				targetTxnIDString.Store(txn.ID().String())
				t.Logf("txn1 ID is: %s", txn.ID())
			} else if txnID != txn.ID().String() {
				// Since txn recovery aborted us, we get retried again but with an
				// entirely new txnID. This time we just return. Writing nothing.
				return nil
			}

			switch txn.Epoch() {
			case 0:
				err := txn.Put(ctx, keyA, "value")
				require.NoError(t, err)
				res, err := txn.GetForUpdate(ctx, keyB, kvpb.BestEffort)
				require.NoError(t, err)
				require.Equal(t, res.ValueBytes(), []byte("valueB"))
				res, err = txn.GetForShare(ctx, keyB, kvpb.GuaranteedDurability)
				require.NoError(t, err)
				require.Equal(t, res.ValueBytes(), []byte("valueB"))
				err = txn.Commit(ctx)
				require.Error(t, err)
				require.ErrorContains(t, err, "RETRY_ASYNC_WRITE_FAILURE")
				// Transfer the lease to n3.
				transferLease(2)
				close(startTxn2)
				// Block until Txn2 recovers us.
				<-blockCh
				return err
			case 1:
				// When retrying the write failure we should discover that txn recovery
				// has aborted this transaction.
				err := txn.Put(ctx, keyA, "value")
				require.Error(t, err)
				require.ErrorContains(t, err, "ABORT_REASON_ABORT_SPAN")
				return err
			default:
				t.Errorf("unexpected epoch: %d", txn.Epoch())
			}
			return nil
		})
		require.NoError(t, err)
	}()
	<-startTxn2

	txn2 := db.NewTxn(ctx, "txn2")
	res, err := txn2.Get(ctx, keyA)
	require.NoError(t, err)
	// NB: Nothing should exist on keyA, because txn1 didn't commit at epoch 1 (or
	// any epoch, for that matter).
	require.False(t, res.Exists())

	close(blockCh)
	wg.Wait()
}
23 changes: 7 additions & 16 deletions pkg/kv/kvnemesis/kvnemesis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,21 +337,14 @@ func TestKVNemesisMultiNode_BufferedWrites(t *testing.T) {
defer log.Scope(t).Close(t)

testKVNemesisImpl(t, kvnemesisTestCfg{
numNodes: 3,
numSteps: defaultNumSteps,
concurrency: 5,
seedOverride: 0,
// TODO(#145458): Until #145458 is fixed reduce the
// rate of lost writes by avoiding lease transfers and
// merges and also turning off error injection.
invalidLeaseAppliedIndexProb: 0.0,
injectReproposalErrorProb: 0.0,
numNodes: 3,
numSteps: defaultNumSteps,
concurrency: 5,
seedOverride: 0,
invalidLeaseAppliedIndexProb: 0.2,
injectReproposalErrorProb: 0.2,
assertRaftApply: true,
bufferedWriteProb: 0.70,
testGeneratorConfig: func(g *GeneratorConfig) {
g.Ops.ChangeLease = ChangeLeaseConfig{}
g.Ops.Merge = MergeConfig{}
},
testSettings: func(ctx context.Context, st *cluster.Settings) {
kvcoord.BufferedWritesEnabled.Override(ctx, &st.SV, true)
concurrency.UnreplicatedLockReliabilityLeaseTransfer.Override(ctx, &st.SV, true)
Expand All @@ -362,9 +355,7 @@ func TestKVNemesisMultiNode_BufferedWrites(t *testing.T) {
}

// TestKVNemesisMultiNode_BufferedWritesNoPipelining turns on buffered
// writes and turns off write pipelining. Turning off write pipelining
// allows us to test the lock reliability features even without a fix
// for #145458.
// writes and turns off write pipelining.
func TestKVNemesisMultiNode_BufferedWritesNoPipelining(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
7 changes: 5 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_query_intent.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,11 @@ func QueryIntent(
}
}

res := result.Result{}
if !reply.FoundIntent && args.ErrorIfMissing {
return result.Result{}, kvpb.NewIntentMissingError(args.Key, intent)
l := roachpb.MakeLockAcquisition(args.Txn, args.Key, lock.Replicated, args.Strength, args.IgnoredSeqNums)
res.Local.ReportedMissingLocks = []roachpb.LockAcquisition{l}
return res, kvpb.NewIntentMissingError(args.Key, intent)
}
return result.Result{}, nil
return res, nil
}
22 changes: 22 additions & 0 deletions pkg/kv/kvserver/batcheval/result/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ type LocalResult struct {
// UpdatedTxns stores transaction records that have been updated by
// calls to EndTxn, PushTxn, and RecoverTxn.
UpdatedTxns []*roachpb.Transaction
// ReportedMissingLocks stores lock acquisition structs that represent locks
// that have been reported as missing via QueryIntent. Such locks must be
// reported to the concurrency manager.
ReportedMissingLocks []roachpb.LockAcquisition
// EndTxns stores completed transactions. If the transaction
// contains unresolved intents, they should be handed off for
// asynchronous intent resolution. A bool in each EndTxnIntents
Expand Down Expand Up @@ -131,6 +135,17 @@ func (lResult *LocalResult) DetachEncounteredIntents() []roachpb.Intent {
return r
}

// DetachMissingLocks hands back the lock acquisitions recorded in
// ReportedMissingLocks and clears the field on the receiver, so each reported
// lock is returned at most once. A nil receiver yields a nil slice.
func (lResult *LocalResult) DetachMissingLocks() []roachpb.LockAcquisition {
	var missing []roachpb.LockAcquisition
	if lResult != nil {
		// Take ownership of the slice and reset the field in one step.
		missing, lResult.ReportedMissingLocks = lResult.ReportedMissingLocks, nil
	}
	return missing
}

// DetachEndTxns returns (and removes) the EndTxnIntent objects from
// the local result. If alwaysOnly is true, the slice is filtered to
// include only those which have specified returnAlways=true, meaning
Expand Down Expand Up @@ -418,6 +433,13 @@ func (p *Result) MergeAndDestroy(q Result) error {
}
q.Local.ResolvedLocks = nil

if p.Local.ReportedMissingLocks == nil {
p.Local.ReportedMissingLocks = q.Local.ReportedMissingLocks
} else {
p.Local.ReportedMissingLocks = append(p.Local.ReportedMissingLocks, q.Local.ReportedMissingLocks...)
}
q.Local.ReportedMissingLocks = nil

if p.Local.UpdatedTxns == nil {
p.Local.UpdatedTxns = q.Local.UpdatedTxns
} else {
Expand Down
10 changes: 10 additions & 0 deletions pkg/kv/kvserver/concurrency/concurrency_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,11 @@ type LockManager interface {
// acquired a new lock or re-acquired an existing lock that it already held.
OnLockAcquired(context.Context, *roachpb.LockAcquisition)

// OnLockMissing informs the concurrency manager that a lock has been reported
// missing to a client via QueryIntent. Such locks cannot later be
// materialized via a lock table flush.
OnLockMissing(context.Context, *roachpb.LockAcquisition)

// OnLockUpdated informs the concurrency manager that a transaction has
// updated or released a lock or range of locks that it previously held.
// The Durability field of the lock update struct is ignored.
Expand Down Expand Up @@ -697,6 +702,11 @@ type lockTable interface {
// intent has been applied to the replicated state machine.
AcquireLock(*roachpb.LockAcquisition) error

// MarkIneligibleForExport marks any locks held by this transaction on the
// same key as ineligible for export from the lock table for replication since
// doing so could result in a transaction being erroneously committed.
MarkIneligibleForExport(*roachpb.LockAcquisition) error

// UpdateLocks informs the lockTable that an existing lock or range of locks
// was either updated or released.
//
Expand Down
17 changes: 11 additions & 6 deletions pkg/kv/kvserver/concurrency/concurrency_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,7 @@ var UnreplicatedLockReliabilityLeaseTransfer = settings.RegisterBoolSetting(
settings.SystemOnly,
"kv.lock_table.unreplicated_lock_reliability.lease_transfer.enabled",
"whether the replica should attempt to keep unreplicated locks during lease transfers",
// TODO(#145458): We've disabled this by default to avoid flakes until the underlying bug is fixed.
// metamorphic.ConstantWithTestBool("kv.lock_table.unreplicated_lock_reliability.lease_transfer.enabled", true),
false,
metamorphic.ConstantWithTestBool("kv.lock_table.unreplicated_lock_reliability.lease_transfer.enabled", false),
)

// UnreplicatedLockReliabilityMerge controls whether the replica will
Expand All @@ -142,9 +140,7 @@ var UnreplicatedLockReliabilityMerge = settings.RegisterBoolSetting(
settings.SystemOnly,
"kv.lock_table.unreplicated_lock_reliability.merge.enabled",
"whether the replica should attempt to keep unreplicated locks during range merges",
// TODO(#145458): We've disabled this by default to avoid flakes until the underlying bug is fixed.
// metamorphic.ConstantWithTestBool("kv.lock_table.unreplicated_lock_reliability.merge.enabled", true),
false,
metamorphic.ConstantWithTestBool("kv.lock_table.unreplicated_lock_reliability.merge.enabled", false),
)

var MaxLockFlushSize = settings.RegisterByteSizeSetting(
Expand Down Expand Up @@ -599,6 +595,15 @@ func (m *managerImpl) OnLockAcquired(ctx context.Context, acq *roachpb.LockAcqui
}
}

// OnLockMissing implements the LockManager interface. It forwards the
// acquisition to the lock table's MarkIneligibleForExport and treats any
// returned error as fatal.
func (m *managerImpl) OnLockMissing(ctx context.Context, acq *roachpb.LockAcquisition) {
	err := m.lt.MarkIneligibleForExport(acq)
	if err == nil {
		return
	}
	// The only errors currently expected from this method are assertion
	// failures, which indicate programming errors.
	log.Fatalf(ctx, "%v", err)
}

// OnLockUpdated implements the LockManager interface.
func (m *managerImpl) OnLockUpdated(ctx context.Context, up *roachpb.LockUpdate) {
if err := m.lt.UpdateLocks(up); err != nil {
Expand Down
Loading