diff --git a/pkg/storage/engine/engine.go b/pkg/storage/engine/engine.go index a537de1e7c19..6c50faf1ab1e 100644 --- a/pkg/storage/engine/engine.go +++ b/pkg/storage/engine/engine.go @@ -116,11 +116,23 @@ type Iterator interface { // and the encoded SST data specified, within the provided key range. Returns // stats on skipped KVs, or an error if a collision is found. CheckForKeyCollisions(sstData []byte, start, end roachpb.Key) (enginepb.MVCCStats, error) + // SetUpperBound installs a new upper bound for this iterator. + SetUpperBound(roachpb.Key) + // Stats returns statistics about the iterator. + Stats() IteratorStats +} + +// MVCCIterator is an interface that extends Iterator and provides concrete +// implementations for MVCCGet and MVCCScan operations. It is used by instances +// of the interface backed by RocksDB iterators to avoid cgo hops. +type MVCCIterator interface { + Iterator + // MVCCOpsSpecialized returns whether the iterator has a specialized + // implementation of MVCCGet and MVCCScan. This is exposed as a method + // so that wrapper types can defer to their wrapped iterators. + MVCCOpsSpecialized() bool // MVCCGet is the internal implementation of the family of package-level // MVCCGet functions. - // - // DO NOT CALL directly (except in wrapper Iterator implementations). Use the - // package-level MVCCGet, or one of its variants, instead. MVCCGet( key roachpb.Key, timestamp hlc.Timestamp, opts MVCCGetOptions, ) (*roachpb.Value, *roachpb.Intent, error) @@ -129,20 +141,9 @@ type Iterator interface { // returned raw, as a series of buffers of length-prefixed slices, // alternating from key to value, where numKVs specifies the number of pairs // in the buffer. - // - // DO NOT CALL directly (except in wrapper Iterator implementations). Use the - // package-level MVCCScan, or one of its variants, instead. For correct - // operation, the caller must set the lower and upper bounds on the iterator - // before calling this method. - // - // TODO(peter): unexport this method. MVCCScan( start, end roachpb.Key, max int64, timestamp hlc.Timestamp, opts MVCCScanOptions, ) (kvData [][]byte, numKVs int64, resumeSpan *roachpb.Span, intents []roachpb.Intent, err error) - // SetUpperBound installs a new upper bound for this iterator. - SetUpperBound(roachpb.Key) - - Stats() IteratorStats } // IterOptions contains options used to create an Iterator. diff --git a/pkg/storage/engine/mvcc.go b/pkg/storage/engine/mvcc.go index de45697a835b..0b648ea811f6 100644 --- a/pkg/storage/engine/mvcc.go +++ b/pkg/storage/engine/mvcc.go @@ -730,14 +730,78 @@ type MVCCGetOptions struct { func MVCCGet( ctx context.Context, eng Reader, key roachpb.Key, timestamp hlc.Timestamp, opts MVCCGetOptions, ) (*roachpb.Value, *roachpb.Intent, error) { + iter := eng.NewIterator(IterOptions{Prefix: true}) + defer iter.Close() + return mvccGet(ctx, iter, key, timestamp, opts) +} + +func mvccGet( + ctx context.Context, iter Iterator, key roachpb.Key, timestamp hlc.Timestamp, opts MVCCGetOptions, +) (value *roachpb.Value, intent *roachpb.Intent, err error) { if timestamp.WallTime < 0 { return nil, nil, errors.Errorf("cannot write to %q at timestamp %s", key, timestamp) } + if opts.Inconsistent && opts.Txn != nil { + return nil, nil, errors.Errorf("cannot allow inconsistent reads within a transaction") + } + if len(key) == 0 { + return nil, nil, emptyKeyError() + } - iter := eng.NewIterator(IterOptions{Prefix: true}) - value, intent, err := iter.MVCCGet(key, timestamp, opts) - iter.Close() - return value, intent, err + // If the iterator has a specialized implementation, defer to that. + if mvccIter, ok := iter.(MVCCIterator); ok && mvccIter.MVCCOpsSpecialized() { + return mvccIter.MVCCGet(key, timestamp, opts) + } + + mvccScanner := pebbleMVCCScannerPool.Get().(*pebbleMVCCScanner) + defer pebbleMVCCScannerPool.Put(mvccScanner) + + // MVCCGet is implemented as an MVCCScan where we retrieve a single key. We + // specify an empty key for the end key which will ensure we don't retrieve a + // key different than the start key. This is a bit of a hack. + *mvccScanner = pebbleMVCCScanner{ + parent: iter, + start: key, + ts: timestamp, + maxKeys: 1, + inconsistent: opts.Inconsistent, + tombstones: opts.Tombstones, + } + + mvccScanner.init(opts.Txn) + mvccScanner.get() + + if mvccScanner.err != nil { + return nil, nil, mvccScanner.err + } + intents, err := buildScanIntents(mvccScanner.intents.Repr()) + if err != nil { + return nil, nil, err + } + if !opts.Inconsistent && len(intents) > 0 { + return nil, nil, &roachpb.WriteIntentError{Intents: intents} + } + + if len(intents) > 1 { + return nil, nil, errors.Errorf("expected 0 or 1 intents, got %d", len(intents)) + } else if len(intents) == 1 { + intent = &intents[0] + } + + if len(mvccScanner.results.repr) == 0 { + return nil, intent, nil + } + + mvccKey, rawValue, _, err := MVCCScanDecodeKeyValue(mvccScanner.results.repr) + if err != nil { + return nil, nil, err + } + + value = &roachpb.Value{ + RawBytes: rawValue, + Timestamp: mvccKey.Timestamp, + } + return } // MVCCGetAsTxn constructs a temporary transaction from the given transaction @@ -2170,6 +2234,65 @@ func MVCCDeleteRange( return keys, resumeSpan, int64(len(kvs)), err } +func mvccScanToBytes( + ctx context.Context, + iter Iterator, + key, endKey roachpb.Key, + max int64, + timestamp hlc.Timestamp, + opts MVCCScanOptions, +) (kvData [][]byte, numKVs int64, resumeSpan *roachpb.Span, intents []roachpb.Intent, err error) { + if opts.Inconsistent && opts.Txn != nil { + return nil, 0, nil, nil, errors.Errorf("cannot allow inconsistent reads within a transaction") + } + if len(endKey) == 0 { + return nil, 0, nil, nil, emptyKeyError() + } + if max == 0 { + resumeSpan = &roachpb.Span{Key: key, EndKey: endKey} + return nil, 0, resumeSpan, nil, nil + } + + // If the iterator has a specialized implementation, defer to that. + if mvccIter, ok := iter.(MVCCIterator); ok && mvccIter.MVCCOpsSpecialized() { + return mvccIter.MVCCScan(key, endKey, max, timestamp, opts) + } + + mvccScanner := pebbleMVCCScannerPool.Get().(*pebbleMVCCScanner) + defer pebbleMVCCScannerPool.Put(mvccScanner) + + *mvccScanner = pebbleMVCCScanner{ + parent: iter, + reverse: opts.Reverse, + start: key, + end: endKey, + ts: timestamp, + maxKeys: max, + inconsistent: opts.Inconsistent, + tombstones: opts.Tombstones, + } + + mvccScanner.init(opts.Txn) + resumeSpan, err = mvccScanner.scan() + + if err != nil { + return nil, 0, nil, nil, err + } + + kvData = mvccScanner.results.finish() + numKVs = mvccScanner.results.count + + intents, err = buildScanIntents(mvccScanner.intents.Repr()) + if err != nil { + return nil, 0, nil, nil, err + } + + if !opts.Inconsistent && len(intents) > 0 { + return nil, 0, resumeSpan, nil, &roachpb.WriteIntentError{Intents: intents} + } + return +} + // mvccScanToKvs converts the raw key/value pairs returned by Iterator.MVCCScan // into a slice of roachpb.KeyValues. func mvccScanToKvs( @@ -2180,7 +2303,7 @@ func mvccScanToKvs( timestamp hlc.Timestamp, opts MVCCScanOptions, ) ([]roachpb.KeyValue, *roachpb.Span, []roachpb.Intent, error) { - kvData, numKVs, resumeSpan, intents, err := iter.MVCCScan(key, endKey, max, timestamp, opts) + kvData, numKVs, resumeSpan, intents, err := mvccScanToBytes(ctx, iter, key, endKey, max, timestamp, opts) if err != nil { return nil, nil, nil, err } @@ -2305,7 +2428,7 @@ func MVCCScanToBytes( ) ([][]byte, int64, *roachpb.Span, []roachpb.Intent, error) { iter := engine.NewIterator(IterOptions{LowerBound: key, UpperBound: endKey}) defer iter.Close() - return iter.MVCCScan(key, endKey, max, timestamp, opts) + return mvccScanToBytes(ctx, iter, key, endKey, max, timestamp, opts) } // MVCCIterate iterates over the key range [start,end). At each step of the diff --git a/pkg/storage/engine/pebble_iterator.go b/pkg/storage/engine/pebble_iterator.go index ff051099ad73..bc27995ec445 100644 --- a/pkg/storage/engine/pebble_iterator.go +++ b/pkg/storage/engine/pebble_iterator.go @@ -20,7 +20,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/protoutil" - "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble" ) @@ -365,124 +364,6 @@ func (p *pebbleIterator) FindSplitKey( return bestSplitKey, nil } -// MVCCGet implements the Iterator interface. -func (p *pebbleIterator) MVCCGet( - key roachpb.Key, timestamp hlc.Timestamp, opts MVCCGetOptions, -) (value *roachpb.Value, intent *roachpb.Intent, err error) { - if opts.Inconsistent && opts.Txn != nil { - return nil, nil, errors.Errorf("cannot allow inconsistent reads within a transaction") - } - if len(key) == 0 { - return nil, nil, emptyKeyError() - } - if p.iter == nil { - panic("uninitialized iterator") - } - - mvccScanner := pebbleMVCCScannerPool.Get().(*pebbleMVCCScanner) - defer pebbleMVCCScannerPool.Put(mvccScanner) - - // MVCCGet is implemented as an MVCCScan where we retrieve a single key. We - // specify an empty key for the end key which will ensure we don't retrieve a - // key different than the start key. This is a bit of a hack. - *mvccScanner = pebbleMVCCScanner{ - parent: p.iter, - start: key, - ts: timestamp, - maxKeys: 1, - inconsistent: opts.Inconsistent, - tombstones: opts.Tombstones, - } - - mvccScanner.init(opts.Txn) - mvccScanner.get() - - if mvccScanner.err != nil { - return nil, nil, mvccScanner.err - } - intents, err := buildScanIntents(mvccScanner.intents.Repr()) - if err != nil { - return nil, nil, err - } - if !opts.Inconsistent && len(intents) > 0 { - return nil, nil, &roachpb.WriteIntentError{Intents: intents} - } - - if len(intents) > 1 { - return nil, nil, errors.Errorf("expected 0 or 1 intents, got %d", len(intents)) - } else if len(intents) == 1 { - intent = &intents[0] - } - - if len(mvccScanner.results.repr) == 0 { - return nil, intent, nil - } - - mvccKey, rawValue, _, err := MVCCScanDecodeKeyValue(mvccScanner.results.repr) - if err != nil { - return nil, nil, err - } - - value = &roachpb.Value{ - RawBytes: rawValue, - Timestamp: mvccKey.Timestamp, - } - return -} - -// MVCCScan implements the Iterator interface. -func (p *pebbleIterator) MVCCScan( - start, end roachpb.Key, max int64, timestamp hlc.Timestamp, opts MVCCScanOptions, -) (kvData [][]byte, numKVs int64, resumeSpan *roachpb.Span, intents []roachpb.Intent, err error) { - if opts.Inconsistent && opts.Txn != nil { - return nil, 0, nil, nil, errors.Errorf("cannot allow inconsistent reads within a transaction") - } - if len(end) == 0 { - return nil, 0, nil, nil, emptyKeyError() - } - if max == 0 { - resumeSpan = &roachpb.Span{Key: start, EndKey: end} - return nil, 0, resumeSpan, nil, nil - } - if p.iter == nil { - panic("uninitialized iterator") - } - - mvccScanner := pebbleMVCCScannerPool.Get().(*pebbleMVCCScanner) - defer pebbleMVCCScannerPool.Put(mvccScanner) - - *mvccScanner = pebbleMVCCScanner{ - parent: p.iter, - reverse: opts.Reverse, - start: start, - end: end, - ts: timestamp, - maxKeys: max, - inconsistent: opts.Inconsistent, - tombstones: opts.Tombstones, - } - - mvccScanner.init(opts.Txn) - resumeSpan, err = mvccScanner.scan() - - if err != nil { - return nil, 0, nil, nil, err - } - - kvData = mvccScanner.results.finish() - numKVs = mvccScanner.results.count - - intents, err = buildScanIntents(mvccScanner.intents.Repr()) - if err != nil { - return nil, 0, nil, nil, err - } - - if !opts.Inconsistent && len(intents) > 0 { - return nil, 0, resumeSpan, nil, &roachpb.WriteIntentError{Intents: intents} - } - return -} - // SetUpperBound implements the Iterator interface. func (p *pebbleIterator) SetUpperBound(upperBound roachpb.Key) { p.upperBoundBuf = append(p.upperBoundBuf[:0], upperBound...) diff --git a/pkg/storage/engine/pebble_mvcc_scanner.go b/pkg/storage/engine/pebble_mvcc_scanner.go index 10cb18cc728e..20d38b9f7569 100644 --- a/pkg/storage/engine/pebble_mvcc_scanner.go +++ b/pkg/storage/engine/pebble_mvcc_scanner.go @@ -16,6 +16,7 @@ import ( "sort" "sync" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -43,7 +44,7 @@ func (p *pebbleResults) clear() { // The repr that MVCCScan / MVCCGet expects to provide as output goes: // // This function adds to repr in that format. -func (p *pebbleResults) put(key []byte, value []byte) { +func (p *pebbleResults) put(key MVCCKey, value []byte) { // Key value lengths take up 8 bytes (2 x Uint32). const kvLenSize = 8 const minSize = 16 @@ -52,7 +53,8 @@ func (p *pebbleResults) put(key []byte, value []byte) { // We maintain a list of buffers, always encoding into the last one (a.k.a. // pebbleResults.repr). The size of the buffers is exponentially increasing, // capped at maxSize. - lenToAdd := kvLenSize + len(key) + len(value) + lenKey := key.Len() + lenToAdd := kvLenSize + lenKey + len(value) if len(p.repr)+lenToAdd > cap(p.repr) { newSize := 2 * cap(p.repr) if newSize == 0 { @@ -70,9 +72,9 @@ func (p *pebbleResults) put(key []byte, value []byte) { startIdx := len(p.repr) p.repr = p.repr[:startIdx+lenToAdd] binary.LittleEndian.PutUint32(p.repr[startIdx:], uint32(len(value))) - binary.LittleEndian.PutUint32(p.repr[startIdx+4:], uint32(len(key))) - copy(p.repr[startIdx+kvLenSize:], key) - copy(p.repr[startIdx+kvLenSize+len(key):], value) + binary.LittleEndian.PutUint32(p.repr[startIdx+4:], uint32(lenKey)) + encodeKeyToBuf(p.repr[startIdx+kvLenSize:startIdx+kvLenSize+lenKey], key, lenKey) + copy(p.repr[startIdx+kvLenSize+lenKey:], value) p.count++ } @@ -87,7 +89,7 @@ func (p *pebbleResults) finish() [][]byte { // Go port of mvccScanner in libroach/mvcc.h. Stores all variables relating to // one MVCCGet / MVCCScan call. type pebbleMVCCScanner struct { - parent *pebble.Iterator + parent Iterator reverse bool peeked bool // Iteration bounds. Does not contain MVCC timestamp. @@ -109,12 +111,11 @@ type pebbleMVCCScanner struct { keyBuf []byte savedBuf []byte // cur* variables store the "current" record we're pointing to. Updated in - // updateCurrent. Note that curRawKey = the full encoded MVCC key, while - // curKey = the user-key part of curRawKey (i.e. excluding the timestamp). - curRawKey, curKey, curValue []byte - curTS hlc.Timestamp - results pebbleResults - intents pebble.Batch + // updateCurrent. + curKey, curValue []byte + curTS hlc.Timestamp + results pebbleResults + intents pebble.Batch // Stores any error returned. If non-nil, iteration short circuits. err error // Number of iterations to try before we do a Seek/SeekReverse. Stays within @@ -144,9 +145,8 @@ func (p *pebbleMVCCScanner) init(txn *roachpb.Transaction) { // get iterates exactly once and adds one KV to the result set. func (p *pebbleMVCCScanner) get() { - p.keyBuf = EncodeKeyToBuf(p.keyBuf[:0], MVCCKey{Key: p.start}) - valid := p.parent.SeekPrefixGE(p.keyBuf) - if !p.updateCurrent(valid) { + p.parent.SeekGE(MVCCKey{Key: p.start}) + if !p.updateCurrent() { return } p.getAndAdvance() @@ -156,13 +156,11 @@ func (p *pebbleMVCCScanner) get() { // iterator is exhausted, or an error is encountered. func (p *pebbleMVCCScanner) scan() (*roachpb.Span, error) { if p.reverse { - p.keyBuf = EncodeKeyToBuf(p.keyBuf[:0], MVCCKey{Key: p.end}) - if !p.iterSeekReverse(p.keyBuf) { + if !p.iterSeekReverse(MVCCKey{Key: p.end}) { return nil, p.err } } else { - p.keyBuf = EncodeKeyToBuf(p.keyBuf[:0], MVCCKey{Key: p.start}) - if !p.iterSeek(p.keyBuf) { + if !p.iterSeek(MVCCKey{Key: p.start}) { return nil, p.err } } @@ -225,7 +223,7 @@ func (p *pebbleMVCCScanner) getFromIntentHistory() bool { } intent := p.meta.IntentHistory[upIdx-1] if len(intent.Value) > 0 || p.tombstones { - p.results.put(p.curRawKey, intent.Value) + p.results.put(p.curMVCCKey(), intent.Value) } return true } @@ -328,7 +326,8 @@ func (p *pebbleMVCCScanner) getAndAdvance() bool { // that lie before the resume key. return false } - p.err = p.intents.Set(p.curRawKey, p.curValue, nil) + p.keyBuf = EncodeKeyToBuf(p.keyBuf[:0], p.curMVCCKey()) + p.err = p.intents.Set(p.keyBuf, p.curValue, nil) if p.err != nil { return false } @@ -342,7 +341,8 @@ func (p *pebbleMVCCScanner) getAndAdvance() bool { // intent. Note that this will trigger an error on the Go // side. We continue scanning so that we can return all of the // intents in the scan range. - p.err = p.intents.Set(p.curRawKey, p.curValue, nil) + p.keyBuf = EncodeKeyToBuf(p.keyBuf[:0], p.curMVCCKey()) + p.err = p.intents.Set(p.keyBuf, p.curValue, nil) if p.err != nil { return false } @@ -397,7 +397,7 @@ func (p *pebbleMVCCScanner) getAndAdvance() bool { return p.seekVersion(prevTS, false) } -// Advances to the next user key. +// nextKey advances to the next user key. func (p *pebbleMVCCScanner) nextKey() bool { p.keyBuf = append(p.keyBuf[:0], p.curKey...) @@ -412,11 +412,10 @@ func (p *pebbleMVCCScanner) nextKey() bool { } p.decrementItersBeforeSeek() - // We're pointed at a different version of the same key. Fall back to seeking - // to the next key. We append 2 NULs to account for the "next-key" and a - // trailing zero timestamp. - p.keyBuf = append(p.keyBuf, 0, 0) - return p.iterSeek(p.keyBuf) + // We're pointed at a different version of the same key. Fall back to + // seeking to the next key. We append a NUL to account for the "next-key". + p.keyBuf = append(p.keyBuf, 0) + return p.iterSeek(MVCCKey{Key: p.keyBuf}) } // backwardLatestVersion backs up the iterator to the latest version for the @@ -443,13 +442,12 @@ func (p *pebbleMVCCScanner) backwardLatestVersion(key []byte, i int) bool { } p.decrementItersBeforeSeek() - p.keyBuf = append(p.keyBuf, 0) - return p.iterSeek(p.keyBuf) + return p.iterSeek(MVCCKey{Key: p.keyBuf}) } -// Advance to the newest version of the user key preceding the specified -// key. Assumes that the iterator is currently positioned at key or 1 record -// after key. +// prevKey advances to the newest version of the user key preceding the +// specified key. Assumes that the iterator is currently positioned at +// key or 1 record after key. func (p *pebbleMVCCScanner) prevKey(key []byte) bool { p.keyBuf = append(p.keyBuf[:0], key...) @@ -467,11 +465,10 @@ func (p *pebbleMVCCScanner) prevKey(key []byte) bool { } p.decrementItersBeforeSeek() - p.keyBuf = append(p.keyBuf, 0) - return p.iterSeekReverse(p.keyBuf) + return p.iterSeekReverse(MVCCKey{Key: p.keyBuf}) } -// Advance to the next key in the iterator's direction. +// advanceKey advances to the next key in the iterator's direction. func (p *pebbleMVCCScanner) advanceKey() bool { if p.reverse { return p.prevKey(p.curKey) @@ -486,8 +483,8 @@ func (p *pebbleMVCCScanner) advanceKeyAtEnd() bool { // Iterating to the next key might have caused the iterator to reach the // end of the key space. If that happens, back up to the very last key. p.peeked = false - valid := p.parent.Last() - if !p.updateCurrent(valid) { + p.parent.SeekLT(MVCCKey{Key: keys.MaxKey}) + if !p.updateCurrent() { return false } return p.advanceKey() @@ -515,7 +512,7 @@ func (p *pebbleMVCCScanner) addAndAdvance(val []byte) bool { // Don't include deleted versions len(val) == 0, unless we've been instructed // to include tombstones in the results. if len(val) > 0 || p.tombstones { - p.results.put(p.curRawKey, val) + p.results.put(p.curMVCCKey(), val) if p.results.count == p.maxKeys { return false } @@ -527,7 +524,8 @@ func (p *pebbleMVCCScanner) addAndAdvance(val []byte) bool { // equal to the specified timestamp, adds it to the result set, then moves onto // the next user key. func (p *pebbleMVCCScanner) seekVersion(ts hlc.Timestamp, uncertaintyCheck bool) bool { - p.keyBuf = EncodeKeyToBuf(p.keyBuf[:0], MVCCKey{Key: p.curKey, Timestamp: ts}) + key := MVCCKey{Key: p.curKey, Timestamp: ts} + p.keyBuf = EncodeKeyToBuf(p.keyBuf[:0], key) origKey := p.keyBuf[:len(p.curKey)] for i := 0; i < p.itersBeforeSeek; i++ { @@ -548,7 +546,7 @@ func (p *pebbleMVCCScanner) seekVersion(ts hlc.Timestamp, uncertaintyCheck bool) } p.decrementItersBeforeSeek() - if !p.iterSeek(p.keyBuf) { + if !p.iterSeek(key) { return p.advanceKeyAtEnd() } if !bytes.Equal(p.curKey, origKey) { @@ -564,73 +562,83 @@ func (p *pebbleMVCCScanner) seekVersion(ts hlc.Timestamp, uncertaintyCheck bool) } // Updates cur{RawKey, Key, TS} to match record the iterator is pointing to. -func (p *pebbleMVCCScanner) updateCurrent(valid bool) bool { - if !valid { +func (p *pebbleMVCCScanner) updateCurrent() bool { + if !p.iterValid() { return false } - p.curRawKey = p.parent.Key() - p.curValue = p.parent.Value() - p.curKey, p.curTS, p.err = enginepb.DecodeKey(p.curRawKey) - return p.err == nil + p.curValue = p.parent.UnsafeValue() + key := p.parent.UnsafeKey() + p.curKey, p.curTS = key.Key, key.Timestamp + return true } -// seek seeks to the latest revision of the specified key (or a greater key). -func (p *pebbleMVCCScanner) iterSeek(key []byte) bool { - p.clearPeeked() - valid := p.parent.SeekGE(key) - return p.updateCurrent(valid) +func (p *pebbleMVCCScanner) iterValid() bool { + if valid, err := p.parent.Valid(); !valid { + p.err = err + return false + } + return true } -// seekReverse seeks to the latest revision of the key before the specified key. -func (p *pebbleMVCCScanner) iterSeekReverse(key []byte) bool { +// iterSeek seeks to the latest revision of the specified key (or a greater key). +func (p *pebbleMVCCScanner) iterSeek(key MVCCKey) bool { p.clearPeeked() + p.parent.SeekGE(key) + return p.updateCurrent() +} - valid := p.parent.SeekLT(key) - if !p.updateCurrent(valid) { +// iterSeekReverse seeks to the latest revision of the key before the specified key. +func (p *pebbleMVCCScanner) iterSeekReverse(key MVCCKey) bool { + p.clearPeeked() + p.parent.SeekLT(key) + if !p.updateCurrent() { // We have seeked to before the start key. Return. return false } + if p.curTS == (hlc.Timestamp{}) { // We landed on an intent or inline value. return true } - // We landed on a versioned value, we need to back up to find the // latest version. return p.backwardLatestVersion(p.curKey, 0) } -// Advance to the next MVCC key. +// iterNext advances to the next MVCC key. func (p *pebbleMVCCScanner) iterNext() bool { if p.reverse && p.peeked { // If we have peeked at the previous entry, we need to advance the iterator // twice. p.peeked = false - if !p.parent.Valid() { + if !p.iterValid() { // We were peeked off the beginning of iteration. Seek to the first // entry, and then advance one step. - if !p.parent.First() { + p.parent.SeekGE(MVCCKey{}) + if !p.iterValid() { return false } - return p.updateCurrent(p.parent.Next()) + p.parent.Next() + return p.updateCurrent() } - if !p.parent.Next() { + p.parent.Next() + if !p.iterValid() { return false } } - valid := p.parent.Next() - return p.updateCurrent(valid) + p.parent.Next() + return p.updateCurrent() } -// Advance to the previous MVCC Key. +// iterPrev advances to the previous MVCC Key. func (p *pebbleMVCCScanner) iterPrev() bool { if p.peeked { p.peeked = false - return p.updateCurrent(p.parent.Valid()) + return p.updateCurrent() } - valid := p.parent.Prev() - return p.updateCurrent(valid) + p.parent.Prev() + return p.updateCurrent() } // Peek the previous key and store the result in peekedKey. Note that this @@ -642,35 +650,26 @@ func (p *pebbleMVCCScanner) iterPeekPrev() ([]byte, bool) { // We need to save a copy of the current iterator key and value and adjust // curRawKey, curKey and curValue to point to this saved data. We use a // single buffer for this purpose: savedBuf. - p.savedBuf = append(p.savedBuf[:0], p.curRawKey...) + p.savedBuf = append(p.savedBuf[:0], p.curKey...) p.savedBuf = append(p.savedBuf, p.curValue...) - p.curRawKey = p.savedBuf[:len(p.curRawKey)] - p.curValue = p.savedBuf[len(p.curRawKey):] - var ok bool - p.curKey, _, ok = enginepb.SplitMVCCKey(p.curRawKey) - if !ok { - p.err = errors.Errorf("invalid encoded mvcc key: %x", p.curRawKey) - return nil, false - } + p.curKey = p.savedBuf[:len(p.curKey)] + p.curValue = p.savedBuf[len(p.curKey):] // With the current iterator state saved we can move the iterator to the // previous entry. - if !p.parent.Prev() { + p.parent.Prev() + if !p.iterValid() { // The iterator is now invalid, but note that this case is handled in // both iterNext and iterPrev. In the former case, we'll position the // iterator at the first entry, and in the latter iteration will be done. return nil, false } - } else if !p.parent.Valid() { + } else if !p.iterValid() { return nil, false } - peekedKey, _, ok := enginepb.SplitMVCCKey(p.parent.Key()) - if !ok { - p.err = errors.Errorf("invalid encoded mvcc key: %x", p.parent.Key()) - return nil, false - } - return peekedKey, true + peekedKey := p.parent.UnsafeKey() + return peekedKey.Key, true } // Clear the peeked flag. Call this before any iterator operations. @@ -679,3 +678,7 @@ func (p *pebbleMVCCScanner) clearPeeked() { p.peeked = false } } + +func (p *pebbleMVCCScanner) curMVCCKey() MVCCKey { + return MVCCKey{Key: p.curKey, Timestamp: p.curTS} +} diff --git a/pkg/storage/engine/rocksdb.go b/pkg/storage/engine/rocksdb.go index 6cea5cd9b9b7..c2a53127673c 100644 --- a/pkg/storage/engine/rocksdb.go +++ b/pkg/storage/engine/rocksdb.go @@ -1560,6 +1560,10 @@ func (r *batchIterator) FindSplitKey( return r.iter.FindSplitKey(start, end, minSplitKey, targetSize) } +func (r *batchIterator) MVCCOpsSpecialized() bool { + return r.iter.MVCCOpsSpecialized() +} + func (r *batchIterator) MVCCGet( key roachpb.Key, timestamp hlc.Timestamp, opts MVCCGetOptions, ) (*roachpb.Value, *roachpb.Intent, error) { @@ -2163,7 +2167,7 @@ var iterPool = sync.Pool{ // iterator to free up resources. func newRocksDBIterator( rdb *C.DBEngine, opts IterOptions, engine Reader, parent *RocksDB, -) Iterator { +) MVCCIterator { // In order to prevent content displacement, caching is disabled // when performing scans. Any options set within the shared read // options field that should be carried over needs to be set here @@ -2372,6 +2376,12 @@ func (r *rocksDBIterator) FindSplitKey( return MVCCKey{Key: cStringToGoBytes(splitKey)}, nil } +func (r *rocksDBIterator) MVCCOpsSpecialized() bool { + // rocksDBIterator provides specialized implementations of MVCCGet and + // MVCCScan. + return true +} + func (r *rocksDBIterator) MVCCGet( key roachpb.Key, timestamp hlc.Timestamp, opts MVCCGetOptions, ) (*roachpb.Value, *roachpb.Intent, error) { diff --git a/pkg/storage/engine/rocksdb_iter_stats_test.go b/pkg/storage/engine/rocksdb_iter_stats_test.go index 77f9ec0dae8f..0e57a2f83f40 100644 --- a/pkg/storage/engine/rocksdb_iter_stats_test.go +++ b/pkg/storage/engine/rocksdb_iter_stats_test.go @@ -11,6 +11,7 @@ package engine import ( + "context" "math" "testing" @@ -22,6 +23,8 @@ import ( func TestIterStats(t *testing.T) { defer leaktest.AfterTest(t)() + ctx := context.Background() + db := setupMVCCInMemRocksDB(t, "test_iter_stats") defer db.Close() @@ -61,8 +64,8 @@ func TestIterStats(t *testing.T) { } // Scanning a key range containing the tombstone sees it. for i := 0; i < 10; i++ { - if _, _, _, _, err := iter.MVCCScan( - roachpb.KeyMin, roachpb.KeyMax, math.MaxInt64, hlc.Timestamp{}, MVCCScanOptions{}, + if _, _, _, err := mvccScanToKvs( + ctx, iter, roachpb.KeyMin, roachpb.KeyMax, math.MaxInt64, hlc.Timestamp{}, MVCCScanOptions{}, ); err != nil { t.Fatal(err) } @@ -74,7 +77,7 @@ func TestIterStats(t *testing.T) { // Getting the key with the tombstone sees it. for i := 0; i < 10; i++ { - if _, _, err := iter.MVCCGet(k.Key, hlc.Timestamp{}, MVCCGetOptions{}); err != nil { + if _, _, err := mvccGet(ctx, iter, k.Key, hlc.Timestamp{}, MVCCGetOptions{}); err != nil { t.Fatal(err) } stats := iter.Stats() @@ -84,7 +87,7 @@ func TestIterStats(t *testing.T) { } // Getting KeyMax doesn't see it. for i := 0; i < 10; i++ { - if _, _, err := iter.MVCCGet(roachpb.KeyMax, hlc.Timestamp{}, MVCCGetOptions{}); err != nil { + if _, _, err := mvccGet(ctx, iter, roachpb.KeyMax, hlc.Timestamp{}, MVCCGetOptions{}); err != nil { t.Fatal(err) } stats := iter.Stats() diff --git a/pkg/storage/spanset/batch.go b/pkg/storage/spanset/batch.go index 7b43844671db..c64bceeeee7a 100644 --- a/pkg/storage/spanset/batch.go +++ b/pkg/storage/spanset/batch.go @@ -41,6 +41,7 @@ type Iterator struct { } var _ engine.Iterator = &Iterator{} +var _ engine.MVCCIterator = &Iterator{} // NewIterator constructs an iterator that verifies access of the underlying // iterator against the given SpanSet. Timestamps associated with the spans @@ -55,11 +56,6 @@ func NewIteratorAt(iter engine.Iterator, spans *SpanSet, ts hlc.Timestamp) *Iter return &Iterator{i: iter, spans: spans, ts: ts} } -// Stats is part of the engine.Iterator interface. -func (i *Iterator) Stats() engine.IteratorStats { - return i.i.Stats() -} - // Close is part of the engine.Iterator interface. func (i *Iterator) Close() { i.i.Close() @@ -217,7 +213,25 @@ func (i *Iterator) CheckForKeyCollisions( return i.i.CheckForKeyCollisions(sstData, start, end) } -// MVCCGet is part of the engine.Iterator interface. +// SetUpperBound is part of the engine.Iterator interface. +func (i *Iterator) SetUpperBound(key roachpb.Key) { + i.i.SetUpperBound(key) +} + +// Stats is part of the engine.Iterator interface. +func (i *Iterator) Stats() engine.IteratorStats { + return i.i.Stats() +} + +// MVCCOpsSpecialized is part of the engine.MVCCIterator interface. +func (i *Iterator) MVCCOpsSpecialized() bool { + if mvccIt, ok := i.i.(engine.MVCCIterator); ok { + return mvccIt.MVCCOpsSpecialized() + } + return false +} + +// MVCCGet is part of the engine.MVCCIterator interface. func (i *Iterator) MVCCGet( key roachpb.Key, timestamp hlc.Timestamp, opts engine.MVCCGetOptions, ) (*roachpb.Value, *roachpb.Intent, error) { @@ -230,10 +244,10 @@ func (i *Iterator) MVCCGet( return nil, nil, err } } - return i.i.MVCCGet(key, timestamp, opts) + return i.i.(engine.MVCCIterator).MVCCGet(key, timestamp, opts) } -// MVCCScan is part of the engine.Iterator interface. +// MVCCScan is part of the engine.MVCCIterator interface. func (i *Iterator) MVCCScan( start, end roachpb.Key, max int64, timestamp hlc.Timestamp, opts engine.MVCCScanOptions, ) (kvData [][]byte, numKVs int64, resumeSpan *roachpb.Span, intents []roachpb.Intent, err error) { @@ -246,12 +260,7 @@ func (i *Iterator) MVCCScan( return nil, 0, nil, nil, err } } - return i.i.MVCCScan(start, end, max, timestamp, opts) -} - -// SetUpperBound is part of the engine.Iterator interface. -func (i *Iterator) SetUpperBound(key roachpb.Key) { - i.i.SetUpperBound(key) + return i.i.(engine.MVCCIterator).MVCCScan(start, end, max, timestamp, opts) } type spanSetReader struct {