Skip to content

Commit

Permalink
Use a different stream writer id for split keys.
Browse files: browse the repository at this point in the history
  • Loading branch information
martinmr committed Mar 17, 2020
1 parent 6251673 commit 1426660
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 14 deletions.
35 changes: 21 additions & 14 deletions dgraph/cmd/bulk/reduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,7 @@ func (r *reducer) run() error {
}

writer := db.NewStreamWriter()
if err := writer.Prepare(); err != nil {
x.Check(err)
}
x.Check(writer.Prepare())

ci := &countIndexer{reducer: r, writer: writer}
sort.Slice(partitionKeys, func(i, j int) bool {
Expand All @@ -93,9 +91,7 @@ func (r *reducer) run() error {
r.reduce(partitionKeys, mapItrs, ci)
ci.wait()

if err := writer.Flush(); err != nil {
x.Check(err)
}
x.Check(writer.Flush())
for _, itr := range mapItrs {
if err := itr.Close(); err != nil {
fmt.Printf("Error while closing iterator: %v", err)
Expand Down Expand Up @@ -316,6 +312,9 @@ func (r *reducer) encode(entryCh chan *encodeRequest, closer *y.Closer) {
x.Check(err)
x.AssertTrue(len(pk.Attr) > 0)
kv.StreamId = r.streamIdFor(pk.Attr)
if pk.HasStartUid {
kv.StreamId |= 0x80000000
}
}
req.countKeys = countKeys
req.wg.Done()
Expand Down Expand Up @@ -475,15 +474,23 @@ func (r *reducer) toList(bufEntries [][]byte, list *bpb.KVList) []*countIndexEnt
}

pl.Pack = codec.Encode(uids, 256)
val, err := pl.Marshal()
x.Check(err)
kv := &bpb.KV{
Key: y.Copy(currentKey),
Value: val,
UserMeta: userMeta,
Version: writeVersionTs,
shouldSplit := pl.Size() > (1<<20)/2 && len(pl.Pack.Blocks) > 1
if shouldSplit {
l := posting.NewList(y.Copy(currentKey), pl, writeVersionTs)
kvs, err := l.Rollup()
x.Check(err)
list.Kv = append(list.Kv, kvs...)
} else {
val, err := pl.Marshal()
x.Check(err)
kv := &bpb.KV{
Key: y.Copy(currentKey),
Value: val,
UserMeta: userMeta,
Version: writeVersionTs,
}
list.Kv = append(list.Kv, kv)
}
list.Kv = append(list.Kv, kv)

uids = uids[:0]
pl.Reset()
Expand Down
6 changes: 6 additions & 0 deletions posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -798,6 +798,12 @@ func (l *List) Rollup() ([]*bpb.KV, error) {
kvs = append(kvs, kv)
}

// Sort the KVs by their key so that the main part of the list is at the
// start of the list and all other parts appear in the order of their start UID.
// The comparison must be strict (< 0): a less function has to report false for
// equal elements, otherwise it does not describe a valid ordering for sort.Slice.
sort.Slice(kvs, func(i, j int) bool {
	return bytes.Compare(kvs[i].Key, kvs[j].Key) < 0
})

return kvs, nil
}

Expand Down

0 comments on commit 1426660

Please sign in to comment.