Merge #89477

89477: kvnemesis: uniquely identify all versions r=erikgrinaker a=tbg

This is essentially a v2 of kvnemesis. While a lot of code has changed, it's not a rewrite; rather, it brings kvnemesis closer to the idea that ultimately led to it being built in the first place: if values can uniquely identify the operation that wrote them, serializability checking becomes easier, as any observation of a value totally orders the reader and the writer with respect to each other. "Easier" means both simpler code and being able to actually carry out the verification on complex histories.

Prior to this PR, kvnemesis was writing unique values where it could, but it couldn't do so for deletions - after all, a deletion is like writing a "nothing" to MVCC, and there wasn't any way to make two "nothings" distinguishable. Having broken with the basic premise of unique values, the validator had to bend over backwards to avoid, for the most part, devolving into a "normal" serializability checker. For contrived edge cases this wasn't avoidable, and so there were histories that kvnemesis just couldn't handle.

"v2" (this PR) gets this right. The main contribution is that we now thread kvnemesis' sequence numbers all the way down into MVCC and back up with the RangeFeed. Values are now truly unique: a deletion tombstone is identifiable via its `MVCCValueHeader`, which carries the `kvnemesisutil.Seq` of the `Operation` that originally wrote it. On top of this, everything "just works" as you expect.

Plumbing testing-only fields through the KV API protobufs isn't something that was taken lightly and that required a good amount of deliberation. However, we figured out a clever (maybe too clever?) way to have these fields be zero-sized in production, meaning they cannot possibly affect production logic and they also do not influence struct sizes or the wire encoding. As a drawback, kvnemesis requires the `crdb_test` build tag (it will `t.Skip()` otherwise).
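
The build-tag trick looks roughly like this (a sketch assuming a small wrapper type guards the field, with `Seq` defined elsewhere in the package; the real files live in `pkg/kv/kvnemesis/kvnemesisutil`, as the `check.sh` diff below shows):

```go
//go:build crdb_test && !crdb_test_off

package kvnemesisutil

// Container holds a Seq in test builds.
type Container struct{ seq Seq }

func (c *Container) Set(s Seq) { c.seq = s }
func (c *Container) Get() Seq  { return c.seq }
```

```go
//go:build !crdb_test || crdb_test_off

package kvnemesisutil

// Container is zero-sized in production builds: it cannot change struct
// sizes or the wire encoding, and Set/Get are no-ops.
type Container struct{}

func (c *Container) Set(Seq)  {}
func (c *Container) Get() Seq { return 0 }
```

Embedding such a container in a request header gives test builds a place to stash the `Seq` while production binaries carry literally nothing.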

The remainder of this PR is mostly improvements in code quality. `echotest`-driven testing was introduced for many of the tests that could benefit from it. The core components of kvnemesis were reworked for clarity (the author spent the first week very confused and wishes for that experience to remain unrepeated by anyone). kvnemesis also tracks the execution timestamps reported by the KV layer, which a future change could [use](#92898) for additional validation.

Of note is the updated doc comment, which is reproduced below in its entirety.


Fixes #90955.
Fixes #88988.
Fixes #76435.
Fixes #69642.

----

Package kvnemesis exercises the KV API with random concurrent traffic (as
well as splits, merges, etc) and then validates that the observed behaviors
are serializable.

It does so in polynomial time based on the techniques used by [Elle] (see in
particular section 4.2.3), using the after-the-fact MVCC history as a record
of truth. It ensures that all write operations embed a unique identifier that
is stored in MVCC history, and can thus identify which of its operations'
mutations are reflected in the database ("recoverability" in Elle parlance).

A run of kvnemesis proceeds as follows.

First, a Generator is instantiated. It can create, upon request, a sequence
in which each element is a random Operation. Operations that are mutations
(i.e. not pure reads) are assigned a unique kvnemesisutil.Seq which will be
stored alongside the MVCC data written by the Operation.
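
A minimal sketch of the numbering scheme (the actual Generator API differs; this only illustrates that every mutation receives a fresh, globally unique Seq):

```go
package kvnemesissketch

import "sync/atomic"

// Seq uniquely identifies a mutation, in the spirit of kvnemesisutil.Seq.
type Seq uint64

var seqCounter atomic.Uint64

// nextSeq returns the next unique sequence number. Mutations (Put, Delete,
// DeleteRange, ...) each receive one; pure reads do not.
func nextSeq() Seq { return Seq(seqCounter.Add(1)) }
```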

A pool of worker threads concurrently generates Operations and executes them
against a CockroachDB cluster. Some of these Operations may succeed, some may
fail, and for some of them an ambiguous result may be encountered.

Alongside this random traffic, kvnemesis maintains a RangeFeed that ingests
the MVCC history, creating a "carbon copy" of it.

All of these workers wind down once they have in aggregate completed a
configured number of steps.

Next, kvnemesis validates that the Operations that were executed and the
results they saw match the MVCC history, i.e. checks for Serializability. In
general, this is an NP-hard problem[^1], but the use of unique sequence
numbers (kvnemesisutil.Seq) makes it tractable, as each mutation in the MVCC
keyspace uniquely identifies the Operation that must have written the value.

We make use of this property as follows. First, the Validate method iterates
through all Operations performed and, for each, inspects

- the type of the Operation (i.e. Put, Delete, Get, ...),
- the (point or range) key of the operation, and
- its results (i.e. whether there was an error or which key-value pairs were returned).

Each atomic unit (i.e. an individual non-transactional operation, a batch of
non-transactional operations, or a transaction) results in a slice of
observations (in the order in which the Operations within it executed[^2]),
where each element is either an observed read or an observed write.

For example, a Batch that first writes `v1` to `k1`, then scans `[k1-k3)`
(reading its own write as well as some other key `k2` with value `v2`), and
then deletes `k3` (a deletion is also a write; its unique tombstone is denoted
`v3` here) would generate the following slice:

       [
         observedWrite(k1->v1),
         observedScan(k1->v1, k2->v2),
         observedWrite(k3->v3),
       ]

Each such slice (i.e. atomic unit) is then compared to the MVCC history. For
all units that succeeded, their writes must match up with versions in the
MVCC history, and this matching is trivial thanks to unique values (including
for deletions, since we embed the kvnemesisutil.Seq in the value). In
particular, a single write entirely fixes the MVCC timestamp at which the
atomic unit must have executed.

For each read (i.e. get or scan), we compute at which time intervals each
read would have been valid. For example, if the MVCC history for a key `k1`
is as follows:

                k1
       -----------------
       t5      v2
       t4
       t3
       t2     <del>
       t1      v1

then

  - observedGet(k1->v1)  is valid for [t1,t2),
  - observedGet(k1->nil) is valid for [0,t1) and [t2,t5), and
  - observedGet(k1->v2)  is valid for [t5,inf).

By intersecting the time intervals of all Operations in an atomic unit, we
obtain the MVCC timestamps at which the unit could have observed what it
ended up observing. If this intersection is empty, serializability must have
been violated.
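
A sketch of that intersection step (simplified: integer timestamps instead of `hlc.Timestamp`, and none of the bookkeeping the real `validator.go` performs):

```go
package kvnemesissketch

import "math"

// interval is a half-open MVCC time interval [start, end).
type interval struct{ start, end int64 }

// intersect returns the overlap of two half-open intervals, with ok=false
// if they are disjoint.
func intersect(a, b interval) (c interval, ok bool) {
	c.start, c.end = a.start, a.end
	if b.start > c.start {
		c.start = b.start
	}
	if b.end < c.end {
		c.end = b.end
	}
	return c, c.start < c.end
}

// checkAtomicUnit intersects the validity intervals of all observations in
// one atomic unit. An empty intersection means no single MVCC timestamp
// explains everything the unit observed: a serializability violation.
func checkAtomicUnit(obs []interval) (interval, bool) {
	cur := interval{start: 0, end: math.MaxInt64}
	for _, o := range obs {
		var ok bool
		if cur, ok = intersect(cur, o); !ok {
			return interval{}, false
		}
	}
	return cur, true
}
```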

In the error case, kvnemesis verifies that no part of the Operation became
visible. For ambiguous results, kvnemesis accepts either outcome: if no
writes became visible, the atomic unit is treated as failed; otherwise it is
treated as committed.

The KV API also has the capability to return the actual execution timestamp
directly with responses. At the time of writing, kvnemesis verifies that the
KV layer does this reliably, but it does not verify that the execution
timestamp is contained in the intersection of time intervals obtained from
inspecting MVCC history[^3].

[Elle]: https://arxiv.org/pdf/2003.10554.pdf
[^1]: https://dl.acm.org/doi/10.1145/322154.322158
[^2]: there is currently no concurrency within an atomic unit in kvnemesis.
It could in theory carry out multiple reads concurrently while not also
writing, as DistSQL does, but this is not implemented today. See:
#64825
[^3]: tracked in #92898.

Epic: none

Co-authored-by: Tobias Grieger <tobias.b.grieger@gmail.com>
craig[bot] and tbg committed Dec 7, 2022
2 parents 8165e39 + b3550aa commit 1b4947b
Showing 255 changed files with 5,726 additions and 2,070 deletions.
2 changes: 2 additions & 0 deletions build/bazelutil/check.sh
@@ -57,6 +57,8 @@ pkg/cmd/prereqs/BUILD.bazel
EXISTING_CRDB_TEST_BUILD_CONSTRAINTS="
pkg/util/buildutil/crdb_test_off.go://go:build !crdb_test || crdb_test_off
pkg/util/buildutil/crdb_test_on.go://go:build crdb_test && !crdb_test_off
pkg/kv/kvnemesis/kvnemesisutil/crdb_test_off.go://go:build !crdb_test || crdb_test_off
pkg/kv/kvnemesis/kvnemesisutil/crdb_test_on.go://go:build crdb_test && !crdb_test_off
"

if [ -z "${COCKROACH_BAZEL_CHECK_FAST:-}" ]; then
2 changes: 2 additions & 0 deletions pkg/BUILD.bazel
@@ -1119,6 +1119,7 @@ GO_TARGETS = [
"//pkg/kv/kvclient/rangefeed:rangefeed_test",
"//pkg/kv/kvclient/rangestats:rangestats",
"//pkg/kv/kvclient:kvclient",
"//pkg/kv/kvnemesis/kvnemesisutil:kvnemesisutil",
"//pkg/kv/kvnemesis:kvnemesis",
"//pkg/kv/kvnemesis:kvnemesis_test",
"//pkg/kv/kvprober:kvprober",
@@ -2481,6 +2482,7 @@ GET_X_DATA_TARGETS = [
"//pkg/kv/kvclient/rangefeed/rangefeedcache:get_x_data",
"//pkg/kv/kvclient/rangestats:get_x_data",
"//pkg/kv/kvnemesis:get_x_data",
"//pkg/kv/kvnemesis/kvnemesisutil:get_x_data",
"//pkg/kv/kvprober:get_x_data",
"//pkg/kv/kvserver:get_x_data",
"//pkg/kv/kvserver/abortspan:get_x_data",
5 changes: 5 additions & 0 deletions pkg/kv/batch.go
@@ -78,6 +78,11 @@ func (b *Batch) ApproximateMutationBytes() int {
return b.approxMutationReqBytes
}

// Requests exposes the requests stashed in the batch thus far.
func (b *Batch) Requests() []roachpb.RequestUnion {
return b.reqs
}

// RawResponse returns the BatchResponse which was the result of a successful
// execution of the batch, and nil otherwise.
func (b *Batch) RawResponse() *roachpb.BatchResponse {
26 changes: 24 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -346,6 +346,8 @@ type DistSender struct {
// LatencyFunc is used to estimate the latency to other nodes.
latencyFunc LatencyFunc

onRangeSpanningNonTxnalBatch func(ba *roachpb.BatchRequest) *roachpb.Error

// locality is the description of the topography of the server on which the
// DistSender is running. It is used to estimate the latency to other nodes
// in the absence of a latency function.
@@ -499,6 +501,11 @@ func NewDistSender(cfg DistSenderConfig) *DistSender {
} else {
ds.latencyFunc = ds.rpcContext.RemoteClocks.Latency
}

if cfg.TestingKnobs.OnRangeSpanningNonTxnalBatch != nil {
ds.onRangeSpanningNonTxnalBatch = cfg.TestingKnobs.OnRangeSpanningNonTxnalBatch
}

return ds
}

@@ -1256,8 +1263,23 @@ func (ds *DistSender) divideAndSendBatchToRanges(
// If there's no transaction and ba spans ranges, possibly re-run as part of
// a transaction for consistency. The case where we don't need to re-run is
// if the read consistency is not required.
if ba.Txn == nil && ba.IsTransactional() && ba.ReadConsistency == roachpb.CONSISTENT {
return nil, roachpb.NewError(&roachpb.OpRequiresTxnError{})
if ba.Txn == nil {
if ba.IsTransactional() && ba.ReadConsistency == roachpb.CONSISTENT {
// NB: this check isn't quite right. We enter this if there's *any* transactional
// request here, but there could be a mix (for example a DeleteRangeUsingTombstone
// and a Put). DeleteRangeUsingTombstone gets split non-transactionally across
// batches, so that is probably what we would want for the mixed batch as well.
//
// Revisit if this ever becomes something we actually want to do, for now such
// batches will fail (re-wrapped in txn and then fail because some requests
// don't support txns).
return nil, roachpb.NewError(&roachpb.OpRequiresTxnError{})
}
if fn := ds.onRangeSpanningNonTxnalBatch; fn != nil {
if pErr := fn(ba); pErr != nil {
return nil, pErr
}
}
}
// If the batch contains a non-parallel commit EndTxn and spans ranges then
// we want the caller to come again with the EndTxn in a separate
11 changes: 10 additions & 1 deletion pkg/kv/kvclient/kvcoord/testing_knobs.go
@@ -10,7 +10,10 @@

package kvcoord

import "github.com/cockroachdb/cockroach/pkg/base"
import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// ClientTestingKnobs contains testing options that dictate the behavior
// of the key-value client.
@@ -52,6 +55,12 @@ type ClientTestingKnobs struct {
// CommitWaitFilter allows tests to instrument the beginning of a transaction
// commit wait sleep.
CommitWaitFilter func()

// OnRangeSpanningNonTxnalBatch is invoked whenever DistSender attempts to split
// a non-transactional batch across a range boundary. The method may inject an
// error which, if non-nil, becomes the result of the batch. Otherwise, execution
// continues.
OnRangeSpanningNonTxnalBatch func(ba *roachpb.BatchRequest) *roachpb.Error
}

var _ base.ModuleTestingKnobs = &ClientTestingKnobs{}
11 changes: 10 additions & 1 deletion pkg/kv/kvnemesis/BUILD.bazel
@@ -13,6 +13,7 @@ go_library(
"generator.go",
"kvnemesis.go",
"operations.go",
"seq_tracker.go",
"validator.go",
"watcher.go",
],
@@ -24,11 +25,13 @@ go_library(
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvclient/kvcoord",
"//pkg/kv/kvnemesis/kvnemesisutil",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/liveness",
"//pkg/roachpb",
"//pkg/sql/catalog/bootstrap",
"//pkg/storage",
"//pkg/storage/enginepb",
"//pkg/util/bufalloc",
"//pkg/util/ctxgroup",
"//pkg/util/encoding",
@@ -41,7 +44,6 @@ go_library(
"//pkg/util/timeutil",
"//pkg/util/tracing",
"//pkg/util/tracing/tracingpb",
"//pkg/util/uuid",
"@com_github_cockroachdb_cockroach_go_v2//crdb",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_pebble//:pebble",
@@ -63,21 +65,28 @@ go_test(
"validator_test.go",
],
args = ["-test.timeout=55s"],
data = glob(["testdata/**"]),
embed = [":kvnemesis"],
deps = [
"//pkg/base",
"//pkg/config/zonepb",
"//pkg/kv",
"//pkg/kv/kvclient/kvcoord",
"//pkg/kv/kvnemesis/kvnemesisutil",
"//pkg/kv/kvserver",
"//pkg/roachpb",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/storage",
"//pkg/storage/enginepb",
"//pkg/testutils",
"//pkg/testutils/echotest",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/buildutil",
"//pkg/util/envutil",
"//pkg/util/hlc",
"//pkg/util/leaktest",
119 changes: 92 additions & 27 deletions pkg/kv/kvnemesis/applier.go
@@ -16,6 +16,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvnemesis/kvnemesisutil"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -76,15 +77,47 @@ func (a *Applier) getNextDBRoundRobin() (*kv.DB, int32) {
return a.dbs[dbIdx], int32(dbIdx)
}

// Sentinel errors.
var (
errOmitted = errors.New("omitted")
errClosureTxnRollback = errors.New("rollback")
errDelRangeUsingTombstoneStraddlesRangeBoundary = errors.New("DeleteRangeUsingTombstone can not straddle range boundary")
)

func exceptOmitted(err error) bool { // true if errOmitted
return errors.Is(err, errOmitted)
}

func exceptRollback(err error) bool { // true if intentional txn rollback
return errors.Is(err, errClosureTxnRollback)
}

func exceptRetry(err error) bool { // true if retry error
return errors.HasInterface(err, (*roachpb.ClientVisibleRetryError)(nil))
}

func exceptUnhandledRetry(err error) bool {
return errors.HasType(err, (*roachpb.UnhandledRetryableError)(nil))
}

func exceptAmbiguous(err error) bool { // true if ambiguous result
return errors.HasInterface(err, (*roachpb.ClientVisibleAmbiguousError)(nil))
}

func exceptDelRangeUsingTombstoneStraddlesRangeBoundary(err error) bool {
return errors.Is(err, errDelRangeUsingTombstoneStraddlesRangeBoundary)
}

func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) {
switch o := op.GetValue().(type) {
case *GetOperation,
*PutOperation,
*ScanOperation,
*BatchOperation,
*DeleteOperation,
*DeleteRangeOperation:
applyClientOp(ctx, db, op, false /* inTxn */)
*DeleteRangeOperation,
*DeleteRangeUsingTombstoneOperation:
applyClientOp(ctx, db, op, false)
case *SplitOperation:
err := db.AdminSplit(ctx, o.Key, hlc.MaxTimestamp, roachpb.AdminSplitRequest_INGESTION)
o.Result = resultInit(ctx, err)
@@ -94,7 +127,6 @@ func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) {
case *ChangeReplicasOperation:
desc := getRangeDesc(ctx, o.Key, db)
_, err := db.AdminChangeReplicas(ctx, o.Key, desc, o.Changes)
// TODO(dan): Save returned desc?
o.Result = resultInit(ctx, err)
case *TransferLeaseOperation:
err := db.AdminTransferLease(ctx, o.Key, o.Target)
@@ -117,18 +149,32 @@ func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) {
retryOnAbort.Next()
}
savedTxn = txn
for i := range o.Ops {
op := &o.Ops[i]
op.Result().Reset() // in case we're a retry
applyClientOp(ctx, txn, op, true /* inTxn */)
// The KV api disallows use of a txn after an operation on it errors.
if r := op.Result(); r.Type == ResultType_Error {
return errors.DecodeError(ctx, *r.Err)
{
var err error
for i := range o.Ops {
op := &o.Ops[i]
op.Result().Reset() // in case we're a retry
if err != nil {
// If a previous op failed, mark this op as never invoked. We need
// to do this because we want, as an invariant, to have marked all
// operations as either failed or succeeded.
*op.Result() = resultInit(ctx, errOmitted)
continue
}

applyClientOp(ctx, txn, op, true)
// The KV api disallows use of a txn after an operation on it errors.
if r := op.Result(); r.Type == ResultType_Error {
err = errors.DecodeError(ctx, *r.Err)
}
}
if err != nil {
return err
}
}
if o.CommitInBatch != nil {
b := txn.NewBatch()
applyBatchOp(ctx, b, txn.CommitInBatch, o.CommitInBatch, true)
applyBatchOp(ctx, b, txn.CommitInBatch, o.CommitInBatch)
// The KV api disallows use of a txn after an operation on it errors.
if r := o.CommitInBatch.Result; r.Type == ResultType_Error {
return errors.DecodeError(ctx, *r.Err)
@@ -138,7 +184,7 @@ func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) {
case ClosureTxnType_Commit:
return nil
case ClosureTxnType_Rollback:
return errors.New("rollback")
return errClosureTxnRollback
default:
panic(errors.AssertionFailedf(`unknown closure txn type: %s`, o.Type))
}
@@ -221,7 +267,8 @@ func applyClientOp(ctx context.Context, db clientI, op *Operation, inTxn bool) {
}
case *PutOperation:
_, ts, err := dbRunWithResultAndTimestamp(ctx, db, func(b *kv.Batch) {
b.Put(o.Key, o.Value)
b.Put(o.Key, o.Value())
setLastReqSeq(b, o.Seq)
})
o.Result = resultInit(ctx, err)
if err != nil {
@@ -250,13 +297,14 @@ func applyClientOp(ctx context.Context, db clientI, op *Operation, inTxn bool) {
o.Result.Values = make([]KeyValue, len(kvs))
for i, kv := range kvs {
o.Result.Values[i] = KeyValue{
Key: []byte(kv.Key),
Key: kv.Key,
Value: kv.Value.RawBytes,
}
}
case *DeleteOperation:
res, ts, err := dbRunWithResultAndTimestamp(ctx, db, func(b *kv.Batch) {
b.Del(o.Key)
setLastReqSeq(b, o.Seq)
})
o.Result = resultInit(ctx, err)
if err != nil {
Expand All @@ -270,11 +318,9 @@ func applyClientOp(ctx context.Context, db clientI, op *Operation, inTxn bool) {
o.Result.Keys[i] = deletedKey
}
case *DeleteRangeOperation:
if !inTxn {
panic(errors.AssertionFailedf(`non-transactional DelRange operations currently unsupported`))
}
res, ts, err := dbRunWithResultAndTimestamp(ctx, db, func(b *kv.Batch) {
b.DelRange(o.Key, o.EndKey, true /* returnKeys */)
setLastReqSeq(b, o.Seq)
})
o.Result = resultInit(ctx, err)
if err != nil {
@@ -287,20 +333,34 @@ func applyClientOp(ctx context.Context, db clientI, op *Operation, inTxn bool) {
for i, deletedKey := range deletedKeys {
o.Result.Keys[i] = deletedKey
}
case *DeleteRangeUsingTombstoneOperation:
_, ts, err := dbRunWithResultAndTimestamp(ctx, db, func(b *kv.Batch) {
b.DelRangeUsingTombstone(o.Key, o.EndKey)
setLastReqSeq(b, o.Seq)
})
o.Result = resultInit(ctx, err)
if err != nil {
return
}
o.Result.OptionalTimestamp = ts
case *BatchOperation:
b := &kv.Batch{}
applyBatchOp(ctx, b, db.Run, o, inTxn)
applyBatchOp(ctx, b, db.Run, o)
default:
panic(errors.AssertionFailedf(`unknown batch operation type: %T %v`, o, o))
}
}

func setLastReqSeq(b *kv.Batch, seq kvnemesisutil.Seq) {
sl := b.Requests()
req := sl[len(sl)-1].GetInner()
h := req.Header()
h.KVNemesisSeq.Set(seq)
req.SetHeader(h)
}

func applyBatchOp(
ctx context.Context,
b *kv.Batch,
run func(context.Context, *kv.Batch) error,
o *BatchOperation,
inTxn bool,
ctx context.Context, b *kv.Batch, run func(context.Context, *kv.Batch) error, o *BatchOperation,
) {
for i := range o.Ops {
switch subO := o.Ops[i].GetValue().(type) {
@@ -311,7 +371,8 @@ func applyBatchOp(
b.Get(subO.Key)
}
case *PutOperation:
b.Put(subO.Key, subO.Value)
b.Put(subO.Key, subO.Value())
setLastReqSeq(b, subO.Seq)
case *ScanOperation:
if subO.Reverse && subO.ForUpdate {
b.ReverseScanForUpdate(subO.Key, subO.EndKey)
@@ -324,11 +385,13 @@ func applyBatchOp(
}
case *DeleteOperation:
b.Del(subO.Key)
setLastReqSeq(b, subO.Seq)
case *DeleteRangeOperation:
if !inTxn {
panic(errors.AssertionFailedf(`non-transactional batch DelRange operations currently unsupported`))
}
b.DelRange(subO.Key, subO.EndKey, true /* returnKeys */)
setLastReqSeq(b, subO.Seq)
case *DeleteRangeUsingTombstoneOperation:
b.DelRangeUsingTombstone(subO.Key, subO.EndKey)
setLastReqSeq(b, subO.Seq)
default:
panic(errors.AssertionFailedf(`unknown batch operation type: %T %v`, subO, subO))
}
@@ -384,6 +447,8 @@ func applyBatchOp(
subO.Result.Keys[j] = key
}
}
case *DeleteRangeUsingTombstoneOperation:
subO.Result = resultInit(ctx, err)
default:
panic(errors.AssertionFailedf(`unknown batch operation type: %T %v`, subO, subO))
}
