Skip to content

Commit

Permalink
Merge Query and Mutate endpoint for grpc
Browse files Browse the repository at this point in the history
  • Loading branch information
mangalaman93 committed Aug 21, 2019
1 parent 65fe291 commit 5a3ec51
Show file tree
Hide file tree
Showing 8 changed files with 110 additions and 87 deletions.
2 changes: 1 addition & 1 deletion contrib/integration/bigdata/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func main() {
// schema items.
resp, err := c.NewTxn().Query(ctx, "schema {}")
x.Check(err)
if len(resp.Schema) < 5 {
if len(resp.Json) < 5 {
// Run each schema alter separately so that there is an even
// distribution among all groups.
for _, s := range schema() {
Expand Down
9 changes: 6 additions & 3 deletions contrib/integration/testtxn/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,12 +232,15 @@ func TestTxnRead5(t *testing.T) {
mu = &api.Mutation{}
mu.SetJson = []byte(fmt.Sprintf("{\"uid\": \"%s\", \"name\": \"Manish2\"}", uid))

mu.CommitNow = true
res, err := dc.Mutate(context.Background(), mu)
muReq := api.Request{
Mutations: []*api.Mutation{mu},
CommitNow: true,
}
res, err := dc.Query(context.Background(), &muReq)
if err != nil {
log.Fatalf("Error while running mutation: %v\n", err)
}
x.AssertTrue(res.Context.StartTs > 0)
x.AssertTrue(res.Txn.StartTs > 0)
resp, err = dc.Query(context.Background(), &req)
if err != nil {
log.Fatalf("Error while running query: %v\n", err)
Expand Down
27 changes: 14 additions & 13 deletions dgraph/cmd/alpha/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ func mutationHandler(w http.ResponseWriter, r *http.Request) {
// start parsing the query
parseStart := time.Now()

var mu *api.Mutation
var req *api.Request
contentType := r.Header.Get("Content-Type")
switch strings.ToLower(contentType) {
case "application/json":
Expand All @@ -312,15 +312,16 @@ func mutationHandler(w http.ResponseWriter, r *http.Request) {
return
}

mu = &api.Mutation{}
mu := &api.Mutation{}
req = &api.Request{Mutations: []*api.Mutation{mu}}
if setJSON, ok := ms["set"]; ok && setJSON != nil {
mu.SetJson = setJSON.bs
}
if delJSON, ok := ms["delete"]; ok && delJSON != nil {
mu.DeleteJson = delJSON.bs
}
if queryText, ok := ms["query"]; ok && queryText != nil {
mu.Query, err = strconv.Unquote(string(queryText.bs))
req.Query, err = strconv.Unquote(string(queryText.bs))
if err != nil {
x.SetStatus(w, x.ErrorInvalidRequest, err.Error())
return
Expand All @@ -336,7 +337,7 @@ func mutationHandler(w http.ResponseWriter, r *http.Request) {

case "application/rdf":
// Parse N-Quads.
mu, err = gql.ParseMutation(string(body))
req, err = gql.ParseMutation(string(body))
if err != nil {
x.SetStatus(w, x.ErrorInvalidRequest, err.Error())
return
Expand All @@ -351,26 +352,26 @@ func mutationHandler(w http.ResponseWriter, r *http.Request) {
// end of query parsing
parseEnd := time.Now()

mu.StartTs = startTs
mu.CommitNow = commitNow
req.StartTs = startTs
req.CommitNow = commitNow

ctx := attachAccessJwt(context.Background(), r)
resp, err := (&edgraph.Server{}).Mutate(ctx, mu)
resp, err := (&edgraph.Server{}).Query(ctx, req)
if err != nil {
x.SetStatusWithData(w, x.ErrorInvalidRequest, err.Error())
return
}

resp.Latency.ParsingNs = uint64(parseEnd.Sub(parseStart).Nanoseconds())
e := query.Extensions{
Txn: resp.Context,
Txn: resp.Txn,
Latency: resp.Latency,
}
sort.Strings(e.Txn.Keys)
sort.Strings(e.Txn.Preds)

// Don't send keys array which is part of txn context if its commit immediately.
if mu.CommitNow {
if req.CommitNow {
e.Txn.Keys = e.Txn.Keys[:0]
}

Expand Down Expand Up @@ -487,11 +488,11 @@ func handleCommit(startTs uint64, reqText []byte) (map[string]interface{}, error
return nil, err
}

resp := &api.Assigned{}
resp.Context = tc
resp.Context.CommitTs = cts
resp := &api.Response{}
resp.Txn = tc
resp.Txn.CommitTs = cts
e := query.Extensions{
Txn: resp.Context,
Txn: resp.Txn,
}
e.Txn.Keys = e.Txn.Keys[:0]
response := map[string]interface{}{}
Expand Down
2 changes: 1 addition & 1 deletion edgraph/access.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func authorizeMutation(ctx context.Context, gmu *gql.Mutation) error {
return nil
}

func authorizeQuery(ctx context.Context, req *api.Request) error {
func authorizeQuery(ctx context.Context, parsedReq *gql.Result) error {
// always allow access
return nil
}
35 changes: 16 additions & 19 deletions edgraph/access_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,9 @@ const queryUser = `

// authorizeUser queries the user with the given user id, and returns the associated uid,
// acl groups, and whether the password stored in DB matches the supplied password
func authorizeUser(ctx context.Context, userid string, password string) (*acl.User,
error) {
func authorizeUser(ctx context.Context, userid string, password string) (
*acl.User, error) {

queryVars := map[string]string{
"$userid": userid,
"$password": password,
Expand All @@ -276,7 +277,7 @@ func authorizeUser(ctx context.Context, userid string, password string) (*acl.Us
Vars: queryVars,
}

queryResp, err := (&Server{}).doQuery(ctx, &queryRequest)
queryResp, err := (&Server{}).doQuery(ctx, &queryRequest, false)
if err != nil {
glog.Errorf("Error while query user with id %s: %v", userid, err)
return nil, err
Expand Down Expand Up @@ -310,7 +311,7 @@ func RefreshAcls(closer *y.Closer) {

ctx := context.Background()
var err error
queryResp, err := (&Server{}).doQuery(ctx, &queryRequest)
queryResp, err := (&Server{}).doQuery(ctx, &queryRequest, false)
if err != nil {
return errors.Errorf("unable to retrieve acls: %v", err)
}
Expand Down Expand Up @@ -362,7 +363,7 @@ func ResetAcl() {
Vars: queryVars,
}

queryResp, err := (&Server{}).doQuery(ctx, &queryRequest)
queryResp, err := (&Server{}).doQuery(ctx, &queryRequest, false)
if err != nil {
return errors.Wrapf(err, "while querying user with id %s", x.GrootId)
}
Expand All @@ -379,13 +380,17 @@ func ResetAcl() {

// Insert Groot.
createUserNQuads := acl.CreateUserNQuads(x.GrootId, "password")
mu := &api.Mutation{
req := &api.Request{
StartTs: startTs,
CommitNow: true,
Set: createUserNQuads,
Mutations: []*api.Mutation{
{
Set: createUserNQuads,
},
},
}

if _, err := (&Server{}).doMutate(context.Background(), mu, false); err != nil {
if _, err := (&Server{}).doMutate(context.Background(), req, false); err != nil {
return err
}
glog.Infof("Successfully upserted the groot account")
Expand Down Expand Up @@ -658,23 +663,15 @@ func logAccess(log *accessEntry) {
}

//authorizeQuery authorizes the query using the aclCachePtr
func authorizeQuery(ctx context.Context, req *api.Request) error {
func authorizeQuery(ctx context.Context, parsedReq *gql.Result) error {
if len(Config.HmacSecret) == 0 {
// the user has not turned on the acl feature
return nil
}

parsedReq, err := gql.Parse(gql.Request{
Str: req.Query,
Variables: req.Vars,
})
if err != nil {
return err
}
preds := parsePredsFromQuery(parsedReq.Query)

var userId string
var groupIds []string
preds := parsePredsFromQuery(parsedReq.Query)
doAuthorizeQuery := func() error {
userData, err := extractUserAndGroups(ctx)
if err == nil {
Expand Down Expand Up @@ -710,7 +707,7 @@ func authorizeQuery(ctx context.Context, req *api.Request) error {
return nil
}

err = doAuthorizeQuery()
err := doAuthorizeQuery()
if span := otrace.FromContext(ctx); span != nil {
span.Annotatef(nil, (&accessEntry{
userId: userId,
Expand Down
Loading

0 comments on commit 5a3ec51

Please sign in to comment.