diff --git a/c-deps/libroach/incremental_iterator.cc b/c-deps/libroach/incremental_iterator.cc index b9cf05e6920c..8b808c867d8d 100644 --- a/c-deps/libroach/incremental_iterator.cc +++ b/c-deps/libroach/incremental_iterator.cc @@ -25,68 +25,24 @@ DBIncrementalIterator::DBIncrementalIterator(DBEngine* engine, DBIterOptions opt end(end), write_intent(write_intent) { - sanity_iter.reset(NULL); - start_time.set_wall_time(start.wall_time); start_time.set_logical(start.logical); end_time.set_wall_time(end.wall_time); end_time.set_logical(end.logical); - // sanity_iter is only relevant if a time-bound iterator is required. - // - // It is necessary for correctness that sanity_iter be created before - // iter. This is because the provided Reader may not be a consistent - // snapshot, so the two could end up observing different information. The hack - // around sanityCheckMetadataKey only works properly if all possible - // discrepancies between the two iterators lead to intents and values falling - // outside of the timestamp range **from iter's perspective**. This allows us - // to simply ignore discrepancies that we notice in advance(). See #34819. + DBIterOptions iter_opts = opts; if (!EmptyTimestamp(opts.min_timestamp_hint) || !EmptyTimestamp(opts.max_timestamp_hint)) { assert(!EmptyTimestamp(opts.max_timestamp_hint)); DBIterOptions nontimebound_opts = DBIterOptions(); nontimebound_opts.upper_bound = opts.upper_bound; - sanity_iter.reset(DBNewIter(engine, nontimebound_opts)); + iter_opts = nontimebound_opts; + time_bound_iter.reset(DBNewIter(engine, opts)); } - iter.reset(DBNewIter(engine, opts)); + iter.reset(DBNewIter(engine, iter_opts)); } DBIncrementalIterator::~DBIncrementalIterator() {} -// sanityCheckMetadataKey looks up the current `iter` key using a normal, -// non-time-bound iterator and returns the value if the normal iterator also -// sees that exact key. Otherwise, it returns false. 
It's used in the workaround -// in `advanceKey` for a time-bound iterator bug. -rocksdb::Slice DBIncrementalIterator::sanityCheckMetadataKey() { - // If sanityIter is not set, it's because we're not using time-bound - // iterator and we don't need the sanity check. - if (sanity_iter == NULL) { - valid = true; - status = ToDBStatus(rocksdb::Status::OK()); - return iter.get()->rep->value(); - } - - auto sanity_iter_rep = sanity_iter->rep.get(); - sanity_iter_rep->Seek(iter->rep->key()); - - if (!sanity_iter_rep->status().ok()) { - valid = false; - status = ToDBStatus(sanity_iter_rep->status()); - return NULL; - } else if (!sanity_iter_rep->Valid()) { - valid = false; - status = ToDBStatus(rocksdb::Status::OK()); - return NULL; - } else if (kComparator.Compare(sanity_iter_rep->key(), iter->rep->key()) != 0) { - valid = false; - status = ToDBStatus(rocksdb::Status::OK()); - return NULL; - } - - valid = true; - status = ToDBStatus(rocksdb::Status::OK()); - return sanity_iter.get()->rep->value(); -} - // legacyTimestampIsLess compares the timestamps t1 and t2, and returns a // boolean indicating whether t1 is less than t2. bool DBIncrementalIterator::legacyTimestampIsLess(const cockroach::util::hlc::LegacyTimestamp& t1, @@ -95,20 +51,106 @@ bool DBIncrementalIterator::legacyTimestampIsLess(const cockroach::util::hlc::Le (t1.wall_time() == t2.wall_time() && t1.logical() < t2.logical()); } -// advanceKey finds the key and its appropriate version which lies in +// extractKey extracts the key portion of the mvcc_key and put it in key. It +// returns a validity indicator. +WARN_UNUSED_RESULT bool DBIncrementalIterator::extractKey(rocksdb::Slice mvcc_key, rocksdb::Slice *key) { + rocksdb::Slice ts; + if (!SplitKey(mvcc_key, key, &ts)) { + valid = false; + status = FmtStatus("failed to split mvcc key"); + return false; + } + return true; +} + +// maybeSkipKeys checks if any keys can be skipped by using a time-bound +// iterator. 
If keys can be skipped, it will update the main iterator to point +// to the earliest version of the next candidate key. +// It is expected that TBI is at a key <= main iterator key when calling +// maybeSkipKeys(). +void DBIncrementalIterator::maybeSkipKeys() { + if (time_bound_iter == nullptr) { + // We don't have a TBI, so we can't skip any keys. + return; + } + + rocksdb::Slice tbi_key; + if(!extractKey(time_bound_iter->rep->key(), &tbi_key)) { + return; + } + rocksdb::Slice iter_key; + if(!extractKey(iter->rep->key(), &iter_key)) { + return; + } + + if (iter_key.compare(tbi_key) > 0) { + // If the iterKey got ahead of the TBI key, advance the TBI Key. + auto state = DBIterNext(time_bound_iter.get(), true /* skip_current_key_versions */); + if (!state.valid) { + status = state.status; + valid = false; + return; + } + if(!extractKey(time_bound_iter->rep->key(), &tbi_key)) { + return; + } + + auto cmp = iter_key.compare(tbi_key); + // The case where iter_key.compare(tbi_key) == 0 is the fast path, in which + // the TBI and the main iterator are in lockstep. In this case, the main + // iterator was referencing the next key that would be visited by the TBI. + // This means that a Next or NextKey on the incremental iterator requires + // only 1 extra NextKey invocation on the TBI while the two remain in + // lockstep. + + if (cmp > 0) { + // If the tbiKey is still behind the iterKey, the TBI key may be seeing + // phantom MVCCKey.Keys. These keys may not be seen by the main iterator + // due to aborted transactions and keys which have been subsumed due to + // range tombstones. In this case we can SeekGE() the TBI to the main iterator. + + // NB: Seek() is expensive, and this is only acceptable since we expect + // this case to rarely occur. 
+ state = DBIterSeek(time_bound_iter.get(), ToDBKey(iter_key)); + if (!state.valid) { + status = state.status; + valid = false; + return; + } + if(!extractKey(time_bound_iter->rep->key(), &tbi_key)) { + return; + } + cmp = iter_key.compare(tbi_key); + } + + if (cmp < 0) { + // In the case that the next MVCC key that the TBI observes is not the + // same as the main iterator, we may be able to skip over a large group of + // keys. The main iterator is seeked to the TBI in hopes that many keys + // were skipped. Note that a Seek() is an order of magnitude more + // expensive than a Next(). + state = DBIterSeek(iter.get(), ToDBKey(tbi_key)); + if (!state.valid) { + status = state.status; + valid = false; + return; + } + } + } +} + +// advanceKey advances the main iterator until it is referencing a key within // (start_time, end_time]. +// It populates status with an error if either of the following was encountered: +// a) an inline value +// b) an intent with a timestamp within the incremental iterator's bounds void DBIncrementalIterator::advanceKey() { for (;;) { + maybeSkipKeys(); if (!valid) { return; } - if (!iter.get()->rep->Valid()) { - status = ToDBStatus(iter.get()->rep->status()); - valid = false; - return; - } - rocksdb::Slice key; int64_t wall_time = 0; int32_t logical = 0; @@ -123,23 +165,7 @@ void DBIncrementalIterator::advanceKey() { meta.mutable_timestamp()->set_wall_time(wall_time); meta.mutable_timestamp()->set_logical(logical); } else { - // HACK(dan): Work around a known bug in the time-bound iterator. - // For reasons described in #28358, a time-bound iterator will - // sometimes see an unresolved intent where there is none. A normal - // iterator doesn't have this problem, so we work around it by - // double checking all non-value keys. If sanityCheckMetadataKey - // returns false, then the intent was a phantom and we ignore it. - // NB: this could return a older/newer intent than the one the time-bound - // iterator saw; this is handled. 
- auto value = sanityCheckMetadataKey(); - if (status.data != NULL) { - return; - } else if (!valid) { - valid = true; - DBIterNext(iter.get(), false); - continue; - } - + const auto value = iter->rep->value(); if (!meta.ParseFromArray(value.data(), value.size())) { status = ToDBString("failed to parse meta"); valid = false; @@ -172,15 +198,21 @@ void DBIncrementalIterator::advanceKey() { } } + DBIterState state; if (legacyTimestampIsLess(end_time, meta.timestamp())) { - DBIterNext(iter.get(), false); - continue; + state = DBIterNext(iter.get(), false); } else if (!legacyTimestampIsLess(start_time, meta.timestamp())) { - DBIterNext(iter.get(), true); - continue; + state = DBIterNext(iter.get(), true); + } else { + // We have found a key within the time bounds, break. + break; } - break; + if (!state.valid) { + status = state.status; + valid = false; + return; + } } } @@ -201,14 +233,43 @@ DBIterState DBIncrementalIterator::getState() { return state; } +// seek advances the iterator to the first key in the engine which is >= the +// provided key. key should be a metadata key to ensure that the iterator has a +// chance to observe any intents on the key if they are there. DBIterState DBIncrementalIterator::seek(DBKey key) { - DBIterSeek(iter.get(), key); + if (time_bound_iter != nullptr) { + // Check which is the first key seen by the TBI. + auto state = DBIterSeek(time_bound_iter.get(), key); + if (!state.valid) { + status = state.status; + valid = false; + return getState(); + } + const rocksdb::Slice tbi_key(time_bound_iter->rep->key()); + const rocksdb::Slice iter_key(key.key.data, key.key.len); + if (tbi_key.compare(iter_key) > 0) { + // If the first key that the TBI sees is ahead of the given startKey, we + // can seek directly to the first version of the key. 
+ key = ToDBKey(tbi_key); + } + } + auto state = DBIterSeek(iter.get(), key); + if (!state.valid) { + status = state.status; + valid = false; + return getState(); + } advanceKey(); return getState(); } DBIterState DBIncrementalIterator::next(bool skip_current_key_versions) { - DBIterNext(iter.get(), skip_current_key_versions); + auto state = DBIterNext(iter.get(), skip_current_key_versions); + if (!state.valid) { + status = state.status; + valid = false; + return getState(); + } advanceKey(); return getState(); } diff --git a/c-deps/libroach/incremental_iterator.h b/c-deps/libroach/incremental_iterator.h index 0a3990eabe53..86b0c5954def 100644 --- a/c-deps/libroach/incremental_iterator.h +++ b/c-deps/libroach/incremental_iterator.h @@ -17,6 +17,11 @@ #include "protos/roachpb/data.pb.h" #include "status.h" +// DBIncrementalIterator is the C++ implementation of MVCCIncrementalIterator. +// This implementation should be kept in sync with MVCCIncrementalIterator, +// which is used as an oracle to test DBIncrementalIterator. +// For general documentation around this iterator please refer to the comments +// in mvcc_incremental_iterator.go. struct DBIncrementalIterator { DBIncrementalIterator(DBEngine* engine, DBIterOptions opts, DBKey start, DBKey end, DBString* write_intent); @@ -27,7 +32,31 @@ struct DBIncrementalIterator { const rocksdb::Slice value(); std::unique_ptr iter; - std::unique_ptr sanity_iter; + + // The time_bound_iter is used as a performance optimization to potentially + // allow skipping over a large number of keys. The time bound iterator skips + // sstables which do not contain keys modified in a certain interval. + // + // A time-bound iterator cannot be used by itself due to a bug in the time- + // bound iterator (#28358). 
This was historically augmented with an iterator + // without the time-bound optimization to act as a sanity iterator, but + // issues remained (#43799), so now the iterator above is the main iterator, + // and the timeBoundIter is used to check if any keys can be skipped by the + // main iterator. + // + // Note regarding the correctness of the time-bound iterator optimization: + // + // When using (t_s, t_e], say there is a version (committed or provisional) + // k@t where t is in that interval, that is visible to iter. All sstables + // containing k@t will be included in timeBoundIter. Note that there may be + // multiple sequence numbers for the key k@t at the storage layer, say k@t#n1, + // k@t#n2, where n1 > n2, some of which may be deleted, but the latest + // sequence number will be visible using iter (since not being visible would be + // a contradiction of the initial assumption that k@t is visible to iter). + // Since there is no delete across all sstables that deletes k@t#n1, there is + // no delete in the subset of sstables used by timeBoundIter that deletes + // k@t#n1, so the timeBoundIter will see k@t. 
+ std::unique_ptr time_bound_iter = nullptr; DBEngine* engine; DBIterOptions opts; @@ -42,6 +71,8 @@ struct DBIncrementalIterator { const cockroach::util::hlc::LegacyTimestamp& t2); DBIterState getState(); void advanceKey(); + void maybeSkipKeys(); + bool extractKey(rocksdb::Slice mvcc_key, rocksdb::Slice *key); cockroach::util::hlc::LegacyTimestamp start_time; cockroach::util::hlc::LegacyTimestamp end_time; diff --git a/pkg/storage/engine/bench_test.go b/pkg/storage/engine/bench_test.go index 18fa85f0f865..5ca692e02ada 100644 --- a/pkg/storage/engine/bench_test.go +++ b/pkg/storage/engine/bench_test.go @@ -20,7 +20,9 @@ import ( "testing" "time" + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils" @@ -53,6 +55,96 @@ type benchDataOptions struct { transactional bool } +// loadTestData writes numKeys keys in numBatches separate batches. Keys are +// written in order. Every key in a given batch has the same MVCC timestamp; +// batch timestamps start at batchTimeSpan and increase in intervals of +// batchTimeSpan. +// +// Importantly, writing keys in order convinces RocksDB to output one SST per +// batch, where each SST contains keys of only one timestamp. E.g., writing A,B +// at t0 and C at t1 will create two SSTs: one for A,B that only contains keys +// at t0, and one for C that only contains keys at t1. Conversely, writing A, C +// at t0 and B at t1 would create just one SST that contained A,B,C (due to an +// immediate compaction). +// +// The creation of the database is time consuming, so the caller can choose +// whether to use a temporary or permanent location. 
+func loadTestData(dir string, numKeys, numBatches, batchTimeSpan, valueBytes int) (Engine, error) { + ctx := context.Background() + + exists := true + if _, err := os.Stat(dir); os.IsNotExist(err) { + exists = false + } + + eng, err := NewRocksDB( + RocksDBConfig{ + StorageConfig: base.StorageConfig{ + Settings: cluster.MakeTestingClusterSettings(), + Dir: dir, + }, + }, + RocksDBCache{}, + ) + if err != nil { + return nil, err + } + + if exists { + testutils.ReadAllFiles(filepath.Join(dir, "*")) + return eng, nil + } + + log.Infof(ctx, "creating test data: %s", dir) + + // Generate the same data every time. + rng := rand.New(rand.NewSource(1449168817)) + + keys := make([]roachpb.Key, numKeys) + for i := 0; i < numKeys; i++ { + keys[i] = roachpb.Key(encoding.EncodeUvarintAscending([]byte("key-"), uint64(i))) + } + + sstTimestamps := make([]int64, numBatches) + for i := 0; i < len(sstTimestamps); i++ { + sstTimestamps[i] = int64((i + 1) * batchTimeSpan) + } + + var batch Batch + var minWallTime int64 + for i, key := range keys { + if scaled := len(keys) / numBatches; (i % scaled) == 0 { + if i > 0 { + log.Infof(ctx, "committing (%d/~%d)", i/scaled, numBatches) + if err := batch.Commit(false /* sync */); err != nil { + return nil, err + } + batch.Close() + if err := eng.Flush(); err != nil { + return nil, err + } + } + batch = eng.NewBatch() + minWallTime = sstTimestamps[i/scaled] + } + timestamp := hlc.Timestamp{WallTime: minWallTime + rng.Int63n(int64(batchTimeSpan))} + value := roachpb.MakeValueFromBytes(randutil.RandBytes(rng, valueBytes)) + value.InitChecksum(key) + if err := MVCCPut(ctx, batch, nil, key, timestamp, value, nil); err != nil { + return nil, err + } + } + if err := batch.Commit(false /* sync */); err != nil { + return nil, err + } + batch.Close() + if err := eng.Flush(); err != nil { + return nil, err + } + + return eng, nil +} + // setupMVCCData writes up to numVersions values at each of numKeys // keys. 
The number of versions written for each key is chosen // randomly according to a uniform distribution. Each successive diff --git a/pkg/storage/engine/mvcc_incremental_iterator.go b/pkg/storage/engine/mvcc_incremental_iterator.go index 22d1d802f8a6..44cd4d5bdf03 100644 --- a/pkg/storage/engine/mvcc_incremental_iterator.go +++ b/pkg/storage/engine/mvcc_incremental_iterator.go @@ -24,6 +24,11 @@ import ( // most recent version (before or at endTime) of that key. If the key was most // recently deleted, this is signaled with an empty value. // +// MVCCIncrementalIterator will return an error if either of the following are +// encountered: +// 1. An inline value (non-user data) +// 2. An intent whose timestamp lies within the time bounds +// // Note: The endTime is inclusive to be consistent with the non-incremental // iterator, where reads at a given timestamp return writes at that // timestamp. The startTime is then made exclusive so that iterating time 1 to @@ -47,23 +52,39 @@ import ( // ... // } // +// Note regarding the correctness of the time-bound iterator optimization: +// +// When using (t_s, t_e], say there is a version (committed or provisional) +// k@t where t is in that interval, that is visible to iter. All sstables +// containing k@t will be included in timeBoundIter. Note that there may be +// multiple sequence numbers for the key k@t at the storage layer, say k@t#n1, +// k@t#n2, where n1 > n2, some of which may be deleted, but the latest +// sequence number will be visible using iter (since not being visible would be +// a contradiction of the initial assumption that k@t is visible to iter). +// Since there is no delete across all sstables that deletes k@t#n1, there is +// no delete in the subset of sstables used by timeBoundIter that deletes +// k@t#n1, so the timeBoundIter will see k@t. +// // NOTE: This is not used by CockroachDB and has been preserved to serve as an // oracle to prove the correctness of the new export logic. 
type MVCCIncrementalIterator struct { iter Iterator - // fields used for a workaround for a bug in the time-bound iterator - // (#28358) - upperBound roachpb.Key - reader Reader - sanityIter Iterator + // A time-bound iterator cannot be used by itself due to a bug in the time- + // bound iterator (#28358). This was historically augmented with an iterator + // without the time-bound optimization to act as a sanity iterator, but + // issues remained (#43799), so now the iterator above is the main iterator, + // and the timeBoundIter is used to check if any keys can be skipped by the + // main iterator. + timeBoundIter Iterator startTime hlc.Timestamp endTime hlc.Timestamp err error valid bool - // For allocation avoidance. + // For allocation avoidance, meta is used to store the timestamp of keys + // regardless of whether they are metakeys. meta enginepb.MVCCMetadata } @@ -72,43 +93,69 @@ var _ SimpleIterator = &MVCCIncrementalIterator{} // MVCCIncrementalIterOptions bundles options for NewMVCCIncrementalIterator. type MVCCIncrementalIterOptions struct { IterOptions IterOptions - StartTime hlc.Timestamp - EndTime hlc.Timestamp + // Keys visible by the MVCCIncrementalIterator must be within (StartTime, + // EndTime]. Note that if {Min,Max}TimestampHints are specified in + // IterOptions, the timestamp hints interval should include the start and end + // time. + StartTime hlc.Timestamp + EndTime hlc.Timestamp } // NewMVCCIncrementalIterator creates an MVCCIncrementalIterator with the -// specified reader and options. +// specified reader and options. The timestamp hint range should not be more +// restrictive than the start and end time range. +// TODO(pbardea): Add validation here and in C++ implementation that the +// timestamp hints are not more restrictive than incremental iterator's +// (startTime, endTime] interval. 
func NewMVCCIncrementalIterator( reader Reader, opts MVCCIncrementalIterOptions, ) *MVCCIncrementalIterator { - var sanityIter Iterator + var iter Iterator + var timeBoundIter Iterator if !opts.IterOptions.MinTimestampHint.IsEmpty() && !opts.IterOptions.MaxTimestampHint.IsEmpty() { - // It is necessary for correctness that sanityIter be created before iter. - // This is because the provided Reader may not be a consistent snapshot, so - // the two could end up observing different information. The hack around - // sanityCheckMetadataKey only works properly if all possible discrepancies - // between the two iterators lead to intents and values falling outside of - // the timestamp range **from iter's perspective**. This allows us to simply - // ignore discrepancies that we notice in advance(). See #34819. - sanityIter = reader.NewIterator(IterOptions{ + // An iterator without the timestamp hints is created to ensure that the + // iterator visits every required version of every key that has changed. + iter = reader.NewIterator(IterOptions{ UpperBound: opts.IterOptions.UpperBound, }) + timeBoundIter = reader.NewIterator(opts.IterOptions) + } else { + iter = reader.NewIterator(opts.IterOptions) } return &MVCCIncrementalIterator{ - reader: reader, - upperBound: opts.IterOptions.UpperBound, - iter: reader.NewIterator(opts.IterOptions), - startTime: opts.StartTime, - endTime: opts.EndTime, - sanityIter: sanityIter, + iter: iter, + startTime: opts.StartTime, + endTime: opts.EndTime, + timeBoundIter: timeBoundIter, } } // SeekGE advances the iterator to the first key in the engine which is >= the -// provided key. +// provided key. startKey should be a metadata key to ensure that the iterator +// has a chance to observe any intents on the key if they are there. func (i *MVCCIncrementalIterator) SeekGE(startKey MVCCKey) { + if i.timeBoundIter != nil { + // Check which is the first key seen by the TBI. 
+ i.timeBoundIter.SeekGE(startKey) + if ok, err := i.timeBoundIter.Valid(); !ok { + i.err = err + i.valid = false + return + } + tbiKey := i.timeBoundIter.Key().Key + if tbiKey.Compare(startKey.Key) > 0 { + // If the first key that the TBI sees is ahead of the given startKey, we + // can seek directly to the first version of the key. + startKey = MakeMVCCMetadataKey(tbiKey) + } + } i.iter.SeekGE(startKey) + if ok, err := i.iter.Valid(); !ok { + i.err = err + i.valid = false + return + } i.err = nil i.valid = true i.advance() @@ -117,8 +164,8 @@ func (i *MVCCIncrementalIterator) SeekGE(startKey MVCCKey) { // Close frees up resources held by the iterator. func (i *MVCCIncrementalIterator) Close() { i.iter.Close() - if i.sanityIter != nil { - i.sanityIter.Close() + if i.timeBoundIter != nil { + i.timeBoundIter.Close() } } @@ -127,57 +174,121 @@ func (i *MVCCIncrementalIterator) Close() { // key. func (i *MVCCIncrementalIterator) Next() { i.iter.Next() + if ok, err := i.iter.Valid(); !ok { + i.err = err + i.valid = false + return + } i.advance() } -// NextKey advances the iterator to the next MVCC key. This operation is -// distinct from Next which advances to the next version of the current key or -// the next key if the iterator is currently located at the last version for a -// key. +// NextKey advances the iterator to the next key. This operation is distinct +// from Next which advances to the next version of the current key or the next +// key if the iterator is currently located at the last version for a key. func (i *MVCCIncrementalIterator) NextKey() { i.iter.NextKey() + if ok, err := i.iter.Valid(); !ok { + i.err = err + i.valid = false + return + } i.advance() } +// maybeSkipKeys checks if any keys can be skipped by using a time-bound +// iterator. If keys can be skipped, it will update the main iterator to point +// to the earliest version of the next candidate key. 
+// It is expected that TBI is at a key <= main iterator key when calling +// maybeSkipKeys(). +func (i *MVCCIncrementalIterator) maybeSkipKeys() { + if i.timeBoundIter == nil { + // If there is no time bound iterator, we cannot skip any keys. + return + } + tbiKey := i.timeBoundIter.Key().Key + iterKey := i.iter.Key().Key + if iterKey.Compare(tbiKey) > 0 { + // If the iterKey got ahead of the TBI key, advance the TBI Key. + i.timeBoundIter.NextKey() + if ok, err := i.timeBoundIter.Valid(); !ok { + i.err = err + i.valid = false + return + } + tbiKey = i.timeBoundIter.Key().Key + + // The case where cmp == 0 is the fast path, in which the + // TBI and the main iterator are in lockstep. In this case, the main + // iterator was referencing the next key that would be visited by the TBI. + // This means that a Next or NextKey on the incremental iterator requires + // only 1 extra NextKey invocation on the TBI while the two remain in + // lockstep. + cmp := iterKey.Compare(tbiKey) + + if cmp > 0 { + // If the tbiKey is still behind the iterKey, the TBI key may be seeing + // phantom MVCCKey.Keys. These keys may not be seen by the main iterator + // due to aborted transactions and keys which have been subsumed due to + // range tombstones. In this case we can SeekGE() the TBI to the main iterator. + + // NB: Seek() is expensive, and this is only acceptable since we expect + // this case to rarely occur. + seekKey := MakeMVCCMetadataKey(iterKey) + i.timeBoundIter.SeekGE(seekKey) + if ok, err := i.timeBoundIter.Valid(); !ok { + i.err = err + i.valid = false + return + } + tbiKey = i.timeBoundIter.Key().Key + cmp = iterKey.Compare(tbiKey) + } + + if cmp < 0 { + // In the case that the next MVCC key that the TBI observes is not the + // same as the main iterator, we may be able to skip over a large group + // of keys. The main iterator is seeked to the TBI in hopes that many + // keys were skipped. 
Note that a Seek is an order of magnitude more + // expensive than a Next call. + seekKey := MakeMVCCMetadataKey(tbiKey) + i.iter.SeekGE(seekKey) + if ok, err := i.iter.Valid(); !ok { + i.err = err + i.valid = false + return + } + } + } +} + +// advance advances the main iterator until it is referencing a key within +// (startTime, endTime]. +// It populates i.err with an error if either of the following was encountered: +// a) an inline value +// b) an intent with a timestamp within the incremental iterator's bounds func (i *MVCCIncrementalIterator) advance() { for { + i.maybeSkipKeys() if !i.valid { return } - if ok, err := i.iter.Valid(); !ok { - i.err = err - i.valid = false - return - } unsafeMetaKey := i.iter.UnsafeKey() if unsafeMetaKey.IsValue() { + // The key is an MVCC value and not an intent. + // Intents are handled next. i.meta.Reset() i.meta.Timestamp = hlc.LegacyTimestamp(unsafeMetaKey.Timestamp) } else { - // HACK(dan): Work around a known bug in the time-bound iterator. - // For reasons described in #28358, a time-bound iterator will - // sometimes see an unresolved intent where there is none. A normal - // iterator doesn't have this problem, so we work around it by - // double checking all non-value keys. If sanityCheckMetadataKey - // returns false, then the intent was a phantom and we ignore it. - // NB: this could return a newer intent than the one the time-bound - // iterator saw; this is handled. - unsafeValueBytes, ok, err := i.sanityCheckMetadataKey() - if err != nil { - i.err = err - i.valid = false - return - } else if !ok { - i.iter.Next() - continue - } - - if i.err = protoutil.Unmarshal(unsafeValueBytes, &i.meta); i.err != nil { + // The key is a metakey (an intent); this is used later to see if the + // timestamp of this intent is within the incremental iterator's time + // bounds. 
+ if i.err = protoutil.Unmarshal(i.iter.UnsafeValue(), &i.meta); i.err != nil { i.valid = false return } } + if i.meta.IsInline() { // Inline values are only used in non-user data. They're not needed // for backup, so they're not handled by this method. If one shows @@ -200,42 +311,33 @@ func (i *MVCCIncrementalIterator) advance() { return } i.iter.Next() + if ok, err := i.iter.Valid(); !ok { + i.err = err + i.valid = false + return + } continue } + // Note that MVCC keys are sorted by key, then by _descending_ timestamp + // order with the exception of the metakey (timestamp 0) being sorted + // first. See mvcc.h for more information. if i.endTime.Less(metaTimestamp) { i.iter.Next() - continue - } - if metaTimestamp.LessEq(i.startTime) { + } else if metaTimestamp.LessEq(i.startTime) { i.iter.NextKey() - continue + } else { + // The current key is a valid user key and within the time bounds. We are + // done. + break } - break - } -} - -// sanityCheckMetadataKey looks up the current `i.iter` key using a normal, -// non-time-bound iterator and returns the value if the normal iterator also -// sees that exact key. Otherwise, it returns false. It's used in the workaround -// in `advance` for a time-bound iterator bug. -func (i *MVCCIncrementalIterator) sanityCheckMetadataKey() ([]byte, bool, error) { - if i.sanityIter == nil { - // If sanityIter is not set, it's because we're not using time-bound - // iterator and we don't need the sanity check. - return i.iter.UnsafeValue(), true, nil - } - unsafeKey := i.iter.UnsafeKey() - i.sanityIter.SeekGE(unsafeKey) - if ok, err := i.sanityIter.Valid(); err != nil { - return nil, false, err - } else if !ok { - return nil, false, nil - } else if !i.sanityIter.UnsafeKey().Equal(unsafeKey) { - return nil, false, nil + if ok, err := i.iter.Valid(); !ok { + i.err = err + i.valid = false + return + } } - return i.sanityIter.UnsafeValue(), true, nil } // Valid must be called after any call to Reset(), Next(), or similar methods. 
diff --git a/pkg/storage/engine/mvcc_incremental_iterator_test.go b/pkg/storage/engine/mvcc_incremental_iterator_test.go index bc984f201e7f..b3aff7a68d76 100644 --- a/pkg/storage/engine/mvcc_incremental_iterator_test.go +++ b/pkg/storage/engine/mvcc_incremental_iterator_test.go @@ -13,9 +13,12 @@ package engine import ( "bytes" "context" + "fmt" "math" + "path/filepath" "testing" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils" @@ -23,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/pkg/errors" + "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" ) @@ -409,3 +413,308 @@ func TestMVCCIncrementalIteratorIntentRewrittenConcurrently(t *testing.T) { }) } } + +// TestMVCCIncrementalIteratorIntentDeletion checks a workaround in +// MVCCIncrementalIterator for a bug in time-bound iterators, where an intent +// has been deleted, but the time-bound iterator doesn't see the deletion. 
+func TestMVCCIncrementalIteratorIntentDeletion(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	// txn builds a transaction anchored at key whose read and write timestamps
+	// are both ts.
+	txn := func(key roachpb.Key, ts hlc.Timestamp) *roachpb.Transaction {
+		return &roachpb.Transaction{
+			TxnMeta: enginepb.TxnMeta{
+				Key:            key,
+				ID:             uuid.MakeV4(),
+				Epoch:          1,
+				WriteTimestamp: ts,
+			},
+			ReadTimestamp: ts,
+		}
+	}
+	// intent builds a COMMITTED lock update covering the transaction's anchor
+	// key; it is what gets passed to MVCCResolveWriteIntent below.
+	intent := func(txn *roachpb.Transaction) roachpb.LockUpdate {
+		intent := roachpb.MakeLockUpdate(txn, roachpb.Span{Key: txn.Key})
+		intent.Status = roachpb.COMMITTED
+		return intent
+	}
+
+	ctx := context.Background()
+	kA := roachpb.Key("kA")
+	vA1 := roachpb.MakeValueFromString("vA1")
+	vA2 := roachpb.MakeValueFromString("vA2")
+	vA3 := roachpb.MakeValueFromString("vA3")
+	kB := roachpb.Key("kB")
+	vB1 := roachpb.MakeValueFromString("vB1")
+	kC := roachpb.Key("kC")
+	vC1 := roachpb.MakeValueFromString("vC1")
+	ts0 := hlc.Timestamp{WallTime: 0}
+	ts1 := hlc.Timestamp{WallTime: 1}
+	ts2 := hlc.Timestamp{WallTime: 2}
+	ts3 := hlc.Timestamp{WallTime: 3}
+	txnA1 := txn(kA, ts1)
+	txnA3 := txn(kA, ts3)
+	txnB1 := txn(kB, ts1)
+	txnC1 := txn(kC, ts1)
+
+	db := NewInMem(ctx, DefaultStorageEngine, roachpb.Attributes{}, 10<<20)
+	defer db.Close()
+
+	// Set up two sstables very specifically:
+	//
+	// sst1 (time-bound metadata ts1->ts1)
+	//    kA -> (intent)
+	//    kA:1 -> vA1
+	//    kB -> (intent)
+	//    kB:1 -> vB1
+	//    kC -> (intent)
+	//    kC:1 -> vC1
+	//
+	// sst2 (time-bound metadata ts2->ts3) the intent deletions are for the
+	// intents at ts1, but there's no way to know that when constructing the
+	// metadata (hence the time-bound iterator bug)
+	//    kA -> (intent) [NB this overwrites the intent deletion]
+	//    kA:3 -> vA3
+	//    kA:2 -> vA2
+	//    kB -> (intent deletion)
+	require.NoError(t, MVCCPut(ctx, db, nil, kA, txnA1.ReadTimestamp, vA1, txnA1))
+	require.NoError(t, MVCCPut(ctx, db, nil, kB, txnB1.ReadTimestamp, vB1, txnB1))
+	require.NoError(t, MVCCPut(ctx, db, nil, kC, txnC1.ReadTimestamp, vC1, txnC1))
+	require.NoError(t, db.Flush())
+	require.NoError(t, db.Compact())
+	_, err := MVCCResolveWriteIntent(ctx, db, nil, intent(txnA1))
+	require.NoError(t, err)
+	_, err = MVCCResolveWriteIntent(ctx, db, nil, intent(txnB1))
+	require.NoError(t, err)
+	require.NoError(t, MVCCPut(ctx, db, nil, kA, ts2, vA2, nil))
+	require.NoError(t, MVCCPut(ctx, db, nil, kA, txnA3.WriteTimestamp, vA3, txnA3))
+	require.NoError(t, db.Flush())
+
+	if rocks, ok := db.(*RocksDB); ok {
+		// Double-check that we've created the SSTs we intended to. The
+		// expected value goes first so that failure output labels the two
+		// sides correctly.
+		userProps, err := rocks.GetUserProperties()
+		require.NoError(t, err)
+		require.Len(t, userProps.Sst, 2)
+		require.Equal(t, &ts1, userProps.Sst[0].TsMin)
+		require.Equal(t, &ts1, userProps.Sst[0].TsMax)
+		require.Equal(t, &ts2, userProps.Sst[1].TsMin)
+		require.Equal(t, &ts3, userProps.Sst[1].TsMax)
+	}
+
+	// The kA ts1 intent has been resolved. There's now a new intent on kA, but
+	// the timestamp (ts3) is too new so it should be ignored.
+	kvs, err := slurpKVsInTimeRange(db, kA, ts0, ts1)
+	require.NoError(t, err)
+	require.Equal(t, []MVCCKeyValue{
+		{Key: MVCCKey{Key: kA, Timestamp: ts1}, Value: vA1.RawBytes},
+	}, kvs)
+	// kA has a value at ts2. Again the intent is too new (ts3), so ignore.
+	kvs, err = slurpKVsInTimeRange(db, kA, ts0, ts2)
+	require.NoError(t, err)
+	require.Equal(t, []MVCCKeyValue{
+		{Key: MVCCKey{Key: kA, Timestamp: ts2}, Value: vA2.RawBytes},
+		{Key: MVCCKey{Key: kA, Timestamp: ts1}, Value: vA1.RawBytes},
+	}, kvs)
+	// At ts3, we should see the new intent.
+	_, err = slurpKVsInTimeRange(db, kA, ts0, ts3)
+	require.EqualError(t, err, `conflicting intents on "kA"`)
+
+	// Similar to the kA ts1 check, but there is no newer intent. We expect to
+	// pick up the intent deletion and it should cancel out the intent, leaving
+	// only the value at ts1.
+	kvs, err = slurpKVsInTimeRange(db, kB, ts0, ts1)
+	require.NoError(t, err)
+	require.Equal(t, []MVCCKeyValue{
+		{Key: MVCCKey{Key: kB, Timestamp: ts1}, Value: vB1.RawBytes},
+	}, kvs)
+
+	// Sanity check that we see the still unresolved intent for kC ts1.
+	_, err = slurpKVsInTimeRange(db, kC, ts0, ts1)
+	require.EqualError(t, err, `conflicting intents on "kC"`)
+}
+
+// TestMVCCIncrementalIteratorIntentStraddlesSStables checks that an
+// incremental iterator reports a write intent error when the intent's metadata
+// entry and its provisional value have been placed in different sstables.
+func TestMVCCIncrementalIteratorIntentStraddlesSStables(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	// Create a DB containing 2 keys, a and b, where b has an intent. We use the
+	// regular MVCCPut operation to generate these keys, which we'll later be
+	// copying into manually created sstables.
+	ctx := context.Background()
+	db1 := NewInMem(ctx, DefaultStorageEngine, roachpb.Attributes{}, 10<<20 /* 10 MB */)
+	defer db1.Close()
+
+	// put writes value at key@ts into db1, optionally on behalf of txn.
+	put := func(key, value string, ts int64, txn *roachpb.Transaction) {
+		v := roachpb.MakeValueFromString(value)
+		if err := MVCCPut(
+			ctx, db1, nil, roachpb.Key(key), hlc.Timestamp{WallTime: ts}, v, txn,
+		); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	put("a", "a value", 1, nil)
+	put("b", "b value", 2, &roachpb.Transaction{
+		TxnMeta: enginepb.TxnMeta{
+			Key:            roachpb.Key("b"),
+			ID:             uuid.MakeV4(),
+			Epoch:          1,
+			WriteTimestamp: hlc.Timestamp{WallTime: 2},
+		},
+		ReadTimestamp: hlc.Timestamp{WallTime: 2},
+	})
+
+	// Create a second DB in which we'll create a specific SSTable structure: the
+	// first SSTable contains 2 KVs where the first is a regular versioned key
+	// and the second is the MVCC metadata entry (i.e. an intent). The next
+	// SSTable contains the provisional value for the intent. The effect is that
+	// the metadata entry is separated from the entry it is metadata for.
+	//
+	//   SSTable 1:
+	//     a@1
+	//     b@
+	//
+	//   SSTable 2:
+	//     b@2
+	db2 := NewInMem(ctx, DefaultStorageEngine, roachpb.Attributes{}, 10<<20 /* 10 MB */)
+	defer db2.Close()
+
+	// ingest copies the next count entries from it into a freshly written
+	// sstable and ingests that sstable into db2. It advances it past the
+	// copied entries, so consecutive calls pick up where the last one stopped.
+	ingest := func(it Iterator, count int) {
+		sst, err := MakeRocksDBSstFileWriter()
+		if err != nil {
+			t.Fatal(err)
+		}
+		defer sst.Close()
+
+		for i := 0; i < count; i++ {
+			ok, err := it.Valid()
+			if err != nil {
+				t.Fatal(err)
+			}
+			if !ok {
+				t.Fatal("expected key")
+			}
+			if err := sst.Put(it.Key(), it.Value()); err != nil {
+				t.Fatal(err)
+			}
+			it.Next()
+		}
+		sstContents, err := sst.Finish()
+		if err != nil {
+			t.Fatal(err)
+		}
+		if err := db2.WriteFile(`ingest`, sstContents); err != nil {
+			t.Fatal(err)
+		}
+		if err := db2.IngestExternalFiles(ctx, []string{`ingest`}); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	{
+		// Iterate over the entries in the first DB, ingesting them into SSTables
+		// in the second DB.
+		it := db1.NewIterator(IterOptions{
+			UpperBound: keys.MaxKey,
+		})
+		defer it.Close()
+		it.SeekGE(MVCCKey{Key: keys.MinKey})
+		ingest(it, 2)
+		ingest(it, 1)
+	}
+
+	{
+		// Use an incremental iterator to simulate an incremental backup from (1,
+		// 2]. Note that incremental iterators are exclusive on the start time and
+		// inclusive on the end time. The expectation is that we'll see a write
+		// intent error.
+		it := NewMVCCIncrementalIterator(db2, MVCCIncrementalIterOptions{
+			IterOptions: IterOptions{UpperBound: keys.MaxKey},
+			StartTime:   hlc.Timestamp{WallTime: 1},
+			EndTime:     hlc.Timestamp{WallTime: 2},
+		})
+		defer it.Close()
+		for it.SeekGE(MVCCKey{Key: keys.MinKey}); ; it.Next() {
+			ok, err := it.Valid()
+			if err != nil {
+				if _, ok = err.(*roachpb.WriteIntentError); ok {
+					// This is the write intent error we were expecting.
+					return
+				}
+				t.Fatalf("%T: %s", err, err)
+			}
+			if !ok {
+				break
+			}
+		}
+		// Reaching the end of the iteration without an error means the intent
+		// was missed.
+		t.Fatal("expected write intent error, but found success")
+	}
+}
+
+// TestMVCCIterateTimeBound loads a generated data set and, for several
+// (start, end] time ranges, builds the expected KVs by filtering a regular
+// iterator on timestamp. It then hands those KVs to assertEqualKVs together
+// with MVCCIncrementalIterator.NextKey as the advance function.
+// NOTE(review): assertEqualKVs is defined elsewhere; presumably it compares
+// the incremental iterator's output against expectedKVs -- confirm.
+func TestMVCCIterateTimeBound(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	dir, cleanupFn := testutils.TempDir(t)
+	defer cleanupFn()
+
+	const numKeys = 1000
+	const numBatches = 10
+	const batchTimeSpan = 10
+	const valueSize = 8
+
+	eng, err := loadTestData(filepath.Join(dir, "mvcc_data"),
+		numKeys, numBatches, batchTimeSpan, valueSize)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer eng.Close()
+
+	for _, testCase := range []struct {
+		start hlc.Timestamp
+		end   hlc.Timestamp
+	}{
+		// entire time range
+		{hlc.Timestamp{WallTime: 0, Logical: 0}, hlc.Timestamp{WallTime: 110, Logical: 0}},
+		// one SST
+		{hlc.Timestamp{WallTime: 10, Logical: 0}, hlc.Timestamp{WallTime: 19, Logical: 0}},
+		// one SST, plus the min of the following SST
+		{hlc.Timestamp{WallTime: 10, Logical: 0}, hlc.Timestamp{WallTime: 20, Logical: 0}},
+		// one SST, plus the max of the preceding SST
+		{hlc.Timestamp{WallTime: 9, Logical: 0}, hlc.Timestamp{WallTime: 19, Logical: 0}},
+		// one SST, plus the min of the following and the max of the preceding SST
+		{hlc.Timestamp{WallTime: 9, Logical: 0}, hlc.Timestamp{WallTime: 21, Logical: 0}},
+		// one SST, not min or max
+		{hlc.Timestamp{WallTime: 17, Logical: 0}, hlc.Timestamp{WallTime: 18, Logical: 0}},
+		// one SST's max
+		{hlc.Timestamp{WallTime: 18, Logical: 0}, hlc.Timestamp{WallTime: 19, Logical: 0}},
+		// one SST's min
+		{hlc.Timestamp{WallTime: 19, Logical: 0}, hlc.Timestamp{WallTime: 20, Logical: 0}},
+		// random endpoints
+		{hlc.Timestamp{WallTime: 32, Logical: 0}, hlc.Timestamp{WallTime: 78, Logical: 0}},
+	} {
+		t.Run(fmt.Sprintf("%s-%s", testCase.start, testCase.end), func(t *testing.T) {
+			defer leaktest.AfterTest(t)()
+
+			// Build the source of truth with a regular iterator, keeping only
+			// the versions whose timestamp lies in (start, end].
+			var expectedKVs []MVCCKeyValue
+			iter := eng.NewIterator(IterOptions{UpperBound: roachpb.KeyMax})
+			defer iter.Close()
+			iter.SeekGE(MVCCKey{})
+			for {
+				ok, err := iter.Valid()
+				if err != nil {
+					t.Fatal(err)
+				} else if !ok {
+					break
+				}
+				ts := iter.Key().Timestamp
+				if (ts.Less(testCase.end) || testCase.end == ts) && testCase.start.Less(ts) {
+					expectedKVs = append(expectedKVs, MVCCKeyValue{Key: iter.Key(), Value: iter.Value()})
+				}
+				iter.Next()
+			}
+			// An empty expectation would make the comparison below vacuous.
+			if len(expectedKVs) < 1 {
+				t.Fatal("source of truth had no expected KVs; likely a bug in the test itself")
+			}
+
+			fn := (*MVCCIncrementalIterator).NextKey
+			assertEqualKVs(eng, fn, keys.MinKey, keys.MaxKey, testCase.start, testCase.end, expectedKVs)(t)
+		})
+	}
+}