Skip to content

Commit

Permalink
Update Ristretto to bring in new Buffer and Tree API (#7747)
Browse files Browse the repository at this point in the history
  • Loading branch information
ajeetdsouza committed May 5, 2021
1 parent 77f3cef commit 88ab633
Show file tree
Hide file tree
Showing 13 changed files with 27 additions and 30 deletions.
6 changes: 3 additions & 3 deletions dgraph/cmd/bulk/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ type shardState struct {

func newMapperBuffer(opt *options) *z.Buffer {
sz := float64(opt.MapBufSize) * 1.1
buf, err := z.NewBufferWithDir(int(sz), 2*int(opt.MapBufSize), z.UseMmap,
filepath.Join(opt.TmpDir, bufferDir), "Mapper.Buffer")
tmpDir := filepath.Join(opt.TmpDir, bufferDir)
buf, err := z.NewBufferTmp(tmpDir, int(sz))
x.Check(err)
return buf
return buf.WithMaxSize(2 * int(opt.MapBufSize))
}

func newMapper(st *state) *mapper {
Expand Down
8 changes: 3 additions & 5 deletions dgraph/cmd/bulk/reduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,11 +434,9 @@ func bufferStats(cbuf *z.Buffer) {
}

func getBuf(dir string) *z.Buffer {
cbuf, err := z.NewBufferWithDir(64<<20, 64<<30, z.UseCalloc,
filepath.Join(dir, bufferDir), "Reducer.GetBuf")
x.Check(err)
cbuf.AutoMmapAfter(1 << 30)
return cbuf
return z.NewBuffer(64<<20, "Reducer.GetBuf").
WithAutoMmap(1<<30, filepath.Join(dir, bufferDir)).
WithMaxSize(64 << 30)
}

func (r *reducer) reduce(partitionKeys [][]byte, mapItrs []*mapIterator, ci *countIndexer) {
Expand Down
5 changes: 3 additions & 2 deletions dgraph/cmd/zero/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,16 @@ func (o *Oracle) Init() {
o.commits = make(map[uint64]uint64)
// Remove the older btree file, before creating NewTree, as it may contain stale data leading
// to wrong results.
o.keyCommit = z.NewTree()
o.keyCommit = z.NewTree("oracle")
o.subscribers = make(map[int]chan pb.OracleDelta)
o.updates = make(chan *pb.OracleDelta, 100000) // Keeping 1 second worth of updates.
o.doneUntil.Init(nil)
go o.sendDeltasToSubscribers()
}

// oracle close releases the memory associated with btree used for keycommit.
// close releases the memory associated with btree used for keycommit.
func (o *Oracle) close() {
o.keyCommit.Close()
}

func (o *Oracle) updateStartTxnTs(ts uint64) {
Expand Down
2 changes: 1 addition & 1 deletion dgraph/cmd/zero/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ instances to achieve high-availability.
Head("Audit options").
Flag("output",
`[stdout, /path/to/dir] This specifies where audit logs should be output to.
"stdout" is for standard output. You can also specify the directory where audit logs
"stdout" is for standard output. You can also specify the directory where audit logs
will be saved. When stdout is specified as output other fields will be ignored.`).
Flag("compress",
"Enables the compression of old audit logs.").
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ require (
github.com/dgraph-io/gqlgen v0.13.2
github.com/dgraph-io/gqlparser/v2 v2.2.0
github.com/dgraph-io/graphql-transport-ws v0.0.0-20210223074046-e5b8b80bb4ed
github.com/dgraph-io/ristretto v0.0.4-0.20210430144708-642987276d6a
github.com/dgraph-io/ristretto v0.0.4-0.20210428103110-8405ab9b246f
github.com/dgraph-io/simdjson-go v0.3.0
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@ github.com/dgraph-io/gqlparser/v2 v2.2.0/go.mod h1:MYS4jppjyx8b9tuUtjV7jU1UFZK6P
github.com/dgraph-io/graphql-transport-ws v0.0.0-20210223074046-e5b8b80bb4ed h1:pgGMBoTtFhR+xkyzINaToLYRurHn+6pxMYffIGmmEPc=
github.com/dgraph-io/graphql-transport-ws v0.0.0-20210223074046-e5b8b80bb4ed/go.mod h1:7z3c/5w0sMYYZF5bHsrh8IH4fKwG5O5Y70cPH1ZLLRQ=
github.com/dgraph-io/ristretto v0.0.4-0.20210309073149-3836124cdc5a/go.mod h1:MIonLggsKgZLUSt414ExgwNtlOL5MuEoAJP514mwGe8=
github.com/dgraph-io/ristretto v0.0.4-0.20210430144708-642987276d6a h1:www55JYSP8aYgBPNWJLeMS/HJMba6emZyBF65/a4XRI=
github.com/dgraph-io/ristretto v0.0.4-0.20210430144708-642987276d6a/go.mod h1:fux0lOrBhrVCJd3lcTHsIJhq1T2rokOu6v9Vcb3Q9ug=
github.com/dgraph-io/ristretto v0.0.4-0.20210428103110-8405ab9b246f h1:vupa/2tdIvqUTP0Md7MMvdKwCQjF48m+T6LZRVc+3cg=
github.com/dgraph-io/ristretto v0.0.4-0.20210428103110-8405ab9b246f/go.mod h1:fux0lOrBhrVCJd3lcTHsIJhq1T2rokOu6v9Vcb3Q9ug=
github.com/dgraph-io/simdjson-go v0.3.0 h1:h71LO7vR4LHMPUhuoGN8bqGm1VNfGOlAG8BI6iDUKw0=
github.com/dgraph-io/simdjson-go v0.3.0/go.mod h1:Otpysdjaxj9OGaJusn4pgQV7OFh2bELuHANq0I78uvY=
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
Expand Down
6 changes: 3 additions & 3 deletions worker/backup_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,9 +294,9 @@ func NewBackupProcessor(db *badger.DB, req *pb.BackupRequest) *BackupProcessor {
bp.txn = db.NewTransactionAt(req.ReadTs, false)
}
for i := range bp.threads {
buf, err := z.NewBufferWith(32<<20, 32<<30, z.UseCalloc, "Worker.BackupProcessor")
x.Check(err)
buf.AutoMmapAfter(1 << 30)
buf := z.NewBuffer(32<<20, "Worker.BackupProcessor").
WithAutoMmap(1<<30, "").
WithMaxSize(32 << 30)

bp.threads[i] = &threadLocal{
Request: bp.Request,
Expand Down
4 changes: 2 additions & 2 deletions worker/predicate_move.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func batchAndProposeKeyValues(ctx context.Context, kvs chan *pb.KVS) error {
var pk x.ParsedKey

for kvPayload := range kvs {
buf := z.BufferFrom(kvPayload.GetData())
buf := z.NewBufferSlice(kvPayload.GetData())
err := buf.SliceIterate(func(s []byte) error {
kv := &bpb.KV{}
x.Check(kv.Unmarshal(s))
Expand Down Expand Up @@ -164,7 +164,7 @@ func (w *grpcWorker) ReceivePredicate(stream pb.Worker_ReceivePredicateServer) e
}
glog.V(2).Infof("Received batch of size: %s\n", humanize.IBytes(uint64(len(kvBuf.Data))))

buf := z.BufferFrom(kvBuf.Data)
buf := z.NewBufferSlice(kvBuf.Data)
buf.SliceIterate(func(_ []byte) error {
count++
return nil
Expand Down
4 changes: 2 additions & 2 deletions worker/restore_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,9 @@ func (m *mapper) writeToDisk(buf *z.Buffer) error {
}

func newBuffer() *z.Buffer {
buf, err := z.NewBufferWithDir(mapFileSz, 2*mapFileSz, z.UseMmap, "", "Restore.Buffer")
buf, err := z.NewBufferTmp("", mapFileSz)
x.Check(err)
return buf
return buf.WithMaxSize(2 * mapFileSz)
}

func (mw *mapper) sendForWriting() error {
Expand Down
5 changes: 1 addition & 4 deletions worker/restore_reduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,7 @@ func newMapIterator(filename string) (*pb.MapHeader, *mapIterator) {
func getBuf() *z.Buffer {
path := filepath.Join(x.WorkerConfig.TmpDir, "buffer")
x.Check(os.MkdirAll(path, 0750))
cbuf, err := z.NewBufferWithDir(64<<20, 64<<30, z.UseCalloc, path, "Restore.GetBuf")
x.Check(err)
cbuf.AutoMmapAfter(1 << 30)
return cbuf
return z.NewBuffer(64<<20, "Restore.GetBuf").WithAutoMmap(1<<30, path).WithMaxSize(64 << 30)
}

type reducer struct {
Expand Down
2 changes: 1 addition & 1 deletion worker/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (n *node) populateSnapshot(snap pb.Snapshot, pl *conn.Pool) error {
glog.V(1).Infof("Received batch of size: %s. Total so far: %s\n",
humanize.IBytes(uint64(len(kvs.Data))), humanize.IBytes(uint64(size)))

buf := z.BufferFrom(kvs.Data)
buf := z.NewBufferSlice(kvs.Data)
if err := writer.Write(buf); err != nil {
return err
}
Expand Down
4 changes: 1 addition & 3 deletions xidmap/trie.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"math"
"unsafe"

"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
)

Expand All @@ -34,8 +33,7 @@ type Trie struct {
// NewTrie returns a Trie backed by a z.Buffer that it allocates itself. The Trie owns that
// buffer; Release must be called at the end to free the buffer's resources.
func NewTrie() *Trie {
buf, err := z.NewBufferWith(32<<20, math.MaxUint32, z.UseMmap, "Trie")
x.Check(err)
buf := z.NewBuffer(32<<20, "Trie").WithMaxSize(math.MaxUint32)
// Add additional 8 bytes at the start, because offset=0 is used for checking non-existing node.
// Therefore we can't keep root at 0 offset.
ro := buf.AllocateOffset(nodeSz + 8)
Expand Down
5 changes: 4 additions & 1 deletion xidmap/xidmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func New(opts XidMapOptions) *XidMap {
}
for i := range xm.shards {
xm.shards[i] = &shard{
tree: z.NewTree(),
tree: z.NewTree("XidMap"),
}
}

Expand Down Expand Up @@ -331,6 +331,9 @@ func (m *XidMap) Flush() error {
// even during the reduce phase. If the bulk loader is running on a large dataset, this occupies a
// lot of memory and sometimes causes OOM. Explicitly setting shards to nil in this method fixes it.
// TODO: find out why xidmap is not getting GCed without the line below.
for _, shards := range m.shards {
shards.tree.Close()
}
m.shards = nil
if m.writer == nil {
return nil
Expand Down

0 comments on commit 88ab633

Please sign in to comment.