diff --git a/dgraph/cmd/bulk/reduce.go b/dgraph/cmd/bulk/reduce.go index c1e5d968a20..92521c41781 100644 --- a/dgraph/cmd/bulk/reduce.go +++ b/dgraph/cmd/bulk/reduce.go @@ -77,9 +77,7 @@ func (r *reducer) run() error { } writer := db.NewStreamWriter() - if err := writer.Prepare(); err != nil { - x.Check(err) - } + x.Check(writer.Prepare()) ci := &countIndexer{reducer: r, writer: writer} sort.Slice(partitionKeys, func(i, j int) bool { @@ -93,9 +91,7 @@ func (r *reducer) run() error { r.reduce(partitionKeys, mapItrs, ci) ci.wait() - if err := writer.Flush(); err != nil { - x.Check(err) - } + x.Check(writer.Flush()) for _, itr := range mapItrs { if err := itr.Close(); err != nil { fmt.Printf("Error while closing iterator: %v", err) @@ -316,6 +312,9 @@ func (r *reducer) encode(entryCh chan *encodeRequest, closer *y.Closer) { x.Check(err) x.AssertTrue(len(pk.Attr) > 0) kv.StreamId = r.streamIdFor(pk.Attr) + if pk.HasStartUid { + kv.StreamId |= 0x80000000 + } } req.countKeys = countKeys req.wg.Done() @@ -475,15 +474,23 @@ func (r *reducer) toList(bufEntries [][]byte, list *bpb.KVList) []*countIndexEnt } pl.Pack = codec.Encode(uids, 256) - val, err := pl.Marshal() - x.Check(err) - kv := &bpb.KV{ - Key: y.Copy(currentKey), - Value: val, - UserMeta: userMeta, - Version: writeVersionTs, + shouldSplit := pl.Size() > (1<<20)/2 && len(pl.Pack.Blocks) > 1 + if shouldSplit { + l := posting.NewList(y.Copy(currentKey), pl, writeVersionTs) + kvs, err := l.Rollup() + x.Check(err) + list.Kv = append(list.Kv, kvs...) + } else { + val, err := pl.Marshal() + x.Check(err) + kv := &bpb.KV{ + Key: y.Copy(currentKey), + Value: val, + UserMeta: userMeta, + Version: writeVersionTs, + } + list.Kv = append(list.Kv, kv) } - list.Kv = append(list.Kv, kv) uids = uids[:0] pl.Reset() diff --git a/posting/list.go b/posting/list.go index 0c99906b51c..040f8a0aae5 100644 --- a/posting/list.go +++ b/posting/list.go @@ -798,6 +798,12 @@ func (l *List) Rollup() ([]*bpb.KV, error) { kvs = append(kvs, kv) } + // Sort the KVs by their key so that the main part of the list is at the + // start of the list and all other parts appear in the order of their start UID. + sort.Slice(kvs, func(i, j int) bool { + return bytes.Compare(kvs[i].Key, kvs[j].Key) <= 0 + }) + return kvs, nil }