Skip to content

Commit

Permalink
Merge pull request #217 from josephschorr/lookup-caching
Browse files Browse the repository at this point in the history
Add caching to Lookup dispatcher
  • Loading branch information
jakedt authored Oct 29, 2021
2 parents 84b3961 + 961cc02 commit f225a86
Show file tree
Hide file tree
Showing 15 changed files with 406 additions and 203 deletions.
87 changes: 75 additions & 12 deletions internal/dispatch/caching/caching.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,26 @@ type cachingDispatcher struct {
d dispatch.Dispatcher
c *ristretto.Cache

checkTotalCounter prometheus.Counter
checkFromCacheCounter prometheus.Counter
checkTotalCounter prometheus.Counter
checkFromCacheCounter prometheus.Counter
lookupTotalCounter prometheus.Counter
lookupFromCacheCounter prometheus.Counter
}

type checkResultEntry struct {
result *v1.DispatchCheckResponse
computedWithDepthRemaining uint32
result *v1.DispatchCheckResponse
depthRequired uint32
}

var checkResultEntryCost = int64(unsafe.Sizeof(checkResultEntry{}))
type lookupResultEntry struct {
result *v1.DispatchLookupResponse
depthRequired uint32
}

var (
checkResultEntryCost = int64(unsafe.Sizeof(checkResultEntry{}))
lookupResultEntryEmptyCost = int64(unsafe.Sizeof(lookupResultEntry{}))
)

// NewCachingDispatcher creates a new dispatch.Dispatcher which delegates dispatch requests
// and caches the responses when possible and desirable.
Expand Down Expand Up @@ -66,6 +76,17 @@ func NewCachingDispatcher(
Name: "check_from_cache_total",
})

lookupTotalCounter := prometheus.NewCounter(prometheus.CounterOpts{
Namespace: prometheusNamespace,
Subsystem: prometheusSubsystem,
Name: "lookup_total",
})
lookupFromCacheCounter := prometheus.NewCounter(prometheus.CounterOpts{
Namespace: prometheusNamespace,
Subsystem: prometheusSubsystem,
Name: "lookup_from_cache_total",
})

