Skip to content

Commit

Permalink
feat(multi-tenancy): Make GraphQL work with multiple namespaces (#7400)
Browse files Browse the repository at this point in the history
Add ability to set a GraphQL schema per namespace and do queries, mutations, and subscriptions specific to the namespace. Some CORS tests and ClosedByDefault tests need to be fixed which will happen later.
  • Loading branch information
pawanrawal authored Feb 10, 2021
1 parent cd81510 commit 3c42449
Show file tree
Hide file tree
Showing 45 changed files with 958 additions and 618 deletions.
3 changes: 2 additions & 1 deletion dgraph/cmd/alpha/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,8 +642,9 @@ func resolveWithAdminServer(gqlReq *schema.Request, r *http.Request,
ctx = x.AttachAccessJwt(ctx, r)
ctx = x.AttachRemoteIP(ctx, r)
ctx = x.AttachAuthToken(ctx, r)
ctx = x.AttachJWTNamespace(ctx)

return adminServer.Resolve(ctx, gqlReq)
return adminServer.ResolveWithNs(ctx, x.GalaxyNamespace, gqlReq)
}

func writeSuccessResponse(w http.ResponseWriter, r *http.Request) {
Expand Down
47 changes: 35 additions & 12 deletions dgraph/cmd/alpha/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"net/url"
"os"
"os/signal"
"strconv"
"strings"
"sync/atomic"
"syscall"
Expand Down Expand Up @@ -462,14 +463,23 @@ func setupServer(closer *z.Closer) {
// Implementation for server exit:
// The global epoch is set to maxUint64 while exiting the server.
// By using this information polling goroutine terminates the subscription.
globalEpoch := uint64(0)
globalEpoch := make(map[uint64]*uint64)
e := new(uint64)
atomic.StoreUint64(e, 0)
globalEpoch[x.GalaxyNamespace] = e
var mainServer web.IServeGraphQL
var gqlHealthStore *admin.GraphQLHealthStore
// Do not use := notation here because adminServer is a global variable.
mainServer, adminServer, gqlHealthStore = admin.NewServers(introspection, &globalEpoch, closer)
baseMux.Handle("/graphql", mainServer.HTTPHandler())
baseMux.HandleFunc("/probe/graphql", func(w http.ResponseWriter,
r *http.Request) {
mainServer, adminServer, gqlHealthStore = admin.NewServers(introspection,
globalEpoch, closer)
baseMux.HandleFunc("/graphql", func(w http.ResponseWriter, r *http.Request) {
namespace := x.ExtractNamespaceHTTP(r)
r.Header.Set("resolver", strconv.FormatUint(namespace, 10))
admin.LazyLoadSchema(namespace)
mainServer.HTTPHandler().ServeHTTP(w, r)
})

baseMux.HandleFunc("/probe/graphql", func(w http.ResponseWriter, r *http.Request) {
healthStatus := gqlHealthStore.GetHealth()
httpStatusCode := http.StatusOK
if !healthStatus.Healthy {
Expand All @@ -478,14 +488,25 @@ func setupServer(closer *z.Closer) {
w.Header().Set("Content-Type", "application/json")
x.AddCorsHeaders(w)
w.WriteHeader(httpStatusCode)
e = globalEpoch[x.ExtractNamespaceHTTP(r)]
var counter uint64
if e != nil {
counter = atomic.LoadUint64(e)
}
x.Check2(w.Write([]byte(fmt.Sprintf(`{"status":"%s","schemaUpdateCounter":%d}`,
healthStatus.StatusMsg, atomic.LoadUint64(&globalEpoch)))))
healthStatus.StatusMsg, counter))))
})
baseMux.HandleFunc("/admin", func(w http.ResponseWriter, r *http.Request) {
r.Header.Set("resolver", "0")
// We don't need to load the schema for all the admin operations.
// Only a few like getUser, queryGroup require this. So, this can be optimized.
admin.LazyLoadSchema(x.ExtractNamespaceHTTP(r))
allowedMethodsHandler(allowedMethods{
http.MethodGet: true,
http.MethodPost: true,
http.MethodOptions: true,
}, adminAuthHandler(adminServer.HTTPHandler())).ServeHTTP(w, r)
})
baseMux.Handle("/admin", allowedMethodsHandler(allowedMethods{
http.MethodGet: true,
http.MethodPost: true,
http.MethodOptions: true,
}, adminAuthHandler(adminServer.HTTPHandler())))

baseMux.Handle("/admin/schema", adminAuthHandler(http.HandlerFunc(func(
w http.ResponseWriter,
Expand Down Expand Up @@ -559,7 +580,9 @@ func setupServer(closer *z.Closer) {
defer admin.ServerCloser.Done()

<-admin.ServerCloser.HasBeenClosed()
atomic.StoreUint64(&globalEpoch, math.MaxUint64)
// TODO - Verify why do we do this and does it have to be done for all namespaces.
e = globalEpoch[x.GalaxyNamespace]
atomic.StoreUint64(e, math.MaxUint64)

// Stops grpc/http servers; Already accepted connections are not closed.
if err := grpcListener.Close(); err != nil {
Expand Down
12 changes: 12 additions & 0 deletions edgraph/access.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,15 @@ func AuthorizeGuardians(ctx context.Context) error {
// always allow access
return nil
}

func AuthGuardianOfTheGalaxy(ctx context.Context) error {
return nil
}

func upsertGuardian(ctx context.Context) error {
return nil
}

func upsertGroot(ctx context.Context) error {
return nil
}
3 changes: 3 additions & 0 deletions edgraph/access_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -1040,6 +1040,9 @@ func authorizeSchemaQuery(ctx context.Context, er *query.ExecutionResult) error
}

func AuthGuardianOfTheGalaxy(ctx context.Context) error {
if !x.WorkerConfig.AclEnabled {
return nil
}
ns, err := x.ExtractJWTNamespace(ctx)
if err != nil {
return errors.Wrap(err, "Authorize guradian of the galaxy, extracting jwt token, error:")
Expand Down
2 changes: 1 addition & 1 deletion edgraph/acl_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (cache *aclCache) update(ns uint64, groups []acl.Group) {

func (cache *aclCache) authorizePredicate(groups []string, predicate string,
operation *acl.Operation) error {
if x.IsAclPredicate(predicate) {
if x.IsAclPredicate(x.ParseAttr(predicate)) {
return errors.Errorf("only groot is allowed to access the ACL predicate: %s", predicate)
}

Expand Down
10 changes: 6 additions & 4 deletions edgraph/acl_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"testing"

"github.com/dgraph-io/dgraph/ee/acl"
"github.com/dgraph-io/dgraph/x"
"github.com/stretchr/testify/require"
)

Expand All @@ -26,13 +27,14 @@ func TestAclCache(t *testing.T) {

var emptyGroups []string
group := "dev"
predicate := "friend"
predicate := x.GalaxyAttr("friend")
require.Error(t, aclCachePtr.authorizePredicate(emptyGroups, predicate, acl.Read),
"the anonymous user should not have access when the acl cache is empty")

acls := []acl.Acl{
{
Predicate: predicate,
// update operation on acl cache needs predicate without namespace.
Predicate: x.ParseAttr(predicate),
Perm: 4,
},
}
Expand All @@ -42,15 +44,15 @@ func TestAclCache(t *testing.T) {
Rules: acls,
},
}
aclCachePtr.update(groups)
aclCachePtr.update(x.GalaxyNamespace, groups)
// after a rule is defined, the anonymous user should no longer have access
require.Error(t, aclCachePtr.authorizePredicate(emptyGroups, predicate, acl.Read),
"the anonymous user should not have access when the predicate has acl defined")
require.NoError(t, aclCachePtr.authorizePredicate([]string{group}, predicate, acl.Read),
"the user with group authorized should have access")

// update the cache with empty acl list in order to clear the cache
aclCachePtr.update([]acl.Group{})
aclCachePtr.update(x.GalaxyNamespace, []acl.Group{})
// the anonymous user should have access again
require.Error(t, aclCachePtr.authorizePredicate(emptyGroups, predicate, acl.Read),
"the anonymous user should not have access when the acl cache is empty")
Expand Down
34 changes: 0 additions & 34 deletions edgraph/graphql.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,40 +179,6 @@ func GetCorsOrigins(ctx context.Context) (string, []string, error) {
return corsLast.Uid, corsLast.DgraphCors, nil
}

// UpdateSchemaHistory updates graphql schema history.
func UpdateSchemaHistory(ctx context.Context, schema string) error {
req := &Request{
req: &api.Request{
Mutations: []*api.Mutation{
{
Set: []*api.NQuad{
{
Subject: "_:a",
Predicate: "dgraph.graphql.schema_history",
ObjectValue: &api.Value{Val: &api.Value_StrVal{StrVal: schema}},
},
{
Subject: "_:a",
Predicate: "dgraph.type",
ObjectValue: &api.Value{Val: &api.Value_StrVal{
StrVal: "dgraph.graphql.history"}},
},
},
SetNquads: []byte(fmt.Sprintf(`_:a <dgraph.graphql.schema_created_at> "%s" .`,
time.Now().Format(time.RFC3339))),
},
},
CommitNow: true,
},
doAuth: NoAuthorize,
}
//TODO(Pawan): Make this use right namespace.
ctx = context.WithValue(ctx, IsGraphql, true)
ctx = x.AttachNamespace(ctx, x.GalaxyNamespace)
_, err := (&Server{}).doQuery(ctx, req)
return err
}

// ProcessPersistedQuery stores and retrieves persisted queries by following waterfall logic:
// 1. If sha256Hash is not provided process queries without persisting
// 2. If sha256Hash is provided try retrieving persisted queries
Expand Down
11 changes: 4 additions & 7 deletions edgraph/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (s *Server) DeleteNamespace(ctx context.Context, namespace uint64) error {
glog.Info("Deleting namespace", namespace)
ctx = x.AttachJWTNamespace(ctx)
if err := AuthGuardianOfTheGalaxy(ctx); err != nil {
return errors.Wrapf(err, "Creating namespace, got error: ")
return errors.Wrapf(err, "Deleting namespace, got error: ")
}
// TODO(Ahsan): We have to ban the pstore for all the groups.
ps := worker.State.Pstore
Expand Down Expand Up @@ -203,12 +203,9 @@ func PeriodicallyPostTelemetry() {

// GetGQLSchema queries for the GraphQL schema node, and returns the uid and the GraphQL schema.
// If multiple schema nodes were found, it returns an error.
func GetGQLSchema() (uid, graphQLSchema string, err error) {
func GetGQLSchema(namespace uint64) (uid, graphQLSchema string, err error) {
ctx := context.WithValue(context.Background(), Authorize, false)
//TODO(Ahsan): There should be a way to getGQLSchema for all the namespaces and reinsert them
// after dropAll. Need to think about what should be the behaviour of drop operations.
ctx = x.AttachNamespace(ctx, x.GalaxyNamespace)

ctx = x.AttachNamespace(ctx, namespace)
resp, err := (&Server{}).Query(ctx,
&api.Request{
Query: `
Expand Down Expand Up @@ -457,7 +454,7 @@ func (s *Server) Alter(ctx context.Context, op *api.Operation) (*api.Payload, er
}

// query the GraphQL schema and keep it in memory, so it can be inserted again
_, graphQLSchema, err := GetGQLSchema()
_, graphQLSchema, err := GetGQLSchema(namespace)
if err != nil {
return empty, err
}
Expand Down
3 changes: 0 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@ module github.com/dgraph-io/dgraph

go 1.12

// replace github.com/dgraph-io/badger/v2 => /home/mrjn/go/src/github.com/dgraph-io/badger
// replace github.com/dgraph-io/ristretto => /home/mrjn/go/src/github.com/dgraph-io/ristretto

require (
contrib.go.opencensus.io/exporter/jaeger v0.1.0
contrib.go.opencensus.io/exporter/prometheus v0.1.0
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,6 @@ github.com/dgraph-io/badger v1.6.0 h1:DshxFxZWXUcO0xX476VJC07Xsr6ZCBVRHKZ93Oh7Ev
github.com/dgraph-io/badger v1.6.0/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4=
github.com/dgraph-io/badger/v3 v3.0.0-20210208122220-162b5787192b h1:nD2CjktZ78EClTWWwFhMvVuBxYunKEFLClNkFHGr+aU=
github.com/dgraph-io/badger/v3 v3.0.0-20210208122220-162b5787192b/go.mod h1:ag1DYFcc5xVWtMbbwnl9ESgk1dE2ukWO4hdvzENQnAw=
github.com/dgraph-io/dgo/v200 v200.0.0-20210208072308-4dd991b9b20e h1:3K0Wg/IMUTp4vyCRyD5SlmCYbmVxAz5o6eJvSiS72Gs=
github.com/dgraph-io/dgo/v200 v200.0.0-20210208072308-4dd991b9b20e/go.mod h1:ky1IOcEAlOxmk89KxXGECgRAEkdJrNHVymvCmixaVuM=
github.com/dgraph-io/dgo/v200 v200.0.0-20210208110130-c589adec3d8f h1:XZRsTllGQWUu0SpAb8mGW9motF0KmD7f6UEKa7y2grE=
github.com/dgraph-io/dgo/v200 v200.0.0-20210208110130-c589adec3d8f/go.mod h1:ky1IOcEAlOxmk89KxXGECgRAEkdJrNHVymvCmixaVuM=
github.com/dgraph-io/gqlgen v0.13.2 h1:TNhndk+eHKj5qE7BenKKSYdSIdOGhLqxR1rCiMso9KM=
Expand Down
Loading

0 comments on commit 3c42449

Please sign in to comment.