Skip to content

Commit

Permalink
[Linear cache] Increase mutualization between delta and sotw in linea…
Browse files Browse the repository at this point in the history
…r cache to have behavior driven logic instead (#14)

Following the addition of stable version support in sotw, the remaining differences of behavior between sotw and delta are:
 - on first request, check the returned version in sotw if activating stable version. Also compute the version differently
 - in some cases, return full state in sotw. In other case don't return if the only change is deletion
 - compute the version differently

In this context, the code can now be mostly merged, apart from the type differences on requests and responses (which I think can be reduced to none soon, but requires more rework on simple side and would be somewhat backward incompatible).
In this PR, the linear cache no longer treats sotw and delta watches differently. They are tracked in the same maps and the changes and responses are computed in a common method. Differences in behavior (e.g. full state or which version to use) is now a functional attribute of the watch itself.
There are two main parts not yet fully merged:
 - changing the version in sotw when using stable versions. The requirement to prepend cache version prefix makes it harder to abstract
 - defining whether a change only removing resources should trigger a response. This could be made a functional flag but given its intersection with `fullStateResponses` seemed fragile

Signed-off-by: Valerian Roche <valerian.roche@datadoghq.com>
  • Loading branch information
valerian-roche authored and shamdor committed Sep 23, 2024
1 parent 0246762 commit cf5f3ef
Show file tree
Hide file tree
Showing 10 changed files with 390 additions and 338 deletions.
118 changes: 94 additions & 24 deletions pkg/cache/v3/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,41 +104,67 @@ type Cache interface {

// Response is a wrapper around Envoy's DiscoveryResponse.
type Response interface {
// Get the Constructed DiscoveryResponse
// GetDiscoveryResponse returns the Constructed DiscoveryResponse.
GetDiscoveryResponse() (*discovery.DiscoveryResponse, error)

// Get the original Request for the Response.
// GetRequest returns the request that created the watch that we're now responding to.
// This is provided to allow the caller to correlate the response with a request.
// Generally this will be the latest request seen on the stream for the specific type.
GetRequest() *discovery.DiscoveryRequest

// Get the version in the Response.
// GetVersion returns the version in the Response.
// The version can be a property of the resources, allowing for optimizations in subsequent calls,
// or simply an internal property of the cache which can be used for debugging.
// The cache implementation should be able to determine if it can provide such optimization.
// Deprecated: use GetResponseVersion instead
GetVersion() (string, error)

// GetResponseVersion returns the version in the Response.
// The version can be a property of the resources, allowing for optimizations in subsequent calls,
// or simply an internal property of the cache which can be used for debugging.
// The cache implementation should be able to determine if it can provide such optimization.
GetResponseVersion() string

// GetReturnedResources returns the map of resources and their versions returned in the subscription.
// It may include more resources than directly set in the response to consider the full state of the client.
// The caller is expected to provide this unchanged to the next call to CreateWatch as part of the subscription.
GetReturnedResources() map[string]string

// Get the context provided during response creation.
// GetContext returns the context provided during response creation.
GetContext() context.Context
}

// DeltaResponse is a wrapper around Envoy's DeltaDiscoveryResponse
// DeltaResponse is a wrapper around Envoy's DeltaDiscoveryResponse.
type DeltaResponse interface {
// Get the constructed DeltaDiscoveryResponse
// GetDeltaDiscoveryResponse returns the constructed DeltaDiscoveryResponse.
GetDeltaDiscoveryResponse() (*discovery.DeltaDiscoveryResponse, error)

// Get the request that created the watch that we're now responding to. This is provided to allow the caller to correlate the
// response with a request. Generally this will be the latest request seen on the stream for the specific type.
// GetDeltaRequest returns the request that created the watch that we're now responding to.
// This is provided to allow the caller to correlate the response with a request.
// Generally this will be the latest request seen on the stream for the specific type.
GetDeltaRequest() *discovery.DeltaDiscoveryRequest

// Get the version in the DeltaResponse. This field is generally used for debugging purposes as noted by the Envoy documentation.
// GetSystemVersion returns the version in the DeltaResponse.
// The version in delta response is not indicative of the resources included,
// but an internal property of the cache which can be used for debugging.
// Deprecated: use GetResponseVersion instead
GetSystemVersion() (string, error)

// Get the version map of the internal cache.
// The version map consists of updated version mappings after this response is applied
// GetResponseVersion returns the version in the DeltaResponse.
// The version in delta response is not indicative of the resources included,
// but an internal property of the cache which can be used for debugging.
GetResponseVersion() string

// GetNextVersionMap provides the version map of the internal cache.
// The version map consists of updated version mappings after this response is applied.
// Deprecated: use GetReturnedResources instead
GetNextVersionMap() map[string]string

// Get the context provided during response creation
// GetReturnedResources provides the version map of the internal cache.
// The version map consists of updated version mappings after this response is applied.
GetReturnedResources() map[string]string

// GetContext returns the context provided during response creation.
GetContext() context.Context
}

Expand Down Expand Up @@ -183,12 +209,12 @@ type RawDeltaResponse struct {
SystemVersionInfo string

// Resources to be included in the response.
Resources []types.Resource
Resources []types.ResourceWithTTL

// RemovedResources is a list of resource aliases which should be dropped by the consuming client.
RemovedResources []string

// NextVersionMap consists of updated version mappings after this response is applied
// NextVersionMap consists of updated version mappings after this response is applied.
NextVersionMap map[string]string

// Context provided at the time of response creation. This allows associating additional
Expand Down Expand Up @@ -290,8 +316,8 @@ func (r *RawDeltaResponse) GetDeltaDiscoveryResponse() (*discovery.DeltaDiscover
marshaledResources := make([]*discovery.Resource, len(r.Resources))

for i, resource := range r.Resources {
name := GetResourceName(resource)
marshaledResource, err := MarshalResource(resource)
name := GetResourceName(resource.Resource)
marshaledResource, err := MarshalResource(resource.Resource)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -330,23 +356,41 @@ func (r *RawResponse) GetContext() context.Context {
return r.Ctx
}

// GetDeltaRequest returns the original DeltaRequest
// GetDeltaRequest returns the original DeltaRequest.
func (r *RawDeltaResponse) GetDeltaRequest() *discovery.DeltaDiscoveryRequest {
return r.DeltaRequest
}

// GetVersion returns the response version.
// Deprecated: use GetResponseVersion instead
func (r *RawResponse) GetVersion() (string, error) {
return r.Version, nil
return r.GetResponseVersion(), nil
}

// GetSystemVersion returns the raw SystemVersion
// GetResponseVersion returns the response version.
func (r *RawResponse) GetResponseVersion() string {
return r.Version
}

// GetSystemVersion returns the raw SystemVersion.
// Deprecated: use GetResponseVersion instead
func (r *RawDeltaResponse) GetSystemVersion() (string, error) {
return r.SystemVersionInfo, nil
return r.GetResponseVersion(), nil
}

// GetResponseVersion returns the response version.
func (r *RawDeltaResponse) GetResponseVersion() string {
return r.SystemVersionInfo
}

// NextVersionMap returns the version map which consists of updated version mappings after this response is applied
// NextVersionMap returns the version map which consists of updated version mappings after this response is applied.
// Deprecated: use GetReturnedResources instead
func (r *RawDeltaResponse) GetNextVersionMap() map[string]string {
return r.GetReturnedResources()
}

// GetReturnedResources returns the version map which consists of updated version mappings after this response is applied.
func (r *RawDeltaResponse) GetReturnedResources() map[string]string {
return r.NextVersionMap
}

Expand Down Expand Up @@ -392,17 +436,18 @@ func (r *DeltaPassthroughResponse) GetDeltaDiscoveryResponse() (*discovery.Delta
return r.DeltaDiscoveryResponse, nil
}

// GetRequest returns the original Discovery Request
// GetRequest returns the original Discovery Request.
func (r *PassthroughResponse) GetRequest() *discovery.DiscoveryRequest {
return r.Request
}

// GetDeltaRequest returns the original Delta Discovery Request
// GetDeltaRequest returns the original Delta Discovery Request.
func (r *DeltaPassthroughResponse) GetDeltaRequest() *discovery.DeltaDiscoveryRequest {
return r.DeltaRequest
}

// GetVersion returns the response version.
// Deprecated: use GetResponseVersion instead
func (r *PassthroughResponse) GetVersion() (string, error) {
discoveryResponse, _ := r.GetDiscoveryResponse()
if discoveryResponse != nil {
Expand All @@ -411,11 +456,21 @@ func (r *PassthroughResponse) GetVersion() (string, error) {
return "", fmt.Errorf("DiscoveryResponse is nil")
}

// GetResponseVersion returns the response version, or empty if not set.
func (r *PassthroughResponse) GetResponseVersion() string {
discoveryResponse, _ := r.GetDiscoveryResponse()
if discoveryResponse != nil {
return discoveryResponse.GetVersionInfo()
}
return ""
}

func (r *PassthroughResponse) GetContext() context.Context {
return r.ctx
}

// GetSystemVersion returns the response version.
// Deprecated: use GetResponseVersion instead
func (r *DeltaPassthroughResponse) GetSystemVersion() (string, error) {
deltaDiscoveryResponse, _ := r.GetDeltaDiscoveryResponse()
if deltaDiscoveryResponse != nil {
Expand All @@ -424,8 +479,23 @@ func (r *DeltaPassthroughResponse) GetSystemVersion() (string, error) {
return "", fmt.Errorf("DeltaDiscoveryResponse is nil")
}

// NextVersionMap returns the version map from a DeltaPassthroughResponse
// GetResponseVersion returns the response version, or empty if not set.
func (r *DeltaPassthroughResponse) GetResponseVersion() string {
deltaDiscoveryResponse, _ := r.GetDeltaDiscoveryResponse()
if deltaDiscoveryResponse != nil {
return deltaDiscoveryResponse.GetSystemVersionInfo()
}
return ""
}

// NextVersionMap returns the version map from a DeltaPassthroughResponse.
// Deprecated: use GetReturnedResources instead
func (r *DeltaPassthroughResponse) GetNextVersionMap() map[string]string {
return r.GetReturnedResources()
}

// GetReturnedResources returns the version map from a DeltaPassthroughResponse.
func (r *DeltaPassthroughResponse) GetReturnedResources() map[string]string {
return r.NextVersionMap
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/cache/v3/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ type resourceContainer struct {
func createDeltaResponse(ctx context.Context, req *DeltaRequest, sub Subscription, resources resourceContainer, cacheVersion string) *RawDeltaResponse {
// variables to build our response with
var nextVersionMap map[string]string
var filtered []types.Resource
var filtered []types.ResourceWithTTL
var toRemove []string

// If we are handling a wildcard request, we want to respond with all resources
switch {
case sub.IsWildcard():
if len(sub.ReturnedResources()) == 0 {
filtered = make([]types.Resource, 0, len(resources.resourceMap))
filtered = make([]types.ResourceWithTTL, 0, len(resources.resourceMap))
}
nextVersionMap = make(map[string]string, len(resources.resourceMap))
for name, r := range resources.resourceMap {
Expand All @@ -46,7 +46,7 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, sub Subscriptio
nextVersionMap[name] = version
prevVersion, found := sub.ReturnedResources()[name]
if !found || (prevVersion != version) {
filtered = append(filtered, r)
filtered = append(filtered, types.ResourceWithTTL{Resource: r})
}
}

Expand All @@ -66,7 +66,7 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, sub Subscriptio
if r, ok := resources.resourceMap[name]; ok {
nextVersion := resources.versionMap[name]
if prevVersion != nextVersion {
filtered = append(filtered, r)
filtered = append(filtered, types.ResourceWithTTL{Resource: r})
}
nextVersionMap[name] = nextVersion
} else if found {
Expand Down
10 changes: 5 additions & 5 deletions pkg/cache/v3/delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/envoyproxy/go-control-plane/pkg/test/resource/v3"
)

func assertResourceMapEqual(t *testing.T, want, got map[string]types.Resource) {
func assertResourceMapEqual(t *testing.T, want, got map[string]types.ResourceWithTTL) {
t.Helper()

if !cmp.Equal(want, got, protocmp.Transform()) {
Expand Down Expand Up @@ -57,7 +57,7 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) {
select {
case out := <-watches[typ]:
snapshot := fixture.snapshot()
assertResourceMapEqual(t, cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot.GetResources(typ))
assertResourceMapEqual(t, cache.IndexResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot.GetResourcesAndTTL(typ))
sub := subscriptions[typ]
sub.SetReturnedResources(out.GetNextVersionMap())
subscriptions[typ] = sub
Expand Down Expand Up @@ -109,7 +109,7 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) {
case out := <-watches[testTypes[0]]:
snapshot2 := fixture.snapshot()
snapshot2.Resources[types.Endpoint] = cache.NewResources(fixture.version2, []types.Resource{resource.MakeEndpoint(clusterName, 9090)})
assertResourceMapEqual(t, cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot2.GetResources(rsrc.EndpointType))
assertResourceMapEqual(t, cache.IndexResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot2.GetResourcesAndTTL(rsrc.EndpointType))
sub := subscriptions[testTypes[0]]
sub.SetReturnedResources(out.GetNextVersionMap())
subscriptions[testTypes[0]] = sub
Expand Down Expand Up @@ -147,7 +147,7 @@ func TestDeltaRemoveResources(t *testing.T) {
select {
case out := <-watches[typ]:
snapshot := fixture.snapshot()
assertResourceMapEqual(t, cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot.GetResources(typ))
assertResourceMapEqual(t, cache.IndexResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot.GetResourcesAndTTL(typ))
nextVersionMap := out.GetNextVersionMap()
subscriptions[typ].SetReturnedResources(nextVersionMap)
case <-time.After(time.Second):
Expand Down Expand Up @@ -186,7 +186,7 @@ func TestDeltaRemoveResources(t *testing.T) {
case out := <-watches[testTypes[0]]:
snapshot2 := fixture.snapshot()
snapshot2.Resources[types.Endpoint] = cache.NewResources(fixture.version2, []types.Resource{})
assertResourceMapEqual(t, cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot2.GetResources(rsrc.EndpointType))
assertResourceMapEqual(t, cache.IndexResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot2.GetResourcesAndTTL(rsrc.EndpointType))
nextVersionMap := out.GetNextVersionMap()

// make sure the version maps are different since we no longer are tracking any endpoint resources
Expand Down
Loading

0 comments on commit cf5f3ef

Please sign in to comment.