Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage/engine: add pebbleIterator support for time-bound iterators #41854

Merged
merged 2 commits into from
Oct 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 5 additions & 68 deletions pkg/storage/bulk/sst_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,9 @@
package bulk

import (
"bytes"
"io"

"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/sstable"
"github.com/pkg/errors"
Expand All @@ -30,75 +28,14 @@ type SSTWriter struct {
scratch []byte
}

// timeboundPropCollector is a table property collector that tracks the
// smallest and largest encoded MVCC timestamps among the keys added to an
// sstable. Its behavior matches TimeBoundTblPropCollector in table_props.cc.
type timeboundPropCollector struct {
	// min is the smallest encoded timestamp seen so far. It stays empty until
	// a key carrying a timestamp has been added.
	min []byte
	// max is the largest encoded timestamp seen so far. It stays empty until
	// a key carrying a timestamp has been added.
	max []byte
}

// Compile-time assertion that *timeboundPropCollector implements
// pebble.TablePropertyCollector.
var _ pebble.TablePropertyCollector = (*timeboundPropCollector)(nil)

// Add folds the timestamp portion of key's user key into the running
// minimum and maximum. Keys without a timestamp leave the bounds untouched.
// It returns an error when the user key cannot be split into its key and
// timestamp parts. The value argument is not inspected.
func (t *timeboundPropCollector) Add(key pebble.InternalKey, value []byte) error {
	_, encodedTS, valid := enginepb.SplitMVCCKey(key.UserKey)
	if !valid {
		return errors.Errorf("failed to split MVCC key")
	}
	if len(encodedTS) == 0 {
		// No timestamp on this key, so there is nothing to record.
		return nil
	}

	// The bounds are compared as raw encoded bytes. The existing buffers are
	// reused (truncated, then appended to) so that encodedTS is copied rather
	// than retained.
	if len(t.min) == 0 || bytes.Compare(encodedTS, t.min) < 0 {
		t.min = append(t.min[:0], encodedTS...)
	}
	if len(t.max) == 0 || bytes.Compare(encodedTS, t.max) > 0 {
		t.max = append(t.max[:0], encodedTS...)
	}
	return nil
}

// Finish publishes the collected bounds into userProps under the keys
// "crdb.ts.min" and "crdb.ts.max". Both entries are always written; they are
// empty strings when no timestamped key was added. It always returns nil.
func (t *timeboundPropCollector) Finish(userProps map[string]string) error {
	const (
		minProp = "crdb.ts.min"
		maxProp = "crdb.ts.max"
	)
	lo, hi := string(t.min), string(t.max)
	userProps[minProp] = lo
	userProps[maxProp] = hi
	return nil
}

// Name returns the fixed name of this table property collector.
// NOTE(review): this string presumably has to stay identical to the name used
// by the RocksDB TimeBoundTblPropCollector factory (table_props.cc) — confirm
// before changing it.
func (t *timeboundPropCollector) Name() string {
return "TimeBoundTblPropCollectorFactory"
}

// dummyDeleteRangeCollector is a stub collector that just identifies itself.
// This stub can be installed so that SSTs claim to have the same props as those
// written by the Rocks writer, using the collector in table_props.cc. However
// since bulk-ingestion SSTs never contain deletions (range or otherwise), there
// is no actual implementation needed here: Add only rejects keys whose kind is
// not a plain set, and Finish records no properties.
type dummyDeleteRangeCollector struct{}

// Compile-time assertion that *dummyDeleteRangeCollector implements
// pebble.TablePropertyCollector.
var _ pebble.TablePropertyCollector = (*dummyDeleteRangeCollector)(nil)

// Add accepts only keys of kind pebble.InternalKeyKindSet and records
// nothing. Any other key kind is reported as an error that names the
// offending kind. The value argument is not inspected.
func (dummyDeleteRangeCollector) Add(key pebble.InternalKey, value []byte) error {
	kind := key.Kind()
	if kind == pebble.InternalKeyKindSet {
		return nil
	}
	return errors.Errorf("unsupported key kind %v", kind)
}

// Finish is a no-op: it leaves userProps untouched and always returns nil.
func (dummyDeleteRangeCollector) Finish(userProps map[string]string) error {
return nil
}

// Name returns the fixed name of this table property collector.
// NOTE(review): this string presumably has to stay identical to the name of
// the RocksDB delete-range collector in table_props.cc — confirm before
// changing it.
func (dummyDeleteRangeCollector) Name() string {
return "DeleteRangeTblPropCollectorFactory"
}

// MakeSSTWriter creates a new SSTWriter.
func MakeSSTWriter() SSTWriter {
opts := sstable.WriterOptions{
BlockSize: 32 * 1024,
TableFormat: pebble.TableFormatLevelDB,
Comparer: engine.MVCCComparer,
MergerName: "nullptr",
TablePropertyCollectors: []func() pebble.TablePropertyCollector{
func() pebble.TablePropertyCollector { return &timeboundPropCollector{} },
func() pebble.TablePropertyCollector { return &dummyDeleteRangeCollector{} },
},
BlockSize: 32 * 1024,
TableFormat: pebble.TableFormatLevelDB,
Comparer: engine.MVCCComparer,
MergerName: "nullptr",
TablePropertyCollectors: engine.PebbleTablePropertyCollectors,
}
f := &memFile{}
sst := sstable.NewWriter(f, opts)
Expand Down
6 changes: 6 additions & 0 deletions pkg/storage/engine/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"encoding/binary"

"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/pebble"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -237,6 +238,11 @@ func encodeKeyToBuf(buf []byte, key MVCCKey, keyLen int) {
buf[len(buf)-1] = byte(timestampLength)
}

// encodeTimestamp returns the encoded form of ts as it appears in the
// timestamp portion of an encoded MVCCKey. It encodes a key that carries
// only the timestamp and then splits the timestamp part back off.
func encodeTimestamp(ts hlc.Timestamp) []byte {
	encodedKey := EncodeKey(MVCCKey{Timestamp: ts})
	// The ok result of the split is deliberately discarded; the input was
	// produced by EncodeKey just above.
	// NOTE(review): presumably the split cannot fail on EncodeKey output — confirm.
	_, encodedTS, _ := enginepb.SplitMVCCKey(encodedKey)
	return encodedTS
}

// DecodeMVCCKey decodes an engine.MVCCKey from its serialized representation. This
// decoding must match engine/db.cc:DecodeKey().
func DecodeMVCCKey(encodedKey []byte) (MVCCKey, error) {
Expand Down
163 changes: 163 additions & 0 deletions pkg/storage/engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"bytes"
"context"
"encoding/binary"
"fmt"
"math/rand"
"path/filepath"
"reflect"
Expand Down Expand Up @@ -478,6 +479,168 @@ func TestEngineMerge(t *testing.T) {
}, t)
}

// TestEngineTimeBound writes four keys whose wall times lie between 1 and 3,
// flushes the engine, and then exercises iterators created with
// MinTimestampHint/MaxTimestampHint on a batch. Hint ranges that do not touch
// [1, 3] are expected to yield no keys, hint ranges that touch it are expected
// to yield every key, and a plain iterator created afterwards is expected to
// see every key as well. The test runs once per entry in mvccEngineImpls.
func TestEngineTimeBound(t *testing.T) {
defer leaktest.AfterTest(t)()

for _, engineImpl := range mvccEngineImpls {
t.Run(engineImpl.name, func(t *testing.T) {
engine := engineImpl.create()
defer engine.Close()

// The written timestamps span [minTimestamp, maxTimestamp].
var minTimestamp = hlc.Timestamp{WallTime: 1, Logical: 0}
var maxTimestamp = hlc.Timestamp{WallTime: 3, Logical: 0}
times := []hlc.Timestamp{
{WallTime: 2, Logical: 0},
minTimestamp,
maxTimestamp,
{WallTime: 2, Logical: 0},
}

// Write one key per timestamp; the keys are "00", "01", "02", "03" and each
// value equals its key.
for i, time := range times {
s := fmt.Sprintf("%02d", i)
key := MVCCKey{Key: roachpb.Key(s), Timestamp: time}
if err := engine.Put(key, []byte(s)); err != nil {
t.Fatal(err)
}
}
if err := engine.Flush(); err != nil {
t.Fatal(err)
}

batch := engine.NewBatch()
defer batch.Close()

// check iterates tbi from NilKey to exhaustion, then asserts the number of
// keys visited and the TimeBoundNumSSTs stat. It closes tbi on return.
check := func(t *testing.T, tbi Iterator, keys, ssts int) {
defer tbi.Close()
tbi.Seek(NilKey)

var count int
for ; ; tbi.Next() {
ok, err := tbi.Valid()
if err != nil {
t.Fatal(err)
}
if !ok {
break
}
count++
}

// Make sure the iterator saw exactly the expected number of keys.
if keys != count {
t.Fatalf("saw %d values in time bounded iterator, but expected %d", count, keys)
}
stats := tbi.Stats()
if a := stats.TimeBoundNumSSTs; a != ssts {
t.Fatalf("touched %d SSTs, expected %d", a, ssts)
}
}

// NB: every iterator below is created while this table is being built, i.e.
// before any subtest runs; check is responsible for closing each of them.
testCases := []struct {
iter Iterator
keys, ssts int
}{
// Completely to the right, not touching.
{
iter: batch.NewIterator(IterOptions{
MinTimestampHint: maxTimestamp.Next(),
MaxTimestampHint: maxTimestamp.Next().Next(),
UpperBound: roachpb.KeyMax,
WithStats: true,
}),
keys: 0,
ssts: 0,
},
// Completely to the left, not touching.
{
iter: batch.NewIterator(IterOptions{
MinTimestampHint: minTimestamp.Prev().Prev(),
MaxTimestampHint: minTimestamp.Prev(),
UpperBound: roachpb.KeyMax,
WithStats: true,
}),
keys: 0,
ssts: 0,
},
// Touching on the right.
{
iter: batch.NewIterator(IterOptions{
MinTimestampHint: maxTimestamp,
MaxTimestampHint: maxTimestamp,
UpperBound: roachpb.KeyMax,
WithStats: true,
}),
keys: len(times),
ssts: 1,
},
// Touching on the left.
{
iter: batch.NewIterator(IterOptions{
MinTimestampHint: minTimestamp,
MaxTimestampHint: minTimestamp,
UpperBound: roachpb.KeyMax,
WithStats: true,
}),
keys: len(times),
ssts: 1,
},
// Copy of last case, but confirm that we don't get SST stats if we don't
// ask for them.
{
iter: batch.NewIterator(IterOptions{
MinTimestampHint: minTimestamp,
MaxTimestampHint: minTimestamp,
UpperBound: roachpb.KeyMax,
WithStats: false,
}),
keys: len(times),
ssts: 0,
},
// Copy of last case, but confirm that upper bound is respected: only the
// keys "00" and "01" are expected below the upper bound "02".
{
iter: batch.NewIterator(IterOptions{
MinTimestampHint: minTimestamp,
MaxTimestampHint: minTimestamp,
UpperBound: []byte("02"),
WithStats: false,
}),
keys: 2,
ssts: 0,
},
}

// Each case runs as an unnamed subtest.
for _, test := range testCases {
t.Run("", func(t *testing.T) {
check(t, test.iter, test.keys, test.ssts)
})
}

// Make a regular iterator. Before #21721, this would accidentally pick up the
// time bounded iterator instead.
iter := batch.NewIterator(IterOptions{UpperBound: roachpb.KeyMax})
defer iter.Close()
iter.Seek(NilKey)

var count int
for ; ; iter.Next() {
ok, err := iter.Valid()
if err != nil {
t.Fatal(err)
}
if !ok {
break
}
count++
}

// Make sure the iterator sees the writes (i.e. it's not the time bounded iterator).
if expCount := len(times); expCount != count {
t.Fatalf("saw %d values in regular iterator, but expected %d", count, expCount)
}
})
}
}

func TestFlushWithSSTables(t *testing.T) {
defer leaktest.AfterTest(t)()
runWithAllEngines(func(engine Engine, t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/engine/enginepb/decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func SplitMVCCKey(mvccKey []byte) (key []byte, ts []byte, ok bool) {
}

// DecodeKey decodes a key/timestamp from its serialized representation. This
// decoding must match engine/db.cc:DecodeKey().
// decoding must match libroach/encoding.cc:DecodeKey().
func DecodeKey(encodedKey []byte) (key []byte, timestamp hlc.Timestamp, _ error) {
key, ts, ok := SplitMVCCKey(encodedKey)
if !ok {
Expand Down
63 changes: 63 additions & 0 deletions pkg/storage/engine/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package engine

import (
"bytes"
"context"
"fmt"
"io/ioutil"
Expand Down Expand Up @@ -85,6 +86,67 @@ var MVCCMerger = &pebble.Merger{
},
}

// pebbleTimeBoundPropCollector is a Pebble table property collector that
// tracks the smallest and largest encoded MVCC timestamps among the keys
// added to an sstable. Its behavior matches TimeBoundTblPropCollector in
// table_props.cc.
type pebbleTimeBoundPropCollector struct {
	// min is the smallest encoded timestamp seen so far. It stays empty until
	// a key carrying a timestamp has been added.
	min []byte
	// max is the largest encoded timestamp seen so far. It stays empty until
	// a key carrying a timestamp has been added.
	max []byte
}

// Add folds the timestamp portion of key's user key into the running
// minimum and maximum. Keys without a timestamp leave the bounds untouched.
// It returns an error when the user key cannot be split into its key and
// timestamp parts. The value argument is not inspected.
func (t *pebbleTimeBoundPropCollector) Add(key pebble.InternalKey, value []byte) error {
	_, encodedTS, valid := enginepb.SplitMVCCKey(key.UserKey)
	if !valid {
		return errors.Errorf("failed to split MVCC key")
	}
	if len(encodedTS) == 0 {
		// No timestamp on this key, so there is nothing to record.
		return nil
	}

	// The bounds are compared as raw encoded bytes. The existing buffers are
	// reused (truncated, then appended to) so that encodedTS is copied rather
	// than retained.
	if len(t.min) == 0 || bytes.Compare(encodedTS, t.min) < 0 {
		t.min = append(t.min[:0], encodedTS...)
	}
	if len(t.max) == 0 || bytes.Compare(encodedTS, t.max) > 0 {
		t.max = append(t.max[:0], encodedTS...)
	}
	return nil
}

// Finish publishes the collected bounds into userProps under the keys
// "crdb.ts.min" and "crdb.ts.max". Both entries are always written; they are
// empty strings when no timestamped key was added. It always returns nil.
func (t *pebbleTimeBoundPropCollector) Finish(userProps map[string]string) error {
	const (
		minProp = "crdb.ts.min"
		maxProp = "crdb.ts.max"
	)
	lo, hi := string(t.min), string(t.max)
	userProps[minProp] = lo
	userProps[maxProp] = hi
	return nil
}

// Name returns the fixed name of this table property collector.
func (t *pebbleTimeBoundPropCollector) Name() string {
// This constant needs to match the one used by the RocksDB version of this
// table property collector. DO NOT CHANGE.
return "TimeBoundTblPropCollectorFactory"
}

// pebbleDeleteRangeCollector is intended to mark an sstable that contains a
// range tombstone for compaction. Its Add method is currently a no-op (see
// the TODO there) and Finish records no properties, so for now the collector
// only contributes its name.
type pebbleDeleteRangeCollector struct{}

// Add is currently a no-op: it accepts every key, records nothing, and
// always returns nil.
func (pebbleDeleteRangeCollector) Add(key pebble.InternalKey, value []byte) error {
// TODO(peter): track whether a range tombstone is present. Need to extend
// the TablePropertyCollector interface.
return nil
}

// Finish is a no-op: it leaves userProps untouched and always returns nil.
func (pebbleDeleteRangeCollector) Finish(userProps map[string]string) error {
return nil
}

// Name returns the fixed name of this table property collector.
func (pebbleDeleteRangeCollector) Name() string {
// This constant needs to match the one used by the RocksDB version of this
// table property collector. DO NOT CHANGE.
return "DeleteRangeTblPropCollectorFactory"
}

// PebbleTablePropertyCollectors is the list of Pebble TablePropertyCollectors.
// Each entry is a factory that returns a fresh collector: first the
// time-bound collector, then the delete-range collector.
var PebbleTablePropertyCollectors = []func() pebble.TablePropertyCollector{
	func() pebble.TablePropertyCollector {
		return new(pebbleTimeBoundPropCollector)
	},
	func() pebble.TablePropertyCollector {
		return new(pebbleDeleteRangeCollector)
	},
}

// Pebble is a wrapper around a Pebble database instance.
type Pebble struct {
db *pebble.DB
Expand All @@ -104,6 +166,7 @@ var _ WithSSTables = &Pebble{}
func NewPebble(path string, cfg *pebble.Options) (*Pebble, error) {
cfg.Comparer = MVCCComparer
cfg.Merger = MVCCMerger
cfg.TablePropertyCollectors = PebbleTablePropertyCollectors

// pebble.Open also calls EnsureDefaults, but only after doing a clone. Call
// EnsureDefaults beforehand so we have a matching cfg here for when we save
Expand Down
Loading