Skip to content

Commit

Permalink
[Linear cache] Increase mutualization between delta and sotw in linea…
Browse files Browse the repository at this point in the history
…r cache to have behavior driven logic instead

Signed-off-by: Valerian Roche <valerian.roche@datadoghq.com>
  • Loading branch information
valerian-roche committed Feb 8, 2024
1 parent 16f4615 commit fb7e1fc
Show file tree
Hide file tree
Showing 10 changed files with 349 additions and 314 deletions.
66 changes: 61 additions & 5 deletions pkg/cache/v3/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,12 @@ type Response interface {
GetRequest() *discovery.DiscoveryRequest

// Get the version in the Response.
// Deprecated: use GetResponseVersion instead
GetVersion() (string, error)

// Get the version in the Response.
GetResponseVersion() string

// GetReturnedResources returns the map of resources and their versions returned in the subscription.
// It may include more resources than directly set in the response to consider the full state of the client.
// The caller is expected to provide this unchanged to the next call to CreateWatch as part of the subscription.
Expand All @@ -132,12 +136,21 @@ type DeltaResponse interface {
GetDeltaRequest() *discovery.DeltaDiscoveryRequest

// Get the version in the DeltaResponse. This field is generally used for debugging purposes as noted by the Envoy documentation.
// Deprecated: use GetVersion instead
GetSystemVersion() (string, error)

// Get the version in the DeltaResponse. This field is generally used for debugging purposes as noted by the Envoy documentation.
GetResponseVersion() string

// Get the version map of the internal cache.
// The version map consists of updated version mappings after this response is applied
// Deprecated: use GetReturnedResources instead
GetNextVersionMap() map[string]string

// Get the version map of the internal cache.
// The version map consists of updated version mappings after this response is applied.
GetReturnedResources() map[string]string

// Get the context provided during response creation
GetContext() context.Context
}
Expand Down Expand Up @@ -183,7 +196,7 @@ type RawDeltaResponse struct {
SystemVersionInfo string

// Resources to be included in the response.
Resources []types.Resource
Resources []types.ResourceWithTTL

// RemovedResources is a list of resource aliases which should be dropped by the consuming client.
RemovedResources []string
Expand Down Expand Up @@ -290,8 +303,8 @@ func (r *RawDeltaResponse) GetDeltaDiscoveryResponse() (*discovery.DeltaDiscover
marshaledResources := make([]*discovery.Resource, len(r.Resources))

for i, resource := range r.Resources {
name := GetResourceName(resource)
marshaledResource, err := MarshalResource(resource)
name := GetResourceName(resource.Resource)
marshaledResource, err := MarshalResource(resource.Resource)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -336,17 +349,34 @@ func (r *RawDeltaResponse) GetDeltaRequest() *discovery.DeltaDiscoveryRequest {
}

// GetVersion returns the response version.
// Deprecated: use GetResponseVersion instead
func (r *RawResponse) GetVersion() (string, error) {
return r.Version, nil
return r.GetResponseVersion(), nil
}

// GetVersion returns the response version.
func (r *RawResponse) GetResponseVersion() string {
return r.Version
}

// GetSystemVersion returns the raw SystemVersion
// Deprecated: use GetResponseVersion instead
func (r *RawDeltaResponse) GetSystemVersion() (string, error) {
return r.SystemVersionInfo, nil
return r.GetResponseVersion(), nil
}

func (r *RawDeltaResponse) GetResponseVersion() string {
return r.SystemVersionInfo
}

// NextVersionMap returns the version map which consists of updated version mappings after this response is applied
// Deprecated: use GetReturnedResources instead
func (r *RawDeltaResponse) GetNextVersionMap() map[string]string {
return r.GetReturnedResources()
}

// GetReturnedResources returns the version map which consists of updated version mappings after this response is applied
func (r *RawDeltaResponse) GetReturnedResources() map[string]string {
return r.NextVersionMap
}

Expand Down Expand Up @@ -403,6 +433,7 @@ func (r *DeltaPassthroughResponse) GetDeltaRequest() *discovery.DeltaDiscoveryRe
}

// GetVersion returns the response version.
// Deprecated: use GetResponseVersion instead
func (r *PassthroughResponse) GetVersion() (string, error) {
discoveryResponse, _ := r.GetDiscoveryResponse()
if discoveryResponse != nil {
Expand All @@ -411,11 +442,21 @@ func (r *PassthroughResponse) GetVersion() (string, error) {
return "", fmt.Errorf("DiscoveryResponse is nil")
}

// GetResponseVersion returns the response version, or empty if not set.
func (r *PassthroughResponse) GetResponseVersion() string {
discoveryResponse, _ := r.GetDiscoveryResponse()
if discoveryResponse != nil {
return discoveryResponse.GetVersionInfo()
}
return ""
}

func (r *PassthroughResponse) GetContext() context.Context {
return r.ctx
}

// GetSystemVersion returns the response version.
// Deprecated: use GetResponseVersion instead
func (r *DeltaPassthroughResponse) GetSystemVersion() (string, error) {
deltaDiscoveryResponse, _ := r.GetDeltaDiscoveryResponse()
if deltaDiscoveryResponse != nil {
Expand All @@ -424,8 +465,23 @@ func (r *DeltaPassthroughResponse) GetSystemVersion() (string, error) {
return "", fmt.Errorf("DeltaDiscoveryResponse is nil")
}

// GetResponseVersion returns the response version, or empty if not set.
func (r *DeltaPassthroughResponse) GetResponseVersion() string {
deltaDiscoveryResponse, _ := r.GetDeltaDiscoveryResponse()
if deltaDiscoveryResponse != nil {
return deltaDiscoveryResponse.GetSystemVersionInfo()
}
return ""
}

// NextVersionMap returns the version map from a DeltaPassthroughResponse
// Deprecated: use GetReturnedResources instead
func (r *DeltaPassthroughResponse) GetNextVersionMap() map[string]string {
return r.GetReturnedResources()
}

// GetReturnedResources returns the version map from a DeltaPassthroughResponse
func (r *DeltaPassthroughResponse) GetReturnedResources() map[string]string {
return r.NextVersionMap
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/cache/v3/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ type resourceContainer struct {
func createDeltaResponse(ctx context.Context, req *DeltaRequest, sub Subscription, resources resourceContainer, cacheVersion string) *RawDeltaResponse {
// variables to build our response with
var nextVersionMap map[string]string
var filtered []types.Resource
var filtered []types.ResourceWithTTL
var toRemove []string

// If we are handling a wildcard request, we want to respond with all resources
switch {
case sub.IsWildcard():
if len(sub.ReturnedResources()) == 0 {
filtered = make([]types.Resource, 0, len(resources.resourceMap))
filtered = make([]types.ResourceWithTTL, 0, len(resources.resourceMap))
}
nextVersionMap = make(map[string]string, len(resources.resourceMap))
for name, r := range resources.resourceMap {
Expand All @@ -46,7 +46,7 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, sub Subscriptio
nextVersionMap[name] = version
prevVersion, found := sub.ReturnedResources()[name]
if !found || (prevVersion != version) {
filtered = append(filtered, r)
filtered = append(filtered, types.ResourceWithTTL{Resource: r})
}
}

Expand All @@ -66,7 +66,7 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, sub Subscriptio
if r, ok := resources.resourceMap[name]; ok {
nextVersion := resources.versionMap[name]
if prevVersion != nextVersion {
filtered = append(filtered, r)
filtered = append(filtered, types.ResourceWithTTL{Resource: r})
}
nextVersionMap[name] = nextVersion
} else if found {
Expand Down
10 changes: 5 additions & 5 deletions pkg/cache/v3/delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/envoyproxy/go-control-plane/pkg/test/resource/v3"
)

func assertResourceMapEqual(t *testing.T, want, got map[string]types.Resource) {
func assertResourceMapEqual(t *testing.T, want, got map[string]types.ResourceWithTTL) {
t.Helper()

if !cmp.Equal(want, got, protocmp.Transform()) {
Expand Down Expand Up @@ -57,7 +57,7 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) {
select {
case out := <-watches[typ]:
snapshot := fixture.snapshot()
assertResourceMapEqual(t, cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot.GetResources(typ))
assertResourceMapEqual(t, cache.IndexResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot.GetResourcesAndTTL(typ))
sub := subscriptions[typ]
sub.SetReturnedResources(out.GetNextVersionMap())
subscriptions[typ] = sub
Expand Down Expand Up @@ -109,7 +109,7 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) {
case out := <-watches[testTypes[0]]:
snapshot2 := fixture.snapshot()
snapshot2.Resources[types.Endpoint] = cache.NewResources(fixture.version2, []types.Resource{resource.MakeEndpoint(clusterName, 9090)})
assertResourceMapEqual(t, cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot2.GetResources(rsrc.EndpointType))
assertResourceMapEqual(t, cache.IndexResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot2.GetResourcesAndTTL(rsrc.EndpointType))
sub := subscriptions[testTypes[0]]
sub.SetReturnedResources(out.GetNextVersionMap())
subscriptions[testTypes[0]] = sub
Expand Down Expand Up @@ -147,7 +147,7 @@ func TestDeltaRemoveResources(t *testing.T) {
select {
case out := <-watches[typ]:
snapshot := fixture.snapshot()
assertResourceMapEqual(t, cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot.GetResources(typ))
assertResourceMapEqual(t, cache.IndexResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot.GetResourcesAndTTL(typ))
nextVersionMap := out.GetNextVersionMap()
subscriptions[typ].SetReturnedResources(nextVersionMap)
case <-time.After(time.Second):
Expand Down Expand Up @@ -186,7 +186,7 @@ func TestDeltaRemoveResources(t *testing.T) {
case out := <-watches[testTypes[0]]:
snapshot2 := fixture.snapshot()
snapshot2.Resources[types.Endpoint] = cache.NewResources(fixture.version2, []types.Resource{})
assertResourceMapEqual(t, cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot2.GetResources(rsrc.EndpointType))
assertResourceMapEqual(t, cache.IndexResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot2.GetResourcesAndTTL(rsrc.EndpointType))
nextVersionMap := out.GetNextVersionMap()

// make sure the version maps are different since we no longer are tracking any endpoint resources
Expand Down
Loading

0 comments on commit fb7e1fc

Please sign in to comment.