From cf5f3ef22a88e79a8cf02fef242044776029b24b Mon Sep 17 00:00:00 2001 From: Valerian Roche Date: Fri, 23 Feb 2024 16:34:51 -0500 Subject: [PATCH] [Linear cache] Increase mutualization between delta and sotw in linear cache to have behavior driven logic instead (#14) Following the addition of stable version support in sotw, the remaining differences of behavior between sotw and delta are: - on first request, check the returned version in sotw if activating stable version. Also compute the version differently - in some cases, return full state in sotw. In other case don't return if the only change is deletion - compute the version differently In this context, the code can now be mostly merged, apart from the type differences on requests and responses (which I think can be reduced to none soon, but requires more rework on simple side and would be somewhat backward incompatible). In this PR, the linear cache no longer treats sotw and delta watches differently. They are tracked in the same maps and the changes and responses are computed in a common method. Differences in behavior (e.g. full state or which version to use) is now a functional attribute of the watch itself. There are two main parts not yet fully merged: - changing the version in sotw when using stable versions. The requirement to prepend cache version prefix makes it harder to abstract - defining whether a change only removing resources should trigger a response. This could be made a functional flag but given its intersection with `fullStateResponses` seemed fragile Signed-off-by: Valerian Roche --- pkg/cache/v3/cache.go | 118 ++++++++--- pkg/cache/v3/delta.go | 8 +- pkg/cache/v3/delta_test.go | 10 +- pkg/cache/v3/linear.go | 380 ++++++++++++---------------------- pkg/cache/v3/linear_test.go | 94 +++++---- pkg/cache/v3/resource.go | 4 +- pkg/cache/v3/resource_test.go | 6 +- pkg/cache/v3/simple.go | 2 +- pkg/cache/v3/status.go | 98 +++++++++ pkg/server/v3/delta_test.go | 8 +- 10 files changed, 390 insertions(+), 338 deletions(-) diff --git a/pkg/cache/v3/cache.go b/pkg/cache/v3/cache.go index a453f97adf..d0d596a804 100644 --- a/pkg/cache/v3/cache.go +++ b/pkg/cache/v3/cache.go @@ -104,41 +104,67 @@ type Cache interface { // Response is a wrapper around Envoy's DiscoveryResponse. type Response interface { - // Get the Constructed DiscoveryResponse + // GetDiscoveryResponse returns the Constructed DiscoveryResponse. GetDiscoveryResponse() (*discovery.DiscoveryResponse, error) - // Get the original Request for the Response. + // GetRequest returns the request that created the watch that we're now responding to. + // This is provided to allow the caller to correlate the response with a request. + // Generally this will be the latest request seen on the stream for the specific type. GetRequest() *discovery.DiscoveryRequest - // Get the version in the Response. + // GetVersion returns the version in the Response. + // The version can be a property of the resources, allowing for optimizations in subsequent calls, + // or simply an internal property of the cache which can be used for debugging. + // The cache implementation should be able to determine if it can provide such optimization. + // Deprecated: use GetResponseVersion instead GetVersion() (string, error) + // GetResponseVersion returns the version in the Response. + // The version can be a property of the resources, allowing for optimizations in subsequent calls, + // or simply an internal property of the cache which can be used for debugging. + // The cache implementation should be able to determine if it can provide such optimization. + GetResponseVersion() string + // GetReturnedResources returns the map of resources and their versions returned in the subscription. // It may include more resources than directly set in the response to consider the full state of the client. // The caller is expected to provide this unchanged to the next call to CreateWatch as part of the subscription. GetReturnedResources() map[string]string - // Get the context provided during response creation. + // GetContext returns the context provided during response creation. GetContext() context.Context } -// DeltaResponse is a wrapper around Envoy's DeltaDiscoveryResponse +// DeltaResponse is a wrapper around Envoy's DeltaDiscoveryResponse. type DeltaResponse interface { - // Get the constructed DeltaDiscoveryResponse + // GetDeltaDiscoveryResponse returns the constructed DeltaDiscoveryResponse. GetDeltaDiscoveryResponse() (*discovery.DeltaDiscoveryResponse, error) - // Get the request that created the watch that we're now responding to. This is provided to allow the caller to correlate the - // response with a request. Generally this will be the latest request seen on the stream for the specific type. + // GetDeltaRequest returns the request that created the watch that we're now responding to. + // This is provided to allow the caller to correlate the response with a request. + // Generally this will be the latest request seen on the stream for the specific type. GetDeltaRequest() *discovery.DeltaDiscoveryRequest - // Get the version in the DeltaResponse. This field is generally used for debugging purposes as noted by the Envoy documentation. + // GetSystemVersion returns the version in the DeltaResponse. + // The version in delta response is not indicative of the resources included, + // but an internal property of the cache which can be used for debugging. + // Deprecated: use GetResponseVersion instead GetSystemVersion() (string, error) - // Get the version map of the internal cache. - // The version map consists of updated version mappings after this response is applied + // GetResponseVersion returns the version in the DeltaResponse. + // The version in delta response is not indicative of the resources included, + // but an internal property of the cache which can be used for debugging. + GetResponseVersion() string + + // GetNextVersionMap provides the version map of the internal cache. + // The version map consists of updated version mappings after this response is applied. + // Deprecated: use GetReturnedResources instead GetNextVersionMap() map[string]string - // Get the context provided during response creation + // GetReturnedResources provides the version map of the internal cache. + // The version map consists of updated version mappings after this response is applied. + GetReturnedResources() map[string]string + + // GetContext returns the context provided during response creation. GetContext() context.Context } @@ -183,12 +209,12 @@ type RawDeltaResponse struct { SystemVersionInfo string // Resources to be included in the response. - Resources []types.Resource + Resources []types.ResourceWithTTL // RemovedResources is a list of resource aliases which should be dropped by the consuming client. RemovedResources []string - // NextVersionMap consists of updated version mappings after this response is applied + // NextVersionMap consists of updated version mappings after this response is applied. NextVersionMap map[string]string // Context provided at the time of response creation. This allows associating additional @@ -290,8 +316,8 @@ func (r *RawDeltaResponse) GetDeltaDiscoveryResponse() (*discovery.DeltaDiscover marshaledResources := make([]*discovery.Resource, len(r.Resources)) for i, resource := range r.Resources { - name := GetResourceName(resource) - marshaledResource, err := MarshalResource(resource) + name := GetResourceName(resource.Resource) + marshaledResource, err := MarshalResource(resource.Resource) if err != nil { return nil, err } @@ -330,23 +356,41 @@ func (r *RawResponse) GetContext() context.Context { return r.Ctx } -// GetDeltaRequest returns the original DeltaRequest +// GetDeltaRequest returns the original DeltaRequest. func (r *RawDeltaResponse) GetDeltaRequest() *discovery.DeltaDiscoveryRequest { return r.DeltaRequest } // GetVersion returns the response version. +// Deprecated: use GetResponseVersion instead func (r *RawResponse) GetVersion() (string, error) { - return r.Version, nil + return r.GetResponseVersion(), nil } -// GetSystemVersion returns the raw SystemVersion +// GetResponseVersion returns the response version. +func (r *RawResponse) GetResponseVersion() string { + return r.Version +} + +// GetSystemVersion returns the raw SystemVersion. +// Deprecated: use GetResponseVersion instead func (r *RawDeltaResponse) GetSystemVersion() (string, error) { - return r.SystemVersionInfo, nil + return r.GetResponseVersion(), nil +} + +// GetResponseVersion returns the response version. +func (r *RawDeltaResponse) GetResponseVersion() string { + return r.SystemVersionInfo } -// NextVersionMap returns the version map which consists of updated version mappings after this response is applied +// NextVersionMap returns the version map which consists of updated version mappings after this response is applied. +// Deprecated: use GetReturnedResources instead func (r *RawDeltaResponse) GetNextVersionMap() map[string]string { + return r.GetReturnedResources() +} + +// GetReturnedResources returns the version map which consists of updated version mappings after this response is applied. +func (r *RawDeltaResponse) GetReturnedResources() map[string]string { return r.NextVersionMap } @@ -392,17 +436,18 @@ func (r *DeltaPassthroughResponse) GetDeltaDiscoveryResponse() (*discovery.Delta return r.DeltaDiscoveryResponse, nil } -// GetRequest returns the original Discovery Request +// GetRequest returns the original Discovery Request. func (r *PassthroughResponse) GetRequest() *discovery.DiscoveryRequest { return r.Request } -// GetDeltaRequest returns the original Delta Discovery Request +// GetDeltaRequest returns the original Delta Discovery Request. func (r *DeltaPassthroughResponse) GetDeltaRequest() *discovery.DeltaDiscoveryRequest { return r.DeltaRequest } // GetVersion returns the response version. +// Deprecated: use GetResponseVersion instead func (r *PassthroughResponse) GetVersion() (string, error) { discoveryResponse, _ := r.GetDiscoveryResponse() if discoveryResponse != nil { @@ -411,11 +456,21 @@ func (r *PassthroughResponse) GetVersion() (string, error) { return "", fmt.Errorf("DiscoveryResponse is nil") } +// GetResponseVersion returns the response version, or empty if not set. +func (r *PassthroughResponse) GetResponseVersion() string { + discoveryResponse, _ := r.GetDiscoveryResponse() + if discoveryResponse != nil { + return discoveryResponse.GetVersionInfo() + } + return "" +} + func (r *PassthroughResponse) GetContext() context.Context { return r.ctx } // GetSystemVersion returns the response version. +// Deprecated: use GetResponseVersion instead func (r *DeltaPassthroughResponse) GetSystemVersion() (string, error) { deltaDiscoveryResponse, _ := r.GetDeltaDiscoveryResponse() if deltaDiscoveryResponse != nil { @@ -424,8 +479,23 @@ func (r *DeltaPassthroughResponse) GetSystemVersion() (string, error) { return "", fmt.Errorf("DeltaDiscoveryResponse is nil") } -// NextVersionMap returns the version map from a DeltaPassthroughResponse +// GetResponseVersion returns the response version, or empty if not set. +func (r *DeltaPassthroughResponse) GetResponseVersion() string { + deltaDiscoveryResponse, _ := r.GetDeltaDiscoveryResponse() + if deltaDiscoveryResponse != nil { + return deltaDiscoveryResponse.GetSystemVersionInfo() + } + return "" +} + +// NextVersionMap returns the version map from a DeltaPassthroughResponse. +// Deprecated: use GetReturnedResources instead func (r *DeltaPassthroughResponse) GetNextVersionMap() map[string]string { + return r.GetReturnedResources() +} + +// GetReturnedResources returns the version map from a DeltaPassthroughResponse. +func (r *DeltaPassthroughResponse) GetReturnedResources() map[string]string { return r.NextVersionMap } diff --git a/pkg/cache/v3/delta.go b/pkg/cache/v3/delta.go index 9565634914..f426d34e4d 100644 --- a/pkg/cache/v3/delta.go +++ b/pkg/cache/v3/delta.go @@ -29,14 +29,14 @@ type resourceContainer struct { func createDeltaResponse(ctx context.Context, req *DeltaRequest, sub Subscription, resources resourceContainer, cacheVersion string) *RawDeltaResponse { // variables to build our response with var nextVersionMap map[string]string - var filtered []types.Resource + var filtered []types.ResourceWithTTL var toRemove []string // If we are handling a wildcard request, we want to respond with all resources switch { case sub.IsWildcard(): if len(sub.ReturnedResources()) == 0 { - filtered = make([]types.Resource, 0, len(resources.resourceMap)) + filtered = make([]types.ResourceWithTTL, 0, len(resources.resourceMap)) } nextVersionMap = make(map[string]string, len(resources.resourceMap)) for name, r := range resources.resourceMap { @@ -46,7 +46,7 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, sub Subscriptio nextVersionMap[name] = version prevVersion, found := sub.ReturnedResources()[name] if !found || (prevVersion != version) { - filtered = append(filtered, r) + filtered = append(filtered, types.ResourceWithTTL{Resource: r}) } } @@ -66,7 +66,7 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, sub Subscriptio if r, ok := resources.resourceMap[name]; ok { nextVersion := resources.versionMap[name] if prevVersion != nextVersion { - filtered = append(filtered, r) + filtered = append(filtered, types.ResourceWithTTL{Resource: r}) } nextVersionMap[name] = nextVersion } else if found { diff --git a/pkg/cache/v3/delta_test.go b/pkg/cache/v3/delta_test.go index c98c06cfb4..2461730769 100644 --- a/pkg/cache/v3/delta_test.go +++ b/pkg/cache/v3/delta_test.go @@ -22,7 +22,7 @@ import ( "github.com/envoyproxy/go-control-plane/pkg/test/resource/v3" ) -func assertResourceMapEqual(t *testing.T, want, got map[string]types.Resource) { +func assertResourceMapEqual(t *testing.T, want, got map[string]types.ResourceWithTTL) { t.Helper() if !cmp.Equal(want, got, protocmp.Transform()) { @@ -57,7 +57,7 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) { select { case out := <-watches[typ]: snapshot := fixture.snapshot() - assertResourceMapEqual(t, cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot.GetResources(typ)) + assertResourceMapEqual(t, cache.IndexResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot.GetResourcesAndTTL(typ)) sub := subscriptions[typ] sub.SetReturnedResources(out.GetNextVersionMap()) subscriptions[typ] = sub @@ -109,7 +109,7 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) { case out := <-watches[testTypes[0]]: snapshot2 := fixture.snapshot() snapshot2.Resources[types.Endpoint] = cache.NewResources(fixture.version2, []types.Resource{resource.MakeEndpoint(clusterName, 9090)}) - assertResourceMapEqual(t, cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot2.GetResources(rsrc.EndpointType)) + assertResourceMapEqual(t, cache.IndexResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot2.GetResourcesAndTTL(rsrc.EndpointType)) sub := subscriptions[testTypes[0]] sub.SetReturnedResources(out.GetNextVersionMap()) subscriptions[testTypes[0]] = sub @@ -147,7 +147,7 @@ func TestDeltaRemoveResources(t *testing.T) { select { case out := <-watches[typ]: snapshot := fixture.snapshot() - assertResourceMapEqual(t, cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot.GetResources(typ)) + assertResourceMapEqual(t, cache.IndexResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot.GetResourcesAndTTL(typ)) nextVersionMap := out.GetNextVersionMap() subscriptions[typ].SetReturnedResources(nextVersionMap) case <-time.After(time.Second): @@ -186,7 +186,7 @@ func TestDeltaRemoveResources(t *testing.T) { case out := <-watches[testTypes[0]]: snapshot2 := fixture.snapshot() snapshot2.Resources[types.Endpoint] = cache.NewResources(fixture.version2, []types.Resource{}) - assertResourceMapEqual(t, cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot2.GetResources(rsrc.EndpointType)) + assertResourceMapEqual(t, cache.IndexResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot2.GetResourcesAndTTL(rsrc.EndpointType)) nextVersionMap := out.GetNextVersionMap() // make sure the version maps are different since we no longer are tracking any endpoint resources diff --git a/pkg/cache/v3/linear.go b/pkg/cache/v3/linear.go index 41245fceb9..6ae4b06b25 100644 --- a/pkg/cache/v3/linear.go +++ b/pkg/cache/v3/linear.go @@ -16,11 +16,8 @@ package cache import ( "context" - "encoding/hex" "errors" "fmt" - "hash/fnv" - "sort" "strconv" "strings" "sync" @@ -71,22 +68,29 @@ func (c *cachedResource) getVersion(useStableVersion bool) (string, error) { return c.getStableVersion() } -type watches struct { - // sotw keeps track of current sotw watches, indexed per watch id. - sotw map[uint64]ResponseWatch - // delta keeps track of current delta watches, indexed per watch id. - delta map[uint64]DeltaResponseWatch -} +type watch interface { + // isDelta indicates whether the watch is a delta one. + // It should not be used to take functional decisions, but is still currently used pending final changes. + // It can be used to generate statistics. + isDelta() bool + // useStableVersion indicates whether versions returned in the response are built using stable versions instead of cache update versions. + useStableVersion() bool + // sendFullStateResponses requires that all resources matching the request, with no regards to which ones actually updated, must be provided in the response. + // As a consequence, sending a response with no resources has a functional meaning of no matching resources available. + sendFullStateResponses() bool -func newWatches() watches { - return watches{ - sotw: make(map[uint64]ResponseWatch), - delta: make(map[uint64]DeltaResponseWatch), - } + getSubscription() Subscription + // buildResponse computes the actual WatchResponse object to be sent on the watch. + buildResponse(updatedResources []types.ResourceWithTTL, removedResources []string, returnedVersions map[string]string, version string) WatchResponse + // sendResponse sends the response for the watch. + // It must be called at most once. + sendResponse(resp WatchResponse) } -func (w *watches) empty() bool { - return len(w.sotw)+len(w.delta) == 0 +type watches map[uint64]watch + +func newWatches() watches { + return make(watches) } // LinearCache supports collections of opaque resources. This cache has a @@ -124,6 +128,8 @@ type LinearCache struct { // new subscription. useStableVersionsInSotw bool + watchCount int + log log.Logger mu sync.RWMutex @@ -198,22 +204,14 @@ func NewLinearCache(typeURL string, opts ...LinearCacheOption) *LinearCache { // computeResourceChange compares the subscription known resources and the cache current state to compute the list of resources // which have changed and should be notified to the user. // -// The alwaysConsiderAllResources argument removes the consideration of the subscription known resources (e.g. if the version did not match), -// and return all known subscribed resources. -// // The useStableVersion argument defines what version type to use for resources: // - if set to false versions are based on when resources were updated in the cache. // - if set to true versions are a stable property of the resource, with no regard to when it was added to the cache. -func (cache *LinearCache) computeResourceChange(sub Subscription, alwaysConsiderAllResources, useStableVersion bool) (updated, removed []string, err error) { +func (cache *LinearCache) computeResourceChange(sub Subscription, useStableVersion bool) (updated, removed []string, err error) { var changedResources []string var removedResources []string knownVersions := sub.ReturnedResources() - if alwaysConsiderAllResources { - // The response will include all resources, with no regards of resources potentially already returned. - knownVersions = make(map[string]string) - } - if sub.IsWildcard() { for resourceName, resource := range cache.resources { knownVersion, ok := knownVersions[resourceName] @@ -280,38 +278,13 @@ func (cache *LinearCache) computeResourceChange(sub Subscription, alwaysConsider return changedResources, removedResources, nil } -func computeSotwStableVersion(versionMap map[string]string) string { - // To enforce a stable hash we need to have an ordered vision of the map. - keys := make([]string, 0, len(versionMap)) - for key := range versionMap { - keys = append(keys, key) - } - sort.Strings(keys) - - mapHasher := fnv.New64() - - buffer := make([]byte, 0, 8) - itemHasher := fnv.New64() - for _, key := range keys { - buffer = buffer[:0] - itemHasher.Reset() - itemHasher.Write([]byte(key)) - mapHasher.Write(itemHasher.Sum(buffer)) - buffer = buffer[:0] - itemHasher.Reset() - itemHasher.Write([]byte(versionMap[key])) - mapHasher.Write(itemHasher.Sum(buffer)) - } - buffer = buffer[:0] - return hex.EncodeToString(mapHasher.Sum(buffer)) -} - -func (cache *LinearCache) computeSotwResponse(watch ResponseWatch, alwaysConsiderAllResources bool) (*RawResponse, error) { - changedResources, removedResources, err := cache.computeResourceChange(watch.subscription, alwaysConsiderAllResources, cache.useStableVersionsInSotw) +func (cache *LinearCache) computeResponse(watch watch, replyEvenIfEmpty bool) (WatchResponse, error) { + sub := watch.getSubscription() + changedResources, removedResources, err := cache.computeResourceChange(sub, watch.useStableVersion()) if err != nil { return nil, err } - if len(changedResources) == 0 && len(removedResources) == 0 && !alwaysConsiderAllResources { + if len(changedResources) == 0 && len(removedResources) == 0 && !replyEvenIfEmpty { // Nothing changed. return nil, nil } @@ -325,8 +298,9 @@ func (cache *LinearCache) computeSotwResponse(watch ResponseWatch, alwaysConside switch { // For lds and cds, answers will always include all existing subscribed resources, with no regard to which resource was changed or removed. // For other types, the response only includes updated resources (sotw cannot notify for deletion). - case !ResourceRequiresFullStateInSotw(cache.typeURL): - if !alwaysConsiderAllResources && len(changedResources) == 0 { + case !watch.sendFullStateResponses(): + // TODO(valerian-roche): remove this leak of delta/sotw behavior here. + if !watch.isDelta() && !replyEvenIfEmpty && len(changedResources) == 0 { // If the request is not the initial one, and the type does not require full updates, // do not return if nothing is to be set. // For full-state resources an empty response does have a semantic meaning. @@ -335,7 +309,7 @@ func (cache *LinearCache) computeSotwResponse(watch ResponseWatch, alwaysConside // changedResources is already filtered based on the subscription. resourcesToReturn = changedResources - case watch.subscription.IsWildcard(): + case sub.IsWildcard(): // Include all resources for the type. resourcesToReturn = make([]string, 0, len(cache.resources)) for resourceName := range cache.resources { @@ -343,7 +317,7 @@ func (cache *LinearCache) computeSotwResponse(watch ResponseWatch, alwaysConside } default: // Include all resources matching the subscription, with no concern on whether it has been updated or not. - requestedResources := watch.subscription.SubscribedResources() + requestedResources := sub.SubscribedResources() // The linear cache could be very large (e.g. containing all potential CLAs) // Therefore drives on the subscription requested resources. resourcesToReturn = make([]string, 0, len(requestedResources)) @@ -355,9 +329,9 @@ func (cache *LinearCache) computeSotwResponse(watch ResponseWatch, alwaysConside } // returnedVersions includes all resources currently known to the subscription and their version. - returnedVersions := make(map[string]string, len(watch.subscription.ReturnedResources())) + returnedVersions := make(map[string]string, len(sub.ReturnedResources())) // Clone the current returned versions. The cache should not alter the subscription. - for resourceName, version := range watch.subscription.ReturnedResources() { + for resourceName, version := range sub.ReturnedResources() { returnedVersions[resourceName] = version } @@ -365,7 +339,7 @@ func (cache *LinearCache) computeSotwResponse(watch ResponseWatch, alwaysConside for _, resourceName := range resourcesToReturn { cachedResource := cache.resources[resourceName] resources = append(resources, types.ResourceWithTTL{Resource: cachedResource.Resource}) - version, err := cachedResource.getVersion(cache.useStableVersionsInSotw) + version, err := cachedResource.getVersion(watch.useStableVersion()) if err != nil { return nil, fmt.Errorf("failed to compute version of %s: %w", resourceName, err) } @@ -378,137 +352,52 @@ func (cache *LinearCache) computeSotwResponse(watch ResponseWatch, alwaysConside delete(returnedVersions, resourceName) } + // TODO(valerian-roche): remove this leak of delta/sotw behavior here. responseVersion := cache.getVersion() - if cache.useStableVersionsInSotw { + if watch.useStableVersion() && !watch.isDelta() { responseVersion = cache.versionPrefix + computeSotwStableVersion(returnedVersions) } - return &RawResponse{ - Request: watch.Request, - Resources: resources, - ReturnedResources: returnedVersions, - Version: responseVersion, - Ctx: context.Background(), - }, nil -} - -func (cache *LinearCache) computeDeltaResponse(watch DeltaResponseWatch) (*RawDeltaResponse, error) { - changedResources, removedResources, err := cache.computeResourceChange(watch.subscription, false, true) - if err != nil { - return nil, err - } - - // On first request on a wildcard subscription, envoy does expect a response to come in to - // conclude initialization. - isFirstWildcardRequest := watch.subscription.IsWildcard() && watch.Request.GetResponseNonce() == "" - if len(changedResources) == 0 && len(removedResources) == 0 && !isFirstWildcardRequest { - // Nothing changed. - return nil, nil - } - - returnedVersions := make(map[string]string, len(watch.subscription.ReturnedResources())) - // Clone the current returned versions. The cache should not alter the subscription - for resourceName, version := range watch.subscription.ReturnedResources() { - returnedVersions[resourceName] = version - } - - cacheVersion := cache.getVersion() - resources := make([]types.Resource, 0, len(changedResources)) - for _, resourceName := range changedResources { - resource := cache.resources[resourceName] - resources = append(resources, resource.Resource) - version, err := resource.getStableVersion() - if err != nil { - return nil, fmt.Errorf("failed to compute stable version of %s: %w", resourceName, err) - } - returnedVersions[resourceName] = version - } - // Cleanup resources no longer existing in the cache or no longer subscribed. - for _, resourceName := range removedResources { - delete(returnedVersions, resourceName) - } - - return &RawDeltaResponse{ - DeltaRequest: watch.Request, - Resources: resources, - RemovedResources: removedResources, - NextVersionMap: returnedVersions, - SystemVersionInfo: cacheVersion, - Ctx: context.Background(), - }, nil + return watch.buildResponse(resources, removedResources, returnedVersions, responseVersion), nil } func (cache *LinearCache) notifyAll(modified []string) error { // Gather the list of watches impacted by the modified resources. - sotwWatches := make(map[uint64]ResponseWatch) - deltaWatches := make(map[uint64]DeltaResponseWatch) + resourceWatches := newWatches() for _, name := range modified { - for watchID, watch := range cache.resourceWatches[name].sotw { - sotwWatches[watchID] = watch - } - for watchID, watch := range cache.resourceWatches[name].delta { - deltaWatches[watchID] = watch + for watchID, watch := range cache.resourceWatches[name] { + resourceWatches[watchID] = watch } } - // sotw watches - for watchID, watch := range sotwWatches { - response, err := cache.computeSotwResponse(watch, false) + for watchID, watch := range resourceWatches { + response, err := cache.computeResponse(watch, false) if err != nil { return err } if response != nil { - watch.Response <- response - cache.removeWatch(watchID, watch.subscription) + watch.sendResponse(response) + cache.removeWatch(watchID, watch.getSubscription()) } else { cache.log.Infof("[Linear cache] Watch %d detected as triggered but no change was found", watchID) } } - for watchID, watch := range cache.wildcardWatches.sotw { - response, err := cache.computeSotwResponse(watch, false) + for watchID, watch := range cache.wildcardWatches { + response, err := cache.computeResponse(watch, false) if err != nil { return err } if response != nil { - watch.Response <- response - delete(cache.wildcardWatches.sotw, watchID) + watch.sendResponse(response) + cache.removeWildcardWatch(watchID) } else { cache.log.Infof("[Linear cache] Wildcard watch %d detected as triggered but no change was found", watchID) } } - // delta watches - for watchID, watch := range deltaWatches { - response, err := cache.computeDeltaResponse(watch) - if err != nil { - return err - } - - if response != nil { - watch.Response <- response - cache.removeDeltaWatch(watchID, watch.subscription) - } else { - cache.log.Infof("[Linear cache] Delta watch %d detected as triggered but no change was found", watchID) - } - } - - for watchID, watch := range cache.wildcardWatches.delta { - response, err := cache.computeDeltaResponse(watch) - if err != nil { - return err - } - - if response != nil { - watch.Response <- response - delete(cache.wildcardWatches.delta, watchID) - } else { - cache.log.Infof("[Linear cache] Wildcard delta watch %d detected as triggered but no change was found", watchID) - } - } - return nil } @@ -615,10 +504,10 @@ func (cache *LinearCache) CreateWatch(request *Request, sub Subscription, value // If the request does not include a version the client considers it has no current state. // In this case we will always reply to allow proper initialization of dependencies in the client. - ignoreCurrentSubscriptionResources := request.GetVersionInfo() == "" + replyEvenIfEmpty := request.GetVersionInfo() == "" if !strings.HasPrefix(request.GetVersionInfo(), cache.versionPrefix) { // If the version of the request does not match the cache prefix, we will send a response in all cases to match the legacy behavior. - ignoreCurrentSubscriptionResources = true + replyEvenIfEmpty = true cache.log.Debugf("[linear cache] received watch with version %s not matching the cache prefix %s. Will return all known resources", request.GetVersionInfo(), cache.versionPrefix) } @@ -633,12 +522,18 @@ func (cache *LinearCache) CreateWatch(request *Request, sub Subscription, value // For now it is not done as: // - for the first case, while the protocol documentation does not explicitly mention the case, it does not mark it impossible and explicitly references unsubscribing from wildcard. // - for the second one we could likely do it with little difficulty if need be, but if users rely on the current monotonic version it could impact their callbacks implementations. - watch := ResponseWatch{Request: request, Response: value, subscription: sub} + watch := ResponseWatch{ + Request: request, + Response: value, + subscription: sub, + enableStableVersion: cache.useStableVersionsInSotw, + fullStateResponses: ResourceRequiresFullStateInSotw(cache.typeURL), + } cache.mu.Lock() defer cache.mu.Unlock() - response, err := cache.computeSotwResponse(watch, ignoreCurrentSubscriptionResources) + response, err := cache.computeResponse(watch, replyEvenIfEmpty) if err != nil { return nil, fmt.Errorf("failed to compute the watch respnse: %w", err) } @@ -649,14 +544,17 @@ func (cache *LinearCache) CreateWatch(request *Request, sub Subscription, value // - provides a non-empty version, matching the version prefix // and the cache uses stable versions, if the generated versions are the same as the previous one, we do not return the response. // This avoids resending all data if the new subscription is just a resumption of the previous one. - if cache.useStableVersionsInSotw && request.GetResponseNonce() == "" && !ignoreCurrentSubscriptionResources { - shouldReply = request.GetVersionInfo() != response.Version - - // We confirmed the content of the known resources, store them in the watch we create. - subscription := newWatchSubscription(sub) - subscription.returnedResources = response.ReturnedResources - watch.subscription = subscription - sub = subscription + if cache.useStableVersionsInSotw && request.GetResponseNonce() == "" && !replyEvenIfEmpty { + if request.GetVersionInfo() != response.GetResponseVersion() { + // The response has a different returned version map as the request + shouldReply = true + } else { + // We confirmed the content of the known resources, store them in the watch we create. + subscription := newWatchSubscription(sub) + subscription.returnedResources = response.GetReturnedResources() + watch.subscription = subscription + sub = subscription + } } else { shouldReply = true } @@ -664,49 +562,11 @@ func (cache *LinearCache) CreateWatch(request *Request, sub Subscription, value if shouldReply { cache.log.Debugf("[linear cache] replying to the watch with resources %v (subscription values %v, known %v)", response.GetReturnedResources(), sub.SubscribedResources(), sub.ReturnedResources()) - watch.Response <- response + watch.sendResponse(response) return func() {}, nil } - watchID := cache.nextWatchID() - // Create open watches since versions are up to date. - if sub.IsWildcard() { - cache.log.Infof("[linear cache] open watch %d for %s all resources, known versions %v, system version %q", watchID, cache.typeURL, sub.ReturnedResources(), cache.getVersion()) - cache.wildcardWatches.sotw[watchID] = watch - return func() { - cache.mu.Lock() - defer cache.mu.Unlock() - delete(cache.wildcardWatches.sotw, watchID) - }, nil - } - - cache.log.Infof("[linear cache] open watch %d for %s resources %v, known versions %v, system version %q", watchID, cache.typeURL, sub.SubscribedResources(), sub.ReturnedResources(), cache.getVersion()) - for name := range sub.SubscribedResources() { - watches, exists := cache.resourceWatches[name] - if !exists { - watches = newWatches() - cache.resourceWatches[name] = watches - } - watches.sotw[watchID] = watch - } - return func() { - cache.mu.Lock() - defer cache.mu.Unlock() - cache.removeWatch(watchID, watch.subscription) - }, nil -} - -// Must be called under lock -func (cache *LinearCache) removeWatch(watchID uint64, sub Subscription) { - // Make sure we clean the watch for ALL resources it might be associated with, - // as the channel will no longer be listened to - for resource := range sub.SubscribedResources() { - resourceWatches := cache.resourceWatches[resource] - delete(resourceWatches.sotw, watchID) - if resourceWatches.empty() { - delete(cache.resourceWatches, resource) - } - } + return cache.trackWatch(watch), nil } func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, sub Subscription, value chan DeltaResponse) (func(), error) { @@ -716,71 +576,94 @@ func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, sub Subscripti watch := DeltaResponseWatch{Request: request, Response: value, subscription: sub} + // On first request on a wildcard subscription, envoy does expect a response to come in to + // conclude initialization. + replyEvenIfEmpty := false + if sub.IsWildcard() && request.GetResponseNonce() == "" { + replyEvenIfEmpty = true + } + cache.mu.Lock() defer cache.mu.Unlock() - response, err := cache.computeDeltaResponse(watch) + response, err := cache.computeResponse(watch, replyEvenIfEmpty) if err != nil { return nil, fmt.Errorf("failed to compute the watch respnse: %w", err) } - if response != nil { cache.log.Debugf("[linear cache] replying to the delta watch (subscription values %v, known %v)", sub.SubscribedResources(), sub.ReturnedResources()) - watch.Response <- response + watch.sendResponse(response) return nil, nil } + return cache.trackWatch(watch), nil +} + +func (cache *LinearCache) nextWatchID() uint64 { + cache.currentWatchID++ + if cache.currentWatchID == 0 { + panic("watch id count overflow") + } + return cache.currentWatchID +} + +// Must be called under lock +func (cache *LinearCache) trackWatch(watch watch) func() { + cache.watchCount++ + watchID := cache.nextWatchID() + sub := watch.getSubscription() // Create open watches since versions are up to date. if sub.IsWildcard() { - cache.log.Infof("[linear cache] open delta watch %d for all %s resources, system version %q", watchID, cache.typeURL, cache.getVersion()) - cache.wildcardWatches.delta[watchID] = watch + cache.log.Infof("[linear cache] open watch %d for %s all resources", watchID, cache.typeURL) + cache.log.Debugf("[linear cache] subscription details for watch %d: known versions %v, system version %q", watchID, sub.ReturnedResources(), cache.getVersion()) + cache.wildcardWatches[watchID] = watch return func() { cache.mu.Lock() defer cache.mu.Unlock() - delete(cache.wildcardWatches.delta, watchID) - }, nil + cache.removeWildcardWatch(watchID) + } } - cache.log.Infof("[linear cache] open delta watch %d for %s resources %v, system version %q", watchID, cache.typeURL, sub.SubscribedResources(), cache.getVersion()) + cache.log.Infof("[linear cache] open watch %d for %s resources %v", watchID, cache.typeURL, sub.SubscribedResources()) + cache.log.Debugf("[linear cache] subscription details for watch %d: known versions %v, system version %q", watchID, sub.ReturnedResources(), cache.getVersion()) for name := range sub.SubscribedResources() { watches, exists := cache.resourceWatches[name] if !exists { watches = newWatches() cache.resourceWatches[name] = watches } - watches.delta[watchID] = watch + watches[watchID] = watch } return func() { cache.mu.Lock() defer cache.mu.Unlock() - cache.removeDeltaWatch(watchID, watch.subscription) - }, nil -} - -func (cache *LinearCache) getVersion() string { - return cache.versionPrefix + strconv.FormatUint(cache.version, 10) + cache.removeWatch(watchID, sub) + } } -// cancellation function for cleaning stale watches -func (cache *LinearCache) removeDeltaWatch(watchID uint64, sub Subscription) { +// Must be called under lock +func (cache *LinearCache) removeWatch(watchID uint64, sub Subscription) { // Make sure we clean the watch for ALL resources it might be associated with, // as the channel will no longer be listened to for resource := range sub.SubscribedResources() { resourceWatches := cache.resourceWatches[resource] - delete(resourceWatches.delta, watchID) - if resourceWatches.empty() { + delete(resourceWatches, watchID) + if len(resourceWatches) == 0 { delete(cache.resourceWatches, resource) } } + cache.watchCount-- } -func (cache *LinearCache) nextWatchID() uint64 { - cache.currentWatchID++ - if cache.currentWatchID == 0 { - panic("watch id count overflow") - } - return cache.currentWatchID +// Must be called under lock +func (cache *LinearCache) removeWildcardWatch(watchID uint64) { + cache.watchCount-- + delete(cache.wildcardWatches, watchID) +} + +func (cache *LinearCache) getVersion() string { + return cache.versionPrefix + strconv.FormatUint(cache.version, 10) } func (cache *LinearCache) Fetch(context.Context, *Request) (Response, error) { @@ -795,30 +678,23 @@ func (cache *LinearCache) NumResources() int { return len(cache.resources) } -// NumWatches returns the number of active sotw watches for a resource name. +// NumWatches returns the number of active watches for a resource name. func (cache *LinearCache) NumWatches(name string) int { cache.mu.RLock() defer cache.mu.RUnlock() - return len(cache.resourceWatches[name].sotw) + len(cache.wildcardWatches.sotw) + return len(cache.resourceWatches[name]) + len(cache.wildcardWatches) } -// NumDeltaWatchesForResource returns the number of active delta watches for a resource name. -func (cache *LinearCache) NumDeltaWatchesForResource(name string) int { +// TotalWatches returns the number of active watches on the cache in general. +func (cache *LinearCache) NumWildcardWatches() int { cache.mu.RLock() defer cache.mu.RUnlock() - return len(cache.resourceWatches[name].delta) + len(cache.wildcardWatches.delta) + return len(cache.wildcardWatches) } -// NumDeltaWatches returns the total number of active delta watches. -// Warning: it is quite inefficient, and NumDeltaWatchesForResource should be preferred. -func (cache *LinearCache) NumDeltaWatches() int { +// NumCacheWatches returns the number of active watches on the cache in general. +func (cache *LinearCache) NumCacheWatches() int { cache.mu.RLock() defer cache.mu.RUnlock() - uniqueWatches := map[uint64]struct{}{} - for _, watches := range cache.resourceWatches { - for id := range watches.delta { - uniqueWatches[id] = struct{}{} - } - } - return len(uniqueWatches) + len(cache.wildcardWatches.delta) + return cache.watchCount } diff --git a/pkg/cache/v3/linear_test.go b/pkg/cache/v3/linear_test.go index 7c07926c6e..9724350e6a 100644 --- a/pkg/cache/v3/linear_test.go +++ b/pkg/cache/v3/linear_test.go @@ -199,9 +199,9 @@ func checkWatchCount(t *testing.T, c *LinearCache, name string, count int) { } } -func checkDeltaWatchCount(t *testing.T, c *LinearCache, count int) { +func checkTotalWatchCount(t *testing.T, c *LinearCache, count int) { t.Helper() - if i := c.NumDeltaWatches(); i != count { + if i := c.NumCacheWatches(); i != count { t.Errorf("unexpected number of delta watches: got %d, want %d", i, count) } } @@ -505,7 +505,7 @@ func TestLinearDeletion(t *testing.T) { require.NoError(t, err) // b is watched by wildcard, but for non-full-state resources we cannot report deletions mustBlock(t, w) - assert.Len(t, c.wildcardWatches.sotw, 1) + assert.Len(t, c.wildcardWatches, 1) }) t.Run("full-state resource", func(t *testing.T) { @@ -692,16 +692,16 @@ func TestLinearDeltaWildcard(t *testing.T) { _, err = c.CreateDeltaWatch(req1, sub1, w1) require.NoError(t, err) mustBlockDelta(t, w1) - _, err = c.CreateDeltaWatch(req2, sub2, w2) require.NoError(t, err) mustBlockDelta(t, w2) + checkTotalWatchCount(t, c, 2) a := &endpoint.ClusterLoadAssignment{ClusterName: "a"} hash := hashResource(t, a) err = c.UpdateResource("a", a) require.NoError(t, err) - checkDeltaWatchCount(t, c, 0) + checkTotalWatchCount(t, c, 0) verifyDeltaResponse(t, w1, []resourceInfo{{"a", hash}}, nil) verifyDeltaResponse(t, w2, []resourceInfo{{"a", hash}}, nil) } @@ -722,14 +722,14 @@ func TestLinearDeltaExistingResources(t *testing.T) { w := make(chan DeltaResponse, 1) _, err = c.CreateDeltaWatch(req, subFromDeltaRequest(req), w) require.NoError(t, err) - checkDeltaWatchCount(t, c, 0) + checkTotalWatchCount(t, c, 0) verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}}, []string{}) req = &DeltaRequest{TypeUrl: testType, ResourceNamesSubscribe: []string{"a", "b"}} w = make(chan DeltaResponse, 1) _, err = c.CreateDeltaWatch(req, subFromDeltaRequest(req), w) require.NoError(t, err) - checkDeltaWatchCount(t, c, 0) + checkTotalWatchCount(t, c, 0) verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}, {"a", hashA}}, nil) } @@ -748,7 +748,7 @@ func TestLinearDeltaInitialResourcesVersionSet(t *testing.T) { w := make(chan DeltaResponse, 1) _, err = c.CreateDeltaWatch(req, subFromDeltaRequest(req), w) require.NoError(t, err) - checkDeltaWatchCount(t, c, 0) + checkTotalWatchCount(t, c, 0) verifyDeltaResponse(t, w, []resourceInfo{{"a", hashA}}, nil) // b is up to date and shouldn't be returned req = &DeltaRequest{TypeUrl: testType, ResourceNamesSubscribe: []string{"a", "b"}, InitialResourceVersions: map[string]string{"a": hashA, "b": hashB}} @@ -756,12 +756,12 @@ func TestLinearDeltaInitialResourcesVersionSet(t *testing.T) { _, err = c.CreateDeltaWatch(req, subFromDeltaRequest(req), w) require.NoError(t, err) mustBlockDelta(t, w) - checkDeltaWatchCount(t, c, 1) + checkTotalWatchCount(t, c, 1) b = &endpoint.ClusterLoadAssignment{ClusterName: "b", Endpoints: []*endpoint.LocalityLbEndpoints{{Priority: 10}}} // new version of b hashB = hashResource(t, b) err = c.UpdateResource("b", b) require.NoError(t, err) - checkDeltaWatchCount(t, c, 0) + checkTotalWatchCount(t, c, 0) verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}}, nil) } @@ -782,7 +782,7 @@ func TestLinearDeltaResourceUpdate(t *testing.T) { w := make(chan DeltaResponse, 1) _, err = c.CreateDeltaWatch(req, subFromDeltaRequest(req), w) require.NoError(t, err) - checkDeltaWatchCount(t, c, 0) + checkTotalWatchCount(t, c, 0) verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}, {"a", hashA}}, nil) checkStableVersionsAreComputed(t, c, "a", "b") @@ -791,7 +791,7 @@ func TestLinearDeltaResourceUpdate(t *testing.T) { _, err = c.CreateDeltaWatch(req, subFromDeltaRequest(req), w) require.NoError(t, err) mustBlockDelta(t, w) - checkDeltaWatchCount(t, c, 1) + checkTotalWatchCount(t, c, 1) a = &endpoint.ClusterLoadAssignment{ClusterName: "a", Endpoints: []*endpoint.LocalityLbEndpoints{ // resource update {Priority: 10}, @@ -818,7 +818,7 @@ func TestLinearDeltaResourceDelete(t *testing.T) { w := make(chan DeltaResponse, 1) _, err = c.CreateDeltaWatch(req, subFromDeltaRequest(req), w) require.NoError(t, err) - checkDeltaWatchCount(t, c, 0) + checkTotalWatchCount(t, c, 0) verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}, {"a", hashA}}, nil) req = &DeltaRequest{TypeUrl: testType, ResourceNamesSubscribe: []string{"a", "b"}, InitialResourceVersions: map[string]string{"a": hashA, "b": hashB}} @@ -826,7 +826,7 @@ func TestLinearDeltaResourceDelete(t *testing.T) { _, err = c.CreateDeltaWatch(req, subFromDeltaRequest(req), w) require.NoError(t, err) mustBlockDelta(t, w) - checkDeltaWatchCount(t, c, 1) + checkTotalWatchCount(t, c, 1) a = &endpoint.ClusterLoadAssignment{ClusterName: "a", Endpoints: []*endpoint.LocalityLbEndpoints{ // resource update {Priority: 10}, @@ -848,7 +848,7 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { _, err := c.CreateDeltaWatch(req, subFromDeltaRequest(req), w) require.NoError(t, err) mustBlockDelta(t, w) - checkDeltaWatchCount(t, c, 1) + checkTotalWatchCount(t, c, 1) a := &endpoint.ClusterLoadAssignment{ClusterName: "a"} hashA := hashResource(t, a) b := &endpoint.ClusterLoadAssignment{ClusterName: "b"} @@ -867,7 +867,7 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { _, err = c.CreateDeltaWatch(req, sub, w) require.NoError(t, err) mustBlockDelta(t, w) - checkDeltaWatchCount(t, c, 1) + checkTotalWatchCount(t, c, 1) a = &endpoint.ClusterLoadAssignment{ClusterName: "a", Endpoints: []*endpoint.LocalityLbEndpoints{ // resource update {Priority: 10}, }} @@ -888,7 +888,7 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { _, err = c.CreateDeltaWatch(req, sub, w) require.NoError(t, err) mustBlockDelta(t, w) - checkDeltaWatchCount(t, c, 1) + checkTotalWatchCount(t, c, 1) a = &endpoint.ClusterLoadAssignment{ClusterName: "a", Endpoints: []*endpoint.LocalityLbEndpoints{ // resource update {Priority: 15}, }} @@ -910,7 +910,7 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { _, err = c.CreateDeltaWatch(req, sub, w) require.NoError(t, err) mustBlockDelta(t, w) - checkDeltaWatchCount(t, c, 1) + checkTotalWatchCount(t, c, 1) b = &endpoint.ClusterLoadAssignment{ClusterName: "b", Endpoints: []*endpoint.LocalityLbEndpoints{}} // recreate watched resource hashB = hashResource(t, b) err = c.UpdateResources(map[string]types.Resource{"b": b}, []string{"d"}) @@ -926,7 +926,7 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { // Wildcard create/update createWildcardDeltaWatch(t, false, c, w) mustBlockDelta(t, w) - checkDeltaWatchCount(t, c, 1) + checkTotalWatchCount(t, c, 1) b = &endpoint.ClusterLoadAssignment{ClusterName: "b", Endpoints: []*endpoint.LocalityLbEndpoints{ // resource update {Priority: 15}, }} @@ -943,7 +943,7 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { // Wildcard update/delete createWildcardDeltaWatch(t, false, c, w) mustBlockDelta(t, w) - checkDeltaWatchCount(t, c, 1) + checkTotalWatchCount(t, c, 1) a = &endpoint.ClusterLoadAssignment{ClusterName: "a", Endpoints: []*endpoint.LocalityLbEndpoints{ // resource update {Priority: 25}, }} @@ -953,7 +953,7 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { assert.NotContains(t, c.resources, "d", "resource with name d was found in cache") verifyDeltaResponse(t, w, []resourceInfo{{"a", hashA}}, []string{"d"}) - checkDeltaWatchCount(t, c, 0) + checkTotalWatchCount(t, c, 0) assert.Equal(t, 2, c.NumResources()) } @@ -977,6 +977,9 @@ func TestLinearMixedWatches(t *testing.T) { mustBlock(t, w) // Only sotw watches, should not have triggered stable resource computation checkStableVersionsAreNotComputed(t, c, "a", "b") + checkTotalWatchCount(t, c, 1) + checkWatchCount(t, c, "a", 1) + checkWatchCount(t, c, "b", 1) a = &endpoint.ClusterLoadAssignment{ClusterName: "a", Endpoints: []*endpoint.LocalityLbEndpoints{ // resource update {Priority: 25}, @@ -988,12 +991,15 @@ func TestLinearMixedWatches(t *testing.T) { resp := verifyResponseResources(t, w, resource.EndpointType, c.getVersion(), "a") updateFromSotwResponse(resp, &sotwSub, sotwReq) checkStableVersionsAreNotComputed(t, c, "a", "b") + checkTotalWatchCount(t, c, 0) + checkWatchCount(t, c, "a", 0) + checkWatchCount(t, c, "b", 0) - sotwReq.VersionInfo = c.getVersion() _, err = c.CreateWatch(sotwReq, sotwSub, w) require.NoError(t, err) mustBlock(t, w) checkStableVersionsAreNotComputed(t, c, "a", "b") + checkTotalWatchCount(t, c, 1) deltaReq := &DeltaRequest{TypeUrl: resource.EndpointType, ResourceNamesSubscribe: []string{"a", "b"}, InitialResourceVersions: map[string]string{"a": hashA, "b": hashB}} wd := make(chan DeltaResponse, 1) @@ -1002,8 +1008,10 @@ func TestLinearMixedWatches(t *testing.T) { _, err = c.CreateDeltaWatch(deltaReq, subFromDeltaRequest(deltaReq), wd) require.NoError(t, err) mustBlockDelta(t, wd) - checkDeltaWatchCount(t, c, 1) + checkTotalWatchCount(t, c, 2) checkStableVersionsAreComputed(t, c, "a", "b") + checkWatchCount(t, c, "a", 2) + checkWatchCount(t, c, "b", 2) err = c.UpdateResources(nil, []string{"b"}) require.NoError(t, err) @@ -1034,9 +1042,9 @@ func TestLinearSotwWatches(t *testing.T) { require.NoError(t, err) mustBlock(t, w) - assert.Len(t, cache.resourceWatches["a"].sotw, 1) - assert.Len(t, cache.resourceWatches["b"].sotw, 1) - assert.Len(t, cache.resourceWatches["c"].sotw, 1) + assert.Len(t, cache.resourceWatches["a"], 1) + assert.Len(t, cache.resourceWatches["b"], 1) + assert.Len(t, cache.resourceWatches["c"], 1) // Update a and c without touching b a = &endpoint.ClusterLoadAssignment{ClusterName: "a", Endpoints: []*endpoint.LocalityLbEndpoints{ // resource update @@ -1047,9 +1055,9 @@ func TestLinearSotwWatches(t *testing.T) { resp := verifyResponseResources(t, w, testType, cache.getVersion(), "a") updateFromSotwResponse(resp, &sotwSub, sotwReq) - assert.Empty(t, cache.resourceWatches["a"].sotw) - assert.Empty(t, cache.resourceWatches["b"].sotw) - assert.Empty(t, cache.resourceWatches["c"].sotw) + assert.Empty(t, cache.resourceWatches["a"]) + assert.Empty(t, cache.resourceWatches["b"]) + assert.Empty(t, cache.resourceWatches["c"]) // c no longer watched w = make(chan Response, 1) @@ -1064,9 +1072,9 @@ func TestLinearSotwWatches(t *testing.T) { }} err = cache.UpdateResources(map[string]types.Resource{"b": b}, nil) - assert.Empty(t, cache.resourceWatches["a"].sotw) - assert.Empty(t, cache.resourceWatches["b"].sotw) - assert.Empty(t, cache.resourceWatches["c"].sotw) + assert.Empty(t, cache.resourceWatches["a"]) + assert.Empty(t, cache.resourceWatches["b"]) + assert.Empty(t, cache.resourceWatches["c"]) require.NoError(t, err) resp = verifyResponseResources(t, w, testType, cache.getVersion(), "b") @@ -1086,9 +1094,9 @@ func TestLinearSotwWatches(t *testing.T) { require.NoError(t, err) verifyResponseResources(t, w, testType, cache.getVersion(), "c") - assert.Empty(t, cache.resourceWatches["a"].sotw) - assert.Empty(t, cache.resourceWatches["b"].sotw) - assert.Empty(t, cache.resourceWatches["c"].sotw) + assert.Empty(t, cache.resourceWatches["a"]) + assert.Empty(t, cache.resourceWatches["b"]) + assert.Empty(t, cache.resourceWatches["c"]) }) t.Run("watches return full state for types requesting it", func(t *testing.T) { @@ -1298,12 +1306,12 @@ func TestLinearSotwNonWildcard(t *testing.T) { checkPendingWatch(4) // Cancel two watches to change resources - assert.Len(t, cache.resourceWatches["c"].sotw, 2) + assert.Len(t, cache.resourceWatches["c"], 2) c2() - assert.Len(t, cache.resourceWatches["c"].sotw, 1) - assert.Len(t, cache.resourceWatches["b"].sotw, 1) + assert.Len(t, cache.resourceWatches["c"], 1) + assert.Len(t, cache.resourceWatches["b"], 1) c3() - assert.Empty(t, cache.resourceWatches["b"].sotw) + assert.Empty(t, cache.resourceWatches["b"]) // Remove a resource from 2 (was a, c, d) updateReqResources(2, []string{"a", "d"}) @@ -1424,12 +1432,12 @@ func TestLinearSotwNonWildcard(t *testing.T) { checkPendingWatch(4) // Cancel two watches to change resources - assert.Len(t, cache.resourceWatches["c"].sotw, 2) + assert.Len(t, cache.resourceWatches["c"], 2) c2() - assert.Len(t, cache.resourceWatches["c"].sotw, 1) - assert.Len(t, cache.resourceWatches["b"].sotw, 1) + assert.Len(t, cache.resourceWatches["c"], 1) + assert.Len(t, cache.resourceWatches["b"], 1) c3() - assert.Empty(t, cache.resourceWatches["b"].sotw) + assert.Empty(t, cache.resourceWatches["b"]) // Remove a resource from 2 (was a, c, d) updateReqResources(2, []string{"a", "d"}) diff --git a/pkg/cache/v3/resource.go b/pkg/cache/v3/resource.go index c98a3eb14b..2a0e627fc9 100644 --- a/pkg/cache/v3/resource.go +++ b/pkg/cache/v3/resource.go @@ -116,10 +116,10 @@ func ResourceRequiresFullStateInSotw(typeURL resource.Type) bool { } // GetResourceName returns the resource names for a list of valid xDS response types. -func GetResourceNames(resources []types.Resource) []string { +func GetResourceNames(resources []types.ResourceWithTTL) []string { out := make([]string, len(resources)) for i, r := range resources { - out[i] = GetResourceName(r) + out[i] = GetResourceName(r.Resource) } return out } diff --git a/pkg/cache/v3/resource_test.go b/pkg/cache/v3/resource_test.go index 0f311af5c8..5baebf521d 100644 --- a/pkg/cache/v3/resource_test.go +++ b/pkg/cache/v3/resource_test.go @@ -128,17 +128,17 @@ func TestGetResourceName(t *testing.T) { func TestGetResourceNames(t *testing.T) { tests := []struct { name string - input []types.Resource + input []types.ResourceWithTTL want []string }{ { name: "empty", - input: []types.Resource{}, + input: []types.ResourceWithTTL{}, want: []string{}, }, { name: "many", - input: []types.Resource{testRuntime, testListener, testListenerDefault, testVirtualHost}, + input: []types.ResourceWithTTL{{Resource: testRuntime}, {Resource: testListener}, {Resource: testListenerDefault}, {Resource: testVirtualHost}}, want: []string{runtimeName, listenerName, listenerName, virtualHostName}, }, } diff --git a/pkg/cache/v3/simple.go b/pkg/cache/v3/simple.go index e043637961..a9456657f6 100644 --- a/pkg/cache/v3/simple.go +++ b/pkg/cache/v3/simple.go @@ -411,7 +411,7 @@ func (cache *snapshotCache) CreateWatch(request *Request, sub Subscription, valu return cache.cancelWatch(nodeID, watchID) } - watch := ResponseWatch{Request: request, Response: value, subscription: sub} + watch := ResponseWatch{Request: request, Response: value, subscription: sub, fullStateResponses: ResourceRequiresFullStateInSotw(request.GetTypeUrl())} snapshot, exists := cache.snapshots[nodeID] if !exists { diff --git a/pkg/cache/v3/status.go b/pkg/cache/v3/status.go index b2a3063c69..1fa216a5ef 100644 --- a/pkg/cache/v3/status.go +++ b/pkg/cache/v3/status.go @@ -15,11 +15,15 @@ package cache import ( + "context" + "encoding/hex" + "hash/fnv" "sort" "sync" "time" core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + "github.com/envoyproxy/go-control-plane/pkg/cache/types" ) // NodeHash computes string identifiers for Envoy nodes. @@ -83,6 +87,33 @@ type statusInfo struct { mu sync.RWMutex } +func computeSotwStableVersion(versionMap map[string]string) string { + // To enforce a stable hash we need to have an ordered vision of the map. + keys := make([]string, 0, len(versionMap)) + for key := range versionMap { + keys = append(keys, key) + } + sort.Strings(keys) + + mapHasher := fnv.New64() + + itemHasher := fnv.New64() + for _, key := range keys { + itemHasher.Reset() + itemHasher.Write([]byte(key)) + mapHasher.Write(itemHasher.Sum(nil)) + itemHasher.Reset() + itemHasher.Write([]byte(versionMap[key])) + mapHasher.Write(itemHasher.Sum(nil)) + } + return hex.EncodeToString(mapHasher.Sum(nil)) +} + +type WatchResponse interface { + GetReturnedResources() map[string]string + GetResponseVersion() string +} + // ResponseWatch is a watch record keeping both the request and an open channel for the response. type ResponseWatch struct { // Request is the original request for the watch. @@ -93,6 +124,42 @@ type ResponseWatch struct { // Subscription stores the current client subscription state. subscription Subscription + + // enableStableVersion indicates whether versions returned in the response are built using stable versions instead of cache update versions. + enableStableVersion bool + + // fullStateResponses requires that all resources matching the request, with no regards to which ones actually updated, must be provided in the response. + fullStateResponses bool +} + +func (w ResponseWatch) isDelta() bool { + return false +} + +func (w ResponseWatch) buildResponse(updatedResources []types.ResourceWithTTL, _ []string, returnedVersions map[string]string, version string) WatchResponse { + return &RawResponse{ + Request: w.Request, + Resources: updatedResources, + ReturnedResources: returnedVersions, + Version: version, + Ctx: context.Background(), + } +} + +func (w ResponseWatch) useStableVersion() bool { + return w.enableStableVersion +} + +func (w ResponseWatch) sendFullStateResponses() bool { + return w.fullStateResponses +} + +func (w ResponseWatch) getSubscription() Subscription { + return w.subscription +} + +func (w ResponseWatch) sendResponse(resp WatchResponse) { + w.Response <- resp.(*RawResponse) } // DeltaResponseWatch is a watch record keeping both the delta request and an open channel for the delta response. @@ -107,6 +174,37 @@ type DeltaResponseWatch struct { subscription Subscription } +func (w DeltaResponseWatch) isDelta() bool { + return true +} + +func (w DeltaResponseWatch) useStableVersion() bool { + return true +} + +func (w DeltaResponseWatch) sendFullStateResponses() bool { + return false +} + +func (w DeltaResponseWatch) getSubscription() Subscription { + return w.subscription +} + +func (w DeltaResponseWatch) buildResponse(updatedResources []types.ResourceWithTTL, removedResources []string, returnedVersions map[string]string, version string) WatchResponse { + return &RawDeltaResponse{ + DeltaRequest: w.Request, + Resources: updatedResources, + RemovedResources: removedResources, + NextVersionMap: returnedVersions, + SystemVersionInfo: version, + Ctx: context.Background(), + } +} + +func (w DeltaResponseWatch) sendResponse(resp WatchResponse) { + w.Response <- resp.(*RawDeltaResponse) +} + // newStatusInfo initializes a status info data structure. func newStatusInfo(node *core.Node) *statusInfo { out := statusInfo{ diff --git a/pkg/server/v3/delta_test.go b/pkg/server/v3/delta_test.go index 711f58361e..3aa83088c7 100644 --- a/pkg/server/v3/delta_test.go +++ b/pkg/server/v3/delta_test.go @@ -33,14 +33,14 @@ func (config *mockConfigWatcher) CreateDeltaWatch(req *discovery.DeltaDiscoveryR versionMap[name] = cache.HashResource(marshaledResource) } var nextVersionMap map[string]string - var filtered []types.Resource + var filtered []types.ResourceWithTTL var toRemove []string // If we are handling a wildcard request, we want to respond with all resources switch { case sub.IsWildcard(): if len(sub.ReturnedResources()) == 0 { - filtered = make([]types.Resource, 0, len(resourceMap)) + filtered = make([]types.ResourceWithTTL, 0, len(resourceMap)) } nextVersionMap = make(map[string]string, len(resourceMap)) for name, r := range resourceMap { @@ -50,7 +50,7 @@ func (config *mockConfigWatcher) CreateDeltaWatch(req *discovery.DeltaDiscoveryR nextVersionMap[name] = version prevVersion, found := sub.ReturnedResources()[name] if !found || (prevVersion != version) { - filtered = append(filtered, r) + filtered = append(filtered, types.ResourceWithTTL{Resource: r}) } } @@ -69,7 +69,7 @@ func (config *mockConfigWatcher) CreateDeltaWatch(req *discovery.DeltaDiscoveryR if r, ok := resourceMap[name]; ok { nextVersion := versionMap[name] if prevVersion != nextVersion { - filtered = append(filtered, r) + filtered = append(filtered, types.ResourceWithTTL{Resource: r}) } nextVersionMap[name] = nextVersion } else if found {