diff --git a/pkg/cache/v3/cache.go b/pkg/cache/v3/cache.go index a453f97adf..d0d596a804 100644 --- a/pkg/cache/v3/cache.go +++ b/pkg/cache/v3/cache.go @@ -104,41 +104,67 @@ type Cache interface { // Response is a wrapper around Envoy's DiscoveryResponse. type Response interface { - // Get the Constructed DiscoveryResponse + // GetDiscoveryResponse returns the Constructed DiscoveryResponse. GetDiscoveryResponse() (*discovery.DiscoveryResponse, error) - // Get the original Request for the Response. + // GetRequest returns the request that created the watch that we're now responding to. + // This is provided to allow the caller to correlate the response with a request. + // Generally this will be the latest request seen on the stream for the specific type. GetRequest() *discovery.DiscoveryRequest - // Get the version in the Response. + // GetVersion returns the version in the Response. + // The version can be a property of the resources, allowing for optimizations in subsequent calls, + // or simply an internal property of the cache which can be used for debugging. + // The cache implementation should be able to determine if it can provide such optimization. + // Deprecated: use GetResponseVersion instead GetVersion() (string, error) + // GetResponseVersion returns the version in the Response. + // The version can be a property of the resources, allowing for optimizations in subsequent calls, + // or simply an internal property of the cache which can be used for debugging. + // The cache implementation should be able to determine if it can provide such optimization. + GetResponseVersion() string + // GetReturnedResources returns the map of resources and their versions returned in the subscription. // It may include more resources than directly set in the response to consider the full state of the client. // The caller is expected to provide this unchanged to the next call to CreateWatch as part of the subscription. GetReturnedResources() map[string]string - // Get the context provided during response creation. + // GetContext returns the context provided during response creation. GetContext() context.Context } -// DeltaResponse is a wrapper around Envoy's DeltaDiscoveryResponse +// DeltaResponse is a wrapper around Envoy's DeltaDiscoveryResponse. type DeltaResponse interface { - // Get the constructed DeltaDiscoveryResponse + // GetDeltaDiscoveryResponse returns the constructed DeltaDiscoveryResponse. GetDeltaDiscoveryResponse() (*discovery.DeltaDiscoveryResponse, error) - // Get the request that created the watch that we're now responding to. This is provided to allow the caller to correlate the - // response with a request. Generally this will be the latest request seen on the stream for the specific type. + // GetDeltaRequest returns the request that created the watch that we're now responding to. + // This is provided to allow the caller to correlate the response with a request. + // Generally this will be the latest request seen on the stream for the specific type. GetDeltaRequest() *discovery.DeltaDiscoveryRequest - // Get the version in the DeltaResponse. This field is generally used for debugging purposes as noted by the Envoy documentation. + // GetSystemVersion returns the version in the DeltaResponse. + // The version in delta response is not indicative of the resources included, + // but an internal property of the cache which can be used for debugging. + // Deprecated: use GetResponseVersion instead GetSystemVersion() (string, error) - // Get the version map of the internal cache. - // The version map consists of updated version mappings after this response is applied + // GetResponseVersion returns the version in the DeltaResponse. + // The version in delta response is not indicative of the resources included, + // but an internal property of the cache which can be used for debugging. + GetResponseVersion() string + + // GetNextVersionMap provides the version map of the internal cache. + // The version map consists of updated version mappings after this response is applied. + // Deprecated: use GetReturnedResources instead GetNextVersionMap() map[string]string - // Get the context provided during response creation + // GetReturnedResources provides the version map of the internal cache. + // The version map consists of updated version mappings after this response is applied. + GetReturnedResources() map[string]string + + // GetContext returns the context provided during response creation. GetContext() context.Context } @@ -183,12 +209,12 @@ type RawDeltaResponse struct { SystemVersionInfo string // Resources to be included in the response. - Resources []types.Resource + Resources []types.ResourceWithTTL // RemovedResources is a list of resource aliases which should be dropped by the consuming client. RemovedResources []string - // NextVersionMap consists of updated version mappings after this response is applied + // NextVersionMap consists of updated version mappings after this response is applied. NextVersionMap map[string]string // Context provided at the time of response creation. This allows associating additional @@ -290,8 +316,8 @@ func (r *RawDeltaResponse) GetDeltaDiscoveryResponse() (*discovery.DeltaDiscover marshaledResources := make([]*discovery.Resource, len(r.Resources)) for i, resource := range r.Resources { - name := GetResourceName(resource) - marshaledResource, err := MarshalResource(resource) + name := GetResourceName(resource.Resource) + marshaledResource, err := MarshalResource(resource.Resource) if err != nil { return nil, err } @@ -330,23 +356,41 @@ func (r *RawResponse) GetContext() context.Context { return r.Ctx } -// GetDeltaRequest returns the original DeltaRequest +// GetDeltaRequest returns the original DeltaRequest. func (r *RawDeltaResponse) GetDeltaRequest() *discovery.DeltaDiscoveryRequest { return r.DeltaRequest } // GetVersion returns the response version. +// Deprecated: use GetResponseVersion instead func (r *RawResponse) GetVersion() (string, error) { - return r.Version, nil + return r.GetResponseVersion(), nil } -// GetSystemVersion returns the raw SystemVersion +// GetResponseVersion returns the response version. +func (r *RawResponse) GetResponseVersion() string { + return r.Version +} + +// GetSystemVersion returns the raw SystemVersion. +// Deprecated: use GetResponseVersion instead func (r *RawDeltaResponse) GetSystemVersion() (string, error) { - return r.SystemVersionInfo, nil + return r.GetResponseVersion(), nil +} + +// GetResponseVersion returns the response version. +func (r *RawDeltaResponse) GetResponseVersion() string { + return r.SystemVersionInfo } -// NextVersionMap returns the version map which consists of updated version mappings after this response is applied +// NextVersionMap returns the version map which consists of updated version mappings after this response is applied. +// Deprecated: use GetReturnedResources instead func (r *RawDeltaResponse) GetNextVersionMap() map[string]string { + return r.GetReturnedResources() +} + +// GetReturnedResources returns the version map which consists of updated version mappings after this response is applied. +func (r *RawDeltaResponse) GetReturnedResources() map[string]string { return r.NextVersionMap } @@ -392,17 +436,18 @@ func (r *DeltaPassthroughResponse) GetDeltaDiscoveryResponse() (*discovery.Delta return r.DeltaDiscoveryResponse, nil } -// GetRequest returns the original Discovery Request +// GetRequest returns the original Discovery Request. func (r *PassthroughResponse) GetRequest() *discovery.DiscoveryRequest { return r.Request } -// GetDeltaRequest returns the original Delta Discovery Request +// GetDeltaRequest returns the original Delta Discovery Request. func (r *DeltaPassthroughResponse) GetDeltaRequest() *discovery.DeltaDiscoveryRequest { return r.DeltaRequest } // GetVersion returns the response version. +// Deprecated: use GetResponseVersion instead func (r *PassthroughResponse) GetVersion() (string, error) { discoveryResponse, _ := r.GetDiscoveryResponse() if discoveryResponse != nil { @@ -411,11 +456,21 @@ func (r *PassthroughResponse) GetVersion() (string, error) { return "", fmt.Errorf("DiscoveryResponse is nil") } +// GetResponseVersion returns the response version, or empty if not set. +func (r *PassthroughResponse) GetResponseVersion() string { + discoveryResponse, _ := r.GetDiscoveryResponse() + if discoveryResponse != nil { + return discoveryResponse.GetVersionInfo() + } + return "" +} + func (r *PassthroughResponse) GetContext() context.Context { return r.ctx } // GetSystemVersion returns the response version. +// Deprecated: use GetResponseVersion instead func (r *DeltaPassthroughResponse) GetSystemVersion() (string, error) { deltaDiscoveryResponse, _ := r.GetDeltaDiscoveryResponse() if deltaDiscoveryResponse != nil { @@ -424,8 +479,23 @@ func (r *DeltaPassthroughResponse) GetSystemVersion() (string, error) { return "", fmt.Errorf("DeltaDiscoveryResponse is nil") } -// NextVersionMap returns the version map from a DeltaPassthroughResponse +// GetResponseVersion returns the response version, or empty if not set. +func (r *DeltaPassthroughResponse) GetResponseVersion() string { + deltaDiscoveryResponse, _ := r.GetDeltaDiscoveryResponse() + if deltaDiscoveryResponse != nil { + return deltaDiscoveryResponse.GetSystemVersionInfo() + } + return "" +} + +// NextVersionMap returns the version map from a DeltaPassthroughResponse. +// Deprecated: use GetReturnedResources instead func (r *DeltaPassthroughResponse) GetNextVersionMap() map[string]string { + return r.GetReturnedResources() +} + +// GetReturnedResources returns the version map from a DeltaPassthroughResponse. +func (r *DeltaPassthroughResponse) GetReturnedResources() map[string]string { return r.NextVersionMap } diff --git a/pkg/cache/v3/delta.go b/pkg/cache/v3/delta.go index 9565634914..f426d34e4d 100644 --- a/pkg/cache/v3/delta.go +++ b/pkg/cache/v3/delta.go @@ -29,14 +29,14 @@ type resourceContainer struct { func createDeltaResponse(ctx context.Context, req *DeltaRequest, sub Subscription, resources resourceContainer, cacheVersion string) *RawDeltaResponse { // variables to build our response with var nextVersionMap map[string]string - var filtered []types.Resource + var filtered []types.ResourceWithTTL var toRemove []string // If we are handling a wildcard request, we want to respond with all resources switch { case sub.IsWildcard(): if len(sub.ReturnedResources()) == 0 { - filtered = make([]types.Resource, 0, len(resources.resourceMap)) + filtered = make([]types.ResourceWithTTL, 0, len(resources.resourceMap)) } nextVersionMap = make(map[string]string, len(resources.resourceMap)) for name, r := range resources.resourceMap { @@ -46,7 +46,7 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, sub Subscriptio nextVersionMap[name] = version prevVersion, found := sub.ReturnedResources()[name] if !found || (prevVersion != version) { - filtered = append(filtered, r) + filtered = append(filtered, types.ResourceWithTTL{Resource: r}) } } @@ -66,7 +66,7 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, sub Subscriptio if r, ok := resources.resourceMap[name]; ok { nextVersion := resources.versionMap[name] if prevVersion != nextVersion { - filtered = append(filtered, r) + filtered = append(filtered, types.ResourceWithTTL{Resource: r}) } nextVersionMap[name] = nextVersion } else if found { diff --git a/pkg/cache/v3/delta_test.go b/pkg/cache/v3/delta_test.go index c98c06cfb4..2461730769 100644 --- a/pkg/cache/v3/delta_test.go +++ b/pkg/cache/v3/delta_test.go @@ -22,7 +22,7 @@ import ( "github.com/envoyproxy/go-control-plane/pkg/test/resource/v3" ) -func assertResourceMapEqual(t *testing.T, want, got map[string]types.Resource) { +func assertResourceMapEqual(t *testing.T, want, got map[string]types.ResourceWithTTL) { t.Helper() if !cmp.Equal(want, got, protocmp.Transform()) { @@ -57,7 +57,7 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) { select { case out := <-watches[typ]: snapshot := fixture.snapshot() - assertResourceMapEqual(t, cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot.GetResources(typ)) + assertResourceMapEqual(t, cache.IndexResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot.GetResourcesAndTTL(typ)) sub := subscriptions[typ] sub.SetReturnedResources(out.GetNextVersionMap()) subscriptions[typ] = sub @@ -109,7 +109,7 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) { case out := <-watches[testTypes[0]]: snapshot2 := fixture.snapshot() snapshot2.Resources[types.Endpoint] = cache.NewResources(fixture.version2, []types.Resource{resource.MakeEndpoint(clusterName, 9090)}) - assertResourceMapEqual(t, cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot2.GetResources(rsrc.EndpointType)) + assertResourceMapEqual(t, cache.IndexResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot2.GetResourcesAndTTL(rsrc.EndpointType)) sub := subscriptions[testTypes[0]] sub.SetReturnedResources(out.GetNextVersionMap()) subscriptions[testTypes[0]] = sub @@ -147,7 +147,7 @@ func TestDeltaRemoveResources(t *testing.T) { select { case out := <-watches[typ]: snapshot := fixture.snapshot() - assertResourceMapEqual(t, cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot.GetResources(typ)) + assertResourceMapEqual(t, cache.IndexResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot.GetResourcesAndTTL(typ)) nextVersionMap := out.GetNextVersionMap() subscriptions[typ].SetReturnedResources(nextVersionMap) case <-time.After(time.Second): @@ -186,7 +186,7 @@ func TestDeltaRemoveResources(t *testing.T) { case out := <-watches[testTypes[0]]: snapshot2 := fixture.snapshot() snapshot2.Resources[types.Endpoint] = cache.NewResources(fixture.version2, []types.Resource{}) - assertResourceMapEqual(t, cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot2.GetResources(rsrc.EndpointType)) + assertResourceMapEqual(t, cache.IndexResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot2.GetResourcesAndTTL(rsrc.EndpointType)) nextVersionMap := out.GetNextVersionMap() // make sure the version maps are different since we no longer are tracking any endpoint resources diff --git a/pkg/cache/v3/linear.go b/pkg/cache/v3/linear.go index 41245fceb9..6ae4b06b25 100644 --- a/pkg/cache/v3/linear.go +++ b/pkg/cache/v3/linear.go @@ -16,11 +16,8 @@ package cache import ( "context" - "encoding/hex" "errors" "fmt" - "hash/fnv" - "sort" "strconv" "strings" "sync" @@ -71,22 +68,29 @@ func (c *cachedResource) getVersion(useStableVersion bool) (string, error) { return c.getStableVersion() } -type watches struct { - // sotw keeps track of current sotw watches, indexed per watch id. - sotw map[uint64]ResponseWatch - // delta keeps track of current delta watches, indexed per watch id. - delta map[uint64]DeltaResponseWatch -} +type watch interface { + // isDelta indicates whether the watch is a delta one. + // It should not be used to take functional decisions, but is still currently used pending final changes. + // It can be used to generate statistics. + isDelta() bool + // useStableVersion indicates whether versions returned in the response are built using stable versions instead of cache update versions. + useStableVersion() bool + // sendFullStateResponses requires that all resources matching the request, with no regards to which ones actually updated, must be provided in the response. + // As a consequence, sending a response with no resources has a functional meaning of no matching resources available. + sendFullStateResponses() bool -func newWatches() watches { - return watches{ - sotw: make(map[uint64]ResponseWatch), - delta: make(map[uint64]DeltaResponseWatch), - } + getSubscription() Subscription + // buildResponse computes the actual WatchResponse object to be sent on the watch. + buildResponse(updatedResources []types.ResourceWithTTL, removedResources []string, returnedVersions map[string]string, version string) WatchResponse + // sendResponse sends the response for the watch. + // It must be called at most once. + sendResponse(resp WatchResponse) } -func (w *watches) empty() bool { - return len(w.sotw)+len(w.delta) == 0 +type watches map[uint64]watch + +func newWatches() watches { + return make(watches) } // LinearCache supports collections of opaque resources. This cache has a @@ -124,6 +128,8 @@ type LinearCache struct { // new subscription. useStableVersionsInSotw bool + watchCount int + log log.Logger mu sync.RWMutex @@ -198,22 +204,14 @@ func NewLinearCache(typeURL string, opts ...LinearCacheOption) *LinearCache { // computeResourceChange compares the subscription known resources and the cache current state to compute the list of resources // which have changed and should be notified to the user. // -// The alwaysConsiderAllResources argument removes the consideration of the subscription known resources (e.g. if the version did not match), -// and return all known subscribed resources. -// // The useStableVersion argument defines what version type to use for resources: // - if set to false versions are based on when resources were updated in the cache. // - if set to true versions are a stable property of the resource, with no regard to when it was added to the cache. -func (cache *LinearCache) computeResourceChange(sub Subscription, alwaysConsiderAllResources, useStableVersion bool) (updated, removed []string, err error) { +func (cache *LinearCache) computeResourceChange(sub Subscription, useStableVersion bool) (updated, removed []string, err error) { var changedResources []string var removedResources []string knownVersions := sub.ReturnedResources() - if alwaysConsiderAllResources { - // The response will include all resources, with no regards of resources potentially already returned. - knownVersions = make(map[string]string) - } - if sub.IsWildcard() { for resourceName, resource := range cache.resources { knownVersion, ok := knownVersions[resourceName] @@ -280,38 +278,13 @@ func (cache *LinearCache) computeResourceChange(sub Subscription, alwaysConsider return changedResources, removedResources, nil } -func computeSotwStableVersion(versionMap map[string]string) string { - // To enforce a stable hash we need to have an ordered vision of the map. - keys := make([]string, 0, len(versionMap)) - for key := range versionMap { - keys = append(keys, key) - } - sort.Strings(keys) - - mapHasher := fnv.New64() - - buffer := make([]byte, 0, 8) - itemHasher := fnv.New64() - for _, key := range keys { - buffer = buffer[:0] - itemHasher.Reset() - itemHasher.Write([]byte(key)) - mapHasher.Write(itemHasher.Sum(buffer)) - buffer = buffer[:0] - itemHasher.Reset() - itemHasher.Write([]byte(versionMap[key])) - mapHasher.Write(itemHasher.Sum(buffer)) - } - buffer = buffer[:0] - return hex.EncodeToString(mapHasher.Sum(buffer)) -} - -func (cache *LinearCache) computeSotwResponse(watch ResponseWatch, alwaysConsiderAllResources bool) (*RawResponse, error) { - changedResources, removedResources, err := cache.computeResourceChange(watch.subscription, alwaysConsiderAllResources, cache.useStableVersionsInSotw) +func (cache *LinearCache) computeResponse(watch watch, replyEvenIfEmpty bool) (WatchResponse, error) { + sub := watch.getSubscription() + changedResources, removedResources, err := cache.computeResourceChange(sub, watch.useStableVersion()) if err != nil { return nil, err } - if len(changedResources) == 0 && len(removedResources) == 0 && !alwaysConsiderAllResources { + if len(changedResources) == 0 && len(removedResources) == 0 && !replyEvenIfEmpty { // Nothing changed. return nil, nil } @@ -325,8 +298,9 @@ func (cache *LinearCache) computeSotwResponse(watch ResponseWatch, alwaysConside switch { // For lds and cds, answers will always include all existing subscribed resources, with no regard to which resource was changed or removed. // For other types, the response only includes updated resources (sotw cannot notify for deletion). - case !ResourceRequiresFullStateInSotw(cache.typeURL): - if !alwaysConsiderAllResources && len(changedResources) == 0 { + case !watch.sendFullStateResponses(): + // TODO(valerian-roche): remove this leak of delta/sotw behavior here. + if !watch.isDelta() && !replyEvenIfEmpty && len(changedResources) == 0 { // If the request is not the initial one, and the type does not require full updates, // do not return if nothing is to be set. // For full-state resources an empty response does have a semantic meaning. @@ -335,7 +309,7 @@ func (cache *LinearCache) computeSotwResponse(watch ResponseWatch, alwaysConside // changedResources is already filtered based on the subscription. resourcesToReturn = changedResources - case watch.subscription.IsWildcard(): + case sub.IsWildcard(): // Include all resources for the type. resourcesToReturn = make([]string, 0, len(cache.resources)) for resourceName := range cache.resources { @@ -343,7 +317,7 @@ func (cache *LinearCache) computeSotwResponse(watch ResponseWatch, alwaysConside } default: // Include all resources matching the subscription, with no concern on whether it has been updated or not. - requestedResources := watch.subscription.SubscribedResources() + requestedResources := sub.SubscribedResources() // The linear cache could be very large (e.g. containing all potential CLAs) // Therefore drives on the subscription requested resources. resourcesToReturn = make([]string, 0, len(requestedResources)) @@ -355,9 +329,9 @@ func (cache *LinearCache) computeSotwResponse(watch ResponseWatch, alwaysConside } // returnedVersions includes all resources currently known to the subscription and their version. - returnedVersions := make(map[string]string, len(watch.subscription.ReturnedResources())) + returnedVersions := make(map[string]string, len(sub.ReturnedResources())) // Clone the current returned versions. The cache should not alter the subscription. - for resourceName, version := range watch.subscription.ReturnedResources() { + for resourceName, version := range sub.ReturnedResources() { returnedVersions[resourceName] = version } @@ -365,7 +339,7 @@ func (cache *LinearCache) computeSotwResponse(watch ResponseWatch, alwaysConside for _, resourceName := range resourcesToReturn { cachedResource := cache.resources[resourceName] resources = append(resources, types.ResourceWithTTL{Resource: cachedResource.Resource}) - version, err := cachedResource.getVersion(cache.useStableVersionsInSotw) + version, err := cachedResource.getVersion(watch.useStableVersion()) if err != nil { return nil, fmt.Errorf("failed to compute version of %s: %w", resourceName, err) } @@ -378,137 +352,52 @@ func (cache *LinearCache) computeSotwResponse(watch ResponseWatch, alwaysConside delete(returnedVersions, resourceName) } + // TODO(valerian-roche): remove this leak of delta/sotw behavior here. responseVersion := cache.getVersion() - if cache.useStableVersionsInSotw { + if watch.useStableVersion() && !watch.isDelta() { responseVersion = cache.versionPrefix + computeSotwStableVersion(returnedVersions) } - return &RawResponse{ - Request: watch.Request, - Resources: resources, - ReturnedResources: returnedVersions, - Version: responseVersion, - Ctx: context.Background(), - }, nil -} - -func (cache *LinearCache) computeDeltaResponse(watch DeltaResponseWatch) (*RawDeltaResponse, error) { - changedResources, removedResources, err := cache.computeResourceChange(watch.subscription, false, true) - if err != nil { - return nil, err - } - - // On first request on a wildcard subscription, envoy does expect a response to come in to - // conclude initialization. - isFirstWildcardRequest := watch.subscription.IsWildcard() && watch.Request.GetResponseNonce() == "" - if len(changedResources) == 0 && len(removedResources) == 0 && !isFirstWildcardRequest { - // Nothing changed. - return nil, nil - } - - returnedVersions := make(map[string]string, len(watch.subscription.ReturnedResources())) - // Clone the current returned versions. The cache should not alter the subscription - for resourceName, version := range watch.subscription.ReturnedResources() { - returnedVersions[resourceName] = version - } - - cacheVersion := cache.getVersion() - resources := make([]types.Resource, 0, len(changedResources)) - for _, resourceName := range changedResources { - resource := cache.resources[resourceName] - resources = append(resources, resource.Resource) - version, err := resource.getStableVersion() - if err != nil { - return nil, fmt.Errorf("failed to compute stable version of %s: %w", resourceName, err) - } - returnedVersions[resourceName] = version - } - // Cleanup resources no longer existing in the cache or no longer subscribed. - for _, resourceName := range removedResources { - delete(returnedVersions, resourceName) - } - - return &RawDeltaResponse{ - DeltaRequest: watch.Request, - Resources: resources, - RemovedResources: removedResources, - NextVersionMap: returnedVersions, - SystemVersionInfo: cacheVersion, - Ctx: context.Background(), - }, nil + return watch.buildResponse(resources, removedResources, returnedVersions, responseVersion), nil } func (cache *LinearCache) notifyAll(modified []string) error { // Gather the list of watches impacted by the modified resources. - sotwWatches := make(map[uint64]ResponseWatch) - deltaWatches := make(map[uint64]DeltaResponseWatch) + resourceWatches := newWatches() for _, name := range modified { - for watchID, watch := range cache.resourceWatches[name].sotw { - sotwWatches[watchID] = watch - } - for watchID, watch := range cache.resourceWatches[name].delta { - deltaWatches[watchID] = watch + for watchID, watch := range cache.resourceWatches[name] { + resourceWatches[watchID] = watch } } - // sotw watches - for watchID, watch := range sotwWatches { - response, err := cache.computeSotwResponse(watch, false) + for watchID, watch := range resourceWatches { + response, err := cache.computeResponse(watch, false) if err != nil { return err } if response != nil { - watch.Response <- response - cache.removeWatch(watchID, watch.subscription) + watch.sendResponse(response) + cache.removeWatch(watchID, watch.getSubscription()) } else { cache.log.Infof("[Linear cache] Watch %d detected as triggered but no change was found", watchID) } } - for watchID, watch := range cache.wildcardWatches.sotw { - response, err := cache.computeSotwResponse(watch, false) + for watchID, watch := range cache.wildcardWatches { + response, err := cache.computeResponse(watch, false) if err != nil { return err } if response != nil { - watch.Response <- response - delete(cache.wildcardWatches.sotw, watchID) + watch.sendResponse(response) + cache.removeWildcardWatch(watchID) } else { cache.log.Infof("[Linear cache] Wildcard watch %d detected as triggered but no change was found", watchID) } } - // delta watches - for watchID, watch := range deltaWatches { - response, err := cache.computeDeltaResponse(watch) - if err != nil { - return err - } - - if response != nil { - watch.Response <- response - cache.removeDeltaWatch(watchID, watch.subscription) - } else { - cache.log.Infof("[Linear cache] Delta watch %d detected as triggered but no change was found", watchID) - } - } - - for watchID, watch := range cache.wildcardWatches.delta { - response, err := cache.computeDeltaResponse(watch) - if err != nil { - return err - } - - if response != nil { - watch.Response <- response - delete(cache.wildcardWatches.delta, watchID) - } else { - cache.log.Infof("[Linear cache] Wildcard delta watch %d detected as triggered but no change was found", watchID) - } - } - return nil } @@ -615,10 +504,10 @@ func (cache *LinearCache) CreateWatch(request *Request, sub Subscription, value // If the request does not include a version the client considers it has no current state. // In this case we will always reply to allow proper initialization of dependencies in the client. - ignoreCurrentSubscriptionResources := request.GetVersionInfo() == "" + replyEvenIfEmpty := request.GetVersionInfo() == "" if !strings.HasPrefix(request.GetVersionInfo(), cache.versionPrefix) { // If the version of the request does not match the cache prefix, we will send a response in all cases to match the legacy behavior. - ignoreCurrentSubscriptionResources = true + replyEvenIfEmpty = true cache.log.Debugf("[linear cache] received watch with version %s not matching the cache prefix %s. Will return all known resources", request.GetVersionInfo(), cache.versionPrefix) } @@ -633,12 +522,18 @@ func (cache *LinearCache) CreateWatch(request *Request, sub Subscription, value // For now it is not done as: // - for the first case, while the protocol documentation does not explicitly mention the case, it does not mark it impossible and explicitly references unsubscribing from wildcard. // - for the second one we could likely do it with little difficulty if need be, but if users rely on the current monotonic version it could impact their callbacks implementations. - watch := ResponseWatch{Request: request, Response: value, subscription: sub} + watch := ResponseWatch{ + Request: request, + Response: value, + subscription: sub, + enableStableVersion: cache.useStableVersionsInSotw, + fullStateResponses: ResourceRequiresFullStateInSotw(cache.typeURL), + } cache.mu.Lock() defer cache.mu.Unlock() - response, err := cache.computeSotwResponse(watch, ignoreCurrentSubscriptionResources) + response, err := cache.computeResponse(watch, replyEvenIfEmpty) if err != nil { return nil, fmt.Errorf("failed to compute the watch respnse: %w", err) } @@ -649,14 +544,17 @@ func (cache *LinearCache) CreateWatch(request *Request, sub Subscription, value // - provides a non-empty version, matching the version prefix // and the cache uses stable versions, if the generated versions are the same as the previous one, we do not return the response. // This avoids resending all data if the new subscription is just a resumption of the previous one. - if cache.useStableVersionsInSotw && request.GetResponseNonce() == "" && !ignoreCurrentSubscriptionResources { - shouldReply = request.GetVersionInfo() != response.Version - - // We confirmed the content of the known resources, store them in the watch we create. - subscription := newWatchSubscription(sub) - subscription.returnedResources = response.ReturnedResources - watch.subscription = subscription - sub = subscription + if cache.useStableVersionsInSotw && request.GetResponseNonce() == "" && !replyEvenIfEmpty { + if request.GetVersionInfo() != response.GetResponseVersion() { + // The response has a different returned version map as the request + shouldReply = true + } else { + // We confirmed the content of the known resources, store them in the watch we create. + subscription := newWatchSubscription(sub) + subscription.returnedResources = response.GetReturnedResources() + watch.subscription = subscription + sub = subscription + } } else { shouldReply = true } @@ -664,49 +562,11 @@ func (cache *LinearCache) CreateWatch(request *Request, sub Subscription, value if shouldReply { cache.log.Debugf("[linear cache] replying to the watch with resources %v (subscription values %v, known %v)", response.GetReturnedResources(), sub.SubscribedResources(), sub.ReturnedResources()) - watch.Response <- response + watch.sendResponse(response) return func() {}, nil } - watchID := cache.nextWatchID() - // Create open watches since versions are up to date. - if sub.IsWildcard() { - cache.log.Infof("[linear cache] open watch %d for %s all resources, known versions %v, system version %q", watchID, cache.typeURL, sub.ReturnedResources(), cache.getVersion()) - cache.wildcardWatches.sotw[watchID] = watch - return func() { - cache.mu.Lock() - defer cache.mu.Unlock() - delete(cache.wildcardWatches.sotw, watchID) - }, nil - } - - cache.log.Infof("[linear cache] open watch %d for %s resources %v, known versions %v, system version %q", watchID, cache.typeURL, sub.SubscribedResources(), sub.ReturnedResources(), cache.getVersion()) - for name := range sub.SubscribedResources() { - watches, exists := cache.resourceWatches[name] - if !exists { - watches = newWatches() - cache.resourceWatches[name] = watches - } - watches.sotw[watchID] = watch - } - return func() { - cache.mu.Lock() - defer cache.mu.Unlock() - cache.removeWatch(watchID, watch.subscription) - }, nil -} - -// Must be called under lock -func (cache *LinearCache) removeWatch(watchID uint64, sub Subscription) { - // Make sure we clean the watch for ALL resources it might be associated with, - // as the channel will no longer be listened to - for resource := range sub.SubscribedResources() { - resourceWatches := cache.resourceWatches[resource] - delete(resourceWatches.sotw, watchID) - if resourceWatches.empty() { - delete(cache.resourceWatches, resource) - } - } + return cache.trackWatch(watch), nil } func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, sub Subscription, value chan DeltaResponse) (func(), error) { @@ -716,71 +576,94 @@ func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, sub Subscripti watch := DeltaResponseWatch{Request: request, Response: value, subscription: sub} + // On first request on a wildcard subscription, envoy does expect a response to come in to + // conclude initialization. + replyEvenIfEmpty := false + if sub.IsWildcard() && request.GetResponseNonce() == "" { + replyEvenIfEmpty = true + } + cache.mu.Lock() defer cache.mu.Unlock() - response, err := cache.computeDeltaResponse(watch) + response, err := cache.computeResponse(watch, replyEvenIfEmpty) if err != nil { return nil, fmt.Errorf("failed to compute the watch respnse: %w", err) } - if response != nil { cache.log.Debugf("[linear cache] replying to the delta watch (subscription values %v, known %v)", sub.SubscribedResources(), sub.ReturnedResources()) - watch.Response <- response + watch.sendResponse(response) return nil, nil } + return cache.trackWatch(watch), nil +} + +func (cache *LinearCache) nextWatchID() uint64 { + cache.currentWatchID++ + if cache.currentWatchID == 0 { + panic("watch id count overflow") + } + return cache.currentWatchID +} + +// Must be called under lock +func (cache *LinearCache) trackWatch(watch watch) func() { + cache.watchCount++ + watchID := cache.nextWatchID() + sub := watch.getSubscription() // Create open watches since versions are up to date. if sub.IsWildcard() { - cache.log.Infof("[linear cache] open delta watch %d for all %s resources, system version %q", watchID, cache.typeURL, cache.getVersion()) - cache.wildcardWatches.delta[watchID] = watch + cache.log.Infof("[linear cache] open watch %d for %s all resources", watchID, cache.typeURL) + cache.log.Debugf("[linear cache] subscription details for watch %d: known versions %v, system version %q", watchID, sub.ReturnedResources(), cache.getVersion()) + cache.wildcardWatches[watchID] = watch return func() { cache.mu.Lock() defer cache.mu.Unlock() - delete(cache.wildcardWatches.delta, watchID) - }, nil + cache.removeWildcardWatch(watchID) + } } - cache.log.Infof("[linear cache] open delta watch %d for %s resources %v, system version %q", watchID, cache.typeURL, sub.SubscribedResources(), cache.getVersion()) + cache.log.Infof("[linear cache] open watch %d for %s resources %v", watchID, cache.typeURL, sub.SubscribedResources()) + cache.log.Debugf("[linear cache] subscription details for watch %d: known versions %v, system version %q", watchID, sub.ReturnedResources(), cache.getVersion()) for name := range sub.SubscribedResources() { watches, exists := cache.resourceWatches[name] if !exists { watches = newWatches() cache.resourceWatches[name] = watches } - watches.delta[watchID] = watch + watches[watchID] = watch } return func() { cache.mu.Lock() defer cache.mu.Unlock() - cache.removeDeltaWatch(watchID, watch.subscription) - }, nil -} - -func (cache *LinearCache) getVersion() string { - return cache.versionPrefix + strconv.FormatUint(cache.version, 10) + cache.removeWatch(watchID, sub) + } } -// cancellation function for cleaning stale watches -func (cache *LinearCache) removeDeltaWatch(watchID uint64, sub Subscription) { +// Must be called under lock +func (cache *LinearCache) removeWatch(watchID uint64, sub Subscription) { // Make sure we clean the watch for ALL resources it might be associated with, // as the channel will no longer be listened to for resource := range sub.SubscribedResources() { resourceWatches := cache.resourceWatches[resource] - delete(resourceWatches.delta, watchID) - if resourceWatches.empty() { + delete(resourceWatches, watchID) + if len(resourceWatches) == 0 { delete(cache.resourceWatches, resource) } } + cache.watchCount-- } -func (cache *LinearCache) nextWatchID() uint64 { - cache.currentWatchID++ - if cache.currentWatchID == 0 { - panic("watch id count overflow") - } - return cache.currentWatchID +// Must be called under lock +func (cache *LinearCache) removeWildcardWatch(watchID uint64) { + cache.watchCount-- + delete(cache.wildcardWatches, watchID) +} + +func (cache *LinearCache) getVersion() string { + return cache.versionPrefix + strconv.FormatUint(cache.version, 10) } func (cache *LinearCache) Fetch(context.Context, *Request) (Response, error) { @@ -795,30 +678,23 @@ func (cache *LinearCache) NumResources() int { return len(cache.resources) } -// NumWatches returns the number of active sotw watches for a resource name. +// NumWatches returns the number of active watches for a resource name. func (cache *LinearCache) NumWatches(name string) int { cache.mu.RLock() defer cache.mu.RUnlock() - return len(cache.resourceWatches[name].sotw) + len(cache.wildcardWatches.sotw) + return len(cache.resourceWatches[name]) + len(cache.wildcardWatches) } -// NumDeltaWatchesForResource returns the number of active delta watches for a resource name. -func (cache *LinearCache) NumDeltaWatchesForResource(name string) int { +// TotalWatches returns the number of active watches on the cache in general. +func (cache *LinearCache) NumWildcardWatches() int { cache.mu.RLock() defer cache.mu.RUnlock() - return len(cache.resourceWatches[name].delta) + len(cache.wildcardWatches.delta) + return len(cache.wildcardWatches) } -// NumDeltaWatches returns the total number of active delta watches. -// Warning: it is quite inefficient, and NumDeltaWatchesForResource should be preferred. -func (cache *LinearCache) NumDeltaWatches() int { +// NumCacheWatches returns the number of active watches on the cache in general. +func (cache *LinearCache) NumCacheWatches() int { cache.mu.RLock() defer cache.mu.RUnlock() - uniqueWatches := map[uint64]struct{}{} - for _, watches := range cache.resourceWatches { - for id := range watches.delta { - uniqueWatches[id] = struct{}{} - } - } - return len(uniqueWatches) + len(cache.wildcardWatches.delta) + return cache.watchCount } diff --git a/pkg/cache/v3/linear_test.go b/pkg/cache/v3/linear_test.go index 7c07926c6e..9724350e6a 100644 --- a/pkg/cache/v3/linear_test.go +++ b/pkg/cache/v3/linear_test.go @@ -199,9 +199,9 @@ func checkWatchCount(t *testing.T, c *LinearCache, name string, count int) { } } -func checkDeltaWatchCount(t *testing.T, c *LinearCache, count int) { +func checkTotalWatchCount(t *testing.T, c *LinearCache, count int) { t.Helper() - if i := c.NumDeltaWatches(); i != count { + if i := c.NumCacheWatches(); i != count { t.Errorf("unexpected number of delta watches: got %d, want %d", i, count) } } @@ -505,7 +505,7 @@ func TestLinearDeletion(t *testing.T) { require.NoError(t, err) // b is watched by wildcard, but for non-full-state resources we cannot report deletions mustBlock(t, w) - assert.Len(t, c.wildcardWatches.sotw, 1) + assert.Len(t, c.wildcardWatches, 1) }) t.Run("full-state resource", func(t *testing.T) { @@ -692,16 +692,16 @@ func TestLinearDeltaWildcard(t *testing.T) { _, err = c.CreateDeltaWatch(req1, sub1, w1) require.NoError(t, err) mustBlockDelta(t, w1) - _, err = c.CreateDeltaWatch(req2, sub2, w2) require.NoError(t, err) mustBlockDelta(t, w2) + checkTotalWatchCount(t, c, 2) a := &endpoint.ClusterLoadAssignment{ClusterName: "a"} hash := hashResource(t, a) err = c.UpdateResource("a", a) require.NoError(t, err) - checkDeltaWatchCount(t, c, 0) + checkTotalWatchCount(t, c, 0) verifyDeltaResponse(t, w1, []resourceInfo{{"a", hash}}, nil) verifyDeltaResponse(t, w2, []resourceInfo{{"a", hash}}, nil) } @@ -722,14 +722,14 @@ func TestLinearDeltaExistingResources(t *testing.T) { w := make(chan DeltaResponse, 1) _, err = c.CreateDeltaWatch(req, subFromDeltaRequest(req), w) require.NoError(t, err) - checkDeltaWatchCount(t, c, 0) + checkTotalWatchCount(t, c, 0) verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}}, []string{}) req = &DeltaRequest{TypeUrl: testType, ResourceNamesSubscribe: []string{"a", "b"}} w = make(chan DeltaResponse, 1) _, err = c.CreateDeltaWatch(req, subFromDeltaRequest(req), w) require.NoError(t, err) - checkDeltaWatchCount(t, c, 0) + checkTotalWatchCount(t, c, 0) verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}, {"a", hashA}}, nil) } @@ -748,7 +748,7 @@ func TestLinearDeltaInitialResourcesVersionSet(t *testing.T) { w := make(chan DeltaResponse, 1) _, err = c.CreateDeltaWatch(req, subFromDeltaRequest(req), w) require.NoError(t, err) - checkDeltaWatchCount(t, c, 0) + checkTotalWatchCount(t, c, 0) verifyDeltaResponse(t, w, []resourceInfo{{"a", hashA}}, nil) // b is up to date and shouldn't be returned req = &DeltaRequest{TypeUrl: testType, ResourceNamesSubscribe: []string{"a", "b"}, InitialResourceVersions: map[string]string{"a": hashA, "b": hashB}} @@ -756,12 +756,12 @@ func TestLinearDeltaInitialResourcesVersionSet(t *testing.T) { _, err = c.CreateDeltaWatch(req, subFromDeltaRequest(req), w) require.NoError(t, err) mustBlockDelta(t, w) - checkDeltaWatchCount(t, c, 1) + checkTotalWatchCount(t, c, 1) b = &endpoint.ClusterLoadAssignment{ClusterName: "b", Endpoints: []*endpoint.LocalityLbEndpoints{{Priority: 10}}} // new version of b hashB = hashResource(t, b) err = c.UpdateResource("b", b) require.NoError(t, err) - checkDeltaWatchCount(t, c, 0) + checkTotalWatchCount(t, c, 0) verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}}, nil) } @@ -782,7 +782,7 @@ func TestLinearDeltaResourceUpdate(t *testing.T) { w := make(chan DeltaResponse, 1) _, err = c.CreateDeltaWatch(req, subFromDeltaRequest(req), w) require.NoError(t, err) - checkDeltaWatchCount(t, c, 0) + checkTotalWatchCount(t, c, 0) verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}, {"a", hashA}}, nil) checkStableVersionsAreComputed(t, c, "a", "b") @@ -791,7 +791,7 @@ func TestLinearDeltaResourceUpdate(t *testing.T) { _, err = c.CreateDeltaWatch(req, subFromDeltaRequest(req), w) require.NoError(t, err) mustBlockDelta(t, w) - checkDeltaWatchCount(t, c, 1) + checkTotalWatchCount(t, c, 1) a = &endpoint.ClusterLoadAssignment{ClusterName: "a", Endpoints: []*endpoint.LocalityLbEndpoints{ // resource update {Priority: 10}, @@ -818,7 +818,7 @@ func TestLinearDeltaResourceDelete(t *testing.T) { w := make(chan DeltaResponse, 1) _, err = c.CreateDeltaWatch(req, subFromDeltaRequest(req), w) require.NoError(t, err) - checkDeltaWatchCount(t, c, 0) + checkTotalWatchCount(t, c, 0) verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}, {"a", hashA}}, nil) req = &DeltaRequest{TypeUrl: testType, ResourceNamesSubscribe: []string{"a", "b"}, InitialResourceVersions: map[string]string{"a": hashA, "b": hashB}} @@ -826,7 +826,7 @@ func TestLinearDeltaResourceDelete(t *testing.T) { _, err = c.CreateDeltaWatch(req, subFromDeltaRequest(req), w) require.NoError(t, err) mustBlockDelta(t, w) - checkDeltaWatchCount(t, c, 1) + checkTotalWatchCount(t, c, 1) a = &endpoint.ClusterLoadAssignment{ClusterName: "a", Endpoints: []*endpoint.LocalityLbEndpoints{ // resource update {Priority: 10}, @@ -848,7 +848,7 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { _, err := c.CreateDeltaWatch(req, subFromDeltaRequest(req), w) require.NoError(t, err) mustBlockDelta(t, w) - checkDeltaWatchCount(t, c, 1) + checkTotalWatchCount(t, c, 1) a := &endpoint.ClusterLoadAssignment{ClusterName: "a"} hashA := hashResource(t, a) b := &endpoint.ClusterLoadAssignment{ClusterName: "b"} @@ -867,7 +867,7 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { _, err = c.CreateDeltaWatch(req, sub, w) require.NoError(t, err) mustBlockDelta(t, w) - checkDeltaWatchCount(t, c, 1) + checkTotalWatchCount(t, c, 1) a = &endpoint.ClusterLoadAssignment{ClusterName: "a", Endpoints: []*endpoint.LocalityLbEndpoints{ // resource update {Priority: 10}, }} @@ -888,7 +888,7 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { _, err = c.CreateDeltaWatch(req, sub, w) require.NoError(t, err) mustBlockDelta(t, w) - checkDeltaWatchCount(t, c, 1) + checkTotalWatchCount(t, c, 1) a = &endpoint.ClusterLoadAssignment{ClusterName: "a", Endpoints: []*endpoint.LocalityLbEndpoints{ // resource update {Priority: 15}, }} @@ -910,7 +910,7 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { _, err = c.CreateDeltaWatch(req, sub, w) require.NoError(t, err) mustBlockDelta(t, w) - checkDeltaWatchCount(t, c, 1) + checkTotalWatchCount(t, c, 1) b = &endpoint.ClusterLoadAssignment{ClusterName: "b", Endpoints: []*endpoint.LocalityLbEndpoints{}} // recreate watched resource hashB = hashResource(t, b) err = c.UpdateResources(map[string]types.Resource{"b": b}, []string{"d"}) @@ -926,7 +926,7 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { // Wildcard create/update createWildcardDeltaWatch(t, false, c, w) mustBlockDelta(t, w) - checkDeltaWatchCount(t, c, 1) + checkTotalWatchCount(t, c, 1) b = &endpoint.ClusterLoadAssignment{ClusterName: "b", Endpoints: []*endpoint.LocalityLbEndpoints{ // resource update {Priority: 15}, }} @@ -943,7 +943,7 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { // Wildcard update/delete createWildcardDeltaWatch(t, false, c, w) mustBlockDelta(t, w) - checkDeltaWatchCount(t, c, 1) + checkTotalWatchCount(t, c, 1) a = &endpoint.ClusterLoadAssignment{ClusterName: "a", Endpoints: []*endpoint.LocalityLbEndpoints{ // resource update {Priority: 25}, }} @@ -953,7 +953,7 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { assert.NotContains(t, c.resources, "d", "resource with name d was found in cache") verifyDeltaResponse(t, w, []resourceInfo{{"a", hashA}}, []string{"d"}) - checkDeltaWatchCount(t, c, 0) + checkTotalWatchCount(t, c, 0) assert.Equal(t, 2, c.NumResources()) } @@ -977,6 +977,9 @@ func TestLinearMixedWatches(t *testing.T) { mustBlock(t, w) // Only sotw watches, should not have triggered stable resource computation checkStableVersionsAreNotComputed(t, c, "a", "b") + checkTotalWatchCount(t, c, 1) + checkWatchCount(t, c, "a", 1) + checkWatchCount(t, c, "b", 1) a = &endpoint.ClusterLoadAssignment{ClusterName: "a", Endpoints: []*endpoint.LocalityLbEndpoints{ // resource update {Priority: 25}, @@ -988,12 +991,15 @@ func TestLinearMixedWatches(t *testing.T) { resp := verifyResponseResources(t, w, resource.EndpointType, c.getVersion(), "a") updateFromSotwResponse(resp, &sotwSub, sotwReq) checkStableVersionsAreNotComputed(t, c, "a", "b") + checkTotalWatchCount(t, c, 0) + checkWatchCount(t, c, "a", 0) + checkWatchCount(t, c, "b", 0) - sotwReq.VersionInfo = c.getVersion() _, err = c.CreateWatch(sotwReq, sotwSub, w) require.NoError(t, err) mustBlock(t, w) checkStableVersionsAreNotComputed(t, c, "a", "b") + checkTotalWatchCount(t, c, 1) deltaReq := &DeltaRequest{TypeUrl: resource.EndpointType, ResourceNamesSubscribe: []string{"a", "b"}, InitialResourceVersions: map[string]string{"a": hashA, "b": hashB}} wd := make(chan DeltaResponse, 1) @@ -1002,8 +1008,10 @@ func TestLinearMixedWatches(t *testing.T) { _, err = c.CreateDeltaWatch(deltaReq, subFromDeltaRequest(deltaReq), wd) require.NoError(t, err) mustBlockDelta(t, wd) - checkDeltaWatchCount(t, c, 1) + checkTotalWatchCount(t, c, 2) checkStableVersionsAreComputed(t, c, "a", "b") + checkWatchCount(t, c, "a", 2) + checkWatchCount(t, c, "b", 2) err = c.UpdateResources(nil, []string{"b"}) require.NoError(t, err) @@ -1034,9 +1042,9 @@ func TestLinearSotwWatches(t *testing.T) { require.NoError(t, err) mustBlock(t, w) - assert.Len(t, cache.resourceWatches["a"].sotw, 1) - assert.Len(t, cache.resourceWatches["b"].sotw, 1) - assert.Len(t, cache.resourceWatches["c"].sotw, 1) + assert.Len(t, cache.resourceWatches["a"], 1) + assert.Len(t, cache.resourceWatches["b"], 1) + assert.Len(t, cache.resourceWatches["c"], 1) // Update a and c without touching b a = &endpoint.ClusterLoadAssignment{ClusterName: "a", Endpoints: []*endpoint.LocalityLbEndpoints{ // resource update @@ -1047,9 +1055,9 @@ func TestLinearSotwWatches(t *testing.T) { resp := verifyResponseResources(t, w, testType, cache.getVersion(), "a") updateFromSotwResponse(resp, &sotwSub, sotwReq) - assert.Empty(t, cache.resourceWatches["a"].sotw) - assert.Empty(t, cache.resourceWatches["b"].sotw) - assert.Empty(t, cache.resourceWatches["c"].sotw) + assert.Empty(t, cache.resourceWatches["a"]) + assert.Empty(t, cache.resourceWatches["b"]) + assert.Empty(t, cache.resourceWatches["c"]) // c no longer watched w = make(chan Response, 1) @@ -1064,9 +1072,9 @@ func TestLinearSotwWatches(t *testing.T) { }} err = cache.UpdateResources(map[string]types.Resource{"b": b}, nil) - assert.Empty(t, cache.resourceWatches["a"].sotw) - assert.Empty(t, cache.resourceWatches["b"].sotw) - assert.Empty(t, cache.resourceWatches["c"].sotw) + assert.Empty(t, cache.resourceWatches["a"]) + assert.Empty(t, cache.resourceWatches["b"]) + assert.Empty(t, cache.resourceWatches["c"]) require.NoError(t, err) resp = verifyResponseResources(t, w, testType, cache.getVersion(), "b") @@ -1086,9 +1094,9 @@ func TestLinearSotwWatches(t *testing.T) { require.NoError(t, err) verifyResponseResources(t, w, testType, cache.getVersion(), "c") - assert.Empty(t, cache.resourceWatches["a"].sotw) - assert.Empty(t, cache.resourceWatches["b"].sotw) - assert.Empty(t, cache.resourceWatches["c"].sotw) + assert.Empty(t, cache.resourceWatches["a"]) + assert.Empty(t, cache.resourceWatches["b"]) + assert.Empty(t, cache.resourceWatches["c"]) }) t.Run("watches return full state for types requesting it", func(t *testing.T) { @@ -1298,12 +1306,12 @@ func TestLinearSotwNonWildcard(t *testing.T) { checkPendingWatch(4) // Cancel two watches to change resources - assert.Len(t, cache.resourceWatches["c"].sotw, 2) + assert.Len(t, cache.resourceWatches["c"], 2) c2() - assert.Len(t, cache.resourceWatches["c"].sotw, 1) - assert.Len(t, cache.resourceWatches["b"].sotw, 1) + assert.Len(t, cache.resourceWatches["c"], 1) + assert.Len(t, cache.resourceWatches["b"], 1) c3() - assert.Empty(t, cache.resourceWatches["b"].sotw) + assert.Empty(t, cache.resourceWatches["b"]) // Remove a resource from 2 (was a, c, d) updateReqResources(2, []string{"a", "d"}) @@ -1424,12 +1432,12 @@ func TestLinearSotwNonWildcard(t *testing.T) { checkPendingWatch(4) // Cancel two watches to change resources - assert.Len(t, cache.resourceWatches["c"].sotw, 2) + assert.Len(t, cache.resourceWatches["c"], 2) c2() - assert.Len(t, cache.resourceWatches["c"].sotw, 1) - assert.Len(t, cache.resourceWatches["b"].sotw, 1) + assert.Len(t, cache.resourceWatches["c"], 1) + assert.Len(t, cache.resourceWatches["b"], 1) c3() - assert.Empty(t, cache.resourceWatches["b"].sotw) + assert.Empty(t, cache.resourceWatches["b"]) // Remove a resource from 2 (was a, c, d) updateReqResources(2, []string{"a", "d"}) diff --git a/pkg/cache/v3/resource.go b/pkg/cache/v3/resource.go index c98a3eb14b..2a0e627fc9 100644 --- a/pkg/cache/v3/resource.go +++ b/pkg/cache/v3/resource.go @@ -116,10 +116,10 @@ func ResourceRequiresFullStateInSotw(typeURL resource.Type) bool { } // GetResourceName returns the resource names for a list of valid xDS response types. -func GetResourceNames(resources []types.Resource) []string { +func GetResourceNames(resources []types.ResourceWithTTL) []string { out := make([]string, len(resources)) for i, r := range resources { - out[i] = GetResourceName(r) + out[i] = GetResourceName(r.Resource) } return out } diff --git a/pkg/cache/v3/resource_test.go b/pkg/cache/v3/resource_test.go index 0f311af5c8..5baebf521d 100644 --- a/pkg/cache/v3/resource_test.go +++ b/pkg/cache/v3/resource_test.go @@ -128,17 +128,17 @@ func TestGetResourceName(t *testing.T) { func TestGetResourceNames(t *testing.T) { tests := []struct { name string - input []types.Resource + input []types.ResourceWithTTL want []string }{ { name: "empty", - input: []types.Resource{}, + input: []types.ResourceWithTTL{}, want: []string{}, }, { name: "many", - input: []types.Resource{testRuntime, testListener, testListenerDefault, testVirtualHost}, + input: []types.ResourceWithTTL{{Resource: testRuntime}, {Resource: testListener}, {Resource: testListenerDefault}, {Resource: testVirtualHost}}, want: []string{runtimeName, listenerName, listenerName, virtualHostName}, }, } diff --git a/pkg/cache/v3/simple.go b/pkg/cache/v3/simple.go index e043637961..a9456657f6 100644 --- a/pkg/cache/v3/simple.go +++ b/pkg/cache/v3/simple.go @@ -411,7 +411,7 @@ func (cache *snapshotCache) CreateWatch(request *Request, sub Subscription, valu return cache.cancelWatch(nodeID, watchID) } - watch := ResponseWatch{Request: request, Response: value, subscription: sub} + watch := ResponseWatch{Request: request, Response: value, subscription: sub, fullStateResponses: ResourceRequiresFullStateInSotw(request.GetTypeUrl())} snapshot, exists := cache.snapshots[nodeID] if !exists { diff --git a/pkg/cache/v3/status.go b/pkg/cache/v3/status.go index b2a3063c69..1fa216a5ef 100644 --- a/pkg/cache/v3/status.go +++ b/pkg/cache/v3/status.go @@ -15,11 +15,15 @@ package cache import ( + "context" + "encoding/hex" + "hash/fnv" "sort" "sync" "time" core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + "github.com/envoyproxy/go-control-plane/pkg/cache/types" ) // NodeHash computes string identifiers for Envoy nodes. @@ -83,6 +87,33 @@ type statusInfo struct { mu sync.RWMutex } +func computeSotwStableVersion(versionMap map[string]string) string { + // To enforce a stable hash we need to have an ordered vision of the map. + keys := make([]string, 0, len(versionMap)) + for key := range versionMap { + keys = append(keys, key) + } + sort.Strings(keys) + + mapHasher := fnv.New64() + + itemHasher := fnv.New64() + for _, key := range keys { + itemHasher.Reset() + itemHasher.Write([]byte(key)) + mapHasher.Write(itemHasher.Sum(nil)) + itemHasher.Reset() + itemHasher.Write([]byte(versionMap[key])) + mapHasher.Write(itemHasher.Sum(nil)) + } + return hex.EncodeToString(mapHasher.Sum(nil)) +} + +type WatchResponse interface { + GetReturnedResources() map[string]string + GetResponseVersion() string +} + // ResponseWatch is a watch record keeping both the request and an open channel for the response. type ResponseWatch struct { // Request is the original request for the watch. @@ -93,6 +124,42 @@ type ResponseWatch struct { // Subscription stores the current client subscription state. subscription Subscription + + // enableStableVersion indicates whether versions returned in the response are built using stable versions instead of cache update versions. + enableStableVersion bool + + // fullStateResponses requires that all resources matching the request, with no regards to which ones actually updated, must be provided in the response. + fullStateResponses bool +} + +func (w ResponseWatch) isDelta() bool { + return false +} + +func (w ResponseWatch) buildResponse(updatedResources []types.ResourceWithTTL, _ []string, returnedVersions map[string]string, version string) WatchResponse { + return &RawResponse{ + Request: w.Request, + Resources: updatedResources, + ReturnedResources: returnedVersions, + Version: version, + Ctx: context.Background(), + } +} + +func (w ResponseWatch) useStableVersion() bool { + return w.enableStableVersion +} + +func (w ResponseWatch) sendFullStateResponses() bool { + return w.fullStateResponses +} + +func (w ResponseWatch) getSubscription() Subscription { + return w.subscription +} + +func (w ResponseWatch) sendResponse(resp WatchResponse) { + w.Response <- resp.(*RawResponse) } // DeltaResponseWatch is a watch record keeping both the delta request and an open channel for the delta response. @@ -107,6 +174,37 @@ type DeltaResponseWatch struct { subscription Subscription } +func (w DeltaResponseWatch) isDelta() bool { + return true +} + +func (w DeltaResponseWatch) useStableVersion() bool { + return true +} + +func (w DeltaResponseWatch) sendFullStateResponses() bool { + return false +} + +func (w DeltaResponseWatch) getSubscription() Subscription { + return w.subscription +} + +func (w DeltaResponseWatch) buildResponse(updatedResources []types.ResourceWithTTL, removedResources []string, returnedVersions map[string]string, version string) WatchResponse { + return &RawDeltaResponse{ + DeltaRequest: w.Request, + Resources: updatedResources, + RemovedResources: removedResources, + NextVersionMap: returnedVersions, + SystemVersionInfo: version, + Ctx: context.Background(), + } +} + +func (w DeltaResponseWatch) sendResponse(resp WatchResponse) { + w.Response <- resp.(*RawDeltaResponse) +} + // newStatusInfo initializes a status info data structure. func newStatusInfo(node *core.Node) *statusInfo { out := statusInfo{ diff --git a/pkg/server/v3/delta_test.go b/pkg/server/v3/delta_test.go index 711f58361e..3aa83088c7 100644 --- a/pkg/server/v3/delta_test.go +++ b/pkg/server/v3/delta_test.go @@ -33,14 +33,14 @@ func (config *mockConfigWatcher) CreateDeltaWatch(req *discovery.DeltaDiscoveryR versionMap[name] = cache.HashResource(marshaledResource) } var nextVersionMap map[string]string - var filtered []types.Resource + var filtered []types.ResourceWithTTL var toRemove []string // If we are handling a wildcard request, we want to respond with all resources switch { case sub.IsWildcard(): if len(sub.ReturnedResources()) == 0 { - filtered = make([]types.Resource, 0, len(resourceMap)) + filtered = make([]types.ResourceWithTTL, 0, len(resourceMap)) } nextVersionMap = make(map[string]string, len(resourceMap)) for name, r := range resourceMap { @@ -50,7 +50,7 @@ func (config *mockConfigWatcher) CreateDeltaWatch(req *discovery.DeltaDiscoveryR nextVersionMap[name] = version prevVersion, found := sub.ReturnedResources()[name] if !found || (prevVersion != version) { - filtered = append(filtered, r) + filtered = append(filtered, types.ResourceWithTTL{Resource: r}) } } @@ -69,7 +69,7 @@ func (config *mockConfigWatcher) CreateDeltaWatch(req *discovery.DeltaDiscoveryR if r, ok := resourceMap[name]; ok { nextVersion := versionMap[name] if prevVersion != nextVersion { - filtered = append(filtered, r) + filtered = append(filtered, types.ResourceWithTTL{Resource: r}) } nextVersionMap[name] = nextVersion } else if found {