Skip to content

Commit

Permalink
Add DepthRequired to dispatch and use for cache checking
Browse files Browse the repository at this point in the history
  • Loading branch information
josephschorr committed Oct 29, 2021
1 parent c9e1e09 commit 3e4faff
Show file tree
Hide file tree
Showing 12 changed files with 172 additions and 100 deletions.
22 changes: 11 additions & 11 deletions internal/dispatch/caching/caching.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ type cachingDispatcher struct {
}

type checkResultEntry struct {
result *v1.DispatchCheckResponse
computedWithDepthRemaining uint32
result *v1.DispatchCheckResponse
depthRequired uint32
}

type lookupResultEntry struct {
result *v1.DispatchLookupResponse
computedWithDepthRemaining uint32
result *v1.DispatchLookupResponse
depthRequired uint32
}

var (
Expand Down Expand Up @@ -150,7 +150,7 @@ func (cd *cachingDispatcher) DispatchCheck(ctx context.Context, req *v1.Dispatch

if cachedResultRaw, found := cd.c.Get(requestKey); found {
cachedResult := cachedResultRaw.(checkResultEntry)
if req.Metadata.DepthRemaining >= cachedResult.computedWithDepthRemaining {
if req.Metadata.DepthRemaining >= cachedResult.depthRequired {
cd.checkFromCacheCounter.Inc()
return cachedResult.result, nil
}
Expand All @@ -160,7 +160,7 @@ func (cd *cachingDispatcher) DispatchCheck(ctx context.Context, req *v1.Dispatch

// We only want to cache the result if there was no error
if err == nil {
toCache := checkResultEntry{computed, req.Metadata.DepthRemaining}
toCache := checkResultEntry{computed, computed.Metadata.DepthRequired}
toCache.result.Metadata.DispatchCount = 0
cd.c.Set(requestKey, toCache, checkResultEntryCost)
}
Expand All @@ -178,10 +178,10 @@ func (cd *cachingDispatcher) DispatchExpand(ctx context.Context, req *v1.Dispatc
// DispatchLookup implements dispatch.Lookup interface and does not do any caching yet.
func (cd *cachingDispatcher) DispatchLookup(ctx context.Context, req *v1.DispatchLookupRequest) (*v1.DispatchLookupResponse, error) {
cd.lookupTotalCounter.Inc()
if req.Metadata.DepthRemaining > 0 {
requestKey := lookupRequestToKey(req)
if cachedResultRaw, found := cd.c.Get(requestKey); found {
cachedResult := cachedResultRaw.(lookupResultEntry)
requestKey := lookupRequestToKey(req)
if cachedResultRaw, found := cd.c.Get(requestKey); found {
cachedResult := cachedResultRaw.(lookupResultEntry)
if req.Metadata.DepthRemaining >= cachedResult.depthRequired {
cd.lookupFromCacheCounter.Inc()
return cachedResult.result, nil
}
Expand All @@ -192,7 +192,7 @@ func (cd *cachingDispatcher) DispatchLookup(ctx context.Context, req *v1.Dispatc
// We only want to cache the result if there was no error
if err == nil {
requestKey := lookupRequestToKey(req)
toCache := lookupResultEntry{computed, req.Metadata.DepthRemaining}
toCache := lookupResultEntry{computed, computed.Metadata.DepthRequired}
toCache.result.Metadata.DispatchCount = 0

estimatedSize := lookupResultEntryEmptyCost
Expand Down
56 changes: 29 additions & 27 deletions internal/dispatch/caching/cachingdispatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ type checkRequest struct {
start string
goal string
atRevision decimal.Decimal
depthRemaining uint16
depthRequired uint32
depthRemaining uint32
expectPassthrough bool
}

Expand All @@ -32,46 +33,46 @@ func TestMaxDepthCaching(t *testing.T) {
script []checkRequest
}{
{"single request", []checkRequest{
{start1, user1, decimal.Zero, 50, true},
{start1, user1, decimal.Zero, 1, 50, true},
}},
{"two requests, hit", []checkRequest{
{start1, user1, decimal.Zero, 50, true},
{start1, user1, decimal.Zero, 50, false},
{start1, user1, decimal.Zero, 1, 50, true},
{start1, user1, decimal.Zero, 1, 50, false},
}},
{"many requests, hit", []checkRequest{
{start1, user1, decimal.Zero, 50, true},
{start1, user1, decimal.Zero, 50, false},
{start1, user1, decimal.Zero, 50, false},
{start1, user1, decimal.Zero, 50, false},
{start1, user1, decimal.Zero, 50, false},
{start1, user1, decimal.Zero, 1, 50, true},
{start1, user1, decimal.Zero, 1, 50, false},
{start1, user1, decimal.Zero, 1, 50, false},
{start1, user1, decimal.Zero, 1, 50, false},
{start1, user1, decimal.Zero, 1, 50, false},
}},
{"multiple keys", []checkRequest{
{start1, user1, decimal.Zero, 50, true},
{start2, user2, decimal.Zero, 50, true},
{start1, user1, decimal.Zero, 1, 50, true},
{start2, user2, decimal.Zero, 1, 50, true},
}},
{"same object, different revisions miss", []checkRequest{
{start1, user1, decimal.Zero, 50, true},
{start1, user1, decimal.NewFromInt(50), 50, true},
{start1, user1, decimal.Zero, 1, 50, true},
{start1, user1, decimal.NewFromInt(50), 1, 50, true},
}},
{"interleaved objects, hit", []checkRequest{
{start1, user1, decimal.Zero, 50, true},
{start2, user2, decimal.Zero, 50, true},
{start1, user1, decimal.Zero, 50, false},
{start2, user2, decimal.Zero, 50, false},
{start1, user1, decimal.Zero, 1, 50, true},
{start2, user2, decimal.Zero, 1, 50, true},
{start1, user1, decimal.Zero, 1, 50, false},
{start2, user2, decimal.Zero, 1, 50, false},
}},
{"insufficient depth", []checkRequest{
{start1, user1, decimal.Zero, 50, true},
{start1, user1, decimal.Zero, 40, true},
{start1, user1, decimal.Zero, 21, 50, true},
{start1, user1, decimal.Zero, 21, 20, true},
}},
{"sufficient depth", []checkRequest{
{start1, user1, decimal.Zero, 40, true},
{start1, user1, decimal.Zero, 50, false},
{start1, user1, decimal.Zero, 1, 40, true},
{start1, user1, decimal.Zero, 1, 50, false},
}},
{"updated cached depth", []checkRequest{
{start1, user1, decimal.Zero, 50, true},
{start1, user1, decimal.Zero, 40, true},
{start1, user1, decimal.Zero, 40, false},
{start1, user1, decimal.Zero, 50, false},
{start1, user1, decimal.Zero, 21, 50, true},
{start1, user1, decimal.Zero, 21, 40, false},
{start1, user1, decimal.Zero, 21, 20, true},
{start1, user1, decimal.Zero, 21, 50, false},
}},
}

Expand All @@ -88,12 +89,13 @@ func TestMaxDepthCaching(t *testing.T) {
Subject: tuple.ParseSubjectONR(step.goal),
Metadata: &v1.ResolverMeta{
AtRevision: step.atRevision.String(),
DepthRemaining: uint32(step.depthRemaining),
DepthRemaining: step.depthRemaining,
},
}).Return(&v1.DispatchCheckResponse{
Membership: v1.DispatchCheckResponse_MEMBER,
Metadata: &v1.ResponseMeta{
DispatchCount: 1,
DepthRequired: step.depthRequired,
},
}, nil).Times(1)
}
Expand All @@ -109,7 +111,7 @@ func TestMaxDepthCaching(t *testing.T) {
Subject: tuple.ParseSubjectONR(step.goal),
Metadata: &v1.ResolverMeta{
AtRevision: step.atRevision.String(),
DepthRemaining: uint32(step.depthRemaining),
DepthRemaining: step.depthRemaining,
},
})
require.NoError(err)
Expand Down
1 change: 1 addition & 0 deletions internal/dispatch/graph/check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ func TestSimple(t *testing.T) {

require.NoError(err)
require.Equal(expected.isMember, checkResult.Membership == v1.DispatchCheckResponse_MEMBER)
require.GreaterOrEqual(checkResult.Metadata.DepthRequired, uint32(1))
})
}
}
Expand Down
1 change: 1 addition & 0 deletions internal/dispatch/graph/expand_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ func TestExpand(t *testing.T) {

require.NoError(err)
require.NotNil(expandResult.TreeNode)
require.GreaterOrEqual(expandResult.Metadata.DepthRequired, uint32(1))

if diff := cmp.Diff(tc.expected, expandResult.TreeNode, protocmp.Transform()); diff != "" {
fset := token.NewFileSet()
Expand Down
1 change: 1 addition & 0 deletions internal/dispatch/graph/lookup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ func TestSimpleLookup(t *testing.T) {

require.NoError(err)
require.ElementsMatch(tc.resolvedObjects, lookupResult.ResolvedOnrs)
require.GreaterOrEqual(lookupResult.Metadata.DepthRequired, uint32(1))
})
}
}
Expand Down
39 changes: 26 additions & 13 deletions internal/graph/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (cc *ConcurrentChecker) checkDirect(ctx context.Context, req *v1.DispatchCh
for tpl := it.Next(); tpl != nil; tpl = it.Next() {
tplUserset := tpl.User.GetUserset()
if onrEqual(tplUserset, req.Subject) {
resultChan <- checkResult(v1.DispatchCheckResponse_MEMBER, 1)
resultChan <- checkResult(v1.DispatchCheckResponse_MEMBER, 1, 0)
return
}
if tplUserset.Relation != Ellipsis {
Expand Down Expand Up @@ -215,10 +215,11 @@ func (cc *ConcurrentChecker) checkTupleToUserset(ctx context.Context, req *v1.Di
// all returns whether all of the lazy checks pass, and is used for intersection.
func all(ctx context.Context, requests []ReduceableCheckFunc) CheckResult {
if len(requests) == 0 {
return checkResult(v1.DispatchCheckResponse_NOT_MEMBER, 0)
return checkResult(v1.DispatchCheckResponse_NOT_MEMBER, 0, 0)
}

var totalRequestCount uint32
var maxDepthRequired uint32

resultChan := make(chan CheckResult, len(requests))
childCtx, cancelFn := context.WithCancel(ctx)
Expand All @@ -232,19 +233,20 @@ func all(ctx context.Context, requests []ReduceableCheckFunc) CheckResult {
select {
case result := <-resultChan:
totalRequestCount += result.Resp.Metadata.DispatchCount
maxDepthRequired = max(maxDepthRequired, result.Resp.Metadata.DepthRequired)
if result.Err != nil {
return checkResultError(result.Err, totalRequestCount)
}

if result.Resp.Membership != v1.DispatchCheckResponse_MEMBER {
return checkResult(v1.DispatchCheckResponse_NOT_MEMBER, totalRequestCount)
return checkResult(v1.DispatchCheckResponse_NOT_MEMBER, totalRequestCount, maxDepthRequired)
}
case <-ctx.Done():
return checkResultError(NewRequestCanceledErr(), totalRequestCount)
}
}

return checkResult(v1.DispatchCheckResponse_MEMBER, totalRequestCount)
return checkResult(v1.DispatchCheckResponse_MEMBER, totalRequestCount, maxDepthRequired)
}

// checkError returns the error.
Expand All @@ -257,21 +259,21 @@ func checkError(err error) ReduceableCheckFunc {
// alwaysMember returns that the check always passes.
func alwaysMember() ReduceableCheckFunc {
return func(ctx context.Context, resultChan chan<- CheckResult) {
resultChan <- checkResult(v1.DispatchCheckResponse_MEMBER, 0)
resultChan <- checkResult(v1.DispatchCheckResponse_MEMBER, 0, 0)
}
}

// notMember returns that the check always returns false.
func notMember() ReduceableCheckFunc {
return func(ctx context.Context, resultChan chan<- CheckResult) {
resultChan <- checkResult(v1.DispatchCheckResponse_NOT_MEMBER, 0)
resultChan <- checkResult(v1.DispatchCheckResponse_NOT_MEMBER, 0, 0)
}
}

// any returns whether any one of the lazy checks pass, and is used for union.
func any(ctx context.Context, requests []ReduceableCheckFunc) CheckResult {
if len(requests) == 0 {
return checkResult(v1.DispatchCheckResponse_NOT_MEMBER, 0)
return checkResult(v1.DispatchCheckResponse_NOT_MEMBER, 0, 0)
}

resultChan := make(chan CheckResult, len(requests))
Expand All @@ -283,13 +285,16 @@ func any(ctx context.Context, requests []ReduceableCheckFunc) CheckResult {
}

var totalRequestCount uint32
var maxDepthRequired uint32

for i := 0; i < len(requests); i++ {
select {
case result := <-resultChan:
log.Trace().Object("any result", result.Resp).Send()
totalRequestCount += result.Resp.Metadata.DispatchCount
maxDepthRequired = max(maxDepthRequired, result.Resp.Metadata.DepthRequired)
if result.Err == nil && result.Resp.Membership == v1.DispatchCheckResponse_MEMBER {
return checkResult(v1.DispatchCheckResponse_MEMBER, totalRequestCount)
return checkResult(v1.DispatchCheckResponse_MEMBER, totalRequestCount, maxDepthRequired)
}
if result.Err != nil {
return checkResultError(result.Err, totalRequestCount)
Expand All @@ -300,7 +305,7 @@ func any(ctx context.Context, requests []ReduceableCheckFunc) CheckResult {
}
}

return checkResult(v1.DispatchCheckResponse_NOT_MEMBER, totalRequestCount)
return checkResult(v1.DispatchCheckResponse_NOT_MEMBER, totalRequestCount, maxDepthRequired)
}

// difference returns whether the first lazy check passes and none of the supsequent checks pass.
Expand All @@ -317,39 +322,46 @@ func difference(ctx context.Context, requests []ReduceableCheckFunc) CheckResult
}

var totalRequestCount uint32
var maxDepthRequired uint32

for i := 0; i < len(requests); i++ {
select {
case base := <-baseChan:
totalRequestCount += base.Resp.Metadata.DispatchCount
maxDepthRequired = max(maxDepthRequired, base.Resp.Metadata.DepthRequired)

if base.Err != nil {
return checkResultError(base.Err, totalRequestCount)
}

if base.Resp.Membership != v1.DispatchCheckResponse_MEMBER {
return checkResult(v1.DispatchCheckResponse_NOT_MEMBER, totalRequestCount)
return checkResult(v1.DispatchCheckResponse_NOT_MEMBER, totalRequestCount, maxDepthRequired)
}
case sub := <-othersChan:
totalRequestCount += sub.Resp.Metadata.DispatchCount
maxDepthRequired = max(maxDepthRequired, sub.Resp.Metadata.DepthRequired)

if sub.Err != nil {
return checkResultError(sub.Err, totalRequestCount)
}

if sub.Resp.Membership == v1.DispatchCheckResponse_MEMBER {
return checkResult(v1.DispatchCheckResponse_NOT_MEMBER, totalRequestCount)
return checkResult(v1.DispatchCheckResponse_NOT_MEMBER, totalRequestCount, maxDepthRequired)
}
case <-ctx.Done():
return checkResultError(NewRequestCanceledErr(), totalRequestCount)
}
}

return checkResult(v1.DispatchCheckResponse_MEMBER, totalRequestCount)
return checkResult(v1.DispatchCheckResponse_MEMBER, totalRequestCount, maxDepthRequired)
}

func checkResult(membership v1.DispatchCheckResponse_Membership, numRequests uint32) CheckResult {
func checkResult(membership v1.DispatchCheckResponse_Membership, numRequests uint32, maxDepthRequired uint32) CheckResult {
return CheckResult{
&v1.DispatchCheckResponse{
Metadata: &v1.ResponseMeta{
DispatchCount: numRequests,
DepthRequired: maxDepthRequired + 1, // +1 for the current call.
},
Membership: membership,
},
Expand All @@ -362,6 +374,7 @@ func checkResultError(err error, numRequests uint32) CheckResult {
&v1.DispatchCheckResponse{
Metadata: &v1.ResponseMeta{
DispatchCount: numRequests,
DepthRequired: 1,
},
Membership: v1.DispatchCheckResponse_UNKNOWN,
},
Expand Down
Loading

0 comments on commit 3e4faff

Please sign in to comment.