From 4ce9e1e4d394eeb74c365f911e55d5727d04d98b Mon Sep 17 00:00:00 2001 From: Aman Mangal Date: Thu, 24 Oct 2019 16:06:36 +0530 Subject: [PATCH] Add support for multiple mutations --- edgraph/access_ee.go | 2 +- edgraph/server.go | 697 +++++++++++++++++++++---------------------- go.mod | 5 +- go.sum | 20 +- gql/mutation.go | 5 +- query/mutation.go | 80 ++--- 6 files changed, 414 insertions(+), 395 deletions(-) diff --git a/edgraph/access_ee.go b/edgraph/access_ee.go index 80d672f6c1c..690872103c4 100644 --- a/edgraph/access_ee.go +++ b/edgraph/access_ee.go @@ -395,7 +395,7 @@ func ResetAcl() { }, } - _, err = (&Server{}).doMutate(context.Background(), req, NoAuthorize) + _, err = (&Server{}).doQuery(context.Background(), req, NoAuthorize) if err != nil { return err } diff --git a/edgraph/server.go b/edgraph/server.go index e818766ba92..8d661a96738 100644 --- a/edgraph/server.go +++ b/edgraph/server.go @@ -54,6 +54,7 @@ import ( ostats "go.opencensus.io/stats" "go.opencensus.io/tag" + "go.opencensus.io/trace" otrace "go.opencensus.io/trace" ) @@ -428,37 +429,60 @@ func annotateStartTs(span *otrace.Span, ts uint64) { span.Annotate([]otrace.Attribute{otrace.Int64Attribute("startTs", int64(ts))}, "") } -func (s *Server) doMutate(ctx context.Context, req *api.Request, authorize int) ( +// queryContext is used to pass around all the variables needed +// to process a request for query, mutation or upsert. +type queryContext struct { + // req is the incoming, not yet parsed request containing + // a query or more than one mutations or both (in case of upsert) + req *api.Request + // gmuList is the list of mutations after parsing req.Mutations + gmuList []*gql.Mutation + // gqlRes contains result of parsing the req.Query + gqlRes gql.Result + // condVars are conditional variables used in the (modified) query to figure out + // whether the condition in Conditional Upsert is true. The string would be empty + // if the corresponding mutation is not a conditional upsert. + // Note that, len(condVars) == len(gmuList). + condVars []string + // uidRes stores mapping from variable names to UIDs for UID variables. + // These variables are either dummy variables used for Conditional + // Upsert or variables defined in the query in the incoming request. + uidRes map[string][]string + // valRes stores mapping from variable names to values for value variables. + valRes map[string]map[uint64]types.Val + // l stores latency numbers + latency *query.Latency + // span stores a opencensus span + span *trace.Span +} + +// Query handles queries or mutations +func (s *Server) Query(ctx context.Context, req *api.Request) (*api.Response, error) { + return s.doQuery(ctx, req, NeedAuthorize) +} + +func (s *Server) doQuery(ctx context.Context, req *api.Request, authorize int) ( resp *api.Response, rerr error) { if ctx.Err() != nil { return nil, ctx.Err() } - if len(req.Mutations) != 1 { - return nil, errors.Errorf("Only 1 mutation per request is supported") - } - mu := req.Mutations[0] + l := &query.Latency{} + l.Start = time.Now() - if !isMutationAllowed(ctx) { - return resp, errors.Errorf("No mutations allowed.") + // let's divide the request in two categories: + // 1. query + // 2. mutation (upsert is also a mutation) + isMutation := len(req.Mutations) > 0 + methodRequest := methodQuery + if isMutation { + methodRequest = methodMutate } - var parsingTime time.Duration - resp = &api.Response{} - - start := time.Now() - defer func() { - totalTime := time.Since(start) - processingTime := totalTime - parsingTime - resp.Latency = &api.Latency{ - ParsingNs: uint64(parsingTime.Nanoseconds()), - ProcessingNs: uint64(processingTime.Nanoseconds()), - } - }() - - ctx, span := otrace.StartSpan(ctx, methodMutate) - ctx = x.WithMethod(ctx, methodMutate) + var measurements []ostats.Measurement + ctx, span := otrace.StartSpan(ctx, methodRequest) + ctx = x.WithMethod(ctx, methodRequest) defer func() { span.End() v := x.TagValueStatusOK @@ -466,218 +490,233 @@ func (s *Server) doMutate(ctx context.Context, req *api.Request, authorize int) v = x.TagValueStatusError } ctx, _ = tag.New(ctx, tag.Upsert(x.KeyStatus, v)) - timeSpentMs := x.SinceMs(start) - ostats.Record(ctx, x.LatencyMs.M(timeSpentMs)) + timeSpentMs := x.SinceMs(l.Start) + measurements = append(measurements, x.LatencyMs.M(timeSpentMs)) + ostats.Record(ctx, measurements...) }() if rerr = x.HealthCheck(); rerr != nil { return } - ostats.Record(ctx, x.NumMutations.M(1)) - if req.Query != "" { - span.Annotatef(nil, "Got Mutation with Upsert Block: %s", mu) - } - if len(mu.SetJson) > 0 { - span.Annotatef(nil, "Got JSON Mutation: %s", mu.SetJson) - } else if len(mu.SetNquads) > 0 { - span.Annotatef(nil, "Got NQuad Mutation: %s", mu.SetNquads) + ostats.Record(ctx, x.PendingQueries.M(1), x.NumQueries.M(1)) + defer func() { + measurements = append(measurements, x.PendingQueries.M(-1)) + }() + + span.Annotatef(nil, "Request received: %v", req) + if len(req.Query) == 0 && len(req.Mutations) == 0 { + span.Annotate(nil, "Empty request") + return resp, errors.Errorf("Empty request") } - startParsingTime := time.Now() - gmu, err := parseMutationObject(mu) - if err != nil { - return resp, err + qc := &queryContext{req: req, latency: l, span: span} + if rerr = parseRequest(qc); rerr != nil { + return } - parsingTime += time.Since(startParsingTime) if authorize == NeedAuthorize { - if err := authorizeMutation(ctx, gmu); err != nil { - return resp, err + if rerr = authorizeRequest(ctx, qc); rerr != nil { + return } } - if len(gmu.Set) == 0 && len(gmu.Del) == 0 { - span.Annotate(nil, "Empty mutation") - return resp, errors.Errorf("Empty mutation") - } - - if req.StartTs == 0 { - req.StartTs = State.getTimestamp(false) + if resp, rerr = processQuery(ctx, qc); rerr != nil { + return } - annotateStartTs(span, req.StartTs) - l, varToUID, err := doQueryInUpsert(ctx, req, gmu) - if err != nil { - return resp, err - } - parsingTime += l.Parsing - if len(varToUID) > 0 { - resp.Vars = make(map[string]*api.Uids, len(varToUID)) - for v, uids := range varToUID { - // There could be a lot of these uids which could blow up the response size, especially - // for bulk mutations, hence only return variables which have less than a million uids. - if len(uids) <= 1e6 { - hexUids := make([]string, 0, len(uids)) - // doQueryInUpsert returns uids as base10 string representation. We convert them - // to base16 string so that response format is consistent with assigned uids. - for _, uid := range uids { - u, err := strconv.ParseUint(uid, 10, 64) - if err != nil { - return resp, errors.Errorf("Couldn't parse uid: [%v] as base 10 uint64", - uid) - } - huid := fmt.Sprintf("%#x", u) - hexUids = append(hexUids, huid) - } - resp.Vars[v] = &api.Uids{ - Uids: hexUids, - } - } + if isMutation { + updateMutations(qc) + if rerr = s.doMutate(ctx, qc, resp); rerr != nil { + return } } - newUids, err := query.AssignUids(ctx, gmu.Set) - if err != nil { - return resp, err + // TODO(martinmr): Include Transport as part of the latency. Need to do this separately + // since it involves modifying the API protos. + // TODO (Aman): may need to calculate processing time here!!! + resp.Latency = &api.Latency{ + AssignTimestampNs: uint64(l.AssignTimestamp.Nanoseconds()), + ParsingNs: uint64(l.Parsing.Nanoseconds()), + ProcessingNs: uint64(l.Processing.Nanoseconds()), + EncodingNs: uint64(l.Json.Nanoseconds()), } - // resp.Uids contains a map of the node name to the uid. - // 1. For a blank node, like _:foo, the key would be foo. - // 2. For a uid variable that is part of an upsert query, like uid(foo), the key would - // be uid(foo). - resp.Uids = query.UidsToHex(query.StripBlankNode(newUids)) - edges, err := query.ToDirectedEdges(gmu, newUids) - if err != nil { - return resp, err - } + return resp, nil +} - m := &pb.Mutations{Edges: edges, StartTs: req.StartTs} - span.Annotatef(nil, "Applying mutations: %+v", m) - resp.Txn, err = query.ApplyMutations(ctx, m) - span.Annotatef(nil, "Txn Context: %+v. Err=%v", resp.Txn, err) - if !req.CommitNow { - if err == zero.ErrConflict { - err = status.Error(codes.FailedPrecondition, err.Error()) +// parseRequest parses the incoming request +func parseRequest(qc *queryContext) error { + parsingStartTime := time.Now() + + // parsing mutations + qc.gmuList = make([]*gql.Mutation, 0, len(qc.req.Mutations)) + for _, mu := range qc.req.Mutations { + gmu, err := parseMutationObject(mu) + if err != nil { + return err } - return resp, err + qc.gmuList = append(qc.gmuList, gmu) } - // The following logic is for committing immediately. + // updating queries to include dummy variables for conditional upsert + upsertQuery, condVars := buildUpsertQuery(qc) + qc.condVars = condVars + + // parsing the updated query + needVars := findVars(qc) + gqlRes, err := gql.ParseWithNeedVars(gql.Request{ + Str: upsertQuery, + Variables: qc.req.Vars, + }, needVars) if err != nil { - // ApplyMutations failed. We now want to abort the transaction, - // ignoring any error that might occur during the abort (the user would - // care more about the previous error). - if resp.Txn == nil { - resp.Txn = &api.TxnContext{StartTs: req.StartTs} - } + return err + } + if err = validateQuery(qc.gqlRes.Query); err != nil { + return err + } - resp.Txn.Aborted = true - _, _ = worker.CommitOverNetwork(ctx, resp.Txn) + qc.gqlRes = gqlRes + qc.latency.Parsing = time.Since(parsingStartTime) + return nil +} - if err == zero.ErrConflict { - // We have already aborted the transaction, so the error message should reflect that. - return resp, dgo.ErrAborted +// buildUpsertQuery modifies the query to evaluate the +// @if condition defined in Conditional Upsert. +func buildUpsertQuery(qc *queryContext) (string, []string) { + vars := make([]string, len(qc.gmuList)) + qc.req.Query = strings.TrimSpace(qc.req.Query) + if qc.req.Query == "" { + return qc.req.Query, vars + } + + upsertQuery := strings.TrimSuffix(qc.req.Query, "}") + for i, gmu := range qc.gmuList { + isCondUpsert := strings.TrimSpace(gmu.Cond) != "" + if isCondUpsert { + vars[i] = fmt.Sprintf("__dgraph_%d__", rand.Int()) + // @if in upsert is same as @filter in the query + cond := strings.Replace(gmu.Cond, "@if", "@filter", 1) + + // Add dummy query to evaluate the @if directive, ok to use uid(0) because + // dgraph doesn't check for existence of UIDs until we query for other predicates. + // Here, we are only querying for uid predicate in the dummy query. + // + // For example if - mu.Query = { + // me(...) {...} + // } + // + // Then, upsertQuery = { + // me(...) {...} + // __dgraph_0__ as var(func: uid(0)) @filter(...) + // } + // + // The variable __dgraph_0__ will - + // * be empty if the condition is true + // * have 1 UID (the 0 UID) if the condition is false + upsertQuery += vars[i] + ` as var(func: uid(0)) ` + cond + `\n` } - - return resp, err } + upsertQuery += `}` - span.Annotatef(nil, "Prewrites err: %v. Attempting to commit/abort immediately.", err) - ctxn := resp.Txn - // zero would assign the CommitTs - cts, err := worker.CommitOverNetwork(ctx, ctxn) - span.Annotatef(nil, "Status of commit at ts: %d: %v", ctxn.StartTs, err) - if err != nil { - if err == dgo.ErrAborted { - err = status.Errorf(codes.Aborted, err.Error()) - resp.Txn.Aborted = true - } + return upsertQuery, vars +} - return resp, err - } +func authorizeRequest(ctx context.Context, qc *queryContext) error { + // if err := authorizeQuery(ctx, &qc.gqlRes); err != nil { + // return err + // } - // CommitNow was true, no need to send keys. - resp.Txn.Keys = resp.Txn.Keys[:0] - resp.Txn.CommitTs = cts + // // TODO(Aman): can be optimized to do the authorization in just one func call + // for _, gmu := range qc.gmuList { + // if err := authorizeMutation(ctx, gmu); err != nil { + // return err + // } + // } - return resp, nil + return nil } -// doQueryInUpsert processes the query in upsert block. -// It returns the latency and a map of variables => [ uids ...] used in the upsert mutation. -func doQueryInUpsert(ctx context.Context, req *api.Request, gmu *gql.Mutation) ( - *query.Latency, map[string][]string, error) { +func processQuery(ctx context.Context, qc *queryContext) (*api.Response, error) { + qr := query.Request{ + Latency: qc.latency, + GqlQuery: &qc.gqlRes, + } - l := &query.Latency{} - if req.Query == "" { - return l, nil, nil - } - - mu := req.Mutations[0] - upsertQuery := req.Query - needVars := findVars(gmu) - isCondUpsert := strings.TrimSpace(mu.Cond) != "" - // conditionalVar is a dummy var that we use to evaluate the result of - // conditional upsert. - conditionalVar := fmt.Sprintf("__dgraph%d__", rand.Int()) - if isCondUpsert { - // @if in upsert is same as @filter in the query - cond := strings.Replace(mu.Cond, "@if", "@filter", 1) - - // Add dummy query to evaluate the @if directive, ok to use uid(0) because - // dgraph doesn't check for existence of UIDs until we query for other predicates. - // Here, we are only querying for uid predicate in the dummy query. - // - // For example if - mu.Query = { - // me(...) {...} - // } - // - // Then, upsertQuery = { - // me(...) {...} - // __dgraph0__ as var(func: uid(0)) @if(...) - // } - // - // The variable __dgraph0__ will - - // * be empty if the condition is true - // * have 1 UID (the 0 UID) if the condition is false - upsertQuery = strings.TrimSuffix(strings.TrimSpace(req.Query), "}") - upsertQuery += conditionalVar + ` as var(func: uid(0)) ` + cond + `}` - needVars = append(needVars, conditionalVar) - } - - startParsingTime := time.Now() - parsedReq, err := gql.ParseWithNeedVars(gql.Request{ - Str: upsertQuery, - Variables: make(map[string]string), - }, needVars) - l.Parsing += time.Since(startParsingTime) - if err != nil { - return nil, nil, errors.Wrapf(err, "while parsing query: %q", upsertQuery) + // Here we try our best effort to not contact Zero for a timestamp. If we succeed, + // then we use the max known transaction ts value (from ProcessDelta) for a read-only query. + // If we haven't processed any updates yet then fall back to getting TS from Zero. + switch { + case qc.req.BestEffort: + qc.span.Annotate([]otrace.Attribute{otrace.BoolAttribute("be", true)}, "") + case qc.req.ReadOnly: + qc.span.Annotate([]otrace.Attribute{otrace.BoolAttribute("ro", true)}, "") + default: + qc.span.Annotate([]otrace.Attribute{otrace.BoolAttribute("no", true)}, "") } - if err := validateQuery(parsedReq.Query); err != nil { - return nil, nil, errors.Wrapf(err, "while validating query: %q", upsertQuery) + + if qc.req.BestEffort { + // Sanity: check that request is read-only too. + if !qc.req.ReadOnly { + return nil, errors.Errorf("A best effort query must be read-only.") + } + + if qc.req.StartTs == 0 { + qc.req.StartTs = posting.Oracle().MaxAssigned() + } + + qr.Cache = worker.NoCache } - if err := authorizeQuery(ctx, &parsedReq); err != nil { - return nil, nil, err + if qc.req.StartTs == 0 { + assignTimestampStart := time.Now() + qc.req.StartTs = State.getTimestamp(qc.req.ReadOnly) + qc.latency.AssignTimestamp = time.Since(assignTimestampStart) } - qr := query.Request{Latency: l, GqlQuery: &parsedReq, ReadTs: req.StartTs} - if err := qr.ProcessQuery(ctx); err != nil { - return nil, nil, errors.Wrapf(err, "while processing query: %q", upsertQuery) + qr.ReadTs = qc.req.StartTs + resp := &api.Response{} + resp.Txn = &api.TxnContext{StartTs: qc.req.StartTs} + annotateStartTs(qc.span, qc.req.StartTs) + + // Core processing happens here. + er, err := qr.Process(ctx) + if err != nil { + return nil, errors.Wrap(err, "") } - if len(qr.Vars) <= 0 { - return nil, nil, errors.Errorf("upsert query block has no variables") + var js []byte + if len(er.SchemaNode) > 0 || len(er.Types) > 0 { + sort.Slice(er.SchemaNode, func(i, j int) bool { + return er.SchemaNode[i].Predicate < er.SchemaNode[j].Predicate + }) + sort.Slice(er.Types, func(i, j int) bool { + return er.Types[i].TypeName < er.Types[j].TypeName + }) + + respMap := make(map[string]interface{}) + if len(er.SchemaNode) > 0 { + respMap["schema"] = er.SchemaNode + } + if len(er.Types) > 0 { + respMap["types"] = formatTypes(er.Types) + } + js, err = json.Marshal(respMap) + } else { + js, err = query.ToJson(qc.latency, er.Subgraphs) + } + if err != nil { + return nil, err } + resp.Json = js + qc.span.Annotatef(nil, "Response = %s", js) // varToUID contains a map of variable name to the uids corresponding to it. - // It is used later for constructing set and delete mutations by replacing variables - // with the actual uids they correspond to. + // It is used later for constructing set and delete mutations by replacing + // variables with the actual uids they correspond to. // If a variable doesn't have any UID, we generate one ourselves later. - varToUID := make(map[string][]string) + qc.uidRes = make(map[string][]string) for name, v := range qr.Vars { if v.Uids == nil || len(v.Uids.Uids) <= 0 { continue @@ -688,52 +727,60 @@ func doQueryInUpsert(ctx context.Context, req *api.Request, gmu *gql.Mutation) ( // We use base 10 here because the RDF mutations expect the uid to be in base 10. uids[i] = strconv.FormatUint(u, 10) } - varToUID[name] = uids + qc.uidRes[name] = uids } - // If @if condition is false, no need to process the mutations - if isCondUpsert { - v, ok := qr.Vars[conditionalVar] - isMut := ok && v.Uids != nil && len(v.Uids.Uids) == 1 - if !isMut { + return resp, err +} + +func updateMutations(qc *queryContext) { + for i, gmu := range qc.gmuList { + condVar := qc.condVars[i] + if condVar == "" { + continue + } + + uids, ok := qc.uidRes[condVar] + if !(ok && len(uids) == 1) { gmu.Set = nil gmu.Del = nil - return l, nil, nil } - } - updateUIDInMutations(gmu, varToUID) - updateValInMutations(gmu, qr) - // varToUID is returned to the client, let's delete the dummy var that we put in there for - // evaluating the conditional upsert. - delete(varToUID, conditionalVar) - return l, varToUID, nil + updateUIDInMutations(gmu, qc) + updateValInMutations(gmu, qc) + delete(qc.uidRes, condVar) + } } // findVars finds all the variables used in mutation block -func findVars(gmu *gql.Mutation) []string { - vars := make(map[string]struct{}) +func findVars(qc *queryContext) []string { updateVars := func(s string) { - if strings.HasPrefix(s, "uid(") || strings.HasPrefix(s, "val(") { + if strings.HasPrefix(s, "uid(") { + varName := s[4 : len(s)-1] + qc.uidRes[varName] = nil + } else if strings.HasPrefix(s, "val(") { varName := s[4 : len(s)-1] - vars[varName] = struct{}{} + qc.valRes[varName] = nil } } - for _, nq := range gmu.Set { - updateVars(nq.Subject) - updateVars(nq.ObjectId) - } - for _, nq := range gmu.Del { - updateVars(nq.Subject) - updateVars(nq.ObjectId) + + for _, gmu := range qc.gmuList { + for _, nq := range gmu.Set { + updateVars(nq.Subject) + updateVars(nq.ObjectId) + } + for _, nq := range gmu.Del { + updateVars(nq.Subject) + updateVars(nq.ObjectId) + } } - varsList := make([]string, 0, len(vars)) - for v := range vars { + varsList := make([]string, 0, len(qc.uidRes)+len(qc.valRes)) + for v := range qc.uidRes { varsList = append(varsList, v) } - if glog.V(3) { - glog.Infof("Variables used in mutation block: %v", varsList) + for v := range qc.valRes { + varsList = append(varsList, v) } return varsList @@ -743,12 +790,12 @@ func findVars(gmu *gql.Mutation) []string { // Assumption is that Subject can contain UID, whereas Object can contain Val // If val(variable) exists in a query, but the values are not there for the variable, // it will ignore the mutation silently. -func updateValInNQuads(nquads []*api.NQuad, req query.Request) []*api.NQuad { +func updateValInNQuads(nquads []*api.NQuad, qc *queryContext) []*api.NQuad { getNewVals := func(s string) (map[uint64]types.Val, bool) { if strings.HasPrefix(s, "val(") { varName := s[4 : len(s)-1] - if vals, ok := req.Vars[varName]; ok { - return vals.Vals, true + if v, ok := qc.valRes[varName]; ok && v != nil { + return v, true } return nil, true } @@ -820,22 +867,21 @@ func updateValInNQuads(nquads []*api.NQuad, req query.Request) []*api.NQuad { // updateValInMuations does following transformations: // 0x123 val(v) -> 0x123 13.0 -func updateValInMutations(gmu *gql.Mutation, req query.Request) { - gmu.Del = updateValInNQuads(gmu.Del, req) - gmu.Set = updateValInNQuads(gmu.Set, req) +func updateValInMutations(gmu *gql.Mutation, qc *queryContext) { + gmu.Del = updateValInNQuads(gmu.Del, qc) + gmu.Set = updateValInNQuads(gmu.Set, qc) } // updateUIDInMutations does following transformations: // * uid(v) -> 0x123 -- If v is defined in query block // * uid(v) -> _:uid(v) -- Otherwise - -func updateUIDInMutations(gmu *gql.Mutation, varToUID map[string][]string) { +func updateUIDInMutations(gmu *gql.Mutation, qc *queryContext) { // usedMutationVars keeps track of variables that are used in mutations. usedMutationVars := make(map[string]bool) getNewVals := func(s string) []string { if strings.HasPrefix(s, "uid(") { varName := s[4 : len(s)-1] - if uids, ok := varToUID[varName]; ok { + if uids, ok := qc.uidRes[varName]; ok { usedMutationVars[varName] = true return uids } @@ -890,165 +936,118 @@ func updateUIDInMutations(gmu *gql.Mutation, varToUID map[string][]string) { } } } - for v := range varToUID { + for v := range qc.uidRes { // We only want to return the vars which are used in the mutation. if _, ok := usedMutationVars[v]; !ok { - delete(varToUID, v) + delete(qc.uidRes, v) } } gmu.Set = gmuSet } -// Query handles queries and returns the data. -func (s *Server) Query(ctx context.Context, req *api.Request) (*api.Response, error) { - if len(req.Mutations) > 0 { - return s.doMutate(ctx, req, NeedAuthorize) - } - - return s.doQuery(ctx, req, NeedAuthorize) -} - -// This method is used to execute the query and return the response to the -// client as a protocol buffer message. -func (s *Server) doQuery(ctx context.Context, req *api.Request, authorize int) ( - resp *api.Response, rerr error) { - +func (s *Server) doMutate(ctx context.Context, qc *queryContext, resp *api.Response) error { if ctx.Err() != nil { - return nil, ctx.Err() + return ctx.Err() } - startTime := time.Now() - var measurements []ostats.Measurement - ctx, span := otrace.StartSpan(ctx, methodQuery) - ctx = x.WithMethod(ctx, methodQuery) - defer func() { - span.End() - v := x.TagValueStatusOK - if rerr != nil { - v = x.TagValueStatusError - } - ctx, _ = tag.New(ctx, tag.Upsert(x.KeyStatus, v)) - timeSpentMs := x.SinceMs(startTime) - measurements = append(measurements, x.LatencyMs.M(timeSpentMs)) - ostats.Record(ctx, measurements...) - }() + if !isMutationAllowed(ctx) { + return errors.Errorf("No mutations allowed.") + } if err := x.HealthCheck(); err != nil { - return nil, err + return err } - ostats.Record(ctx, x.PendingQueries.M(1), x.NumQueries.M(1)) - defer func() { - measurements = append(measurements, x.PendingQueries.M(-1)) - }() - - resp = &api.Response{} - if len(req.Query) == 0 { - span.Annotate(nil, "Empty query") - return resp, errors.Errorf("Empty query") + if len(qc.uidRes) > 0 { + resp.Vars = make(map[string]*api.Uids, len(qc.uidRes)) + for v, uids := range qc.uidRes { + // There could be a lot of these uids which could blow up the response size, especially + // for bulk mutations, hence only return variables which have less than a million uids. + if len(uids) <= 1e6 { + hexUids := make([]string, 0, len(uids)) + // doQueryInUpsert returns uids as base10 string representation. We convert them + // to base16 string so that response format is consistent with assigned uids. + for _, uid := range uids { + u, err := strconv.ParseUint(uid, 10, 64) + if err != nil { + return errors.Errorf("Couldn't parse uid: [%v] as base 10 uint64", uid) + } + huid := fmt.Sprintf("%#x", u) + hexUids = append(hexUids, huid) + } + resp.Vars[v] = &api.Uids{ + Uids: hexUids, + } + } + } } - var l query.Latency - l.Start = time.Now() - span.Annotatef(nil, "Query received: %v", req) - - parsedReq, err := gql.Parse(gql.Request{ - Str: req.Query, - Variables: req.Vars, - }) + newUids, err := query.AssignUids(ctx, qc.gmuList) if err != nil { - return resp, err + return err } - if err = validateQuery(parsedReq.Query); err != nil { - return resp, err + // resp.Uids contains a map of the node name to the uid. + // 1. For a blank node, like _:foo, the key would be foo. + // 2. For a uid variable that is part of an upsert query, + // like uid(foo), the key would be uid(foo). + resp.Uids = query.UidsToHex(query.StripBlankNode(newUids)) + edges, err := query.ToDirectedEdges(qc.gmuList, newUids) + if err != nil { + return err } - if authorize == NeedAuthorize { - if err := authorizeQuery(ctx, &parsedReq); err != nil { - return nil, err + m := &pb.Mutations{Edges: edges, StartTs: qc.req.StartTs} + qc.span.Annotatef(nil, "Applying mutations: %+v", m) + resp.Txn, err = query.ApplyMutations(ctx, m) + qc.span.Annotatef(nil, "Txn Context: %+v. Err=%v", resp.Txn, err) + if qc.req.CommitNow { + if err == zero.ErrConflict { + err = status.Error(codes.FailedPrecondition, err.Error()) } - } - var queryRequest = query.Request{ - Latency: &l, - GqlQuery: &parsedReq, - } - // Here we try our best effort to not contact Zero for a timestamp. If we succeed, - // then we use the max known transaction ts value (from ProcessDelta) for a read-only query. - // If we haven't processed any updates yet then fall back to getting TS from Zero. - switch { - case req.BestEffort: - span.Annotate([]otrace.Attribute{otrace.BoolAttribute("be", true)}, "") - case req.ReadOnly: - span.Annotate([]otrace.Attribute{otrace.BoolAttribute("ro", true)}, "") - default: - span.Annotate([]otrace.Attribute{otrace.BoolAttribute("no", true)}, "") + return err } - if req.BestEffort { - // Sanity: check that request is read-only too. - if !req.ReadOnly { - return resp, errors.Errorf("A best effort query must be read-only.") - } - if req.StartTs == 0 { - req.StartTs = posting.Oracle().MaxAssigned() + + // The following logic is for committing immediately. + if err != nil { + // ApplyMutations failed. We now want to abort the transaction, + // ignoring any error that might occur during the abort (the user would + // care more about the previous error). + if resp.Txn == nil { + resp.Txn = &api.TxnContext{StartTs: qc.req.StartTs} } - queryRequest.Cache = worker.NoCache - } - if req.StartTs == 0 { - assignTimestampStart := time.Now() - req.StartTs = State.getTimestamp(req.ReadOnly) - l.AssignTimestamp = time.Since(assignTimestampStart) - } + resp.Txn.Aborted = true + _, _ = worker.CommitOverNetwork(ctx, resp.Txn) - queryRequest.ReadTs = req.StartTs - resp.Txn = &api.TxnContext{StartTs: req.StartTs} - annotateStartTs(span, req.StartTs) + if err == zero.ErrConflict { + // We have already aborted the transaction, so the error message should reflect that. + return dgo.ErrAborted + } - // Core processing happens here. - var er query.ExecutionResult - if er, err = queryRequest.Process(ctx); err != nil { - return resp, errors.Wrap(err, "") + return err } - var js []byte - if len(er.SchemaNode) > 0 || len(er.Types) > 0 { - sort.Slice(er.SchemaNode, func(i, j int) bool { - return er.SchemaNode[i].Predicate < er.SchemaNode[j].Predicate - }) - sort.Slice(er.Types, func(i, j int) bool { - return er.Types[i].TypeName < er.Types[j].TypeName - }) - - respMap := make(map[string]interface{}) - if len(er.SchemaNode) > 0 { - respMap["schema"] = er.SchemaNode - } - if len(er.Types) > 0 { - respMap["types"] = formatTypes(er.Types) - } - js, err = json.Marshal(respMap) - } else { - js, err = query.ToJson(&l, er.Subgraphs) - } + qc.span.Annotatef(nil, "Prewrites err: %v. Attempting to commit/abort immediately.", err) + ctxn := resp.Txn + // zero would assign the CommitTs + cts, err := worker.CommitOverNetwork(ctx, ctxn) + qc.span.Annotatef(nil, "Status of commit at ts: %d: %v", ctxn.StartTs, err) if err != nil { - return resp, err - } - resp.Json = js - span.Annotatef(nil, "Response = %s", js) + if err == dgo.ErrAborted { + err = status.Errorf(codes.Aborted, err.Error()) + resp.Txn.Aborted = true + } - // TODO(martinmr): Include Transport as part of the latency. Need to do this separately - // since it involves modifying the API protos. - gl := &api.Latency{ - AssignTimestampNs: uint64(l.AssignTimestamp.Nanoseconds()), - ParsingNs: uint64(l.Parsing.Nanoseconds()), - ProcessingNs: uint64(l.Processing.Nanoseconds()), - EncodingNs: uint64(l.Json.Nanoseconds()), + return err } - resp.Latency = gl - return resp, err + // CommitNow was true, no need to send keys. + resp.Txn.Keys = resp.Txn.Keys[:0] + resp.Txn.CommitTs = cts + + return nil } // CommitOrAbort commits or aborts a transaction. @@ -1133,7 +1132,7 @@ func isAlterAllowed(ctx context.Context) error { // gql.Mutation.Set field. Similarly the 3 fields api.Mutation#DeleteJson, api.Mutation#DelNquads // and api.Mutation#Del are merged into the gql.Mutation#Del field. func parseMutationObject(mu *api.Mutation) (*gql.Mutation, error) { - res := &gql.Mutation{} + res := &gql.Mutation{Cond: mu.Cond} if len(mu.SetJson) > 0 { nqs, err := chunker.ParseJSON(mu.SetJson, chunker.SetNquads) diff --git a/go.mod b/go.mod index f1d9eadc961..73cb8293bcb 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/dgraph-io/dgraph -go 1.12 +go 1.13 require ( contrib.go.opencensus.io/exporter/jaeger v0.1.0 @@ -13,7 +13,6 @@ require ( github.com/blevesearch/segment v0.0.0-20160915185041-762005e7a34f // indirect github.com/blevesearch/snowballstem v0.0.0-20180110192139-26b06a2c243d // indirect github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd - github.com/d4l3k/messagediff v1.2.1 // indirect github.com/dgraph-io/badger v0.0.0-20190917133922-cbdef65095c7 github.com/dgraph-io/dgo/v2 v2.1.1-0.20191011032519-062f5605f6da github.com/dgraph-io/ristretto v0.0.0-20191010170704-2ba187ef9534 // indirect @@ -46,7 +45,7 @@ require ( github.com/spf13/viper v1.3.2 github.com/stretchr/testify v1.4.0 github.com/tinylib/msgp v0.0.0-20190103190839-ade0ca4ace05 // indirect - github.com/twpayne/go-geom v0.0.0-20170317090630-6753ad11e46b + github.com/twpayne/go-geom v1.0.5 github.com/willf/bitset v0.0.0-20181014161241-71fa2377963f // indirect go.etcd.io/etcd v0.0.0-20190228193606-a943ad0ee4c9 go.opencensus.io v0.21.0 diff --git a/go.sum b/go.sum index 4075b41e13d..6fb00798716 100644 --- a/go.sum +++ b/go.sum @@ -6,10 +6,13 @@ contrib.go.opencensus.io/exporter/prometheus v0.1.0 h1:SByaIoWwNgMdPSgl5sMqM2KDE contrib.go.opencensus.io/exporter/prometheus v0.1.0/go.mod h1:cGFniUXGZlKRjzOyuZJ6mgB+PgBcCIa79kEKR8YCW+A= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/DATA-DOG/go-sqlmock v1.3.2 h1:2L2f5t3kKnCLxnClDD/PrDfExFFa1wjESgxHG/B1ibo= +github.com/DATA-DOG/go-sqlmock v1.3.2/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM= github.com/DataDog/datadog-go v0.0.0-20190425163447-40bafcb5f6c1 h1:fSu93OUqfEkoQJBkTsxFB1e0oESqabS45iRX880e7Xw= github.com/DataDog/datadog-go v0.0.0-20190425163447-40bafcb5f6c1/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= github.com/DataDog/opencensus-go-exporter-datadog v0.0.0-20190503082300-0f32ad59ab08 h1:5btKvK+N+FpW0EEgvxq7LWcUEwIRLsL4IwIo0u+Qlhs= github.com/DataDog/opencensus-go-exporter-datadog v0.0.0-20190503082300-0f32ad59ab08/go.mod h1:gMGUEe16aZh0QN941HgDjwrdjU4iTthPoz2/AtDRADE= +github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8D7ML55dXQrVaamCz2vxCfdQBasLZfHKk= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/OneOfOne/xxhash v1.2.5 h1:zl/OfRA6nftbBK9qTohYBJ5xvw6C/oNKizR7cZGl3cI= github.com/OneOfOne/xxhash v1.2.5/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q= @@ -35,6 +38,7 @@ github.com/blevesearch/segment v0.0.0-20160915185041-762005e7a34f h1:kqbi9lqXLLs github.com/blevesearch/segment v0.0.0-20160915185041-762005e7a34f/go.mod h1:IInt5XRvpiGE09KOk9mmCMLjHhydIhNPKPPFLFBB7L8= github.com/blevesearch/snowballstem v0.0.0-20180110192139-26b06a2c243d h1:iPCfLXcTYDotqO1atEOQyoRDwlGaZVuMI4wSaKQlI2I= github.com/blevesearch/snowballstem v0.0.0-20180110192139-26b06a2c243d/go.mod h1:cdytUvf6FKWA9NpXJihYdZq8TN2AiQ5HOS0UZUz0C9g= +github.com/cenkalti/backoff v2.1.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.0.1-0.20190104013014-3767db7a7e18/go.mod h1:HD5P3vAIAh+Y2GAxg0PrPN1P8WkepXGpjbUPDHJqqKM= @@ -42,6 +46,7 @@ github.com/cespare/xxhash/v2 v2.1.0/go.mod h1:dgIUBU3pDso/gPgZ1osOZ0iQf77oPR28Tj github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd h1:qMd81Ts1T2OTKmB4acZcyKaMtRnY5Y44NuXGX2GFJ1w= github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI= +github.com/containerd/continuity v0.0.0-20181203112020-004b46473808/go.mod h1:GL3xCUCBDV3CZiTSEKksMWbLE66hEyuu9qyDOOqM47Y= github.com/coocood/freecache v1.1.0/go.mod h1:ePwxCDzOYvARfHdr1pByNct1at3CoKnsipOHwKlNbzI= github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= @@ -68,6 +73,8 @@ github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczC github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dgryski/go-groupvarint v0.0.0-20190318181831-5ce5df8ca4e1 h1:RSnSk6/ViWmqng/mNcPztSPgr6/4EVDxMmBH/a0w/6I= github.com/dgryski/go-groupvarint v0.0.0-20190318181831-5ce5df8ca4e1/go.mod h1:MlkUQveSLEDbIgq2r1e++tSf0zfzU9mQpa9Qkczl+9Y= +github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec= +github.com/docker/go-units v0.3.3/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= @@ -140,6 +147,7 @@ github.com/kr/pty v1.0.0/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/magiconair/properties v1.8.0 h1:LLgXmsheXeRoUOBOjtwPQCWIYqM/LU1ayDtDePerRcY= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= @@ -160,7 +168,11 @@ github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+W github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/gomega v1.4.2/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/opencontainers/go-digest v1.0.0-rc1/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s= +github.com/opencontainers/image-spec v1.0.1/go.mod h1:BtxoFyWECRxE4U/7sNtV5W15zMzWCbyJoFRP3s7yZA0= +github.com/opencontainers/runc v0.1.1/go.mod h1:qT5XzbpPznkRYVz/mWwUaVBUv2rmF59PVA73FjuZG0U= github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJc5AZX7/PBEpw= +github.com/ory/dockertest v3.3.4+incompatible/go.mod h1:1vX4m9wsvi00u5bseYwXaSnhNrne+V0E6LAcBILJdPs= github.com/paulmach/go.geojson v0.0.0-20170327170536-40612a87147b h1:rY7xFF9ktAzkr2OXol6GU9lrEw5PAMd5VV/5/T0A+FU= github.com/paulmach/go.geojson v0.0.0-20170327170536-40612a87147b/go.mod h1:YaKx1hKpWF+T2oj2lFJPsW/t1Q5e1jQI61eoQSTwpIs= github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc= @@ -200,6 +212,7 @@ github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqn github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= github.com/sirupsen/logrus v1.0.5/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/goconvey v0.0.0-20190731233626-505e41936337 h1:WN9BUFbdyOsSH/XohnWpXOlq9NBD5sGAB2FciQMUEe8= @@ -233,8 +246,10 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P github.com/tinylib/msgp v0.0.0-20190103190839-ade0ca4ace05 h1:4UEPSXT1HqXDvnBx4FPNMuqu+tOzKJsRnbSwyuF74Fc= github.com/tinylib/msgp v0.0.0-20190103190839-ade0ca4ace05/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE= github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= -github.com/twpayne/go-geom v0.0.0-20170317090630-6753ad11e46b h1:MhMhVM2yVsvchj7hjsCYLcrauXCiuYuXxxUDbrDVIao= -github.com/twpayne/go-geom v0.0.0-20170317090630-6753ad11e46b/go.mod h1:xN4fLTgWx1bpC4UKnBlzabpRNq5Kwxsw+6bPPrht9qg= +github.com/twpayne/go-geom v1.0.5 h1:XZBfc3Wx0dj4p17ZfmzqxnU9fTTa3pY4YG5RngKsVNI= +github.com/twpayne/go-geom v1.0.5/go.mod h1:gO3i8BeAvZuihwwXcw8dIOWXebCzTmy3uvXj9dZG2RA= +github.com/twpayne/go-kml v1.0.0/go.mod h1:LlvLIQSfMqYk2O7Nx8vYAbSLv4K9rjMvLlEdUKWdjq0= +github.com/twpayne/go-polyline v1.0.0/go.mod h1:ICh24bcLYBX8CknfvNPKqoTbe+eg+MX1NPyJmSBo7pU= github.com/ugorji/go v1.1.2/go.mod h1:hnLbHMwcvSihnDhEfx2/BzKp2xb0Y+ErdfYcrs9tkJQ= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/ugorji/go/codec v0.0.0-20190204201341-e444a5086c43/go.mod h1:iT03XoTwV7xq/+UGwKO3UbC1nNNlopQiY61beSdrtOA= @@ -290,6 +305,7 @@ golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20181122145206-62eef0e2fa9b/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190405154228-4b34438f7a67/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb h1:fgwFCsaw9buMuxNd6+DQfAuSFqbNiQZpcgJQAgJsK6k= golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/gql/mutation.go b/gql/mutation.go index 12f19f9caeb..2f196822c6b 100644 --- a/gql/mutation.go +++ b/gql/mutation.go @@ -32,8 +32,9 @@ var ( // Mutation stores the strings corresponding to set and delete operations. type Mutation struct { - Set []*api.NQuad - Del []*api.NQuad + Cond string + Set []*api.NQuad + Del []*api.NQuad } // ParseUid parses the given string into an UID. This method returns with an error diff --git a/query/mutation.go b/query/mutation.go index 3a19f40c498..4fd86ebdfaa 100644 --- a/query/mutation.go +++ b/query/mutation.go @@ -111,39 +111,41 @@ func verifyUid(ctx context.Context, uid uint64) error { // AssignUids tries to assign unique ids to each identity in the subjects and objects in the // format of _:xxx. An identity, e.g. _:a, will only be assigned one uid regardless how many times // it shows up in the subjects or objects -func AssignUids(ctx context.Context, nquads []*api.NQuad) (map[string]uint64, error) { +func AssignUids(ctx context.Context, gmuList []*gql.Mutation) (map[string]uint64, error) { newUids := make(map[string]uint64) num := &pb.Num{} var err error - for _, nq := range nquads { - // We dont want to assign uids to these. - if nq.Subject == x.Star && nq.ObjectValue.GetDefaultVal() == x.Star { - return newUids, errors.New("Predicate deletion should be called via alter") - } - - if len(nq.Subject) == 0 { - return nil, errors.Errorf("Subject must not be empty for nquad: %+v", nq) - } - var uid uint64 - if strings.HasPrefix(nq.Subject, "_:") { - newUids[nq.Subject] = 0 - } else if uid, err = gql.ParseUid(nq.Subject); err != nil { - return newUids, err - } - if err = verifyUid(ctx, uid); err != nil { - return newUids, err - } + for _, gmu := range gmuList { + for _, nq := range gmu.Set { + // We dont want to assign uids to these. + if nq.Subject == x.Star && nq.ObjectValue.GetDefaultVal() == x.Star { + return newUids, errors.New("Predicate deletion should be called via alter") + } - if len(nq.ObjectId) > 0 { + if len(nq.Subject) == 0 { + return nil, errors.Errorf("Subject must not be empty for nquad: %+v", nq) + } var uid uint64 - if strings.HasPrefix(nq.ObjectId, "_:") { - newUids[nq.ObjectId] = 0 - } else if uid, err = gql.ParseUid(nq.ObjectId); err != nil { + if strings.HasPrefix(nq.Subject, "_:") { + newUids[nq.Subject] = 0 + } else if uid, err = gql.ParseUid(nq.Subject); err != nil { return newUids, err } if err = verifyUid(ctx, uid); err != nil { return newUids, err } + + if len(nq.ObjectId) > 0 { + var uid uint64 + if strings.HasPrefix(nq.ObjectId, "_:") { + newUids[nq.ObjectId] = 0 + } else if uid, err = gql.ParseUid(nq.ObjectId); err != nil { + return newUids, err + } + if err = verifyUid(ctx, uid); err != nil { + return newUids, err + } + } } } @@ -167,8 +169,8 @@ func AssignUids(ctx context.Context, nquads []*api.NQuad) (map[string]uint64, er } // ToDirectedEdges converts the gql.Mutation input into a set of directed edges. -func ToDirectedEdges(gmu *gql.Mutation, - newUids map[string]uint64) (edges []*pb.DirectedEdge, err error) { +func ToDirectedEdges(gmuList []*gql.Mutation, newUids map[string]uint64) ( + edges []*pb.DirectedEdge, err error) { // Wrapper for a pointer to protos.Nquad var wnq *gql.NQuad @@ -189,20 +191,22 @@ func ToDirectedEdges(gmu *gql.Mutation, return nil } - for _, nq := range gmu.Set { - if err := facets.SortAndValidate(nq.Facets); err != nil { - return edges, err - } - if err := parse(nq, pb.DirectedEdge_SET); err != nil { - return edges, err - } - } - for _, nq := range gmu.Del { - if nq.Subject == x.Star && nq.ObjectValue.GetDefaultVal() == x.Star { - return edges, errors.New("Predicate deletion should be called via alter") + for _, gmu := range gmuList { + for _, nq := range gmu.Set { + if err := facets.SortAndValidate(nq.Facets); err != nil { + return edges, err + } + if err := parse(nq, pb.DirectedEdge_SET); err != nil { + return edges, err + } } - if err := parse(nq, pb.DirectedEdge_DEL); err != nil { - return edges, err + for _, nq := range gmu.Del { + if nq.Subject == x.Star && nq.ObjectValue.GetDefaultVal() == x.Star { + return edges, errors.New("Predicate deletion should be called via alter") + } + if err := parse(nq, pb.DirectedEdge_DEL); err != nil { + return edges, err + } } }