chore(perf): Improve perf of bulk loader with Reuse allocator and assigning tags to allocator (#7360) (#7547)

* Reuse allocator (#7360)

Instead of creating a new z.Allocator for every encoder, this PR reuses the allocator. On a 21M dataset, the bulk loader takes 2m22s on master and only 1m35s with this PR: roughly a 33% cut in wall-clock time (about a 1.5x speedup).

* use allocator tags

Co-authored-by: Manish R Jain <manish@dgraph.io>
aman-bansal and manishrjain authored Mar 23, 2021
1 parent 99064e6 commit 3ccc521
Showing 12 changed files with 50 additions and 49 deletions.
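The pattern behind those numbers, sketched below under stated assumptions: encodeAll, the list-of-UID-lists input, and the 16<<20 arena size are illustrative, not part of this commit. Instead of every Encoder constructing its own z.Allocator, the caller owns a single tagged allocator, resets it between posting lists, and releases it once.

package bulkexample

import (
	"github.com/dgraph-io/dgraph/codec"
	"github.com/dgraph-io/ristretto/z"
)

// encodeAll is a hypothetical driver showing the reuse pattern.
func encodeAll(lists [][]uint64) {
	// One tagged allocator for the whole loop, released exactly once.
	alloc := z.NewAllocator(16<<20, "Example.EncodeAll")
	defer alloc.Release()

	for _, uids := range lists {
		// Reset recycles the arena rather than allocating a fresh one
		// per encoder; avoiding that per-encoder churn is the speedup.
		alloc.Reset()
		enc := codec.Encoder{BlockSize: 256, Alloc: alloc}
		for _, uid := range uids {
			enc.Add(uid)
		}
		pack := enc.Done()
		_ = pack // consume the pack before the next Reset invalidates its memory
	}
}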
18 changes: 10 additions & 8 deletions codec/codec.go
@@ -47,7 +47,7 @@ type Encoder struct {
BlockSize int
pack *pb.UidPack
uids []uint64
- alloc *z.Allocator
+ Alloc *z.Allocator
buf *bytes.Buffer
}

@@ -70,7 +70,7 @@ func (e *Encoder) packBlock() {
}

// Allocate blocks manually.
- b := e.alloc.AllocateAligned(blockSize)
+ b := e.Alloc.AllocateAligned(blockSize)
block := (*pb.UidBlock)(unsafe.Pointer(&b[0]))

block.Base = e.uids[0]
@@ -106,7 +106,7 @@ func (e *Encoder) packBlock() {
}

sz := len(e.buf.Bytes())
- block.Deltas = e.alloc.Allocate(sz)
+ block.Deltas = e.Alloc.Allocate(sz)
x.AssertTrue(sz == copy(block.Deltas, e.buf.Bytes()))
e.pack.Blocks = append(e.pack.Blocks, block)
}
@@ -117,10 +117,12 @@ var tagEncoder string = "enc"
func (e *Encoder) Add(uid uint64) {
if e.pack == nil {
e.pack = &pb.UidPack{BlockSize: uint32(e.BlockSize)}
- e.alloc = z.NewAllocator(1024)
- e.alloc.Tag = tagEncoder
e.buf = new(bytes.Buffer)
}
+ if e.Alloc == nil {
+ e.Alloc = z.NewAllocator(1024, "Encoder.Add")
+ e.Alloc.Tag = tagEncoder
+ }

size := len(e.uids)
if size > 0 && !match32MSB(e.uids[size-1], uid) {
@@ -138,8 +140,8 @@ func (e *Encoder) Add(uid uint64) {
// Done returns the final output of the encoder. This UidPack MUST BE FREED via a call to FreePack.
func (e *Encoder) Done() *pb.UidPack {
e.packBlock()
- if e.pack != nil && e.alloc != nil {
- e.pack.AllocRef = e.alloc.Ref
+ if e.pack != nil && e.Alloc != nil {
+ e.pack.AllocRef = e.Alloc.Ref
}
return e.pack
}
@@ -405,7 +407,7 @@ func Decode(pack *pb.UidPack, seek uint64) []uint64 {
// DecodeToBuffer is the same as Decode but it returns a z.Buffer which is
// calloc'ed and SHOULD be freed up by calling buffer.Release().
func DecodeToBuffer(pack *pb.UidPack, seek uint64) *z.Buffer {
- buf, err := z.NewBufferWith(256<<20, 32<<30, z.UseCalloc)
+ buf, err := z.NewBufferWith(256<<20, 32<<30, z.UseCalloc, "Codec.DecodeToBuffer")
x.Check(err)
buf.AutoMmapAfter(1 << 30)

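A consequence of exporting Alloc is that codec.Encoder now has two ownership modes. A minimal sketch, assuming (as the reduce.go comments below imply) that codec.FreePack releases the allocator referenced by pack.AllocRef:

package bulkexample

import (
	"github.com/dgraph-io/dgraph/codec"
	"github.com/dgraph-io/ristretto/z"
)

func ownershipModes() {
	// Mode 1: Alloc left nil. Add lazily creates a tagged allocator,
	// so the resulting UidPack MUST be freed via codec.FreePack.
	enc := codec.Encoder{BlockSize: 256}
	enc.Add(42)
	codec.FreePack(enc.Done())

	// Mode 2: caller-supplied allocator. FreePack must NOT be called,
	// since it would release an allocator the caller intends to reuse;
	// the caller's own Release (or Reset) reclaims everything instead.
	alloc := z.NewAllocator(1024, "Example.Shared")
	defer alloc.Release()
	shared := codec.Encoder{BlockSize: 256, Alloc: alloc}
	shared.Add(42)
	_ = shared.Done() // memory stays valid until alloc.Reset or alloc.Release
}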
11 changes: 7 additions & 4 deletions dgraph/cmd/bulk/count_index.go
@@ -132,10 +132,13 @@ func (c *countIndexer) writeIndex(buf *z.Buffer) {
fmt.Printf("Writing count index for %q rev=%v\n", pk.Attr, pk.IsReverse())
}

+ alloc := z.NewAllocator(8<<20, "CountIndexer.WriteIndex")
+ defer alloc.Release()
+
var pl pb.PostingList
- encoder := codec.Encoder{BlockSize: 256}
+ encoder := codec.Encoder{BlockSize: 256, Alloc: alloc}

- outBuf := z.NewBuffer(5 << 20)
+ outBuf := z.NewBuffer(5<<20, "CountIndexer.Buffer.WriteIndex")
defer outBuf.Release()
encode := func() {
pl.Pack = encoder.Done()
@@ -144,13 +147,13 @@ func (c *countIndexer) writeIndex(buf *z.Buffer) {
}

kv := posting.MarshalPostingList(&pl, nil)
- codec.FreePack(pl.Pack)
kv.Key = append([]byte{}, lastCe.Key()...)
kv.Version = c.state.writeTs
kv.StreamId = streamId
badger.KVToBuffer(kv, outBuf)

- encoder = codec.Encoder{BlockSize: 256}
+ alloc.Reset()
+ encoder = codec.Encoder{BlockSize: 256, Alloc: alloc}
pl.Reset()

// Flush out the buffer.
2 changes: 1 addition & 1 deletion dgraph/cmd/bulk/mapper.go
@@ -59,7 +59,7 @@ type shardState struct {
func newMapperBuffer(opt *options) *z.Buffer {
sz := float64(opt.MapBufSize) * 1.1
buf, err := z.NewBufferWithDir(int(sz), 2*int(opt.MapBufSize), z.UseMmap,
- filepath.Join(opt.TmpDir, bufferDir))
+ filepath.Join(opt.TmpDir, bufferDir), "Mapper.Buffer")
x.Check(err)
return buf
}
25 changes: 18 additions & 7 deletions dgraph/cmd/bulk/reduce.go
@@ -356,7 +356,7 @@ func (r *reducer) startWriting(ci *countIndexer, writerCh chan *encodeRequest, c
StreamDone: true,
}

- buf := z.NewBuffer(512)
+ buf := z.NewBuffer(512, "Reducer.Write")
defer buf.Release()
badger.KVToBuffer(doneKV, buf)

@@ -436,7 +436,8 @@ func bufferStats(cbuf *z.Buffer) {
}

func getBuf(dir string) *z.Buffer {
- cbuf, err := z.NewBufferWithDir(64<<20, 64<<30, z.UseCalloc, filepath.Join(dir, bufferDir))
+ cbuf, err := z.NewBufferWithDir(64<<20, 64<<30, z.UseCalloc,
+ 	filepath.Join(dir, bufferDir), "Reducer.GetBuf")
x.Check(err)
cbuf.AutoMmapAfter(1 << 30)
return cbuf
@@ -549,7 +550,7 @@ func (r *reducer) toList(req *encodeRequest) {
pl := new(pb.PostingList)
writeVersionTs := r.state.writeTs

- kvBuf := z.NewBuffer(260 << 20)
+ kvBuf := z.NewBuffer(260<<20, "Reducer.Buffer.ToList")
trackCountIndex := make(map[string]bool)

var freePostings []*pb.Posting
@@ -568,6 +569,12 @@
freePostings = append(freePostings, p)
}

+ alloc := z.NewAllocator(16<<20, "Reducer.ToList")
+ defer func() {
+ 	// We put alloc.Release in defer because we reassign alloc for split posting lists.
+ 	alloc.Release()
+ }()

start, end, num := cbuf.StartOffset(), cbuf.StartOffset(), 0
appendToList := func() {
if num == 0 {
@@ -594,7 +601,8 @@
}
}

- enc := codec.Encoder{BlockSize: 256}
+ alloc.Reset()
+ enc := codec.Encoder{BlockSize: 256, Alloc: alloc}
var lastUid uint64
slice, next := []byte{}, start
for next >= 0 && (next < end || end == -1) {
@@ -628,7 +636,7 @@
// the full pb.Posting type is used (which pb.y contains the
// delta packed UID list).
if numUids == 0 {
- codec.FreePack(pl.Pack)
+ // No need to FreePack here because we are reusing alloc.
return
}

@@ -656,6 +664,9 @@
kvs, err := l.Rollup(nil)
x.Check(err)

+ // Assign a new allocator, so we don't reset the one we were using during Rollup.
+ alloc = z.NewAllocator(16<<20, "Reducer.AppendToList")

for _, kv := range kvs {
kv.StreamId = r.streamIdFor(pk.Attr)
}
@@ -665,7 +676,7 @@
}
} else {
kv := posting.MarshalPostingList(pl, nil)
- codec.FreePack(pl.Pack)
+ // No need to FreePack here, because we are reusing alloc.

kv.Key = y.Copy(currentKey)
kv.Version = writeVersionTs
@@ -690,7 +701,7 @@

if kvBuf.LenNoPadding() > 256<<20 {
req.listCh <- kvBuf
- kvBuf = z.NewBuffer(260 << 20)
+ kvBuf = z.NewBuffer(260<<20, "Reducer.Buffer.KVBuffer")
}
}
end = next
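One Go subtlety in the toList change above: the release is deferred through a closure rather than written as defer alloc.Release(), precisely because alloc is reassigned for split posting lists. A standalone sketch of that behavior (tags and sizes illustrative):

package bulkexample

import "github.com/dgraph-io/ristretto/z"

func deferredRelease() {
	alloc := z.NewAllocator(16<<20, "Example.First")
	defer func() {
		// The closure reads alloc at return time, so it releases the
		// replacement assigned below, not the original allocator.
		alloc.Release()
	}()

	// A plain defer alloc.Release() would evaluate the receiver here,
	// pinning the first allocator and never releasing the second.
	old := alloc
	alloc = z.NewAllocator(16<<20, "Example.Second")
	old.Release() // this sketch releases the original explicitly
}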
2 changes: 1 addition & 1 deletion dgraph/cmd/debug/run.go
@@ -472,7 +472,7 @@ func rollupKey(db *badger.DB) {
pl, err := posting.ReadPostingList(item.KeyCopy(nil), itr)
x.Check(err)

- alloc := z.NewAllocator(32 << 20)
+ alloc := z.NewAllocator(32<<20, "Debug.RollupKey")
defer alloc.Release()

kvs, err := pl.Rollup(alloc)
5 changes: 3 additions & 2 deletions go.mod
@@ -16,12 +16,12 @@ require (
github.com/OneOfOne/xxhash v1.2.5 // indirect
github.com/blevesearch/bleve v1.0.13
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd
- github.com/dgraph-io/badger/v3 v3.2011.2-0.20210210142907-44c9230e5a66
+ github.com/dgraph-io/badger/v3 v3.2011.2-0.20210323103207-baadc01dca6e
github.com/dgraph-io/dgo/v200 v200.0.0-20200805103119-a3544c464dd6
github.com/dgraph-io/gqlgen v0.13.2
github.com/dgraph-io/gqlparser/v2 v2.1.5
github.com/dgraph-io/graphql-transport-ws v0.0.0-20200916064635-48589439591b
- github.com/dgraph-io/ristretto v0.0.4-0.20210122082011-bb5d392ed82d
+ github.com/dgraph-io/ristretto v0.0.4-0.20210309073149-3836124cdc5a
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13
@@ -51,6 +51,7 @@ require (
github.com/prometheus/client_golang v0.9.3
github.com/prometheus/common v0.4.1
github.com/prometheus/procfs v0.0.0-20190517135640-51af30a78b0e // indirect
+ github.com/sirupsen/logrus v1.6.0 // indirect
github.com/soheilhy/cmux v0.1.4
github.com/spf13/cast v1.3.0
github.com/spf13/cobra v0.0.5
