diff --git a/db.go b/db.go
index 96606f7a5..74f236ca7 100644
--- a/db.go
+++ b/db.go
@@ -299,6 +299,9 @@ func Open(opt Options) (db *DB, err error) {
 		MaxCost:     int64(float64(opt.MaxCacheSize) * 0.95),
 		BufferItems: 64,
 		Metrics:     true,
+		OnEvict: func(_, _ uint64, value interface{}, _ int64) {
+			table.BlockEvictHandler(value)
+		},
 	}
 	db.blockCache, err = ristretto.NewCache(&config)
 	if err != nil {
diff --git a/table/builder.go b/table/builder.go
index 3ac1d77d5..d32d9c6aa 100644
--- a/table/builder.go
+++ b/table/builder.go
@@ -118,10 +118,13 @@ func NewTableBuilder(opts Options) *Builder {
 	return b
 }
 
-var slicePool = sync.Pool{
+var blockPool = &sync.Pool{
 	New: func() interface{} {
-		// Make 4 KB blocks for reuse.
-		b := make([]byte, 0, 4<<10)
+		// Create 5 KB blocks even when the default size of blocks is 4 KB. The
+		// ZSTD decompression library increases the buffer by 2X if it's not big
+		// enough. Using a 5 KB block instead of a 4 KB one avoids the
+		// unnecessary 2X allocation by the decompression library.
+		b := make([]byte, 5<<10)
 		return &b
 	},
 }
@@ -135,9 +138,7 @@ func (b *Builder) handleBlock() {
 		// Compress the block.
 		if b.opt.Compression != options.None {
 			var err error
-
-			dst = slicePool.Get().(*[]byte)
-			*dst = (*dst)[:0]
+			dst = blockPool.Get().(*[]byte)
 			blockBuf, err = b.compressData(*dst, blockBuf)
 			y.Check(err)
 
@@ -167,7 +168,7 @@ func (b *Builder) handleBlock() {
 		item.end = item.start + uint32(len(blockBuf))
 
 		if dst != nil {
-			slicePool.Put(dst)
+			blockPool.Put(dst)
 		}
 	}
 }
diff --git a/table/iterator.go b/table/iterator.go
index 33a99a8f9..574c3bc55 100644
--- a/table/iterator.go
+++ b/table/iterator.go
@@ -33,6 +33,7 @@ type blockIterator struct {
 	key          []byte
 	val          []byte
 	entryOffsets []uint32
+	block        *block
 
 	// prevOverlap stores the overlap of the previous key with the base key.
 	// This avoids unnecessary copy of base key when the overlap is same for multiple keys.
@@ -40,6 +41,13 @@
 }
 
 func (itr *blockIterator) setBlock(b *block) {
+	// Decrement the ref for the old block. If the old block was compressed, we
+	// might be able to reuse it.
+	itr.block.decrRef()
+	// Increment the ref for the new block.
+	b.incrRef()
+
+	itr.block = b
 	itr.err = nil
 	itr.idx = 0
 	itr.baseKey = itr.baseKey[:0]
@@ -102,7 +110,9 @@ func (itr *blockIterator) Error() error {
 	return itr.err
 }
 
-func (itr *blockIterator) Close() {}
+func (itr *blockIterator) Close() {
+	itr.block.decrRef()
+}
 
 var (
 	origin = 0
@@ -172,6 +182,7 @@ func (t *Table) NewIterator(reversed bool) *Iterator {
 
 // Close closes the iterator (and it must be called).
 func (itr *Iterator) Close() error {
+	itr.bi.Close()
 	return itr.t.DecrRef()
 }
 
diff --git a/table/table.go b/table/table.go
index 604150353..43406fa8c 100644
--- a/table/table.go
+++ b/table/table.go
@@ -169,15 +169,44 @@ func (t *Table) DecrRef() error {
 	return nil
 }
 
+// BlockEvictHandler is used to reuse the byte slice stored in the block on cache eviction.
+func BlockEvictHandler(value interface{}) {
+	if b, ok := value.(*block); ok {
+		b.decrRef()
+	}
+}
+
 type block struct {
 	offset            int
 	data              []byte
 	checksum          []byte
-	entriesIndexStart int // start index of entryOffsets list
-	entryOffsets      []uint32
-	chkLen            int // checksum length
+	entriesIndexStart int      // start index of entryOffsets list
+	entryOffsets      []uint32 // used to binary search an entry in the block.
+	chkLen            int      // checksum length.
+	isReusable        bool     // used to determine if the block should be reused.
+	ref               int32
 }
+func (b *block) incrRef() {
+	atomic.AddInt32(&b.ref, 1)
+}
+func (b *block) decrRef() {
+	if b == nil {
+		return
+	}
+
+	p := atomic.AddInt32(&b.ref, -1)
+	// Insert the []byte into pool only if the block is reusable. When a block
+	// is reusable a new []byte is used for decompression and this []byte can
+	// be reused.
+	// In case of an uncompressed block, the []byte is a reference to the
+	// table.mmap []byte slice. Any attempt to write data to the mmap []byte
+	// will lead to SEGFAULT.
+	if p == 0 && b.isReusable {
+		blockPool.Put(&b.data)
+	}
+	y.AssertTrue(p >= 0)
+}
 func (b *block) size() int64 {
 	return int64(3*intSize /* Size of the offset, entriesIndexStart and chkLen */ +
 		cap(b.data) + cap(b.checksum) + cap(b.entryOffsets)*4)
 }
@@ -419,8 +448,7 @@ func (t *Table) block(idx int) (*block, error) {
 		}
 	}
 
-	blk.data, err = t.decompressData(blk.data)
-	if err != nil {
+	if err = t.decompress(blk); err != nil {
 		return nil, errors.Wrapf(err,
 			"failed to decode compressed data in file: %s at offset: %d, len: %d",
 			t.fd.Name(), blk.offset, ko.Len)
@@ -462,6 +490,7 @@ func (t *Table) block(idx int) (*block, error) {
 	}
 	if t.opt.Cache != nil {
 		key := t.blockCacheKey(idx)
+		blk.incrRef()
 		t.opt.Cache.Set(key, blk, blk.size())
 	}
 	return blk, nil
@@ -563,7 +592,8 @@ func (t *Table) VerifyChecksum() error {
 			return y.Wrapf(err, "checksum validation failed for table: %s, block: %d, offset:%d",
 				t.Filename(), i, os.Offset)
 		}
-
+		b.incrRef()
+		defer b.decrRef()
 		// OnBlockRead or OnTableAndBlockRead, we don't need to call verify checksum
 		// on block, verification would be done while reading block itself.
 		if !(t.opt.ChkMode == options.OnBlockRead || t.opt.ChkMode == options.OnTableAndBlockRead) {
@@ -629,15 +659,28 @@ func NewFilename(id uint64, dir string) string {
 	return filepath.Join(dir, IDToFilename(id))
 }
 
-// decompressData decompresses the given data.
-func (t *Table) decompressData(data []byte) ([]byte, error) {
+// decompress decompresses the data stored in a block.
+func (t *Table) decompress(b *block) error {
+	var err error
 	switch t.opt.Compression {
 	case options.None:
-		return data, nil
+		// Nothing to be done here.
 	case options.Snappy:
-		return snappy.Decode(nil, data)
+		dst := blockPool.Get().(*[]byte)
+		b.data, err = snappy.Decode(*dst, b.data)
+		if err != nil {
+			return errors.Wrap(err, "failed to decompress")
+		}
+		b.isReusable = true
 	case options.ZSTD:
-		return y.ZSTDDecompress(nil, data)
+		dst := blockPool.Get().(*[]byte)
+		b.data, err = y.ZSTDDecompress(*dst, b.data)
+		if err != nil {
+			return errors.Wrap(err, "failed to decompress")
+		}
+		b.isReusable = true
+	default:
+		return errors.New("Unsupported compression type")
 	}
-	return nil, errors.New("Unsupported compression type")
+	return nil
}
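
Taken together, the table.go and iterator.go hunks implement a small ref-counting protocol around pooled buffers: every holder of a block (the iterator currently positioned on it, the ristretto cache) takes a reference with incrRef, and whichever holder drops the last reference returns the decompression buffer to the pool in decrRef. The sketch below is a minimal, self-contained illustration of that pattern, not Badger code; the names refBlock and bufPool are hypothetical stand-ins for block and blockPool.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// bufPool hands out 5 KB buffers, mirroring blockPool above: slightly larger
// than the 4 KB default block so a decompressor that needs a little headroom
// does not trigger a doubling reallocation.
var bufPool = &sync.Pool{
	New: func() interface{} {
		b := make([]byte, 5<<10)
		return &b
	},
}

// refBlock is a hypothetical stand-in for table.block: a buffer whose
// lifetime is shared between an iterator and a cache via a ref count.
type refBlock struct {
	data       []byte
	isReusable bool // only pooled buffers may be returned to the pool.
	ref        int32
}

func (b *refBlock) incrRef() {
	atomic.AddInt32(&b.ref, 1)
}

// decrRef releases one reference; the last owner returns the buffer to the
// pool, but only when it came from the pool in the first place. The nil
// check mirrors the diff: a blockIterator starts with a nil block, so the
// first setBlock call decrements a nil receiver.
func (b *refBlock) decrRef() {
	if b == nil {
		return
	}
	if p := atomic.AddInt32(&b.ref, -1); p == 0 && b.isReusable {
		bufPool.Put(&b.data)
	}
}

func main() {
	blk := &refBlock{data: *bufPool.Get().(*[]byte), isReusable: true}
	blk.incrRef() // owner 1: e.g. the iterator positioned on the block.
	blk.incrRef() // owner 2: e.g. the block cache.

	blk.decrRef() // iterator moves to the next block.
	blk.decrRef() // cache evicts; the buffer goes back to the pool here.
	fmt.Println("buffer recycled, ref =", atomic.LoadInt32(&blk.ref))
}
```

The isReusable flag is what keeps this safe for uncompressed blocks: their data points into the table's mmap, so pooling (and later overwriting) it would corrupt the mapping, exactly as the decrRef comment in the diff warns.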
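One subtlety in the builder.go hunk: the old slicePool handed out zero-length slices (make([]byte, 0, 4<<10)) and handleBlock re-truncated them with (*dst)[:0], while the new blockPool hands out full-length 5 KB slices. That matters because snappy.Decode decides whether to reuse dst based on its length, not its capacity. A short sketch against the real github.com/golang/snappy API (the package Badger wraps), showing when the caller's buffer is reused:

```go
package main

import (
	"fmt"

	"github.com/golang/snappy"
)

func main() {
	src := snappy.Encode(nil, []byte("hello, badger"))

	// A nil (or too-short) dst forces Decode to allocate a fresh buffer.
	fresh, err := snappy.Decode(nil, src)
	if err != nil {
		panic(err)
	}

	// A dst whose *length* covers the decoded size is reused: the result is
	// a sub-slice of dst, so the buffer can go back into a sync.Pool after
	// use. 5<<10 matches the blockPool size chosen in the diff.
	dst := make([]byte, 5<<10)
	reused, err := snappy.Decode(dst, src)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(fresh), &dst[0] == &reused[0]) // "hello, badger" true
}
```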