diff --git a/dgraph/cmd/bulk/loader.go b/dgraph/cmd/bulk/loader.go
index dafe3071051..ff536135040 100644
--- a/dgraph/cmd/bulk/loader.go
+++ b/dgraph/cmd/bulk/loader.go
@@ -61,6 +61,7 @@ type options struct {
 	IgnoreErrors     bool
 	CustomTokenizers string
 	NewUids          bool
+	ClientDir        string // Set from the "xidmap" flag; no store is opened when empty.
 
 	MapShards    int
 	ReduceShards int
@@ -161,7 +162,17 @@ func readSchema(filename string) *schema.ParsedSchema {
 
 func (ld *loader) mapStage() {
 	ld.prog.setPhase(mapPhase)
-	ld.xids = xidmap.New(ld.zero, nil)
+	// Open a badger store only when a directory was given; otherwise db stays
+	// nil and xidmap.New receives nil, exactly as it did before this option.
+	var db *badger.DB
+	if len(ld.opt.ClientDir) > 0 {
+		x.Check(os.MkdirAll(ld.opt.ClientDir, 0700))
+
+		var err error
+		db, err = badger.Open(badger.DefaultOptions(ld.opt.ClientDir))
+		x.Checkf(err, "Error while opening xidmap badger store in %s", ld.opt.ClientDir)
+	}
+	ld.xids = xidmap.New(ld.zero, db)
 
 	files := x.FindDataFiles(ld.opt.DataFiles, []string{".rdf", ".rdf.gz", ".json", ".json.gz"})
 	if len(files) == 0 {
@@ -224,6 +235,10 @@ func (ld *loader) mapStage() {
 		ld.mappers[i] = nil
 	}
 	x.Check(ld.xids.Flush())
+	// Close the store (if one was opened) only after the flush above succeeded.
+	if db != nil {
+		x.Check(db.Close())
+	}
 	ld.xids = nil
 }
 
diff --git a/dgraph/cmd/bulk/run.go b/dgraph/cmd/bulk/run.go
index 56027c31bd1..60c7ecbb1f8 100644
--- a/dgraph/cmd/bulk/run.go
+++ b/dgraph/cmd/bulk/run.go
@@ -81,6 +81,7 @@ func init() {
 	flag.Bool("version", false, "Prints the version of Dgraph Bulk Loader.")
 	flag.BoolP("store_xids", "x", false, "Generate an xid edge for each node.")
 	flag.StringP("zero", "z", "localhost:5080", "gRPC address for Dgraph zero")
+	flag.String("xidmap", "", "Directory to store xid to uid mapping")
 	// TODO: Potentially move http server to main.
 	flag.String("http", "localhost:8080",
 		"Address to serve http (pprof).")
@@ -129,6 +130,7 @@ func run() {
 		ReduceShards:     Bulk.Conf.GetInt("reduce_shards"),
 		CustomTokenizers: Bulk.Conf.GetString("custom_tokenizers"),
 		NewUids:          Bulk.Conf.GetBool("new_uids"),
+		ClientDir:        Bulk.Conf.GetString("xidmap"),
 
 		BadgerKeyFile:          Bulk.Conf.GetString("encryption_key_file"),
 		BadgerCompressionLevel: Bulk.Conf.GetInt("badger.compression_level"),