diff --git a/contrib/integration/bigdata/main.go b/contrib/integration/bigdata/main.go index a0f1b5c0bfb..3587dedd550 100644 --- a/contrib/integration/bigdata/main.go +++ b/contrib/integration/bigdata/main.go @@ -54,7 +54,7 @@ func main() { // schema items. resp, err := c.NewTxn().Query(ctx, "schema {}") x.Check(err) - if len(resp.Schema) < 5 { + if len(resp.Json) < 5 { // Run each schema alter separately so that there is an even // distribution among all groups. for _, s := range schema() { diff --git a/contrib/integration/testtxn/main_test.go b/contrib/integration/testtxn/main_test.go index bfb95c726a2..eb6636b8b65 100644 --- a/contrib/integration/testtxn/main_test.go +++ b/contrib/integration/testtxn/main_test.go @@ -232,12 +232,15 @@ func TestTxnRead5(t *testing.T) { mu = &api.Mutation{} mu.SetJson = []byte(fmt.Sprintf("{\"uid\": \"%s\", \"name\": \"Manish2\"}", uid)) - mu.CommitNow = true - res, err := dc.Mutate(context.Background(), mu) + muReq := api.Request{ + Mutations: []*api.Mutation{mu}, + CommitNow: true, + } + res, err := dc.Query(context.Background(), &muReq) if err != nil { log.Fatalf("Error while running mutation: %v\n", err) } - x.AssertTrue(res.Context.StartTs > 0) + x.AssertTrue(res.Txn.StartTs > 0) resp, err = dc.Query(context.Background(), &req) if err != nil { log.Fatalf("Error while running query: %v\n", err) diff --git a/dgraph/cmd/alpha/http.go b/dgraph/cmd/alpha/http.go index f006d631c47..c3d698b0820 100644 --- a/dgraph/cmd/alpha/http.go +++ b/dgraph/cmd/alpha/http.go @@ -302,7 +302,7 @@ func mutationHandler(w http.ResponseWriter, r *http.Request) { // start parsing the query parseStart := time.Now() - var mu *api.Mutation + var req *api.Request contentType := r.Header.Get("Content-Type") switch strings.ToLower(contentType) { case "application/json": @@ -312,7 +312,8 @@ func mutationHandler(w http.ResponseWriter, r *http.Request) { return } - mu = &api.Mutation{} + mu := &api.Mutation{} + req = &api.Request{Mutations: []*api.Mutation{mu}} if setJSON, ok := ms["set"]; ok && setJSON != nil { mu.SetJson = setJSON.bs } @@ -320,7 +321,7 @@ func mutationHandler(w http.ResponseWriter, r *http.Request) { mu.DeleteJson = delJSON.bs } if queryText, ok := ms["query"]; ok && queryText != nil { - mu.Query, err = strconv.Unquote(string(queryText.bs)) + req.Query, err = strconv.Unquote(string(queryText.bs)) if err != nil { x.SetStatus(w, x.ErrorInvalidRequest, err.Error()) return @@ -336,7 +337,7 @@ func mutationHandler(w http.ResponseWriter, r *http.Request) { case "application/rdf": // Parse N-Quads. - mu, err = gql.ParseMutation(string(body)) + req, err = gql.ParseMutation(string(body)) if err != nil { x.SetStatus(w, x.ErrorInvalidRequest, err.Error()) return @@ -351,11 +352,11 @@ func mutationHandler(w http.ResponseWriter, r *http.Request) { // end of query parsing parseEnd := time.Now() - mu.StartTs = startTs - mu.CommitNow = commitNow + req.StartTs = startTs + req.CommitNow = commitNow ctx := attachAccessJwt(context.Background(), r) - resp, err := (&edgraph.Server{}).Mutate(ctx, mu) + resp, err := (&edgraph.Server{}).Query(ctx, req) if err != nil { x.SetStatusWithData(w, x.ErrorInvalidRequest, err.Error()) return @@ -363,14 +364,14 @@ func mutationHandler(w http.ResponseWriter, r *http.Request) { resp.Latency.ParsingNs = uint64(parseEnd.Sub(parseStart).Nanoseconds()) e := query.Extensions{ - Txn: resp.Context, + Txn: resp.Txn, Latency: resp.Latency, } sort.Strings(e.Txn.Keys) sort.Strings(e.Txn.Preds) // Don't send keys array which is part of txn context if its commit immediately. - if mu.CommitNow { + if req.CommitNow { e.Txn.Keys = e.Txn.Keys[:0] } @@ -487,11 +488,11 @@ func handleCommit(startTs uint64, reqText []byte) (map[string]interface{}, error return nil, err } - resp := &api.Assigned{} - resp.Context = tc - resp.Context.CommitTs = cts + resp := &api.Response{} + resp.Txn = tc + resp.Txn.CommitTs = cts e := query.Extensions{ - Txn: resp.Context, + Txn: resp.Txn, } e.Txn.Keys = e.Txn.Keys[:0] response := map[string]interface{}{} diff --git a/edgraph/access_ee.go b/edgraph/access_ee.go index 59a9587774d..1f1e29115c6 100644 --- a/edgraph/access_ee.go +++ b/edgraph/access_ee.go @@ -259,8 +259,9 @@ const queryUser = ` // authorizeUser queries the user with the given user id, and returns the associated uid, // acl groups, and whether the password stored in DB matches the supplied password -func authorizeUser(ctx context.Context, userid string, password string) (*acl.User, - error) { +func authorizeUser(ctx context.Context, userid string, password string) ( + *acl.User, error) { + queryVars := map[string]string{ "$userid": userid, "$password": password, @@ -270,7 +271,7 @@ func authorizeUser(ctx context.Context, userid string, password string) (*acl.Us Vars: queryVars, } - queryResp, err := (&Server{}).doQuery(ctx, &queryRequest) + queryResp, err := (&Server{}).doQuery(ctx, &queryRequest, false) if err != nil { glog.Errorf("Error while query user with id %s: %v", userid, err) return nil, err @@ -304,7 +305,7 @@ func RefreshAcls(closer *y.Closer) { ctx := context.Background() var err error - queryResp, err := (&Server{}).doQuery(ctx, &queryRequest) + queryResp, err := (&Server{}).doQuery(ctx, &queryRequest, false) if err != nil { return errors.Errorf("unable to retrieve acls: %v", err) } @@ -356,7 +357,7 @@ func ResetAcl() { Vars: queryVars, } - queryResp, err := (&Server{}).doQuery(ctx, &queryRequest) + queryResp, err := (&Server{}).doQuery(ctx, &queryRequest, false) if err != nil { return errors.Wrapf(err, "while querying user with id %s", x.GrootId) } @@ -373,13 +374,15 @@ func ResetAcl() { // Insert Groot. createUserNQuads := acl.CreateUserNQuads(x.GrootId, "password") - mu := &api.Mutation{ + req := &api.Request{ StartTs: startTs, CommitNow: true, - Set: createUserNQuads, + Mutations: []*api.Mutation{ + &api.Mutation{Set: createUserNQuads}, + }, } - if _, err := (&Server{}).doMutate(context.Background(), mu, false); err != nil { + if _, err := (&Server{}).doMutate(context.Background(), req, false); err != nil { return err } glog.Infof("Successfully upserted the groot account") diff --git a/edgraph/server.go b/edgraph/server.go index 34983098505..15aab2adc26 100644 --- a/edgraph/server.go +++ b/edgraph/server.go @@ -419,24 +419,24 @@ func annotateStartTs(span *otrace.Span, ts uint64) { span.Annotate([]otrace.Attribute{otrace.Int64Attribute("startTs", int64(ts))}, "") } -// Mutate handles requests to perform mutations. -func (s *Server) Mutate(ctx context.Context, mu *api.Mutation) (*api.Assigned, error) { - return s.doMutate(ctx, mu, true) -} - -func (s *Server) doMutate(ctx context.Context, mu *api.Mutation, authorize bool) ( - resp *api.Assigned, rerr error) { +func (s *Server) doMutate(ctx context.Context, req *api.Request, authorize bool) ( + resp *api.Response, rerr error) { if ctx.Err() != nil { return nil, ctx.Err() } + if len(req.Mutations) != 1 { + return nil, errors.Errorf("Only 1 mutation per request is supported") + } + mu := req.Mutations[0] + if !isMutationAllowed(ctx) { return resp, errors.Errorf("No mutations allowed.") } var parsingTime time.Duration - resp = &api.Assigned{} + resp = &api.Response{} start := time.Now() defer func() { @@ -466,7 +466,7 @@ func (s *Server) doMutate(ctx context.Context, mu *api.Mutation, authorize bool) } ostats.Record(ctx, x.NumMutations.M(1)) - if mu.Query != "" { + if req.Query != "" { span.Annotatef(nil, "Got Mutation with Upsert Block: %s", mu) } if len(mu.SetJson) > 0 { @@ -493,12 +493,12 @@ func (s *Server) doMutate(ctx context.Context, mu *api.Mutation, authorize bool) return resp, errors.Errorf("Empty mutation") } - if mu.StartTs == 0 { - mu.StartTs = State.getTimestamp(false) + if req.StartTs == 0 { + req.StartTs = State.getTimestamp(false) } - annotateStartTs(span, mu.StartTs) + annotateStartTs(span, req.StartTs) - l, err := doQueryInUpsert(ctx, mu, gmu) + l, err := doQueryInUpsert(ctx, req, gmu) if err != nil { return resp, err } @@ -514,11 +514,11 @@ func (s *Server) doMutate(ctx context.Context, mu *api.Mutation, authorize bool) return resp, err } - m := &pb.Mutations{Edges: edges, StartTs: mu.StartTs} + m := &pb.Mutations{Edges: edges, StartTs: req.StartTs} span.Annotatef(nil, "Applying mutations: %+v", m) - resp.Context, err = query.ApplyMutations(ctx, m) - span.Annotatef(nil, "Txn Context: %+v. Err=%v", resp.Context, err) - if !mu.CommitNow { + resp.Txn, err = query.ApplyMutations(ctx, m) + span.Annotatef(nil, "Txn Context: %+v. Err=%v", resp.Txn, err) + if !req.CommitNow { if err == y.ErrConflict { err = status.Error(codes.FailedPrecondition, err.Error()) } @@ -531,12 +531,12 @@ func (s *Server) doMutate(ctx context.Context, mu *api.Mutation, authorize bool) // ApplyMutations failed. We now want to abort the transaction, // ignoring any error that might occur during the abort (the user would // care more about the previous error). - if resp.Context == nil { - resp.Context = &api.TxnContext{StartTs: mu.StartTs} + if resp.Txn == nil { + resp.Txn = &api.TxnContext{StartTs: req.StartTs} } - resp.Context.Aborted = true - _, _ = worker.CommitOverNetwork(ctx, resp.Context) + resp.Txn.Aborted = true + _, _ = worker.CommitOverNetwork(ctx, resp.Txn) if err == y.ErrConflict { // We have already aborted the transaction, so the error message should reflect that. @@ -547,36 +547,41 @@ func (s *Server) doMutate(ctx context.Context, mu *api.Mutation, authorize bool) } span.Annotatef(nil, "Prewrites err: %v. Attempting to commit/abort immediately.", err) - ctxn := resp.Context + ctxn := resp.Txn // zero would assign the CommitTs cts, err := worker.CommitOverNetwork(ctx, ctxn) span.Annotatef(nil, "Status of commit at ts: %d: %v", ctxn.StartTs, err) if err != nil { if err == y.ErrAborted { err = status.Errorf(codes.Aborted, err.Error()) - resp.Context.Aborted = true + resp.Txn.Aborted = true } return resp, err } // CommitNow was true, no need to send keys. - resp.Context.Keys = resp.Context.Keys[:0] - resp.Context.CommitTs = cts + resp.Txn.Keys = resp.Txn.Keys[:0] + resp.Txn.CommitTs = cts return resp, nil } // doQueryInUpsert processes the query in upsert block. -func doQueryInUpsert(ctx context.Context, mu *api.Mutation, gmu *gql.Mutation) ( +func doQueryInUpsert(ctx context.Context, req *api.Request, gmu *gql.Mutation) ( *query.Latency, error) { l := &query.Latency{} - if mu.Query == "" { + if req.Query == "" { return l, nil } - upsertQuery := mu.Query + // if err := authorizeQuery(ctx, req); err != nil { + // return nil, err + // } + + mu := req.Mutations[0] + upsertQuery := req.Query needVars := findVars(gmu) isCondUpsert := strings.TrimSpace(mu.Cond) != "" varName := fmt.Sprintf("__dgraph%d__", rand.Int()) @@ -600,7 +605,7 @@ func doQueryInUpsert(ctx context.Context, mu *api.Mutation, gmu *gql.Mutation) ( // The variable __dgraph0__ will - // * be empty if the condition is true // * have 1 UID (the 0 UID) if the condition is false - upsertQuery = strings.TrimSuffix(strings.TrimSpace(mu.Query), "}") + upsertQuery = strings.TrimSuffix(strings.TrimSpace(req.Query), "}") upsertQuery += varName + ` as var(func: uid(0)) ` + cond + `}` needVars = append(needVars, varName) } @@ -618,7 +623,7 @@ func doQueryInUpsert(ctx context.Context, mu *api.Mutation, gmu *gql.Mutation) ( return nil, errors.Wrapf(err, "while validating query: %q", upsertQuery) } - qr := query.Request{Latency: l, GqlQuery: &parsedReq, ReadTs: mu.StartTs} + qr := query.Request{Latency: l, GqlQuery: &parsedReq, ReadTs: req.StartTs} if err := qr.ProcessQuery(ctx); err != nil { return nil, errors.Wrapf(err, "while processing query: %q", upsertQuery) } @@ -751,24 +756,29 @@ func updateMutations(gmu *gql.Mutation, varToUID map[string][]string) { // Query handles queries and returns the data. func (s *Server) Query(ctx context.Context, req *api.Request) (*api.Response, error) { - if err := authorizeQuery(ctx, req); err != nil { - return nil, err - } - if glog.V(3) { - glog.Infof("Got a query: %+v", req) + if len(req.Mutations) > 0 { + return s.doMutate(ctx, req, true) } - return s.doQuery(ctx, req) + return s.doQuery(ctx, req, true) } // This method is used to execute the query and return the response to the // client as a protocol buffer message. -func (s *Server) doQuery(ctx context.Context, req *api.Request) (resp *api.Response, rerr error) { +func (s *Server) doQuery(ctx context.Context, req *api.Request, authorize bool) ( + resp *api.Response, rerr error) { + if ctx.Err() != nil { return nil, ctx.Err() } startTime := time.Now() + if authorize { + if err := authorizeQuery(ctx, req); err != nil { + return nil, err + } + } + var measurements []ostats.Measurement ctx, span := otrace.StartSpan(ctx, methodQuery) ctx = x.WithMethod(ctx, methodQuery)