[WIP] db: add support for user-defined sstable partitioning
Add an `Options.TablePartitioner` hook that allows the user to specify a
required partition between 2 user-keys. `TablePartitioner` is called
during flush and compaction before outputting a new key to an sstable
which already contains at least 1 key.
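
As a usage sketch (hypothetical, not part of this change): a partitioner
that forces a table boundary whenever the first 4 bytes of the user key
change. The 4-byte prefix length and the surrounding `pebble.Open` setup
are illustrative assumptions.

package main

import (
	"bytes"

	"github.com/cockroachdb/pebble"
)

func main() {
	opts := &pebble.Options{
		// Hypothetical partitioner: split sstables when the 4-byte key
		// prefix changes. Returning true completes the current sstable;
		// curKey and subsequent keys are written to a new sstable.
		TablePartitioner: func(lastKey, curKey []byte) bool {
			const prefixLen = 4
			if len(lastKey) < prefixLen || len(curKey) < prefixLen {
				return false
			}
			return !bytes.Equal(lastKey[:prefixLen], curKey[:prefixLen])
		},
	}
	db, err := pebble.Open("demo", opts)
	if err != nil {
		panic(err)
	}
	defer db.Close()
}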

TODO

One complication from table partitioning is that it can create L0 tables
which overlap in seqnum space. In order to support partitioned L0, we'd
have to relax the invariant checks in `manifest.CheckOrdering`. Doing so
will make Pebble incompatible with RocksDB.

In order for partitioning to not naively increase read amplification,
we'll want to provide some sort of partitioned view of the sstables in
`Version.Files[0]`. `mergingIter` will then need to be made aware of the
partitioning.

We may want to adjust the compaction picking heuristics to not expand
compaction inputs across the partition boundary.

See #517
petermattis committed Feb 26, 2020
1 parent ec5762e commit 3331106
Showing 4 changed files with 57 additions and 25 deletions.
22 changes: 16 additions & 6 deletions compaction.go
@@ -1497,6 +1497,22 @@ func (d *DB) runCompaction(
 			limit = nil
 		}

+		if tw != nil {
+			if d.opts.TablePartitioner != nil {
+				lastUserKey := tw.LastUserKey()
+				if d.opts.TablePartitioner(lastUserKey, key.UserKey) {
+					limit = key.UserKey
+					break
+				}
+			}
+			if tw.EstimatedSize() >= c.maxOutputFileSize {
+				// Use the next key as the sstable boundary. Note that we already
+				// checked this key against the grandparent limit above.
+				limit = key.UserKey
+				break
+			}
+		}
+
 		atomic.StoreUint64(c.atomicBytesIterated, c.bytesIterated)
 		if err := pacer.maybeThrottle(c.bytesIterated); err != nil {
 			return nil, pendingOutputs, err
@@ -1509,12 +1525,6 @@ func (d *DB) runCompaction(
 			c.rangeDelFrag.Add(iter.cloneKey(*key), val)
 			continue
 		}
-		if tw != nil && tw.EstimatedSize() >= c.maxOutputFileSize {
-			// Use the next key as the sstable boundary. Note that we already
-			// checked this key against the grandparent limit above.
-			limit = key.UserKey
-			break
-		}
 		if tw == nil {
 			if err := newOutput(); err != nil {
 				return nil, pendingOutputs, err
39 changes: 21 additions & 18 deletions internal/manifest/version.go
@@ -471,7 +471,7 @@ func CheckOrdering(cmp Compare, format base.Formatter, level int, files []*FileM
 		// of 0 in RocksDB. We handle this case for compatibility with RocksDB.

 		// The largest sequence number of a flushed file. Increasing.
-		var largestFlushedSeqNum uint64
+		// var largestFlushedSeqNum uint64

 		// The largest sequence number of any file. Increasing.
 		var largestSeqNum uint64
@@ -509,23 +509,26 @@ func CheckOrdering(cmp Compare, format base.Formatter, level int, files []*FileM
 				// Ingested file.
 				uncheckedIngestedSeqNums = append(uncheckedIngestedSeqNums, f.LargestSeqNum)
 			} else {
-				// Flushed file.
-				// Two flushed files cannot overlap.
-				if largestFlushedSeqNum > 0 && f.SmallestSeqNum <= largestFlushedSeqNum {
-					return fmt.Errorf("L0 flushed file %06d overlaps with the largest seqnum of a "+
-						"preceding flushed file: %d-%d vs %d", f.FileNum, f.SmallestSeqNum, f.LargestSeqNum,
-						largestFlushedSeqNum)
-				}
-				largestFlushedSeqNum = f.LargestSeqNum
-				// Check that unchecked ingested sequence numbers are not coincident with f.SmallestSeqNum.
-				// We do not need to check that they are not coincident with f.LargestSeqNum because we
-				// have already confirmed that LargestSeqNums were increasing.
-				for _, seq := range uncheckedIngestedSeqNums {
-					if seq == f.SmallestSeqNum {
-						return fmt.Errorf("L0 flushed file %06d has an ingested file coincident with "+
-							"smallest seqnum: %d-%d", f.FileNum, f.SmallestSeqNum, f.LargestSeqNum)
-					}
-				}
+				// TODO(peter): with user-defined table partitioning, a flush can
+				// create 2 sstables with overlaps in the seqnum space.
+
+				// // Flushed file.
+				// // Two flushed files cannot overlap.
+				// if largestFlushedSeqNum > 0 && f.SmallestSeqNum <= largestFlushedSeqNum {
+				// 	return fmt.Errorf("L0 flushed file %06d overlaps with the largest seqnum of a "+
+				// 		"preceding flushed file: %d-%d vs %d", f.FileNum, f.SmallestSeqNum, f.LargestSeqNum,
+				// 		largestFlushedSeqNum)
+				// }
+				// largestFlushedSeqNum = f.LargestSeqNum
+				// // Check that unchecked ingested sequence numbers are not coincident with f.SmallestSeqNum.
+				// // We do not need to check that they are not coincident with f.LargestSeqNum because we
+				// // have already confirmed that LargestSeqNums were increasing.
+				// for _, seq := range uncheckedIngestedSeqNums {
+				// 	if seq == f.SmallestSeqNum {
+				// 		return fmt.Errorf("L0 flushed file %06d has an ingested file coincident with "+
+				// 			"smallest seqnum: %d-%d", f.FileNum, f.SmallestSeqNum, f.LargestSeqNum)
+				// 	}
+				// }
 				uncheckedIngestedSeqNums = uncheckedIngestedSeqNums[:0]
 			}
 		}
12 changes: 12 additions & 0 deletions options.go
@@ -55,6 +55,14 @@ const (
 	TableFormatLevelDB = sstable.TableFormatLevelDB
 )

+// TablePartitioner provides a hook for partitioning sstables at the boundary
+// between two keys during flushing and compaction. The partitioner is provided
+// the last key added to the sstable and the current key being considered for
+// addition. A return value of true will cause output to the current sstable to
+// be completed and a new sstable to be created to hold curKey and subsequent
+// keys.
+type TablePartitioner func(lastKey, curKey []byte) bool
+
 // TablePropertyCollector exports the base.TablePropertyCollector type.
 type TablePropertyCollector = sstable.TablePropertyCollector

@@ -371,6 +379,10 @@ type Options struct {
 	// by a wider range of tools and libraries.
 	TableFormat TableFormat

+	// TablePartitioner is a hook to allow user control of required partitions
+	// between sstables.
+	TablePartitioner TablePartitioner
+
 	// TablePropertyCollectors is a list of TablePropertyCollector creation
 	// functions. A new TablePropertyCollector is created for each sstable built
 	// and lives for the lifetime of the table.
9 changes: 8 additions & 1 deletion sstable/writer.go
@@ -678,6 +678,13 @@ func (w *Writer) Metadata() (*WriterMetadata, error) {
 	return &w.meta, nil
 }

+// LastUserKey returns the last point user key added to the table, or nil if no
+// point record has been added to the table.
+func (w *Writer) LastUserKey() []byte {
+	prevKey := base.DecodeInternalKey(w.block.curKey)
+	return prevKey.UserKey
+}
+
 // WriterOption provide an interface to do work on Writer while it is being
 // opened.
 type WriterOption interface {
@@ -689,7 +696,7 @@ type WriterOption interface {
 // internalTableOpt is a WriterOption that sets properties for sstables being
 // created by the db itself (i.e. through flushes and compactions), as opposed
 // to those meant for ingestion.
-type internalTableOpt struct {}
+type internalTableOpt struct{}

 func (i internalTableOpt) writerApply(w *Writer) {
 	// Set the external sst version to 0. This is what RocksDB expects for
