Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DNM] sstable: add value blocks for storing older versions of a key #1443

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions internal/base/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ package base

import "fmt"

// TODO(sumeer): change the InternalIterator interface to not eagerly return
// the value. Implementations should continue to cache the value once asked to
// return it, so that repeated calls to get the value are cheap.

// InternalIterator iterates over a DB's key/value pairs in key order. Unlike
// the Iterator interface, the returned keys are InternalKeys composed of the
// user-key, a sequence number and a key kind. In forward iteration, key/value
Expand Down
211 changes: 204 additions & 7 deletions sstable/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,17 @@ import (

"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/cache"
"github.com/cockroachdb/pebble/internal/manual"
)

// NB: blockWriter supports addInlineValuePrefix for efficiency reasons, in
// that we don't want the caller to have to copy the value to a new slice in
// order to add the prefix. It does not know about valueHandlePrefix since the
// serialization of valueHandle includes that prefix. Similarly, blockReader
// knows little about these prefixes since the value read from a block that
// has such prefixes is passed to valueBlockReader for interpretation
// (NextInlineValue is the exception).

func uvarintLen(v uint32) int {
i := 0
for v >= 0x80 {
Expand All @@ -28,12 +37,13 @@ type blockWriter struct {
buf []byte
restarts []uint32
curKey []byte
curValue []byte
prevKey []byte
tmp [4]byte
// curValue excludes the optional inlineValuePrefix.
curValue []byte
prevKey []byte
tmp [4]byte
}

func (w *blockWriter) store(keySize int, value []byte) {
func (w *blockWriter) store(keySize int, value []byte, addInlineValuePrefix bool) {
shared := 0
if w.nEntries == w.nextRestart {
w.nextRestart = w.nEntries + w.restartInterval
Expand All @@ -58,7 +68,11 @@ func (w *blockWriter) store(keySize int, value []byte) {
}
}

needed := 3*binary.MaxVarintLen32 + len(w.curKey[shared:]) + len(value)
lenValuePlusOptionalPrefix := len(value)
if addInlineValuePrefix {
lenValuePlusOptionalPrefix++
}
needed := 3*binary.MaxVarintLen32 + len(w.curKey[shared:]) + lenValuePlusOptionalPrefix
n := len(w.buf)
if cap(w.buf) < n+needed {
newCap := 2 * cap(w.buf)
Expand Down Expand Up @@ -100,7 +114,7 @@ func (w *blockWriter) store(keySize int, value []byte) {
}

{
x := uint32(len(value))
x := uint32(lenValuePlusOptionalPrefix)
for x >= 0x80 {
w.buf[n] = byte(x) | 0x80
x >>= 7
Expand All @@ -111,6 +125,10 @@ func (w *blockWriter) store(keySize int, value []byte) {
}

n += copy(w.buf[n:], w.curKey[shared:])
if addInlineValuePrefix {
w.buf[n : n+1][0] = byte(inlineValuePrefix)
n++
}
n += copy(w.buf[n:], value)
w.buf = w.buf[:n]

Expand All @@ -120,6 +138,11 @@ func (w *blockWriter) store(keySize int, value []byte) {
}

// add appends the key/value pair to the block. It is the common-case entry
// point that never writes an inline value prefix; callers that need the
// prefix use addWithOptionalInlineValuePrefix directly.
func (w *blockWriter) add(key InternalKey, value []byte) {
	w.addWithOptionalInlineValuePrefix(key, value, false)
}

func (w *blockWriter) addWithOptionalInlineValuePrefix(
key InternalKey, value []byte, addInlineValuePrefix bool) {
w.curKey, w.prevKey = w.prevKey, w.curKey

size := key.Size()
Expand All @@ -129,7 +152,7 @@ func (w *blockWriter) add(key InternalKey, value []byte) {
w.curKey = w.curKey[:size]
key.Encode(w.curKey)

w.store(size, value)
w.store(size, value, addInlineValuePrefix)
}

func (w *blockWriter) finish() []byte {
Expand Down Expand Up @@ -325,6 +348,122 @@ func (i *blockIter) resetForReuse() blockIter {
}
}

// readEntryOrSkipIfOlderVersion decodes the entry at i.offset. It returns
// true after fully materializing the entry (i.key, i.val and i.nextOffset are
// set, as in readEntry). It returns false when the entry is a SET whose value
// starts with valueHandlePrefix — i.e. an older version whose value lives in
// a value block — in which case only i.nextOffset is advanced so the caller
// can step over the entry without materializing the key or value.
func (i *blockIter) readEntryOrSkipIfOlderVersion() bool {
	ptr := unsafe.Pointer(uintptr(i.ptr) + uintptr(i.offset))

	// This is an ugly performance hack. Reading entries from blocks is one of
	// the inner-most routines and decoding the 3 varints per-entry takes
	// significant time. Neither go1.11 or go1.12 will inline decodeVarint for
	// us, so we do it manually. This provides a 10-15% performance improvement
	// on blockIter benchmarks on both go1.11 and go1.12.
	//
	// TODO(peter): remove this hack if go:inline is ever supported.

	// Decode the shared-key-length varint (bytes shared with the previous
	// key's encoding).
	var shared uint32
	if a := *((*uint8)(ptr)); a < 128 {
		shared = uint32(a)
		ptr = unsafe.Pointer(uintptr(ptr) + 1)
	} else if a, b := a&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 1))); b < 128 {
		shared = uint32(b)<<7 | uint32(a)
		ptr = unsafe.Pointer(uintptr(ptr) + 2)
	} else if b, c := b&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 2))); c < 128 {
		shared = uint32(c)<<14 | uint32(b)<<7 | uint32(a)
		ptr = unsafe.Pointer(uintptr(ptr) + 3)
	} else if c, d := c&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 3))); d < 128 {
		shared = uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a)
		ptr = unsafe.Pointer(uintptr(ptr) + 4)
	} else {
		d, e := d&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 4)))
		shared = uint32(e)<<28 | uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a)
		ptr = unsafe.Pointer(uintptr(ptr) + 5)
	}

	// Decode the unshared-key-length varint (bytes of this key's encoding
	// following at ptr).
	var unshared uint32
	if a := *((*uint8)(ptr)); a < 128 {
		unshared = uint32(a)
		ptr = unsafe.Pointer(uintptr(ptr) + 1)
	} else if a, b := a&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 1))); b < 128 {
		unshared = uint32(b)<<7 | uint32(a)
		ptr = unsafe.Pointer(uintptr(ptr) + 2)
	} else if b, c := b&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 2))); c < 128 {
		unshared = uint32(c)<<14 | uint32(b)<<7 | uint32(a)
		ptr = unsafe.Pointer(uintptr(ptr) + 3)
	} else if c, d := c&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 3))); d < 128 {
		unshared = uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a)
		ptr = unsafe.Pointer(uintptr(ptr) + 4)
	} else {
		d, e := d&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 4)))
		unshared = uint32(e)<<28 | uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a)
		ptr = unsafe.Pointer(uintptr(ptr) + 5)
	}

	// Decode the value-length varint.
	var value uint32
	if a := *((*uint8)(ptr)); a < 128 {
		value = uint32(a)
		ptr = unsafe.Pointer(uintptr(ptr) + 1)
	} else if a, b := a&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 1))); b < 128 {
		value = uint32(b)<<7 | uint32(a)
		ptr = unsafe.Pointer(uintptr(ptr) + 2)
	} else if b, c := b&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 2))); c < 128 {
		value = uint32(c)<<14 | uint32(b)<<7 | uint32(a)
		ptr = unsafe.Pointer(uintptr(ptr) + 3)
	} else if c, d := c&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 3))); d < 128 {
		value = uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a)
		ptr = unsafe.Pointer(uintptr(ptr) + 4)
	} else {
		d, e := d&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 4)))
		value = uint32(e)<<28 | uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a)
		ptr = unsafe.Pointer(uintptr(ptr) + 5)
	}

	// Extract the InternalKeyKind without materializing the whole key. The
	// kind byte is the 8th byte from the end of the encoded internal key
	// (part of the 8-byte trailer). If the unshared suffix is at least 8
	// bytes it contains the trailer, so the kind byte is at ptr[unshared-8];
	// otherwise the kind byte falls inside the shared prefix and is read from
	// i.fullKey.
	var keyKind InternalKeyKind
	if unshared >= 8 {
		keyKind = InternalKeyKind(
			(*[manual.MaxArrayLen]byte)(ptr)[unshared-8])
	} else {
		// NOTE(review): this indexes i.fullKey[shared-(8-unshared)] and
		// assumes shared >= 8-unshared and that i.fullKey holds at least
		// `shared` valid bytes — presumably guaranteed because an
		// older-version entry shares its predecessor's trailer-adjacent
		// bytes; confirm against the writer's layout.
		offsetFromRear := 8 - unshared
		keyKind = InternalKeyKind(
			i.fullKey[:shared][shared-offsetFromRear])
	}
	// If this is an older version, the prefix must be the same. Eventually we
	// will either find a DEL with the same prefix, or a different prefix.
	// - DEL case: The shared part could include parts of the suffix of the preceding key.
	// - Different prefix: The shared part would differ in parts of the prefix.
	// The first case requires us to construct the fullKey here.
	// The following change is a HACK to ignore the first case, since there are
	// ways to work around it when writing the sstable.
	/*
		unsharedKey := getBytes(ptr, int(unshared))
		i.fullKey = append(i.fullKey[:shared], unsharedKey...)
	*/

	// Skip path: a SET whose value begins with valueHandlePrefix is an older
	// version stored in a value block. Advance nextOffset past the key and
	// value bytes and return false without touching i.key/i.val.
	if /*i.fullKey[len(i.fullKey)-8] == byte(InternalKeyKindSet) &&*/ keyKind == InternalKeyKindSet &&
		(*[manual.MaxArrayLen]byte)(unsafe.Pointer(uintptr(ptr) + uintptr(unshared)))[0] ==
			byte(valueHandlePrefix) {
		// unsharedKey := getBytes(ptr, int(unshared))
		// i.fullKey = append(i.fullKey[:shared], unsharedKey...)
		i.nextOffset = int32(uintptr(ptr)-uintptr(i.ptr)) + int32(unshared) + int32(value)
		return false
	}
	// Materialize path: reconstruct the full key from the shared prefix
	// (already in i.fullKey) plus the unshared suffix at ptr.
	unsharedKey := getBytes(ptr, int(unshared))
	i.fullKey = append(i.fullKey[:shared], unsharedKey...)

	if shared == 0 {
		// Provide stability for the key across positioning calls if the key
		// doesn't share a prefix with the previous key. This removes requiring the
		// key to be copied if the caller knows the block has a restart interval of
		// 1. An important example of this is range-del blocks.
		i.key = unsharedKey
	} else {
		i.key = i.fullKey
	}
	ptr = unsafe.Pointer(uintptr(ptr) + uintptr(unshared))
	i.val = getBytes(ptr, int(value))
	i.nextOffset = int32(uintptr(ptr)-uintptr(i.ptr)) + int32(value)
	return true
}

