Skip to content

Commit

Permalink
sstable: adjust sizeEstimate logic for forthcoming value blocks
Browse files Browse the repository at this point in the history
sizeEstimate is used for two purposes: estimating the total compressed
size of the data blocks, and estimating the size of the index block.
The commentary around sizeEstimate was incomplete in terms of how
these use cases behave, which is now clarified (and the "written",
"inflight" and "compressed" terms clarified, since they map to
different concepts for the index block case).

Additionally, the size estimation of the total compressed data
blocks can count a block as "written" once compression is complete,
and does not need to wait until the write. This simplification is
made now, so there is no need for the mutex to update sizeEstimate
in the writeQueue. The mutex path is kept to allow for easy merging
of the pending code for parallel compression. The interface is
also simplified to not separately pass a finalEntrySize when we
know the total size. This will fix the peculiarity with compression
ratio 1 for the index block case -- the estimaton there has nothing
to do with compression and has to do with the fact that we don't
know the key-value size upfront, and we should be properly using
the ratio in that case. The interface now has two methods
writtenWithTotal and writtenWithDelta, and the caller can use
whichever one is more convenient. The writtenWithTotal is more
convenient for the index block writing case.

This cleanup is worthwhile in itself, and it also sets us up for
the value blocks change where the compressed value blocks will
be held in-memory until late in the sstable construction (since
the value blocks are written after all the index blocks). We
do want to accurately account for their compressed size. That
code path will use the writtenWithDelta similar to how we use
it now after compressing a data block.
  • Loading branch information
sumeerbhola committed Oct 20, 2022
1 parent 0090519 commit 5ed983e
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 80 deletions.
47 changes: 29 additions & 18 deletions sstable/testdata/size_estimate
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,16 @@ num_entries
----
1

# After compression, entry only had a size of 3. The total size is the 3, but the
# max estimated size yet is 4.
entry_written 3 4 3
# Compression ratio defaults to 1, so the size of the inflight entry fully
# counts towards size.
size
----
4

# After compression, entry only had a size of 3. The total size is therefore
# 3, since this is the first entry. The max estimated size is 4 since we
# ensure that it is monotonically non decreasing.
entry_written 3 4
----
4

Expand All @@ -38,7 +45,7 @@ num_inflight_entries
0

# Compression ratio is 0.75 at this point. The total size is 3, and the inflight
# size is 4, so that returned size is uint64(6.75).
# size is 5, so that returned size is uint64(3 + 0.75*5) = uint64(6.75).
add_inflight 5
----
6
Expand Down Expand Up @@ -71,7 +78,7 @@ num_inflight_entries

# First inflight entry written. The entry didn't get compressed. The total size
# now is less than 9, but the max estimated size should still be 9.
entry_written 4 4 4
entry_written 4 4
----
9

Expand All @@ -94,25 +101,25 @@ num_written_entries
----
1

# The inflight entry had a size of 5, but the entry added had a size of 3 because
# of compression/size estimation. The compression ratio is 0.77 at this point.
# The inflightSize is 8. The true size is 13.16, but the maxEstimatedSize is
# returned.
entry_written 7 5 3
# The inflight entry had a size of 5, but the entry added had a size of 3
# because of compression/size estimation. The compression ratio is (4+3)/(4+5)
# = 0.77 at this point. The inflightSize is 8. The true size is 7+8*0.77 =
# 13.22, but the maxEstimatedSize is returned.
entry_written 7 5
----
17

# The inflight size is 0, and the total size is 11.
entry_written 11 8 4
entry_written 11 8
----
17

num_written_entries
----
3

# The compression ratio is 0.64, and the inflight size is 20, 20*0.64 = 12.8,
# so the total size is uint64(12.8 + 11)
# The compression ratio is (4+3+4)/(4+5+8)=0.647, and the inflight size is 20,
# 20*0.64 = 12.94, so the total size is uint64(12.94 + 11)
add_inflight 20
----
23
Expand All @@ -121,13 +128,17 @@ num_inflight_entries
----
1

# We can write an entry, but it might not have an inflightSize, because it
# was never inflight. In such a case, the numInflightEntries, shouldn't be
# decreased.
entry_written 11 0 4
# We can write an entry, which increases the written size from 11 to 19, but
# it might not have an inflightSize, because it was never inflight. In such a
# case, the numInflightEntries, shouldn't be decreased.
entry_written 19 0
----
28
31

num_inflight_entries
----
1

