Skip to content

Commit

Permalink
storage: Update Engine/Reader/Writer interfaces for ScanInternal
Browse files Browse the repository at this point in the history
This change updates pkg/storage interfaces and implementations to allow
the use of ScanInternal in skip-shared iteration mode as well as
writing/reading of internal point keys, range dels and range keys.
Replication / snapshot code will soon rely on these changes to
be able to replicate internal keys in higher levels plus metadata
of shared sstables in lower levels, as opposed to just observed
user keys.

Part of cockroachdb#103028

Epic: none

Release note: None
  • Loading branch information
itsbilal committed Aug 3, 2023
1 parent dc28e19 commit c880906
Show file tree
Hide file tree
Showing 8 changed files with 275 additions and 6 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/spanset/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ go_library(
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_pebble//:pebble",
"@com_github_cockroachdb_pebble//rangekey",
],
)

Expand Down
36 changes: 36 additions & 0 deletions pkg/kv/kvserver/spanset/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/rangekey"
)

// MVCCIterator wraps an storage.MVCCIterator and ensures that it can
Expand Down Expand Up @@ -445,6 +446,17 @@ type spanSetReader struct {

var _ storage.Reader = spanSetReader{}

func (s spanSetReader) ScanInternal(
ctx context.Context,
lower, upper roachpb.Key,
visitPointKey func(key *pebble.InternalKey, value pebble.LazyValue) error,
visitRangeDel func(start []byte, end []byte, seqNum uint64) error,
visitRangeKey func(start []byte, end []byte, keys []rangekey.Key) error,
visitSharedFile func(sst *pebble.SharedSSTMeta) error,
) error {
return s.r.ScanInternal(ctx, lower, upper, visitPointKey, visitRangeDel, visitRangeKey, visitSharedFile)
}

func (s spanSetReader) Close() {
s.r.Close()
}
Expand Down Expand Up @@ -762,6 +774,18 @@ type spanSetBatch struct {

var _ storage.Batch = spanSetBatch{}

func (s spanSetBatch) ScanInternal(
ctx context.Context,
lower, upper roachpb.Key,
visitPointKey func(key *pebble.InternalKey, value pebble.LazyValue) error,
visitRangeDel func(start []byte, end []byte, seqNum uint64) error,
visitRangeKey func(start []byte, end []byte, keys []rangekey.Key) error,
visitSharedFile func(sst *pebble.SharedSSTMeta) error,
) error {
// Only used on Engine.
panic("unimplemented")
}

func (s spanSetBatch) Commit(sync bool) error {
return s.b.Commit(sync)
}
Expand Down Expand Up @@ -794,6 +818,18 @@ func (s spanSetBatch) CommitStats() storage.BatchCommitStats {
return s.b.CommitStats()
}

func (s spanSetBatch) PutInternalRangeKey(start, end []byte, key rangekey.Key) error {
return s.b.PutInternalRangeKey(start, end, key)
}

func (s spanSetBatch) PutInternalPointKey(key *pebble.InternalKey, value []byte) error {
return s.b.PutInternalPointKey(key, value)
}

func (s spanSetBatch) ClearRawEncodedRange(start, end []byte) error {
return s.b.ClearRawEncodedRange(start, end)
}

// NewBatch returns a storage.Batch that asserts access of the underlying
// Batch against the given SpanSet. We only consider span boundaries, associated
// timestamps are not considered.
Expand Down
32 changes: 28 additions & 4 deletions pkg/storage/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,8 @@ func (r *BatchReader) Value() []byte {
}
}

// EngineEndKey returns the engine end key of the current ranged batch entry.
func (r *BatchReader) EngineEndKey() (EngineKey, error) {
// EndKey returns the raw end key of the current ranged batch entry.
func (r *BatchReader) EndKey() ([]byte, error) {
var rawKey []byte
switch r.kind {
case pebble.InternalKeyKindRangeDelete:
Expand All @@ -160,14 +160,23 @@ func (r *BatchReader) EngineEndKey() (EngineKey, error) {
case pebble.InternalKeyKindRangeKeySet, pebble.InternalKeyKindRangeKeyUnset, pebble.InternalKeyKindRangeKeyDelete:
rangeKeys, err := r.rangeKeys()
if err != nil {
return EngineKey{}, err
return nil, err
}
rawKey = rangeKeys.End

default:
return EngineKey{}, errors.AssertionFailedf(
return nil, errors.AssertionFailedf(
"can only ask for EndKey on a ranged entry, got %v", r.kind)
}
return rawKey, nil
}

// EngineEndKey returns the engine end key of the current ranged batch entry.
func (r *BatchReader) EngineEndKey() (EngineKey, error) {
rawKey, err := r.EndKey()
if err != nil {
return EngineKey{}, err
}

key, ok := DecodeEngineKey(rawKey)
if !ok {
Expand All @@ -176,6 +185,21 @@ func (r *BatchReader) EngineEndKey() (EngineKey, error) {
return key, nil
}

// RawRangeKeys returns the raw range key values at the current entry.
func (r *BatchReader) RawRangeKeys() ([]rangekey.Key, error) {
switch r.kind {
case pebble.InternalKeyKindRangeKeySet, pebble.InternalKeyKindRangeKeyUnset:
default:
return nil, errors.AssertionFailedf(
"can only ask for range keys on a range key entry, got %v", r.kind)
}
rangeKeys, err := r.rangeKeys()
if err != nil {
return nil, err
}
return rangeKeys.Keys, nil
}

// EngineRangeKeys returns the engine range key values at the current entry.
func (r *BatchReader) EngineRangeKeys() ([]EngineRangeKeyValue, error) {
switch r.kind {
Expand Down
54 changes: 53 additions & 1 deletion pkg/storage/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/rangekey"
"github.com/cockroachdb/pebble/vfs"
"github.com/cockroachdb/redact"
prometheusgo "github.com/prometheus/client_model/go"
Expand Down Expand Up @@ -598,6 +599,26 @@ type Reader interface {
// with the iterator to free resources. The caller can change IterOptions
// after this function returns.
NewEngineIterator(opts IterOptions) EngineIterator
// ScanInternal allows a caller to inspect the underlying engine's InternalKeys
// using a visitor pattern, while also allowing for keys in shared files to be
// skipped if a visitor is provided for visitSharedFiles. Useful for
// fast-replicating state from one Reader to another. Point keys are collapsed
// such that only one internal key per user key is exposed, and rangedels and
// range keys are collapsed and defragmented with each span being surfaced
// exactly once, alongside the highest seqnum for a rangedel on that span
// (for rangedels) or all coalesced rangekey.Keys in that span (for range
// keys). A point key deleted by a rangedel will not be exposed, but the
// rangedel would be exposed.
//
// Note that ScanInternal does not obey the guarantees indicated by
// ConsistentIterators.
ScanInternal(
ctx context.Context, lower, upper roachpb.Key,
visitPointKey func(key *pebble.InternalKey, value pebble.LazyValue) error,
visitRangeDel func(start, end []byte, seqNum uint64) error,
visitRangeKey func(start, end []byte, keys []rangekey.Key) error,
visitSharedFile func(sst *pebble.SharedSSTMeta) error,
) error
// ConsistentIterators returns true if the Reader implementation guarantees
// that the different iterators constructed by this Reader will see the same
// underlying Engine state. This is not true about Batch writes: new iterators
Expand Down Expand Up @@ -854,6 +875,32 @@ type Writer interface {
BufferedSize() int
}

// InternalWriter is an extension of Writer that supports additional low-level
// methods to operate on internal keys in Pebble. These additional methods
// should only be used sparingly, when one of the high-level methods cannot
// achieve the same ends.
type InternalWriter interface {
Writer
// ClearRawEncodedRange is similar to ClearRawRange, except it takes pre-encoded
// start, end keys and bypasses the EngineKey encoding step. It also only
// operates on point keys; for range keys, use ClearEngineRangeKey or
// PutInternalRangeKey.
//
// It is safe to modify the contents of the arguments after it returns.
ClearRawEncodedRange(start, end []byte) error

// PutInternalRangeKey adds an InternalRangeKey to this batch. This is a very
// low-level method that should be used sparingly.
//
// It is safe to modify the contents of the arguments after it returns.
PutInternalRangeKey(start, end []byte, key rangekey.Key) error
// PutInternalPointKey adds a point InternalKey to this batch. This is a very
// low-level method that should be used sparingly.
//
// It is safe to modify the contents of the arguments after it returns.
PutInternalPointKey(key *pebble.InternalKey, value []byte) error
}

// ClearOptions holds optional parameters to methods that clear keys from the
// storage engine.
type ClearOptions struct {
Expand Down Expand Up @@ -984,6 +1031,11 @@ type Engine interface {
// additionally returns ingestion stats.
IngestExternalFilesWithStats(
ctx context.Context, paths []string) (pebble.IngestOperationStats, error)
// IngestAndExciseExternalFiles is a variant of IngestExternalFilesWithStats
// that excises an ExciseSpan, and ingests either local or shared sstables or
// both.
IngestAndExciseExternalFiles(
ctx context.Context, paths []string, shared []pebble.SharedSSTMeta, exciseSpan roachpb.Span) (pebble.IngestOperationStats, error)
// PreIngestDelay offers an engine the chance to backpressure ingestions.
// When called, it may choose to block if the engine determines that it is in
// or approaching a state where further ingestions may risk its health.
Expand Down Expand Up @@ -1048,7 +1100,7 @@ type Batch interface {

// WriteBatch is the interface for write batch specific operations.
type WriteBatch interface {
Writer
InternalWriter
// Close closes the batch, freeing up any outstanding resources.
Close()
// Commit atomically applies any batched updates to the underlying engine. If
Expand Down
5 changes: 5 additions & 0 deletions pkg/storage/open.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ func BallastSize(size int64) ConfigOption {
func SharedStorage(sharedStorage cloud.ExternalStorage) ConfigOption {
return func(cfg *engineConfig) error {
cfg.SharedStorage = sharedStorage
// TODO(bilal): Do the format major version ratchet while accounting for
// version upgrade finalization. However, seeing as shared storage is
// an experimental feature and upgrading from existing stores is not
// supported, this is fine.
cfg.Opts.FormatMajorVersion = pebble.ExperimentalFormatVirtualSSTables
return nil
}
}
Expand Down
60 changes: 59 additions & 1 deletion pkg/storage/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -1025,7 +1025,13 @@ func NewPebble(ctx context.Context, cfg PebbleConfig) (p *Pebble, err error) {
ValueBlocksEnabled.Get(&cfg.Settings.SV)
}
opts.Experimental.DisableIngestAsFlushable = func() bool {
return !IngestAsFlushable.Get(&cfg.Settings.SV)
// Disable flushable ingests if shared storage is enabled. This is because
// flushable ingests currently do not support Excise operations.
//
// TODO(bilal): Remove the first part of this || statement when
// https://github.com/cockroachdb/pebble/issues/2676 is completed, or when
// Pebble has better guards against this.
return cfg.SharedStorage != nil || !IngestAsFlushable.Get(&cfg.Settings.SV)
}

auxDir := opts.FS.PathJoin(cfg.Dir, base.AuxiliaryDir)
Expand Down Expand Up @@ -1475,6 +1481,20 @@ func (p *Pebble) NewEngineIterator(opts IterOptions) EngineIterator {
return newPebbleIterator(p.db, opts, StandardDurability, p)
}

// ScanInternal implements the Engine interface.
func (p *Pebble) ScanInternal(
ctx context.Context,
lower, upper roachpb.Key,
visitPointKey func(key *pebble.InternalKey, value pebble.LazyValue) error,
visitRangeDel func(start []byte, end []byte, seqNum uint64) error,
visitRangeKey func(start []byte, end []byte, keys []rangekey.Key) error,
visitSharedFile func(sst *pebble.SharedSSTMeta) error,
) error {
rawLower := EngineKey{Key: lower}.Encode()
rawUpper := EngineKey{Key: upper}.Encode()
return p.db.ScanInternal(ctx, rawLower, rawUpper, visitPointKey, visitRangeDel, visitRangeKey, visitSharedFile)
}

// ConsistentIterators implements the Engine interface.
func (p *Pebble) ConsistentIterators() bool {
return false
Expand Down Expand Up @@ -2033,6 +2053,17 @@ func (p *Pebble) IngestExternalFilesWithStats(
return p.db.IngestWithStats(paths)
}

// IngestAndExciseExternalFiles implements the Engine interface.
func (p *Pebble) IngestAndExciseExternalFiles(
ctx context.Context, paths []string, shared []pebble.SharedSSTMeta, exciseSpan roachpb.Span,
) (pebble.IngestOperationStats, error) {
rawSpan := pebble.KeyRange{
Start: EngineKey{Key: exciseSpan.Key}.Encode(),
End: EngineKey{Key: exciseSpan.EndKey}.Encode(),
}
return p.db.IngestAndExcise(paths, shared, rawSpan)
}

// PreIngestDelay implements the Engine interface.
func (p *Pebble) PreIngestDelay(ctx context.Context) {
preIngestDelay(ctx, p, p.settings)
Expand Down Expand Up @@ -2444,10 +2475,23 @@ func (p *pebbleReadOnly) PinEngineStateForIterators() error {
return nil
}

// ScanInternal implements the Reader interface.
func (p *pebbleReadOnly) ScanInternal(
ctx context.Context,
lower, upper roachpb.Key,
visitPointKey func(key *pebble.InternalKey, value pebble.LazyValue) error,
visitRangeDel func(start []byte, end []byte, seqNum uint64) error,
visitRangeKey func(start []byte, end []byte, keys []rangekey.Key) error,
visitSharedFile func(sst *pebble.SharedSSTMeta) error,
) error {
return p.parent.ScanInternal(ctx, lower, upper, visitPointKey, visitRangeDel, visitRangeKey, visitSharedFile)
}

// Writer methods are not implemented for pebbleReadOnly. Ideally, the code
// could be refactored so that a Reader could be supplied to evaluateBatch

// Writer is the write interface to an engine's data.

func (p *pebbleReadOnly) ApplyBatchRepr(repr []byte, sync bool) error {
panic("not implemented")
}
Expand Down Expand Up @@ -2621,6 +2665,20 @@ func (p *pebbleSnapshot) PinEngineStateForIterators() error {
return nil
}

// ScanInternal implements the Reader interface.
func (p *pebbleSnapshot) ScanInternal(
ctx context.Context,
lower, upper roachpb.Key,
visitPointKey func(key *pebble.InternalKey, value pebble.LazyValue) error,
visitRangeDel func(start []byte, end []byte, seqNum uint64) error,
visitRangeKey func(start []byte, end []byte, keys []rangekey.Key) error,
visitSharedFile func(sst *pebble.SharedSSTMeta) error,
) error {
rawLower := EngineKey{Key: lower}.Encode()
rawUpper := EngineKey{Key: upper}.Encode()
return p.snapshot.ScanInternal(ctx, rawLower, rawUpper, visitPointKey, visitRangeDel, visitRangeKey, visitSharedFile)
}

// ExceedMaxSizeError is the error returned when an export request
// fails due the export size exceeding the budget. This can be caused
// by large KVs that have many revisions.
Expand Down
40 changes: 40 additions & 0 deletions pkg/storage/pebble_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/rangekey"
)

// Wrapper struct around a pebble.Batch.
Expand Down Expand Up @@ -257,6 +258,23 @@ func (p *pebbleBatch) NewEngineIterator(opts IterOptions) EngineIterator {
return iter
}

// ScanInternal implements the Reader interface.
func (p *pebbleBatch) ScanInternal(
ctx context.Context,
lower, upper roachpb.Key,
visitPointKey func(key *pebble.InternalKey, value pebble.LazyValue) error,
visitRangeDel func(start []byte, end []byte, seqNum uint64) error,
visitRangeKey func(start []byte, end []byte, keys []rangekey.Key) error,
visitSharedFile func(sst *pebble.SharedSSTMeta) error,
) error {
panic("ScanInternal only supported on Engine and Snapshot.")
}

// ClearRawEncodedRange implements the InternalWriter interface.
func (p *pebbleBatch) ClearRawEncodedRange(start, end []byte) error {
return p.batch.DeleteRange(start, end, pebble.Sync)
}

// ConsistentIterators implements the Batch interface.
func (p *pebbleBatch) ConsistentIterators() bool {
return true
Expand Down Expand Up @@ -482,6 +500,20 @@ func (p *pebbleBatch) PutEngineRangeKey(start, end roachpb.Key, suffix, value []
EngineKey{Key: start}.Encode(), EngineKey{Key: end}.Encode(), suffix, value, nil)
}

// PutInternalRangeKey implements the InternalWriter interface.
func (p *pebbleBatch) PutInternalRangeKey(start, end []byte, key rangekey.Key) error {
switch key.Kind() {
case pebble.InternalKeyKindRangeKeyUnset:
return p.batch.RangeKeyUnset(start, end, key.Suffix, nil /* writeOptions */)
case pebble.InternalKeyKindRangeKeySet:
return p.batch.RangeKeySet(start, end, key.Suffix, key.Value, nil /* writeOptions */)
case pebble.InternalKeyKindRangeKeyDelete:
return p.batch.RangeKeyDelete(start, end, nil /* writeOptions */)
default:
panic("unexpected range key kind")
}
}

// ClearEngineRangeKey implements the Engine interface.
func (p *pebbleBatch) ClearEngineRangeKey(start, end roachpb.Key, suffix []byte) error {
return p.batch.RangeKeyUnset(
Expand Down Expand Up @@ -542,6 +574,14 @@ func (p *pebbleBatch) PutEngineKey(key EngineKey, value []byte) error {
return p.batch.Set(p.buf, value, nil)
}

// PutInternalPointKey implements the WriteBatch interface.
func (p *pebbleBatch) PutInternalPointKey(key *pebble.InternalKey, value []byte) error {
if len(key.UserKey) == 0 {
return emptyKeyError()
}
return p.batch.AddInternalKey(key, value, nil /* writeOptions */)
}

func (p *pebbleBatch) put(key MVCCKey, value []byte) error {
if len(key.Key) == 0 {
return emptyKeyError()
Expand Down
Loading

0 comments on commit c880906

Please sign in to comment.