Skip to content

Commit

Permalink
kv/kvnemesis: add Txn.Prepare
Browse files Browse the repository at this point in the history
Informs #22329.

This commit adds support for Txn.Prepare in kvnemesis.

Release note: None
  • Loading branch information
nvanbenschoten committed Aug 21, 2024
1 parent 86ea407 commit 8b61574
Show file tree
Hide file tree
Showing 12 changed files with 171 additions and 28 deletions.
5 changes: 5 additions & 0 deletions pkg/kv/kvnemesis/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,11 @@ func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) {
}
return err
}
if o.Prepare {
if err := txn.Prepare(ctx); err != nil {
return err
}
}
if o.CommitInBatch != nil {
b := txn.NewBatch()
applyBatchOp(ctx, b, txn.CommitInBatch, o.CommitInBatch)
Expand Down
12 changes: 12 additions & 0 deletions pkg/kv/kvnemesis/applier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,18 @@ func TestApplier(t *testing.T) {
{
"txn-si-rollback", step(closureTxn(ClosureTxnType_Rollback, isolation.Snapshot, put(k5, 5))),
},
{
"txn-ssi-prepare-commit", step(closureTxnPrepare(ClosureTxnType_Rollback, isolation.Serializable, put(k5, 5))),
},
{
"txn-si-prepare-commit", step(closureTxnPrepare(ClosureTxnType_Rollback, isolation.Snapshot, put(k5, 5))),
},
{
"txn-ssi-prepare-rollback", step(closureTxnPrepare(ClosureTxnType_Rollback, isolation.Serializable, put(k5, 5))),
},
{
"txn-si-prepare-rollback", step(closureTxnPrepare(ClosureTxnType_Rollback, isolation.Snapshot, put(k5, 5))),
},
{
"split", step(split(k2)),
},
Expand Down
92 changes: 70 additions & 22 deletions pkg/kv/kvnemesis/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,24 @@ type ClosureTxnConfig struct {
// via the CommitInBatchMethod. This is an important part of the 1pc txn
// fastpath.
CommitReadCommittedInBatch int
// PrepareCommitSerializable is a serializable transaction that prepares and
// then commits normally.
PrepareCommitSerializable int
// PrepareCommitSnapshot is a snapshot transaction that prepares and then
// commits normally.
PrepareCommitSnapshot int
// PrepareCommitReadCommitted is a read committed transaction that prepares
// and then commits normally.
PrepareCommitReadCommitted int
// PrepareRollbackSerializable is a serializable transaction that prepares and
// encounters an (external) error at the end and has to roll back.
PrepareRollbackSerializable int
// PrepareRollbackSnapshot is a snapshot transaction that prepares and
// encounters an (external) error at the end and has to roll back.
PrepareRollbackSnapshot int
// PrepareRollbackReadCommitted is a read committed transaction that prepares
// and encounters an (external) error at the end and has to roll back.
PrepareRollbackReadCommitted int

TxnClientOps ClientOperationConfig
TxnBatchOps BatchOperationConfig
Expand Down Expand Up @@ -413,19 +431,25 @@ func newAllOperationsConfig() GeneratorConfig {
DB: clientOpConfig,
Batch: batchOpConfig,
ClosureTxn: ClosureTxnConfig{
CommitSerializable: 2,
CommitSnapshot: 2,
CommitReadCommitted: 2,
RollbackSerializable: 2,
RollbackSnapshot: 2,
RollbackReadCommitted: 2,
CommitSerializableInBatch: 2,
CommitSnapshotInBatch: 2,
CommitReadCommittedInBatch: 2,
TxnClientOps: clientOpConfig,
TxnBatchOps: batchOpConfig,
CommitBatchOps: clientOpConfig,
SavepointOps: savepointConfig,
CommitSerializable: 2,
CommitSnapshot: 2,
CommitReadCommitted: 2,
RollbackSerializable: 2,
RollbackSnapshot: 2,
RollbackReadCommitted: 2,
CommitSerializableInBatch: 2,
CommitSnapshotInBatch: 2,
CommitReadCommittedInBatch: 2,
PrepareCommitSerializable: 1,
PrepareCommitSnapshot: 1,
PrepareCommitReadCommitted: 1,
PrepareRollbackSerializable: 1,
PrepareRollbackSnapshot: 1,
PrepareRollbackReadCommitted: 1,
TxnClientOps: clientOpConfig,
TxnBatchOps: batchOpConfig,
CommitBatchOps: clientOpConfig,
SavepointOps: savepointConfig,
},
Split: SplitConfig{
SplitNew: 1,
Expand Down Expand Up @@ -1477,29 +1501,43 @@ func makeRandBatch(c *ClientOperationConfig) opGenFunc {
func (g *generator) registerClosureTxnOps(allowed *[]opGen, c *ClosureTxnConfig) {
const Commit, Rollback = ClosureTxnType_Commit, ClosureTxnType_Rollback
const SSI, SI, RC = isolation.Serializable, isolation.Snapshot, isolation.ReadCommitted
const Prepare, NoPrepare = true, false
addOpGen(allowed,
makeClosureTxn(Commit, SSI, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.CommitSerializable)
makeClosureTxn(Commit, SSI, NoPrepare, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.CommitSerializable)
addOpGen(allowed,
makeClosureTxn(Commit, SI, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.CommitSnapshot)
makeClosureTxn(Commit, SI, NoPrepare, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.CommitSnapshot)
addOpGen(allowed,
makeClosureTxn(Commit, RC, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.CommitReadCommitted)
makeClosureTxn(Commit, RC, NoPrepare, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.CommitReadCommitted)
addOpGen(allowed,
makeClosureTxn(Rollback, SSI, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.RollbackSerializable)
makeClosureTxn(Rollback, SSI, NoPrepare, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.RollbackSerializable)
addOpGen(allowed,
makeClosureTxn(Rollback, SI, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.RollbackSnapshot)
makeClosureTxn(Rollback, SI, NoPrepare, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.RollbackSnapshot)
addOpGen(allowed,
makeClosureTxn(Rollback, RC, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.RollbackReadCommitted)
makeClosureTxn(Rollback, RC, NoPrepare, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.RollbackReadCommitted)
addOpGen(allowed,
makeClosureTxn(Commit, SSI, &c.TxnClientOps, &c.TxnBatchOps, &c.CommitBatchOps, &c.SavepointOps), c.CommitSerializableInBatch)
makeClosureTxn(Commit, SSI, NoPrepare, &c.TxnClientOps, &c.TxnBatchOps, &c.CommitBatchOps, &c.SavepointOps), c.CommitSerializableInBatch)
addOpGen(allowed,
makeClosureTxn(Commit, SI, &c.TxnClientOps, &c.TxnBatchOps, &c.CommitBatchOps, &c.SavepointOps), c.CommitSnapshotInBatch)
makeClosureTxn(Commit, SI, NoPrepare, &c.TxnClientOps, &c.TxnBatchOps, &c.CommitBatchOps, &c.SavepointOps), c.CommitSnapshotInBatch)
addOpGen(allowed,
makeClosureTxn(Commit, RC, &c.TxnClientOps, &c.TxnBatchOps, &c.CommitBatchOps, &c.SavepointOps), c.CommitReadCommittedInBatch)
makeClosureTxn(Commit, RC, NoPrepare, &c.TxnClientOps, &c.TxnBatchOps, &c.CommitBatchOps, &c.SavepointOps), c.CommitReadCommittedInBatch)
addOpGen(allowed,
makeClosureTxn(Commit, SSI, Prepare, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.PrepareCommitSerializable)
addOpGen(allowed,
makeClosureTxn(Commit, SI, Prepare, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.PrepareCommitSnapshot)
addOpGen(allowed,
makeClosureTxn(Commit, RC, Prepare, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.PrepareCommitReadCommitted)
addOpGen(allowed,
makeClosureTxn(Rollback, SSI, Prepare, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.PrepareRollbackSerializable)
addOpGen(allowed,
makeClosureTxn(Rollback, SI, Prepare, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.PrepareRollbackSnapshot)
addOpGen(allowed,
makeClosureTxn(Rollback, RC, Prepare, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.PrepareRollbackReadCommitted)
}

func makeClosureTxn(
txnType ClosureTxnType,
iso isolation.Level,
prepare bool,
txnClientOps *ClientOperationConfig,
txnBatchOps *BatchOperationConfig,
commitInBatch *ClientOperationConfig,
Expand Down Expand Up @@ -1532,10 +1570,14 @@ func makeClosureTxn(
maybeUpdateSavepoints(&spIDs, ops[i])
}
op := closureTxn(txnType, iso, ops...)
op.ClosureTxn.Prepare = prepare
if commitInBatch != nil {
if txnType != ClosureTxnType_Commit {
panic(errors.AssertionFailedf(`CommitInBatch must commit got: %s`, txnType))
}
if prepare {
panic(errors.AssertionFailedf(`CommitInBatch cannot prepare`))
}
op.ClosureTxn.CommitInBatch = makeRandBatch(commitInBatch)(g, rng).Batch
}
return op
Expand Down Expand Up @@ -1768,6 +1810,12 @@ func closureTxnSSI(typ ClosureTxnType, ops ...Operation) Operation {
return closureTxn(typ, isolation.Serializable, ops...)
}

func closureTxnPrepare(typ ClosureTxnType, iso isolation.Level, ops ...Operation) Operation {
o := closureTxn(typ, iso, ops...)
o.ClosureTxn.Prepare = true
return o
}

func closureTxnCommitInBatch(
iso isolation.Level, commitInBatch []Operation, ops ...Operation,
) Operation {
Expand Down
36 changes: 30 additions & 6 deletions pkg/kv/kvnemesis/generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,22 +309,46 @@ func TestRandStep(t *testing.T) {
} else if o.Type == ClosureTxnType_Commit {
switch o.IsoLevel {
case isolation.Serializable:
counts.ClosureTxn.CommitSerializable++
if o.Prepare {
counts.ClosureTxn.PrepareCommitSerializable++
} else {
counts.ClosureTxn.CommitSerializable++
}
case isolation.Snapshot:
counts.ClosureTxn.CommitSnapshot++
if o.Prepare {
counts.ClosureTxn.PrepareCommitSnapshot++
} else {
counts.ClosureTxn.CommitSnapshot++
}
case isolation.ReadCommitted:
counts.ClosureTxn.CommitReadCommitted++
if o.Prepare {
counts.ClosureTxn.PrepareCommitReadCommitted++
} else {
counts.ClosureTxn.CommitReadCommitted++
}
default:
t.Fatalf("unexpected isolation level %s", o.IsoLevel)
}
} else if o.Type == ClosureTxnType_Rollback {
switch o.IsoLevel {
case isolation.Serializable:
counts.ClosureTxn.RollbackSerializable++
if o.Prepare {
counts.ClosureTxn.PrepareRollbackSerializable++
} else {
counts.ClosureTxn.RollbackSerializable++
}
case isolation.Snapshot:
counts.ClosureTxn.RollbackSnapshot++
if o.Prepare {
counts.ClosureTxn.PrepareRollbackSnapshot++
} else {
counts.ClosureTxn.RollbackSnapshot++
}
case isolation.ReadCommitted:
counts.ClosureTxn.RollbackReadCommitted++
if o.Prepare {
counts.ClosureTxn.PrepareRollbackReadCommitted++
} else {
counts.ClosureTxn.RollbackReadCommitted++
}
default:
t.Fatalf("unexpected isolation level %s", o.IsoLevel)
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/kv/kvnemesis/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,12 @@ func (op Operation) format(w *strings.Builder, fctx formatCtx) {
w.WriteString(newFctx.receiver)
fmt.Fprintf(w, `.SetIsoLevel(isolation.%s)`, o.IsoLevel)
formatOps(w, newFctx, o.Ops)
if o.Prepare {
w.WriteString("\n")
w.WriteString(newFctx.indent)
w.WriteString(newFctx.receiver)
w.WriteString(`.Prepare(ctx)`)
}
if o.CommitInBatch != nil {
newFctx.receiver = `b`
o.CommitInBatch.format(w, newFctx)
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvnemesis/operations.proto
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ message ClosureTxnOperation {
BatchOperation commit_in_batch = 3;
ClosureTxnType type = 4;
cockroach.kv.kvserver.concurrency.isolation.Level iso_level = 7;
bool prepare = 8;
Result result = 5 [(gogoproto.nullable) = false];
roachpb.Transaction txn = 6;
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/kv/kvnemesis/operations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,13 @@ func TestOperationsFormat(t *testing.T) {
},
{step: step(barrier(k1, k2, false /* withLAI */))},
{step: step(barrier(k3, k4, true /* withLAI */))},
{
step: step(
closureTxnPrepare(ClosureTxnType_Rollback,
isolation.ReadCommitted,
get(k8),
)),
},
}

w := echotest.NewWalker(t, datapathutils.TestDataPath(t, t.Name()))
Expand Down
8 changes: 8 additions & 0 deletions pkg/kv/kvnemesis/testdata/TestApplier/txn-si-prepare-commit
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
echo
----
db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
txn.SetIsoLevel(isolation.Snapshot)
txn.Put(ctx, tk(5), sv(5)) // @<ts> <nil>
txn.Prepare(ctx)
return errors.New("rollback")
}) // rollback
8 changes: 8 additions & 0 deletions pkg/kv/kvnemesis/testdata/TestApplier/txn-si-prepare-rollback
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
echo
----
db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
txn.SetIsoLevel(isolation.Snapshot)
txn.Put(ctx, tk(5), sv(5)) // @<ts> <nil>
txn.Prepare(ctx)
return errors.New("rollback")
}) // rollback
8 changes: 8 additions & 0 deletions pkg/kv/kvnemesis/testdata/TestApplier/txn-ssi-prepare-commit
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
echo
----
db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
txn.SetIsoLevel(isolation.Serializable)
txn.Put(ctx, tk(5), sv(5)) // @<ts> <nil>
txn.Prepare(ctx)
return errors.New("rollback")
}) // rollback
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
echo
----
db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
txn.SetIsoLevel(isolation.Serializable)
txn.Put(ctx, tk(5), sv(5)) // @<ts> <nil>
txn.Prepare(ctx)
return errors.New("rollback")
}) // rollback
8 changes: 8 additions & 0 deletions pkg/kv/kvnemesis/testdata/TestOperationsFormat/8
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
echo
----
···db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
··· txn.SetIsoLevel(isolation.ReadCommitted)
··· txn.Get(ctx, tk(8))
··· txn.Prepare(ctx)
··· return errors.New("rollback")
···})

0 comments on commit 8b61574

Please sign in to comment.