Buffer pool for decompression #1308

Merged · 21 commits · May 13, 2020
3 changes: 3 additions & 0 deletions db.go
@@ -299,6 +299,9 @@ func Open(opt Options) (db *DB, err error) {
MaxCost: int64(float64(opt.MaxCacheSize) * 0.95),
BufferItems: 64,
Metrics: true,
OnEvict: func(_, _ uint64, value interface{}, _ int64) {
table.BlockEvictHandler(value)
},
}
db.blockCache, err = ristretto.NewCache(&config)
if err != nil {
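For context on the hook above: the cache takes its own reference on every block it stores (note the blk.incrRef() before Cache.Set in table.go below), and OnEvict is the only place that reference gets released. Below is a minimal, self-contained sketch of the wiring against the ristretto API used in this diff (later ristretto versions changed the OnEvict signature); myBlock is a hypothetical stand-in for badger's internal block type.

package main

import (
	"log"
	"sync/atomic"

	"github.com/dgraph-io/ristretto"
)

// myBlock is a hypothetical stand-in for badger's internal block type.
type myBlock struct{ ref int32 }

func (b *myBlock) incrRef() { atomic.AddInt32(&b.ref, 1) }
func (b *myBlock) decrRef() {
	if atomic.AddInt32(&b.ref, -1) == 0 {
		// Last owner is gone; the real code recycles b.data here.
	}
}

func main() {
	cache, err := ristretto.NewCache(&ristretto.Config{
		NumCounters: 1e4,
		MaxCost:     1 << 20,
		BufferItems: 64,
		// Called for every evicted entry; releases the cache's reference.
		OnEvict: func(_, _ uint64, value interface{}, _ int64) {
			if b, ok := value.(*myBlock); ok {
				b.decrRef()
			}
		},
	})
	if err != nil {
		log.Fatal(err)
	}
	b := &myBlock{}
	b.incrRef()           // the cache's reference, taken before Set
	cache.Set("k", b, 64) // Set is best-effort; eviction runs asynchronously
}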
15 changes: 8 additions & 7 deletions table/builder.go
@@ -118,10 +118,13 @@ func NewTableBuilder(opts Options) *Builder {
return b
}

var slicePool = sync.Pool{
var blockPool = &sync.Pool{
New: func() interface{} {
// Make 4 KB blocks for reuse.
b := make([]byte, 0, 4<<10)
// Create 5 KB blocks even though the default block size is 4 KB. The
// ZSTD decompression library doubles the buffer if it isn't big enough,
// so a 5 KB block avoids that unnecessary 2X allocation for 4 KB blocks.
b := make([]byte, 5<<10)
return &b
},
}
@@ -135,9 +138,7 @@ func (b *Builder) handleBlock() {
// Compress the block.
if b.opt.Compression != options.None {
var err error

dst = slicePool.Get().(*[]byte)
*dst = (*dst)[:0]
dst = blockPool.Get().(*[]byte)

blockBuf, err = b.compressData(*dst, blockBuf)
y.Check(err)
@@ -167,7 +168,7 @@ func (b *Builder) handleBlock() {
item.end = item.start + uint32(len(blockBuf))

if dst != nil {
slicePool.Put(dst)
blockPool.Put(dst)
}
}
}
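The round trip above — Get a pooled buffer, pass it to compressData as the destination, Put it back once the compressed bytes have been copied into the table — is the heart of this file's change. Here is a runnable sketch of the same pattern, with compressBlock as a hypothetical stand-in for the compression path of Builder.handleBlock and Snappy as the compressor. Note the pool stores *[]byte rather than []byte: storing a bare slice in an interface{} would allocate on every Put.

package main

import (
	"fmt"
	"sync"

	"github.com/golang/snappy"
)

var blockPool = &sync.Pool{
	New: func() interface{} {
		b := make([]byte, 5<<10)
		return &b
	},
}

// compressBlock is a hypothetical stand-in for the compression path of
// Builder.handleBlock.
func compressBlock(src []byte) []byte {
	dst := blockPool.Get().(*[]byte)
	// snappy.Encode writes into *dst when it is large enough and
	// allocates a bigger buffer otherwise.
	out := snappy.Encode(*dst, src)
	// Copy the result out before recycling the scratch buffer; the real
	// builder likewise copies it into the table's own buffer.
	res := append([]byte(nil), out...)
	blockPool.Put(dst)
	return res
}

func main() {
	fmt.Println(len(compressBlock([]byte("a block worth of key-value data"))))
}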
13 changes: 12 additions & 1 deletion table/iterator.go
@@ -33,13 +33,21 @@ type blockIterator struct {
key []byte
val []byte
entryOffsets []uint32
block *block

// prevOverlap stores the overlap of the previous key with the base key.
// This avoids unnecessary copying of the base key when the overlap is the
// same for consecutive keys.
prevOverlap uint16
}

func (itr *blockIterator) setBlock(b *block) {
// Decrement the ref for the old block. If the old block was compressed, we
// might be able to reuse it.
itr.block.decrRef()
// Increment the ref for the new block.
b.incrRef()

itr.block = b
itr.err = nil
itr.idx = 0
itr.baseKey = itr.baseKey[:0]
@@ -102,7 +110,9 @@ func (itr *blockIterator) Error() error {
return itr.err
}

func (itr *blockIterator) Close() {}
func (itr *blockIterator) Close() {
itr.block.decrRef()
}

var (
origin = 0
@@ -172,6 +182,7 @@ func (t *Table) NewIterator(reversed bool) *Iterator {

// Close closes the iterator (and it must be called).
func (itr *Iterator) Close() error {
itr.bi.Close()
return itr.t.DecrRef()
}

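The invariant behind setBlock and the two Close calls above: a block iterator owns exactly one reference to the block it currently points at, taking a reference on the new block before dropping the one on the old. A self-contained sketch of that lifecycle, where refBlock and iter are hypothetical miniatures of block and blockIterator:

package main

import (
	"fmt"
	"sync/atomic"
)

// refBlock is a hypothetical miniature of badger's block: whoever drops
// the last reference is responsible for recycling the buffer.
type refBlock struct {
	ref int32
}

func (b *refBlock) incrRef() { atomic.AddInt32(&b.ref, 1) }

func (b *refBlock) decrRef() {
	if b == nil {
		return // mirrors the nil check in block.decrRef
	}
	if atomic.AddInt32(&b.ref, -1) == 0 {
		fmt.Println("last reference dropped; buffer can be recycled")
	}
}

// iter mirrors blockIterator's ownership rule.
type iter struct{ cur *refBlock }

func (it *iter) setBlock(b *refBlock) {
	it.cur.decrRef() // release the old block (nil-safe on the first call)
	b.incrRef()      // take a reference on the new one
	it.cur = b
}

func (it *iter) Close() { it.cur.decrRef() }

func main() {
	b1 := &refBlock{ref: 1} // creator's reference
	it := &iter{}
	it.setBlock(b1) // ref(b1) == 2
	b1.decrRef()    // creator is done; the iterator keeps b1 alive
	b2 := &refBlock{ref: 1}
	it.setBlock(b2) // drops b1 to 0 (recycled), holds b2
	b2.decrRef()
	it.Close() // drops b2 to 0 (recycled)
}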
67 changes: 55 additions & 12 deletions table/table.go
@@ -169,15 +169,44 @@ func (t *Table) DecrRef() error {
return nil
}

// BlockEvictHandler is used to reuse the byte slice stored in the block on cache eviction.
func BlockEvictHandler(value interface{}) {
if b, ok := value.(*block); ok {
b.decrRef()
}
}

type block struct {
offset int
data []byte
checksum []byte
entriesIndexStart int // start index of entryOffsets list
entryOffsets []uint32
chkLen int // checksum length
entriesIndexStart int // start index of entryOffsets list
entryOffsets []uint32 // used to binary search an entry in the block.
chkLen int // checksum length.
isReusable bool // used to determine if the block should be reused.
ref int32
}

func (b *block) incrRef() {
atomic.AddInt32(&b.ref, 1)
}
func (b *block) decrRef() {
if b == nil {
return
}

p := atomic.AddInt32(&b.ref, -1)
// Insert the []byte into the pool only if the block is reusable. A block
// is reusable when a fresh []byte was allocated for decompression, so
// that []byte can be recycled.
// For an uncompressed block, the []byte points into the table's mmap'd
// region; any attempt to write to the mmap []byte will lead to a
// SEGFAULT.
if p == 0 && b.isReusable {
blockPool.Put(&b.data)
}
y.AssertTrue(p >= 0)
}
func (b *block) size() int64 {
return int64(3*intSize /* Size of the offset, entriesIndexStart and chkLen */ +
cap(b.data) + cap(b.checksum) + cap(b.entryOffsets)*4)
@@ -419,8 +448,7 @@ func (t *Table) block(idx int) (*block, error) {
}
}

blk.data, err = t.decompressData(blk.data)
if err != nil {
if err = t.decompress(blk); err != nil {
return nil, errors.Wrapf(err,
"failed to decode compressed data in file: %s at offset: %d, len: %d",
t.fd.Name(), blk.offset, ko.Len)
@@ -462,6 +490,7 @@ func (t *Table) block(idx int) (*block, error) {
}
if t.opt.Cache != nil {
key := t.blockCacheKey(idx)
blk.incrRef()
t.opt.Cache.Set(key, blk, blk.size())
}
return blk, nil
@@ -563,7 +592,8 @@ func (t *Table) VerifyChecksum() error {
return y.Wrapf(err, "checksum validation failed for table: %s, block: %d, offset:%d",
t.Filename(), i, os.Offset)
}

b.incrRef()
defer b.decrRef()
// If the check mode is OnBlockRead or OnTableAndBlockRead, we don't need
// to verify the checksum here; it is verified while reading the block itself.
if !(t.opt.ChkMode == options.OnBlockRead || t.opt.ChkMode == options.OnTableAndBlockRead) {
@@ -629,15 +659,28 @@ func NewFilename(id uint64, dir string) string {
return filepath.Join(dir, IDToFilename(id))
}

// decompressData decompresses the given data.
func (t *Table) decompressData(data []byte) ([]byte, error) {
// decompress decompresses the data stored in a block.
func (t *Table) decompress(b *block) error {
var err error
switch t.opt.Compression {
case options.None:
return data, nil
// Nothing to be done here.
case options.Snappy:
return snappy.Decode(nil, data)
dst := blockPool.Get().(*[]byte)
b.data, err = snappy.Decode(*dst, b.data)
if err != nil {
return errors.Wrap(err, "failed to decompress")
}
b.isReusable = true
case options.ZSTD:
return y.ZSTDDecompress(nil, data)
dst := blockPool.Get().(*[]byte)
b.data, err = y.ZSTDDecompress(*dst, b.data)
if err != nil {
return errors.Wrap(err, "failed to decompress")
}
b.isReusable = true
default:
return errors.New("Unsupported compression type")
}
return nil, errors.New("Unsupported compression type")
return nil
}
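To make the buffer flow concrete: snappy.Decode (like y.ZSTDDecompress) writes into the supplied destination when it is large enough, so a pooled 5 KB buffer normally absorbs a 4 KB block without a fresh allocation; flagging the block reusable is what later lets decrRef hand the buffer back to the pool. A minimal sketch of the Snappy arm under those assumptions, with decompress as a simplified stand-in for Table.decompress:

package main

import (
	"fmt"
	"sync"

	"github.com/golang/snappy"
)

var blockPool = &sync.Pool{
	New: func() interface{} {
		b := make([]byte, 5<<10) // 5 KB, matching builder.go above
		return &b
	},
}

// decompress mirrors the Snappy arm of Table.decompress: decode into a
// pooled buffer and report whether the result may be recycled later.
func decompress(compressed []byte) (data []byte, reusable bool, err error) {
	dst := blockPool.Get().(*[]byte)
	data, err = snappy.Decode(*dst, compressed)
	if err != nil {
		blockPool.Put(dst)
		return nil, false, err
	}
	// data aliases *dst when the output fit; either way its backing
	// array is what block.decrRef eventually returns to the pool.
	return data, true, nil
}

func main() {
	src := snappy.Encode(nil, []byte("some block contents"))
	data, reusable, err := decompress(src)
	fmt.Println(string(data), reusable, err)
}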