func (i *blockIter) readEntry() {
ptr := unsafe.Pointer(uintptr(i.ptr) + uintptr(i.offset))

Expand Down Expand Up @@ -776,6 +915,25 @@ func (i *blockIter) First() (*InternalKey, []byte) {
return &i.ikey, i.val
}

// firstSkipOlderVersions positions the iterator on the first entry in the
// block that is not an older version (i.e. not a SET whose value is a
// valueHandle), and returns that entry's key and value. It returns nil, nil
// when every entry in the block is skipped or the block is empty.
func (i *blockIter) firstSkipOlderVersions() (*InternalKey, []byte) {
	i.offset = 0
	if !i.Valid() {
		return nil, nil
	}
	i.clearCache()
	// Step forward past skipped entries until one is materialized or the
	// block is exhausted.
	for !i.readEntryOrSkipIfOlderVersion() {
		i.offset = i.nextOffset
		if !i.Valid() {
			return nil, nil
		}
	}
	i.decodeInternalKey(i.key)
	return &i.ikey, i.val
}

// Last implements internalIterator.Last, as documented in the pebble package.
func (i *blockIter) Last() (*InternalKey, []byte) {
// Seek forward from the last restart point.
Expand Down Expand Up @@ -834,6 +992,45 @@ func (i *blockIter) Next() (*InternalKey, []byte) {
return &i.ikey, i.val
}

// nextSkipOlderVersions steps the iterator forward to the next entry that is
// not an older version (see readEntryOrSkipIfOlderVersion), returning its key
// and value, or nil, nil when the block is exhausted.
func (i *blockIter) nextSkipOlderVersions() (*InternalKey, []byte) {
	if len(i.cachedBuf) > 0 {
		// We're switching from reverse iteration to forward iteration. We need to
		// populate i.fullKey with the current key we're positioned at so that
		// readEntry() can use i.fullKey for key prefix decompression. Note that we
		// don't know whether i.key is backed by i.cachedBuf or i.fullKey (if
		// SeekLT was the previous call, i.key may be backed by i.fullKey), but
		// copying into i.fullKey works for both cases.
		//
		// TODO(peter): Rather than clearing the cache, we could instead use the
		// cache until it is exhausted. This would likely be faster than falling
		// through to the normal forward iteration code below.
		i.fullKey = append(i.fullKey[:0], i.key...)
		i.clearCache()
	}

	// Advance entry by entry; readEntryOrSkipIfOlderVersion sets nextOffset
	// even for skipped entries, so the loop makes progress either way.
	for {
		i.offset = i.nextOffset
		if !i.Valid() {
			return nil, nil
		}
		if !i.readEntryOrSkipIfOlderVersion() {
			continue
		}
		// Manually inlined version of i.decodeInternalKey(i.key).
		if n := len(i.key) - 8; n >= 0 {
			// Split the encoded key into user key and 8-byte trailer.
			i.ikey.Trailer = binary.LittleEndian.Uint64(i.key[n:])
			i.ikey.UserKey = i.key[:n:n]
			if i.globalSeqNum != 0 {
				i.ikey.SetSeqNum(i.globalSeqNum)
			}
		} else {
			// Key too short to hold a trailer: mark as invalid.
			i.ikey.Trailer = uint64(InternalKeyKindInvalid)
			i.ikey.UserKey = nil
		}
		return &i.ikey, i.val
	}
}

// Prev implements internalIterator.Prev, as documented in the pebble
// package.
func (i *blockIter) Prev() (*InternalKey, []byte) {
Expand Down
4 changes: 4 additions & 0 deletions sstable/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,10 @@ type WriterOptions struct {

// Checksum specifies which checksum to use.
Checksum ChecksumType

// ValueBlocksAreEnabled indicates whether the writer should place older
// versions in value blocks.
ValueBlocksAreEnabled bool
}

func (o WriterOptions) ensureDefaults() WriterOptions {
Expand Down
16 changes: 16 additions & 0 deletions sstable/properties.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ type Properties struct {
NumRangeKeySets uint64 `prop:"pebble.num.range-key-sets"`
// The number of RANGEKEYUNSETs in this table.
NumRangeKeyUnsets uint64 `prop:"pebble.num.range-key-unsets"`
// The number of value blocks in this table. Only serialized if > 0.
NumValueBlocks uint64 `prop:"pebble.num.value-blocks"`
// The number of values stored in value blocks. Only serialized if > 0.
NumValuesInValueBlocks uint64 `prop:"pebble.num.values.in.value-blocks"`
// Timestamp of the earliest key. 0 if unknown.
OldestKeyTime uint64 `prop:"rocksdb.oldest.key.time"`
// The name of the prefix extractor used in this table. Empty if no prefix
Expand All @@ -142,6 +146,9 @@ type Properties struct {
TopLevelIndexSize uint64 `prop:"rocksdb.top-level.index.size"`
// User collected properties.
UserProperties map[string]string
// True iff the use of value blocks is enabled. Only serialized if true.
ValueBlocksAreEnabled bool `prop:"pebble.value-blocks.enabled"`

// If filtering is enabled, was the filter created on the whole key.
WholeKeyFiltering bool `prop:"rocksdb.block.based.table.whole.key.filtering"`

Expand Down Expand Up @@ -340,6 +347,12 @@ func (p *Properties) save(w *rawBlockWriter) {
p.saveUvarint(m, unsafe.Offsetof(p.RawRangeKeyKeySize), p.RawRangeKeyKeySize)
p.saveUvarint(m, unsafe.Offsetof(p.RawRangeKeyValueSize), p.RawRangeKeyValueSize)
}
if p.NumValueBlocks > 0 {
p.saveUvarint(m, unsafe.Offsetof(p.NumValueBlocks), p.NumValueBlocks)
}
if p.NumValuesInValueBlocks > 0 {
p.saveUvarint(m, unsafe.Offsetof(p.NumValuesInValueBlocks), p.NumValuesInValueBlocks)
}
p.saveUvarint(m, unsafe.Offsetof(p.OldestKeyTime), p.OldestKeyTime)
if p.PrefixExtractorName != "" {
p.saveString(m, unsafe.Offsetof(p.PrefixExtractorName), p.PrefixExtractorName)
Expand All @@ -350,6 +363,9 @@ func (p *Properties) save(w *rawBlockWriter) {
}
p.saveUvarint(m, unsafe.Offsetof(p.RawKeySize), p.RawKeySize)
p.saveUvarint(m, unsafe.Offsetof(p.RawValueSize), p.RawValueSize)
if p.ValueBlocksAreEnabled {
p.saveBool(m, unsafe.Offsetof(p.ValueBlocksAreEnabled), p.ValueBlocksAreEnabled)
}
p.saveBool(m, unsafe.Offsetof(p.WholeKeyFiltering), p.WholeKeyFiltering)

keys := make([]string, 0, len(m))
Expand Down
2 changes: 1 addition & 1 deletion sstable/raw_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (w *rawBlockWriter) add(key InternalKey, value []byte) {
w.curKey = w.curKey[:size]
copy(w.curKey, key.UserKey)

w.store(size, value)
w.store(size, value, false)
}

// rawBlockIter is an iterator over a single block of data. Unlike blockIter,
Expand Down
Loading