Skip to content

Commit

Permalink
Implement DispatchLookupSubjects for finding all reachable subjects o…
Browse files Browse the repository at this point in the history
…f a particular type for a resource and permission

First part of work for authzed#261
  • Loading branch information
josephschorr committed Aug 5, 2022
1 parent 0920b37 commit 68ebdaf
Show file tree
Hide file tree
Showing 15 changed files with 1,626 additions and 81 deletions.
85 changes: 83 additions & 2 deletions internal/dispatch/caching/caching.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type Dispatcher struct {
lookupFromCacheCounter prometheus.Counter
reachableResourcesTotalCounter prometheus.Counter
reachableResourcesFromCacheCounter prometheus.Counter
lookupSubjectsTotalCounter prometheus.Counter
lookupSubjectsFromCacheCounter prometheus.Counter

cacheHits prometheus.CounterFunc
cacheMisses prometheus.CounterFunc
Expand All @@ -54,10 +56,15 @@ type reachableResourcesResultEntry struct {
responses []*v1.DispatchReachableResourcesResponse
}

type lookupSubjectsResultEntry struct {
responses []*v1.DispatchLookupSubjectsResponse
}

var (
checkResultEntryCost = int64(unsafe.Sizeof(checkResultEntry{}))
lookupResultEntryEmptyCost = int64(unsafe.Sizeof(lookupResultEntry{}))
reachbleResourcesEntryEmptyCost = int64(unsafe.Sizeof(reachableResourcesResultEntry{}))
lookupSubjectsEntryEmptyCost = int64(unsafe.Sizeof(lookupSubjectsResultEntry{}))
)

// NewCachingDispatcher creates a new dispatch.Dispatcher which delegates dispatch requests
Expand Down Expand Up @@ -122,6 +129,17 @@ func NewCachingDispatcher(
Name: "reachable_resources_from_cache_total",
})

lookupSubjectsTotalCounter := prometheus.NewCounter(prometheus.CounterOpts{
Namespace: prometheusNamespace,
Subsystem: prometheusSubsystem,
Name: "lookup_subjects_total",
})
lookupSubjectsFromCacheCounter := prometheus.NewCounter(prometheus.CounterOpts{
Namespace: prometheusNamespace,
Subsystem: prometheusSubsystem,
Name: "lookup_subjects_from_cache_total",
})

