From f5a805f050dc29357f0c6034c4a1c2ef2d23c395 Mon Sep 17 00:00:00 2001 From: Aman Mangal Date: Wed, 13 Nov 2019 12:40:04 +0530 Subject: [PATCH] Add support for multiple mutation blocks --- dgraph/cmd/alpha/upsert_test.go | 325 ++++++++++++++++++++++++++++++++ edgraph/server.go | 68 +++---- gql/parser_mutation.go | 12 +- gql/upsert_test.go | 64 +++++++ query/mutation.go | 80 ++++---- 5 files changed, 468 insertions(+), 81 deletions(-) diff --git a/dgraph/cmd/alpha/upsert_test.go b/dgraph/cmd/alpha/upsert_test.go index d6eeaac5efd..07b7bb383e3 100644 --- a/dgraph/cmd/alpha/upsert_test.go +++ b/dgraph/cmd/alpha/upsert_test.go @@ -2230,3 +2230,328 @@ func TestInvalidAliasUids(t *testing.T) { _, err := mutationWithTs(m1, "application/rdf", false, true, 0) require.Contains(t, "query alias [uids] not allowed", err.Error()) } + +func TestMultipleMutation(t *testing.T) { + require.NoError(t, dropAll()) + require.NoError(t, alterSchema(`email: string @index(exact) .`)) + + m1 := ` +upsert { + query { + q(func: eq(email, "email@company.io")) { + v as uid + } + } + + mutation @if(not(eq(len(v), 0))) { + set { + uid(v) "not_name" . + uid(v) "not_email@company.io" . + } + } + + mutation @if(eq(len(v), 0)) { + set { + _:user "name" . + _:user "email@company.io" . + } + } +}` + mr, err := mutationWithTs(m1, "application/rdf", false, true, 0) + require.NoError(t, err) + require.True(t, len(mr.keys) == 0) + require.Equal(t, []string{"1-email", "1-name"}, mr.preds) + result := QueryResult{} + require.NoError(t, json.Unmarshal(mr.data, &result)) + require.Equal(t, 0, len(result.Q)) + + q1 := ` +{ + q(func: eq(email, "email@company.io")) { + name + } +}` + res, _, err := queryWithTs(q1, "application/graphql+-", "", 0) + expectedRes := ` +{ + "data": { + "q": [{ + "name": "name" + }] + } +}` + require.NoError(t, err) + testutil.CompareJSON(t, res, expectedRes) + + // This time the other mutation will get executed + _, err = mutationWithTs(m1, "application/rdf", false, true, 0) + require.NoError(t, err) + + q2 := ` +{ + q(func: eq(email, "not_email@company.io")) { + name + } +}` + res, _, err = queryWithTs(q2, "application/graphql+-", "", 0) + require.NoError(t, err) + + expectedRes = ` +{ + "data": { + "q": [{ + "name": "not_name" + }] + } +}` + require.NoError(t, err) + testutil.CompareJSON(t, res, expectedRes) +} + +func TestMultiMutationWithoutIf(t *testing.T) { + require.NoError(t, dropAll()) + require.NoError(t, alterSchema(`email: string @index(exact) .`)) + + m1 := ` +upsert { + query { + me(func: eq(email, "email@company.io")) { + v as uid + } + } + + mutation @if(not(eq(len(v), 0))) { + set { + uid(v) "not_name" . + uid(v) "not_email@company.io" . + } + } + + mutation @if(eq(len(v), 0)) { + set { + _:user "name" . + } + } + + mutation { + set { + _:user "email@company.io" . + } + } + + mutation { + set { + _:other "other" . + _:other "other@company.io" . + } + } +}` + mr, err := mutationWithTs(m1, "application/rdf", false, true, 0) + require.NoError(t, err) + require.True(t, len(mr.keys) == 0) + require.Equal(t, []string{"1-email", "1-name"}, mr.preds) + + q1 := ` +{ + q(func: has(email)) { + name + } +}` + res, _, err := queryWithTs(q1, "application/graphql+-", "", 0) + expectedRes := ` +{ + "data": { + "q": [{ + "name": "name" + }, + { + "name": "other" + }] + } +}` + require.NoError(t, err) + testutil.CompareJSON(t, res, expectedRes) +} + +func TestMultiMutationCount(t *testing.T) { + require.NoError(t, dropAll()) + require.NoError(t, alterSchema(` + email: string @index(exact) . + count: int .`)) + + m1 := ` +upsert { + query { + q(func: eq(email, "email@company.io")) { + v as uid + c as count + nc as math(c+1) + } + } + + mutation @if(eq(len(v), 0)) { + set { + uid(v) "name" . + uid(v) "email@company.io" . + uid(v) "1" . + } + } + + mutation @if(not(eq(len(v), 0))) { + set { + uid(v) val(nc) . + } + } +}` + mr, err := mutationWithTs(m1, "application/rdf", false, true, 0) + require.NoError(t, err) + require.True(t, len(mr.keys) == 0) + require.Equal(t, []string{"1-count", "1-email", "1-name"}, mr.preds) + + q1 := ` +{ + q(func: has(email)) { + count + } +}` + res, _, err := queryWithTs(q1, "application/graphql+-", "", 0) + expectedRes := ` +{ + "data": { + "q": [{ + "count": 1 + }] + } +}` + require.NoError(t, err) + testutil.CompareJSON(t, res, expectedRes) + + // second time + mr, err = mutationWithTs(m1, "application/rdf", false, true, 0) + require.NoError(t, err) + require.True(t, len(mr.keys) == 0) + require.Equal(t, []string{"1-count"}, mr.preds) + + res, _, err = queryWithTs(q1, "application/graphql+-", "", 0) + expectedRes = ` +{ + "data": { + "q": [{ + "count": 2 + }] + } +}` + require.NoError(t, err) + testutil.CompareJSON(t, res, expectedRes) +} + +func TestMultipleMutationMerge(t *testing.T) { + require.NoError(t, dropAll()) + require.NoError(t, alterSchema(` +name: string @index(term) . +email: [string] @index(exact) @upsert .`)) + + m1 := ` +{ + set { + _:user1 "user1" . + _:user1 "user_email1@company1.io" . + _:user2 "user2" . + _:user2 "user_email2@company1.io" . + } +}` + mr, err := mutationWithTs(m1, "application/rdf", false, true, 0) + require.NoError(t, err) + require.True(t, len(mr.keys) == 0) + require.Equal(t, []string{"1-email", "1-name"}, mr.preds) + + q1 := ` +{ + q(func: has(name)) { + uid + } +}` + res, _, err := queryWithTs(q1, "application/graphql+-", "", 0) + require.NoError(t, err) + var result struct { + Data struct { + Q []struct { + UID string `json:"uid"` + } `json:"q"` + } `json:"data"` + } + require.NoError(t, json.Unmarshal([]byte(res), &result)) + require.Equal(t, 2, len(result.Data.Q)) + + m2 := ` +upsert { + query { + # filter is needed to ensure that we do not get same UIDs in u1 and u2 + q1(func: eq(email, "user_email1@company1.io")) @filter(not(eq(email, "user_email2@company1.io"))) { + u1 as uid + } + + q2(func: eq(email, "user_email2@company1.io")) @filter(not(eq(email, "user_email1@company1.io"))) { + u2 as uid + } + + q3(func: eq(email, "user_email1@company1.io")) @filter(eq(email, "user_email2@company1.io")) { + u3 as uid + } + } + + # case when both emails do not exist + mutation @if(eq(len(u1), 0) AND eq(len(u2), 0) AND eq(len(u3), 0)) { + set { + _:user "user" . + _:user "user_email1@company1.io" . + _:user "user_email2@company1.io" . + } + } + + # case when email1 exists but email2 does not + mutation @if(eq(len(u1), 1) AND eq(len(u2), 0) AND eq(len(u3), 0)) { + set { + uid(u1) "user_email2@company1.io" . + } + } + + # case when email1 does not exist but email2 exists + mutation @if(eq(len(u1), 0) AND eq(len(u2), 1) AND eq(len(u3), 0)) { + set { + uid(u2) "user_email1@company1.io" . + } + } + + # case when both emails exist and needs merging + mutation @if(eq(len(u1), 1) AND eq(len(u2), 1) AND eq(len(u3), 0)) { + set { + _:user "user" . + _:user "user_email1@company1.io" . + _:user "user_email2@company1.io" . + } + + delete { + uid(u1) * . + uid(u1) * . + uid(u2) * . + uid(u2) * . + } + } +}` + mr, err = mutationWithTs(m2, "application/rdf", false, true, 0) + require.NoError(t, err) + require.True(t, len(mr.keys) == 0) + require.Equal(t, []string{"1-email", "1-name"}, mr.preds) + + res, _, err = queryWithTs(q1, "application/graphql+-", "", 0) + require.NoError(t, err) + require.NoError(t, json.Unmarshal([]byte(res), &result)) + require.Equal(t, 1, len(result.Data.Q)) + + // Now, data is all correct. So, following mutation should be no-op + mr, err = mutationWithTs(m2, "application/rdf", false, true, 0) + require.NoError(t, err) + require.True(t, len(mr.keys) == 0) + require.Equal(t, 0, len(mr.preds)) +} diff --git a/edgraph/server.go b/edgraph/server.go index 26357eebaac..78fd9df72c6 100644 --- a/edgraph/server.go +++ b/edgraph/server.go @@ -18,10 +18,8 @@ package edgraph import ( "encoding/json" - "fmt" "io/ioutil" "math" - "math/rand" "os" "path/filepath" "sort" @@ -467,8 +465,7 @@ func (s *Server) doMutate(ctx context.Context, qc *queryContext, resp *api.Respo // update mutations from the query results before assigning UIDs updateMutations(qc) - gmu := qc.gmuList[0] - newUids, err := query.AssignUids(ctx, gmu.Set) + newUids, err := query.AssignUids(ctx, qc.gmuList) if err != nil { return err } @@ -478,7 +475,7 @@ func (s *Server) doMutate(ctx context.Context, qc *queryContext, resp *api.Respo // 2. For a uid variable that is part of an upsert query, // like uid(foo), the key would be uid(foo). resp.Uids = query.UidsToHex(query.StripBlankNode(newUids)) - edges, err := query.ToDirectedEdges(gmu, newUids) + edges, err := query.ToDirectedEdges(qc.gmuList, newUids) if err != nil { return err } @@ -543,35 +540,35 @@ func buildUpsertQuery(qc *queryContext) string { return qc.req.Query } - gmu := qc.gmuList[0] - qc.condVars = make([]string, 1) + qc.condVars = make([]string, len(qc.req.Mutations)) upsertQuery := strings.TrimSuffix(qc.req.Query, "}") - - isCondUpsert := strings.TrimSpace(gmu.Cond) != "" - if isCondUpsert { - qc.condVars[0] = fmt.Sprintf("__dgraph_%d__", rand.Int()) - qc.uidRes[qc.condVars[0]] = nil - // @if in upsert is same as @filter in the query - cond := strings.Replace(gmu.Cond, "@if", "@filter", 1) - - // Add dummy query to evaluate the @if directive, ok to use uid(0) because - // dgraph doesn't check for existence of UIDs until we query for other predicates. - // Here, we are only querying for uid predicate in the dummy query. - // - // For example if - mu.Query = { - // me(...) {...} - // } - // - // Then, upsertQuery = { - // me(...) {...} - // __dgraph_0__ as var(func: uid(0)) @filter(...) - // } - // - // The variable __dgraph_0__ will - - // * be empty if the condition is true - // * have 1 UID (the 0 UID) if the condition is false - upsertQuery += qc.condVars[0] + ` as var(func: uid(0)) ` + cond + ` -` + for i, gmu := range qc.gmuList { + isCondUpsert := strings.TrimSpace(gmu.Cond) != "" + if isCondUpsert { + qc.condVars[i] = "__dgraph__" + strconv.Itoa(i) + qc.uidRes[qc.condVars[i]] = nil + // @if in upsert is same as @filter in the query + cond := strings.Replace(gmu.Cond, "@if", "@filter", 1) + + // Add dummy query to evaluate the @if directive, ok to use uid(0) because + // dgraph doesn't check for existence of UIDs until we query for other predicates. + // Here, we are only querying for uid predicate in the dummy query. + // + // For example if - mu.Query = { + // me(...) {...} + // } + // + // Then, upsertQuery = { + // me(...) {...} + // __dgraph_0__ as var(func: uid(0)) @filter(...) + // } + // + // The variable __dgraph_0__ will - + // * be empty if the condition is true + // * have 1 UID (the 0 UID) if the condition is false + upsertQuery += qc.condVars[i] + ` as var(func: uid(0)) ` + cond + ` + ` + } } upsertQuery += `}` @@ -870,10 +867,6 @@ func (s *Server) doQuery(ctx context.Context, req *api.Request, authorize int) ( ostats.Record(ctx, x.NumMutations.M(1)) } - if isMutation && len(req.Mutations) != 1 { - return nil, errors.Errorf("Only 1 mutation per request is supported") - } - qc := &queryContext{req: req, latency: l, span: span} if rerr = parseRequest(qc); rerr != nil { return @@ -1073,6 +1066,7 @@ func authorizeRequest(ctx context.Context, qc *queryContext) error { return err } + // TODO(Aman): can be optimized to do the authorization in just one func call for _, gmu := range qc.gmuList { if err := authorizeMutation(ctx, gmu); err != nil { return err diff --git a/gql/parser_mutation.go b/gql/parser_mutation.go index 2cb80e87361..d10abcadbca 100644 --- a/gql/parser_mutation.go +++ b/gql/parser_mutation.go @@ -64,7 +64,7 @@ func ParseMutation(mutation string) (req *api.Request, err error) { func parseUpsertBlock(it *lex.ItemIterator) (*api.Request, error) { var req *api.Request var queryText, condText string - var queryFound, condFound bool + var queryFound bool // ===>upsert<=== {...} if !it.Next() { @@ -115,10 +115,6 @@ func parseUpsertBlock(it *lex.ItemIterator) (*api.Request, error) { // upsert { mutation ===>@if(...)<=== {....} query{...}} item = it.Item() if item.Typ == itemUpsertBlockOpContent { - if condFound { - return nil, it.Errorf("Multiple @if directive inside upsert block") - } - condFound = true condText = item.Val if !it.Next() { return nil, it.Errorf("Unexpected end of upsert block") @@ -131,7 +127,11 @@ func parseUpsertBlock(it *lex.ItemIterator) (*api.Request, error) { return nil, err } mu.Cond = condText - req = &api.Request{Mutations: []*api.Mutation{mu}} + if req == nil { + req = &api.Request{Mutations: []*api.Mutation{mu}} + } else { + req.Mutations = append(req.Mutations, mu) + } // upsert { mutation{...} ===>fragment<==={...}} case item.Typ == itemUpsertBlockOp && item.Val == "fragment": diff --git a/gql/upsert_test.go b/gql/upsert_test.go index 7318d0616ad..2e89719dfc5 100644 --- a/gql/upsert_test.go +++ b/gql/upsert_test.go @@ -467,3 +467,67 @@ func TestConditionalUpsertErrWrongIf(t *testing.T) { _, err := ParseMutation(query) require.Contains(t, err.Error(), "Expected @if, found [@fi]") } + +func TestMultipleMutation(t *testing.T) { + query := ` +upsert { + mutation @if(eq(len(m), 1)) { + set { + uid(m) "45" . + } + } + + mutation @if(not(eq(len(m), 1))) { + set { + uid(f) "45" . + } + } + + mutation { + set { + _:user "45" . + } + } + + query { + me(func: eq(age, 34)) @filter(ge(name, "user")) { + uid + } + } +}` + req, err := ParseMutation(query) + require.NoError(t, err) + require.Equal(t, 3, len(req.Mutations)) +} + +func TestMultipleMutationDifferentOrder(t *testing.T) { + query := ` +upsert { + mutation @if(eq(len(m), 1)) { + set { + uid(m) "45" . + } + } + + query { + me(func: eq(age, 34)) @filter(ge(name, "user")) { + uid + } + } + + mutation @if(not(eq(len(m), 1))) { + set { + uid(f) "45" . + } + } + + mutation { + set { + _:user "45" . + } + } +}` + req, err := ParseMutation(query) + require.NoError(t, err) + require.Equal(t, 3, len(req.Mutations)) +} diff --git a/query/mutation.go b/query/mutation.go index 3a19f40c498..c4dfc8bcc86 100644 --- a/query/mutation.go +++ b/query/mutation.go @@ -111,39 +111,41 @@ func verifyUid(ctx context.Context, uid uint64) error { // AssignUids tries to assign unique ids to each identity in the subjects and objects in the // format of _:xxx. An identity, e.g. _:a, will only be assigned one uid regardless how many times // it shows up in the subjects or objects -func AssignUids(ctx context.Context, nquads []*api.NQuad) (map[string]uint64, error) { +func AssignUids(ctx context.Context, gmuList []*gql.Mutation) (map[string]uint64, error) { newUids := make(map[string]uint64) num := &pb.Num{} var err error - for _, nq := range nquads { - // We dont want to assign uids to these. - if nq.Subject == x.Star && nq.ObjectValue.GetDefaultVal() == x.Star { - return newUids, errors.New("Predicate deletion should be called via alter") - } - - if len(nq.Subject) == 0 { - return nil, errors.Errorf("Subject must not be empty for nquad: %+v", nq) - } - var uid uint64 - if strings.HasPrefix(nq.Subject, "_:") { - newUids[nq.Subject] = 0 - } else if uid, err = gql.ParseUid(nq.Subject); err != nil { - return newUids, err - } - if err = verifyUid(ctx, uid); err != nil { - return newUids, err - } + for _, gmu := range gmuList { + for _, nq := range gmu.Set { + // We dont want to assign uids to these. + if nq.Subject == x.Star && nq.ObjectValue.GetDefaultVal() == x.Star { + return newUids, errors.New("predicate deletion should be called via alter") + } - if len(nq.ObjectId) > 0 { + if len(nq.Subject) == 0 { + return nil, errors.Errorf("subject must not be empty for nquad: %+v", nq) + } var uid uint64 - if strings.HasPrefix(nq.ObjectId, "_:") { - newUids[nq.ObjectId] = 0 - } else if uid, err = gql.ParseUid(nq.ObjectId); err != nil { + if strings.HasPrefix(nq.Subject, "_:") { + newUids[nq.Subject] = 0 + } else if uid, err = gql.ParseUid(nq.Subject); err != nil { return newUids, err } if err = verifyUid(ctx, uid); err != nil { return newUids, err } + + if len(nq.ObjectId) > 0 { + var uid uint64 + if strings.HasPrefix(nq.ObjectId, "_:") { + newUids[nq.ObjectId] = 0 + } else if uid, err = gql.ParseUid(nq.ObjectId); err != nil { + return newUids, err + } + if err = verifyUid(ctx, uid); err != nil { + return newUids, err + } + } } } @@ -167,8 +169,8 @@ func AssignUids(ctx context.Context, nquads []*api.NQuad) (map[string]uint64, er } // ToDirectedEdges converts the gql.Mutation input into a set of directed edges. -func ToDirectedEdges(gmu *gql.Mutation, - newUids map[string]uint64) (edges []*pb.DirectedEdge, err error) { +func ToDirectedEdges(gmuList []*gql.Mutation, newUids map[string]uint64) ( + edges []*pb.DirectedEdge, err error) { // Wrapper for a pointer to protos.Nquad var wnq *gql.NQuad @@ -189,20 +191,22 @@ func ToDirectedEdges(gmu *gql.Mutation, return nil } - for _, nq := range gmu.Set { - if err := facets.SortAndValidate(nq.Facets); err != nil { - return edges, err - } - if err := parse(nq, pb.DirectedEdge_SET); err != nil { - return edges, err - } - } - for _, nq := range gmu.Del { - if nq.Subject == x.Star && nq.ObjectValue.GetDefaultVal() == x.Star { - return edges, errors.New("Predicate deletion should be called via alter") + for _, gmu := range gmuList { + for _, nq := range gmu.Set { + if err := facets.SortAndValidate(nq.Facets); err != nil { + return edges, err + } + if err := parse(nq, pb.DirectedEdge_SET); err != nil { + return edges, err + } } - if err := parse(nq, pb.DirectedEdge_DEL); err != nil { - return edges, err + for _, nq := range gmu.Del { + if nq.Subject == x.Star && nq.ObjectValue.GetDefaultVal() == x.Star { + return edges, errors.New("Predicate deletion should be called via alter") + } + if err := parse(nq, pb.DirectedEdge_DEL); err != nil { + return edges, err + } } }