Skip to content

Commit

Permalink
graphql: Fix non-unique schema issue (#5054)
Browse files Browse the repository at this point in the history
Fixes #GRAPHQL-337

During the time, when alpha's admin server was waiting for Dgraph cluster to boot up, if someone sent a GraphQL schema update request, it would create a new node for GQLSchema in Dgraph. This PR resolves this problem. From now on, there will only be one node of type `dgraph.graphql` ever in dgraph.
  • Loading branch information
abhimanyusinghgaur authored Apr 8, 2020
1 parent 73825c4 commit 8a650d7
Show file tree
Hide file tree
Showing 17 changed files with 226 additions and 126 deletions.
1 change: 1 addition & 0 deletions dgraph/cmd/bulk/systest/test-bulk-schema.sh
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ EOF
diff <(LC_ALL=C sort all_dbs.out | uniq -c) - <<EOF
1 dgraph.acl.rule
1 dgraph.graphql.schema
1 dgraph.graphql.xid
1 dgraph.password
1 dgraph.rule.permission
1 dgraph.rule.predicate
Expand Down
3 changes: 2 additions & 1 deletion edgraph/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ const (
type GraphqlContextKey int

const (
// IsGraphql is used to validate requests which are allowed to mutate dgraph.graphql.schema.
// IsGraphql is used to validate requests which are allowed to mutate GraphQL reserved
// predicates, like dgraph.graphql.schema and dgraph.graphql.xid.
IsGraphql GraphqlContextKey = iota
// Authorize is used to set if the request requires validation.
Authorize
Expand Down
157 changes: 101 additions & 56 deletions graphql/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ package admin
import (
"bytes"
"context"
"encoding/json"
"fmt"
dgoapi "github.com/dgraph-io/dgo/v2/protos/api"
"github.com/dgraph-io/dgraph/gql"
"sync"
"time"

Expand All @@ -46,6 +47,10 @@ const (
"this indicates a resolver or validation bug " +
"(Please let us know : https://github.com/dgraph-io/dgraph/issues)"

gqlSchemaXidKey = "dgraph.graphql.xid"
gqlSchemaXidVal = "dgraph.graphql.schema"
gqlSchemaPred = "dgraph.graphql.schema"

// GraphQL schema for /admin endpoint.
graphqlAdminSchema = `
"""
Expand All @@ -57,7 +62,7 @@ const (
"""
Input schema (GraphQL types) that was used in the latest schema update.
"""
schema: String! @dgraph(type: "dgraph.graphql.schema")
schema: String! @dgraph(pred: "dgraph.graphql.schema")
"""
The GraphQL schema that was generated from the 'schema' field.
Expand Down Expand Up @@ -278,7 +283,7 @@ type adminServer struct {
// The GraphQL server that's being admin'd
gqlServer web.IServeGraphQL

schema gqlSchema
schema *gqlSchema

// When the schema changes, we use these to create a new RequestResolver for
// the main graphql endpoint (gqlServer) and thus refresh the API.
Expand Down Expand Up @@ -331,7 +336,7 @@ func newAdminResolver(
withIntrospection: withIntrospection,
}

prefix := x.DataKey("dgraph.graphql.schema", 0)
prefix := x.DataKey(gqlSchemaPred, 0)
// Remove uid from the key, to get the correct prefix
prefix = prefix[:len(prefix)-8]
// Listen for graphql schema changes in group 1.
Expand Down Expand Up @@ -363,31 +368,24 @@ func newAdminResolver(
return
}

newSchema := gqlSchema{
newSchema := &gqlSchema{
ID: fmt.Sprintf("%#x", pk.Uid),
Schema: string(pl.Postings[0].Value),
}

schHandler, err := schema.NewHandler(newSchema.Schema)
gqlSchema, err := generateGQLSchema(newSchema)
if err != nil {
glog.Errorf("Error processing GraphQL schema: %s. ", err)
return
}

newSchema.GeneratedSchema = schHandler.GQLSchema()
gqlSchema, err := schema.FromString(newSchema.GeneratedSchema)
if err != nil {
glog.Errorf("Error processing GraphQL schema: %s. ", err)
return
}

glog.Infof("Successfully updated GraphQL schema.")
glog.Infof("Successfully updated GraphQL schema. Serving New GraphQL API.")

server.mux.Lock()
defer server.mux.Unlock()

server.schema = newSchema
server.resetSchema(gqlSchema)
server.resetSchema(*gqlSchema)
}, 1, closer)

go server.initServer()
Expand Down Expand Up @@ -496,51 +494,119 @@ func newAdminResolverFactory() resolve.ResolverFactory {
return rf
}

func (as *adminServer) initServer() {
// It takes a few seconds for the Dgraph cluster to be up and running.
// Before that, trying to read the GraphQL schema will result in error:
// "Please retry again, server is not ready to accept requests."
// 5 seconds is a pretty reliable wait for a fresh instance to read the
// schema on a first try.
waitFor := 5 * time.Second
func upsertEmptyGQLSchema() (*gqlSchema, error) {
varName := "GQLSchema"
gqlType := "dgraph.graphql"

qry := &gql.GraphQuery{
Attr: varName,
Var: varName,
Func: &gql.Function{
Name: "eq",
Args: []gql.Arg{
{Value: gqlSchemaXidKey},
{Value: fmt.Sprintf("%q", gqlSchemaXidVal)},
},
},
Filter: &gql.FilterTree{
Func: &gql.Function{
Name: "type",
Args: []gql.Arg{{Value: gqlType}},
},
},
Children: []*gql.GraphQuery{{Attr: "uid"}, {Attr: gqlSchemaPred}},
}

mutations := []*dgoapi.Mutation{
{
SetJson: []byte(fmt.Sprintf(`
{
"uid": "_:%s",
"dgraph.type": ["%s"],
"%s": "%s",
"%s": ""
}`, varName, gqlType, gqlSchemaXidKey, gqlSchemaXidVal, gqlSchemaPred)),
Cond: fmt.Sprintf(`@if(eq(len(%s),0))`, varName),
},
}

assigned, result, err := resolve.AdminMutationExecutor().Mutate(context.Background(), qry,
mutations)
if err != nil {
return nil, err
}

// the Alpha which created the gql schema node will get the uid here
uid, ok := assigned[varName]
if ok {
return &gqlSchema{ID: uid}, nil
}

// the Alphas which didn't create the gql schema node, will get the uid here.
gqlSchemaNode := result[varName].([]interface{})[0].(map[string]interface{})
return &gqlSchema{
ID: gqlSchemaNode["uid"].(string),
Schema: gqlSchemaNode[gqlSchemaPred].(string),
}, nil
}

func generateGQLSchema(sch *gqlSchema) (*schema.Schema, error) {
schHandler, err := schema.NewHandler(sch.Schema)
if err != nil {
return nil, err
}

sch.GeneratedSchema = schHandler.GQLSchema()
generatedSchema, err := schema.FromString(sch.GeneratedSchema)
if err != nil {
return nil, err
}

return &generatedSchema, nil
}

func (as *adminServer) initServer() {
// Nothing else should be able to lock before here. The admin resolvers aren't yet
// set up (they all just error), so we will obtain the lock here without contention.
// We then setup the admin resolvers and they must wait until we are done before the
// first admin calls will go through.
as.mux.Lock()
defer as.mux.Unlock()

as.addConnectedAdminResolvers()
// It takes a few seconds for the Dgraph cluster to be up and running.
// Before that, trying to read the GraphQL schema will result in error:
// "Please retry again, server is not ready to accept requests."
// 5 seconds is a pretty reliable wait for a fresh instance to read the
// schema on a first try.
waitFor := 5 * time.Second

for {
<-time.After(waitFor)

sch, err := getCurrentGraphQLSchema(as.resolver)
sch, err := upsertEmptyGQLSchema()
if err != nil {
glog.Infof("Error reading GraphQL schema: %s.", err)
continue
} else if sch == nil {
glog.Infof("No GraphQL schema in Dgraph; serving empty GraphQL API")
break
}

schHandler, err := schema.NewHandler(sch.Schema)
if err != nil {
glog.Infof("Error processing GraphQL schema: %s.", err)
as.schema = sch
// adding the actual resolvers for updateGQLSchema and getGQLSchema only after server has ID
as.addConnectedAdminResolvers()

if sch.Schema == "" {
glog.Infof("No GraphQL schema in Dgraph; serving empty GraphQL API")
break
}

sch.GeneratedSchema = schHandler.GQLSchema()
generatedSchema, err := schema.FromString(sch.GeneratedSchema)
generatedSchema, err := generateGQLSchema(sch)
if err != nil {
glog.Infof("Error processing GraphQL schema: %s.", err)
break
}

glog.Infof("Successfully loaded GraphQL schema. Serving GraphQL API.")

as.schema = *sch
as.resetSchema(generatedSchema)
as.resetSchema(*generatedSchema)

break
}
Expand All @@ -550,7 +616,6 @@ func (as *adminServer) initServer() {
func (as *adminServer) addConnectedAdminResolvers() {

qryRw := resolve.NewQueryRewriter()
addRw := resolve.NewAddRewriter()
updRw := resolve.NewUpdateRewriter()
qryExec := resolve.DgraphAsQueryExecutor()
mutExec := resolve.DgraphAsMutationExecutor()
Expand All @@ -562,7 +627,6 @@ func (as *adminServer) addConnectedAdminResolvers() {
func(m schema.Mutation) resolve.MutationResolver {
updResolver := &updateSchemaResolver{
admin: as,
baseAddRewriter: addRw,
baseMutationRewriter: updRw,
baseMutationExecutor: mutExec,
}
Expand All @@ -576,9 +640,7 @@ func (as *adminServer) addConnectedAdminResolvers() {
WithQueryResolver("getGQLSchema",
func(q schema.Query) resolve.QueryResolver {
getResolver := &getSchemaResolver{
admin: as,
baseRewriter: qryRw,
baseExecutor: resolve.AdminQueryExecutor(),
admin: as,
}

return resolve.NewQueryResolver(
Expand Down Expand Up @@ -675,23 +737,6 @@ func (as *adminServer) addConnectedAdminResolvers() {
})
}

func getCurrentGraphQLSchema(r *resolve.RequestResolver) (*gqlSchema, error) {
req := &schema.Request{
Query: `query { getGQLSchema { id schema } }`}
resp := r.Resolve(context.Background(), req)
if len(resp.Errors) > 0 || resp.Data.Len() == 0 {
return nil, resp.Errors
}

var result struct {
GetGQLSchema *gqlSchema
}

err := json.Unmarshal(resp.Data.Bytes(), &result)

return result.GetGQLSchema, err
}

func resolverFactoryWithErrorMsg(msg string) resolve.ResolverFactory {
errFunc := func(name string) error { return errors.Errorf(msg, name) }
qErr :=
Expand Down
Loading

0 comments on commit 8a650d7

Please sign in to comment.