cacheHitsTotal := prometheus.NewCounterFunc(prometheus.CounterOpts{
Namespace: prometheusNamespace,
Subsystem: prometheusSubsystem,
Expand Down Expand Up @@ -178,6 +196,14 @@ func NewCachingDispatcher(
if err != nil {
return nil, fmt.Errorf(errCachingInitialization, err)
}
err = prometheus.Register(lookupSubjectsTotalCounter)
if err != nil {
return nil, fmt.Errorf(errCachingInitialization, err)
}
err = prometheus.Register(lookupSubjectsFromCacheCounter)
if err != nil {
return nil, fmt.Errorf(errCachingInitialization, err)
}

// Export some ristretto metrics
err = prometheus.Register(cacheHitsTotal)
Expand Down Expand Up @@ -212,6 +238,8 @@ func NewCachingDispatcher(
lookupFromCacheCounter: lookupFromCacheCounter,
reachableResourcesTotalCounter: reachableResourcesTotalCounter,
reachableResourcesFromCacheCounter: reachableResourcesFromCacheCounter,
lookupSubjectsTotalCounter: lookupSubjectsTotalCounter,
lookupSubjectsFromCacheCounter: lookupSubjectsFromCacheCounter,
cacheHits: cacheHitsTotal,
cacheMisses: cacheMissesTotal,
costAddedBytes: costAddedBytes,
Expand Down Expand Up @@ -343,7 +371,7 @@ func (cd *Dispatcher) DispatchReachableResources(req *v1.DispatchReachableResour
wrapped := &dispatch.WrappedDispatchStream[*v1.DispatchReachableResourcesResponse]{
Stream: stream,
Ctx: stream.Context(),
Processor: func(result *v1.DispatchReachableResourcesResponse) (*v1.DispatchReachableResourcesResponse, error) {
Processor: func(result *v1.DispatchReachableResourcesResponse) (*v1.DispatchReachableResourcesResponse, bool, error) {
mu.Lock()
defer mu.Unlock()

Expand All @@ -357,7 +385,7 @@ func (cd *Dispatcher) DispatchReachableResources(req *v1.DispatchReachableResour
for _, id := range result.Resource.ResourceIds {
estimatedSize += int64(len(id))
}
return result, nil
return result, true, nil
},
}

Expand All @@ -372,6 +400,59 @@ func (cd *Dispatcher) DispatchReachableResources(req *v1.DispatchReachableResour
return err
}

// DispatchLookupSubjects implements dispatch.LookupSubjects interface and does not do any caching yet.
func (cd *Dispatcher) DispatchLookupSubjects(req *v1.DispatchLookupSubjectsRequest, stream dispatch.LookupSubjectsStream) error {
cd.lookupSubjectsTotalCounter.Inc()

requestKey := dispatch.LookupSubjectsRequestToKey(req)
if cachedResultRaw, found := cd.c.Get(requestKey); found {
cachedResult := cachedResultRaw.(lookupSubjectsResultEntry)
cd.lookupSubjectsFromCacheCounter.Inc()
for _, result := range cachedResult.responses {
err := stream.Publish(result)
if err != nil {
return err
}
}

return nil
}

var mu sync.Mutex
estimatedSize := lookupSubjectsEntryEmptyCost
toCacheResults := []*v1.DispatchLookupSubjectsResponse{}
wrapped := &dispatch.WrappedDispatchStream[*v1.DispatchLookupSubjectsResponse]{
Stream: stream,
Ctx: stream.Context(),
Processor: func(result *v1.DispatchLookupSubjectsResponse) (*v1.DispatchLookupSubjectsResponse, bool, error) {
mu.Lock()
defer mu.Unlock()

adjustedResult := proto.Clone(result).(*v1.DispatchLookupSubjectsResponse)
adjustedResult.Metadata.CachedDispatchCount = adjustedResult.Metadata.DispatchCount
adjustedResult.Metadata.DispatchCount = 0
adjustedResult.Metadata.DebugInfo = nil

toCacheResults = append(toCacheResults, adjustedResult)

for _, id := range result.FoundSubjectIds {
estimatedSize += int64(len(id))
}
return result, true, nil
},
}

err := cd.d.DispatchLookupSubjects(req, wrapped)

// We only want to cache the result if there was no error
if err == nil {
toCache := lookupSubjectsResultEntry{toCacheResults}
cd.c.Set(requestKey, toCache, estimatedSize)
}

return err
}

func (cd *Dispatcher) Close() error {
prometheus.Unregister(cd.checkTotalCounter)
prometheus.Unregister(cd.lookupTotalCounter)
Expand Down
4 changes: 4 additions & 0 deletions internal/dispatch/caching/cachingdispatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,10 @@ func (ddm delegateDispatchMock) DispatchReachableResources(req *v1.DispatchReach
return nil
}

func (ddm delegateDispatchMock) DispatchLookupSubjects(req *v1.DispatchLookupSubjectsRequest, stream dispatch.LookupSubjectsStream) error {
return nil
}

func (ddm delegateDispatchMock) Close() error {
return nil
}
Expand Down
4 changes: 4 additions & 0 deletions internal/dispatch/caching/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,8 @@ func (fd fakeDelegate) DispatchReachableResources(req *v1.DispatchReachableResou
panic(errMessage)
}

func (fd fakeDelegate) DispatchLookupSubjects(req *v1.DispatchLookupSubjectsRequest, stream dispatch.LookupSubjectsStream) error {
panic(errMessage)
}

var _ dispatch.Dispatcher = fakeDelegate{}
27 changes: 27 additions & 0 deletions internal/dispatch/dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type Dispatcher interface {
Expand
Lookup
ReachableResources
LookupSubjects

// Close closes the dispatcher.
Close() error
Expand Down Expand Up @@ -60,6 +61,18 @@ type ReachableResources interface {
) error
}

// LookupSubjectsStream is an alias for the stream to which found subjects will be written.
type LookupSubjectsStream = Stream[*v1.DispatchLookupSubjectsResponse]

// LookupSubjects interface describes just the methods required to dispatch lookup subjects requests.
type LookupSubjects interface {
// DispatchLookupSubjects submits a single lookup subjects request, writing its results to the specified stream.
DispatchLookupSubjects(
req *v1.DispatchLookupSubjectsRequest,
stream LookupSubjectsStream,
) error
}

// HasMetadata is an interface for requests containing resolver metadata.
type HasMetadata interface {
zerolog.LogObjectMarshaler
Expand Down Expand Up @@ -90,6 +103,7 @@ const (
lookupPrefix cachePrefix = "l"
expandPrefix cachePrefix = "e"
reachableResourcesPrefix cachePrefix = "rr"
lookupSubjectsPrefix cachePrefix = "ls"
)

var cachePrefixes = []cachePrefix{checkViaRelationPrefix, checkViaCanonicalPrefix, lookupPrefix, expandPrefix, reachableResourcesPrefix}
Expand Down Expand Up @@ -132,3 +146,16 @@ func ReachableResourcesRequestToKey(req *v1.DispatchReachableResourcesRequest) s
req.Metadata.AtRevision,
)
}

// LookupSubjectsRequestToKey converts a lookup subjects request into a cache key
func LookupSubjectsRequestToKey(req *v1.DispatchLookupSubjectsRequest) string {
return fmt.Sprintf("%s//%s#%s@%s#%s:[%s]@%s",
lookupSubjectsPrefix,
req.ResourceRelation.Namespace,
req.ResourceRelation.Relation,
req.SubjectRelation.Namespace,
req.SubjectRelation.Relation,
strings.Join(req.ResourceIds, ","),
req.Metadata.AtRevision,
)
}
35 changes: 35 additions & 0 deletions internal/dispatch/graph/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func NewLocalOnlyDispatcher(concurrencyLimit uint16) dispatch.Dispatcher {
d.expander = graph.NewConcurrentExpander(d)
d.lookupHandler = graph.NewConcurrentLookup(d, d, concurrencyLimit)
d.reachableResourcesHandler = graph.NewConcurrentReachableResources(d, concurrencyLimit)
d.lookupSubjectsHandler = graph.NewConcurrentLookupSubjects(d, concurrencyLimit)

return d
}
Expand All @@ -42,12 +43,14 @@ func NewDispatcher(redispatcher dispatch.Dispatcher, concurrencyLimit uint16) di
expander := graph.NewConcurrentExpander(redispatcher)
lookupHandler := graph.NewConcurrentLookup(redispatcher, redispatcher, concurrencyLimit)
reachableResourcesHandler := graph.NewConcurrentReachableResources(redispatcher, concurrencyLimit)
lookupSubjectsHandler := graph.NewConcurrentLookupSubjects(redispatcher, concurrencyLimit)

return &localDispatcher{
checker: checker,
expander: expander,
lookupHandler: lookupHandler,
reachableResourcesHandler: reachableResourcesHandler,
lookupSubjectsHandler: lookupSubjectsHandler,
}
}

Expand All @@ -56,6 +59,7 @@ type localDispatcher struct {
expander *graph.ConcurrentExpander
lookupHandler *graph.ConcurrentLookup
reachableResourcesHandler *graph.ConcurrentReachableResources
lookupSubjectsHandler *graph.ConcurrentLookupSubjects
}

func (ld *localDispatcher) loadNamespace(ctx context.Context, nsName string, revision decimal.Decimal) (*core.NamespaceDefinition, error) {
Expand Down Expand Up @@ -281,6 +285,37 @@ func (ld *localDispatcher) DispatchReachableResources(
return ld.reachableResourcesHandler.ReachableResources(validatedReq, wrappedStream)
}

// DispatchLookupSubjects implements dispatch.LookupSubjects interface
func (ld *localDispatcher) DispatchLookupSubjects(
req *v1.DispatchLookupSubjectsRequest,
stream dispatch.LookupSubjectsStream,
) error {
ctx, span := tracer.Start(stream.Context(), "DispatchLookupSubjects", trace.WithAttributes(
attribute.Stringer("resource-type", stringableRelRef{req.ResourceRelation}),
attribute.Stringer("subject-type", stringableRelRef{req.SubjectRelation}),
attribute.StringSlice("resource-ids", req.ResourceIds),
))
defer span.End()

err := dispatch.CheckDepth(ctx, req)
if err != nil {
return err
}

revision, err := decimal.NewFromString(req.Metadata.AtRevision)
if err != nil {
return err
}

validatedReq := graph.ValidatedLookupSubjectsRequest{
DispatchLookupSubjectsRequest: req,
Revision: revision,
}

wrappedStream := dispatch.StreamWithContext(ctx, stream)
return ld.lookupSubjectsHandler.LookupSubjects(validatedReq, wrappedStream)
}

func (ld *localDispatcher) Close() error {
return nil
}
Expand Down
Loading

0 comments on commit 68ebdaf

Please sign in to comment.