diff --git a/dgraph/cmd/bulk/mapper.go b/dgraph/cmd/bulk/mapper.go index e9d3775d143..5567470c1b4 100644 --- a/dgraph/cmd/bulk/mapper.go +++ b/dgraph/cmd/bulk/mapper.go @@ -57,10 +57,10 @@ type shardState struct { func newMapperBuffer(opt *options) *z.Buffer { sz := float64(opt.MapBufSize) * 1.1 - buf, err := z.NewBufferWithDir(int(sz), 2*int(opt.MapBufSize), z.UseMmap, - filepath.Join(opt.TmpDir, bufferDir), "Mapper.Buffer") + tmpDir := filepath.Join(opt.TmpDir, bufferDir) + buf, err := z.NewBufferTmp(tmpDir, int(sz)) x.Check(err) - return buf + return buf.WithMaxSize(2 * int(opt.MapBufSize)) } func newMapper(st *state) *mapper { diff --git a/dgraph/cmd/bulk/reduce.go b/dgraph/cmd/bulk/reduce.go index 80291b4d1d8..df427d4fcec 100644 --- a/dgraph/cmd/bulk/reduce.go +++ b/dgraph/cmd/bulk/reduce.go @@ -435,11 +435,9 @@ func bufferStats(cbuf *z.Buffer) { } func getBuf(dir string) *z.Buffer { - cbuf, err := z.NewBufferWithDir(64<<20, 64<<30, z.UseCalloc, - filepath.Join(dir, bufferDir), "Reducer.GetBuf") - x.Check(err) - cbuf.AutoMmapAfter(1 << 30) - return cbuf + return z.NewBuffer(64<<20, "Reducer.GetBuf"). + WithAutoMmap(1<<30, filepath.Join(dir, bufferDir)). + WithMaxSize(64 << 30) } func (r *reducer) reduce(partitionKeys [][]byte, mapItrs []*mapIterator, ci *countIndexer) { diff --git a/dgraph/cmd/zero/oracle.go b/dgraph/cmd/zero/oracle.go index c1c0b424437..4df2de2af37 100644 --- a/dgraph/cmd/zero/oracle.go +++ b/dgraph/cmd/zero/oracle.go @@ -59,15 +59,16 @@ func (o *Oracle) Init() { o.commits = make(map[uint64]uint64) // Remove the older btree file, before creating NewTree, as it may contain stale data leading // to wrong results. - o.keyCommit = z.NewTree() + o.keyCommit = z.NewTree("oracle") o.subscribers = make(map[int]chan pb.OracleDelta) o.updates = make(chan *pb.OracleDelta, 100000) // Keeping 1 second worth of updates. o.doneUntil.Init(nil) go o.sendDeltasToSubscribers() } -// oracle close releases the memory associated with btree used for keycommit. +// close releases the memory associated with btree used for keycommit. func (o *Oracle) close() { + o.keyCommit.Close() } func (o *Oracle) updateStartTxnTs(ts uint64) { diff --git a/dgraph/cmd/zero/run.go b/dgraph/cmd/zero/run.go index 4d72952c8f8..0bdd3a9974b 100644 --- a/dgraph/cmd/zero/run.go +++ b/dgraph/cmd/zero/run.go @@ -127,7 +127,7 @@ instances to achieve high-availability. Head("Audit options"). Flag("output", `[stdout, /path/to/dir] This specifies where audit logs should be output to. - "stdout" is for standard output. You can also specify the directory where audit logs + "stdout" is for standard output. You can also specify the directory where audit logs will be saved. When stdout is specified as output other fields will be ignored.`). Flag("compress", "Enables the compression of old audit logs."). diff --git a/go.mod b/go.mod index 2b61444171b..db7ca1a9e20 100644 --- a/go.mod +++ b/go.mod @@ -24,7 +24,7 @@ require ( github.com/dgraph-io/gqlgen v0.13.2 github.com/dgraph-io/gqlparser/v2 v2.2.0 github.com/dgraph-io/graphql-transport-ws v0.0.0-20210223074046-e5b8b80bb4ed - github.com/dgraph-io/ristretto v0.0.4-0.20210407062338-62d2e1706f55 + github.com/dgraph-io/ristretto v0.0.4-0.20210428103110-8405ab9b246f github.com/dgraph-io/roaring v0.5.6-0.20210227175938-766b897233a5 github.com/dgraph-io/simdjson-go v0.3.0 github.com/dgrijalva/jwt-go v3.2.0+incompatible diff --git a/go.sum b/go.sum index b473ce0b7cd..aced7c60f59 100644 --- a/go.sum +++ b/go.sum @@ -91,6 +91,8 @@ github.com/cenkalti/backoff v2.1.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QH github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= +github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= +github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd h1:qMd81Ts1T2OTKmB4acZcyKaMtRnY5Y44NuXGX2GFJ1w= github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI= @@ -131,8 +133,8 @@ github.com/dgraph-io/gqlparser/v2 v2.2.0/go.mod h1:MYS4jppjyx8b9tuUtjV7jU1UFZK6P github.com/dgraph-io/graphql-transport-ws v0.0.0-20210223074046-e5b8b80bb4ed h1:pgGMBoTtFhR+xkyzINaToLYRurHn+6pxMYffIGmmEPc= github.com/dgraph-io/graphql-transport-ws v0.0.0-20210223074046-e5b8b80bb4ed/go.mod h1:7z3c/5w0sMYYZF5bHsrh8IH4fKwG5O5Y70cPH1ZLLRQ= github.com/dgraph-io/ristretto v0.0.4-0.20210309073149-3836124cdc5a/go.mod h1:MIonLggsKgZLUSt414ExgwNtlOL5MuEoAJP514mwGe8= -github.com/dgraph-io/ristretto v0.0.4-0.20210407062338-62d2e1706f55 h1:CO2ExPUrWQ01M1zDN1Kfk0cEOowEYiqfbb+lKToPyNo= -github.com/dgraph-io/ristretto v0.0.4-0.20210407062338-62d2e1706f55/go.mod h1:MIonLggsKgZLUSt414ExgwNtlOL5MuEoAJP514mwGe8= +github.com/dgraph-io/ristretto v0.0.4-0.20210428103110-8405ab9b246f h1:vupa/2tdIvqUTP0Md7MMvdKwCQjF48m+T6LZRVc+3cg= +github.com/dgraph-io/ristretto v0.0.4-0.20210428103110-8405ab9b246f/go.mod h1:fux0lOrBhrVCJd3lcTHsIJhq1T2rokOu6v9Vcb3Q9ug= github.com/dgraph-io/roaring v0.5.6-0.20210227175938-766b897233a5 h1:9t3OKcvsQlxU9Cu0U55tgvNtaRYVGDr6rUb95P8cSbg= github.com/dgraph-io/roaring v0.5.6-0.20210227175938-766b897233a5/go.mod h1:I8kxPBtSQW3OdQFWonumQdCx2DTmq2WjdnTjGXz3uTM= github.com/dgraph-io/simdjson-go v0.3.0 h1:h71LO7vR4LHMPUhuoGN8bqGm1VNfGOlAG8BI6iDUKw0= diff --git a/worker/backup_ee.go b/worker/backup_ee.go index a7295f181db..71de253e113 100644 --- a/worker/backup_ee.go +++ b/worker/backup_ee.go @@ -306,9 +306,9 @@ func NewBackupProcessor(db *badger.DB, req *pb.BackupRequest) *BackupProcessor { bp.txn = db.NewTransactionAt(req.ReadTs, false) } for i := range bp.threads { - buf, err := z.NewBufferWith(32<<20, 32<<30, z.UseCalloc, "Worker.BackupProcessor") - x.Check(err) - buf.AutoMmapAfter(1 << 30) + buf := z.NewBuffer(32<<20, "Worker.BackupProcessor"). + WithAutoMmap(1<<30, ""). + WithMaxSize(32 << 30) bp.threads[i] = &threadLocal{ Request: bp.Request, diff --git a/worker/predicate_move.go b/worker/predicate_move.go index 485e91ec910..7605030114a 100644 --- a/worker/predicate_move.go +++ b/worker/predicate_move.go @@ -79,7 +79,7 @@ func batchAndProposeKeyValues(ctx context.Context, kvs chan *pb.KVS) error { var pk x.ParsedKey for kvPayload := range kvs { - buf := z.BufferFrom(kvPayload.GetData()) + buf := z.NewBufferSlice(kvPayload.GetData()) err := buf.SliceIterate(func(s []byte) error { kv := &bpb.KV{} x.Check(kv.Unmarshal(s)) @@ -174,7 +174,7 @@ func (w *grpcWorker) ReceivePredicate(stream pb.Worker_ReceivePredicateServer) e } glog.V(2).Infof("Received batch of size: %s\n", humanize.IBytes(uint64(len(kvBuf.Data)))) - buf := z.BufferFrom(kvBuf.Data) + buf := z.NewBufferSlice(kvBuf.Data) buf.SliceIterate(func(_ []byte) error { count++ return nil diff --git a/worker/restore_map.go b/worker/restore_map.go index ada4de314cd..931580862c1 100644 --- a/worker/restore_map.go +++ b/worker/restore_map.go @@ -228,9 +228,9 @@ func (m *mapper) writeToDisk(buf *z.Buffer) error { } func newBuffer() *z.Buffer { - buf, err := z.NewBufferWithDir(mapFileSz, 2*mapFileSz, z.UseMmap, "", "Restore.Buffer") + buf, err := z.NewBufferTmp("", mapFileSz) x.Check(err) - return buf + return buf.WithMaxSize(2 * mapFileSz) } func (mw *mapper) sendForWriting() error { diff --git a/worker/restore_reduce.go b/worker/restore_reduce.go index 43e5689ea1d..db44570c030 100644 --- a/worker/restore_reduce.go +++ b/worker/restore_reduce.go @@ -120,10 +120,7 @@ func newMapIterator(filename string) (*pb.MapHeader, *mapIterator) { func getBuf() *z.Buffer { path := filepath.Join(x.WorkerConfig.TmpDir, "buffer") x.Check(os.MkdirAll(path, 0750)) - cbuf, err := z.NewBufferWithDir(64<<20, 64<<30, z.UseCalloc, path, "Restore.GetBuf") - x.Check(err) - cbuf.AutoMmapAfter(1 << 30) - return cbuf + return z.NewBuffer(64<<20, "Restore.GetBuf").WithAutoMmap(1<<30, path).WithMaxSize(64 << 30) } type reducer struct { diff --git a/worker/snapshot.go b/worker/snapshot.go index e7fde2339de..1b6e77af156 100644 --- a/worker/snapshot.go +++ b/worker/snapshot.go @@ -103,7 +103,7 @@ func (n *node) populateSnapshot(snap pb.Snapshot, pl *conn.Pool) error { glog.V(1).Infof("Received batch of size: %s. Total so far: %s\n", humanize.IBytes(uint64(len(kvs.Data))), humanize.IBytes(uint64(size))) - buf := z.BufferFrom(kvs.Data) + buf := z.NewBufferSlice(kvs.Data) if err := writer.Write(buf); err != nil { return err } diff --git a/xidmap/trie.go b/xidmap/trie.go index 3df8b132b13..d2b3e4d9eb3 100644 --- a/xidmap/trie.go +++ b/xidmap/trie.go @@ -20,7 +20,6 @@ import ( "math" "unsafe" - "github.com/dgraph-io/dgraph/x" "github.com/dgraph-io/ristretto/z" ) @@ -34,8 +33,7 @@ type Trie struct { // NewTrie would return back a Trie backed by the provided Arena. Trie would assume ownership of the // Arena. Release must be called at the end to release Arena's resources. func NewTrie() *Trie { - buf, err := z.NewBufferWith(32<<20, math.MaxUint32, z.UseMmap, "Trie") - x.Check(err) + buf := z.NewBuffer(32<<20, "Trie").WithMaxSize(math.MaxUint32) // Add additional 8 bytes at the start, because offset=0 is used for checking non-existing node. // Therefore we can't keep root at 0 offset. ro := buf.AllocateOffset(nodeSz + 8) diff --git a/xidmap/xidmap.go b/xidmap/xidmap.go index 1e1b6f1d27a..18ac3a6f4fe 100644 --- a/xidmap/xidmap.go +++ b/xidmap/xidmap.go @@ -108,7 +108,7 @@ func New(opts XidMapOptions) *XidMap { } for i := range xm.shards { xm.shards[i] = &shard{ - tree: z.NewTree(), + tree: z.NewTree("XidMap"), } } @@ -355,6 +355,9 @@ func (m *XidMap) Flush() error { // even during reduce phase. If bulk loader is running on large dataset, this occupies lot of // memory and causing OOM sometimes. Making shards explicitly nil in this method fixes this. // TODO: find why xidmap is not getting GCed without below line. + for _, shards := range m.shards { + shards.tree.Close() + } m.shards = nil if m.writer == nil { return nil