diff --git a/chart/permissions-api/templates/_helpers.tpl b/chart/permissions-api/templates/_helpers.tpl index b43ff1cb..08527833 100644 --- a/chart/permissions-api/templates/_helpers.tpl +++ b/chart/permissions-api/templates/_helpers.tpl @@ -13,6 +13,11 @@ secret: secretName: {{ . }} {{- end }} +{{- with .Values.config.events.nats.credsSecretName }} +- name: nats-creds + secret: + secretName: {{ . }} +{{- end }} {{- with .Values.config.spicedb.policyConfigMapName }} - name: policy-file configMap: @@ -27,6 +32,10 @@ - name: spicedb-ca mountPath: /etc/ssl/spicedb/ {{- end }} +{{- if .Values.config.events.nats.credsSecretName }} +- name: nats-creds + mountPath: /nats +{{- end }} {{- if .Values.config.spicedb.policyConfigMapName }} - name: policy-file mountPath: /policy diff --git a/chart/permissions-api/templates/config-server.yaml b/chart/permissions-api/templates/config-server.yaml index e82d6521..c9f099b1 100644 --- a/chart/permissions-api/templates/config-server.yaml +++ b/chart/permissions-api/templates/config-server.yaml @@ -10,4 +10,4 @@ metadata: service: server data: config.yaml: | - {{- pick .Values.config "server" "oidc" "spicedb" "tracing" | toYaml | nindent 4 }} + {{- pick .Values.config "server" "oidc" "spicedb" "tracing" "events" | toYaml | nindent 4 }} diff --git a/chart/permissions-api/values.yaml b/chart/permissions-api/values.yaml index 0c5a2e3c..a06ba59d 100644 --- a/chart/permissions-api/values.yaml +++ b/chart/permissions-api/values.yaml @@ -47,6 +47,9 @@ config: policyConfigMapName: "" events: + # zedTokenBucket is the NATS bucket to use for caching ZedTokens + zedTokenBucket: "" + # topics are the list of topics to subscribe to topics: [] diff --git a/cmd/createrole.go b/cmd/createrole.go index 0eff487c..c1fa2d56 100644 --- a/cmd/createrole.go +++ b/cmd/createrole.go @@ -5,6 +5,7 @@ import ( "github.com/spf13/cobra" "github.com/spf13/viper" + "go.infratographer.com/x/events" "go.infratographer.com/x/gidx" "go.infratographer.com/x/viperx" @@ -59,6 +60,16 @@ func createRole(ctx context.Context, cfg *config.AppConfig) { logger.Fatalw("unable to initialize spicedb client", "error", err) } + eventsConn, err := events.NewConnection(cfg.Events.Config, events.WithLogger(logger)) + if err != nil { + logger.Fatalw("failed to initialize events", "error", err) + } + + kv, err := initializeKV(cfg.Events, eventsConn) + if err != nil { + logger.Fatalw("failed to initialize KV", "error", err) + } + var policy iapl.Policy if cfg.SpiceDB.PolicyFile != "" { @@ -86,7 +97,10 @@ func createRole(ctx context.Context, cfg *config.AppConfig) { logger.Fatalw("error parsing subject ID", "error", err) } - engine := query.NewEngine("infratographer", spiceClient, query.WithPolicy(policy), query.WithLogger(logger)) + engine, err := query.NewEngine("infratographer", spiceClient, kv, query.WithPolicy(policy), query.WithLogger(logger)) + if err != nil { + logger.Fatalw("error creating engine", "error", err) + } resource, err := engine.NewResourceFromID(resourceID) if err != nil { diff --git a/cmd/kv.go b/cmd/kv.go new file mode 100644 index 00000000..a5c287e1 --- /dev/null +++ b/cmd/kv.go @@ -0,0 +1,30 @@ +package cmd + +import ( + "errors" + + "github.com/nats-io/nats.go" + "go.infratographer.com/x/events" + + "go.infratographer.com/permissions-api/internal/config" +) + +var ( + errInvalidSource = errors.New("events source must be a NATS connection") +) + +func initializeKV(cfg config.EventsConfig, eventsConn events.Connection) (nats.KeyValue, error) { + // While in theory the events package supports any kind of broker, in practice we only + // support NATS. + natsConn, ok := eventsConn.Source().(*nats.Conn) + if !ok { + return nil, errInvalidSource + } + + js, err := natsConn.JetStream() + if err != nil { + return nil, err + } + + return js.KeyValue(cfg.ZedTokenBucket) +} diff --git a/cmd/server.go b/cmd/server.go index d0b94c5a..2d8f5925 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -7,6 +7,7 @@ import ( "github.com/spf13/viper" "go.infratographer.com/x/echojwtx" "go.infratographer.com/x/echox" + "go.infratographer.com/x/events" "go.infratographer.com/x/otelx" "go.infratographer.com/x/versionx" "go.uber.org/zap" @@ -38,6 +39,7 @@ func init() { echox.MustViperFlags(v, serverCmd.Flags(), apiDefaultListen) otelx.MustViperFlags(v, serverCmd.Flags()) echojwtx.MustViperFlags(v, serverCmd.Flags()) + events.MustViperFlags(v, serverCmd.Flags(), appName) } func serve(ctx context.Context, cfg *config.AppConfig) { @@ -51,6 +53,16 @@ func serve(ctx context.Context, cfg *config.AppConfig) { logger.Fatalw("unable to initialize spicedb client", "error", err) } + eventsConn, err := events.NewConnection(cfg.Events.Config, events.WithLogger(logger)) + if err != nil { + logger.Fatalw("failed to initialize events", "error", err) + } + + kv, err := initializeKV(cfg.Events, eventsConn) + if err != nil { + logger.Fatalw("failed to initialize KV", "error", err) + } + var policy iapl.Policy if cfg.SpiceDB.PolicyFile != "" { @@ -68,7 +80,10 @@ func serve(ctx context.Context, cfg *config.AppConfig) { logger.Fatalw("invalid spicedb policy", "error", err) } - engine := query.NewEngine("infratographer", spiceClient, query.WithPolicy(policy)) + engine, err := query.NewEngine("infratographer", spiceClient, kv, query.WithPolicy(policy)) + if err != nil { + logger.Fatalw("error creating engine", "error", err) + } srv, err := echox.NewServer( logger.Desugar(), diff --git a/cmd/worker.go b/cmd/worker.go index 6e3ee2db..89e014f0 100644 --- a/cmd/worker.go +++ b/cmd/worker.go @@ -69,14 +69,22 @@ func worker(ctx context.Context, cfg *config.AppConfig) { logger.Fatalw("invalid spicedb policy", "error", err) } - engine := query.NewEngine("infratographer", spiceClient, query.WithPolicy(policy), query.WithLogger(logger)) - - events, err := events.NewConnection(cfg.Events.Config, events.WithLogger(logger)) + eventsConn, err := events.NewConnection(cfg.Events.Config, events.WithLogger(logger)) if err != nil { logger.Fatalw("failed to initialize events", "error", err) } - subscriber, err := pubsub.NewSubscriber(ctx, events, engine, + kv, err := initializeKV(cfg.Events, eventsConn) + if err != nil { + logger.Fatalw("failed to initialize KV", "error", err) + } + + engine, err := query.NewEngine("infratographer", spiceClient, kv, query.WithPolicy(policy), query.WithLogger(logger)) + if err != nil { + logger.Fatalw("error creating engine", "error", err) + } + + subscriber, err := pubsub.NewSubscriber(ctx, eventsConn, engine, pubsub.WithLogger(logger), ) if err != nil { @@ -140,7 +148,7 @@ func worker(ctx context.Context, cfg *config.AppConfig) { defer cancel() - if err := events.Shutdown(ctx); err != nil { + if err := eventsConn.Shutdown(ctx); err != nil { logger.Fatalw("failed to shutdown events gracefully", "error", "err") } } diff --git a/go.mod b/go.mod index c9cc1df5..b866420a 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/authzed/authzed-go v0.10.1 github.com/authzed/grpcutil v0.0.0-20230908193239-4286bb1d6403 github.com/labstack/echo/v4 v4.11.3 + github.com/nats-io/nats.go v1.31.0 github.com/pkg/errors v0.9.1 github.com/spf13/cobra v1.8.0 github.com/spf13/pflag v1.0.5 @@ -58,7 +59,6 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/nats-io/jwt/v2 v2.5.2 // indirect github.com/nats-io/nats-server/v2 v2.10.4 // indirect - github.com/nats-io/nats.go v1.31.0 // indirect github.com/nats-io/nkeys v0.4.6 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/pelletier/go-toml/v2 v2.1.0 // indirect diff --git a/internal/config/config.go b/internal/config/config.go index cd177b89..aeb3b517 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -16,8 +16,9 @@ import ( // EventsConfig stores the configuration for a load-balancer-api events config type EventsConfig struct { - events.Config `mapstructure:",squash"` - Topics []string + events.Config `mapstructure:",squash"` + Topics []string + ZedTokenBucket string } // AppConfig is the struct used for configuring the app @@ -34,4 +35,7 @@ type AppConfig struct { func MustViperFlags(v *viper.Viper, flags *pflag.FlagSet) { flags.StringSlice("events-topics", []string{}, "event topics to subscribe to") viperx.MustBindFlag(v, "events.topics", flags.Lookup("events-topics")) + + flags.String("events-zedtokenbucket", "", "NATS KV bucket to use for caching ZedTokens") + viperx.MustBindFlag(v, "events.zedtokenbucket", flags.Lookup("events-zedtokenbucket")) } diff --git a/internal/query/relations.go b/internal/query/relations.go index 8a0198d3..c7e136ab 100644 --- a/internal/query/relations.go +++ b/internal/query/relations.go @@ -88,14 +88,18 @@ func (e *engine) SubjectHasPermission(ctx context.Context, subject types.Resourc defer span.End() + consistency, consName := e.determineConsistency(ctx, resource) + span.SetAttributes( + attribute.String( + "permissions.consistency", + consName, + ), + ) + req := &pb.CheckPermissionRequest{ - Consistency: &pb.Consistency{ - Requirement: &pb.Consistency_FullyConsistent{ - FullyConsistent: true, - }, - }, - Resource: resourceToSpiceDBRef(e.namespace, resource), - Permission: action, + Consistency: consistency, + Resource: resourceToSpiceDBRef(e.namespace, resource), + Permission: action, Subject: &pb.SubjectReference{ Object: resourceToSpiceDBRef(e.namespace, subject), }, @@ -256,13 +260,16 @@ func (e *engine) CreateRelationships(ctx context.Context, rels []types.Relations Updates: relUpdates, } - if _, err := e.client.WriteRelationships(ctx, request); err != nil { + resp, err := e.client.WriteRelationships(ctx, request) + if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) return err } + e.updateRelationshipZedTokens(ctx, rels, resp.WrittenAt.Token) + return nil } diff --git a/internal/query/relations_test.go b/internal/query/relations_test.go index 1159d92f..b0f6cbdd 100644 --- a/internal/query/relations_test.go +++ b/internal/query/relations_test.go @@ -6,10 +6,12 @@ import ( pb "github.com/authzed/authzed-go/proto/authzed/api/v1" "github.com/authzed/authzed-go/v1" + "github.com/nats-io/nats.go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.infratographer.com/x/gidx" + "go.infratographer.com/x/testing/eventtools" "go.infratographer.com/permissions-api/internal/iapl" "go.infratographer.com/permissions-api/internal/spicedbx" @@ -17,7 +19,7 @@ import ( "go.infratographer.com/permissions-api/internal/types" ) -func testEngine(ctx context.Context, t *testing.T, namespace string) Engine { +func testEngine(ctx context.Context, t *testing.T, namespace string) *engine { config := spicedbx.Config{ Endpoint: "spicedb:50051", Key: "infradev", @@ -36,13 +38,26 @@ func testEngine(ctx context.Context, t *testing.T, namespace string) Engine { _, err = client.WriteSchema(ctx, request) require.NoError(t, err) + natsSrv, err := eventtools.NewNatsServer() + require.NoError(t, err) + + kvCfg := nats.KeyValueConfig{ + Bucket: "zedtokens", + } + + kv, err := natsSrv.JetStream.CreateKeyValue(&kvCfg) + require.NoError(t, err) + t.Cleanup(func() { cleanDB(ctx, t, client, namespace) }) - out := NewEngine(namespace, client, WithPolicy(policy)) + // We call the constructor here to ensure the engine is created appropriately, but + // then return the underlying type so we can do testing with it. + out, err := NewEngine(namespace, client, kv, WithPolicy(policy)) + require.NoError(t, err) - return out + return out.(*engine) } func testPolicy() iapl.Policy { @@ -557,6 +572,10 @@ func TestSubjectActions(t *testing.T) { ctx := context.Background() e := testEngine(ctx, t, namespace) + parentID, err := gidx.NewID("tnntten") + require.NoError(t, err) + parentRes, err := e.NewResourceFromID(parentID) + require.NoError(t, err) tenID, err := gidx.NewID("tnntten") require.NoError(t, err) tenRes, err := e.NewResourceFromID(tenID) @@ -580,6 +599,23 @@ func TestSubjectActions(t *testing.T) { err = e.AssignSubjectRole(ctx, subjRes, role) assert.NoError(t, err) + // Force a ZedToken to be created for the relevant resources. This creates a hierarchy where + // the tenRes tenant and otherRes tenant are both child resources of the parentRes tenant. + rels := []types.Relationship{ + { + Resource: tenRes, + Relation: "parent", + Subject: parentRes, + }, + { + Resource: otherRes, + Relation: "parent", + Subject: parentRes, + }, + } + err = e.CreateRelationships(ctx, rels) + require.NoError(t, err) + type testInput struct { resource types.Resource action string diff --git a/internal/query/service.go b/internal/query/service.go index 197e182e..8c1afcfb 100644 --- a/internal/query/service.go +++ b/internal/query/service.go @@ -4,6 +4,7 @@ import ( "context" "github.com/authzed/authzed-go/v1" + "github.com/nats-io/nats.go" "go.infratographer.com/x/gidx" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/trace" @@ -43,6 +44,7 @@ type engine struct { logger *zap.SugaredLogger namespace string client *authzed.Client + kv nats.KeyValue schema []types.ResourceType schemaPrefixMap map[string]types.ResourceType schemaTypeMap map[string]types.ResourceType @@ -89,13 +91,14 @@ func resourceHasRoleBindings(resType types.ResourceType) bool { } // NewEngine returns a new client for making permissions queries. -func NewEngine(namespace string, client *authzed.Client, options ...Option) Engine { +func NewEngine(namespace string, client *authzed.Client, kv nats.KeyValue, options ...Option) (Engine, error) { tracer := otel.GetTracerProvider().Tracer("go.infratographer.com/permissions-api/internal/query") e := &engine{ logger: zap.NewNop().Sugar(), namespace: namespace, client: client, + kv: kv, tracer: tracer, } @@ -109,7 +112,7 @@ func NewEngine(namespace string, client *authzed.Client, options ...Option) Engi e.cacheSchemaResources() } - return e + return e, nil } // Option is a functional option for the engine diff --git a/internal/query/zedtokens.go b/internal/query/zedtokens.go new file mode 100644 index 00000000..d4bbde64 --- /dev/null +++ b/internal/query/zedtokens.go @@ -0,0 +1,145 @@ +package query + +import ( + "context" + "errors" + + pb "github.com/authzed/authzed-go/proto/authzed/api/v1" + "github.com/nats-io/nats.go" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" + + "go.infratographer.com/permissions-api/internal/types" +) + +const ( + consistencyMinimizeLatency = "minimize_latency" + consistencyAtLeastAsFresh = "at_least_as_fresh" +) + +// getLatestZedToken attempts to get the latest ZedToken for the given resource ID. +func (e *engine) getLatestZedToken(ctx context.Context, resourceID string) (string, error) { + _, span := e.tracer.Start( + ctx, + "getLatestZedToken", + trace.WithAttributes( + attribute.String( + "permissions.resource", + resourceID, + ), + ), + ) + + defer span.End() + + resp, err := e.kv.Get(resourceID) + if err != nil { + // Only record this as an error if it wasn't a not found error. + if !errors.Is(err, nats.ErrKeyNotFound) { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + + return "", err + } + + zedToken := string(resp.Value()) + + return zedToken, nil +} + +// upsertZedToken updates the ZedToken at the given resource ID key with the provided ZedToken. +func (e *engine) upsertZedToken(ctx context.Context, resourceID string, zedToken string) error { + _, span := e.tracer.Start( + ctx, + "upsertZedToken", + trace.WithAttributes( + attribute.String( + "permissions.resource", + resourceID, + ), + ), + ) + + defer span.End() + + zedTokenBytes := []byte(zedToken) + + // Attempt to get a ZedToken. If we found one, update it. If not, create it. If some other error + // happened, log that and return + resp, getErr := e.kv.Get(resourceID) + + var err error + + switch { + // If we found a token, update it. This may fail if another client updated it before we did + case getErr == nil: + _, err = e.kv.Update(resourceID, zedTokenBytes, resp.Revision()) + // If we did not find a token, create it. This may fail if another client created an entry already + case errors.Is(getErr, nats.ErrKeyNotFound): + _, err = e.kv.Create(resourceID, zedTokenBytes) + // If something else happened, just keep moving + default: + } + + // If an error happened when creating or updating the token, record it. + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + + return err + } + + return nil +} + +// updateRelationshipZedTokens updates the NATS KV bucket for ZedTokens, setting the given ZedToken +// as the latest point in time snapshot for every resource in the given list of relationships. +func (e *engine) updateRelationshipZedTokens(ctx context.Context, rels []types.Relationship, zedToken string) { + resourceIDMap := map[string]struct{}{} + for _, rel := range rels { + resourceIDMap[rel.Resource.ID.String()] = struct{}{} + resourceIDMap[rel.Subject.ID.String()] = struct{}{} + } + + for resourceID := range resourceIDMap { + if err := e.upsertZedToken(ctx, resourceID, zedToken); err != nil { + e.logger.Warnw("error upserting ZedToken", "error", err.Error(), "resource_id", resourceID) + } + } +} + +// determineConsistency produces a consistency strategy based on whether a ZedToken exists for a +// given resource. If a ZedToken is available for the resource, at_least_as_fresh is used with the +// retrieved ZedToken. If no such token is found, or if there is an error reaching NATS, minimize_latency +// is used. This ensures that if NATS is not working or available for some reason, we can still make +// permissions checks (albeit in a degraded state). +func (e *engine) determineConsistency(ctx context.Context, resource types.Resource) (*pb.Consistency, string) { + resourceID := resource.ID.String() + + zedToken, err := e.getLatestZedToken(ctx, resourceID) + if err != nil { + if !errors.Is(err, nats.ErrKeyNotFound) { + e.logger.Warnw("error getting latest ZedToken - falling back to minimize_latency", "error", err.Error(), "resource_id", resourceID) + } + + consistency := &pb.Consistency{ + Requirement: &pb.Consistency_MinimizeLatency{ + MinimizeLatency: true, + }, + } + + return consistency, consistencyMinimizeLatency + } + + consistency := &pb.Consistency{ + Requirement: &pb.Consistency_AtLeastAsFresh{ + AtLeastAsFresh: &pb.ZedToken{ + Token: zedToken, + }, + }, + } + + return consistency, consistencyAtLeastAsFresh +} diff --git a/internal/query/zedtokens_test.go b/internal/query/zedtokens_test.go new file mode 100644 index 00000000..3fd1f23c --- /dev/null +++ b/internal/query/zedtokens_test.go @@ -0,0 +1,80 @@ +package query + +import ( + "context" + "testing" + + "go.infratographer.com/permissions-api/internal/testingx" + "go.infratographer.com/permissions-api/internal/types" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.infratographer.com/x/gidx" +) + +func TestConsistency(t *testing.T) { + namespace := "testconsistency" + ctx := context.Background() + e := testEngine(ctx, t, namespace) + + tenantID, err := gidx.NewID("tnntten") + require.NoError(t, err) + tenantRes, err := e.NewResourceFromID(tenantID) + require.NoError(t, err) + + parentID, err := gidx.NewID("tnntten") + require.NoError(t, err) + parentRes, err := e.NewResourceFromID(parentID) + require.NoError(t, err) + + otherID, err := gidx.NewID("tnntten") + require.NoError(t, err) + otherRes, err := e.NewResourceFromID(otherID) + require.NoError(t, err) + + testCases := []testingx.TestCase[types.Resource, string]{ + { + Name: "WithZedToken", + Input: tenantRes, + SetupFn: func(ctx context.Context, t *testing.T) context.Context { + rels := []types.Relationship{ + { + Resource: tenantRes, + Relation: "parent", + Subject: parentRes, + }, + } + + err := e.CreateRelationships(ctx, rels) + + require.NoError(t, err) + + return ctx + }, + CheckFn: func(ctx context.Context, t *testing.T, res testingx.TestResult[string]) { + assert.NoError(t, res.Err) + assert.Equal(t, consistencyAtLeastAsFresh, res.Success) + }, + }, + { + Name: "WithoutZedToken", + Input: otherRes, + CheckFn: func(ctx context.Context, t *testing.T, res testingx.TestResult[string]) { + assert.NoError(t, res.Err) + assert.Equal(t, consistencyMinimizeLatency, res.Success) + }, + }, + } + + testFn := func(ctx context.Context, res types.Resource) testingx.TestResult[string] { + _, consistencyName := e.determineConsistency(ctx, res) + + out := testingx.TestResult[string]{ + Success: consistencyName, + } + + return out + } + + testingx.RunTests(ctx, t, testCases, testFn) +} diff --git a/permissions-api.example.yaml b/permissions-api.example.yaml index 3e74ca68..4eeb4e91 100644 --- a/permissions-api.example.yaml +++ b/permissions-api.example.yaml @@ -2,3 +2,7 @@ oidc: issuer: http://mock-oauth2-server:8081/default spicedb: policyFile: /workspace/policy.example.yaml +events: + nats: + credsFile: /tmp/user.creds + zedTokenBucket: zedtokens