diff --git a/agent/consul/acl.go b/agent/consul/acl.go index ed33836d4453..7553291fef96 100644 --- a/agent/consul/acl.go +++ b/agent/consul/acl.go @@ -12,6 +12,7 @@ import ( "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/sentinel" + "golang.org/x/sync/singleflight" "golang.org/x/time/rate" ) @@ -81,28 +82,12 @@ func IsACLRemoteError(err error) bool { type ACLResolverDelegate interface { ACLsEnabled() bool ACLDatacenter(legacy bool) string - // UseLegacyACLs UseLegacyACLs() bool ResolveIdentityFromToken(token string) (bool, structs.ACLIdentity, error) ResolvePolicyFromID(policyID string) (bool, *structs.ACLPolicy, error) RPC(method string, args interface{}, reply interface{}) error } -type remoteACLLegacyResult struct { - authorizer acl.Authorizer - err error -} - -type remoteACLIdentityResult struct { - identity structs.ACLIdentity - err error -} - -type remoteACLPolicyResult struct { - policy *structs.ACLPolicy - err error -} - type policyTokenError struct { Err error token string @@ -146,7 +131,7 @@ type ACLResolverConfig struct { // // When the down policy is set to async-cache and we have already cached values // then go routines will be spawned to perform the RPCs in the background -// and then will udpate the cache with either the positive or negative result. +// and then will update the cache with either the positive or negative result. // // When the down policy is set to extend-cache or the token/policy is not already // cached then the same go routines are spawned to do the RPCs in the background. @@ -161,13 +146,10 @@ type ACLResolver struct { delegate ACLResolverDelegate sentinel sentinel.Evaluator - cache *structs.ACLCaches - asyncIdentityResults map[string][]chan (*remoteACLIdentityResult) - asyncIdentityResultsMutex sync.RWMutex - asyncPolicyResults map[string][]chan (*remoteACLPolicyResult) - asyncPolicyResultsMutex sync.RWMutex - asyncLegacyResults map[string][]chan (*remoteACLLegacyResult) - asyncLegacyMutex sync.RWMutex + cache *structs.ACLCaches + identityGroup singleflight.Group + policyGroup singleflight.Group + legacyGroup singleflight.Group down acl.Authorizer @@ -211,40 +193,17 @@ func NewACLResolver(config *ACLResolverConfig) (*ACLResolver, error) { } return &ACLResolver{ - config: config.Config, - logger: config.Logger, - delegate: config.Delegate, - sentinel: config.Sentinel, - cache: cache, - asyncIdentityResults: make(map[string][]chan (*remoteACLIdentityResult)), - asyncPolicyResults: make(map[string][]chan (*remoteACLPolicyResult)), - asyncLegacyResults: make(map[string][]chan (*remoteACLLegacyResult)), - autoDisable: config.AutoDisable, - down: down, + config: config.Config, + logger: config.Logger, + delegate: config.Delegate, + sentinel: config.Sentinel, + cache: cache, + autoDisable: config.AutoDisable, + down: down, }, nil } -// fireAsyncLegacyResult is used to notify any watchers that legacy resolution of a token is complete -func (r *ACLResolver) fireAsyncLegacyResult(token string, authorizer acl.Authorizer, ttl time.Duration, err error) { - // cache the result: positive or negative - r.cache.PutAuthorizerWithTTL(token, authorizer, ttl) - - // get the list of channels to send the result to - r.asyncLegacyMutex.Lock() - channels := r.asyncLegacyResults[token] - delete(r.asyncLegacyResults, token) - r.asyncLegacyMutex.Unlock() - - // notify all watchers of the RPC results - result := &remoteACLLegacyResult{authorizer, err} - for _, cx := range channels { - // only chans that are being blocked on will be in the list of channels so this cannot block - cx <- result - close(cx) - } -} - -func (r *ACLResolver) resolveTokenLegacyAsync(token string, cached *structs.AuthorizerCacheEntry) { +func (r *ACLResolver) fetchAndCacheTokenLegacy(token string, cached *structs.AuthorizerCacheEntry) (acl.Authorizer, error) { req := structs.ACLPolicyResolveLegacyRequest{ Datacenter: r.delegate.ACLDatacenter(true), ACL: token, @@ -260,8 +219,12 @@ func (r *ACLResolver) resolveTokenLegacyAsync(token string, cached *structs.Auth if err == nil { parent := acl.RootAuthorizer(reply.Parent) if parent == nil { - r.fireAsyncLegacyResult(token, cached.Authorizer, cacheTTL, acl.ErrInvalidParent) - return + var authorizer acl.Authorizer + if cached != nil { + authorizer = cached.Authorizer + } + r.cache.PutAuthorizerWithTTL(token, authorizer, cacheTTL) + return authorizer, acl.ErrInvalidParent } var policies []*acl.Policy @@ -271,30 +234,32 @@ func (r *ACLResolver) resolveTokenLegacyAsync(token string, cached *structs.Auth } authorizer, err := acl.NewPolicyAuthorizer(parent, policies, r.sentinel) - r.fireAsyncLegacyResult(token, authorizer, reply.TTL, err) - return + + r.cache.PutAuthorizerWithTTL(token, authorizer, reply.TTL) + return authorizer, err } if acl.IsErrNotFound(err) { // Make sure to remove from the cache if it was deleted - r.fireAsyncLegacyResult(token, nil, cacheTTL, acl.ErrNotFound) - return + r.cache.PutAuthorizerWithTTL(token, nil, cacheTTL) + return nil, acl.ErrNotFound + } // some other RPC error switch r.config.ACLDownPolicy { case "allow": - r.fireAsyncLegacyResult(token, acl.AllowAll(), cacheTTL, nil) - return + r.cache.PutAuthorizerWithTTL(token, acl.AllowAll(), cacheTTL) + return acl.AllowAll(), nil case "async-cache", "extend-cache": if cached != nil { - r.fireAsyncLegacyResult(token, cached.Authorizer, cacheTTL, nil) - return + r.cache.PutAuthorizerWithTTL(token, cached.Authorizer, cacheTTL) + return cached.Authorizer, nil } fallthrough default: - r.fireAsyncLegacyResult(token, acl.DenyAll(), cacheTTL, nil) - return + r.cache.PutAuthorizerWithTTL(token, acl.DenyAll(), cacheTTL) + return acl.DenyAll(), nil } } @@ -331,32 +296,12 @@ func (r *ACLResolver) resolveTokenLegacy(token string) (acl.Authorizer, error) { metrics.IncrCounter([]string{"acl", "token", "cache_miss"}, 1) // Resolve the token in the background and wait on the result if we must - var waitChan chan *remoteACLLegacyResult - waitForResult := entry == nil || r.config.ACLDownPolicy != "async-cache" - - r.asyncLegacyMutex.Lock() - - // check if resolution for this token is already happening - waiters, ok := r.asyncLegacyResults[token] - if !ok || waiters == nil { - // initialize the slice of waiters if not already done - waiters = make([]chan *remoteACLLegacyResult, 0) - } - if waitForResult { - // create the waitChan only if we are going to block waiting - // for the response and then append it to the list of waiters - // Because we will block (not select or discard this chan) we - // do not need to create it as buffered - waitChan = make(chan *remoteACLLegacyResult) - r.asyncLegacyResults[token] = append(waiters, waitChan) - } - r.asyncLegacyMutex.Unlock() - - if !ok { - // start the async RPC if it wasn't already ongoing - go r.resolveTokenLegacyAsync(token, entry) - } + waitChan := r.legacyGroup.DoChan(token, func() (interface{}, error) { + authorizer, err := r.fetchAndCacheTokenLegacy(token, entry) + return authorizer, err + }) + waitForResult := entry == nil || r.config.ACLDownPolicy != "async-cache" if !waitForResult { // waitForResult being false requires the cacheEntry to not be nil if entry.Authorizer != nil { @@ -367,30 +312,16 @@ func (r *ACLResolver) resolveTokenLegacy(token string) (acl.Authorizer, error) { // block waiting for the async RPC to finish. res := <-waitChan - return res.authorizer, res.err -} - -// fireAsyncTokenResult is used to notify all waiters that the results of a token resolution is complete -func (r *ACLResolver) fireAsyncTokenResult(token string, identity structs.ACLIdentity, err error) { - // cache the result: positive or negative - r.cache.PutIdentity(token, identity) - - // get the list of channels to send the result to - r.asyncIdentityResultsMutex.Lock() - channels := r.asyncIdentityResults[token] - delete(r.asyncIdentityResults, token) - r.asyncIdentityResultsMutex.Unlock() - // notify all watchers of the RPC results - result := &remoteACLIdentityResult{identity, err} - for _, cx := range channels { - // cannot block because all wait chans will have another goroutine blocked on the read - cx <- result - close(cx) + var authorizer acl.Authorizer + if res.Val != nil { // avoid a nil-not-nil bug + authorizer = res.Val.(acl.Authorizer) } + + return authorizer, res.Err } -func (r *ACLResolver) resolveIdentityFromTokenAsync(token string, cached *structs.IdentityCacheEntry) { +func (r *ACLResolver) fetchAndCacheIdentityFromToken(token string, cached *structs.IdentityCacheEntry) (structs.ACLIdentity, error) { req := structs.ACLTokenGetRequest{ Datacenter: r.delegate.ACLDatacenter(false), TokenID: token, @@ -405,28 +336,30 @@ func (r *ACLResolver) resolveIdentityFromTokenAsync(token string, cached *struct err := r.delegate.RPC("ACL.TokenRead", &req, &resp) if err == nil { if resp.Token == nil { - r.fireAsyncTokenResult(token, nil, acl.ErrNotFound) + r.cache.PutIdentity(token, nil) + return nil, acl.ErrNotFound } else { - r.fireAsyncTokenResult(token, resp.Token, nil) + r.cache.PutIdentity(token, resp.Token) + return resp.Token, nil } - return } if acl.IsErrNotFound(err) { // Make sure to remove from the cache if it was deleted - r.fireAsyncTokenResult(token, nil, acl.ErrNotFound) - return + r.cache.PutIdentity(token, nil) + return nil, acl.ErrNotFound + } // some other RPC error if cached != nil && (r.config.ACLDownPolicy == "extend-cache" || r.config.ACLDownPolicy == "async-cache") { // extend the cache - r.fireAsyncTokenResult(token, cached.Identity, nil) - return + r.cache.PutIdentity(token, cached.Identity) + return cached.Identity, nil } - r.fireAsyncTokenResult(token, nil, err) - return + r.cache.PutIdentity(token, nil) + return nil, err } func (r *ACLResolver) resolveIdentityFromToken(token string) (structs.ACLIdentity, error) { @@ -445,32 +378,12 @@ func (r *ACLResolver) resolveIdentityFromToken(token string) (structs.ACLIdentit metrics.IncrCounter([]string{"acl", "token", "cache_miss"}, 1) // Background a RPC request and wait on it if we must - var waitChan chan *remoteACLIdentityResult + waitChan := r.identityGroup.DoChan(token, func() (interface{}, error) { + identity, err := r.fetchAndCacheIdentityFromToken(token, cacheEntry) + return identity, err + }) waitForResult := cacheEntry == nil || r.config.ACLDownPolicy != "async-cache" - - r.asyncIdentityResultsMutex.Lock() - // check if resolution of this token is already ongoing - waiters, ok := r.asyncIdentityResults[token] - if !ok || waiters == nil { - // only initialize the slice of waiters if need be (when this token resolution isn't ongoing) - waiters = make([]chan *remoteACLIdentityResult, 0) - } - if waitForResult { - // create the waitChan only if we are going to block waiting - // for the response and then append it to the list of waiters - // Because we will block (not select or discard this chan) we - // do not need to create it as buffered - waitChan = make(chan *remoteACLIdentityResult) - r.asyncIdentityResults[token] = append(waiters, waitChan) - } - r.asyncIdentityResultsMutex.Unlock() - - if !ok { - // only start the RPC if one isn't in flight - go r.resolveIdentityFromTokenAsync(token, cacheEntry) - } - if !waitForResult { // waitForResult being false requires the cacheEntry to not be nil return cacheEntry.Identity, nil @@ -479,34 +392,18 @@ func (r *ACLResolver) resolveIdentityFromToken(token string) (structs.ACLIdentit // block on the read here, this is why we don't need chan buffering res := <-waitChan - if res.err != nil && !acl.IsErrNotFound(res.err) { - return res.identity, ACLRemoteError{Err: res.err} - } - return res.identity, res.err -} - -// fireAsyncPolicyResult is used to notify all waiters that policy resolution is complete. -func (r *ACLResolver) fireAsyncPolicyResult(policyID string, policy *structs.ACLPolicy, err error, updateCache bool) { - if updateCache { - // cache the result: positive or negative - r.cache.PutPolicy(policyID, policy) + var identity structs.ACLIdentity + if res.Val != nil { // avoid a nil-not-nil bug + identity = res.Val.(structs.ACLIdentity) } - // get the list of channels to send the result to - r.asyncPolicyResultsMutex.Lock() - channels := r.asyncPolicyResults[policyID] - delete(r.asyncPolicyResults, policyID) - r.asyncPolicyResultsMutex.Unlock() - - // notify all watchers of the RPC results - result := &remoteACLPolicyResult{policy, err} - for _, cx := range channels { - // not closing the channel as there could be more events to be fired. - cx <- result + if res.Err != nil && !acl.IsErrNotFound(res.Err) { + return identity, ACLRemoteError{Err: res.Err} } + return identity, res.Err } -func (r *ACLResolver) resolvePoliciesAsyncForIdentity(identity structs.ACLIdentity, policyIDs []string, cached map[string]*structs.PolicyCacheEntry) { +func (r *ACLResolver) fetchAndCachePoliciesForIdentity(identity structs.ACLIdentity, policyIDs []string, cached map[string]*structs.PolicyCacheEntry) (map[string]*structs.ACLPolicy, error) { req := structs.ACLPolicyBatchGetRequest{ Datacenter: r.delegate.ACLDatacenter(false), PolicyIDs: policyIDs, @@ -516,68 +413,66 @@ func (r *ACLResolver) resolvePoliciesAsyncForIdentity(identity structs.ACLIdenti }, } - found := make(map[string]struct{}) var resp structs.ACLPolicyBatchResponse err := r.delegate.RPC("ACL.PolicyResolve", &req, &resp) if err == nil { + out := make(map[string]*structs.ACLPolicy) for _, policy := range resp.Policies { - r.fireAsyncPolicyResult(policy.ID, policy, nil, true) - found[policy.ID] = struct{}{} + out[policy.ID] = policy } for _, policyID := range policyIDs { - if _, ok := found[policyID]; !ok { - r.fireAsyncPolicyResult(policyID, nil, acl.ErrNotFound, true) + if policy, ok := out[policyID]; ok { + r.cache.PutPolicy(policyID, policy) + } else { + r.cache.PutPolicy(policyID, nil) } } - return + return out, nil } if acl.IsErrNotFound(err) { // make sure to indicate that this identity is no longer valid within // the cache - // - // Note - This must be done before firing the results or else it would - // be possible for waiters to get woken up an get the cached identity - // again r.cache.PutIdentity(identity.SecretToken(), nil) - for _, policyID := range policyIDs { - // Do not touch the cache. Getting a top level ACL not found error - // only indicates that the secret token used in the request - // no longer exists - r.fireAsyncPolicyResult(policyID, nil, &policyTokenError{acl.ErrNotFound, identity.SecretToken()}, false) - } - return + + // Do not touch the policy cache. Getting a top level ACL not found error + // only indicates that the secret token used in the request + // no longer exists + return nil, &policyTokenError{acl.ErrNotFound, identity.SecretToken()} } if acl.IsErrPermissionDenied(err) { // invalidate our ID cache so that identity resolution will take place // again in the future - // - // Note - This must be done before firing the results or else it would - // be possible for waiters to get woken up and get the cached identity - // again r.cache.RemoveIdentity(identity.SecretToken()) - for _, policyID := range policyIDs { - // Do not remove from the cache for permission denied - // what this does indicate is that our view of the token is out of date - r.fireAsyncPolicyResult(policyID, nil, &policyTokenError{acl.ErrPermissionDenied, identity.SecretToken()}, false) - } - return + // Do not remove from the policy cache for permission denied + // what this does indicate is that our view of the token is out of date + return nil, &policyTokenError{acl.ErrPermissionDenied, identity.SecretToken()} } // other RPC error - use cache if available extendCache := r.config.ACLDownPolicy == "extend-cache" || r.config.ACLDownPolicy == "async-cache" + + out := make(map[string]*structs.ACLPolicy) + insufficientCache := false for _, policyID := range policyIDs { if entry, ok := cached[policyID]; extendCache && ok { - r.fireAsyncPolicyResult(policyID, entry.Policy, nil, true) + r.cache.PutPolicy(policyID, entry.Policy) + if entry.Policy != nil { + out[policyID] = entry.Policy + } } else { - r.fireAsyncPolicyResult(policyID, nil, ACLRemoteError{Err: err}, true) + r.cache.PutPolicy(policyID, nil) + insufficientCache = true } } - return + if insufficientCache { + return nil, ACLRemoteError{Err: err} + } + return out, nil } func (r *ACLResolver) filterPoliciesByScope(policies structs.ACLPolicies) structs.ACLPolicies { @@ -660,70 +555,37 @@ func (r *ACLResolver) resolvePoliciesForIdentity(identity structs.ACLIdentity) ( return r.filterPoliciesByScope(policies), nil } + hasMissing := len(missing) > 0 + fetchIDs := missing for _, policy := range expired { fetchIDs = append(fetchIDs, policy.ID) } // Background a RPC request and wait on it if we must - var waitChan chan *remoteACLPolicyResult - waitForResult := len(missing) > 0 || r.config.ACLDownPolicy != "async-cache" - if waitForResult { - // buffered because there are going to be multiple go routines that send data to this chan - waitChan = make(chan *remoteACLPolicyResult, len(fetchIDs)) - } - - var newAsyncFetchIds []string - r.asyncPolicyResultsMutex.Lock() - for _, policyID := range fetchIDs { - clients, ok := r.asyncPolicyResults[policyID] - if !ok || clients == nil { - clients = make([]chan *remoteACLPolicyResult, 0) - } - if waitForResult { - r.asyncPolicyResults[policyID] = append(clients, waitChan) - } - - if !ok { - newAsyncFetchIds = append(newAsyncFetchIds, policyID) - } - } - r.asyncPolicyResultsMutex.Unlock() - - if len(newAsyncFetchIds) > 0 { - // only start the RPC if one isn't in flight - go r.resolvePoliciesAsyncForIdentity(identity, newAsyncFetchIds, expCacheMap) - } + waitChan := r.policyGroup.DoChan(identity.SecretToken(), func() (interface{}, error) { + policies, err := r.fetchAndCachePoliciesForIdentity(identity, fetchIDs, expCacheMap) + return policies, err + }) + waitForResult := hasMissing || r.config.ACLDownPolicy != "async-cache" if !waitForResult { // waitForResult being false requires that all the policies were cached already policies = append(policies, expired...) return r.filterPoliciesByScope(policies), nil } - for i := 0; i < len(fetchIDs); i++ { - res := <-waitChan + res := <-waitChan - if res.err != nil { - if _, ok := res.err.(*policyTokenError); ok { - // always return token errors - return nil, res.err - } else if !acl.IsErrNotFound(res.err) { - // ignore regular not found errors for policies - return nil, res.err - } - } + if res.Err != nil { + return nil, res.Err + } - // we probably could handle a special case where we - // get a permission denied error due to another requests - // issues and spawn the go routine to resolve it ourselves. - // however this should be exceedingly rare and in this case - // we can just kick the can down the road and retry the whole - // token/policy resolution. All the remaining good bits that - // we need will already be cached anyways. + if res.Val != nil { + foundPolicies := res.Val.(map[string]*structs.ACLPolicy) - if res.policy != nil { - policies = append(policies, res.policy) + for _, policy := range foundPolicies { + policies = append(policies, policy) } } diff --git a/agent/consul/acl_test.go b/agent/consul/acl_test.go index 9275f8cb8598..cf53f9d01483 100644 --- a/agent/consul/acl_test.go +++ b/agent/consul/acl_test.go @@ -129,7 +129,7 @@ func testIdentityForToken(token string) (bool, structs.ACLIdentity, error) { }, }, }, nil - case "concurrent-resolve-1": + case "concurrent-resolve": return true, &structs.ACLToken{ AccessorID: "5f57c1f6-6a89-4186-9445-531b316e01df", SecretID: "a1a54629-5050-4d17-8a4e-560d2423f835", @@ -142,19 +142,6 @@ func testIdentityForToken(token string) (bool, structs.ACLIdentity, error) { }, }, }, nil - case "concurrent-resolve-2": - return true, &structs.ACLToken{ - AccessorID: "296bbe10-01aa-437e-ac3b-3ecdc00ea65c", - SecretID: "cc58f0f3-2273-42a7-8b4a-2bef9d2863d7", - Policies: []structs.ACLTokenPolicyLink{ - structs.ACLTokenPolicyLink{ - ID: "node-wr", - }, - structs.ACLTokenPolicyLink{ - ID: "acl-wr", - }, - }, - }, nil case anonymousToken: return true, &structs.ACLToken{ AccessorID: "00000000-0000-0000-0000-000000000002", @@ -737,6 +724,135 @@ func TestACLResolver_DownPolicy(t *testing.T) { requireIdentityCached(t, r, "foo", false, "no longer cached") }) + + t.Run("PolicyResolve-TokenNotFound", func(t *testing.T) { + t.Parallel() + + _, rawToken, _ := testIdentityForToken("found") + foundToken := rawToken.(*structs.ACLToken) + secretID := foundToken.SecretID + + tokenResolved := false + policyResolved := false + delegate := &ACLResolverTestDelegate{ + enabled: true, + datacenter: "dc1", + legacy: false, + localTokens: false, + localPolicies: false, + tokenReadFn: func(args *structs.ACLTokenGetRequest, reply *structs.ACLTokenResponse) error { + if !tokenResolved { + reply.Token = foundToken + tokenResolved = true + return nil + } + + return fmt.Errorf("Not Supposed to be Invoked again") + }, + policyResolveFn: func(args *structs.ACLPolicyBatchGetRequest, reply *structs.ACLPolicyBatchResponse) error { + if !policyResolved { + for _, policyID := range args.PolicyIDs { + _, policy, _ := testPolicyForID(policyID) + if policy != nil { + reply.Policies = append(reply.Policies, policy) + } + } + policyResolved = true + return nil + } + return acl.ErrNotFound // test condition + }, + } + r := newTestACLResolver(t, delegate, func(config *ACLResolverConfig) { + config.Config.ACLDownPolicy = "extend-cache" + config.Config.ACLTokenTTL = 0 + config.Config.ACLPolicyTTL = 0 + }) + + // Prime the standard caches. + authz, err := r.ResolveToken(secretID) + require.NoError(t, err) + require.NotNil(t, authz) + require.True(t, authz.NodeWrite("foo", nil)) + + // Verify that the caches are setup properly. + requireIdentityCached(t, r, secretID, true, "cached") + requirePolicyCached(t, r, "node-wr", true, "cached") // from "found" token + requirePolicyCached(t, r, "dc2-key-wr", true, "cached") // from "found" token + + // Nuke 1 policy from the cache so that we force a policy resolve + // during token resolve. + r.cache.RemovePolicy("dc2-key-wr") + + _, err = r.ResolveToken(secretID) + require.True(t, acl.IsErrNotFound(err)) + + requireIdentityCached(t, r, secretID, false, "identity not found cached") + requirePolicyCached(t, r, "node-wr", true, "still cached") + require.Nil(t, r.cache.GetPolicy("dc2-key-wr"), "not stored at all") + }) + + t.Run("PolicyResolve-PermissionDenied", func(t *testing.T) { + t.Parallel() + + _, rawToken, _ := testIdentityForToken("found") + foundToken := rawToken.(*structs.ACLToken) + secretID := foundToken.SecretID + + policyResolved := false + delegate := &ACLResolverTestDelegate{ + enabled: true, + datacenter: "dc1", + legacy: false, + localTokens: false, + localPolicies: false, + tokenReadFn: func(args *structs.ACLTokenGetRequest, reply *structs.ACLTokenResponse) error { + // no limit + reply.Token = foundToken + return nil + }, + policyResolveFn: func(args *structs.ACLPolicyBatchGetRequest, reply *structs.ACLPolicyBatchResponse) error { + if !policyResolved { + for _, policyID := range args.PolicyIDs { + _, policy, _ := testPolicyForID(policyID) + if policy != nil { + reply.Policies = append(reply.Policies, policy) + } + } + policyResolved = true + return nil + } + return acl.ErrPermissionDenied // test condition + }, + } + r := newTestACLResolver(t, delegate, func(config *ACLResolverConfig) { + config.Config.ACLDownPolicy = "extend-cache" + config.Config.ACLTokenTTL = 0 + config.Config.ACLPolicyTTL = 0 + }) + + // Prime the standard caches. + authz, err := r.ResolveToken(secretID) + require.NoError(t, err) + require.NotNil(t, authz) + require.True(t, authz.NodeWrite("foo", nil)) + + // Verify that the caches are setup properly. + requireIdentityCached(t, r, secretID, true, "cached") + requirePolicyCached(t, r, "node-wr", true, "cached") // from "found" token + requirePolicyCached(t, r, "dc2-key-wr", true, "cached") // from "found" token + + // Nuke 1 policy from the cache so that we force a policy resolve + // during token resolve. + r.cache.RemovePolicy("dc2-key-wr") + + _, err = r.ResolveToken(secretID) + require.True(t, acl.IsErrPermissionDenied(err)) + + require.Nil(t, r.cache.GetIdentity(secretID), "identity not stored at all") + requirePolicyCached(t, r, "node-wr", true, "still cached") + require.Nil(t, r.cache.GetPolicy("dc2-key-wr"), "not stored at all") + }) } func TestACLResolver_DatacenterScoping(t *testing.T) { @@ -893,7 +1009,7 @@ func TestACLResolver_Client(t *testing.T) { switch args.TokenID { case "a1a54629-5050-4d17-8a4e-560d2423f835": - _, token, _ := testIdentityForToken("concurrent-resolve-1") + _, token, _ := testIdentityForToken("concurrent-resolve") reply.Token = token.(*structs.ACLToken) default: return acl.ErrNotFound @@ -907,6 +1023,7 @@ func TestACLResolver_Client(t *testing.T) { }, policyResolveFn: func(args *structs.ACLPolicyBatchGetRequest, reply *structs.ACLPolicyBatchResponse) error { atomic.AddInt32(&policyResolves, 1) + for _, policyID := range args.PolicyIDs { _, policy, _ := testPolicyForID(policyID) if policy != nil { @@ -939,255 +1056,6 @@ func TestACLResolver_Client(t *testing.T) { require.Equal(t, int32(1), tokenReads) require.Equal(t, int32(1), policyResolves) }) - - t.Run("Concurrent-Policy-Resolve", func(t *testing.T) { - t.Parallel() - - var tokenReads int32 - var policyResolves int32 - delegate := &ACLResolverTestDelegate{ - enabled: true, - datacenter: "dc1", - legacy: false, - localTokens: false, - localPolicies: false, - tokenReadFn: func(args *structs.ACLTokenGetRequest, reply *structs.ACLTokenResponse) error { - atomic.AddInt32(&tokenReads, 1) - - switch args.TokenID { - case "a1a54629-5050-4d17-8a4e-560d2423f835": - _, token, _ := testIdentityForToken("concurrent-resolve-1") - reply.Token = token.(*structs.ACLToken) - case "cc58f0f3-2273-42a7-8b4a-2bef9d2863d7": - _, token, _ := testIdentityForToken("concurrent-resolve-2") - reply.Token = token.(*structs.ACLToken) - default: - return acl.ErrNotFound - } - - return nil - }, - policyResolveFn: func(args *structs.ACLPolicyBatchGetRequest, reply *structs.ACLPolicyBatchResponse) error { - atomic.AddInt32(&policyResolves, 1) - // waits until both tokens have been read for up to 1 second - for i := 0; i < 100; i++ { - time.Sleep(10 * time.Millisecond) - reads := atomic.LoadInt32(&tokenReads) - if reads >= 2 { - time.Sleep(100 * time.Millisecond) - break - } - } - - for _, policyID := range args.PolicyIDs { - _, policy, _ := testPolicyForID(policyID) - if policy != nil { - reply.Policies = append(reply.Policies, policy) - } - } - return nil - }, - } - - r := newTestACLResolver(t, delegate, func(config *ACLResolverConfig) { - config.Config.ACLTokenTTL = 600 * time.Second - // effectively disables the cache - therefore the only way we end up - // with 1 policy resolution is if they get single flighted - config.Config.ACLPolicyTTL = 0 * time.Millisecond - config.Config.ACLDownPolicy = "extend-cache" - }) - - ch1 := make(chan *asyncResolutionResult) - ch2 := make(chan *asyncResolutionResult) - - go resolveTokenAsync(r, "a1a54629-5050-4d17-8a4e-560d2423f835", ch1) - go resolveTokenAsync(r, "cc58f0f3-2273-42a7-8b4a-2bef9d2863d7", ch2) - - res1 := <-ch1 - res2 := <-ch2 - - require.NoError(t, res1.err) - require.NoError(t, res2.err) - require.Equal(t, res1.authz, res2.authz) - require.Equal(t, int32(2), tokenReads) - require.Equal(t, int32(1), policyResolves) - }) - - t.Run("Concurrent-Policy-Resolve-Permission-Denied", func(t *testing.T) { - t.Parallel() - - var waitReady int32 = 1 - var tokenReads int32 - var policyResolves int32 - delegate := &ACLResolverTestDelegate{ - enabled: true, - datacenter: "dc1", - legacy: false, - localTokens: false, - localPolicies: false, - tokenReadFn: func(args *structs.ACLTokenGetRequest, reply *structs.ACLTokenResponse) error { - atomic.AddInt32(&tokenReads, 1) - - switch args.TokenID { - case "a1a54629-5050-4d17-8a4e-560d2423f835": - _, token, _ := testIdentityForToken("concurrent-resolve-1") - reply.Token = token.(*structs.ACLToken) - case "cc58f0f3-2273-42a7-8b4a-2bef9d2863d7": - _, token, _ := testIdentityForToken("concurrent-resolve-2") - reply.Token = token.(*structs.ACLToken) - default: - return acl.ErrNotFound - } - - return nil - }, - policyResolveFn: func(args *structs.ACLPolicyBatchGetRequest, reply *structs.ACLPolicyBatchResponse) error { - atomic.AddInt32(&policyResolves, 1) - - if atomic.CompareAndSwapInt32(&waitReady, 1, 0) { - // waits until both tokens have been read for up to 1 second - for i := 0; i < 100; i++ { - time.Sleep(10 * time.Millisecond) - reads := atomic.LoadInt32(&tokenReads) - if reads >= 2 { - time.Sleep(100 * time.Millisecond) - break - } - } - - return acl.ErrPermissionDenied - } - - for _, policyID := range args.PolicyIDs { - _, policy, _ := testPolicyForID(policyID) - if policy != nil { - reply.Policies = append(reply.Policies, policy) - } - } - return nil - }, - } - - r := newTestACLResolver(t, delegate, func(config *ACLResolverConfig) { - config.Config.ACLTokenTTL = 600 * time.Second - config.Config.ACLPolicyTTL = 600 * time.Second - config.Config.ACLDownPolicy = "extend-cache" - }) - - ch1 := make(chan *asyncResolutionResult) - ch2 := make(chan *asyncResolutionResult) - - go resolveTokenAsync(r, "a1a54629-5050-4d17-8a4e-560d2423f835", ch1) - go resolveTokenAsync(r, "cc58f0f3-2273-42a7-8b4a-2bef9d2863d7", ch2) - - res1 := <-ch1 - res2 := <-ch2 - - require.NoError(t, res1.err) - require.NoError(t, res2.err) - require.Equal(t, res1.authz, res2.authz) - // 2 reads for 1 token (cache gets invalidated and only 1 for the other) - require.Equal(t, int32(3), tokenReads) - require.Equal(t, int32(2), policyResolves) - require.True(t, res1.authz.ACLRead()) - require.True(t, res1.authz.NodeWrite("foo", nil)) - }) - - t.Run("Concurrent-Policy-Resolve-Not-Found", func(t *testing.T) { - t.Parallel() - - var waitReady int32 = 1 - var tokenReads int32 - var policyResolves int32 - var tokenNotAllowed string - delegate := &ACLResolverTestDelegate{ - enabled: true, - datacenter: "dc1", - legacy: false, - localTokens: false, - localPolicies: false, - tokenReadFn: func(args *structs.ACLTokenGetRequest, reply *structs.ACLTokenResponse) error { - atomic.AddInt32(&tokenReads, 1) - - switch args.TokenID { - case "a1a54629-5050-4d17-8a4e-560d2423f835": - _, token, _ := testIdentityForToken("concurrent-resolve-1") - reply.Token = token.(*structs.ACLToken) - case "cc58f0f3-2273-42a7-8b4a-2bef9d2863d7": - _, token, _ := testIdentityForToken("concurrent-resolve-2") - reply.Token = token.(*structs.ACLToken) - default: - return acl.ErrNotFound - } - - return nil - }, - policyResolveFn: func(args *structs.ACLPolicyBatchGetRequest, reply *structs.ACLPolicyBatchResponse) error { - atomic.AddInt32(&policyResolves, 1) - - if atomic.CompareAndSwapInt32(&waitReady, 1, 0) { - // waits until both tokens have been read for up to 1 second - for i := 0; i < 100; i++ { - time.Sleep(10 * time.Millisecond) - reads := atomic.LoadInt32(&tokenReads) - if reads >= 2 { - time.Sleep(100 * time.Millisecond) - break - } - } - - tokenNotAllowed = args.Token - return acl.ErrNotFound - } - - for _, policyID := range args.PolicyIDs { - _, policy, _ := testPolicyForID(policyID) - if policy != nil { - reply.Policies = append(reply.Policies, policy) - } - } - return nil - }, - } - - r := newTestACLResolver(t, delegate, func(config *ACLResolverConfig) { - config.Config.ACLTokenTTL = 600 * time.Second - config.Config.ACLPolicyTTL = 600 * time.Second - config.Config.ACLDownPolicy = "extend-cache" - }) - - ch1 := make(chan *asyncResolutionResult) - ch2 := make(chan *asyncResolutionResult) - - go resolveTokenAsync(r, "a1a54629-5050-4d17-8a4e-560d2423f835", ch1) - go resolveTokenAsync(r, "cc58f0f3-2273-42a7-8b4a-2bef9d2863d7", ch2) - - res1 := <-ch1 - res2 := <-ch2 - - var errResult *asyncResolutionResult - var goodResult *asyncResolutionResult - - // can't be sure which token resolution is going to be the one that does the first policy resolution - // so we record it and then determine here how the results should be validated - if tokenNotAllowed == "a1a54629-5050-4d17-8a4e-560d2423f835" { - errResult = res1 - goodResult = res2 - } else { - errResult = res2 - goodResult = res1 - } - - require.Error(t, errResult.err) - require.Nil(t, errResult.authz) - require.EqualError(t, errResult.err, acl.ErrNotFound.Error()) - require.NoError(t, goodResult.err) - require.Equal(t, int32(2), tokenReads) - require.Equal(t, int32(2), policyResolves) - require.NotNil(t, goodResult.authz) - require.True(t, goodResult.authz.ACLRead()) - require.True(t, goodResult.authz.NodeWrite("foo", nil)) - }) } func TestACLResolver_LocalTokensAndPolicies(t *testing.T) { diff --git a/vendor/golang.org/x/sync/singleflight/singleflight.go b/vendor/golang.org/x/sync/singleflight/singleflight.go new file mode 100644 index 000000000000..9a4f8d59e069 --- /dev/null +++ b/vendor/golang.org/x/sync/singleflight/singleflight.go @@ -0,0 +1,111 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package singleflight provides a duplicate function call suppression +// mechanism. +package singleflight // import "golang.org/x/sync/singleflight" + +import "sync" + +// call is an in-flight or completed singleflight.Do call +type call struct { + wg sync.WaitGroup + + // These fields are written once before the WaitGroup is done + // and are only read after the WaitGroup is done. + val interface{} + err error + + // These fields are read and written with the singleflight + // mutex held before the WaitGroup is done, and are read but + // not written after the WaitGroup is done. + dups int + chans []chan<- Result +} + +// Group represents a class of work and forms a namespace in +// which units of work can be executed with duplicate suppression. +type Group struct { + mu sync.Mutex // protects m + m map[string]*call // lazily initialized +} + +// Result holds the results of Do, so they can be passed +// on a channel. +type Result struct { + Val interface{} + Err error + Shared bool +} + +// Do executes and returns the results of the given function, making +// sure that only one execution is in-flight for a given key at a +// time. If a duplicate comes in, the duplicate caller waits for the +// original to complete and receives the same results. +// The return value shared indicates whether v was given to multiple callers. +func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) { + g.mu.Lock() + if g.m == nil { + g.m = make(map[string]*call) + } + if c, ok := g.m[key]; ok { + c.dups++ + g.mu.Unlock() + c.wg.Wait() + return c.val, c.err, true + } + c := new(call) + c.wg.Add(1) + g.m[key] = c + g.mu.Unlock() + + g.doCall(c, key, fn) + return c.val, c.err, c.dups > 0 +} + +// DoChan is like Do but returns a channel that will receive the +// results when they are ready. +func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result { + ch := make(chan Result, 1) + g.mu.Lock() + if g.m == nil { + g.m = make(map[string]*call) + } + if c, ok := g.m[key]; ok { + c.dups++ + c.chans = append(c.chans, ch) + g.mu.Unlock() + return ch + } + c := &call{chans: []chan<- Result{ch}} + c.wg.Add(1) + g.m[key] = c + g.mu.Unlock() + + go g.doCall(c, key, fn) + + return ch +} + +// doCall handles the single call for a key. +func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) { + c.val, c.err = fn() + c.wg.Done() + + g.mu.Lock() + delete(g.m, key) + for _, ch := range c.chans { + ch <- Result{c.val, c.err, c.dups > 0} + } + g.mu.Unlock() +} + +// Forget tells the singleflight to forget about a key. Future calls +// to Do for this key will call the function rather than waiting for +// an earlier call to complete. +func (g *Group) Forget(key string) { + g.mu.Lock() + delete(g.m, key) + g.mu.Unlock() +} diff --git a/vendor/vendor.json b/vendor/vendor.json index 9f77482b35fe..1331d3f02c0d 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -320,6 +320,7 @@ {"path":"golang.org/x/net/trace","checksumSHA1":"u/r66lwYfgg682u5hZG7/E7+VCY=","revision":"d866cfc389cec985d6fda2859936a575a55a3ab6","revisionTime":"2017-12-11T20:45:21Z"}, {"path":"golang.org/x/oauth2","revision":""}, {"path":"golang.org/x/oauth2/google","revision":""}, + {"path":"golang.org/x/sync/singleflight","checksumSHA1":"VhUZFUuhLFSBFUfskMC4am5RIdc=","revision":"e225da77a7e68af35c70ccbf71af2b83e6acac3c","revisionTime":"2019-02-15T22:36:53Z"}, {"path":"golang.org/x/sys/cpu","checksumSHA1":"REkmyB368pIiip76LiqMLspgCRk=","revision":"ad87a3a340fa7f3bed189293fbfa7a9b7e021ae1","revisionTime":"2018-06-18T16:37:21Z"}, {"path":"golang.org/x/sys/unix","checksumSHA1":"su2QDjUzrUO0JnOH9m0cNg0QqsM=","revision":"ac767d655b305d4e9612f5f6e33120b9176c4ad4","revisionTime":"2018-07-08T03:57:06Z"}, {"path":"golang.org/x/sys/windows","checksumSHA1":"P8Y8GFwxybTfltfh8Q0lHqlEYIM=","revision":"ac767d655b305d4e9612f5f6e33120b9176c4ad4","revisionTime":"2018-07-08T03:57:06Z"},