Skip to content

Commit

Permalink
graphql: execution refactor (#5244)
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelJCompton authored Apr 20, 2020
1 parent 49446d8 commit 17a9c79
Show file tree
Hide file tree
Showing 12 changed files with 304 additions and 394 deletions.
11 changes: 7 additions & 4 deletions graphql/admin/add_group.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package admin

import (
"context"
"fmt"

dgoapi "github.com/dgraph-io/dgo/v200/protos/api"
"github.com/dgraph-io/dgraph/gql"
"github.com/dgraph-io/dgraph/graphql/resolve"
"github.com/dgraph-io/dgraph/graphql/schema"
Expand All @@ -20,7 +20,9 @@ func NewAddGroupRewriter() resolve.MutationRewriter {
// It ensures that only the last rule out of all duplicate rules in input is preserved.
// A rule is duplicate if it has same predicate name as another rule.
func (mrw *addGroupRewriter) Rewrite(
m schema.Mutation) (*gql.GraphQuery, []*dgoapi.Mutation, error) {
ctx context.Context,
m schema.Mutation) (*resolve.UpsertMutation, error) {

addGroupInput, _ := m.ArgValue(schema.InputArgName).([]interface{})

// remove rules with same predicate name for each group input
Expand All @@ -32,16 +34,17 @@ func (mrw *addGroupRewriter) Rewrite(

m.SetArgTo(schema.InputArgName, addGroupInput)

return ((*resolve.AddRewriter)(mrw)).Rewrite(m)
return ((*resolve.AddRewriter)(mrw)).Rewrite(ctx, m)
}

// FromMutationResult rewrites the query part of a GraphQL add mutation into a Dgraph query.
func (mrw *addGroupRewriter) FromMutationResult(
ctx context.Context,
mutation schema.Mutation,
assigned map[string]string,
result map[string]interface{}) (*gql.GraphQuery, error) {

return ((*resolve.AddRewriter)(mrw)).FromMutationResult(mutation, assigned, result)
return ((*resolve.AddRewriter)(mrw)).FromMutationResult(ctx, mutation, assigned, result)
}

// removeDuplicateRuleRef removes duplicate rules based on predicate value.
Expand Down
65 changes: 31 additions & 34 deletions graphql/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package admin

import (
"context"
"encoding/json"
"fmt"
"sync"
"sync/atomic"
Expand All @@ -31,6 +32,7 @@ import (

badgerpb "github.com/dgraph-io/badger/v2/pb"
"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/dgraph/graphql/dgraph"
"github.com/dgraph-io/dgraph/graphql/resolve"
"github.com/dgraph-io/dgraph/graphql/schema"
"github.com/dgraph-io/dgraph/graphql/web"
Expand Down Expand Up @@ -310,6 +312,7 @@ func NewServers(withIntrospection bool, globalEpoch *uint64, closer *y.Closer) (
Arw: resolve.NewAddRewriter,
Urw: resolve.NewUpdateRewriter,
Drw: resolve.NewDeleteRewriter(),
Ex: resolve.NewDgraphExecutor(),
}
adminResolvers := newAdminResolver(mainServer, fns, withIntrospection, globalEpoch, closer)
adminServer := web.NewServer(globalEpoch, adminResolvers)
Expand Down Expand Up @@ -480,18 +483,23 @@ func upsertEmptyGQLSchema() (*gqlSchema, error) {
},
}

assigned, result, _, err := resolve.AdminMutationExecutor().Mutate(context.Background(), qry,
mutations)
resp, err := resolve.NewAdminExecutor().Execute(context.Background(),
&dgoapi.Request{Query: dgraph.AsString(qry), Mutations: mutations, CommitNow: true})
if err != nil {
return nil, err
}

// the Alpha which created the gql schema node will get the uid here
uid, ok := assigned[varName]
uid, ok := resp.GetUids()[varName]
if ok {
return &gqlSchema{ID: uid}, nil
}

result := make(map[string]interface{})
if err := json.Unmarshal(resp.GetJson(), &result); err != nil {
return nil, schema.GQLWrapf(err, "Couldn't unmarshal response from Dgraph mutation")
}

// the Alphas which didn't create the gql schema node, will get the uid here.
gqlSchemaNode := result[varName].([]interface{})[0].(map[string]interface{})
return &gqlSchema{
Expand Down Expand Up @@ -567,22 +575,17 @@ func (as *adminServer) addConnectedAdminResolvers() {

qryRw := resolve.NewQueryRewriter()
updRw := resolve.NewUpdateRewriter()
qryExec := resolve.DgraphAsQueryExecutor()
mutExec := resolve.DgraphAsMutationExecutor()

as.fns.Qe = qryExec
as.fns.Me = mutExec
dgEx := resolve.NewDgraphExecutor()

as.rf.WithMutationResolver("updateGQLSchema",
func(m schema.Mutation) resolve.MutationResolver {
updResolver := &updateSchemaResolver{
admin: as,
baseMutationRewriter: updRw,
baseMutationExecutor: mutExec,
baseMutationExecutor: dgEx,
}

return resolve.NewMutationResolver(
updResolver,
return resolve.NewDgraphResolver(
updResolver,
updResolver,
resolve.StdMutationCompletion(m.Name()))
Expand All @@ -602,21 +605,21 @@ func (as *adminServer) addConnectedAdminResolvers() {
func(q schema.Query) resolve.QueryResolver {
return resolve.NewQueryResolver(
qryRw,
qryExec,
dgEx,
resolve.StdQueryCompletion())
}).
WithQueryResolver("queryUser",
func(q schema.Query) resolve.QueryResolver {
return resolve.NewQueryResolver(
qryRw,
qryExec,
dgEx,
resolve.StdQueryCompletion())
}).
WithQueryResolver("getGroup",
func(q schema.Query) resolve.QueryResolver {
return resolve.NewQueryResolver(
qryRw,
qryExec,
dgEx,
resolve.StdQueryCompletion())
}).
WithQueryResolver("getCurrentUser",
Expand All @@ -627,62 +630,56 @@ func (as *adminServer) addConnectedAdminResolvers() {

return resolve.NewQueryResolver(
cuResolver,
qryExec,
dgEx,
resolve.StdQueryCompletion())
}).
WithQueryResolver("getUser",
func(q schema.Query) resolve.QueryResolver {
return resolve.NewQueryResolver(
qryRw,
qryExec,
dgEx,
resolve.StdQueryCompletion())
}).
WithMutationResolver("addUser",
func(m schema.Mutation) resolve.MutationResolver {
return resolve.NewMutationResolver(
return resolve.NewDgraphResolver(
resolve.NewAddRewriter(),
resolve.DgraphAsQueryExecutor(),
resolve.DgraphAsMutationExecutor(),
dgEx,
resolve.StdMutationCompletion(m.Name()))
}).
WithMutationResolver("addGroup",
func(m schema.Mutation) resolve.MutationResolver {
return resolve.NewMutationResolver(
return resolve.NewDgraphResolver(
NewAddGroupRewriter(),
resolve.DgraphAsQueryExecutor(),
resolve.DgraphAsMutationExecutor(),
dgEx,
resolve.StdMutationCompletion(m.Name()))
}).
WithMutationResolver("updateUser",
func(m schema.Mutation) resolve.MutationResolver {
return resolve.NewMutationResolver(
return resolve.NewDgraphResolver(
resolve.NewUpdateRewriter(),
resolve.DgraphAsQueryExecutor(),
resolve.DgraphAsMutationExecutor(),
dgEx,
resolve.StdMutationCompletion(m.Name()))
}).
WithMutationResolver("updateGroup",
func(m schema.Mutation) resolve.MutationResolver {
return resolve.NewMutationResolver(
return resolve.NewDgraphResolver(
NewUpdateGroupRewriter(),
resolve.DgraphAsQueryExecutor(),
resolve.DgraphAsMutationExecutor(),
dgEx,
resolve.StdMutationCompletion(m.Name()))
}).
WithMutationResolver("deleteUser",
func(m schema.Mutation) resolve.MutationResolver {
return resolve.NewMutationResolver(
return resolve.NewDgraphResolver(
resolve.NewDeleteRewriter(),
resolve.NoOpQueryExecution(),
resolve.DgraphAsMutationExecutor(),
dgEx,
resolve.StdDeleteCompletion(m.Name()))
}).
WithMutationResolver("deleteGroup",
func(m schema.Mutation) resolve.MutationResolver {
return resolve.NewMutationResolver(
return resolve.NewDgraphResolver(
resolve.NewDeleteRewriter(),
resolve.NoOpQueryExecution(),
resolve.DgraphAsMutationExecutor(),
dgEx,
resolve.StdDeleteCompletion(m.Name()))
})
}
Expand Down
52 changes: 29 additions & 23 deletions graphql/admin/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type updateSchemaResolver struct {
// The underlying executor and rewriter that persist the schema into Dgraph as
// GraphQL metadata
baseMutationRewriter resolve.MutationRewriter
baseMutationExecutor resolve.MutationExecutor
baseMutationExecutor resolve.DgraphExecutor
}

type getSchemaResolver struct {
Expand All @@ -58,18 +58,19 @@ type updateGQLSchemaInput struct {
}

func (asr *updateSchemaResolver) Rewrite(
m schema.Mutation) (*gql.GraphQuery, []*dgoapi.Mutation, error) {
ctx context.Context,
m schema.Mutation) (*resolve.UpsertMutation, error) {

glog.Info("Got updateGQLSchema request")

input, err := getSchemaInput(m)
if err != nil {
return nil, nil, err
return nil, err
}

schHandler, err := schema.NewHandler(input.Set.Schema)
if err != nil {
return nil, nil, err
return nil, err
}
asr.newDgraphSchema = schHandler.DGSchema()

Expand All @@ -81,10 +82,11 @@ func (asr *updateSchemaResolver) Rewrite(
"filter": map[string]interface{}{"ids": []interface{}{asr.admin.schema.ID}},
"set": map[string]interface{}{"schema": input.Set.Schema},
})
return asr.baseMutationRewriter.Rewrite(m)
return asr.baseMutationRewriter.Rewrite(ctx, m)
}

func (asr *updateSchemaResolver) FromMutationResult(
ctx context.Context,
mutation schema.Mutation,
assigned map[string]string,
result map[string]interface{}) (*gql.GraphQuery, error) {
Expand All @@ -93,28 +95,29 @@ func (asr *updateSchemaResolver) FromMutationResult(
return nil, nil
}

func (asr *updateSchemaResolver) Mutate(
func (asr *updateSchemaResolver) Execute(
ctx context.Context,
query *gql.GraphQuery,
mutations []*dgoapi.Mutation) (map[string]string, map[string]interface{}, *schema.Extensions,
error) {
assigned, result, ext, err := asr.baseMutationExecutor.Mutate(ctx, query, mutations)
req *dgoapi.Request) (*dgoapi.Response, error) {

if req == nil || (req.Query == "" && len(req.Mutations) == 0) {
// For schema updates, Execute will get called twice. Once for the
// mutation and once for the following query. This is the query case.
b, err := doQuery(asr.admin.schema, asr.mutation.QueryField())
return &dgoapi.Response{Json: b}, err
}

resp, err := asr.baseMutationExecutor.Execute(ctx, req)
if err != nil {
return nil, nil, ext, err
return nil, err
}

_, err = (&edgraph.Server{}).Alter(ctx, &dgoapi.Operation{Schema: asr.newDgraphSchema})
if err != nil {
return nil, nil, ext, schema.GQLWrapf(err,
return nil, schema.GQLWrapf(err,
"succeeded in saving GraphQL schema but failed to alter Dgraph schema ")
}

return assigned, result, ext, nil
}

func (asr *updateSchemaResolver) Query(ctx context.Context, query *gql.GraphQuery) ([]byte,
*schema.Extensions, error) {
return doQuery(asr.admin.schema, asr.mutation.QueryField())
return resp, nil
}

func (gsr *getSchemaResolver) Rewrite(ctx context.Context,
Expand All @@ -123,12 +126,15 @@ func (gsr *getSchemaResolver) Rewrite(ctx context.Context,
return nil, nil
}

func (gsr *getSchemaResolver) Query(ctx context.Context, query *gql.GraphQuery) ([]byte,
*schema.Extensions, error) {
return doQuery(gsr.admin.schema, gsr.gqlQuery)
func (gsr *getSchemaResolver) Execute(
ctx context.Context,
req *dgoapi.Request) (*dgoapi.Response, error) {

b, err := doQuery(gsr.admin.schema, gsr.gqlQuery)
return &dgoapi.Response{Json: b}, err
}

func doQuery(gql *gqlSchema, field schema.Field) ([]byte, *schema.Extensions, error) {
func doQuery(gql *gqlSchema, field schema.Field) ([]byte, error) {

var buf bytes.Buffer
x.Check2(buf.WriteString(`{ "`))
Expand Down Expand Up @@ -158,7 +164,7 @@ func doQuery(gql *gqlSchema, field schema.Field) ([]byte, *schema.Extensions, er
}
x.Check2(buf.WriteString("}]}"))

return buf.Bytes(), nil, nil
return buf.Bytes(), nil
}

func getSchemaInput(m schema.Mutation) (*updateGQLSchemaInput, error) {
Expand Down
21 changes: 13 additions & 8 deletions graphql/admin/update_group.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package admin

import (
"context"
"fmt"

dgoapi "github.com/dgraph-io/dgo/v200/protos/api"
Expand All @@ -21,14 +22,16 @@ func NewUpdateGroupRewriter() resolve.MutationRewriter {
// otherwise, it is created. It also ensures that only the last rule out of all
// duplicate rules in input is preserved. A rule is duplicate if it has same predicate
// name as another rule.
func (urw *updateGroupRewriter) Rewrite(m schema.Mutation) (*gql.GraphQuery,
[]*dgoapi.Mutation, error) {
func (urw *updateGroupRewriter) Rewrite(
ctx context.Context,
m schema.Mutation) (*resolve.UpsertMutation, error) {

inp := m.ArgValue(schema.InputArgName).(map[string]interface{})
setArg := inp["set"]
delArg := inp["remove"]

if setArg == nil && delArg == nil {
return nil, nil, nil
return nil, nil
}

upsertQuery := resolve.RewriteUpsertQueryFromMutation(m)
Expand Down Expand Up @@ -120,21 +123,23 @@ func (urw *updateGroupRewriter) Rewrite(m schema.Mutation) (*gql.GraphQuery,
// if there is no mutation being performed as a result of some specific input,
// then we don't need to do the upsertQuery for group
if len(mutSet) == 0 && len(mutDel) == 0 {
return nil, nil, nil
return nil, nil
}

return &gql.GraphQuery{Children: []*gql.GraphQuery{upsertQuery}},
append(mutSet, mutDel...),
schema.GQLWrapf(schema.AppendGQLErrs(errSet, errDel), "failed to rewrite mutation payload")
return &resolve.UpsertMutation{
Query: &gql.GraphQuery{Children: []*gql.GraphQuery{upsertQuery}},
Mutations: append(mutSet, mutDel...),
}, schema.GQLWrapf(schema.AppendGQLErrs(errSet, errDel), "failed to rewrite mutation payload")
}

// FromMutationResult rewrites the query part of a GraphQL update mutation into a Dgraph query.
func (urw *updateGroupRewriter) FromMutationResult(
ctx context.Context,
mutation schema.Mutation,
assigned map[string]string,
result map[string]interface{}) (*gql.GraphQuery, error) {

return ((*resolve.UpdateRewriter)(urw)).FromMutationResult(mutation, assigned, result)
return ((*resolve.UpdateRewriter)(urw)).FromMutationResult(ctx, mutation, assigned, result)
}

// addAclRuleQuery adds a *gql.GraphQuery to upsertQuery.Children to query a rule inside a group
Expand Down
Loading

0 comments on commit 17a9c79

Please sign in to comment.