Skip to content

Commit

Permalink
Fix StreamWriter usage in bulk loader (#3635)
Browse files Browse the repository at this point in the history
Last remaining KVs were not getting their StreamId set during reduce phase. This PR fixes that.

Fixes #3625 .
  • Loading branch information
manishrjain authored Jul 5, 2019
1 parent e4a07ad commit a67fbfd
Showing 1 changed file with 23 additions and 15 deletions.
38 changes: 23 additions & 15 deletions dgraph/cmd/bulk/reduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,34 +139,42 @@ func (r *reducer) encodeAndWrite(
writer *badger.StreamWriter, entryCh chan []*pb.MapEntry, closer *y.Closer) {
defer closer.Done()

preds := make(map[string]uint32)

var listSize int
list := &bpb.KVList{}

preds := make(map[string]uint32)
setStreamId := func(kv *bpb.KV) {
pk := x.Parse(kv.Key)
x.AssertTrue(len(pk.Attr) > 0)

// We don't need to consider the data prefix, count prefix, etc. because each predicate
// contains sorted keys, the way they are produced.
streamId := preds[pk.Attr]
if streamId == 0 {
streamId = atomic.AddUint32(&r.streamId, 1)
preds[pk.Attr] = streamId
}
// TODO: Having many stream ids can cause memory issues with StreamWriter. So, we
// should build a way in StreamWriter to indicate that the stream is over, so the
// table for that stream can be flushed and memory released.
kv.StreamId = streamId
}

for batch := range entryCh {
listSize += r.toList(batch, list)
if listSize > 4<<20 {
for _, kv := range list.Kv {
pk := x.Parse(kv.Key)
if len(pk.Attr) == 0 {
continue
}
streamId := preds[pk.Attr]
if streamId == 0 {
streamId = atomic.AddUint32(&r.streamId, 1)
preds[pk.Attr] = streamId
}
// TODO: Having many stream ids can cause memory issues with StreamWriter. So, we
// should build a way in StreamWriter to indicate that the stream is over, so the
// table for that stream can be flushed and memory released.
kv.StreamId = streamId
setStreamId(kv)
}
x.Check(writer.Write(list))
list = &bpb.KVList{}
listSize = 0
}
}
if len(list.Kv) > 0 {
for _, kv := range list.Kv {
setStreamId(kv)
}
x.Check(writer.Write(list))
}
}
Expand Down

0 comments on commit a67fbfd

Please sign in to comment.