From 3696c9473188541746ca591f91dc924c3be5b2ce Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Sat, 23 Jan 2021 05:27:29 -0800 Subject: [PATCH] Reuse allocator --- codec/codec.go | 16 +++++++++------- dgraph/cmd/bulk/count_index.go | 9 ++++++--- dgraph/cmd/bulk/reduce.go | 16 +++++++++++++--- t/t.go | 4 ++-- 4 files changed, 30 insertions(+), 15 deletions(-) diff --git a/codec/codec.go b/codec/codec.go index 387e18230b3..d7940e52bb9 100644 --- a/codec/codec.go +++ b/codec/codec.go @@ -47,7 +47,7 @@ type Encoder struct { BlockSize int pack *pb.UidPack uids []uint64 - alloc *z.Allocator + Alloc *z.Allocator buf *bytes.Buffer } @@ -70,7 +70,7 @@ func (e *Encoder) packBlock() { } // Allocate blocks manually. - b := e.alloc.AllocateAligned(blockSize) + b := e.Alloc.AllocateAligned(blockSize) block := (*pb.UidBlock)(unsafe.Pointer(&b[0])) block.Base = e.uids[0] @@ -106,7 +106,7 @@ func (e *Encoder) packBlock() { } sz := len(e.buf.Bytes()) - block.Deltas = e.alloc.Allocate(sz) + block.Deltas = e.Alloc.Allocate(sz) x.AssertTrue(sz == copy(block.Deltas, e.buf.Bytes())) e.pack.Blocks = append(e.pack.Blocks, block) } @@ -117,10 +117,12 @@ var tagEncoder string = "enc" func (e *Encoder) Add(uid uint64) { if e.pack == nil { e.pack = &pb.UidPack{BlockSize: uint32(e.BlockSize)} - e.alloc = z.NewAllocator(1024) - e.alloc.Tag = tagEncoder e.buf = new(bytes.Buffer) } + if e.Alloc == nil { + e.Alloc = z.NewAllocator(1024) + e.Alloc.Tag = tagEncoder + } size := len(e.uids) if size > 0 && !match32MSB(e.uids[size-1], uid) { @@ -138,8 +140,8 @@ func (e *Encoder) Add(uid uint64) { // Done returns the final output of the encoder. This UidPack MUST BE FREED via a call to FreePack. func (e *Encoder) Done() *pb.UidPack { e.packBlock() - if e.pack != nil && e.alloc != nil { - e.pack.AllocRef = e.alloc.Ref + if e.pack != nil && e.Alloc != nil { + e.pack.AllocRef = e.Alloc.Ref } return e.pack } diff --git a/dgraph/cmd/bulk/count_index.go b/dgraph/cmd/bulk/count_index.go index 4377d0005f2..334679b7451 100644 --- a/dgraph/cmd/bulk/count_index.go +++ b/dgraph/cmd/bulk/count_index.go @@ -132,8 +132,11 @@ func (c *countIndexer) writeIndex(buf *z.Buffer) { fmt.Printf("Writing count index for %q rev=%v\n", pk.Attr, pk.IsReverse()) } + alloc := z.NewAllocator(8 << 20) + defer alloc.Release() + var pl pb.PostingList - encoder := codec.Encoder{BlockSize: 256} + encoder := codec.Encoder{BlockSize: 256, Alloc: alloc} outBuf := z.NewBuffer(5 << 20) defer outBuf.Release() @@ -144,13 +147,13 @@ func (c *countIndexer) writeIndex(buf *z.Buffer) { } kv := posting.MarshalPostingList(&pl, nil) - codec.FreePack(pl.Pack) kv.Key = append([]byte{}, lastCe.Key()...) kv.Version = c.state.writeTs kv.StreamId = streamId badger.KVToBuffer(kv, outBuf) - encoder = codec.Encoder{BlockSize: 256} + alloc.Reset() + encoder = codec.Encoder{BlockSize: 256, Alloc: alloc} pl.Reset() // Flush out the buffer. diff --git a/dgraph/cmd/bulk/reduce.go b/dgraph/cmd/bulk/reduce.go index 9036ac271cf..d84e00d8789 100644 --- a/dgraph/cmd/bulk/reduce.go +++ b/dgraph/cmd/bulk/reduce.go @@ -569,6 +569,12 @@ func (r *reducer) toList(req *encodeRequest) { freePostings = append(freePostings, p) } + alloc := z.NewAllocator(16 << 20) + defer func() { + // We put alloc.Release in defer because we reassign alloc for split posting lists. + alloc.Release() + }() + start, end, num := cbuf.StartOffset(), cbuf.StartOffset(), 0 appendToList := func() { if num == 0 { @@ -595,7 +601,8 @@ func (r *reducer) toList(req *encodeRequest) { } } - enc := codec.Encoder{BlockSize: 256} + alloc.Reset() + enc := codec.Encoder{BlockSize: 256, Alloc: alloc} var lastUid uint64 slice, next := []byte{}, start for next >= 0 && (next < end || end == -1) { @@ -629,7 +636,7 @@ func (r *reducer) toList(req *encodeRequest) { // the full pb.Posting type is used (which pb.y contains the // delta packed UID list). if numUids == 0 { - codec.FreePack(pl.Pack) + // No need to FrePack here because we are reusing alloc. return } @@ -657,6 +664,9 @@ func (r *reducer) toList(req *encodeRequest) { kvs, err := l.Rollup(nil) x.Check(err) + // Assign a new allocator, so we don't reset the one we were using during Rollup. + alloc = z.NewAllocator(16 << 20) + for _, kv := range kvs { kv.StreamId = r.streamIdFor(pk.Attr) } @@ -666,7 +676,7 @@ func (r *reducer) toList(req *encodeRequest) { } } else { kv := posting.MarshalPostingList(pl, nil) - codec.FreePack(pl.Pack) + // No need to FreePack here, because we are reusing alloc. kv.Key = y.Copy(currentKey) kv.Version = writeVersionTs diff --git a/t/t.go b/t/t.go index b1780d64c6c..7597152399f 100644 --- a/t/t.go +++ b/t/t.go @@ -485,12 +485,12 @@ func getPackages() []task { } if !isValidPackageForSuite(pkg.ID) { - fmt.Printf("Skipping pacakge %s as its not valid for the selected suite %s \n", pkg.ID, *suite) + fmt.Printf("Skipping package %s as its not valid for the selected suite %s \n", pkg.ID, *suite) continue } if has(skipPkgs, pkg.ID) { - fmt.Printf("Skipping pacakge %s as its available in skip list \n", pkg.ID) + fmt.Printf("Skipping package %s as its available in skip list \n", pkg.ID) continue }