From f10183ec77862e0585b2cfe04945bbf7a2604482 Mon Sep 17 00:00:00 2001 From: Lucas Wang Date: Fri, 28 Jun 2019 16:15:52 -0700 Subject: [PATCH 01/10] Removing the first empty line and comma in the json output --- worker/export.go | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/worker/export.go b/worker/export.go index 72f298cb4da..30f7374e3d5 100644 --- a/worker/export.go +++ b/worker/export.go @@ -68,11 +68,11 @@ var exportFormats = map[string]exportFormat{ } type exporter struct { - pl *posting.List - uid uint64 - attr string - readTs uint64 - counter int + pl *posting.List + uid uint64 + attr string + readTs uint64 + dataCounter int } // Map from our types to RDF type. Useful when writing storage types @@ -149,8 +149,7 @@ func escapedString(str string) string { func (e *exporter) toJSON() (*bpb.KVList, error) { bp := new(bytes.Buffer) - - if e.counter != 1 { + if e.dataCounter != 1 { fmt.Fprint(bp, ",\n") } @@ -475,8 +474,8 @@ func export(ctx context.Context, in *pb.ExportRequest) error { } e := &exporter{ - readTs: in.ReadTs, - counter: 0, + readTs: in.ReadTs, + dataCounter: 0, } stream := pstore.NewStreamAt(in.ReadTs) @@ -504,7 +503,6 @@ func export(ctx context.Context, in *pb.ExportRequest) error { item := itr.Item() pk := x.Parse(item.Key()) - e.counter += 1 e.uid = pk.Uid e.attr = pk.Attr @@ -536,6 +534,7 @@ func export(ctx context.Context, in *pb.ExportRequest) error { return toType(pk.Attr, update) case pk.IsData(): + e.dataCounter += 1 e.pl, err = posting.ReadPostingList(key, itr) if err != nil { return nil, err From 64988bff25fbe76c889579697c174acfd4fd16c3 Mon Sep 17 00:00:00 2001 From: Lucas Wang Date: Fri, 28 Jun 2019 18:05:50 -0700 Subject: [PATCH 02/10] Moved the separator logic into the Send function, since the KeyToList is called concurrently, and adding locks to protect the counter would be too expensive --- worker/export.go | 36 +++++++++++++++++++++++------------- 1 file changed, 23 insertions(+), 13 deletions(-) diff --git a/worker/export.go b/worker/export.go index 30f7374e3d5..32713253aae 100644 --- a/worker/export.go +++ b/worker/export.go @@ -68,11 +68,10 @@ var exportFormats = map[string]exportFormat{ } type exporter struct { - pl *posting.List - uid uint64 - attr string - readTs uint64 - dataCounter int + pl *posting.List + uid uint64 + attr string + readTs uint64 } // Map from our types to RDF type. Useful when writing storage types @@ -149,10 +148,6 @@ func escapedString(str string) string { func (e *exporter) toJSON() (*bpb.KVList, error) { bp := new(bytes.Buffer) - if e.dataCounter != 1 { - fmt.Fprint(bp, ",\n") - } - // We could output more compact JSON at the cost of code complexity. // Leaving it simple for now. 
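// A minimal, self-contained sketch of the idea behind moving the separator out
// of toJSON — not the exporter's actual code. KeyToList-style producers run
// concurrently, so the single consumer that performs the writes is the one that
// decides when a ",\n" separator is needed; the producer/consumer names below
// are illustrative only.
package main

import (
	"bytes"
	"fmt"
	"sync"
)

func main() {
	values := make(chan []byte)

	var wg sync.WaitGroup
	for i := 1; i <= 3; i++ {
		wg.Add(1)
		go func(uid int) { // concurrent producers, analogous to KeyToList callbacks
			defer wg.Done()
			values <- []byte(fmt.Sprintf(`{"uid":"0x%x"}`, uid))
		}(i)
	}
	go func() { wg.Wait(); close(values) }()

	var out bytes.Buffer
	out.WriteString("[\n")
	hasDataBefore := false
	for v := range values { // single consumer, analogous to stream.Send
		if hasDataBefore {
			out.Write([]byte(",\n")) // separator only between entries, never before the first
		}
		out.Write(v)
		hasDataBefore = true
	}
	out.WriteString("\n]")
	fmt.Println(out.String())
}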
@@ -474,8 +469,7 @@ func export(ctx context.Context, in *pb.ExportRequest) error { } e := &exporter{ - readTs: in.ReadTs, - dataCounter: 0, + readTs: in.ReadTs, } stream := pstore.NewStreamAt(in.ReadTs) @@ -534,12 +528,10 @@ func export(ctx context.Context, in *pb.ExportRequest) error { return toType(pk.Attr, update) case pk.IsData(): - e.dataCounter += 1 e.pl, err = posting.ReadPostingList(key, itr) if err != nil { return nil, err } - switch in.Format { case "json": return e.toJSON() @@ -555,6 +547,18 @@ func export(ctx context.Context, in *pb.ExportRequest) error { return nil, nil } + hasDataBefore := false + var separator []byte + switch in.Format { + case "json": + separator = []byte(",\n") + case "rdf": + // the separator for RDF should be empty since the toRDF function already + // adds newline to each RDF entry + default: + glog.Fatalf("Invalid export format found: %s", in.Format) + } + stream.Send = func(list *bpb.KVList) error { for _, kv := range list.Kv { var writer *fileWriter @@ -567,9 +571,15 @@ func export(ctx context.Context, in *pb.ExportRequest) error { glog.Fatalf("Invalid data type found: %x", kv.Key) } + if hasDataBefore { + if _, err := writer.gw.Write(separator); err != nil { + return err + } + } if _, err := writer.gw.Write(kv.Value); err != nil { return err } + hasDataBefore = true } // Once all the sends are done, writers must be flushed and closed in order. return nil From 996ec6c5f7c24f3122a58cf62712ace8dec1e2df Mon Sep 17 00:00:00 2001 From: Lucas Wang Date: Wed, 17 Jul 2019 14:21:55 -0700 Subject: [PATCH 03/10] When there are multiple files specified for bulk loader, the existence validation should be run on individual files --- dgraph/cmd/bulk/run.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/dgraph/cmd/bulk/run.go b/dgraph/cmd/bulk/run.go index 65eca9d38dc..148754a98d4 100644 --- a/dgraph/cmd/bulk/run.go +++ b/dgraph/cmd/bulk/run.go @@ -134,10 +134,16 @@ func run() { if opt.DataFiles == "" { fmt.Fprint(os.Stderr, "RDF or JSON file(s) location must be specified.\n") os.Exit(1) - } else if _, err := os.Stat(opt.DataFiles); err != nil && os.IsNotExist(err) { - fmt.Fprintf(os.Stderr, "Data path(%v) does not exist.\n", opt.DataFiles) - os.Exit(1) + } else { + fileList := strings.Split(opt.DataFiles, ",") + for _, file := range fileList { + if _, err := os.Stat(file); err != nil && os.IsNotExist(err) { + fmt.Fprintf(os.Stderr, "Data path(%v) does not exist.\n", file) + os.Exit(1) + } + } } + if opt.ReduceShards > opt.MapShards { fmt.Fprintf(os.Stderr, "Invalid flags: reduce_shards(%d) should be <= map_shards(%d)\n", opt.ReduceShards, opt.MapShards) From 4d96ba624a1cd9f56fd24795ca38536a8c38e828 Mon Sep 17 00:00:00 2001 From: Lucas Wang Date: Wed, 17 Jul 2019 15:25:15 -0700 Subject: [PATCH 04/10] Fixing the export to not use shared exporter variable --- worker/export.go | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/worker/export.go b/worker/export.go index e818a6d9f0e..926d93dcb95 100644 --- a/worker/export.go +++ b/worker/export.go @@ -468,10 +468,6 @@ func export(ctx context.Context, in *pb.ExportRequest) error { return err } - e := &exporter{ - readTs: in.ReadTs, - } - stream := pstore.NewStreamAt(in.ReadTs) stream.LogPrefix = "Export" stream.ChooseKey = func(item *badger.Item) bool { @@ -496,7 +492,9 @@ func export(ctx context.Context, in *pb.ExportRequest) error { stream.KeyToList = func(key []byte, itr *badger.Iterator) (*bpb.KVList, error) { item := itr.Item() pk := 
x.Parse(item.Key()) - + e := &exporter{ + readTs: in.ReadTs, + } e.uid = pk.Uid e.attr = pk.Attr @@ -571,15 +569,19 @@ func export(ctx context.Context, in *pb.ExportRequest) error { glog.Fatalf("Invalid data type found: %x", kv.Key) } - if hasDataBefore { - if _, err := writer.gw.Write(separator); err != nil { - return err + if kv.Version == 1 { // only insert separator for data + if hasDataBefore { + if _, err := writer.gw.Write(separator); err != nil { + return err + } } + // change the hasDataBefore flag so that the next data entry will have a separator + // prepended + hasDataBefore = true } if _, err := writer.gw.Write(kv.Value); err != nil { return err } - hasDataBefore = true } // Once all the sends are done, writers must be flushed and closed in order. return nil From 120156b9c1c886e2c3eba76611afe95bc8025b62 Mon Sep 17 00:00:00 2001 From: Lucas Wang Date: Wed, 17 Jul 2019 16:19:06 -0700 Subject: [PATCH 05/10] Improving the doc --- worker/export.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/worker/export.go b/worker/export.go index 926d93dcb95..9aa560b22dd 100644 --- a/worker/export.go +++ b/worker/export.go @@ -551,8 +551,8 @@ func export(ctx context.Context, in *pb.ExportRequest) error { case "json": separator = []byte(",\n") case "rdf": - // the separator for RDF should be empty since the toRDF function already - // adds newline to each RDF entry + // The separator for RDF should be empty since the toRDF function already + // adds newline to each RDF entry. default: glog.Fatalf("Invalid export format found: %s", in.Format) } From c703db21d35c39cbfeb7adeb471a883259c347e1 Mon Sep 17 00:00:00 2001 From: Lucas Wang Date: Fri, 19 Jul 2019 15:34:21 -0700 Subject: [PATCH 06/10] Fix race conditions in the bulk loader since StreamWriter.Write is not thread-safe --- dgraph/cmd/bulk/count_index.go | 24 ++++++++++++++---------- dgraph/cmd/bulk/reduce.go | 1 - 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/dgraph/cmd/bulk/count_index.go b/dgraph/cmd/bulk/count_index.go index 2c09ef7178d..34b11bffd5c 100644 --- a/dgraph/cmd/bulk/count_index.go +++ b/dgraph/cmd/bulk/count_index.go @@ -19,7 +19,6 @@ package bulk import ( "bytes" "sort" - "sync" "sync/atomic" "github.com/dgraph-io/badger" @@ -41,12 +40,24 @@ type countIndexer struct { writer *badger.StreamWriter cur current counts map[int][]uint64 - wg sync.WaitGroup } // addUid adds the uid from rawKey to a count index if a count index is // required by the schema. This method expects keys to be passed into it in // sorted order. 
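// The race fixed in this patch comes from calling a writer that is not safe for
// concurrent use (StreamWriter.Write) from several goroutines. A generic way to
// serialize such writes, sketched below with a made-up unsafeWriter type, is to
// funnel them through a channel owned by one goroutine; the patch instead takes
// the simpler route of dropping the goroutines and calling writeIndex
// synchronously.
package main

import (
	"fmt"
	"sync"
)

type unsafeWriter struct{ bytes int } // stand-in for a writer with no internal locking

func (w *unsafeWriter) Write(p []byte) { w.bytes += len(p) }

func main() {
	w := &unsafeWriter{}
	writes := make(chan []byte)
	done := make(chan struct{})

	go func() { // the only goroutine that ever touches w
		for p := range writes {
			w.Write(p)
		}
		close(done)
	}()

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			writes <- []byte(fmt.Sprintf("batch-%d", i))
		}(i)
	}
	wg.Wait()
	close(writes)
	<-done

	fmt.Println("bytes written:", w.bytes)
}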
+ +// record that the given key has the given count + +// convert rawKey into key +// key must be non nill and neither data nor reverse +// check if key matches the current countIndexer by comparing the pred and reverse +// if not the same as the current count indexer +// if the current counts is greater than 0, launch go routines to write the index +// clean counts to prepare for the new key + +// +// set the pred, rev and track to the current key's pred, reverse, +// and track to see if we need to track the count index according to the schema func (c *countIndexer) addUid(rawKey []byte, count int) { key := x.Parse(rawKey) if key == nil || (!key.IsData() && !key.IsReverse()) { @@ -59,8 +70,7 @@ func (c *countIndexer) addUid(rawKey []byte, count int) { if !sameIndexKey { if len(c.counts) > 0 { - c.wg.Add(1) - go c.writeIndex(c.cur.pred, c.cur.rev, c.counts) + c.writeIndex(c.cur.pred, c.cur.rev, c.counts) } if len(c.counts) > 0 || c.counts == nil { c.counts = make(map[int][]uint64) @@ -75,8 +85,6 @@ func (c *countIndexer) addUid(rawKey []byte, count int) { } func (c *countIndexer) writeIndex(pred string, rev bool, counts map[int][]uint64) { - defer c.wg.Done() - streamId := atomic.AddUint32(&c.streamId, 1) list := &bpb.KVList{} for count, uids := range counts { @@ -101,7 +109,3 @@ func (c *countIndexer) writeIndex(pred string, rev bool, counts map[int][]uint64 x.Check(err) } } - -func (c *countIndexer) wait() { - c.wg.Wait() -} diff --git a/dgraph/cmd/bulk/reduce.go b/dgraph/cmd/bulk/reduce.go index 283bd1ebce7..05b4b5cfaee 100644 --- a/dgraph/cmd/bulk/reduce.go +++ b/dgraph/cmd/bulk/reduce.go @@ -70,7 +70,6 @@ func (r *reducer) run() error { ci := &countIndexer{reducer: r, writer: writer} r.reduce(mapItrs, ci) - ci.wait() if err := writer.Flush(); err != nil { x.Check(err) From a35899b6cd0b584edef13f96f87ebae39f645123 Mon Sep 17 00:00:00 2001 From: Lucas Wang Date: Fri, 19 Jul 2019 15:42:10 -0700 Subject: [PATCH 07/10] Fixed the encodeAndWrite --- dgraph/cmd/bulk/reduce.go | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/dgraph/cmd/bulk/reduce.go b/dgraph/cmd/bulk/reduce.go index 05b4b5cfaee..48cec443c38 100644 --- a/dgraph/cmd/bulk/reduce.go +++ b/dgraph/cmd/bulk/reduce.go @@ -135,9 +135,7 @@ func newMapIterator(filename string) *mapIterator { } func (r *reducer) encodeAndWrite( - writer *badger.StreamWriter, entryCh chan []*pb.MapEntry, closer *y.Closer) { - defer closer.Done() - + writer *badger.StreamWriter, entryCh chan []*pb.MapEntry) { var listSize int list := &bpb.KVList{} @@ -180,9 +178,6 @@ func (r *reducer) encodeAndWrite( func (r *reducer) reduce(mapItrs []*mapIterator, ci *countIndexer) { entryCh := make(chan []*pb.MapEntry, 100) - closer := y.NewCloser(1) - defer closer.SignalAndWait() - var ph postingHeap for _, itr := range mapItrs { me := itr.Next() @@ -194,7 +189,7 @@ func (r *reducer) reduce(mapItrs []*mapIterator, ci *countIndexer) { } writer := ci.writer - go r.encodeAndWrite(writer, entryCh, closer) + r.encodeAndWrite(writer, entryCh) const batchSize = 10000 const batchAlloc = batchSize * 11 / 10 From 10bdf8a3dcd8e048453b6b3fe25e0cf56fcf88fe Mon Sep 17 00:00:00 2001 From: Lucas Wang Date: Fri, 19 Jul 2019 16:08:19 -0700 Subject: [PATCH 08/10] remove parallelism in encodeANdWrite --- dgraph/cmd/bulk/reduce.go | 39 ++++++++++++++------------------------- 1 file changed, 14 insertions(+), 25 deletions(-) diff --git a/dgraph/cmd/bulk/reduce.go b/dgraph/cmd/bulk/reduce.go index 48cec443c38..7358315e942 100644 --- a/dgraph/cmd/bulk/reduce.go +++ 
b/dgraph/cmd/bulk/reduce.go @@ -134,12 +134,14 @@ func newMapIterator(filename string) *mapIterator { return &mapIterator{fd: fd, reader: bufio.NewReaderSize(fd, 16<<10)} } +// encodeAndWrite converts the given batch into a KVList and then +// send the list to the stream writer +// while iterating the kvs in the batch, it also sets the kv's stream id +// to a previous mapped stream id for the attribute (predicate), or a newly assigned +// stream id if the attribute does not exist in the preds map func (r *reducer) encodeAndWrite( - writer *badger.StreamWriter, entryCh chan []*pb.MapEntry) { - var listSize int - list := &bpb.KVList{} + writer *badger.StreamWriter, batch []*pb.MapEntry, preds map[string]uint32) { - preds := make(map[string]uint32) setStreamId := func(kv *bpb.KV) { pk := x.Parse(kv.Key) x.AssertTrue(len(pk.Attr) > 0) @@ -157,27 +159,15 @@ func (r *reducer) encodeAndWrite( kv.StreamId = streamId } - for batch := range entryCh { - listSize += r.toList(batch, list) - if listSize > 4<<20 { - for _, kv := range list.Kv { - setStreamId(kv) - } - x.Check(writer.Write(list)) - list = &bpb.KVList{} - listSize = 0 - } - } - if len(list.Kv) > 0 { - for _, kv := range list.Kv { - setStreamId(kv) - } - x.Check(writer.Write(list)) + list := &bpb.KVList{} + r.toList(batch, list) + for _, kv := range list.Kv { + setStreamId(kv) } + x.Check(writer.Write(list)) } func (r *reducer) reduce(mapItrs []*mapIterator, ci *countIndexer) { - entryCh := make(chan []*pb.MapEntry, 100) var ph postingHeap for _, itr := range mapItrs { me := itr.Next() @@ -189,7 +179,6 @@ func (r *reducer) reduce(mapItrs []*mapIterator, ci *countIndexer) { } writer := ci.writer - r.encodeAndWrite(writer, entryCh) const batchSize = 10000 const batchAlloc = batchSize * 11 / 10 @@ -197,6 +186,7 @@ func (r *reducer) reduce(mapItrs []*mapIterator, ci *countIndexer) { var prevKey []byte var plistLen int + preds := make(map[string]uint32) for len(ph.nodes) > 0 { node0 := &ph.nodes[0] me := node0.mapEntry @@ -217,7 +207,7 @@ func (r *reducer) reduce(mapItrs []*mapIterator, ci *countIndexer) { } if len(batch) >= batchSize && keyChanged { - entryCh <- batch + r.encodeAndWrite(writer, batch, preds) batch = make([]*pb.MapEntry, 0, batchAlloc) } prevKey = me.Key @@ -225,12 +215,11 @@ func (r *reducer) reduce(mapItrs []*mapIterator, ci *countIndexer) { plistLen++ } if len(batch) > 0 { - entryCh <- batch + r.encodeAndWrite(writer, batch, preds) } if plistLen > 0 { ci.addUid(prevKey, plistLen) } - close(entryCh) } type heapNode struct { From a6d561d15fd92383b5242f76ac4a603f26d90197 Mon Sep 17 00:00:00 2001 From: Lucas Wang Date: Fri, 19 Jul 2019 16:35:49 -0700 Subject: [PATCH 09/10] Removed the unnecessary comments --- dgraph/cmd/bulk/count_index.go | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/dgraph/cmd/bulk/count_index.go b/dgraph/cmd/bulk/count_index.go index 34b11bffd5c..8e9f359010e 100644 --- a/dgraph/cmd/bulk/count_index.go +++ b/dgraph/cmd/bulk/count_index.go @@ -45,19 +45,6 @@ type countIndexer struct { // addUid adds the uid from rawKey to a count index if a count index is // required by the schema. This method expects keys to be passed into it in // sorted order. 
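// A small sketch of the stream-id bookkeeping used in encodeAndWrite, with
// made-up names (streamIdFor): each predicate gets one stream id, assigned on
// first sight and reused afterwards. The original code uses atomic.AddUint32;
// the plain counter below is safe only because a single goroutine owns the map,
// which is exactly what removing the parallelism guarantees.
package main

import "fmt"

func main() {
	preds := make(map[string]uint32)
	var nextStreamId uint32

	streamIdFor := func(attr string) uint32 {
		id, ok := preds[attr]
		if !ok {
			nextStreamId++
			id = nextStreamId
			preds[attr] = id
		}
		return id
	}

	for _, attr := range []string{"name", "friend", "name", "age", "friend"} {
		fmt.Printf("%-6s -> stream %d\n", attr, streamIdFor(attr))
	}
}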
- -// record that the given key has the given count - -// convert rawKey into key -// key must be non nill and neither data nor reverse -// check if key matches the current countIndexer by comparing the pred and reverse -// if not the same as the current count indexer -// if the current counts is greater than 0, launch go routines to write the index -// clean counts to prepare for the new key - -// -// set the pred, rev and track to the current key's pred, reverse, -// and track to see if we need to track the count index according to the schema func (c *countIndexer) addUid(rawKey []byte, count int) { key := x.Parse(rawKey) if key == nil || (!key.IsData() && !key.IsReverse()) { From 07a94afd4561590d6fc4f4937ac0deaa733f7f4c Mon Sep 17 00:00:00 2001 From: Lucas Wang Date: Fri, 19 Jul 2019 16:38:13 -0700 Subject: [PATCH 10/10] Improving docs --- dgraph/cmd/bulk/reduce.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dgraph/cmd/bulk/reduce.go b/dgraph/cmd/bulk/reduce.go index 7358315e942..7028d76eeda 100644 --- a/dgraph/cmd/bulk/reduce.go +++ b/dgraph/cmd/bulk/reduce.go @@ -137,8 +137,8 @@ func newMapIterator(filename string) *mapIterator { // encodeAndWrite converts the given batch into a KVList and then // send the list to the stream writer // while iterating the kvs in the batch, it also sets the kv's stream id -// to a previous mapped stream id for the attribute (predicate), or a newly assigned -// stream id if the attribute does not exist in the preds map +// to a previously recorded stream id corresponding to the kv's attribute (predicate), +// or a newly assigned stream id if the attribute has never been recorded func (r *reducer) encodeAndWrite( writer *badger.StreamWriter, batch []*pb.MapEntry, preds map[string]uint32) {
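For reference, the per-file existence check introduced in the bulk loader patch (PATCH 03) boils down to splitting the comma-separated flag value and stat-ing each entry. The standalone sketch below uses placeholder file names rather than the loader's real flags:

package main

import (
	"fmt"
	"os"
	"strings"
)

func main() {
	// Comma-separated list of data files, as the bulk loader's flag would
	// receive it; the paths here are placeholders.
	dataFiles := "data/part1.rdf.gz,data/part2.rdf.gz"

	for _, file := range strings.Split(dataFiles, ",") {
		if _, err := os.Stat(file); err != nil && os.IsNotExist(err) {
			fmt.Fprintf(os.Stderr, "Data path(%v) does not exist.\n", file)
			os.Exit(1)
		}
	}
	fmt.Println("all data files exist")
}

Validating each path individually means a single bad entry is reported by name instead of the whole comma-joined string being treated as one nonexistent path.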