From 45f02865c02e05639a80d2fdfeb2868af6e56bb4 Mon Sep 17 00:00:00 2001 From: NamanJain8 Date: Fri, 5 Feb 2021 01:37:17 +0530 Subject: [PATCH 1/7] basic bulk loader working --- chunker/json_parser.go | 33 ++++++++++++++++++++--- chunker/json_parser_test.go | 1 + chunker/rdf_parser.go | 10 +++++++ dgraph/cmd/bulk/loader.go | 12 ++++++++- dgraph/cmd/bulk/mapper.go | 35 +++++++++++++++--------- dgraph/cmd/bulk/merge_shards.go | 1 + dgraph/cmd/bulk/run.go | 4 +++ dgraph/cmd/bulk/schema.go | 47 +++++++++++++++++++++++---------- dgraph/cmd/bulk/shard_map.go | 1 + dgraph/cmd/debug/run.go | 4 ++- go.mod | 5 ++-- gql/mutation.go | 4 ++- graphql/admin/namespace.go | 5 ++-- query/mutation.go | 1 + x/keys.go | 6 +++++ 15 files changed, 131 insertions(+), 38 deletions(-) diff --git a/chunker/json_parser.go b/chunker/json_parser.go index 4f894052dde..1d55f8ebb3c 100644 --- a/chunker/json_parser.go +++ b/chunker/json_parser.go @@ -189,8 +189,9 @@ func parseScalarFacets(m map[string]interface{}, prefix string) ([]*api.Facet, e // This is the response for a map[string]interface{} i.e. a struct. type mapResponse struct { - uid string // uid retrieved or allocated for the node. - fcts []*api.Facet // facets on the edge connecting this node to the source if any. + uid string // uid retrieved or allocated for the node. + namespace uint64 // namespace to which the node belongs. + fcts []*api.Facet // facets on the edge connecting this node to the source if any. 
} func handleBasicType(k string, v interface{}, op int, nq *api.NQuad) error { @@ -267,6 +268,7 @@ func (buf *NQuadBuffer) checkForDeletion(mr mapResponse, m map[string]interface{ buf.Push(&api.NQuad{ Subject: mr.uid, Predicate: x.Star, + Namespace: mr.namespace, ObjectValue: &api.Value{Val: &api.Value_DefaultVal{DefaultVal: x.Star}}, }) } @@ -447,12 +449,29 @@ func (buf *NQuadBuffer) mapToNquads(m map[string]interface{}, op int, parentPred mr.uid = getNextBlank() } + namespace := x.DefaultNamespace + if ns, ok := m["namespace"]; ok { + switch ns := ns.(type) { + case json.Number: + nsi, err := ns.Int64() + if err != nil { + return mr, err + } + namespace = uint64(nsi) + + // this int64 case is needed for FastParseJSON, which doesn't use json.Number + case int64: + namespace = uint64(ns) + } + } + mr.namespace = namespace + for pred, v := range m { // We have already extracted the uid above so we skip that edge. // v can be nil if user didn't set a value and if omitEmpty was not supplied as JSON // option. // We also skip facets here because we parse them with the corresponding predicate. - if pred == "uid" { + if pred == "uid" || pred == "namespace" { continue } @@ -462,6 +481,7 @@ func (buf *NQuadBuffer) mapToNquads(m map[string]interface{}, op int, parentPred nq := &api.NQuad{ Subject: mr.uid, Predicate: pred, + Namespace: namespace, ObjectValue: &api.Value{Val: &api.Value_DefaultVal{DefaultVal: x.Star}}, } // Here we split predicate and lang directive (ex: "name@en"), if needed. 
With JSON @@ -478,6 +498,7 @@ func (buf *NQuadBuffer) mapToNquads(m map[string]interface{}, op int, parentPred nq := api.NQuad{ Subject: mr.uid, Predicate: pred, + Namespace: namespace, } prefix := pred + x.FacetDelimeter @@ -545,6 +566,7 @@ func (buf *NQuadBuffer) mapToNquads(m map[string]interface{}, op int, parentPred nq := api.NQuad{ Subject: mr.uid, Predicate: pred, + Namespace: namespace, } switch iv := item.(type) { @@ -739,8 +761,11 @@ func (buf *NQuadBuffer) ParseJSON(b []byte, op int) error { return nil } mr, err := buf.mapToNquads(ms, op, "") + if err != nil { + return err + } buf.checkForDeletion(mr, ms, op) - return err + return nil } // ParseJSON is a convenience wrapper function to get all NQuads in one call. This can however, lead diff --git a/chunker/json_parser_test.go b/chunker/json_parser_test.go index f49044c12fc..4808940cc56 100644 --- a/chunker/json_parser_test.go +++ b/chunker/json_parser_test.go @@ -116,6 +116,7 @@ func TestNquadsFromJson1(t *testing.T) { tn := time.Now().UTC() m := true p := Person{ + Uid: "1", Name: "Alice", Age: 26, Married: &m, diff --git a/chunker/rdf_parser.go b/chunker/rdf_parser.go index 1f465095e36..dd798bd54b8 100644 --- a/chunker/rdf_parser.go +++ b/chunker/rdf_parser.go @@ -226,6 +226,16 @@ L: return rnq, errors.Errorf("NQuad failed sanity check:%+v", rnq) } + // TODO(Naman): Ensure the label contains valid namespace. + // Append the namespace to the predicate before returning NQuad. + if rnq.Label != "" { + ns, err := strconv.ParseUint(rnq.Label, 0, 64) + if err != nil { + return rnq, err + } + rnq.Namespace = ns + } + return rnq, nil } diff --git a/dgraph/cmd/bulk/loader.go b/dgraph/cmd/bulk/loader.go index 537c63daf7d..665c15f0a58 100644 --- a/dgraph/cmd/bulk/loader.go +++ b/dgraph/cmd/bulk/loader.go @@ -76,6 +76,8 @@ type options struct { MapShards int ReduceShards int + Namespace uint64 + shardOutputDirs []string // ........... Badger options .......... 
@@ -100,6 +102,7 @@ type state struct { dbs []*badger.DB tmpDbs []*badger.DB // Temporary DB to write the split lists to avoid ordering issues. writeTs uint64 // All badger writes use this timestamp + namespaces *sync.Map // To store the encountered namespaces. } type loader struct { @@ -136,6 +139,7 @@ func newLoader(opt *options) *loader { // Lots of gz readers, so not much channel buffer needed. readerChunkCh: make(chan *bytes.Buffer, opt.NumGoroutines), writeTs: getWriteTimestamp(zero), + namespaces: &sync.Map{}, } st.schema = newSchemaStore(readSchema(opt), opt, st) ld := &loader{ @@ -183,7 +187,7 @@ func readSchema(opt *options) *schema.ParsedSchema { buf, err := ioutil.ReadAll(r) x.Check(err) - result, err := schema.Parse(string(buf)) + result, err := schema.ParseWithNamespace(string(buf), opt.Namespace) x.Check(err) return result } @@ -262,6 +266,8 @@ func (ld *loader) mapStage() { // Send the graphql triples ld.processGqlSchema(loadType) + // By this time, we will know about all the namespaces encountered. Else, we depend on the schema + // to contain whole of the information about namespace. close(ld.readerChunkCh) mapperWg.Wait() @@ -276,6 +282,7 @@ func (ld *loader) mapStage() { ld.xids = nil } +// TODO(Naman): Fix this for multi-tenancy. func (ld *loader) processGqlSchema(loadType chunker.InputFormat) { if ld.opt.GqlSchemaFile == "" { return @@ -299,6 +306,7 @@ func (ld *loader) processGqlSchema(loadType chunker.InputFormat) { buf, err := ioutil.ReadAll(r) x.Check(err) + // TODO(Naman): We will need this for all the namespaces. rdfSchema := `_:gqlschema "dgraph.graphql" . _:gqlschema "dgraph.graphql.schema" . _:gqlschema %s . @@ -310,6 +318,8 @@ func (ld *loader) processGqlSchema(loadType chunker.InputFormat) { "dgraph.graphql.schema": %s }` + // TODO(Naman): Process the GQL schema here. 
+ gqlBuf := &bytes.Buffer{} schema := strconv.Quote(string(buf)) switch loadType { diff --git a/dgraph/cmd/bulk/mapper.go b/dgraph/cmd/bulk/mapper.go index 79ada3b837b..a5c17757b24 100644 --- a/dgraph/cmd/bulk/mapper.go +++ b/dgraph/cmd/bulk/mapper.go @@ -84,7 +84,7 @@ type MapEntry []byte // } func mapEntrySize(key []byte, p *pb.Posting) int { - return 8 + 4 + 4 + len(key) + p.Size() + return 8 + 4 + 4 + len(key) + p.Size() // UID + keySz + postingSz + len(key) + size(p) } func marshalMapEntry(dst []byte, uid uint64, key []byte, p *pb.Posting) { @@ -290,14 +290,17 @@ func (m *mapper) addMapEntry(key []byte, p *pb.Posting, shard int) { } func (m *mapper) processNQuad(nq gql.NQuad) { - sid := m.uid(nq.GetSubject()) + if m.opt.Namespace != math.MaxUint64 { + nq.Namespace = m.opt.Namespace + } + sid := m.uid(nq.GetSubject(), nq.Namespace) if sid == 0 { panic(fmt.Sprintf("invalid UID with value 0 for %v", nq.GetSubject())) } var oid uint64 var de *pb.DirectedEdge if nq.GetObjectValue() == nil { - oid = m.uid(nq.GetObjectId()) + oid = m.uid(nq.GetObjectId(), nq.Namespace) if oid == 0 { panic(fmt.Sprintf("invalid UID with value 0 for %v", nq.GetObjectId())) } @@ -308,19 +311,23 @@ func (m *mapper) processNQuad(nq gql.NQuad) { x.Check(err) } + m.schema.checkAndSetInitialSchema(nq.Namespace) + + // Appropriate schema must exist for the nquad's namespace by this time. 
+ attr := x.NamespaceAttr(nq.Namespace, nq.Predicate) fwd, rev := m.createPostings(nq, de) - shard := m.state.shards.shardFor(nq.Predicate) - key := x.DataKey(nq.Predicate, sid) + shard := m.state.shards.shardFor(attr) + key := x.DataKey(attr, sid) m.addMapEntry(key, fwd, shard) if rev != nil { - key = x.ReverseKey(nq.Predicate, oid) + key = x.ReverseKey(attr, oid) m.addMapEntry(key, rev, shard) } m.addIndexMapEntries(nq, de) } -func (m *mapper) uid(xid string) uint64 { +func (m *mapper) uid(xid string, ns uint64) uint64 { if !m.opt.NewUids { if uid, err := strconv.ParseUint(xid, 0, 64); err == nil { m.xids.BumpTo(uid) @@ -328,10 +335,10 @@ func (m *mapper) uid(xid string) uint64 { } } - return m.lookupUid(xid) + return m.lookupUid(xid, ns) } -func (m *mapper) lookupUid(xid string) uint64 { +func (m *mapper) lookupUid(xid string, ns uint64) uint64 { // We create a copy of xid string here because it is stored in // the map in AssignUid and going to be around throughout the process. // We don't want to keep the whole line that we read from file alive. @@ -359,6 +366,7 @@ func (m *mapper) lookupUid(xid string) uint64 { ObjectValue: &api.Value{ Val: &api.Value_StrVal{StrVal: xid}, }, + Namespace: ns, }} m.processNQuad(nq) return uid @@ -370,7 +378,7 @@ func (m *mapper) createPostings(nq gql.NQuad, m.schema.validateType(de, nq.ObjectValue == nil) p := posting.NewPosting(de) - sch := m.schema.getSchema(nq.GetPredicate()) + sch := m.schema.getSchema(x.NamespaceAttr(nq.GetNamespace(), nq.GetPredicate())) if nq.GetObjectValue() != nil { lang := de.GetLang() switch { @@ -405,7 +413,7 @@ func (m *mapper) addIndexMapEntries(nq gql.NQuad, de *pb.DirectedEdge) { return // Cannot index UIDs } - sch := m.schema.getSchema(nq.GetPredicate()) + sch := m.schema.getSchema(x.NamespaceAttr(nq.GetNamespace(), nq.GetPredicate())) for _, tokerName := range sch.GetTokenizer() { // Find tokeniser. 
toker, ok := tok.GetTokenizer(tokerName) @@ -429,15 +437,16 @@ func (m *mapper) addIndexMapEntries(nq gql.NQuad, de *pb.DirectedEdge) { toks, err := tok.BuildTokens(schemaVal.Value, tok.GetTokenizerForLang(toker, nq.Lang)) x.Check(err) + attr := x.NamespaceAttr(nq.Namespace, nq.Predicate) // Store index posting. for _, t := range toks { m.addMapEntry( - x.IndexKey(nq.Predicate, t), + x.IndexKey(attr, t), &pb.Posting{ Uid: de.GetEntity(), PostingType: pb.Posting_REF, }, - m.state.shards.shardFor(nq.Predicate), + m.state.shards.shardFor(attr), ) } } diff --git a/dgraph/cmd/bulk/merge_shards.go b/dgraph/cmd/bulk/merge_shards.go index e18748751a4..3192a21af6b 100644 --- a/dgraph/cmd/bulk/merge_shards.go +++ b/dgraph/cmd/bulk/merge_shards.go @@ -60,6 +60,7 @@ func mergeMapShardsIntoReduceShards(opt *options) { // Put the first map shard in the first reduce shard since it contains all the reserved // predicates. + // TODO(Naman): Why do we handle first shard differently? reduceShard := filepath.Join(reduceShards[0], filepath.Base(firstShard)) fmt.Printf("Shard %s -> Reduce %s\n", firstShard, reduceShard) x.Check(os.Rename(firstShard, reduceShard)) diff --git a/dgraph/cmd/bulk/run.go b/dgraph/cmd/bulk/run.go index 40260a06d26..0ab9a8827f7 100644 --- a/dgraph/cmd/bulk/run.go +++ b/dgraph/cmd/bulk/run.go @@ -113,6 +113,8 @@ func init() { "Comma separated list of tokenizer plugins") flag.Bool("new_uids", false, "Ignore UIDs in load files and assign new ones.") + flag.Uint64("force-namespace", math.MaxUint64, + "Namespace onto which to load the data. If not set, will preserve the namespace.") // Options around how to set up Badger. 
flag.String("badger.compression", "snappy", @@ -156,6 +158,8 @@ func run() { CustomTokenizers: Bulk.Conf.GetString("custom_tokenizers"), NewUids: Bulk.Conf.GetBool("new_uids"), ClientDir: Bulk.Conf.GetString("xidmap"), + Namespace: Bulk.Conf.GetUint64("force-namespace"), + // Badger options BadgerCompression: ctype, BadgerCompressionLevel: clevel, diff --git a/dgraph/cmd/bulk/schema.go b/dgraph/cmd/bulk/schema.go index 294dad5f943..6d35ecd2244 100644 --- a/dgraph/cmd/bulk/schema.go +++ b/dgraph/cmd/bulk/schema.go @@ -47,22 +47,12 @@ func newSchemaStore(initial *schema.ParsedSchema, opt *options, state *state) *s state: state, } - // Load all initial predicates. Some predicates that might not be used when - // the alpha is started (e.g ACL predicates) might be included but it's - // better to include them in case the input data contains triples with these - // predicates. // TODO(Ahsan): Use the right namespace here. - for _, update := range schema.CompleteInitialSchema(x.DefaultNamespace) { - s.schemaMap[update.Predicate] = update - } - - if opt.StoreXids { - s.schemaMap["xid"] = &pb.SchemaUpdate{ - ValueType: pb.Posting_STRING, - Tokenizer: []string{"hash"}, - } - } + // Initialize only for the default namespace. Initialization for other namespaces will be done + // whenever we see data for a new namespace. + s.checkAndSetInitialSchema(x.DefaultNamespace) + // This is from the schema read from the schema file. for _, sch := range initial.Preds { p := sch.Predicate sch.Predicate = "" // Predicate is stored in the (badger) key, so not needed in the value. @@ -70,9 +60,12 @@ func newSchemaStore(initial *schema.ParsedSchema, opt *options, state *state) *s fmt.Printf("Predicate %q already exists in schema\n", p) continue } + s.checkAndSetInitialSchema(x.ParseNamespace(p)) s.schemaMap[p] = sch } + // TODO(Naman): Can types be there without the schema? Need to add initial schema for these + // namespaces? 
s.types = initial.Types return s @@ -94,6 +87,32 @@ func (s *schemaStore) setSchemaAsList(pred string) { sch.List = true } +// checkAndSetInitialSchema initializes the schema for namespace if it does not already exist. +func (s *schemaStore) checkAndSetInitialSchema(namespace uint64) { + if _, ok := s.namespaces.Load(namespace); !ok { + s.Lock() + defer s.Unlock() + if _, ok := s.namespaces.Load(namespace); !ok { + // Load all initial predicates. Some predicates that might not be used when + // the alpha is started (e.g ACL predicates) might be included but it's + // better to include them in case the input data contains triples with these + // predicates. + for _, update := range schema.CompleteInitialSchema(namespace) { + s.schemaMap[update.Predicate] = update + } + + if s.opt.StoreXids { + s.schemaMap[x.NamespaceAttr(namespace, "xid")] = &pb.SchemaUpdate{ + ValueType: pb.Posting_STRING, + Tokenizer: []string{"hash"}, + } + } + s.namespaces.Store(namespace, struct{}{}) + } + } + return +} + func (s *schemaStore) validateType(de *pb.DirectedEdge, objectIsUID bool) { if objectIsUID { de.ValueType = pb.Posting_UID diff --git a/dgraph/cmd/bulk/shard_map.go b/dgraph/cmd/bulk/shard_map.go index fe91f96895b..9721ae05bd2 100644 --- a/dgraph/cmd/bulk/shard_map.go +++ b/dgraph/cmd/bulk/shard_map.go @@ -37,6 +37,7 @@ func newShardMap(numShards int) *shardMap { } func (m *shardMap) shardFor(pred string) int { + // TODO(Naman): Figure out why? // Always assign NQuads with reserved predicates to the first map shard. 
if x.IsReservedPredicate(pred) { return 0 diff --git a/dgraph/cmd/debug/run.go b/dgraph/cmd/debug/run.go index 4509d63e5b4..8fd1e7b85e3 100644 --- a/dgraph/cmd/debug/run.go +++ b/dgraph/cmd/debug/run.go @@ -570,7 +570,9 @@ func printKeys(db *badger.DB) { if pk.IsReverse() { x.Check2(buf.WriteString("{r}")) } - x.Check2(buf.WriteString(" attr: " + pk.Attr)) + ns, attr := x.ParseNamespaceAttr(pk.Attr) + x.Check2(buf.WriteString(fmt.Sprintf(" ns: 0x%x ", ns))) + x.Check2(buf.WriteString(" attr: " + attr)) if len(pk.Term) > 0 { fmt.Fprintf(&buf, " term: [%d] %s ", pk.Term[0], pk.Term[1:]) } diff --git a/go.mod b/go.mod index 3bf39e9353e..9949e6bbb6a 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,8 @@ module github.com/dgraph-io/dgraph go 1.12 +replace github.com/dgraph-io/dgo/v200 => /home/algod/go/src/github.com/dgraph-io/dgo + // replace github.com/dgraph-io/badger/v2 => /home/mrjn/go/src/github.com/dgraph-io/badger // replace github.com/dgraph-io/ristretto => /home/mrjn/go/src/github.com/dgraph-io/ristretto @@ -67,8 +69,7 @@ require ( golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 golang.org/x/sys v0.0.0-20210105210732-16f7687f5001 golang.org/x/text v0.3.3 - golang.org/x/tools v0.0.0-20210106214847-113979e3529a - google.golang.org/grpc v1.23.0 + google.golang.org/grpc v1.27.0 gopkg.in/DataDog/dd-trace-go.v1 v1.13.1 // indirect gopkg.in/square/go-jose.v2 v2.3.1 gopkg.in/yaml.v2 v2.2.4 diff --git a/gql/mutation.go b/gql/mutation.go index 50438d0abae..82482cbda14 100644 --- a/gql/mutation.go +++ b/gql/mutation.go @@ -115,9 +115,11 @@ func toUid(subject string, newToUid map[string]uint64) (uid uint64, err error) { var emptyEdge pb.DirectedEdge func (nq NQuad) createEdgePrototype(subjectUid uint64) *pb.DirectedEdge { + // TODO(Naman): Check all occurrences of NQuad and directed edges. + // Check if we are passing the appropriate namespace everywhere. 
return &pb.DirectedEdge{ Entity: subjectUid, - Attr: nq.Predicate, + Attr: x.NamespaceAttr(nq.Namespace, nq.Predicate), Label: nq.Label, Lang: nq.Lang, Facets: nq.Facets, diff --git a/graphql/admin/namespace.go b/graphql/admin/namespace.go index b98c60519e4..b54be0d5fe0 100644 --- a/graphql/admin/namespace.go +++ b/graphql/admin/namespace.go @@ -3,6 +3,7 @@ package admin import ( "context" "encoding/json" + "strconv" "github.com/dgraph-io/dgraph/edgraph" "github.com/dgraph-io/dgraph/graphql/resolve" @@ -24,7 +25,7 @@ func resolveCreateNamespace(ctx context.Context, m schema.Mutation) (*resolve.Re return resolve.DataResult( m, map[string]interface{}{m.Name(): map[string]interface{}{ - "namespaceId": req.NamespaceId, + "namespaceId": strconv.Itoa(req.NamespaceId), "message": "Created namespace successfully", }}, nil, @@ -42,7 +43,7 @@ func resolveDeleteNamespace(ctx context.Context, m schema.Mutation) (*resolve.Re return resolve.DataResult( m, map[string]interface{}{m.Name(): map[string]interface{}{ - "namespaceId": req.NamespaceId, + "namespaceId": strconv.Itoa(req.NamespaceId), "message": "Deleted namespace successfully", }}, nil, diff --git a/query/mutation.go b/query/mutation.go index 181656057e2..666f87218bd 100644 --- a/query/mutation.go +++ b/query/mutation.go @@ -194,6 +194,7 @@ func AssignUids(ctx context.Context, gmuList []*gql.Mutation) (map[string]uint64 } // ToDirectedEdges converts the gql.Mutation input into a set of directed edges. +// TODO(Naman): We have namespace in NQuad. Check if this function needs special handling. func ToDirectedEdges(gmuList []*gql.Mutation, newUids map[string]uint64) ( edges []*pb.DirectedEdge, err error) { diff --git a/x/keys.go b/x/keys.go index 778cf2868d7..ef914d44ef9 100644 --- a/x/keys.go +++ b/x/keys.go @@ -90,6 +90,12 @@ func ParseAttr(attr string) string { return attr[8:] } +// ParseNamespace returns the namespace from the given value. 
+func ParseNamespace(attr string) uint64 { + AssertTrue(len(attr) >= 8) + return binary.BigEndian.Uint64([]byte(attr[:8])) +} + func ParseAttrList(attrs []string) []string { var resp []string for _, attr := range attrs { From 2a529a6084f0a536c1b1b4ff8b7af870f8557302 Mon Sep 17 00:00:00 2001 From: NamanJain8 Date: Fri, 5 Feb 2021 16:31:52 +0530 Subject: [PATCH 2/7] lease the namespaces after map phase --- dgraph/cmd/bulk/loader.go | 24 ++++++++++++++++++++++++ dgraph/cmd/bulk/run.go | 1 + 2 files changed, 25 insertions(+) diff --git a/dgraph/cmd/bulk/loader.go b/dgraph/cmd/bulk/loader.go index 665c15f0a58..9340e7ec5fd 100644 --- a/dgraph/cmd/bulk/loader.go +++ b/dgraph/cmd/bulk/loader.go @@ -168,6 +168,30 @@ func getWriteTimestamp(zero *grpc.ClientConn) uint64 { } } +func (ld *loader) leaseNamespaces() { + var maxNs uint64 + ld.namespaces.Range(func(key, value interface{}) bool { + if ns := key.(uint64); ns > maxNs { + maxNs = ns + } + return true + }) + + client := pb.NewZeroClient(ld.zero) + for { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + // TODO(Naman): Maybe lease maxNs-1 with some handling. Using maxNs for simplicity for now. 
+ ns, err := client.AssignIds(ctx, &pb.Num{Val: maxNs, Type: pb.Num_NS_ID}) + cancel() + if err == nil { + fmt.Printf("Assigned namespaces till %d", ns.GetEndId()) + return + } + fmt.Printf("Error communicating with dgraph zero, retrying: %v", err) + time.Sleep(time.Second) + } +} + func readSchema(opt *options) *schema.ParsedSchema { f, err := filestore.Open(opt.SchemaFile) x.Check(err) diff --git a/dgraph/cmd/bulk/run.go b/dgraph/cmd/bulk/run.go index 0ab9a8827f7..875700ab21b 100644 --- a/dgraph/cmd/bulk/run.go +++ b/dgraph/cmd/bulk/run.go @@ -330,6 +330,7 @@ func run() { } else { loader.mapStage() mergeMapShardsIntoReduceShards(&opt) + loader.leaseNamespaces() bulkMeta := pb.BulkMeta{ EdgeCount: loader.prog.mapEdgeCount, From 52e88f09ebb45a54f04ae6a7c01741e3cd770847 Mon Sep 17 00:00:00 2001 From: NamanJain8 Date: Fri, 5 Feb 2021 17:46:08 +0530 Subject: [PATCH 3/7] test fixtures and cleanup --- chunker/json_parser_test.go | 29 ++++++++++++++++------------- chunker/rdf_parser_test.go | 27 ++++++++++++++++----------- dgraph/cmd/bulk/loader.go | 4 +--- dgraph/cmd/bulk/mapper.go | 7 ++++++- worker/online_restore_ee.go | 1 + 5 files changed, 40 insertions(+), 28 deletions(-) diff --git a/chunker/json_parser_test.go b/chunker/json_parser_test.go index 4808940cc56..be3209adbf0 100644 --- a/chunker/json_parser_test.go +++ b/chunker/json_parser_test.go @@ -60,14 +60,15 @@ type address struct { } type Person struct { - Uid string `json:"uid,omitempty"` - Name string `json:"name,omitempty"` - Age int `json:"age,omitempty"` - Married *bool `json:"married,omitempty"` - Now *time.Time `json:"now,omitempty"` - Address address `json:"address,omitempty"` // geo value - Friends []Person `json:"friend,omitempty"` - School *School `json:"school,omitempty"` + Uid string `json:"uid,omitempty"` + Namespace string `json:"namespace,omitempty"` + Name string `json:"name,omitempty"` + Age int `json:"age,omitempty"` + Married *bool `json:"married,omitempty"` + Now *time.Time 
`json:"now,omitempty"` + Address address `json:"address,omitempty"` // geo value + Friends []Person `json:"friend,omitempty"` + School *School `json:"school,omitempty"` } func Parse(b []byte, op int) ([]*api.NQuad, error) { @@ -90,6 +91,7 @@ func (exp *Experiment) verify() { exp.t.Fatalf("Error while getting a dgraph client: %v", err) } + // TODO(Naman): Fix these tests, once the ACL is integrated. ctx := context.Background() require.NoError(exp.t, dg.Alter(ctx, &api.Operation{DropAll: true}), "drop all failed") require.NoError(exp.t, dg.Alter(ctx, &api.Operation{Schema: exp.schema}), @@ -116,11 +118,12 @@ func TestNquadsFromJson1(t *testing.T) { tn := time.Now().UTC() m := true p := Person{ - Uid: "1", - Name: "Alice", - Age: 26, - Married: &m, - Now: &tn, + Uid: "1", + Namespace: "0x2", + Name: "Alice", + Age: 26, + Married: &m, + Now: &tn, Address: address{ Type: "Point", Coords: []float64{1.1, 2.0}, diff --git a/chunker/rdf_parser_test.go b/chunker/rdf_parser_test.go index 3aadbfec4b9..55a1915c0b3 100644 --- a/chunker/rdf_parser_test.go +++ b/chunker/rdf_parser_test.go @@ -346,40 +346,43 @@ var testNQuads = []struct { expectedErr: false, }, { - input: `_:alice "stuff"^^