Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cherry-pick: fix: fixing graphql schema update when the data is restored + skippin… #7970

Merged
merged 2 commits into from
Aug 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions ee/audit/interceptor_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ var skipApis = map[string]bool{

var skipEPs = map[string]bool{
// list of endpoints that needs to be skipped
"/health": true,
"/state": true,
"/health": true,
"/state": true,
"/probe/graphql": true,
}

func AuditRequestGRPC(ctx context.Context, req interface{},
Expand Down
52 changes: 24 additions & 28 deletions graphql/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package admin

import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -612,14 +613,6 @@ func (g *GraphQLHealthStore) updatingSchema() {
g.v.Store(GraphQLHealth{Healthy: true, StatusMsg: "updating schema"})
}

type gqlSchema struct {
ID string `json:"id,omitempty"`
Schema string `json:"schema,omitempty"`
Version uint64
GeneratedSchema string
loaded bool // This indicate whether the schema has been loaded into graphql server or not
}

type adminServer struct {
rf resolve.ResolverFactory
resolver *resolve.RequestResolver
Expand All @@ -630,8 +623,7 @@ type adminServer struct {
// The GraphQL server that's being admin'd
gqlServer IServeGraphQL

schema map[uint64]*gqlSchema

gqlSchemas *worker.GQLSchemaStore
// When the schema changes, we use these to create a new RequestResolver for
// the main graphql endpoint (gqlServer) and thus refresh the API.
fns *resolve.ResolverFns
Expand Down Expand Up @@ -689,7 +681,7 @@ func newAdminResolver(
fns: fns,
withIntrospection: withIntrospection,
globalEpoch: epoch,
schema: make(map[uint64]*gqlSchema),
gqlSchemas: worker.NewGQLSchemaStore(),
gqlServer: defaultGqlServer,
}
adminServerVar = server // store the admin server in package variable
Expand Down Expand Up @@ -728,7 +720,7 @@ func newAdminResolver(
var data x.GQL
data.Schema, data.Script = worker.ParseAsSchemaAndScript(pl.Postings[0].Value)

newSchema := &gqlSchema{
newSchema := &worker.GqlSchema{
ID: query.UidToHex(pk.Uid),
Version: kv.GetVersion(),
Schema: data.Schema,
Expand All @@ -743,7 +735,7 @@ func newAdminResolver(
currentScript = script.Script
}
server.mux.RLock()
currentSchema, ok := server.schema[ns]
currentSchema, ok := server.gqlSchemas.GetCurrent(ns)
if ok {
schemaNotChanged := newSchema.Schema == currentSchema.Schema
scriptNotChanged := newScript.Script == currentScript
Expand Down Expand Up @@ -773,19 +765,19 @@ func newAdminResolver(

server.incrementSchemaUpdateCounter(ns)
// if the schema hasn't been loaded yet, then we don't need to load it here
currentSchema, ok = server.schema[ns]
if !(ok && currentSchema.loaded) {
currentSchema, ok = server.gqlSchemas.GetCurrent(ns)
if !(ok && currentSchema.Loaded) {
// this just set schema in admin server, so that next invalid badger subscription update gets rejected upfront
server.schema[ns] = newSchema
worker.Lambda().Set(ns, newScript)
server.gqlSchemas.Set(ns, newSchema)
glog.Infof("namespace: %d. Skipping in-memory GraphQL schema update, "+
"it will be lazy-loaded later.", ns)
return
}

// update this schema in both admin and graphql server
newSchema.loaded = true
server.schema[ns] = newSchema
newSchema.Loaded = true
server.gqlSchemas.Set(ns, newSchema)
server.resetSchema(ns, gqlSchema)
// Update the lambda script
worker.Lambda().Set(ns, newScript)
Expand Down Expand Up @@ -866,16 +858,16 @@ func newAdminResolverFactory() resolve.ResolverFactory {
return rf.WithSchemaIntrospection()
}

func getCurrentGraphQLSchema(namespace uint64) (*gqlSchema, error) {
func getCurrentGraphQLSchema(namespace uint64) (*worker.GqlSchema, error) {
uid, graphQLSchema, err := edgraph.GetGQLSchema(namespace)
if err != nil {
return nil, err
}

return &gqlSchema{ID: uid, Schema: graphQLSchema}, nil
return &worker.GqlSchema{ID: uid, Schema: graphQLSchema}, nil
}

func generateGQLSchema(sch *gqlSchema, ns uint64) (schema.Schema, error) {
func generateGQLSchema(sch *worker.GqlSchema, ns uint64) (schema.Schema, error) {
schHandler, err := schema.NewHandler(sch.Schema, false)
if err != nil {
return nil, err
Expand Down Expand Up @@ -913,8 +905,8 @@ func (as *adminServer) initServer() {
glog.Errorf("namespace: %d. Error reading GraphQL schema: %s.", x.GalaxyNamespace, err)
continue
}
sch.loaded = true
as.schema[x.GalaxyNamespace] = sch
sch.Loaded = true
as.gqlSchemas.Set(x.GalaxyNamespace, sch)
// adding the actual resolvers for updateGQLSchema and getGQLSchema only after server has
// current GraphQL schema, if there was any.
as.addConnectedAdminResolvers()
Expand Down Expand Up @@ -1054,8 +1046,12 @@ func (as *adminServer) resetSchema(ns uint64, gqlSchema schema.Schema) {
return resolve.QueryResolverFunc(func(ctx context.Context, query schema.Query) *resolve.Resolved {
as.mux.RLock()
defer as.mux.RUnlock()
sch := as.schema[ns].Schema
handler, err := schema.NewHandler(sch, true)
sch, ok := as.gqlSchemas.GetCurrent(ns)
if !ok {
return resolve.EmptyResult(query,
fmt.Errorf("error while getting the schema for ns %d", ns))
}
handler, err := schema.NewHandler(sch.Schema, true)
if err != nil {
return resolve.EmptyResult(query, err)
}
Expand All @@ -1082,7 +1078,7 @@ func (as *adminServer) resetSchema(ns uint64, gqlSchema schema.Schema) {
func (as *adminServer) lazyLoadSchema(namespace uint64) error {
// if the schema is already in memory, no need to fetch it from disk
as.mux.RLock()
if currentSchema, ok := as.schema[namespace]; ok && currentSchema.loaded {
if currentSchema, ok := as.gqlSchemas.GetCurrent(namespace); ok && currentSchema.Loaded {
as.mux.RUnlock()
return nil
}
Expand Down Expand Up @@ -1112,8 +1108,8 @@ func (as *adminServer) lazyLoadSchema(namespace uint64) error {

as.mux.Lock()
defer as.mux.Unlock()
sch.loaded = true
as.schema[namespace] = sch
sch.Loaded = true
as.gqlSchemas.Set(namespace, sch)
as.resetSchema(namespace, generatedSchema)

glog.Infof("namespace: %d. Successfully lazy-loaded GraphQL schema.", namespace)
Expand Down
5 changes: 3 additions & 2 deletions graphql/admin/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package admin
import (
"context"
"encoding/json"
"github.com/dgraph-io/dgraph/worker"

"github.com/dgraph-io/dgraph/edgraph"
"github.com/dgraph-io/dgraph/graphql/resolve"
Expand All @@ -33,7 +34,7 @@ type getSchemaResolver struct {
}

type updateGQLSchemaInput struct {
Set gqlSchema `json:"set,omitempty"`
Set worker.GqlSchema `json:"set,omitempty"`
}

type updateSchemaResolver struct {
Expand Down Expand Up @@ -88,7 +89,7 @@ func (gsr *getSchemaResolver) Resolve(ctx context.Context, q schema.Query) *reso
return resolve.EmptyResult(q, err)
}

cs := gsr.admin.schema[ns]
cs, _ := gsr.admin.gqlSchemas.GetCurrent(ns)
if cs == nil || cs.ID == "" {
data = map[string]interface{}{q.Name(): nil}
} else {
Expand Down
47 changes: 47 additions & 0 deletions worker/graphql_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,55 @@ var (
errUpdatingGraphQLSchemaOnNonGroupOneLeader = errors.New(
"while updating GraphQL schema: this server isn't group-1 leader, please retry")
ErrMultipleGraphQLSchemaNodes = errors.New("found multiple nodes for GraphQL schema")
gqlSchemaStore *GQLSchemaStore
)

type GqlSchema struct {
ID string `json:"id,omitempty"`
Schema string `json:"schema,omitempty"`
Version uint64
GeneratedSchema string
Loaded bool // This indicate whether the schema has been loaded into graphql server
// or not
}

type GQLSchemaStore struct {
mux sync.RWMutex
schema map[uint64]*GqlSchema
}

func NewGQLSchemaStore() *GQLSchemaStore {
gqlSchemaStore = &GQLSchemaStore{
mux: sync.RWMutex{},
schema: make(map[uint64]*GqlSchema),
}
return gqlSchemaStore
}

func (gs *GQLSchemaStore) Set(ns uint64, sch *GqlSchema) {
gs.mux.Lock()
defer gs.mux.Unlock()
gs.schema[ns] = sch
}

func (gs *GQLSchemaStore) GetCurrent(ns uint64) (*GqlSchema, bool) {
gs.mux.RLock()
defer gs.mux.RUnlock()
sch, ok := gs.schema[ns]
return sch, ok
}

func (gs *GQLSchemaStore) resetGQLSchema() {
gs.mux.Lock()
defer gs.mux.Unlock()

gs.schema = make(map[uint64]*GqlSchema)
}

func ResetGQLSchemaStore() {
gqlSchemaStore.resetGQLSchema()
}

// UpdateGQLSchemaOverNetwork sends the request to the group one leader for execution.
func UpdateGQLSchemaOverNetwork(ctx context.Context, req *pb.UpdateGraphQLSchemaRequest) (*pb.
UpdateGraphQLSchemaResponse, error) {
Expand Down
5 changes: 3 additions & 2 deletions worker/online_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,9 +360,10 @@ func handleRestoreProposal(ctx context.Context, req *pb.RestoreRequest, pidx uin
return errors.Wrapf(err, "cannot load schema after restore")
}

// Reset the lambda script store.
// reset gql schema and lambda script
glog.Info("reseting local gql schema and lambda script store")
ResetGQLSchemaStore()
ResetLambdaScriptStore()
// TODO(Aman): Reset the graphql schema store as well after cherry-picking changes to master.

// Propose a snapshot immediately after all the work is done to prevent the restore
// from being replayed.
Expand Down