Skip to content

Commit

Permalink
Merge pull request #367 from kcalvinalvin/syncpool-bufferpool
Browse files Browse the repository at this point in the history
util/buffer_pool, db: Reduce memory allocation
  • Loading branch information
syndtr authored Aug 19, 2021
2 parents 64b5b1c + 71b98dd commit 079c29c
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 180 deletions.
2 changes: 1 addition & 1 deletion leveldb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ func recoverTable(s *session, o *opt.Options) error {
}()

// Copy entries.
tw := table.NewWriter(writer, o)
tw := table.NewWriter(writer, o, nil, 0)
for iter.Next() {
key := iter.Key()
if validInternalKey(key) {
Expand Down
2 changes: 1 addition & 1 deletion leveldb/db_compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ func (b *tableCompactionBuilder) appendKV(key, value []byte) error {

// Create new table.
var err error
b.tw, err = b.s.tops.create()
b.tw, err = b.s.tops.create(b.tableSize)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion leveldb/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2539,7 +2539,7 @@ func TestDB_TableCompactionBuilder(t *testing.T) {
value = bytes.Repeat([]byte{'0'}, 100)
)
for i := 0; i < 2; i++ {
tw, err := s.tops.create()
tw, err := s.tops.create(0)
if err != nil {
t.Fatal(err)
}
Expand Down
7 changes: 3 additions & 4 deletions leveldb/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ type tOps struct {
}

// Creates an empty table and returns table writer.
func (t *tOps) create() (*tWriter, error) {
func (t *tOps) create(tSize int) (*tWriter, error) {
fd := storage.FileDesc{Type: storage.TypeTable, Num: t.s.allocFileNum()}
fw, err := t.s.stor.Create(fd)
if err != nil {
Expand All @@ -376,13 +376,13 @@ func (t *tOps) create() (*tWriter, error) {
t: t,
fd: fd,
w: fw,
tw: table.NewWriter(fw, t.s.o.Options),
tw: table.NewWriter(fw, t.s.o.Options, t.bpool, tSize),
}, nil
}

// Builds table from src iterator.
func (t *tOps) createFrom(src iterator.Iterator) (f *tFile, n int, err error) {
w, err := t.create()
w, err := t.create(0)
if err != nil {
return
}
Expand Down Expand Up @@ -501,7 +501,6 @@ func (t *tOps) remove(fd storage.FileDesc) {
// Closes the table ops instance. It will close all tables,
// regadless still used or not.
func (t *tOps) close() {
t.bpool.Close()
t.cache.Close()
if t.bcache != nil {
t.bcache.CloseWeak()
Expand Down
4 changes: 2 additions & 2 deletions leveldb/table/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ var _ = testutil.Defer(func() {
)

// Building the table.
tw := NewWriter(buf, o)
tw := NewWriter(buf, o, nil, 0)
tw.Append([]byte("k01"), []byte("hello"))
tw.Append([]byte("k02"), []byte("hello2"))
tw.Append([]byte("k03"), bytes.Repeat([]byte{'x'}, 10000))
Expand Down Expand Up @@ -90,7 +90,7 @@ var _ = testutil.Defer(func() {
buf := &bytes.Buffer{}

// Building the table.
tw := NewWriter(buf, o)
tw := NewWriter(buf, o, nil, 0)
kv.Iterate(func(i int, key, value []byte) {
tw.Append(key, value)
})
Expand Down
23 changes: 22 additions & 1 deletion leveldb/table/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ type Writer struct {
compression opt.Compression
blockSize int

bpool *util.BufferPool
dataBlock blockWriter
indexBlock blockWriter
filterBlock filterWriter
Expand Down Expand Up @@ -285,6 +286,16 @@ func (w *Writer) BytesLen() int {
// after Close, but calling BlocksLen, EntriesLen and BytesLen
// is still possible.
func (w *Writer) Close() error {
defer func() {
if w.bpool != nil {
// Buffer.Bytes() returns [offset:] of the buffer.
// We need to Reset() so that the offset = 0, resulting
// in buf.Bytes() returning the whole allocated bytes.
w.dataBlock.buf.Reset()
w.bpool.Put(w.dataBlock.buf.Bytes())
}
}()

if w.err != nil {
return w.err
}
Expand Down Expand Up @@ -351,14 +362,24 @@ func (w *Writer) Close() error {
// NewWriter creates a new initialized table writer for the file.
//
// Table writer is not safe for concurrent use.
func NewWriter(f io.Writer, o *opt.Options) *Writer {
func NewWriter(f io.Writer, o *opt.Options, pool *util.BufferPool, size int) *Writer {
var bufBytes []byte
if pool == nil {
bufBytes = make([]byte, size)
} else {
bufBytes = pool.Get(size)
}
bufBytes = bufBytes[:0]

w := &Writer{
writer: f,
cmp: o.GetComparer(),
filter: o.GetFilter(),
compression: o.GetCompression(),
blockSize: o.GetBlockSize(),
comparerScratch: make([]byte, 0),
bpool: pool,
dataBlock: blockWriter{buf: *util.NewBuffer(bufBytes)},
}
// data block
w.dataBlock.restartInterval = o.GetBlockRestartInterval()
Expand Down
Loading

0 comments on commit 079c29c

Please sign in to comment.