Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

graphql: duplicate /health in GraphQL /admin #4768

Merged
merged 14 commits into from
Feb 14, 2020
19 changes: 3 additions & 16 deletions dgraph/cmd/alpha/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func queryHandler(w http.ResponseWriter, r *http.Request) {
}

ctx := context.WithValue(context.Background(), query.DebugKey, isDebugMode)
ctx = attachAccessJwt(ctx, r)
ctx = x.AttachAccessJwt(ctx, r)

if queryTimeout != 0 {
var cancel context.CancelFunc
Expand Down Expand Up @@ -397,7 +397,7 @@ func mutationHandler(w http.ResponseWriter, r *http.Request) {
req.StartTs = startTs
req.CommitNow = commitNow

ctx := attachAccessJwt(context.Background(), r)
ctx := x.AttachAccessJwt(context.Background(), r)
resp, err := (&edgraph.Server{}).Query(ctx, req)
if err != nil {
x.SetStatusWithData(w, x.ErrorInvalidRequest, err.Error())
Expand Down Expand Up @@ -548,19 +548,6 @@ func handleCommit(startTs uint64, reqText []byte) (map[string]interface{}, error
return response, nil
}

func attachAccessJwt(ctx context.Context, r *http.Request) context.Context {
if accessJwt := r.Header.Get("X-Dgraph-AccessToken"); accessJwt != "" {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
md = metadata.New(nil)
}

md.Append("accessJwt", accessJwt)
ctx = metadata.NewIncomingContext(ctx, md)
}
return ctx
}

func alterHandler(w http.ResponseWriter, r *http.Request) {
if commonHandler(w, r) {
return
Expand All @@ -586,7 +573,7 @@ func alterHandler(w http.ResponseWriter, r *http.Request) {
// Pass in an auth token, if present.
md.Append("auth-token", r.Header.Get("X-Dgraph-AuthToken"))
ctx := metadata.NewIncomingContext(context.Background(), md)
ctx = attachAccessJwt(ctx, r)
ctx = x.AttachAccessJwt(ctx, r)
if _, err := (&edgraph.Server{}).Alter(ctx, op); err != nil {
x.SetStatus(w, x.Error, err.Error())
return
Expand Down
4 changes: 2 additions & 2 deletions dgraph/cmd/alpha/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ func healthCheck(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)

ctx := attachAccessJwt(context.Background(), r)
ctx := x.AttachAccessJwt(context.Background(), r)
var resp *api.Response
if resp, err = (&edgraph.Server{}).Health(ctx, true); err != nil {
x.SetStatus(w, x.Error, err.Error())
Expand Down Expand Up @@ -335,7 +335,7 @@ func stateHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")

ctx := context.Background()
ctx = attachAccessJwt(ctx, r)
ctx = x.AttachAccessJwt(ctx, r)

var aResp *api.Response
if aResp, err = (&edgraph.Server{}).State(ctx); err != nil {
Expand Down
108 changes: 43 additions & 65 deletions graphql/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,31 +47,26 @@ const (
"(Please let us know : https://github.com/dgraph-io/dgraph/issues)"

// GraphQL schema for /admin endpoint.
//
// Eventually we should generate this from just the types definition.
// But for now, that would add too much into the schema, so this is
// hand crafted to be one of our schemas so we can pass it into the
// pipeline.
graphqlAdminSchema = `
type GQLSchema @dgraph(type: "dgraph.graphql") {
id: ID!
schema: String! @dgraph(type: "dgraph.graphql.schema")
generatedSchema: String!
}

type Health {
message: String!
status: HealthStatus!
}

enum HealthStatus {
ErrNoConnection
NoGraphQLSchema
Healthy

"""Node state is the state of an individual node int he Dgraph cluster """
type NodeState {
"""node type : either 'alpha' or 'zero'"""
instance: String
address: String
"""node health status : either 'healthy' or 'unhealthy'"""
status: String
group: Int
version: String
uptime: Int
lastEcho: Int
}

scalar DateTime

directive @dgraph(type: String, pred: String) on OBJECT | INTERFACE | FIELD_DEFINITION

type UpdateGQLSchemaPayload {
Expand Down Expand Up @@ -134,7 +129,7 @@ const (

type Query {
getGQLSchema: GQLSchema
health: Health
health: [NodeState]
}

type Mutation {
Expand All @@ -157,7 +152,6 @@ type gqlSchema struct {
type adminServer struct {
rf resolve.ResolverFactory
resolver *resolve.RequestResolver
status healthStatus

// The mutex that locks schema update operations
mux sync.Mutex
Expand Down Expand Up @@ -213,7 +207,6 @@ func newAdminResolver(
server := &adminServer{
rf: rf,
resolver: resolve.New(adminSchema, rf),
status: errNoConnection,
gqlServer: gqlServer,
fns: fns,
withIntrospection: withIntrospection,
Expand Down Expand Up @@ -275,7 +268,6 @@ func newAdminResolver(
defer server.mux.Unlock()

server.schema = newSchema
server.status = healthy
server.resetSchema(gqlSchema)
}, 1, closer)

Expand All @@ -288,14 +280,12 @@ func newAdminResolverFactory() resolve.ResolverFactory {
rf := resolverFactoryWithErrorMsg(errResolverNotFound).
WithQueryResolver("health",
func(q schema.Query) resolve.QueryResolver {
health := &healthResolver{
status: errNoConnection,
}
health := &healthResolver{}

return resolve.NewQueryResolver(
health,
health,
resolve.StdQueryCompletion())
resolve.AliasQueryCompletion())
}).
WithMutationResolver("updateGQLSchema", func(m schema.Mutation) resolve.MutationResolver {
return resolve.MutationResolverFunc(
Expand Down Expand Up @@ -370,26 +360,28 @@ func newAdminResolverFactory() resolve.ResolverFactory {
}

func (as *adminServer) initServer() {
var waitFor time.Duration
// It takes a few seconds for the Dgraph cluster to be up and running.
// Before that, trying to read the GraphQL schema will result in error:
// "Please retry again, server is not ready to accept requests."
// 5 seconds is a pretty reliable wait for a fresh instance to read the
// schema on a first try.
waitFor := 5 * time.Second

// Nothing else should be able to lock before here. The admin resolvers aren't yet
// set up (they all just error), so we will obtain the lock here without contention.
// We then setup the admin resolvers and they must wait until we are done before the
// first admin calls will go through.
as.mux.Lock()
defer as.mux.Unlock()

as.addConnectedAdminResolvers()
for {
<-time.After(waitFor)
waitFor = 10 * time.Second

// Nothing else should be able to lock before here. The admin resolvers aren't yet
// set up (they all just error), so we will obtain the lock here without contention.
// We then setup the admin resolvers and they must wait until we are done before the
// first admin calls will go through.
as.mux.Lock()
defer as.mux.Unlock()

as.addConnectedAdminResolvers()

as.status = noGraphQLSchema

sch, err := getCurrentGraphQLSchema(as.resolver)
if err != nil {
glog.Infof("Error reading GraphQL schema: %s.", err)
break
continue
} else if sch == nil {
glog.Infof("No GraphQL schema in Dgraph; serving empty GraphQL API")
break
Expand All @@ -411,7 +403,6 @@ func (as *adminServer) initServer() {
glog.Infof("Successfully loaded GraphQL schema. Serving GraphQL API.")

as.schema = *sch
as.status = healthy
as.resetSchema(generatedSchema)

break
Expand All @@ -430,32 +421,21 @@ func (as *adminServer) addConnectedAdminResolvers() {
as.fns.Qe = qryExec
as.fns.Me = mutExec

as.rf.WithQueryResolver("health",
func(q schema.Query) resolve.QueryResolver {
health := &healthResolver{
status: as.status,
as.rf.WithMutationResolver("updateGQLSchema",
func(m schema.Mutation) resolve.MutationResolver {
updResolver := &updateSchemaResolver{
admin: as,
baseAddRewriter: addRw,
baseMutationRewriter: updRw,
baseMutationExecutor: mutExec,
}

return resolve.NewQueryResolver(
health,
health,
resolve.StdQueryCompletion())
return resolve.NewMutationResolver(
updResolver,
updResolver,
updResolver,
resolve.StdMutationCompletion(m.Name()))
}).
WithMutationResolver("updateGQLSchema",
func(m schema.Mutation) resolve.MutationResolver {
updResolver := &updateSchemaResolver{
admin: as,
baseAddRewriter: addRw,
baseMutationRewriter: updRw,
baseMutationExecutor: mutExec,
}

return resolve.NewMutationResolver(
updResolver,
updResolver,
updResolver,
resolve.StdMutationCompletion(m.Name()))
}).
WithQueryResolver("getGQLSchema",
func(q schema.Query) resolve.QueryResolver {
getResolver := &getSchemaResolver{
Expand Down Expand Up @@ -512,8 +492,6 @@ func (as *adminServer) resetSchema(gqlSchema schema.Schema) {
}

as.gqlServer.ServeGQL(resolve.New(gqlSchema, resolverFactory))

as.status = healthy
}

func writeResponse(m schema.Mutation, code, message string) []byte {
Expand Down
51 changes: 21 additions & 30 deletions graphql/admin/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,49 +17,40 @@
package admin

import (
"bytes"
"context"
"fmt"

"github.com/dgraph-io/dgo/v2/protos/api"
"github.com/dgraph-io/dgraph/edgraph"
"github.com/dgraph-io/dgraph/gql"
"github.com/dgraph-io/dgraph/graphql/schema"
"github.com/dgraph-io/dgraph/x"
"github.com/pkg/errors"
)

const (
errNoConnection healthStatus = "ErrNoConnection"
noGraphQLSchema healthStatus = "NoGraphQLSchema"
healthy healthStatus = "Healthy"
)

type healthStatus string

type healthResolver struct {
status healthStatus
format string
}

var statusMessage = map[healthStatus]string{
errNoConnection: "Unable to contact Dgraph",
noGraphQLSchema: "Dgraph connection established but there's no GraphQL schema.",
healthy: "Dgraph connection established and serving GraphQL schema.",
func (hr *healthResolver) Rewrite(q schema.Query) (*gql.GraphQuery, error) {
return nil, nil
}

func (hr *healthResolver) Rewrite(q schema.Query) (*gql.GraphQuery, error) {
msg := "message"
status := "status"
func (hr *healthResolver) Query(ctx context.Context, query *gql.GraphQuery) ([]byte, error) {
var err error

for _, f := range q.SelectionSet() {
if f.Name() == "message" {
msg = f.ResponseName()
}
if f.Name() == "status" {
status = f.ResponseName()
}
var resp *api.Response
var respErr error
if resp, respErr = (&edgraph.Server{}).Health(ctx, true); respErr != nil {
err = errors.Errorf("%s: %s", x.Error, respErr.Error())
}
if resp == nil {
err = errors.Errorf("%s: %s", x.ErrorNoData, "No state information available.")
}

hr.format = fmt.Sprintf(`{"%s":[{"%s":"%%s","%s":"%%s"}]}`, q.ResponseName(), msg, status)
return nil, nil
}
var buf bytes.Buffer
x.Check2(buf.WriteString(`{ "health":`))
x.Check2(buf.Write(resp.Json))
x.Check2(buf.WriteString(`}`))

func (hr *healthResolver) Query(ctx context.Context, query *gql.GraphQuery) ([]byte, error) {
return []byte(fmt.Sprintf(hr.format, statusMessage[hr.status], string(hr.status))), nil
return buf.Bytes(), err
}
Loading