Skip to content

Commit

Permalink
Merge Query and Mutate endpoint for grpc
Browse files Browse the repository at this point in the history
  • Loading branch information
mangalaman93 authored and Francesc Campoy committed Aug 23, 2019
1 parent 11afbc0 commit b9e1c90
Show file tree
Hide file tree
Showing 11 changed files with 294 additions and 92 deletions.
2 changes: 1 addition & 1 deletion contrib/integration/bigdata/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func main() {
// schema items.
resp, err := c.NewTxn().Query(ctx, "schema {}")
x.Check(err)
if len(resp.Schema) < 5 {
if len(resp.Json) < 5 {
// Run each schema alter separately so that there is an even
// distribution among all groups.
for _, s := range schema() {
Expand Down
9 changes: 6 additions & 3 deletions contrib/integration/testtxn/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,12 +232,15 @@ func TestTxnRead5(t *testing.T) {
mu = &api.Mutation{}
mu.SetJson = []byte(fmt.Sprintf("{\"uid\": \"%s\", \"name\": \"Manish2\"}", uid))

mu.CommitNow = true
res, err := dc.Mutate(context.Background(), mu)
muReq := api.Request{
Mutations: []*api.Mutation{mu},
CommitNow: true,
}
res, err := dc.Query(context.Background(), &muReq)
if err != nil {
log.Fatalf("Error while running mutation: %v\n", err)
}
x.AssertTrue(res.Context.StartTs > 0)
x.AssertTrue(res.Txn.StartTs > 0)
resp, err = dc.Query(context.Background(), &req)
if err != nil {
log.Fatalf("Error while running query: %v\n", err)
Expand Down
27 changes: 14 additions & 13 deletions dgraph/cmd/alpha/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ func mutationHandler(w http.ResponseWriter, r *http.Request) {
// start parsing the query
parseStart := time.Now()

var mu *api.Mutation
var req *api.Request
contentType := r.Header.Get("Content-Type")
switch strings.ToLower(contentType) {
case "application/json":
Expand All @@ -312,15 +312,16 @@ func mutationHandler(w http.ResponseWriter, r *http.Request) {
return
}

mu = &api.Mutation{}
mu := &api.Mutation{}
req = &api.Request{Mutations: []*api.Mutation{mu}}
if setJSON, ok := ms["set"]; ok && setJSON != nil {
mu.SetJson = setJSON.bs
}
if delJSON, ok := ms["delete"]; ok && delJSON != nil {
mu.DeleteJson = delJSON.bs
}
if queryText, ok := ms["query"]; ok && queryText != nil {
mu.Query, err = strconv.Unquote(string(queryText.bs))
req.Query, err = strconv.Unquote(string(queryText.bs))
if err != nil {
x.SetStatus(w, x.ErrorInvalidRequest, err.Error())
return
Expand All @@ -336,7 +337,7 @@ func mutationHandler(w http.ResponseWriter, r *http.Request) {

case "application/rdf":
// Parse N-Quads.
mu, err = gql.ParseMutation(string(body))
req, err = gql.ParseMutation(string(body))
if err != nil {
x.SetStatus(w, x.ErrorInvalidRequest, err.Error())
return
Expand All @@ -351,26 +352,26 @@ func mutationHandler(w http.ResponseWriter, r *http.Request) {
// end of query parsing
parseEnd := time.Now()

mu.StartTs = startTs
mu.CommitNow = commitNow
req.StartTs = startTs
req.CommitNow = commitNow

ctx := attachAccessJwt(context.Background(), r)
resp, err := (&edgraph.Server{}).Mutate(ctx, mu)
resp, err := (&edgraph.Server{}).Query(ctx, req)
if err != nil {
x.SetStatusWithData(w, x.ErrorInvalidRequest, err.Error())
return
}

resp.Latency.ParsingNs = uint64(parseEnd.Sub(parseStart).Nanoseconds())
e := query.Extensions{
Txn: resp.Context,
Txn: resp.Txn,
Latency: resp.Latency,
}
sort.Strings(e.Txn.Keys)
sort.Strings(e.Txn.Preds)

// Don't send keys array which is part of txn context if its commit immediately.
if mu.CommitNow {
if req.CommitNow {
e.Txn.Keys = e.Txn.Keys[:0]
}

Expand Down Expand Up @@ -487,11 +488,11 @@ func handleCommit(startTs uint64, reqText []byte) (map[string]interface{}, error
return nil, err
}

resp := &api.Assigned{}
resp.Context = tc
resp.Context.CommitTs = cts
resp := &api.Response{}
resp.Txn = tc
resp.Txn.CommitTs = cts
e := query.Extensions{
Txn: resp.Context,
Txn: resp.Txn,
}
e.Txn.Keys = e.Txn.Keys[:0]
response := map[string]interface{}{}
Expand Down
178 changes: 178 additions & 0 deletions dgraph/cmd/alpha/upsert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1244,3 +1244,181 @@ upsert {
_, _, _, err := mutationWithTs(m1, "application/rdf", false, true, 0)
require.Contains(t, err.Error(), "Matching brackets not found")
}

func TestUpsertDeleteOnlyYourPost(t *testing.T) {
require.NoError(t, dropAll())
require.NoError(t, alterSchema(`
name: string @index(exact) .
content: string @index(exact) .`))

m1 := `
{
set {
_:user1 <name> "user1" .
_:user2 <name> "user2" .
_:user3 <name> "user3" .
_:user4 <name> "user4" .
_:post1 <content> "post1" .
_:post1 <author> _:user1 .
_:post2 <content> "post2" .
_:post2 <author> _:user1 .
_:post3 <content> "post3" .
_:post3 <author> _:user2 .
_:post4 <content> "post4" .
_:post4 <author> _:user3 .
_:post5 <content> "post5" .
_:post5 <author> _:user3 .
_:post6 <content> "post6" .
_:post6 <author> _:user3 .
}
}`

_, _, _, err := mutationWithTs(m1, "application/rdf", false, true, 0)
require.NoError(t, err)

// user2 trying to delete the post4
m2 := `
upsert {
query {
var(func: eq(content, "post4")) {
p4 as uid
author {
n3 as name
}
}
u2 as var(func: eq(val(n3), "user2"))
}
mutation @if(eq(len(u2), 1)) {
delete {
uid(p4) <content> * .
uid(p4) <author> * .
}
}
}`
_, _, _, err = mutationWithTs(m2, "application/rdf", false, true, 0)
require.NoError(t, err)

// post4 must still exist
q2 := `
{
post(func: eq(content, "post4")) {
content
}
}`
res, _, err := queryWithTs(q2, "application/graphql+-", "", 0)
require.NoError(t, err)
require.Contains(t, res, "post4")

// user3 deleting the post4
m3 := `
upsert {
query {
var(func: eq(content, "post4")) {
p4 as uid
author {
n3 as name
}
}
u4 as var(func: eq(val(n3), "user3"))
}
mutation @if(eq(len(u4), 1)) {
delete {
uid(p4) <content> * .
uid(p4) <author> * .
}
}
}`
_, _, _, err = mutationWithTs(m3, "application/rdf", false, true, 0)
require.NoError(t, err)

// post4 shouldn't exist anymore
res, _, err = queryWithTs(q2, "application/graphql+-", "", 0)
require.NoError(t, err)
require.NotContains(t, res, "post4")
}

func TestUpsertBulkUpdateBranch(t *testing.T) {
require.NoError(t, dropAll())
require.NoError(t, alterSchema(`
name: string @index(exact) .
branch: string .`))

m1 := `
{
set {
_:user1 <name> "user1" .
_:user1 <branch> "Fuller Street, San Francisco" .
_:user2 <name> "user2" .
_:user2 <branch> "Fuller Street, San Francisco" .
_:user3 <name> "user3" .
_:user3 <branch> "Fuller Street, San Francisco" .
}
}`

_, _, _, err := mutationWithTs(m1, "application/rdf", false, true, 0)
require.NoError(t, err)

// Bulk Update: update everyone's branch
m2 := `
upsert {
query {
u as var(func: has(branch))
}
mutation {
set {
uid(u) <branch> "Fuller Street, SF" .
}
}
}`
_, _, _, err = mutationWithTs(m2, "application/rdf", false, true, 0)
require.NoError(t, err)

q2 := `
{
q(func: has(branch)) {
name
branch
}
}`
res, _, err := queryWithTs(q2, "application/graphql+-", "", 0)
require.NoError(t, err)
require.NotContains(t, res, "San Francisco")
require.Contains(t, res, "user1")
require.Contains(t, res, "user2")
require.Contains(t, res, "user3")
require.Contains(t, res, "Fuller Street, SF")

// Bulk Delete: delete everyone's branch
m3 := `
upsert {
query {
u as var(func: has(branch))
}
mutation {
delete {
uid(u) <branch> * .
}
}
}`
_, _, _, err = mutationWithTs(m3, "application/rdf", false, true, 0)
require.NoError(t, err)

res, _, err = queryWithTs(q2, "application/graphql+-", "", 0)
require.NoError(t, err)
require.NotContains(t, res, "San Francisco")
require.NotContains(t, res, "Fuller Street, SF")
}
2 changes: 1 addition & 1 deletion edgraph/access.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func authorizeMutation(ctx context.Context, gmu *gql.Mutation) error {
return nil
}

func authorizeQuery(ctx context.Context, req *api.Request) error {
func authorizeQuery(ctx context.Context, parsedReq *gql.Result) error {
// always allow access
return nil
}
36 changes: 17 additions & 19 deletions edgraph/access_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,9 @@ const queryUser = `

// authorizeUser queries the user with the given user id, and returns the associated uid,
// acl groups, and whether the password stored in DB matches the supplied password
func authorizeUser(ctx context.Context, userid string, password string) (*acl.User,
error) {
func authorizeUser(ctx context.Context, userid string, password string) (
*acl.User, error) {

queryVars := map[string]string{
"$userid": userid,
"$password": password,
Expand All @@ -276,7 +277,7 @@ func authorizeUser(ctx context.Context, userid string, password string) (*acl.Us
Vars: queryVars,
}

queryResp, err := (&Server{}).doQuery(ctx, &queryRequest)
queryResp, err := (&Server{}).doQuery(ctx, &queryRequest, NoAuthorize)
if err != nil {
glog.Errorf("Error while query user with id %s: %v", userid, err)
return nil, err
Expand Down Expand Up @@ -310,7 +311,7 @@ func RefreshAcls(closer *y.Closer) {

ctx := context.Background()
var err error
queryResp, err := (&Server{}).doQuery(ctx, &queryRequest)
queryResp, err := (&Server{}).doQuery(ctx, &queryRequest, NoAuthorize)
if err != nil {
return errors.Errorf("unable to retrieve acls: %v", err)
}
Expand Down Expand Up @@ -362,7 +363,7 @@ func ResetAcl() {
Vars: queryVars,
}

queryResp, err := (&Server{}).doQuery(ctx, &queryRequest)
queryResp, err := (&Server{}).doQuery(ctx, &queryRequest, NoAuthorize)
if err != nil {
return errors.Wrapf(err, "while querying user with id %s", x.GrootId)
}
Expand All @@ -379,13 +380,18 @@ func ResetAcl() {

// Insert Groot.
createUserNQuads := acl.CreateUserNQuads(x.GrootId, "password")
mu := &api.Mutation{
req := &api.Request{
StartTs: startTs,
CommitNow: true,
Set: createUserNQuads,
Mutations: []*api.Mutation{
{
Set: createUserNQuads,
},
},
}

if _, err := (&Server{}).doMutate(context.Background(), mu, false); err != nil {
_, err = (&Server{}).doMutate(context.Background(), req, NoAuthorize)
if err != nil {
return err
}
glog.Infof("Successfully upserted the groot account")
Expand Down Expand Up @@ -658,23 +664,15 @@ func logAccess(log *accessEntry) {
}

//authorizeQuery authorizes the query using the aclCachePtr
func authorizeQuery(ctx context.Context, req *api.Request) error {
func authorizeQuery(ctx context.Context, parsedReq *gql.Result) error {
if len(Config.HmacSecret) == 0 {
// the user has not turned on the acl feature
return nil
}

parsedReq, err := gql.Parse(gql.Request{
Str: req.Query,
Variables: req.Vars,
})
if err != nil {
return err
}
preds := parsePredsFromQuery(parsedReq.Query)

var userId string
var groupIds []string
preds := parsePredsFromQuery(parsedReq.Query)
doAuthorizeQuery := func() error {
userData, err := extractUserAndGroups(ctx)
if err == nil {
Expand Down Expand Up @@ -710,7 +708,7 @@ func authorizeQuery(ctx context.Context, req *api.Request) error {
return nil
}

err = doAuthorizeQuery()
err := doAuthorizeQuery()
if span := otrace.FromContext(ctx); span != nil {
span.Annotatef(nil, (&accessEntry{
userId: userId,
Expand Down
Loading

0 comments on commit b9e1c90

Please sign in to comment.