if prometheusSubsystem != "" {
err = prometheus.Register(checkTotalCounter)
if err != nil {
Expand All @@ -77,6 +98,16 @@ func NewCachingDispatcher(
return nil, fmt.Errorf(errCachingInitialization, err)
}

err = prometheus.Register(lookupTotalCounter)
if err != nil {
return nil, fmt.Errorf(errCachingInitialization, err)
}

err = prometheus.Register(lookupFromCacheCounter)
if err != nil {
return nil, fmt.Errorf(errCachingInitialization, err)
}

// Export some ristretto metrics
err = registerMetricsFunc("cache_hits_total", prometheusSubsystem, cache.Metrics.Hits)
if err != nil {
Expand All @@ -99,7 +130,7 @@ func NewCachingDispatcher(
}
}

return &cachingDispatcher{delegate, cache, checkTotalCounter, checkFromCacheCounter}, nil
return &cachingDispatcher{delegate, cache, checkTotalCounter, checkFromCacheCounter, lookupTotalCounter, lookupFromCacheCounter}, nil
}

func registerMetricsFunc(name string, subsystem string, metricsFunc func() uint64) error {
Expand All @@ -115,11 +146,11 @@ func registerMetricsFunc(name string, subsystem string, metricsFunc func() uint6
// DispatchCheck implements dispatch.Check interface
func (cd *cachingDispatcher) DispatchCheck(ctx context.Context, req *v1.DispatchCheckRequest) (*v1.DispatchCheckResponse, error) {
cd.checkTotalCounter.Inc()
requestKey := requestToKey(req)
requestKey := checkRequestToKey(req)

if cachedResultRaw, found := cd.c.Get(requestKey); found {
cachedResult := cachedResultRaw.(checkResultEntry)
if req.Metadata.DepthRemaining >= cachedResult.computedWithDepthRemaining {
if req.Metadata.DepthRemaining >= cachedResult.depthRequired {
cd.checkFromCacheCounter.Inc()
return cachedResult.result, nil
}
Expand All @@ -129,7 +160,7 @@ func (cd *cachingDispatcher) DispatchCheck(ctx context.Context, req *v1.Dispatch

// We only want to cache the result if there was no error
if err == nil {
toCache := checkResultEntry{computed, req.Metadata.DepthRemaining}
toCache := checkResultEntry{computed, computed.Metadata.DepthRequired}
toCache.result.Metadata.DispatchCount = 0
cd.c.Set(requestKey, toCache, checkResultEntryCost)
}
Expand All @@ -146,7 +177,35 @@ func (cd *cachingDispatcher) DispatchExpand(ctx context.Context, req *v1.Dispatc

// DispatchLookup implements dispatch.Lookup interface and does not do any caching yet.
func (cd *cachingDispatcher) DispatchLookup(ctx context.Context, req *v1.DispatchLookupRequest) (*v1.DispatchLookupResponse, error) {
return cd.d.DispatchLookup(ctx, req)
cd.lookupTotalCounter.Inc()
requestKey := lookupRequestToKey(req)
if cachedResultRaw, found := cd.c.Get(requestKey); found {
cachedResult := cachedResultRaw.(lookupResultEntry)
if req.Metadata.DepthRemaining >= cachedResult.depthRequired {
cd.lookupFromCacheCounter.Inc()
return cachedResult.result, nil
}
}

computed, err := cd.d.DispatchLookup(ctx, req)

// We only want to cache the result if there was no error
if err == nil {
requestKey := lookupRequestToKey(req)
toCache := lookupResultEntry{computed, computed.Metadata.DepthRequired}
toCache.result.Metadata.DispatchCount = 0

estimatedSize := lookupResultEntryEmptyCost
for _, onr := range toCache.result.ResolvedOnrs {
estimatedSize += int64(len(onr.Namespace) + len(onr.ObjectId) + len(onr.Relation))
}

cd.c.Set(requestKey, toCache, estimatedSize)
}

// Return both the computed and err in ALL cases: computed contains resolved metadata even
// if there was an error.
return computed, err
}

func (cd *cachingDispatcher) Close() error {
Expand All @@ -158,6 +217,10 @@ func (cd *cachingDispatcher) Close() error {
return nil
}

func requestToKey(req *v1.DispatchCheckRequest) string {
return fmt.Sprintf("%s@%s@%s", tuple.StringONR(req.ObjectAndRelation), tuple.StringONR(req.Subject), req.Metadata.AtRevision)
func checkRequestToKey(req *v1.DispatchCheckRequest) string {
return fmt.Sprintf("check//%s@%s@%s", tuple.StringONR(req.ObjectAndRelation), tuple.StringONR(req.Subject), req.Metadata.AtRevision)
}

func lookupRequestToKey(req *v1.DispatchLookupRequest) string {
return fmt.Sprintf("lookup//%s#%s@%s@%s", req.ObjectRelation.Namespace, req.ObjectRelation.Relation, tuple.StringONR(req.Subject), req.Metadata.AtRevision)
}
56 changes: 29 additions & 27 deletions internal/dispatch/caching/cachingdispatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ type checkRequest struct {
start string
goal string
atRevision decimal.Decimal
depthRemaining uint16
depthRequired uint32
depthRemaining uint32
expectPassthrough bool
}

Expand All @@ -32,46 +33,46 @@ func TestMaxDepthCaching(t *testing.T) {
script []checkRequest
}{
{"single request", []checkRequest{
{start1, user1, decimal.Zero, 50, true},
{start1, user1, decimal.Zero, 1, 50, true},
}},
{"two requests, hit", []checkRequest{
{start1, user1, decimal.Zero, 50, true},
{start1, user1, decimal.Zero, 50, false},
{start1, user1, decimal.Zero, 1, 50, true},
{start1, user1, decimal.Zero, 1, 50, false},
}},
{"many requests, hit", []checkRequest{
{start1, user1, decimal.Zero, 50, true},
{start1, user1, decimal.Zero, 50, false},
{start1, user1, decimal.Zero, 50, false},
{start1, user1, decimal.Zero, 50, false},
{start1, user1, decimal.Zero, 50, false},
{start1, user1, decimal.Zero, 1, 50, true},
{start1, user1, decimal.Zero, 1, 50, false},
{start1, user1, decimal.Zero, 1, 50, false},
{start1, user1, decimal.Zero, 1, 50, false},
{start1, user1, decimal.Zero, 1, 50, false},
}},
{"multiple keys", []checkRequest{
{start1, user1, decimal.Zero, 50, true},
{start2, user2, decimal.Zero, 50, true},
{start1, user1, decimal.Zero, 1, 50, true},
{start2, user2, decimal.Zero, 1, 50, true},
}},
{"same object, different revisions miss", []checkRequest{
{start1, user1, decimal.Zero, 50, true},
{start1, user1, decimal.NewFromInt(50), 50, true},
{start1, user1, decimal.Zero, 1, 50, true},
{start1, user1, decimal.NewFromInt(50), 1, 50, true},
}},
{"interleaved objects, hit", []checkRequest{
{start1, user1, decimal.Zero, 50, true},
{start2, user2, decimal.Zero, 50, true},
{start1, user1, decimal.Zero, 50, false},
{start2, user2, decimal.Zero, 50, false},
{start1, user1, decimal.Zero, 1, 50, true},
{start2, user2, decimal.Zero, 1, 50, true},
{start1, user1, decimal.Zero, 1, 50, false},
{start2, user2, decimal.Zero, 1, 50, false},
}},
{"insufficient depth", []checkRequest{
{start1, user1, decimal.Zero, 50, true},
{start1, user1, decimal.Zero, 40, true},
{start1, user1, decimal.Zero, 21, 50, true},
{start1, user1, decimal.Zero, 21, 20, true},
}},
{"sufficient depth", []checkRequest{
{start1, user1, decimal.Zero, 40, true},
{start1, user1, decimal.Zero, 50, false},
{start1, user1, decimal.Zero, 1, 40, true},
{start1, user1, decimal.Zero, 1, 50, false},
}},
{"updated cached depth", []checkRequest{
{start1, user1, decimal.Zero, 50, true},
{start1, user1, decimal.Zero, 40, true},
{start1, user1, decimal.Zero, 40, false},
{start1, user1, decimal.Zero, 50, false},
{start1, user1, decimal.Zero, 21, 50, true},
{start1, user1, decimal.Zero, 21, 40, false},
{start1, user1, decimal.Zero, 21, 20, true},
{start1, user1, decimal.Zero, 21, 50, false},
}},
}

Expand All @@ -88,12 +89,13 @@ func TestMaxDepthCaching(t *testing.T) {
Subject: tuple.ParseSubjectONR(step.goal),
Metadata: &v1.ResolverMeta{
AtRevision: step.atRevision.String(),
DepthRemaining: uint32(step.depthRemaining),
DepthRemaining: step.depthRemaining,
},
}).Return(&v1.DispatchCheckResponse{
Membership: v1.DispatchCheckResponse_MEMBER,
Metadata: &v1.ResponseMeta{
DispatchCount: 1,
DepthRequired: step.depthRequired,
},
}, nil).Times(1)
}
Expand All @@ -109,7 +111,7 @@ func TestMaxDepthCaching(t *testing.T) {
Subject: tuple.ParseSubjectONR(step.goal),
Metadata: &v1.ResolverMeta{
AtRevision: step.atRevision.String(),
DepthRemaining: uint32(step.depthRemaining),
DepthRemaining: step.depthRemaining,
},
})
require.NoError(err)
Expand Down
1 change: 1 addition & 0 deletions internal/dispatch/graph/check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ func TestSimple(t *testing.T) {

require.NoError(err)
require.Equal(expected.isMember, checkResult.Membership == v1.DispatchCheckResponse_MEMBER)
require.GreaterOrEqual(checkResult.Metadata.DepthRequired, uint32(1))
})
}
}
Expand Down
1 change: 1 addition & 0 deletions internal/dispatch/graph/expand_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ func TestExpand(t *testing.T) {

require.NoError(err)
require.NotNil(expandResult.TreeNode)
require.GreaterOrEqual(expandResult.Metadata.DepthRequired, uint32(1))

if diff := cmp.Diff(tc.expected, expandResult.TreeNode, protocmp.Transform()); diff != "" {
fset := token.NewFileSet()
Expand Down
21 changes: 21 additions & 0 deletions internal/dispatch/graph/lookup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,27 @@ func TestSimpleLookup(t *testing.T) {

require.NoError(err)
require.ElementsMatch(tc.resolvedObjects, lookupResult.ResolvedOnrs)

// We have to sleep a while to let the cache converge:
// https://github.com/dgraph-io/ristretto/blob/01b9f37dd0fd453225e042d6f3a27cd14f252cd0/cache_test.go#L17
time.Sleep(10 * time.Millisecond)

// Run again with the cache available.
lookupResult, err = dispatch.DispatchLookup(context.Background(), &v1.DispatchLookupRequest{
ObjectRelation: tc.start,
Subject: tc.target,
Metadata: &v1.ResolverMeta{
AtRevision: revision.String(),
DepthRemaining: 50,
},
Limit: 10,
DirectStack: nil,
TtuStack: nil,
})

require.NoError(err)
require.ElementsMatch(tc.resolvedObjects, lookupResult.ResolvedOnrs)
require.GreaterOrEqual(lookupResult.Metadata.DepthRequired, uint32(1))
})
}
}
Expand Down
Loading

0 comments on commit f225a86

Please sign in to comment.