From 876ceec0c3a35fd92249cdefaaf46785b897d9ca Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Tue, 5 Nov 2019 20:49:19 -0500 Subject: [PATCH] storage/engine: lift Go MVCC implementation above Iterator interface This commit removes the MVCCGet and MVCCScan methods from engine.Iterator and uses the rest of the interface to implement these methods as free functions. This restructuring allows the MVCC operations to support polymorphism of the iterator, which is what the code was intending to do when originally written. The code was moved to the current structure as a way of avoiding cgo calls when using RocksDB's iterator implementation. This is an important optimization when using RocksDB (but not Pebble) so the commit retains it through optional specializations of MVCCGet and MVCCScan. RocksDB's iterator implements this specialization but Pebble's does not need to. This isn't quite ready for a review. I'm mainly pushing it in case others want to take a look. It will be used to get the prototype of #41720 up and running. Benchmarks show about a 0-1% performance regression due to this change. More testing should be done if we actually want to productionize this. --- pkg/storage/engine/engine.go | 29 +-- pkg/storage/engine/mvcc.go | 135 +++++++++++++- pkg/storage/engine/pebble_iterator.go | 119 ------------ pkg/storage/engine/pebble_mvcc_scanner.go | 173 +++++++++--------- pkg/storage/engine/rocksdb.go | 12 +- pkg/storage/engine/rocksdb_iter_stats_test.go | 11 +- pkg/storage/spanset/batch.go | 37 ++-- 7 files changed, 273 insertions(+), 243 deletions(-) diff --git a/pkg/storage/engine/engine.go b/pkg/storage/engine/engine.go index a537de1e7c19..6c50faf1ab1e 100644 --- a/pkg/storage/engine/engine.go +++ b/pkg/storage/engine/engine.go @@ -116,11 +116,23 @@ type Iterator interface { // and the encoded SST data specified, within the provided key range. Returns // stats on skipped KVs, or an error if a collision is found. CheckForKeyCollisions(sstData []byte, start, end roachpb.Key) (enginepb.MVCCStats, error) + // SetUpperBound installs a new upper bound for this iterator. + SetUpperBound(roachpb.Key) + // Stats returns statistics about the iterator. + Stats() IteratorStats +} + +// MVCCIterator is an interface that extends Iterator and provides concrete +// implementations for MVCCGet and MVCCScan operations. It is used by instances +// of the interface backed by RocksDB iterators to avoid cgo hops. +type MVCCIterator interface { + Iterator + // MVCCOpsSpecialized returns whether the iterator has a specialized + // implementation of MVCCGet and MVCCScan. This is exposed as a method + // so that wrapper types can defer to their wrapped iterators. + MVCCOpsSpecialized() bool // MVCCGet is the internal implementation of the family of package-level // MVCCGet functions. - // - // DO NOT CALL directly (except in wrapper Iterator implementations). Use the - // package-level MVCCGet, or one of its variants, instead. MVCCGet( key roachpb.Key, timestamp hlc.Timestamp, opts MVCCGetOptions, ) (*roachpb.Value, *roachpb.Intent, error) @@ -129,20 +141,9 @@ type Iterator interface { // returned raw, as a series of buffers of length-prefixed slices, // alternating from key to value, where numKVs specifies the number of pairs // in the buffer. - // - // DO NOT CALL directly (except in wrapper Iterator implementations). Use the - // package-level MVCCScan, or one of its variants, instead. For correct - // operation, the caller must set the lower and upper bounds on the iterator - // before calling this method. - // - // TODO(peter): unexport this method. MVCCScan( start, end roachpb.Key, max int64, timestamp hlc.Timestamp, opts MVCCScanOptions, ) (kvData [][]byte, numKVs int64, resumeSpan *roachpb.Span, intents []roachpb.Intent, err error) - // SetUpperBound installs a new upper bound for this iterator. - SetUpperBound(roachpb.Key) - - Stats() IteratorStats } // IterOptions contains options used to create an Iterator. diff --git a/pkg/storage/engine/mvcc.go b/pkg/storage/engine/mvcc.go index de45697a835b..0b648ea811f6 100644 --- a/pkg/storage/engine/mvcc.go +++ b/pkg/storage/engine/mvcc.go @@ -730,14 +730,78 @@ type MVCCGetOptions struct { func MVCCGet( ctx context.Context, eng Reader, key roachpb.Key, timestamp hlc.Timestamp, opts MVCCGetOptions, ) (*roachpb.Value, *roachpb.Intent, error) { + iter := eng.NewIterator(IterOptions{Prefix: true}) + defer iter.Close() + return mvccGet(ctx, iter, key, timestamp, opts) +} + +func mvccGet( + ctx context.Context, iter Iterator, key roachpb.Key, timestamp hlc.Timestamp, opts MVCCGetOptions, +) (value *roachpb.Value, intent *roachpb.Intent, err error) { if timestamp.WallTime < 0 { return nil, nil, errors.Errorf("cannot write to %q at timestamp %s", key, timestamp) } + if opts.Inconsistent && opts.Txn != nil { + return nil, nil, errors.Errorf("cannot allow inconsistent reads within a transaction") + } + if len(key) == 0 { + return nil, nil, emptyKeyError() + } - iter := eng.NewIterator(IterOptions{Prefix: true}) - value, intent, err := iter.MVCCGet(key, timestamp, opts) - iter.Close() - return value, intent, err + // If the iterator has a specialized implementation, defer to that. + if mvccIter, ok := iter.(MVCCIterator); ok && mvccIter.MVCCOpsSpecialized() { + return mvccIter.MVCCGet(key, timestamp, opts) + } + + mvccScanner := pebbleMVCCScannerPool.Get().(*pebbleMVCCScanner) + defer pebbleMVCCScannerPool.Put(mvccScanner) + + // MVCCGet is implemented as an MVCCScan where we retrieve a single key. We + // specify an empty key for the end key which will ensure we don't retrieve a + // key different than the start key. This is a bit of a hack. + *mvccScanner = pebbleMVCCScanner{ + parent: iter, + start: key, + ts: timestamp, + maxKeys: 1, + inconsistent: opts.Inconsistent, + tombstones: opts.Tombstones, + } + + mvccScanner.init(opts.Txn) + mvccScanner.get() + + if mvccScanner.err != nil { + return nil, nil, mvccScanner.err + } + intents, err := buildScanIntents(mvccScanner.intents.Repr()) + if err != nil { + return nil, nil, err + } + if !opts.Inconsistent && len(intents) > 0 { + return nil, nil, &roachpb.WriteIntentError{Intents: intents} + } + + if len(intents) > 1 { + return nil, nil, errors.Errorf("expected 0 or 1 intents, got %d", len(intents)) + } else if len(intents) == 1 { + intent = &intents[0] + } + + if len(mvccScanner.results.repr) == 0 { + return nil, intent, nil + } + + mvccKey, rawValue, _, err := MVCCScanDecodeKeyValue(mvccScanner.results.repr) + if err != nil { + return nil, nil, err + } + + value = &roachpb.Value{ + RawBytes: rawValue, + Timestamp: mvccKey.Timestamp, + } + return } // MVCCGetAsTxn constructs a temporary transaction from the given transaction @@ -2170,6 +2234,65 @@ func MVCCDeleteRange( return keys, resumeSpan, int64(len(kvs)), err } +func mvccScanToBytes( + ctx context.Context, + iter Iterator, + key, endKey roachpb.Key, + max int64, + timestamp hlc.Timestamp, + opts MVCCScanOptions, +) (kvData [][]byte, numKVs int64, resumeSpan *roachpb.Span, intents []roachpb.Intent, err error) { + if opts.Inconsistent && opts.Txn != nil { + return nil, 0, nil, nil, errors.Errorf("cannot allow inconsistent reads within a transaction") + } + if len(endKey) == 0 { + return nil, 0, nil, nil, emptyKeyError() + } + if max == 0 { + resumeSpan = &roachpb.Span{Key: key, EndKey: endKey} + return nil, 0, resumeSpan, nil, nil + } + + // If the iterator has a specialized implementation, defer to that. + if mvccIter, ok := iter.(MVCCIterator); ok && mvccIter.MVCCOpsSpecialized() { + return mvccIter.MVCCScan(key, endKey, max, timestamp, opts) + } + + mvccScanner := pebbleMVCCScannerPool.Get().(*pebbleMVCCScanner) + defer pebbleMVCCScannerPool.Put(mvccScanner) + + *mvccScanner = pebbleMVCCScanner{ + parent: iter, + reverse: opts.Reverse, + start: key, + end: endKey, + ts: timestamp, + maxKeys: max, + inconsistent: opts.Inconsistent, + tombstones: opts.Tombstones, + } + + mvccScanner.init(opts.Txn) + resumeSpan, err = mvccScanner.scan() + + if err != nil { + return nil, 0, nil, nil, err + } + + kvData = mvccScanner.results.finish() + numKVs = mvccScanner.results.count + + intents, err = buildScanIntents(mvccScanner.intents.Repr()) + if err != nil { + return nil, 0, nil, nil, err + } + + if !opts.Inconsistent && len(intents) > 0 { + return nil, 0, resumeSpan, nil, &roachpb.WriteIntentError{Intents: intents} + } + return +} + // mvccScanToKvs converts the raw key/value pairs returned by Iterator.MVCCScan // into a slice of roachpb.KeyValues. func mvccScanToKvs( @@ -2180,7 +2303,7 @@ func mvccScanToKvs( timestamp hlc.Timestamp, opts MVCCScanOptions, ) ([]roachpb.KeyValue, *roachpb.Span, []roachpb.Intent, error) { - kvData, numKVs, resumeSpan, intents, err := iter.MVCCScan(key, endKey, max, timestamp, opts) + kvData, numKVs, resumeSpan, intents, err := mvccScanToBytes(ctx, iter, key, endKey, max, timestamp, opts) if err != nil { return nil, nil, nil, err } @@ -2305,7 +2428,7 @@ func MVCCScanToBytes( ) ([][]byte, int64, *roachpb.Span, []roachpb.Intent, error) { iter := engine.NewIterator(IterOptions{LowerBound: key, UpperBound: endKey}) defer iter.Close() - return iter.MVCCScan(key, endKey, max, timestamp, opts) + return mvccScanToBytes(ctx, iter, key, endKey, max, timestamp, opts) } // MVCCIterate iterates over the key range [start,end). At each step of the diff --git a/pkg/storage/engine/pebble_iterator.go b/pkg/storage/engine/pebble_iterator.go index ff051099ad73..bc27995ec445 100644 --- a/pkg/storage/engine/pebble_iterator.go +++ b/pkg/storage/engine/pebble_iterator.go @@ -20,7 +20,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/protoutil" - "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble" ) @@ -365,124 +364,6 @@ func (p *pebbleIterator) FindSplitKey( return bestSplitKey, nil } -// MVCCGet implements the Iterator interface. -func (p *pebbleIterator) MVCCGet( - key roachpb.Key, timestamp hlc.Timestamp, opts MVCCGetOptions, -) (value *roachpb.Value, intent *roachpb.Intent, err error) { - if opts.Inconsistent && opts.Txn != nil { - return nil, nil, errors.Errorf("cannot allow inconsistent reads within a transaction") - } - if len(key) == 0 { - return nil, nil, emptyKeyError() - } - if p.iter == nil { - panic("uninitialized iterator") - } - - mvccScanner := pebbleMVCCScannerPool.Get().(*pebbleMVCCScanner) - defer pebbleMVCCScannerPool.Put(mvccScanner) - - // MVCCGet is implemented as an MVCCScan where we retrieve a single key. We - // specify an empty key for the end key which will ensure we don't retrieve a - // key different than the start key. This is a bit of a hack. - *mvccScanner = pebbleMVCCScanner{ - parent: p.iter, - start: key, - ts: timestamp, - maxKeys: 1, - inconsistent: opts.Inconsistent, - tombstones: opts.Tombstones, - } - - mvccScanner.init(opts.Txn) - mvccScanner.get() - - if mvccScanner.err != nil { - return nil, nil, mvccScanner.err - } - intents, err := buildScanIntents(mvccScanner.intents.Repr()) - if err != nil { - return nil, nil, err - } - if !opts.Inconsistent && len(intents) > 0 { - return nil, nil, &roachpb.WriteIntentError{Intents: intents} - } - - if len(intents) > 1 { - return nil, nil, errors.Errorf("expected 0 or 1 intents, got %d", len(intents)) - } else if len(intents) == 1 { - intent = &intents[0] - } - - if len(mvccScanner.results.repr) == 0 { - return nil, intent, nil - } - - mvccKey, rawValue, _, err := MVCCScanDecodeKeyValue(mvccScanner.results.repr) - if err != nil { - return nil, nil, err - } - - value = &roachpb.Value{ - RawBytes: rawValue, - Timestamp: mvccKey.Timestamp, - } - return -} - -// MVCCScan implements the Iterator interface. -func (p *pebbleIterator) MVCCScan( - start, end roachpb.Key, max int64, timestamp hlc.Timestamp, opts MVCCScanOptions, -) (kvData [][]byte, numKVs int64, resumeSpan *roachpb.Span, intents []roachpb.Intent, err error) { - if opts.Inconsistent && opts.Txn != nil { - return nil, 0, nil, nil, errors.Errorf("cannot allow inconsistent reads within a transaction") - } - if len(end) == 0 { - return nil, 0, nil, nil, emptyKeyError() - } - if max == 0 { - resumeSpan = &roachpb.Span{Key: start, EndKey: end} - return nil, 0, resumeSpan, nil, nil - } - if p.iter == nil { - panic("uninitialized iterator") - } - - mvccScanner := pebbleMVCCScannerPool.Get().(*pebbleMVCCScanner) - defer pebbleMVCCScannerPool.Put(mvccScanner) - - *mvccScanner = pebbleMVCCScanner{ - parent: p.iter, - reverse: opts.Reverse, - start: start, - end: end, - ts: timestamp, - maxKeys: max, - inconsistent: opts.Inconsistent, - tombstones: opts.Tombstones, - } - - mvccScanner.init(opts.Txn) - resumeSpan, err = mvccScanner.scan() - - if err != nil { - return nil, 0, nil, nil, err - } - - kvData = mvccScanner.results.finish() - numKVs = mvccScanner.results.count - - intents, err = buildScanIntents(mvccScanner.intents.Repr()) - if err != nil { - return nil, 0, nil, nil, err - } - - if !opts.Inconsistent && len(intents) > 0 { - return nil, 0, resumeSpan, nil, &roachpb.WriteIntentError{Intents: intents} - } - return -} - // SetUpperBound implements the Iterator interface. func (p *pebbleIterator) SetUpperBound(upperBound roachpb.Key) { p.upperBoundBuf = append(p.upperBoundBuf[:0], upperBound...) diff --git a/pkg/storage/engine/pebble_mvcc_scanner.go b/pkg/storage/engine/pebble_mvcc_scanner.go index 10cb18cc728e..20d38b9f7569 100644 --- a/pkg/storage/engine/pebble_mvcc_scanner.go +++ b/pkg/storage/engine/pebble_mvcc_scanner.go @@ -16,6 +16,7 @@ import ( "sort" "sync" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -43,7 +44,7 @@ func (p *pebbleResults) clear() { // The repr that MVCCScan / MVCCGet expects to provide as output goes: // // This function adds to repr in that format. -func (p *pebbleResults) put(key []byte, value []byte) { +func (p *pebbleResults) put(key MVCCKey, value []byte) { // Key value lengths take up 8 bytes (2 x Uint32). const kvLenSize = 8 const minSize = 16 @@ -52,7 +53,8 @@ func (p *pebbleResults) put(key []byte, value []byte) { // We maintain a list of buffers, always encoding into the last one (a.k.a. // pebbleResults.repr). The size of the buffers is exponentially increasing, // capped at maxSize. - lenToAdd := kvLenSize + len(key) + len(value) + lenKey := key.Len() + lenToAdd := kvLenSize + lenKey + len(value) if len(p.repr)+lenToAdd > cap(p.repr) { newSize := 2 * cap(p.repr) if newSize == 0 { @@ -70,9 +72,9 @@ func (p *pebbleResults) put(key []byte, value []byte) { startIdx := len(p.repr) p.repr = p.repr[:startIdx+lenToAdd] binary.LittleEndian.PutUint32(p.repr[startIdx:], uint32(len(value))) - binary.LittleEndian.PutUint32(p.repr[startIdx+4:], uint32(len(key))) - copy(p.repr[startIdx+kvLenSize:], key) - copy(p.repr[startIdx+kvLenSize+len(key):], value) + binary.LittleEndian.PutUint32(p.repr[startIdx+4:], uint32(lenKey)) + encodeKeyToBuf(p.repr[startIdx+kvLenSize:startIdx+kvLenSize+lenKey], key, lenKey) + copy(p.repr[startIdx+kvLenSize+lenKey:], value) p.count++ } @@ -87,7 +89,7 @@ func (p *pebbleResults) finish() [][]byte { // Go port of mvccScanner in libroach/mvcc.h. Stores all variables relating to // one MVCCGet / MVCCScan call. type pebbleMVCCScanner struct { - parent *pebble.Iterator + parent Iterator reverse bool peeked bool // Iteration bounds. Does not contain MVCC timestamp. @@ -109,12 +111,11 @@ type pebbleMVCCScanner struct { keyBuf []byte savedBuf []byte // cur* variables store the "current" record we're pointing to. Updated in - // updateCurrent. Note that curRawKey = the full encoded MVCC key, while - // curKey = the user-key part of curRawKey (i.e. excluding the timestamp). - curRawKey, curKey, curValue []byte - curTS hlc.Timestamp - results pebbleResults - intents pebble.Batch + // updateCurrent. + curKey, curValue []byte + curTS hlc.Timestamp + results pebbleResults + intents pebble.Batch // Stores any error returned. If non-nil, iteration short circuits. err error // Number of iterations to try before we do a Seek/SeekReverse. Stays within @@ -144,9 +145,8 @@ func (p *pebbleMVCCScanner) init(txn *roachpb.Transaction) { // get iterates exactly once and adds one KV to the result set. func (p *pebbleMVCCScanner) get() { - p.keyBuf = EncodeKeyToBuf(p.keyBuf[:0], MVCCKey{Key: p.start}) - valid := p.parent.SeekPrefixGE(p.keyBuf) - if !p.updateCurrent(valid) { + p.parent.SeekGE(MVCCKey{Key: p.start}) + if !p.updateCurrent() { return } p.getAndAdvance() @@ -156,13 +156,11 @@ func (p *pebbleMVCCScanner) get() { // iterator is exhausted, or an error is encountered. func (p *pebbleMVCCScanner) scan() (*roachpb.Span, error) { if p.reverse { - p.keyBuf = EncodeKeyToBuf(p.keyBuf[:0], MVCCKey{Key: p.end}) - if !p.iterSeekReverse(p.keyBuf) { + if !p.iterSeekReverse(MVCCKey{Key: p.end}) { return nil, p.err } } else { - p.keyBuf = EncodeKeyToBuf(p.keyBuf[:0], MVCCKey{Key: p.start}) - if !p.iterSeek(p.keyBuf) { + if !p.iterSeek(MVCCKey{Key: p.start}) { return nil, p.err } } @@ -225,7 +223,7 @@ func (p *pebbleMVCCScanner) getFromIntentHistory() bool { } intent := p.meta.IntentHistory[upIdx-1] if len(intent.Value) > 0 || p.tombstones { - p.results.put(p.curRawKey, intent.Value) + p.results.put(p.curMVCCKey(), intent.Value) } return true } @@ -328,7 +326,8 @@ func (p *pebbleMVCCScanner) getAndAdvance() bool { // that lie before the resume key. return false } - p.err = p.intents.Set(p.curRawKey, p.curValue, nil) + p.keyBuf = EncodeKeyToBuf(p.keyBuf[:0], p.curMVCCKey()) + p.err = p.intents.Set(p.keyBuf, p.curValue, nil) if p.err != nil { return false } @@ -342,7 +341,8 @@ func (p *pebbleMVCCScanner) getAndAdvance() bool { // intent. Note that this will trigger an error on the Go // side. We continue scanning so that we can return all of the // intents in the scan range. - p.err = p.intents.Set(p.curRawKey, p.curValue, nil) + p.keyBuf = EncodeKeyToBuf(p.keyBuf[:0], p.curMVCCKey()) + p.err = p.intents.Set(p.keyBuf, p.curValue, nil) if p.err != nil { return false } @@ -397,7 +397,7 @@ func (p *pebbleMVCCScanner) getAndAdvance() bool { return p.seekVersion(prevTS, false) } -// Advances to the next user key. +// nextKey advances to the next user key. func (p *pebbleMVCCScanner) nextKey() bool { p.keyBuf = append(p.keyBuf[:0], p.curKey...) @@ -412,11 +412,10 @@ func (p *pebbleMVCCScanner) nextKey() bool { } p.decrementItersBeforeSeek() - // We're pointed at a different version of the same key. Fall back to seeking - // to the next key. We append 2 NULs to account for the "next-key" and a - // trailing zero timestamp. - p.keyBuf = append(p.keyBuf, 0, 0) - return p.iterSeek(p.keyBuf) + // We're pointed at a different version of the same key. Fall back to + // seeking to the next key. We append a NUL to account for the "next-key". + p.keyBuf = append(p.keyBuf, 0) + return p.iterSeek(MVCCKey{Key: p.keyBuf}) } // backwardLatestVersion backs up the iterator to the latest version for the @@ -443,13 +442,12 @@ func (p *pebbleMVCCScanner) backwardLatestVersion(key []byte, i int) bool { } p.decrementItersBeforeSeek() - p.keyBuf = append(p.keyBuf, 0) - return p.iterSeek(p.keyBuf) + return p.iterSeek(MVCCKey{Key: p.keyBuf}) } -// Advance to the newest version of the user key preceding the specified -// key. Assumes that the iterator is currently positioned at key or 1 record -// after key. +// prevKey advances to the newest version of the user key preceding the +// specified key. Assumes that the iterator is currently positioned at +// key or 1 record after key. func (p *pebbleMVCCScanner) prevKey(key []byte) bool { p.keyBuf = append(p.keyBuf[:0], key...) @@ -467,11 +465,10 @@ func (p *pebbleMVCCScanner) prevKey(key []byte) bool { } p.decrementItersBeforeSeek() - p.keyBuf = append(p.keyBuf, 0) - return p.iterSeekReverse(p.keyBuf) + return p.iterSeekReverse(MVCCKey{Key: p.keyBuf}) } -// Advance to the next key in the iterator's direction. +// advanceKey advances to the next key in the iterator's direction. func (p *pebbleMVCCScanner) advanceKey() bool { if p.reverse { return p.prevKey(p.curKey) @@ -486,8 +483,8 @@ func (p *pebbleMVCCScanner) advanceKeyAtEnd() bool { // Iterating to the next key might have caused the iterator to reach the // end of the key space. If that happens, back up to the very last key. p.peeked = false - valid := p.parent.Last() - if !p.updateCurrent(valid) { + p.parent.SeekLT(MVCCKey{Key: keys.MaxKey}) + if !p.updateCurrent() { return false } return p.advanceKey() @@ -515,7 +512,7 @@ func (p *pebbleMVCCScanner) addAndAdvance(val []byte) bool { // Don't include deleted versions len(val) == 0, unless we've been instructed // to include tombstones in the results. if len(val) > 0 || p.tombstones { - p.results.put(p.curRawKey, val) + p.results.put(p.curMVCCKey(), val) if p.results.count == p.maxKeys { return false } @@ -527,7 +524,8 @@ func (p *pebbleMVCCScanner) addAndAdvance(val []byte) bool { // equal to the specified timestamp, adds it to the result set, then moves onto // the next user key. func (p *pebbleMVCCScanner) seekVersion(ts hlc.Timestamp, uncertaintyCheck bool) bool { - p.keyBuf = EncodeKeyToBuf(p.keyBuf[:0], MVCCKey{Key: p.curKey, Timestamp: ts}) + key := MVCCKey{Key: p.curKey, Timestamp: ts} + p.keyBuf = EncodeKeyToBuf(p.keyBuf[:0], key) origKey := p.keyBuf[:len(p.curKey)] for i := 0; i < p.itersBeforeSeek; i++ { @@ -548,7 +546,7 @@ func (p *pebbleMVCCScanner) seekVersion(ts hlc.Timestamp, uncertaintyCheck bool) } p.decrementItersBeforeSeek() - if !p.iterSeek(p.keyBuf) { + if !p.iterSeek(key) { return p.advanceKeyAtEnd() } if !bytes.Equal(p.curKey, origKey) { @@ -564,73 +562,83 @@ func (p *pebbleMVCCScanner) seekVersion(ts hlc.Timestamp, uncertaintyCheck bool) } // Updates cur{RawKey, Key, TS} to match record the iterator is pointing to. -func (p *pebbleMVCCScanner) updateCurrent(valid bool) bool { - if !valid { +func (p *pebbleMVCCScanner) updateCurrent() bool { + if !p.iterValid() { return false } - p.curRawKey = p.parent.Key() - p.curValue = p.parent.Value() - p.curKey, p.curTS, p.err = enginepb.DecodeKey(p.curRawKey) - return p.err == nil + p.curValue = p.parent.UnsafeValue() + key := p.parent.UnsafeKey() + p.curKey, p.curTS = key.Key, key.Timestamp + return true } -// seek seeks to the latest revision of the specified key (or a greater key). -func (p *pebbleMVCCScanner) iterSeek(key []byte) bool { - p.clearPeeked() - valid := p.parent.SeekGE(key) - return p.updateCurrent(valid) +func (p *pebbleMVCCScanner) iterValid() bool { + if valid, err := p.parent.Valid(); !valid { + p.err = err + return false + } + return true } -// seekReverse seeks to the latest revision of the key before the specified key. -func (p *pebbleMVCCScanner) iterSeekReverse(key []byte) bool { +// iterSeek seeks to the latest revision of the specified key (or a greater key). +func (p *pebbleMVCCScanner) iterSeek(key MVCCKey) bool { p.clearPeeked() + p.parent.SeekGE(key) + return p.updateCurrent() +} - valid := p.parent.SeekLT(key) - if !p.updateCurrent(valid) { +// iterSeekReverse seeks to the latest revision of the key before the specified key. +func (p *pebbleMVCCScanner) iterSeekReverse(key MVCCKey) bool { + p.clearPeeked() + p.parent.SeekLT(key) + if !p.updateCurrent() { // We have seeked to before the start key. Return. return false } + if p.curTS == (hlc.Timestamp{}) { // We landed on an intent or inline value. return true } - // We landed on a versioned value, we need to back up to find the // latest version. return p.backwardLatestVersion(p.curKey, 0) } -// Advance to the next MVCC key. +// iterNext advances to the next MVCC key. func (p *pebbleMVCCScanner) iterNext() bool { if p.reverse && p.peeked { // If we have peeked at the previous entry, we need to advance the iterator // twice. p.peeked = false - if !p.parent.Valid() { + if !p.iterValid() { // We were peeked off the beginning of iteration. Seek to the first // entry, and then advance one step. - if !p.parent.First() { + p.parent.SeekGE(MVCCKey{}) + if !p.iterValid() { return false } - return p.updateCurrent(p.parent.Next()) + p.parent.Next() + return p.updateCurrent() } - if !p.parent.Next() { + p.parent.Next() + if !p.iterValid() { return false } } - valid := p.parent.Next() - return p.updateCurrent(valid) + p.parent.Next() + return p.updateCurrent() } -// Advance to the previous MVCC Key. +// iterPrev advances to the previous MVCC Key. func (p *pebbleMVCCScanner) iterPrev() bool { if p.peeked { p.peeked = false - return p.updateCurrent(p.parent.Valid()) + return p.updateCurrent() } - valid := p.parent.Prev() - return p.updateCurrent(valid) + p.parent.Prev() + return p.updateCurrent() } // Peek the previous key and store the result in peekedKey. Note that this @@ -642,35 +650,26 @@ func (p *pebbleMVCCScanner) iterPeekPrev() ([]byte, bool) { // We need to save a copy of the current iterator key and value and adjust // curRawKey, curKey and curValue to point to this saved data. We use a // single buffer for this purpose: savedBuf. - p.savedBuf = append(p.savedBuf[:0], p.curRawKey...) + p.savedBuf = append(p.savedBuf[:0], p.curKey...) p.savedBuf = append(p.savedBuf, p.curValue...) - p.curRawKey = p.savedBuf[:len(p.curRawKey)] - p.curValue = p.savedBuf[len(p.curRawKey):] - var ok bool - p.curKey, _, ok = enginepb.SplitMVCCKey(p.curRawKey) - if !ok { - p.err = errors.Errorf("invalid encoded mvcc key: %x", p.curRawKey) - return nil, false - } + p.curKey = p.savedBuf[:len(p.curKey)] + p.curValue = p.savedBuf[len(p.curKey):] // With the current iterator state saved we can move the iterator to the // previous entry. - if !p.parent.Prev() { + p.parent.Prev() + if !p.iterValid() { // The iterator is now invalid, but note that this case is handled in // both iterNext and iterPrev. In the former case, we'll position the // iterator at the first entry, and in the latter iteration will be done. return nil, false } - } else if !p.parent.Valid() { + } else if !p.iterValid() { return nil, false } - peekedKey, _, ok := enginepb.SplitMVCCKey(p.parent.Key()) - if !ok { - p.err = errors.Errorf("invalid encoded mvcc key: %x", p.parent.Key()) - return nil, false - } - return peekedKey, true + peekedKey := p.parent.UnsafeKey() + return peekedKey.Key, true } // Clear the peeked flag. Call this before any iterator operations. @@ -679,3 +678,7 @@ func (p *pebbleMVCCScanner) clearPeeked() { p.peeked = false } } + +func (p *pebbleMVCCScanner) curMVCCKey() MVCCKey { + return MVCCKey{Key: p.curKey, Timestamp: p.curTS} +} diff --git a/pkg/storage/engine/rocksdb.go b/pkg/storage/engine/rocksdb.go index 6cea5cd9b9b7..c2a53127673c 100644 --- a/pkg/storage/engine/rocksdb.go +++ b/pkg/storage/engine/rocksdb.go @@ -1560,6 +1560,10 @@ func (r *batchIterator) FindSplitKey( return r.iter.FindSplitKey(start, end, minSplitKey, targetSize) } +func (r *batchIterator) MVCCOpsSpecialized() bool { + return r.iter.MVCCOpsSpecialized() +} + func (r *batchIterator) MVCCGet( key roachpb.Key, timestamp hlc.Timestamp, opts MVCCGetOptions, ) (*roachpb.Value, *roachpb.Intent, error) { @@ -2163,7 +2167,7 @@ var iterPool = sync.Pool{ // iterator to free up resources. func newRocksDBIterator( rdb *C.DBEngine, opts IterOptions, engine Reader, parent *RocksDB, -) Iterator { +) MVCCIterator { // In order to prevent content displacement, caching is disabled // when performing scans. Any options set within the shared read // options field that should be carried over needs to be set here @@ -2372,6 +2376,12 @@ func (r *rocksDBIterator) FindSplitKey( return MVCCKey{Key: cStringToGoBytes(splitKey)}, nil } +func (r *rocksDBIterator) MVCCOpsSpecialized() bool { + // rocksDBIterator provides specialized implementations of MVCCGet and + // MVCCScan. + return true +} + func (r *rocksDBIterator) MVCCGet( key roachpb.Key, timestamp hlc.Timestamp, opts MVCCGetOptions, ) (*roachpb.Value, *roachpb.Intent, error) { diff --git a/pkg/storage/engine/rocksdb_iter_stats_test.go b/pkg/storage/engine/rocksdb_iter_stats_test.go index 77f9ec0dae8f..0e57a2f83f40 100644 --- a/pkg/storage/engine/rocksdb_iter_stats_test.go +++ b/pkg/storage/engine/rocksdb_iter_stats_test.go @@ -11,6 +11,7 @@ package engine import ( + "context" "math" "testing" @@ -22,6 +23,8 @@ import ( func TestIterStats(t *testing.T) { defer leaktest.AfterTest(t)() + ctx := context.Background() + db := setupMVCCInMemRocksDB(t, "test_iter_stats") defer db.Close() @@ -61,8 +64,8 @@ func TestIterStats(t *testing.T) { } // Scanning a key range containing the tombstone sees it. for i := 0; i < 10; i++ { - if _, _, _, _, err := iter.MVCCScan( - roachpb.KeyMin, roachpb.KeyMax, math.MaxInt64, hlc.Timestamp{}, MVCCScanOptions{}, + if _, _, _, err := mvccScanToKvs( + ctx, iter, roachpb.KeyMin, roachpb.KeyMax, math.MaxInt64, hlc.Timestamp{}, MVCCScanOptions{}, ); err != nil { t.Fatal(err) } @@ -74,7 +77,7 @@ func TestIterStats(t *testing.T) { // Getting the key with the tombstone sees it. for i := 0; i < 10; i++ { - if _, _, err := iter.MVCCGet(k.Key, hlc.Timestamp{}, MVCCGetOptions{}); err != nil { + if _, _, err := mvccGet(ctx, iter, k.Key, hlc.Timestamp{}, MVCCGetOptions{}); err != nil { t.Fatal(err) } stats := iter.Stats() @@ -84,7 +87,7 @@ func TestIterStats(t *testing.T) { } // Getting KeyMax doesn't see it. for i := 0; i < 10; i++ { - if _, _, err := iter.MVCCGet(roachpb.KeyMax, hlc.Timestamp{}, MVCCGetOptions{}); err != nil { + if _, _, err := mvccGet(ctx, iter, roachpb.KeyMax, hlc.Timestamp{}, MVCCGetOptions{}); err != nil { t.Fatal(err) } stats := iter.Stats() diff --git a/pkg/storage/spanset/batch.go b/pkg/storage/spanset/batch.go index 7b43844671db..c64bceeeee7a 100644 --- a/pkg/storage/spanset/batch.go +++ b/pkg/storage/spanset/batch.go @@ -41,6 +41,7 @@ type Iterator struct { } var _ engine.Iterator = &Iterator{} +var _ engine.MVCCIterator = &Iterator{} // NewIterator constructs an iterator that verifies access of the underlying // iterator against the given SpanSet. Timestamps associated with the spans @@ -55,11 +56,6 @@ func NewIteratorAt(iter engine.Iterator, spans *SpanSet, ts hlc.Timestamp) *Iter return &Iterator{i: iter, spans: spans, ts: ts} } -// Stats is part of the engine.Iterator interface. -func (i *Iterator) Stats() engine.IteratorStats { - return i.i.Stats() -} - // Close is part of the engine.Iterator interface. func (i *Iterator) Close() { i.i.Close() @@ -217,7 +213,25 @@ func (i *Iterator) CheckForKeyCollisions( return i.i.CheckForKeyCollisions(sstData, start, end) } -// MVCCGet is part of the engine.Iterator interface. +// SetUpperBound is part of the engine.Iterator interface. +func (i *Iterator) SetUpperBound(key roachpb.Key) { + i.i.SetUpperBound(key) +} + +// Stats is part of the engine.Iterator interface. +func (i *Iterator) Stats() engine.IteratorStats { + return i.i.Stats() +} + +// MVCCOpsSpecialized is part of the engine.MVCCIterator interface. +func (i *Iterator) MVCCOpsSpecialized() bool { + if mvccIt, ok := i.i.(engine.MVCCIterator); ok { + return mvccIt.MVCCOpsSpecialized() + } + return false +} + +// MVCCGet is part of the engine.MVCCIterator interface. func (i *Iterator) MVCCGet( key roachpb.Key, timestamp hlc.Timestamp, opts engine.MVCCGetOptions, ) (*roachpb.Value, *roachpb.Intent, error) { @@ -230,10 +244,10 @@ func (i *Iterator) MVCCGet( return nil, nil, err } } - return i.i.MVCCGet(key, timestamp, opts) + return i.i.(engine.MVCCIterator).MVCCGet(key, timestamp, opts) } -// MVCCScan is part of the engine.Iterator interface. +// MVCCScan is part of the engine.MVCCIterator interface. func (i *Iterator) MVCCScan( start, end roachpb.Key, max int64, timestamp hlc.Timestamp, opts engine.MVCCScanOptions, ) (kvData [][]byte, numKVs int64, resumeSpan *roachpb.Span, intents []roachpb.Intent, err error) { @@ -246,12 +260,7 @@ func (i *Iterator) MVCCScan( return nil, 0, nil, nil, err } } - return i.i.MVCCScan(start, end, max, timestamp, opts) -} - -// SetUpperBound is part of the engine.Iterator interface. -func (i *Iterator) SetUpperBound(key roachpb.Key) { - i.i.SetUpperBound(key) + return i.i.(engine.MVCCIterator).MVCCScan(start, end, max, timestamp, opts) } type spanSetReader struct {