Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvnemesis: uniquely identify all versions #89477

Merged
merged 4 commits into from
Dec 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build/bazelutil/check.sh
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ pkg/cmd/prereqs/BUILD.bazel
EXISTING_CRDB_TEST_BUILD_CONSTRAINTS="
pkg/util/buildutil/crdb_test_off.go://go:build !crdb_test || crdb_test_off
pkg/util/buildutil/crdb_test_on.go://go:build crdb_test && !crdb_test_off
pkg/kv/kvnemesis/kvnemesisutil/crdb_test_off.go://go:build !crdb_test || crdb_test_off
pkg/kv/kvnemesis/kvnemesisutil/crdb_test_on.go://go:build crdb_test && !crdb_test_off
"

if [ -z "${COCKROACH_BAZEL_CHECK_FAST:-}" ]; then
Expand Down
2 changes: 2 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1118,6 +1118,7 @@ GO_TARGETS = [
"//pkg/kv/kvclient/rangefeed:rangefeed_test",
"//pkg/kv/kvclient/rangestats:rangestats",
"//pkg/kv/kvclient:kvclient",
"//pkg/kv/kvnemesis/kvnemesisutil:kvnemesisutil",
"//pkg/kv/kvnemesis:kvnemesis",
"//pkg/kv/kvnemesis:kvnemesis_test",
"//pkg/kv/kvprober:kvprober",
Expand Down Expand Up @@ -2477,6 +2478,7 @@ GET_X_DATA_TARGETS = [
"//pkg/kv/kvclient/rangefeed/rangefeedcache:get_x_data",
"//pkg/kv/kvclient/rangestats:get_x_data",
"//pkg/kv/kvnemesis:get_x_data",
"//pkg/kv/kvnemesis/kvnemesisutil:get_x_data",
"//pkg/kv/kvprober:get_x_data",
"//pkg/kv/kvserver:get_x_data",
"//pkg/kv/kvserver/abortspan:get_x_data",
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ func (b *Batch) ApproximateMutationBytes() int {
return b.approxMutationReqBytes
}

// Requests exposes the requests stashed in the batch thus far.
//
// The returned slice is the batch's own internal slice (b.reqs), not a copy,
// so it aliases the batch's state: callers that modify its elements are
// modifying the batch itself.
func (b *Batch) Requests() []roachpb.RequestUnion {
return b.reqs
}

// RawResponse returns the BatchResponse which was the result of a successful
// execution of the batch, and nil otherwise.
func (b *Batch) RawResponse() *roachpb.BatchResponse {
Expand Down
26 changes: 24 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,8 @@ type DistSender struct {
// LatencyFunc is used to estimate the latency to other nodes.
latencyFunc LatencyFunc

onRangeSpanningNonTxnalBatch func(ba *roachpb.BatchRequest) *roachpb.Error

// locality is the description of the topography of the server on which the
// DistSender is running. It is used to estimate the latency to other nodes
// in the absence of a latency function.
Expand Down Expand Up @@ -499,6 +501,11 @@ func NewDistSender(cfg DistSenderConfig) *DistSender {
} else {
ds.latencyFunc = ds.rpcContext.RemoteClocks.Latency
}

if cfg.TestingKnobs.OnRangeSpanningNonTxnalBatch != nil {
ds.onRangeSpanningNonTxnalBatch = cfg.TestingKnobs.OnRangeSpanningNonTxnalBatch
}

return ds
}

Expand Down Expand Up @@ -1256,8 +1263,23 @@ func (ds *DistSender) divideAndSendBatchToRanges(
// If there's no transaction and ba spans ranges, possibly re-run as part of
// a transaction for consistency. The case where we don't need to re-run is
// if the read consistency is not required.
if ba.Txn == nil && ba.IsTransactional() && ba.ReadConsistency == roachpb.CONSISTENT {
return nil, roachpb.NewError(&roachpb.OpRequiresTxnError{})
if ba.Txn == nil {
if ba.IsTransactional() && ba.ReadConsistency == roachpb.CONSISTENT {
// NB: this check isn't quite right. We enter this if there's *any* transactional
// request here, but there could be a mix (for example a DeleteRangeUsingTombstone
// and a Put). DeleteRangeUsingTombstone gets split non-transactionally across
// batches, so that is probably what we would want for the mixed batch as well.
//
// Revisit if this ever becomes something we actually want to do, for now such
// batches will fail (re-wrapped in txn and then fail because some requests
// don't support txns).
return nil, roachpb.NewError(&roachpb.OpRequiresTxnError{})
}
if fn := ds.onRangeSpanningNonTxnalBatch; fn != nil {
if pErr := fn(ba); pErr != nil {
return nil, pErr
}
}
}
// If the batch contains a non-parallel commit EndTxn and spans ranges then
// we want the caller to come again with the EndTxn in a separate
Expand Down
11 changes: 10 additions & 1 deletion pkg/kv/kvclient/kvcoord/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@

package kvcoord

import "github.com/cockroachdb/cockroach/pkg/base"
import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// ClientTestingKnobs contains testing options that dictate the behavior
// of the key-value client.
Expand Down Expand Up @@ -52,6 +55,12 @@ type ClientTestingKnobs struct {
// CommitWaitFilter allows tests to instrument the beginning of a transaction
// commit wait sleep.
CommitWaitFilter func()

// OnRangeSpanningNonTxnalBatch is invoked whenever DistSender attempts to split
// a non-transactional batch across a range boundary. The method may inject an
// error which, if non-nil, becomes the result of the batch. Otherwise, execution
// continues.
OnRangeSpanningNonTxnalBatch func(ba *roachpb.BatchRequest) *roachpb.Error
}

var _ base.ModuleTestingKnobs = &ClientTestingKnobs{}
Expand Down
11 changes: 10 additions & 1 deletion pkg/kv/kvnemesis/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_library(
"generator.go",
"kvnemesis.go",
"operations.go",
"seq_tracker.go",
"validator.go",
"watcher.go",
],
Expand All @@ -24,11 +25,13 @@ go_library(
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvclient/kvcoord",
"//pkg/kv/kvnemesis/kvnemesisutil",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/liveness",
"//pkg/roachpb",
"//pkg/sql/catalog/bootstrap",
"//pkg/storage",
"//pkg/storage/enginepb",
"//pkg/util/bufalloc",
"//pkg/util/ctxgroup",
"//pkg/util/encoding",
Expand All @@ -41,7 +44,6 @@ go_library(
"//pkg/util/timeutil",
"//pkg/util/tracing",
"//pkg/util/tracing/tracingpb",
"//pkg/util/uuid",
"@com_github_cockroachdb_cockroach_go_v2//crdb",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_pebble//:pebble",
Expand All @@ -63,21 +65,28 @@ go_test(
"validator_test.go",
],
args = ["-test.timeout=55s"],
data = glob(["testdata/**"]),
embed = [":kvnemesis"],
deps = [
"//pkg/base",
"//pkg/config/zonepb",
"//pkg/kv",
"//pkg/kv/kvclient/kvcoord",
"//pkg/kv/kvnemesis/kvnemesisutil",
"//pkg/kv/kvserver",
"//pkg/roachpb",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/storage",
"//pkg/storage/enginepb",
"//pkg/testutils",
"//pkg/testutils/echotest",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/buildutil",
"//pkg/util/envutil",
"//pkg/util/hlc",
"//pkg/util/leaktest",
Expand Down
119 changes: 92 additions & 27 deletions pkg/kv/kvnemesis/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvnemesis/kvnemesisutil"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -76,15 +77,47 @@ func (a *Applier) getNextDBRoundRobin() (*kv.DB, int32) {
return a.dbs[dbIdx], int32(dbIdx)
}

// Sentinel errors. Each has a matching except* predicate below that detects
// it with errors.Is.
var (
// errOmitted is recorded as the result of an operation inside a closure txn
// that was never invoked because an earlier operation in the same txn
// failed.
errOmitted = errors.New("omitted")
// errClosureTxnRollback is returned from the txn closure to intentionally
// roll back a ClosureTxnType_Rollback transaction.
errClosureTxnRollback = errors.New("rollback")
// errDelRangeUsingTombstoneStraddlesRangeBoundary is not produced anywhere
// in this part of the file. NOTE(review): presumably injected when a
// DeleteRangeUsingTombstone would be split across ranges — confirm at the
// site that returns it.
errDelRangeUsingTombstoneStraddlesRangeBoundary = errors.New("DeleteRangeUsingTombstone can not straddle range boundary")
)

// exceptOmitted reports whether err is errOmitted according to errors.Is,
// i.e. whether the operation was skipped rather than executed.
func exceptOmitted(err error) bool {
return errors.Is(err, errOmitted)
}

// exceptRollback reports whether err is errClosureTxnRollback according to
// errors.Is, i.e. whether it is the intentional txn rollback.
func exceptRollback(err error) bool {
return errors.Is(err, errClosureTxnRollback)
}

// exceptRetry reports whether err is a retry error, i.e. whether
// errors.HasInterface finds the roachpb.ClientVisibleRetryError interface
// in err.
func exceptRetry(err error) bool {
return errors.HasInterface(err, (*roachpb.ClientVisibleRetryError)(nil))
}

// exceptUnhandledRetry reports whether errors.HasType finds a
// *roachpb.UnhandledRetryableError in err. Unlike exceptRetry, this matches
// on the concrete error type rather than on an interface.
func exceptUnhandledRetry(err error) bool {
return errors.HasType(err, (*roachpb.UnhandledRetryableError)(nil))
}

// exceptAmbiguous reports whether err is an ambiguous result, i.e. whether
// errors.HasInterface finds the roachpb.ClientVisibleAmbiguousError
// interface in err.
func exceptAmbiguous(err error) bool {
return errors.HasInterface(err, (*roachpb.ClientVisibleAmbiguousError)(nil))
}

// exceptDelRangeUsingTombstoneStraddlesRangeBoundary reports whether err is
// errDelRangeUsingTombstoneStraddlesRangeBoundary according to errors.Is.
func exceptDelRangeUsingTombstoneStraddlesRangeBoundary(err error) bool {
return errors.Is(err, errDelRangeUsingTombstoneStraddlesRangeBoundary)
}

func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) {
switch o := op.GetValue().(type) {
case *GetOperation,
*PutOperation,
*ScanOperation,
*BatchOperation,
*DeleteOperation,
*DeleteRangeOperation:
applyClientOp(ctx, db, op, false /* inTxn */)
*DeleteRangeOperation,
*DeleteRangeUsingTombstoneOperation:
applyClientOp(ctx, db, op, false)
case *SplitOperation:
err := db.AdminSplit(ctx, o.Key, hlc.MaxTimestamp, roachpb.AdminSplitRequest_INGESTION)
o.Result = resultInit(ctx, err)
Expand All @@ -94,7 +127,6 @@ func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) {
case *ChangeReplicasOperation:
desc := getRangeDesc(ctx, o.Key, db)
_, err := db.AdminChangeReplicas(ctx, o.Key, desc, o.Changes)
// TODO(dan): Save returned desc?
o.Result = resultInit(ctx, err)
case *TransferLeaseOperation:
err := db.AdminTransferLease(ctx, o.Key, o.Target)
Expand All @@ -117,18 +149,32 @@ func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) {
retryOnAbort.Next()
}
savedTxn = txn
for i := range o.Ops {
op := &o.Ops[i]
op.Result().Reset() // in case we're a retry
applyClientOp(ctx, txn, op, true /* inTxn */)
// The KV api disallows use of a txn after an operation on it errors.
if r := op.Result(); r.Type == ResultType_Error {
return errors.DecodeError(ctx, *r.Err)
{
var err error
for i := range o.Ops {
op := &o.Ops[i]
op.Result().Reset() // in case we're a retry
if err != nil {
// If a previous op failed, mark this op as never invoked. We need
// to do this because we want, as an invariant, to have marked all
// operations as either failed or succeeded.
*op.Result() = resultInit(ctx, errOmitted)
continue
}

applyClientOp(ctx, txn, op, true)
// The KV api disallows use of a txn after an operation on it errors.
if r := op.Result(); r.Type == ResultType_Error {
err = errors.DecodeError(ctx, *r.Err)
}
}
if err != nil {
return err
}
}
if o.CommitInBatch != nil {
b := txn.NewBatch()
applyBatchOp(ctx, b, txn.CommitInBatch, o.CommitInBatch, true)
applyBatchOp(ctx, b, txn.CommitInBatch, o.CommitInBatch)
// The KV api disallows use of a txn after an operation on it errors.
if r := o.CommitInBatch.Result; r.Type == ResultType_Error {
return errors.DecodeError(ctx, *r.Err)
Expand All @@ -138,7 +184,7 @@ func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) {
case ClosureTxnType_Commit:
return nil
case ClosureTxnType_Rollback:
return errors.New("rollback")
return errClosureTxnRollback
default:
panic(errors.AssertionFailedf(`unknown closure txn type: %s`, o.Type))
}
Expand Down Expand Up @@ -221,7 +267,8 @@ func applyClientOp(ctx context.Context, db clientI, op *Operation, inTxn bool) {
}
case *PutOperation:
_, ts, err := dbRunWithResultAndTimestamp(ctx, db, func(b *kv.Batch) {
b.Put(o.Key, o.Value)
b.Put(o.Key, o.Value())
setLastReqSeq(b, o.Seq)
})
o.Result = resultInit(ctx, err)
if err != nil {
Expand Down Expand Up @@ -250,13 +297,14 @@ func applyClientOp(ctx context.Context, db clientI, op *Operation, inTxn bool) {
o.Result.Values = make([]KeyValue, len(kvs))
for i, kv := range kvs {
o.Result.Values[i] = KeyValue{
Key: []byte(kv.Key),
Key: kv.Key,
Value: kv.Value.RawBytes,
}
}
case *DeleteOperation:
res, ts, err := dbRunWithResultAndTimestamp(ctx, db, func(b *kv.Batch) {
b.Del(o.Key)
setLastReqSeq(b, o.Seq)
})
o.Result = resultInit(ctx, err)
if err != nil {
Expand All @@ -270,11 +318,9 @@ func applyClientOp(ctx context.Context, db clientI, op *Operation, inTxn bool) {
o.Result.Keys[i] = deletedKey
}
case *DeleteRangeOperation:
if !inTxn {
panic(errors.AssertionFailedf(`non-transactional DelRange operations currently unsupported`))
}
res, ts, err := dbRunWithResultAndTimestamp(ctx, db, func(b *kv.Batch) {
b.DelRange(o.Key, o.EndKey, true /* returnKeys */)
setLastReqSeq(b, o.Seq)
})
o.Result = resultInit(ctx, err)
if err != nil {
Expand All @@ -287,20 +333,34 @@ func applyClientOp(ctx context.Context, db clientI, op *Operation, inTxn bool) {
for i, deletedKey := range deletedKeys {
o.Result.Keys[i] = deletedKey
}
case *DeleteRangeUsingTombstoneOperation:
_, ts, err := dbRunWithResultAndTimestamp(ctx, db, func(b *kv.Batch) {
b.DelRangeUsingTombstone(o.Key, o.EndKey)
setLastReqSeq(b, o.Seq)
})
o.Result = resultInit(ctx, err)
if err != nil {
return
}
o.Result.OptionalTimestamp = ts
case *BatchOperation:
b := &kv.Batch{}
applyBatchOp(ctx, b, db.Run, o, inTxn)
applyBatchOp(ctx, b, db.Run, o)
default:
panic(errors.AssertionFailedf(`unknown batch operation type: %T %v`, o, o))
}
}

// setLastReqSeq stamps seq onto the most recently added request in b: it
// reads that request's header, sets the header's KVNemesisSeq field to seq,
// and writes the header back. It is meant to be called right after appending
// a request to the batch; it indexes the last element unconditionally, so it
// panics if b holds no requests.
func setLastReqSeq(b *kv.Batch, seq kvnemesisutil.Seq) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we could add a testing hook for Batch which can automatically assign seq nums to appended requests?

sl := b.Requests()
// The last element is the request that was just appended to the batch.
req := sl[len(sl)-1].GetInner()
// Header() hands back the header, so it is modified and then stored again
// through SetHeader.
h := req.Header()
h.KVNemesisSeq.Set(seq)
req.SetHeader(h)
}

func applyBatchOp(
ctx context.Context,
b *kv.Batch,
run func(context.Context, *kv.Batch) error,
o *BatchOperation,
inTxn bool,
ctx context.Context, b *kv.Batch, run func(context.Context, *kv.Batch) error, o *BatchOperation,
) {
for i := range o.Ops {
switch subO := o.Ops[i].GetValue().(type) {
Expand All @@ -311,7 +371,8 @@ func applyBatchOp(
b.Get(subO.Key)
}
case *PutOperation:
b.Put(subO.Key, subO.Value)
b.Put(subO.Key, subO.Value())
setLastReqSeq(b, subO.Seq)
case *ScanOperation:
if subO.Reverse && subO.ForUpdate {
b.ReverseScanForUpdate(subO.Key, subO.EndKey)
Expand All @@ -324,11 +385,13 @@ func applyBatchOp(
}
case *DeleteOperation:
b.Del(subO.Key)
setLastReqSeq(b, subO.Seq)
case *DeleteRangeOperation:
if !inTxn {
panic(errors.AssertionFailedf(`non-transactional batch DelRange operations currently unsupported`))
}
b.DelRange(subO.Key, subO.EndKey, true /* returnKeys */)
setLastReqSeq(b, subO.Seq)
case *DeleteRangeUsingTombstoneOperation:
b.DelRangeUsingTombstone(subO.Key, subO.EndKey)
setLastReqSeq(b, subO.Seq)
default:
panic(errors.AssertionFailedf(`unknown batch operation type: %T %v`, subO, subO))
}
Expand Down Expand Up @@ -384,6 +447,8 @@ func applyBatchOp(
subO.Result.Keys[j] = key
}
}
case *DeleteRangeUsingTombstoneOperation:
subO.Result = resultInit(ctx, err)
default:
panic(errors.AssertionFailedf(`unknown batch operation type: %T %v`, subO, subO))
}
Expand Down
Loading