From 5a3ec511fa179044c76ba0fc8a244d1e5f0bd964 Mon Sep 17 00:00:00 2001 From: Aman Mangal Date: Thu, 22 Aug 2019 04:13:24 +0530 Subject: [PATCH] Merge Query and Mutate endpoint for grpc --- contrib/integration/bigdata/main.go | 2 +- contrib/integration/testtxn/main_test.go | 9 ++- dgraph/cmd/alpha/http.go | 27 +++---- edgraph/access.go | 2 +- edgraph/access_ee.go | 35 ++++----- edgraph/server.go | 94 ++++++++++++++---------- gql/parser_mutation.go | 25 ++++--- gql/parser_test.go | 3 +- 8 files changed, 110 insertions(+), 87 deletions(-) diff --git a/contrib/integration/bigdata/main.go b/contrib/integration/bigdata/main.go index a0f1b5c0bfb..3587dedd550 100644 --- a/contrib/integration/bigdata/main.go +++ b/contrib/integration/bigdata/main.go @@ -54,7 +54,7 @@ func main() { // schema items. resp, err := c.NewTxn().Query(ctx, "schema {}") x.Check(err) - if len(resp.Schema) < 5 { + if len(resp.Json) < 5 { // Run each schema alter separately so that there is an even // distribution among all groups. for _, s := range schema() { diff --git a/contrib/integration/testtxn/main_test.go b/contrib/integration/testtxn/main_test.go index bfb95c726a2..eb6636b8b65 100644 --- a/contrib/integration/testtxn/main_test.go +++ b/contrib/integration/testtxn/main_test.go @@ -232,12 +232,15 @@ func TestTxnRead5(t *testing.T) { mu = &api.Mutation{} mu.SetJson = []byte(fmt.Sprintf("{\"uid\": \"%s\", \"name\": \"Manish2\"}", uid)) - mu.CommitNow = true - res, err := dc.Mutate(context.Background(), mu) + muReq := api.Request{ + Mutations: []*api.Mutation{mu}, + CommitNow: true, + } + res, err := dc.Query(context.Background(), &muReq) if err != nil { log.Fatalf("Error while running mutation: %v\n", err) } - x.AssertTrue(res.Context.StartTs > 0) + x.AssertTrue(res.Txn.StartTs > 0) resp, err = dc.Query(context.Background(), &req) if err != nil { log.Fatalf("Error while running query: %v\n", err) diff --git a/dgraph/cmd/alpha/http.go b/dgraph/cmd/alpha/http.go index f006d631c47..c3d698b0820 100644 --- a/dgraph/cmd/alpha/http.go +++ b/dgraph/cmd/alpha/http.go @@ -302,7 +302,7 @@ func mutationHandler(w http.ResponseWriter, r *http.Request) { // start parsing the query parseStart := time.Now() - var mu *api.Mutation + var req *api.Request contentType := r.Header.Get("Content-Type") switch strings.ToLower(contentType) { case "application/json": @@ -312,7 +312,8 @@ func mutationHandler(w http.ResponseWriter, r *http.Request) { return } - mu = &api.Mutation{} + mu := &api.Mutation{} + req = &api.Request{Mutations: []*api.Mutation{mu}} if setJSON, ok := ms["set"]; ok && setJSON != nil { mu.SetJson = setJSON.bs } @@ -320,7 +321,7 @@ func mutationHandler(w http.ResponseWriter, r *http.Request) { mu.DeleteJson = delJSON.bs } if queryText, ok := ms["query"]; ok && queryText != nil { - mu.Query, err = strconv.Unquote(string(queryText.bs)) + req.Query, err = strconv.Unquote(string(queryText.bs)) if err != nil { x.SetStatus(w, x.ErrorInvalidRequest, err.Error()) return @@ -336,7 +337,7 @@ func mutationHandler(w http.ResponseWriter, r *http.Request) { case "application/rdf": // Parse N-Quads. - mu, err = gql.ParseMutation(string(body)) + req, err = gql.ParseMutation(string(body)) if err != nil { x.SetStatus(w, x.ErrorInvalidRequest, err.Error()) return @@ -351,11 +352,11 @@ func mutationHandler(w http.ResponseWriter, r *http.Request) { // end of query parsing parseEnd := time.Now() - mu.StartTs = startTs - mu.CommitNow = commitNow + req.StartTs = startTs + req.CommitNow = commitNow ctx := attachAccessJwt(context.Background(), r) - resp, err := (&edgraph.Server{}).Mutate(ctx, mu) + resp, err := (&edgraph.Server{}).Query(ctx, req) if err != nil { x.SetStatusWithData(w, x.ErrorInvalidRequest, err.Error()) return @@ -363,14 +364,14 @@ func mutationHandler(w http.ResponseWriter, r *http.Request) { resp.Latency.ParsingNs = uint64(parseEnd.Sub(parseStart).Nanoseconds()) e := query.Extensions{ - Txn: resp.Context, + Txn: resp.Txn, Latency: resp.Latency, } sort.Strings(e.Txn.Keys) sort.Strings(e.Txn.Preds) // Don't send keys array which is part of txn context if its commit immediately. - if mu.CommitNow { + if req.CommitNow { e.Txn.Keys = e.Txn.Keys[:0] } @@ -487,11 +488,11 @@ func handleCommit(startTs uint64, reqText []byte) (map[string]interface{}, error return nil, err } - resp := &api.Assigned{} - resp.Context = tc - resp.Context.CommitTs = cts + resp := &api.Response{} + resp.Txn = tc + resp.Txn.CommitTs = cts e := query.Extensions{ - Txn: resp.Context, + Txn: resp.Txn, } e.Txn.Keys = e.Txn.Keys[:0] response := map[string]interface{}{} diff --git a/edgraph/access.go b/edgraph/access.go index 11b7a710530..7f01f9c2eeb 100644 --- a/edgraph/access.go +++ b/edgraph/access.go @@ -57,7 +57,7 @@ func authorizeMutation(ctx context.Context, gmu *gql.Mutation) error { return nil } -func authorizeQuery(ctx context.Context, req *api.Request) error { +func authorizeQuery(ctx context.Context, parsedReq *gql.Result) error { // always allow access return nil } diff --git a/edgraph/access_ee.go b/edgraph/access_ee.go index e1f2fdad83f..8ebb3f97f65 100644 --- a/edgraph/access_ee.go +++ b/edgraph/access_ee.go @@ -265,8 +265,9 @@ const queryUser = ` // authorizeUser queries the user with the given user id, and returns the associated uid, // acl groups, and whether the password stored in DB matches the supplied password -func authorizeUser(ctx context.Context, userid string, password string) (*acl.User, - error) { +func authorizeUser(ctx context.Context, userid string, password string) ( + *acl.User, error) { + queryVars := map[string]string{ "$userid": userid, "$password": password, @@ -276,7 +277,7 @@ func authorizeUser(ctx context.Context, userid string, password string) (*acl.Us Vars: queryVars, } - queryResp, err := (&Server{}).doQuery(ctx, &queryRequest) + queryResp, err := (&Server{}).doQuery(ctx, &queryRequest, false) if err != nil { glog.Errorf("Error while query user with id %s: %v", userid, err) return nil, err @@ -310,7 +311,7 @@ func RefreshAcls(closer *y.Closer) { ctx := context.Background() var err error - queryResp, err := (&Server{}).doQuery(ctx, &queryRequest) + queryResp, err := (&Server{}).doQuery(ctx, &queryRequest, false) if err != nil { return errors.Errorf("unable to retrieve acls: %v", err) } @@ -362,7 +363,7 @@ func ResetAcl() { Vars: queryVars, } - queryResp, err := (&Server{}).doQuery(ctx, &queryRequest) + queryResp, err := (&Server{}).doQuery(ctx, &queryRequest, false) if err != nil { return errors.Wrapf(err, "while querying user with id %s", x.GrootId) } @@ -379,13 +380,17 @@ func ResetAcl() { // Insert Groot. createUserNQuads := acl.CreateUserNQuads(x.GrootId, "password") - mu := &api.Mutation{ + req := &api.Request{ StartTs: startTs, CommitNow: true, - Set: createUserNQuads, + Mutations: []*api.Mutation{ + { + Set: createUserNQuads, + }, + }, } - if _, err := (&Server{}).doMutate(context.Background(), mu, false); err != nil { + if _, err := (&Server{}).doMutate(context.Background(), req, false); err != nil { return err } glog.Infof("Successfully upserted the groot account") @@ -658,23 +663,15 @@ func logAccess(log *accessEntry) { } //authorizeQuery authorizes the query using the aclCachePtr -func authorizeQuery(ctx context.Context, req *api.Request) error { +func authorizeQuery(ctx context.Context, parsedReq *gql.Result) error { if len(Config.HmacSecret) == 0 { // the user has not turned on the acl feature return nil } - parsedReq, err := gql.Parse(gql.Request{ - Str: req.Query, - Variables: req.Vars, - }) - if err != nil { - return err - } - preds := parsePredsFromQuery(parsedReq.Query) - var userId string var groupIds []string + preds := parsePredsFromQuery(parsedReq.Query) doAuthorizeQuery := func() error { userData, err := extractUserAndGroups(ctx) if err == nil { @@ -710,7 +707,7 @@ func authorizeQuery(ctx context.Context, req *api.Request) error { return nil } - err = doAuthorizeQuery() + err := doAuthorizeQuery() if span := otrace.FromContext(ctx); span != nil { span.Annotatef(nil, (&accessEntry{ userId: userId, diff --git a/edgraph/server.go b/edgraph/server.go index 34983098505..7613c24ed43 100644 --- a/edgraph/server.go +++ b/edgraph/server.go @@ -75,6 +75,14 @@ type ServerState struct { needTs chan tsReq } +const ( + // NeedAuthorize is used to indicate that the request needs to be authorized. + NeedAuthorize = iota + // NoAuthorize is used to indicate that authorization needs to be skipped. + // Used when ACL needs to query information for performing the authorization check. + NoAuthorize +) + // State is the instance of ServerState used by the current server. var State ServerState @@ -419,24 +427,24 @@ func annotateStartTs(span *otrace.Span, ts uint64) { span.Annotate([]otrace.Attribute{otrace.Int64Attribute("startTs", int64(ts))}, "") } -// Mutate handles requests to perform mutations. -func (s *Server) Mutate(ctx context.Context, mu *api.Mutation) (*api.Assigned, error) { - return s.doMutate(ctx, mu, true) -} - -func (s *Server) doMutate(ctx context.Context, mu *api.Mutation, authorize bool) ( - resp *api.Assigned, rerr error) { +func (s *Server) doMutate(ctx context.Context, req *api.Request, authorize int) ( + resp *api.Response, rerr error) { if ctx.Err() != nil { return nil, ctx.Err() } + if len(req.Mutations) != 1 { + return nil, errors.Errorf("Only 1 mutation per request is supported") + } + mu := req.Mutations[0] + if !isMutationAllowed(ctx) { return resp, errors.Errorf("No mutations allowed.") } var parsingTime time.Duration - resp = &api.Assigned{} + resp = &api.Response{} start := time.Now() defer func() { @@ -466,7 +474,7 @@ func (s *Server) doMutate(ctx context.Context, mu *api.Mutation, authorize bool) } ostats.Record(ctx, x.NumMutations.M(1)) - if mu.Query != "" { + if req.Query != "" { span.Annotatef(nil, "Got Mutation with Upsert Block: %s", mu) } if len(mu.SetJson) > 0 { @@ -482,7 +490,7 @@ func (s *Server) doMutate(ctx context.Context, mu *api.Mutation, authorize bool) } parsingTime += time.Since(startParsingTime) - if authorize { + if authorize == NeedAuthorize { if err := authorizeMutation(ctx, gmu); err != nil { return resp, err } @@ -493,12 +501,12 @@ func (s *Server) doMutate(ctx context.Context, mu *api.Mutation, authorize bool) return resp, errors.Errorf("Empty mutation") } - if mu.StartTs == 0 { - mu.StartTs = State.getTimestamp(false) + if req.StartTs == 0 { + req.StartTs = State.getTimestamp(false) } - annotateStartTs(span, mu.StartTs) + annotateStartTs(span, req.StartTs) - l, err := doQueryInUpsert(ctx, mu, gmu) + l, err := doQueryInUpsert(ctx, req, gmu) if err != nil { return resp, err } @@ -514,11 +522,11 @@ func (s *Server) doMutate(ctx context.Context, mu *api.Mutation, authorize bool) return resp, err } - m := &pb.Mutations{Edges: edges, StartTs: mu.StartTs} + m := &pb.Mutations{Edges: edges, StartTs: req.StartTs} span.Annotatef(nil, "Applying mutations: %+v", m) - resp.Context, err = query.ApplyMutations(ctx, m) - span.Annotatef(nil, "Txn Context: %+v. Err=%v", resp.Context, err) - if !mu.CommitNow { + resp.Txn, err = query.ApplyMutations(ctx, m) + span.Annotatef(nil, "Txn Context: %+v. Err=%v", resp.Txn, err) + if !req.CommitNow { if err == y.ErrConflict { err = status.Error(codes.FailedPrecondition, err.Error()) } @@ -531,12 +539,12 @@ func (s *Server) doMutate(ctx context.Context, mu *api.Mutation, authorize bool) // ApplyMutations failed. We now want to abort the transaction, // ignoring any error that might occur during the abort (the user would // care more about the previous error). - if resp.Context == nil { - resp.Context = &api.TxnContext{StartTs: mu.StartTs} + if resp.Txn == nil { + resp.Txn = &api.TxnContext{StartTs: req.StartTs} } - resp.Context.Aborted = true - _, _ = worker.CommitOverNetwork(ctx, resp.Context) + resp.Txn.Aborted = true + _, _ = worker.CommitOverNetwork(ctx, resp.Txn) if err == y.ErrConflict { // We have already aborted the transaction, so the error message should reflect that. @@ -547,36 +555,37 @@ func (s *Server) doMutate(ctx context.Context, mu *api.Mutation, authorize bool) } span.Annotatef(nil, "Prewrites err: %v. Attempting to commit/abort immediately.", err) - ctxn := resp.Context + ctxn := resp.Txn // zero would assign the CommitTs cts, err := worker.CommitOverNetwork(ctx, ctxn) span.Annotatef(nil, "Status of commit at ts: %d: %v", ctxn.StartTs, err) if err != nil { if err == y.ErrAborted { err = status.Errorf(codes.Aborted, err.Error()) - resp.Context.Aborted = true + resp.Txn.Aborted = true } return resp, err } // CommitNow was true, no need to send keys. - resp.Context.Keys = resp.Context.Keys[:0] - resp.Context.CommitTs = cts + resp.Txn.Keys = resp.Txn.Keys[:0] + resp.Txn.CommitTs = cts return resp, nil } // doQueryInUpsert processes the query in upsert block. -func doQueryInUpsert(ctx context.Context, mu *api.Mutation, gmu *gql.Mutation) ( +func doQueryInUpsert(ctx context.Context, req *api.Request, gmu *gql.Mutation) ( *query.Latency, error) { l := &query.Latency{} - if mu.Query == "" { + if req.Query == "" { return l, nil } - upsertQuery := mu.Query + mu := req.Mutations[0] + upsertQuery := req.Query needVars := findVars(gmu) isCondUpsert := strings.TrimSpace(mu.Cond) != "" varName := fmt.Sprintf("__dgraph%d__", rand.Int()) @@ -600,7 +609,7 @@ func doQueryInUpsert(ctx context.Context, mu *api.Mutation, gmu *gql.Mutation) ( // The variable __dgraph0__ will - // * be empty if the condition is true // * have 1 UID (the 0 UID) if the condition is false - upsertQuery = strings.TrimSuffix(strings.TrimSpace(mu.Query), "}") + upsertQuery = strings.TrimSuffix(strings.TrimSpace(req.Query), "}") upsertQuery += varName + ` as var(func: uid(0)) ` + cond + `}` needVars = append(needVars, varName) } @@ -618,7 +627,11 @@ func doQueryInUpsert(ctx context.Context, mu *api.Mutation, gmu *gql.Mutation) ( return nil, errors.Wrapf(err, "while validating query: %q", upsertQuery) } - qr := query.Request{Latency: l, GqlQuery: &parsedReq, ReadTs: mu.StartTs} + if err := authorizeQuery(ctx, &parsedReq); err != nil { + return nil, err + } + + qr := query.Request{Latency: l, GqlQuery: &parsedReq, ReadTs: req.StartTs} if err := qr.ProcessQuery(ctx); err != nil { return nil, errors.Wrapf(err, "while processing query: %q", upsertQuery) } @@ -751,19 +764,18 @@ func updateMutations(gmu *gql.Mutation, varToUID map[string][]string) { // Query handles queries and returns the data. func (s *Server) Query(ctx context.Context, req *api.Request) (*api.Response, error) { - if err := authorizeQuery(ctx, req); err != nil { - return nil, err - } - if glog.V(3) { - glog.Infof("Got a query: %+v", req) + if len(req.Mutations) > 0 { + return s.doMutate(ctx, req, true) } - return s.doQuery(ctx, req) + return s.doQuery(ctx, req, true) } // This method is used to execute the query and return the response to the // client as a protocol buffer message. -func (s *Server) doQuery(ctx context.Context, req *api.Request) (resp *api.Response, rerr error) { +func (s *Server) doQuery(ctx context.Context, req *api.Request, authorize int) ( + resp *api.Response, rerr error) { + if ctx.Err() != nil { return nil, ctx.Err() } @@ -815,6 +827,12 @@ func (s *Server) doQuery(ctx context.Context, req *api.Request) (resp *api.Respo return resp, err } + if authorize == NeedAuthorize { + if err := authorizeQuery(ctx, &parsedReq); err != nil { + return nil, err + } + } + var queryRequest = query.Request{ Latency: &l, GqlQuery: &parsedReq, diff --git a/gql/parser_mutation.go b/gql/parser_mutation.go index 76b6fca6f72..1eb1e724656 100644 --- a/gql/parser_mutation.go +++ b/gql/parser_mutation.go @@ -23,7 +23,7 @@ import ( // ParseMutation parses a block into a mutation. Returns an object with a mutation or // an upsert block with mutation, otherwise returns nil with an error. -func ParseMutation(mutation string) (mu *api.Mutation, err error) { +func ParseMutation(mutation string) (req *api.Request, err error) { var lexer lex.Lexer lexer.Reset(mutation) lexer.Run(lexIdentifyBlock) @@ -39,13 +39,15 @@ func ParseMutation(mutation string) (mu *api.Mutation, err error) { item := it.Item() switch item.Typ { case itemUpsertBlock: - if mu, err = parseUpsertBlock(it); err != nil { + if req, err = parseUpsertBlock(it); err != nil { return nil, err } case itemLeftCurl: - if mu, err = parseMutationBlock(it); err != nil { + mu, err := parseMutationBlock(it) + if err != nil { return nil, err } + req = &api.Request{Mutations: []*api.Mutation{mu}} default: return nil, it.Errorf("Unexpected token: [%s]", item.Val) } @@ -55,12 +57,12 @@ func ParseMutation(mutation string) (mu *api.Mutation, err error) { return nil, it.Errorf("Unexpected %s after the end of the block", it.Item().Val) } - return mu, nil + return req, nil } // parseUpsertBlock parses the upsert block -func parseUpsertBlock(it *lex.ItemIterator) (*api.Mutation, error) { - var mu *api.Mutation +func parseUpsertBlock(it *lex.ItemIterator) (*api.Request, error) { + var req *api.Request var queryText, condText string var queryFound, condFound bool @@ -80,13 +82,13 @@ func parseUpsertBlock(it *lex.ItemIterator) (*api.Mutation, error) { switch { // upsert {... ===>}<=== case item.Typ == itemRightCurl: - if mu == nil { + if req == nil { return nil, it.Errorf("Empty mutation block") } else if !queryFound { return nil, it.Errorf("Query op not found in upsert block") } else { - mu.Query = queryText - return mu, nil + req.Query = queryText + return req, nil } // upsert { mutation{...} ===>query<==={...}} @@ -124,11 +126,12 @@ func parseUpsertBlock(it *lex.ItemIterator) (*api.Mutation, error) { } // upsert @if(...) ===>{<=== ....} - var err error - if mu, err = parseMutationBlock(it); err != nil { + mu, err := parseMutationBlock(it) + if err != nil { return nil, err } mu.Cond = condText + req = &api.Request{Mutations: []*api.Mutation{mu}} // upsert { mutation{...} ===>fragment<==={...}} case item.Typ == itemUpsertBlockOp && item.Val == "fragment": diff --git a/gql/parser_test.go b/gql/parser_test.go index c57d1668802..617a11b2b6d 100644 --- a/gql/parser_test.go +++ b/gql/parser_test.go @@ -4466,8 +4466,9 @@ func TestParseMutation(t *testing.T) { } } ` - mu, err := ParseMutation(m) + req, err := ParseMutation(m) require.NoError(t, err) + mu := req.Mutations[0] require.NotNil(t, mu) sets, err := parseNquads(mu.SetNquads) require.NoError(t, err)