diff --git a/dgraph/cmd/bulk/mapper.go b/dgraph/cmd/bulk/mapper.go
index 442bbe027c9..604c05080ce 100644
--- a/dgraph/cmd/bulk/mapper.go
+++ b/dgraph/cmd/bulk/mapper.go
@@ -57,10 +57,10 @@ type shardState struct {
 
 func newMapperBuffer(opt *options) *z.Buffer {
 	sz := float64(opt.MapBufSize) * 1.1
-	buf, err := z.NewBufferWithDir(int(sz), 2*int(opt.MapBufSize), z.UseMmap,
-		filepath.Join(opt.TmpDir, bufferDir), "Mapper.Buffer")
+	tmpDir := filepath.Join(opt.TmpDir, bufferDir)
+	buf, err := z.NewBufferTmp(tmpDir, int(sz))
 	x.Check(err)
-	return buf
+	return buf.WithMaxSize(2 * int(opt.MapBufSize))
 }
 
 func newMapper(st *state) *mapper {
diff --git a/dgraph/cmd/bulk/reduce.go b/dgraph/cmd/bulk/reduce.go
index 6bc714633c9..a898149fd27 100644
--- a/dgraph/cmd/bulk/reduce.go
+++ b/dgraph/cmd/bulk/reduce.go
@@ -434,11 +434,9 @@ func bufferStats(cbuf *z.Buffer) {
 }
 
 func getBuf(dir string) *z.Buffer {
-	cbuf, err := z.NewBufferWithDir(64<<20, 64<<30, z.UseCalloc,
-		filepath.Join(dir, bufferDir), "Reducer.GetBuf")
-	x.Check(err)
-	cbuf.AutoMmapAfter(1 << 30)
-	return cbuf
+	return z.NewBuffer(64<<20, "Reducer.GetBuf").
+		WithAutoMmap(1<<30, filepath.Join(dir, bufferDir)).
+		WithMaxSize(64 << 30)
 }
 
 func (r *reducer) reduce(partitionKeys [][]byte, mapItrs []*mapIterator, ci *countIndexer) {
diff --git a/dgraph/cmd/zero/oracle.go b/dgraph/cmd/zero/oracle.go
index c1c0b424437..4df2de2af37 100644
--- a/dgraph/cmd/zero/oracle.go
+++ b/dgraph/cmd/zero/oracle.go
@@ -59,15 +59,16 @@ func (o *Oracle) Init() {
 	o.commits = make(map[uint64]uint64)
 	// Remove the older btree file, before creating NewTree, as it may contain stale data leading
 	// to wrong results.
-	o.keyCommit = z.NewTree()
+	o.keyCommit = z.NewTree("oracle")
 	o.subscribers = make(map[int]chan pb.OracleDelta)
 	o.updates = make(chan *pb.OracleDelta, 100000) // Keeping 1 second worth of updates.
 	o.doneUntil.Init(nil)
 	go o.sendDeltasToSubscribers()
 }
 
-// oracle close releases the memory associated with btree used for keycommit.
+// close releases the memory associated with btree used for keycommit.
 func (o *Oracle) close() {
+	o.keyCommit.Close()
 }
 
 func (o *Oracle) updateStartTxnTs(ts uint64) {
diff --git a/dgraph/cmd/zero/run.go b/dgraph/cmd/zero/run.go
index 4d72952c8f8..0bdd3a9974b 100644
--- a/dgraph/cmd/zero/run.go
+++ b/dgraph/cmd/zero/run.go
@@ -127,7 +127,7 @@ instances to achieve high-availability.
 		Head("Audit options").
 		Flag("output", `[stdout, /path/to/dir] This specifies where audit logs should be output to.
-			"stdout" is for standard output. You can also specify the directory where audit logs 
+			"stdout" is for standard output. You can also specify the directory where audit logs
 			will be saved. When stdout is specified as output other fields will be ignored.`).
 		Flag("compress", "Enables the compression of old audit logs.").
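The hunks above replace the old z.NewBufferWithDir and z.NewBufferWith constructors, which returned an error and took an explicit allocation strategy, with a chained builder style. Below is a minimal sketch of how the new calls compose, assuming only the signatures visible in these hunks; the tag, directory, and sizes are illustrative, and the WriteSlice/SliceIterate pairing is inferred from the predicate_move.go hunk further down.

package main

import (
	"path/filepath"

	"github.com/dgraph-io/ristretto/z"
)

func main() {
	// Start at 64 MiB; once the buffer crosses 1 GiB, spill to an mmap'd
	// file under /tmp/buffer; never grow past 64 GiB. The tag names the
	// buffer in ristretto's allocation accounting.
	buf := z.NewBuffer(64<<20, "Example.Buffer").
		WithAutoMmap(1<<30, filepath.Join("/tmp", "buffer")).
		WithMaxSize(64 << 30)
	defer buf.Release() // frees or munmaps the backing memory

	buf.WriteSlice([]byte("hello")) // length-prefixed; read back with SliceIterate
}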
diff --git a/go.mod b/go.mod
index 0855dcc4e0d..851fd6203fa 100644
--- a/go.mod
+++ b/go.mod
@@ -19,7 +19,7 @@ require (
 	github.com/dgraph-io/gqlgen v0.13.2
 	github.com/dgraph-io/gqlparser/v2 v2.2.0
 	github.com/dgraph-io/graphql-transport-ws v0.0.0-20210223074046-e5b8b80bb4ed
-	github.com/dgraph-io/ristretto v0.0.4-0.20210430144708-642987276d6a
+	github.com/dgraph-io/ristretto v0.0.4-0.20210428103110-8405ab9b246f
 	github.com/dgraph-io/simdjson-go v0.3.0
 	github.com/dgrijalva/jwt-go v3.2.0+incompatible
 	github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1
diff --git a/go.sum b/go.sum
index ebc2835dca7..2a08b2004cf 100644
--- a/go.sum
+++ b/go.sum
@@ -131,8 +131,8 @@ github.com/dgraph-io/gqlparser/v2 v2.2.0/go.mod h1:MYS4jppjyx8b9tuUtjV7jU1UFZK6P
 github.com/dgraph-io/graphql-transport-ws v0.0.0-20210223074046-e5b8b80bb4ed h1:pgGMBoTtFhR+xkyzINaToLYRurHn+6pxMYffIGmmEPc=
 github.com/dgraph-io/graphql-transport-ws v0.0.0-20210223074046-e5b8b80bb4ed/go.mod h1:7z3c/5w0sMYYZF5bHsrh8IH4fKwG5O5Y70cPH1ZLLRQ=
 github.com/dgraph-io/ristretto v0.0.4-0.20210309073149-3836124cdc5a/go.mod h1:MIonLggsKgZLUSt414ExgwNtlOL5MuEoAJP514mwGe8=
-github.com/dgraph-io/ristretto v0.0.4-0.20210430144708-642987276d6a h1:www55JYSP8aYgBPNWJLeMS/HJMba6emZyBF65/a4XRI=
-github.com/dgraph-io/ristretto v0.0.4-0.20210430144708-642987276d6a/go.mod h1:fux0lOrBhrVCJd3lcTHsIJhq1T2rokOu6v9Vcb3Q9ug=
+github.com/dgraph-io/ristretto v0.0.4-0.20210428103110-8405ab9b246f h1:vupa/2tdIvqUTP0Md7MMvdKwCQjF48m+T6LZRVc+3cg=
+github.com/dgraph-io/ristretto v0.0.4-0.20210428103110-8405ab9b246f/go.mod h1:fux0lOrBhrVCJd3lcTHsIJhq1T2rokOu6v9Vcb3Q9ug=
 github.com/dgraph-io/simdjson-go v0.3.0 h1:h71LO7vR4LHMPUhuoGN8bqGm1VNfGOlAG8BI6iDUKw0=
 github.com/dgraph-io/simdjson-go v0.3.0/go.mod h1:Otpysdjaxj9OGaJusn4pgQV7OFh2bELuHANq0I78uvY=
 github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
diff --git a/worker/backup_ee.go b/worker/backup_ee.go
index eb2d5836480..52455b52bce 100644
--- a/worker/backup_ee.go
+++ b/worker/backup_ee.go
@@ -294,9 +294,9 @@ func NewBackupProcessor(db *badger.DB, req *pb.BackupRequest) *BackupProcessor {
 		bp.txn = db.NewTransactionAt(req.ReadTs, false)
 	}
 	for i := range bp.threads {
-		buf, err := z.NewBufferWith(32<<20, 32<<30, z.UseCalloc, "Worker.BackupProcessor")
-		x.Check(err)
-		buf.AutoMmapAfter(1 << 30)
+		buf := z.NewBuffer(32<<20, "Worker.BackupProcessor").
+			WithAutoMmap(1<<30, "").
+			WithMaxSize(32 << 30)
 		bp.threads[i] = &threadLocal{
 			Request: bp.Request,
diff --git a/worker/predicate_move.go b/worker/predicate_move.go
index b5437d43e72..38caaa54ebf 100644
--- a/worker/predicate_move.go
+++ b/worker/predicate_move.go
@@ -71,7 +71,7 @@ func batchAndProposeKeyValues(ctx context.Context, kvs chan *pb.KVS) error {
 	var pk x.ParsedKey
 
 	for kvPayload := range kvs {
-		buf := z.BufferFrom(kvPayload.GetData())
+		buf := z.NewBufferSlice(kvPayload.GetData())
 		err := buf.SliceIterate(func(s []byte) error {
 			kv := &bpb.KV{}
 			x.Check(kv.Unmarshal(s))
@@ -164,7 +164,7 @@ func (w *grpcWorker) ReceivePredicate(stream pb.Worker_ReceivePredicateServer) e
 	}
 	glog.V(2).Infof("Received batch of size: %s\n",
 		humanize.IBytes(uint64(len(kvBuf.Data))))
-	buf := z.BufferFrom(kvBuf.Data)
+	buf := z.NewBufferSlice(kvBuf.Data)
 	buf.SliceIterate(func(_ []byte) error {
 		count++
 		return nil
diff --git a/worker/restore_map.go b/worker/restore_map.go
index d98262b5d83..8fa0ba60ce5 100644
--- a/worker/restore_map.go
+++ b/worker/restore_map.go
@@ -229,9 +229,9 @@ func (m *mapper) writeToDisk(buf *z.Buffer) error {
 }
 
 func newBuffer() *z.Buffer {
-	buf, err := z.NewBufferWithDir(mapFileSz, 2*mapFileSz, z.UseMmap, "", "Restore.Buffer")
+	buf, err := z.NewBufferTmp("", mapFileSz)
 	x.Check(err)
-	return buf
+	return buf.WithMaxSize(2 * mapFileSz)
 }
 
 func (mw *mapper) sendForWriting() error {
diff --git a/worker/restore_reduce.go b/worker/restore_reduce.go
index 43e5689ea1d..db44570c030 100644
--- a/worker/restore_reduce.go
+++ b/worker/restore_reduce.go
@@ -120,10 +120,7 @@ func newMapIterator(filename string) (*pb.MapHeader, *mapIterator) {
 func getBuf() *z.Buffer {
 	path := filepath.Join(x.WorkerConfig.TmpDir, "buffer")
 	x.Check(os.MkdirAll(path, 0750))
-	cbuf, err := z.NewBufferWithDir(64<<20, 64<<30, z.UseCalloc, path, "Restore.GetBuf")
-	x.Check(err)
-	cbuf.AutoMmapAfter(1 << 30)
-	return cbuf
+	return z.NewBuffer(64<<20, "Restore.GetBuf").WithAutoMmap(1<<30, path).WithMaxSize(64 << 30)
 }
 
 type reducer struct {
diff --git a/worker/snapshot.go b/worker/snapshot.go
index e7fde2339de..1b6e77af156 100644
--- a/worker/snapshot.go
+++ b/worker/snapshot.go
@@ -103,7 +103,7 @@ func (n *node) populateSnapshot(snap pb.Snapshot, pl *conn.Pool) error {
 		glog.V(1).Infof("Received batch of size: %s. Total so far: %s\n",
 			humanize.IBytes(uint64(len(kvs.Data))), humanize.IBytes(uint64(size)))
 
-		buf := z.BufferFrom(kvs.Data)
+		buf := z.NewBufferSlice(kvs.Data)
 		if err := writer.Write(buf); err != nil {
 			return err
 		}
diff --git a/xidmap/trie.go b/xidmap/trie.go
index 3df8b132b13..d2b3e4d9eb3 100644
--- a/xidmap/trie.go
+++ b/xidmap/trie.go
@@ -20,7 +20,6 @@ import (
 	"math"
 	"unsafe"
 
-	"github.com/dgraph-io/dgraph/x"
 	"github.com/dgraph-io/ristretto/z"
 )
 
@@ -34,8 +33,7 @@ type Trie struct {
 // NewTrie would return back a Trie backed by the provided Arena. Trie would assume ownership of the
 // Arena. Release must be called at the end to release Arena's resources.
 func NewTrie() *Trie {
-	buf, err := z.NewBufferWith(32<<20, math.MaxUint32, z.UseMmap, "Trie")
-	x.Check(err)
+	buf := z.NewBuffer(32<<20, "Trie").WithMaxSize(math.MaxUint32)
 	// Add additional 8 bytes at the start, because offset=0 is used for checking non-existing node.
 	// Therefore we can't keep root at 0 offset.
 	ro := buf.AllocateOffset(nodeSz + 8)
diff --git a/xidmap/xidmap.go b/xidmap/xidmap.go
index d1bc428e276..a7b59b075d8 100644
--- a/xidmap/xidmap.go
+++ b/xidmap/xidmap.go
@@ -103,7 +103,7 @@ func New(opts XidMapOptions) *XidMap {
 	}
 	for i := range xm.shards {
 		xm.shards[i] = &shard{
-			tree: z.NewTree(),
+			tree: z.NewTree("XidMap"),
 		}
 	}
 
@@ -331,6 +331,9 @@ func (m *XidMap) Flush() error {
 	// even during reduce phase. If bulk loader is running on large dataset, this occupies lot of
 	// memory and causing OOM sometimes. Making shards explicitly nil in this method fixes this.
 	// TODO: find why xidmap is not getting GCed without below line.
+	for _, shards := range m.shards {
+		shards.tree.Close()
+	}
 	m.shards = nil
 	if m.writer == nil {
 		return nil
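Like the buffers, z.NewTree now takes a tag and its memory is released explicitly, which is what the new Oracle.close body and the loop in XidMap.Flush above do. A short sketch under the assumption that z.Tree is the uint64-to-uint64 map its keyCommit and XidMap uses suggest; the Set and Get accessors are assumptions, not confirmed by this diff.

package main

import (
	"fmt"

	"github.com/dgraph-io/ristretto/z"
)

func main() {
	tree := z.NewTree("example") // the tag names the tree in z's leak/usage accounting
	defer tree.Close()           // releases the tree's backing memory, as Oracle.close does

	tree.Set(42, 7)           // assumed accessor: map key 42 to value 7
	fmt.Println(tree.Get(42)) // assumed accessor: prints 7
}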