num_written_entries
----
4
7 changes: 0 additions & 7 deletions sstable/write_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ type writeTask struct {
// currIndexBlock is the index block on which indexBlock.add must be called.
currIndexBlock *indexBlockBuf
indexEntrySep InternalKey
// inflightSize is used to decrement Writer.coordination.sizeEstimate.inflightSize.
inflightSize int
// inflightIndexEntrySize is used to decrement Writer.indexBlock.sizeEstimate.inflightSize.
indexInflightSize int
// If the index block is finished, then we set the finishedIndexProps here.
Expand Down Expand Up @@ -69,11 +67,6 @@ func (w *writeQueue) performWrite(task *writeTask) error {
return err
}

// Update the size estimates after writing the data block to disk.
w.writer.coordination.sizeEstimate.dataBlockWritten(
w.writer.meta.Size, task.inflightSize, int(bh.Length),
)

bhp = BlockHandleWithProperties{BlockHandle: bh, Props: task.buf.dataBlockProps}
if err = w.writer.addIndexEntry(
task.indexEntrySep, bhp, task.buf.tmp[:], task.flushableIndexBlock, task.currIndexBlock,
Expand Down
120 changes: 76 additions & 44 deletions sstable/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,9 @@ type coordinationState struct {

func (c *coordinationState) init(parallelismEnabled bool, writer *Writer) {
c.parallelismEnabled = parallelismEnabled
c.sizeEstimate.useMutex = parallelismEnabled
// useMutex is false regardless of parallelismEnabled, because we do not do
// parallel compression yet.
c.sizeEstimate.useMutex = false

// writeQueueSize determines the size of the write queue, or the number
// of items which can be added to the queue without blocking. By default, we
Expand All @@ -234,6 +236,47 @@ func (c *coordinationState) init(parallelismEnabled bool, writer *Writer) {
c.writeQueue = newWriteQueue(writeQueueSize, writer)
}

// sizeEstimate is a general purpose helper for estimating two kinds of sizes:
// A. The compressed sstable size, which is useful for deciding when to start
// a new sstable during flushes or compactions. In practice, we use this in
// estimating the data size (excluding the index).
// B. The size of index blocks to decide when to start a new index block.
//
// There are some terminology peculiarities which are due to the origin of
// sizeEstimate for use case A with parallel compression enabled (for which
// the code has not been merged). Specifically this relates to the terms
// "written" and "compressed".
// - The notion of "written" for case A is sufficiently defined by saying that
// the data block is compressed. Waiting for the actual data block write to
// happen can result in unnecessary estimation, when we already know how big
// it will be in compressed form. Additionally, with the forthcoming value
// blocks containing older MVCC values, these compressed block will be held
// in-memory until late in the sstable writing, and we do want to accurately
// account for them without waiting for the actual write.
// For case B, "written" means that the index entry has been fully
// generated, and has been added to the uncompressed block buffer for that
// index block. It does not include actually writing a potentially
// compressed index block.
// - The notion of "compressed" is to differentiate between a "inflight" size
// and the actual size, and is handled via computing a compression ratio
// observed so far (defaults to 1).
// For case A, this is actual data block compression, so the "inflight" size
// is uncompressed blocks (that are no longer being written to) and the
// "compressed" size is after they have been compressed.
// For case B the inflight size is for a key-value pair in the index for
// which the value size (the encoded size of the BlockHandleWithProperties)
// is not accurately known, while the compressed size is the size of that
// entry when it has been added to the (in-progress) index ssblock.
//
// Usage: To update state, one can optionally provide an inflight write value
// using addInflight (used for case B). When something is "written" the state
// can be updated using either writtenWithDelta or writtenWithTotal, which
// provide the actual delta size or the total size (latter must be
// monotonically non-decreasing). If there were no calls to addInflight, there
// isn't any real estimation happening here. So case A does not do any real
// estimation. However, when we introduce parallel compression, there will be
// estimation in that the client goroutine will call addInFlight and the
// compression goroutines will call writtenWithDelta.
type sizeEstimate struct {
// emptySize is the size when there is no inflight data, and numEntries is 0.
// emptySize is constant once set.
Expand Down Expand Up @@ -300,18 +343,22 @@ func (s *sizeEstimate) addInflight(size int) {
s.inflightSize += uint64(size)
}

func (s *sizeEstimate) written(newTotalSize uint64, inflightSize int, finalEntrySize int) {
s.inflightSize -= uint64(inflightSize)
func (s *sizeEstimate) writtenWithTotal(newTotalSize uint64, inflightSize int) {
finalEntrySize := int(newTotalSize - s.totalSize)
s.writtenWithDelta(finalEntrySize, inflightSize)
}

func (s *sizeEstimate) writtenWithDelta(finalEntrySize int, inflightSize int) {
if inflightSize > 0 {
// This entry was previously inflight, so we should decrement inflight
// entries.
// entries and update the "compression" stats for future estimation.
s.numInflightEntries--
s.inflightSize -= uint64(inflightSize)
s.uncompressedSize += uint64(inflightSize)
s.compressedSize += uint64(finalEntrySize)
}
s.numWrittenEntries++
s.totalSize = newTotalSize

s.uncompressedSize += uint64(inflightSize)
s.compressedSize += uint64(finalEntrySize)
s.totalSize += uint64(finalEntrySize)
}

func (s *sizeEstimate) clear() {
Expand Down Expand Up @@ -381,10 +428,7 @@ func (i *indexBlockBuf) add(key InternalKey, value []byte, inflightSize int) {
i.size.mu.Lock()
defer i.size.mu.Unlock()
}
// Since, we're not compressing index entries when adding them to index blocks,
// we assume that the size of entry written to the index block is equal to the
// size of the inflight entry, giving us a compression ratio of 1.
i.size.estimate.written(uint64(size), inflightSize, inflightSize)
i.size.estimate.writtenWithTotal(uint64(size), inflightSize)
}

func (i *indexBlockBuf) finish() []byte {
Expand Down Expand Up @@ -423,30 +467,29 @@ func (i *indexBlockBuf) estimatedSize() uint64 {
return i.size.estimate.size()
}

// sizeEstimate is used for sstable size estimation. sizeEstimate can be accessed by
// the Writer client, writeQueue, compressionQueue goroutines. Fields should only be
// read/updated through the functions defined on the *sizeEstimate type.
// sizeEstimate is used for sstable size estimation. sizeEstimate can be
// accessed by the Writer client and compressionQueue goroutines. Fields
// should only be read/updated through the functions defined on the
// *sizeEstimate type.
type dataBlockEstimates struct {
// If we don't do block compression, block writes in parallel, then we don't need to take
// If we don't do block compression in parallel, then we don't need to take
// the performance hit of synchronizing using this mutex.
useMutex bool
mu sync.Mutex

estimate sizeEstimate
}

// newTotalSize is the new w.meta.Size. inflightSize is the uncompressed block size estimate which
// was previously added to sizeEstimate.inflightSize. writtenSize is the compressed size of the block
// which was written to disk.
func (d *dataBlockEstimates) dataBlockWritten(
newTotalSize uint64, inflightSize int, writtenSize int,
) {
// inflightSize is the uncompressed block size estimate which has been
// previously provided to addInflightDataBlock(). If addInflightDataBlock()
// has not been called, this must be set to 0. compressedSize is the
// compressed size of the block.
func (d *dataBlockEstimates) dataBlockCompressed(compressedSize int, inflightSize int) {
if d.useMutex {
d.mu.Lock()
defer d.mu.Unlock()
}

d.estimate.written(newTotalSize, inflightSize, writtenSize)
d.estimate.writtenWithDelta(compressedSize+blockTrailerLen, inflightSize)
}

// size is an estimated size of datablock data which has been written to disk.
Expand All @@ -455,18 +498,19 @@ func (d *dataBlockEstimates) size() uint64 {
d.mu.Lock()
defer d.mu.Unlock()
}

// Use invariants to make sure that the size estimation works as expected
// when parallelism is disabled.
// If there is no parallel compression, there should not be any inflight bytes.
if invariants.Enabled && !d.useMutex {
if d.estimate.inflightSize != 0 {
panic("unexpected inflight entry in data block size estimation")
}
}

return d.estimate.size()
}

// Avoid linter unused error.
var _ = (&dataBlockEstimates{}).addInflightDataBlock

// NB: unused since no parallel compression.
func (d *dataBlockEstimates) addInflightDataBlock(size int) {
if d.useMutex {
d.mu.Lock()
Expand Down Expand Up @@ -1082,19 +1126,16 @@ func (w *Writer) maybeAddToFilter(key []byte) {
}

func (w *Writer) flush(key InternalKey) error {
estimatedUncompressedSize := w.dataBlockBuf.dataBlock.estimatedSize()
w.coordination.sizeEstimate.addInflightDataBlock(estimatedUncompressedSize)

var err error

// We're finishing a data block.
err = w.finishDataBlockProps(w.dataBlockBuf)
err := w.finishDataBlockProps(w.dataBlockBuf)
if err != nil {
return err
}

w.dataBlockBuf.finish()
w.dataBlockBuf.compressAndChecksum(w.compression)
// Since dataBlockEstimates.addInflightDataBlock was never called, the
// inflightSize is set to 0.
w.coordination.sizeEstimate.dataBlockCompressed(len(w.dataBlockBuf.compressed), 0)

// Determine if the index block should be flushed. Since we're accessing the
// dataBlockBuf.dataBlock.curKey here, we have to make sure that once we start
Expand Down Expand Up @@ -1141,7 +1182,6 @@ func (w *Writer) flush(key InternalKey) error {
writeTask.compressionDone <- true
writeTask.buf = w.dataBlockBuf
writeTask.indexEntrySep = sep
writeTask.inflightSize = estimatedUncompressedSize
writeTask.currIndexBlock = w.indexBlock
writeTask.indexInflightSize = sep.Size() + encodedBHPEstimatedSize
writeTask.finishedIndexProps = indexProps
Expand Down Expand Up @@ -1815,14 +1855,6 @@ func (w *Writer) Close() (err error) {
// EstimatedSize returns the estimated size of the sstable being written if a
// call to Finish() was made without adding additional keys.
func (w *Writer) EstimatedSize() uint64 {
if invariants.Enabled && !w.coordination.parallelismEnabled {
// The w.meta.Size should only be accessed from the writeQueue goroutine
// if parallelism is enabled, but since it isn't we break that invariant
// here.
if w.coordination.sizeEstimate.size() != w.meta.Size {
panic("sstable size estimation sans parallelism is incorrect")
}
}
return w.coordination.sizeEstimate.size() +
uint64(w.dataBlockBuf.dataBlock.estimatedSize()) +
w.indexBlock.estimatedSize()
Expand Down
16 changes: 5 additions & 11 deletions sstable/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func TestClearIndexBlockBuf(t *testing.T) {

testBlockCleared(t, &i.block, &blockWriter{})
require.Equal(
t, i.size.estimate, sizeEstimate{emptySize: i.size.estimate.emptySize},
t, i.size.estimate, sizeEstimate{emptySize: emptyBlockSize},
)
indexBlockBufPool.Put(i)
}
Expand All @@ -239,7 +239,6 @@ func TestClearWriteTask(t *testing.T) {
w.flushableIndexBlock = &indexBlockBuf{}
w.currIndexBlock = &indexBlockBuf{}
w.indexEntrySep = ikey("apple")
w.inflightSize = 1
w.indexInflightSize = 1
w.finishedIndexProps = []byte{'a', 'v'}

Expand All @@ -253,7 +252,6 @@ func TestClearWriteTask(t *testing.T) {
require.Equal(t, w.flushableIndexBlock, nilIndexBlockBuf)
require.Equal(t, w.currIndexBlock, nilIndexBlockBuf)
require.Equal(t, w.indexEntrySep, base.InvalidInternalKey)
require.Equal(t, w.inflightSize, 0)
require.Equal(t, w.indexInflightSize, 0)
require.Equal(t, w.finishedIndexProps, []byte(nil))

Expand Down Expand Up @@ -326,22 +324,18 @@ func TestSizeEstimate(t *testing.T) {
sizeEstimate.addInflight(inflightSize)
return fmt.Sprintf("%d", sizeEstimate.size())
case "entry_written":
if len(td.CmdArgs) != 3 {
return "entry_written <new_size> <prev_inflight_size> <entry_size>"
if len(td.CmdArgs) != 2 {
return "entry_written <new_total_size> <prev_inflight_size>"
}
newSize, err := strconv.Atoi(td.CmdArgs[0].String())
newTotalSize, err := strconv.Atoi(td.CmdArgs[0].String())
if err != nil {
return "invalid inflight size"
}
inflightSize, err := strconv.Atoi(td.CmdArgs[1].String())
if err != nil {
return "invalid inflight size"
}
entrySize, err := strconv.Atoi(td.CmdArgs[2].String())
if err != nil {
return "invalid inflight size"
}
sizeEstimate.written(uint64(newSize), inflightSize, entrySize)
sizeEstimate.writtenWithTotal(uint64(newTotalSize), inflightSize)
return fmt.Sprintf("%d", sizeEstimate.size())
case "num_written_entries":
return fmt.Sprintf("%d", sizeEstimate.numWrittenEntries)
Expand Down

0 comments on commit 5ed983e

Please sign in to comment.