Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Linear cache] Increase mutualization between delta and sotw in linear cache to have behavior driven logic instead #14

Merged
merged 2 commits into from
Feb 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 94 additions & 24 deletions pkg/cache/v3/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,41 +104,67 @@ type Cache interface {

// Response is a wrapper around Envoy's DiscoveryResponse.
type Response interface {
// Get the Constructed DiscoveryResponse
// GetDiscoveryResponse returns the Constructed DiscoveryResponse.
GetDiscoveryResponse() (*discovery.DiscoveryResponse, error)

// Get the original Request for the Response.
// GetRequest returns the request that created the watch that we're now responding to.
// This is provided to allow the caller to correlate the response with a request.
// Generally this will be the latest request seen on the stream for the specific type.
GetRequest() *discovery.DiscoveryRequest

// Get the version in the Response.
// GetVersion returns the version in the Response.
// The version can be a property of the resources, allowing for optimizations in subsequent calls,
// or simply an internal property of the cache which can be used for debugging.
// The cache implementation should be able to determine if it can provide such optimization.
// Deprecated: use GetResponseVersion instead
GetVersion() (string, error)

// GetResponseVersion returns the version in the Response.
// The version can be a property of the resources, allowing for optimizations in subsequent calls,
// or simply an internal property of the cache which can be used for debugging.
// The cache implementation should be able to determine if it can provide such optimization.
GetResponseVersion() string

// GetReturnedResources returns the map of resources and their versions returned in the subscription.
// It may include more resources than directly set in the response to consider the full state of the client.
// The caller is expected to provide this unchanged to the next call to CreateWatch as part of the subscription.
GetReturnedResources() map[string]string

// Get the context provided during response creation.
// GetContext returns the context provided during response creation.
GetContext() context.Context
}

// DeltaResponse is a wrapper around Envoy's DeltaDiscoveryResponse
// DeltaResponse is a wrapper around Envoy's DeltaDiscoveryResponse.
type DeltaResponse interface {
// Get the constructed DeltaDiscoveryResponse
// GetDeltaDiscoveryResponse returns the constructed DeltaDiscoveryResponse.
GetDeltaDiscoveryResponse() (*discovery.DeltaDiscoveryResponse, error)

// Get the request that created the watch that we're now responding to. This is provided to allow the caller to correlate the
// response with a request. Generally this will be the latest request seen on the stream for the specific type.
// GetDeltaRequest returns the request that created the watch that we're now responding to.
// This is provided to allow the caller to correlate the response with a request.
// Generally this will be the latest request seen on the stream for the specific type.
GetDeltaRequest() *discovery.DeltaDiscoveryRequest

// Get the version in the DeltaResponse. This field is generally used for debugging purposes as noted by the Envoy documentation.
// GetSystemVersion returns the version in the DeltaResponse.
// The version in delta response is not indicative of the resources included,
// but an internal property of the cache which can be used for debugging.
// Deprecated: use GetResponseVersion instead
GetSystemVersion() (string, error)

// Get the version map of the internal cache.
// The version map consists of updated version mappings after this response is applied
// GetResponseVersion returns the version in the DeltaResponse.
// The version in delta response is not indicative of the resources included,
// but an internal property of the cache which can be used for debugging.
GetResponseVersion() string

// GetNextVersionMap provides the version map of the internal cache.
// The version map consists of updated version mappings after this response is applied.
// Deprecated: use GetReturnedResources instead
GetNextVersionMap() map[string]string

// Get the context provided during response creation
// GetReturnedResources provides the version map of the internal cache.
// The version map consists of updated version mappings after this response is applied.
GetReturnedResources() map[string]string

// GetContext returns the context provided during response creation.
GetContext() context.Context
}

Expand Down Expand Up @@ -183,12 +209,12 @@ type RawDeltaResponse struct {
SystemVersionInfo string

// Resources to be included in the response.
Resources []types.Resource
Resources []types.ResourceWithTTL

// RemovedResources is a list of resource aliases which should be dropped by the consuming client.
RemovedResources []string

// NextVersionMap consists of updated version mappings after this response is applied
// NextVersionMap consists of updated version mappings after this response is applied.
NextVersionMap map[string]string

// Context provided at the time of response creation. This allows associating additional
Expand Down Expand Up @@ -290,8 +316,8 @@ func (r *RawDeltaResponse) GetDeltaDiscoveryResponse() (*discovery.DeltaDiscover
marshaledResources := make([]*discovery.Resource, len(r.Resources))

for i, resource := range r.Resources {
name := GetResourceName(resource)
marshaledResource, err := MarshalResource(resource)
name := GetResourceName(resource.Resource)
marshaledResource, err := MarshalResource(resource.Resource)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -330,23 +356,41 @@ func (r *RawResponse) GetContext() context.Context {
return r.Ctx
}

// GetDeltaRequest returns the original DeltaRequest
// GetDeltaRequest returns the original DeltaRequest.
func (r *RawDeltaResponse) GetDeltaRequest() *discovery.DeltaDiscoveryRequest {
return r.DeltaRequest
}

// GetVersion returns the response version.
// Deprecated: use GetResponseVersion instead
func (r *RawResponse) GetVersion() (string, error) {
return r.Version, nil
return r.GetResponseVersion(), nil
}

// GetSystemVersion returns the raw SystemVersion
// GetResponseVersion returns the response version.
func (r *RawResponse) GetResponseVersion() string {
return r.Version
}

// GetSystemVersion returns the raw SystemVersion.
// Deprecated: use GetResponseVersion instead
func (r *RawDeltaResponse) GetSystemVersion() (string, error) {
return r.SystemVersionInfo, nil
return r.GetResponseVersion(), nil
}

// GetResponseVersion returns the response version.
func (r *RawDeltaResponse) GetResponseVersion() string {
atollena marked this conversation as resolved.
Show resolved Hide resolved
return r.SystemVersionInfo
}

// NextVersionMap returns the version map which consists of updated version mappings after this response is applied
// NextVersionMap returns the version map which consists of updated version mappings after this response is applied.
// Deprecated: use GetReturnedResources instead
func (r *RawDeltaResponse) GetNextVersionMap() map[string]string {
return r.GetReturnedResources()
}

// GetReturnedResources returns the version map which consists of updated version mappings after this response is applied.
func (r *RawDeltaResponse) GetReturnedResources() map[string]string {
return r.NextVersionMap
}

Expand Down Expand Up @@ -392,17 +436,18 @@ func (r *DeltaPassthroughResponse) GetDeltaDiscoveryResponse() (*discovery.Delta
return r.DeltaDiscoveryResponse, nil
}

// GetRequest returns the original Discovery Request
// GetRequest returns the original Discovery Request.
func (r *PassthroughResponse) GetRequest() *discovery.DiscoveryRequest {
return r.Request
}

// GetDeltaRequest returns the original Delta Discovery Request
// GetDeltaRequest returns the original Delta Discovery Request.
func (r *DeltaPassthroughResponse) GetDeltaRequest() *discovery.DeltaDiscoveryRequest {
return r.DeltaRequest
}

// GetVersion returns the response version.
// Deprecated: use GetResponseVersion instead
func (r *PassthroughResponse) GetVersion() (string, error) {
discoveryResponse, _ := r.GetDiscoveryResponse()
if discoveryResponse != nil {
Expand All @@ -411,11 +456,21 @@ func (r *PassthroughResponse) GetVersion() (string, error) {
return "", fmt.Errorf("DiscoveryResponse is nil")
}

// GetResponseVersion returns the response version, or empty if not set.
func (r *PassthroughResponse) GetResponseVersion() string {
discoveryResponse, _ := r.GetDiscoveryResponse()
if discoveryResponse != nil {
return discoveryResponse.GetVersionInfo()
}
return ""
}

func (r *PassthroughResponse) GetContext() context.Context {
return r.ctx
}

// GetSystemVersion returns the response version.
// Deprecated: use GetResponseVersion instead
func (r *DeltaPassthroughResponse) GetSystemVersion() (string, error) {
deltaDiscoveryResponse, _ := r.GetDeltaDiscoveryResponse()
if deltaDiscoveryResponse != nil {
Expand All @@ -424,8 +479,23 @@ func (r *DeltaPassthroughResponse) GetSystemVersion() (string, error) {
return "", fmt.Errorf("DeltaDiscoveryResponse is nil")
}

// NextVersionMap returns the version map from a DeltaPassthroughResponse
// GetResponseVersion returns the response version, or empty if not set.
func (r *DeltaPassthroughResponse) GetResponseVersion() string {
deltaDiscoveryResponse, _ := r.GetDeltaDiscoveryResponse()
if deltaDiscoveryResponse != nil {
return deltaDiscoveryResponse.GetSystemVersionInfo()
}
return ""
}

// NextVersionMap returns the version map from a DeltaPassthroughResponse.
// Deprecated: use GetReturnedResources instead
func (r *DeltaPassthroughResponse) GetNextVersionMap() map[string]string {
return r.GetReturnedResources()
}

// GetReturnedResources returns the version map from a DeltaPassthroughResponse.
func (r *DeltaPassthroughResponse) GetReturnedResources() map[string]string {
return r.NextVersionMap
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/cache/v3/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ type resourceContainer struct {
func createDeltaResponse(ctx context.Context, req *DeltaRequest, sub Subscription, resources resourceContainer, cacheVersion string) *RawDeltaResponse {
// variables to build our response with
var nextVersionMap map[string]string
var filtered []types.Resource
var filtered []types.ResourceWithTTL
var toRemove []string

// If we are handling a wildcard request, we want to respond with all resources
switch {
case sub.IsWildcard():
if len(sub.ReturnedResources()) == 0 {
filtered = make([]types.Resource, 0, len(resources.resourceMap))
filtered = make([]types.ResourceWithTTL, 0, len(resources.resourceMap))
}
nextVersionMap = make(map[string]string, len(resources.resourceMap))
for name, r := range resources.resourceMap {
Expand All @@ -46,7 +46,7 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, sub Subscriptio
nextVersionMap[name] = version
prevVersion, found := sub.ReturnedResources()[name]
if !found || (prevVersion != version) {
filtered = append(filtered, r)
filtered = append(filtered, types.ResourceWithTTL{Resource: r})
}
}

Expand All @@ -66,7 +66,7 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, sub Subscriptio
if r, ok := resources.resourceMap[name]; ok {
nextVersion := resources.versionMap[name]
if prevVersion != nextVersion {
filtered = append(filtered, r)
filtered = append(filtered, types.ResourceWithTTL{Resource: r})
}
nextVersionMap[name] = nextVersion
} else if found {
Expand Down
10 changes: 5 additions & 5 deletions pkg/cache/v3/delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/envoyproxy/go-control-plane/pkg/test/resource/v3"
)

func assertResourceMapEqual(t *testing.T, want, got map[string]types.Resource) {
func assertResourceMapEqual(t *testing.T, want, got map[string]types.ResourceWithTTL) {
t.Helper()

if !cmp.Equal(want, got, protocmp.Transform()) {
Expand Down Expand Up @@ -57,7 +57,7 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) {
select {
case out := <-watches[typ]:
snapshot := fixture.snapshot()
assertResourceMapEqual(t, cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot.GetResources(typ))
assertResourceMapEqual(t, cache.IndexResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot.GetResourcesAndTTL(typ))
sub := subscriptions[typ]
sub.SetReturnedResources(out.GetNextVersionMap())
subscriptions[typ] = sub
Expand Down Expand Up @@ -109,7 +109,7 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) {
case out := <-watches[testTypes[0]]:
snapshot2 := fixture.snapshot()
snapshot2.Resources[types.Endpoint] = cache.NewResources(fixture.version2, []types.Resource{resource.MakeEndpoint(clusterName, 9090)})
assertResourceMapEqual(t, cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot2.GetResources(rsrc.EndpointType))
assertResourceMapEqual(t, cache.IndexResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot2.GetResourcesAndTTL(rsrc.EndpointType))
sub := subscriptions[testTypes[0]]
sub.SetReturnedResources(out.GetNextVersionMap())
subscriptions[testTypes[0]] = sub
Expand Down Expand Up @@ -147,7 +147,7 @@ func TestDeltaRemoveResources(t *testing.T) {
select {
case out := <-watches[typ]:
snapshot := fixture.snapshot()
assertResourceMapEqual(t, cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot.GetResources(typ))
assertResourceMapEqual(t, cache.IndexResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot.GetResourcesAndTTL(typ))
nextVersionMap := out.GetNextVersionMap()
subscriptions[typ].SetReturnedResources(nextVersionMap)
case <-time.After(time.Second):
Expand Down Expand Up @@ -186,7 +186,7 @@ func TestDeltaRemoveResources(t *testing.T) {
case out := <-watches[testTypes[0]]:
snapshot2 := fixture.snapshot()
snapshot2.Resources[types.Endpoint] = cache.NewResources(fixture.version2, []types.Resource{})
assertResourceMapEqual(t, cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot2.GetResources(rsrc.EndpointType))
assertResourceMapEqual(t, cache.IndexResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot2.GetResourcesAndTTL(rsrc.EndpointType))
nextVersionMap := out.GetNextVersionMap()

// make sure the version maps are different since we no longer are tracking any endpoint resources
Expand Down
Loading
Loading