diff --git a/pkg/cache/v3/cache.go b/pkg/cache/v3/cache.go index 35e963343b..be124cb484 100644 --- a/pkg/cache/v3/cache.go +++ b/pkg/cache/v3/cache.go @@ -26,7 +26,6 @@ import ( "google.golang.org/protobuf/types/known/durationpb" "github.com/envoyproxy/go-control-plane/pkg/cache/types" - "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" ) @@ -37,6 +36,27 @@ type Request = discovery.DiscoveryRequest // DeltaRequest is an alias for the delta discovery request type. type DeltaRequest = discovery.DeltaDiscoveryRequest +// ClientState provides additional data on the client knowledge for the type matching the request +// This allows proper implementation of stateful aspects of the protocol (e.g. returning only some updated resources) +// Though the methods may return mutable parts of the state for performance reasons, +// the cache is expected to consider this state as immutable and thread safe between a watch creation and its cancellation +type ClientState interface { + // GetKnownResources returns the list of resources the clients has ACKed and their associated version. + // The versions are: + // - delta protocol: version of the specific resource set in the response + // - sotw protocol: version of the global response when the resource was last ACKed + GetKnownResources() map[string]string + + // GetSubscribedResources returns the list of resources currently subscribed to by the client for the type. + // For delta it keeps track across requests + // For sotw it is a normalized view of the request resources + GetSubscribedResources() map[string]struct{} + + // IsWildcard returns whether the client has a wildcard watch. + // This considers subtilities related to the current migration of wildcard definition within the protocol. + IsWildcard() bool +} + // ConfigWatcher requests watches for configuration resources by a node, last // applied version identifier, and resource names hint. The watch should send // the responses when they are ready. The watch can be canceled by the @@ -50,7 +70,7 @@ type ConfigWatcher interface { // // Cancel is an optional function to release resources in the producer. If // provided, the consumer may call this function multiple times. - CreateWatch(*Request, stream.StreamState, chan Response) (cancel func()) + CreateWatch(*Request, ClientState, chan Response) (cancel func()) // CreateDeltaWatch returns a new open incremental xDS watch. // @@ -59,7 +79,7 @@ type ConfigWatcher interface { // // Cancel is an optional function to release resources in the producer. If // provided, the consumer may call this function multiple times. - CreateDeltaWatch(*DeltaRequest, stream.StreamState, chan DeltaResponse) (cancel func()) + CreateDeltaWatch(*DeltaRequest, ClientState, chan DeltaResponse) (cancel func()) } // ConfigFetcher fetches configuration resources from cache diff --git a/pkg/cache/v3/delta.go b/pkg/cache/v3/delta.go index deaeeb7ed1..c5adf9fb8b 100644 --- a/pkg/cache/v3/delta.go +++ b/pkg/cache/v3/delta.go @@ -18,7 +18,6 @@ import ( "context" "github.com/envoyproxy/go-control-plane/pkg/cache/types" - "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" ) // groups together resource-related arguments for the createDeltaResponse function @@ -28,7 +27,7 @@ type resourceContainer struct { systemVersion string } -func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.StreamState, resources resourceContainer) *RawDeltaResponse { +func createDeltaResponse(ctx context.Context, req *DeltaRequest, state ClientState, resources resourceContainer) *RawDeltaResponse { // variables to build our response with var nextVersionMap map[string]string var filtered []types.Resource @@ -37,7 +36,7 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.St // If we are handling a wildcard request, we want to respond with all resources switch { case state.IsWildcard(): - if len(state.GetResourceVersions()) == 0 { + if len(state.GetKnownResources()) == 0 { filtered = make([]types.Resource, 0, len(resources.resourceMap)) } nextVersionMap = make(map[string]string, len(resources.resourceMap)) @@ -46,7 +45,7 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.St // we can just set it here to be used for comparison later version := resources.versionMap[name] nextVersionMap[name] = version - prevVersion, found := state.GetResourceVersions()[name] + prevVersion, found := state.GetKnownResources()[name] if !found || (prevVersion != version) { filtered = append(filtered, r) } @@ -54,17 +53,17 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.St // Compute resources for removal // The resource version can be set to "" here to trigger a removal even if never returned before - for name := range state.GetResourceVersions() { + for name := range state.GetKnownResources() { if _, ok := resources.resourceMap[name]; !ok { toRemove = append(toRemove, name) } } default: - nextVersionMap = make(map[string]string, len(state.GetSubscribedResourceNames())) + nextVersionMap = make(map[string]string, len(state.GetSubscribedResources())) // state.GetResourceVersions() may include resources no longer subscribed // In the current code this gets silently cleaned when updating the version map - for name := range state.GetSubscribedResourceNames() { - prevVersion, found := state.GetResourceVersions()[name] + for name := range state.GetSubscribedResources() { + prevVersion, found := state.GetKnownResources()[name] if r, ok := resources.resourceMap[name]; ok { nextVersion := resources.versionMap[name] if prevVersion != nextVersion { diff --git a/pkg/cache/v3/delta_test.go b/pkg/cache/v3/delta_test.go index 4999cad603..e171abad69 100644 --- a/pkg/cache/v3/delta_test.go +++ b/pkg/cache/v3/delta_test.go @@ -3,12 +3,12 @@ package cache_test import ( "context" "fmt" - "reflect" "testing" "time" "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "google.golang.org/protobuf/testing/protocmp" core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" @@ -35,13 +35,14 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) { // Make our initial request as a wildcard to get all resources and make sure the wildcard requesting works as intended for _, typ := range testTypes { watches[typ] = make(chan cache.DeltaResponse, 1) + state := stream.NewStreamState(true, nil) c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{ Node: &core.Node{ Id: "node", }, TypeUrl: typ, ResourceNamesSubscribe: names[typ], - }, stream.NewStreamState(true, nil), watches[typ]) + }, &state, watches[typ]) } if err := c.SetSnapshot(context.Background(), key, fixture.snapshot()); err != nil { @@ -69,7 +70,7 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) { watches[typ] = make(chan cache.DeltaResponse, 1) state := stream.NewStreamState(false, versionMap[typ]) for resource := range versionMap[typ] { - state.GetSubscribedResourceNames()[resource] = struct{}{} + state.GetSubscribedResources()[resource] = struct{}{} } c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{ Node: &core.Node{ @@ -77,7 +78,7 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) { }, TypeUrl: typ, ResourceNamesSubscribe: names[typ], - }, state, watches[typ]) + }, &state, watches[typ]) } if count := c.GetStatusInfo(key).GetNumDeltaWatches(); count != len(testTypes) { @@ -123,12 +124,10 @@ func TestDeltaRemoveResources(t *testing.T) { Id: "node", }, TypeUrl: typ, - }, *streams[typ], watches[typ]) + }, streams[typ], watches[typ]) } - if err := c.SetSnapshot(context.Background(), key, fixture.snapshot()); err != nil { - t.Fatal(err) - } + require.NoError(t, c.SetSnapshot(context.Background(), key, fixture.snapshot())) for _, typ := range testTypes { t.Run(typ, func(t *testing.T) { @@ -139,7 +138,7 @@ func TestDeltaRemoveResources(t *testing.T) { nextVersionMap := out.GetNextVersionMap() streams[typ].SetResourceVersions(nextVersionMap) case <-time.After(time.Second): - t.Fatal("failed to receive a snapshot response") + require.Fail(t, "failed to receive a snapshot response") } }) } @@ -152,20 +151,17 @@ func TestDeltaRemoveResources(t *testing.T) { Node: &core.Node{ Id: "node", }, - TypeUrl: typ, - }, *streams[typ], watches[typ]) + TypeUrl: typ, + ResponseNonce: "nonce", + }, streams[typ], watches[typ]) } - if count := c.GetStatusInfo(key).GetNumDeltaWatches(); count != len(testTypes) { - t.Errorf("watches should be created for the latest version, saw %d watches expected %d", count, len(testTypes)) - } + assert.Equal(t, len(testTypes), c.GetStatusInfo(key).GetNumDeltaWatches(), "watches should be created for the latest version") // set a partially versioned snapshot with no endpoints snapshot2 := fixture.snapshot() snapshot2.Resources[types.Endpoint] = cache.NewResources(fixture.version2, []types.Resource{}) - if err := c.SetSnapshot(context.Background(), key, snapshot2); err != nil { - t.Fatal(err) - } + require.NoError(t, c.SetSnapshot(context.Background(), key, snapshot2)) // validate response for endpoints select { @@ -176,11 +172,9 @@ func TestDeltaRemoveResources(t *testing.T) { nextVersionMap := out.GetNextVersionMap() // make sure the version maps are different since we no longer are tracking any endpoint resources - if reflect.DeepEqual(streams[testTypes[0]].GetResourceVersions(), nextVersionMap) { - t.Fatalf("versionMap for the endpoint resource type did not change, received: %v, instead of an empty map", nextVersionMap) - } + require.Equal(t, nextVersionMap, streams[testTypes[0]].GetKnownResources(), "versionMap for the endpoint resource type did not change") case <-time.After(time.Second): - t.Fatal("failed to receive snapshot response") + assert.Fail(t, "failed to receive snapshot response") } } @@ -203,13 +197,14 @@ func TestConcurrentSetDeltaWatch(t *testing.T) { t.Fatalf("snapshot failed: %s", err) } } else { + state := stream.NewStreamState(false, make(map[string]string)) cancel := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{ Node: &core.Node{ Id: id, }, TypeUrl: rsrc.EndpointType, ResourceNamesSubscribe: []string{clusterName}, - }, stream.NewStreamState(false, make(map[string]string)), responses) + }, &state, responses) defer cancel() } @@ -226,14 +221,14 @@ func TestSnapshotDeltaCacheWatchTimeout(t *testing.T) { // Create a non-buffered channel that will block sends. watchCh := make(chan cache.DeltaResponse) state := stream.NewStreamState(false, nil) - state.SetSubscribedResourceNames(map[string]struct{}{names[rsrc.EndpointType][0]: {}}) + state.SetSubscribedResources(map[string]struct{}{names[rsrc.EndpointType][0]: {}}) c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{ Node: &core.Node{ Id: key, }, TypeUrl: rsrc.EndpointType, ResourceNamesSubscribe: names[rsrc.EndpointType], - }, state, watchCh) + }, &state, watchCh) // The first time we set the snapshot without consuming from the blocking channel, so this should time out. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) @@ -269,13 +264,14 @@ func TestSnapshotCacheDeltaWatchCancel(t *testing.T) { c := cache.NewSnapshotCache(true, group{}, logger{t: t}) for _, typ := range testTypes { responses := make(chan cache.DeltaResponse, 1) + state := stream.NewStreamState(false, make(map[string]string)) cancel := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{ Node: &core.Node{ Id: key, }, TypeUrl: typ, ResourceNamesSubscribe: names[typ], - }, stream.NewStreamState(false, make(map[string]string)), responses) + }, &state, responses) // Cancel the watch cancel() diff --git a/pkg/cache/v3/linear.go b/pkg/cache/v3/linear.go index ae55105d59..8b4b4b1652 100644 --- a/pkg/cache/v3/linear.go +++ b/pkg/cache/v3/linear.go @@ -24,7 +24,6 @@ import ( "github.com/envoyproxy/go-control-plane/pkg/cache/types" "github.com/envoyproxy/go-control-plane/pkg/log" - "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" ) type watches = map[chan Response]struct{} @@ -164,11 +163,11 @@ func (cache *LinearCache) notifyAll(modified map[string]struct{}) { } for id, watch := range cache.deltaWatches { - if !watch.StreamState.WatchesResources(modified) { + if !watch.WatchesResources(modified) { continue } - res := cache.respondDelta(watch.Request, watch.Response, watch.StreamState) + res := cache.respondDelta(watch.Request, watch.Response, watch.clientState) if res != nil { delete(cache.deltaWatches, id) } @@ -176,8 +175,8 @@ func (cache *LinearCache) notifyAll(modified map[string]struct{}) { } } -func (cache *LinearCache) respondDelta(request *DeltaRequest, value chan DeltaResponse, state stream.StreamState) *RawDeltaResponse { - resp := createDeltaResponse(context.Background(), request, state, resourceContainer{ +func (cache *LinearCache) respondDelta(request *DeltaRequest, value chan DeltaResponse, clientState ClientState) *RawDeltaResponse { + resp := createDeltaResponse(context.Background(), request, clientState, resourceContainer{ resourceMap: cache.resources, versionMap: cache.versionMap, systemVersion: cache.getVersion(), @@ -187,7 +186,7 @@ func (cache *LinearCache) respondDelta(request *DeltaRequest, value chan DeltaRe if len(resp.Resources) > 0 || len(resp.RemovedResources) > 0 { if cache.log != nil { cache.log.Debugf("[linear cache] node: %s, sending delta response with resources: %v removed resources %v wildcard: %t", - request.GetNode().GetId(), resp.Resources, resp.RemovedResources, state.IsWildcard()) + request.GetNode().GetId(), resp.Resources, resp.RemovedResources, clientState.IsWildcard()) } value <- resp return resp @@ -298,7 +297,7 @@ func (cache *LinearCache) GetResources() map[string]types.Resource { return resources } -func (cache *LinearCache) CreateWatch(request *Request, streamState stream.StreamState, value chan Response) func() { +func (cache *LinearCache) CreateWatch(request *Request, clientState ClientState, value chan Response) func() { if request.TypeUrl != cache.typeURL { value <- nil return nil @@ -371,7 +370,7 @@ func (cache *LinearCache) CreateWatch(request *Request, streamState stream.Strea } } -func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState, value chan DeltaResponse) func() { +func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, clientState ClientState, value chan DeltaResponse) func() { cache.mu.Lock() defer cache.mu.Unlock() @@ -388,7 +387,7 @@ func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, state stream.S cache.log.Errorf("failed to update version map: %v", err) } } - response := cache.respondDelta(request, value, state) + response := cache.respondDelta(request, value, clientState) // if respondDelta returns nil this means that there is no change in any resource version // create a new watch accordingly @@ -396,10 +395,10 @@ func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, state stream.S watchID := cache.nextDeltaWatchID() if cache.log != nil { cache.log.Infof("[linear cache] open delta watch ID:%d for %s Resources:%v, system version %q", watchID, - cache.typeURL, state.GetSubscribedResourceNames(), cache.getVersion()) + cache.typeURL, clientState.GetSubscribedResources(), cache.getVersion()) } - cache.deltaWatches[watchID] = DeltaResponseWatch{Request: request, Response: value, StreamState: state} + cache.deltaWatches[watchID] = DeltaResponseWatch{Request: request, Response: value, clientState: clientState} return cache.cancelDeltaWatch(watchID) } diff --git a/pkg/cache/v3/linear_test.go b/pkg/cache/v3/linear_test.go index 617d90366e..8f8db7fe04 100644 --- a/pkg/cache/v3/linear_test.go +++ b/pkg/cache/v3/linear_test.go @@ -191,19 +191,19 @@ func hashResource(t *testing.T, resource types.Resource) string { func createWildcardDeltaWatch(c *LinearCache, w chan DeltaResponse) { state := stream.NewStreamState(true, nil) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w) resp := <-w state.SetResourceVersions(resp.GetNextVersionMap()) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) // Ensure the watch is set properly with cache values + c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w) // Ensure the watch is set properly with cache values } func TestLinearInitialResources(t *testing.T) { streamState := stream.NewStreamState(false, map[string]string{}) c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")})) w := make(chan Response, 1) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType}, streamState, w) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType}, &streamState, w) verifyResponse(t, w, "0", 1) - c.CreateWatch(&Request{TypeUrl: testType}, streamState, w) + c.CreateWatch(&Request{TypeUrl: testType}, &streamState, w) verifyResponse(t, w, "0", 2) checkVersionMapNotSet(t, c) } @@ -217,7 +217,7 @@ func TestLinearCornerCases(t *testing.T) { } // create an incorrect type URL request w := make(chan Response, 1) - c.CreateWatch(&Request{TypeUrl: "test"}, streamState, w) + c.CreateWatch(&Request{TypeUrl: "test"}, &streamState, w) select { case r := <-w: if r != nil { @@ -234,12 +234,12 @@ func TestLinearBasic(t *testing.T) { // Create watches before a resource is ready w1 := make(chan Response, 1) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w1) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, &streamState, w1) mustBlock(t, w1) checkVersionMapNotSet(t, c) w := make(chan Response, 1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, &streamState, w) mustBlock(t, w) checkWatchCount(t, c, "a", 2) checkWatchCount(t, c, "b", 1) @@ -250,19 +250,19 @@ func TestLinearBasic(t *testing.T) { verifyResponse(t, w, "1", 1) // Request again, should get same response - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, &streamState, w) checkWatchCount(t, c, "a", 0) verifyResponse(t, w, "1", 1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, &streamState, w) checkWatchCount(t, c, "a", 0) verifyResponse(t, w, "1", 1) // Add another element and update the first, response should be different require.NoError(t, c.UpdateResource("b", testResource("b"))) require.NoError(t, c.UpdateResource("a", testResource("aa"))) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, &streamState, w) verifyResponse(t, w, "3", 1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, &streamState, w) verifyResponse(t, w, "3", 2) // Ensure the version map was not created as we only ever used stow watches checkVersionMapNotSet(t, c) @@ -274,10 +274,10 @@ func TestLinearSetResources(t *testing.T) { // Create new resources w1 := make(chan Response, 1) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w1) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, &streamState, w1) mustBlock(t, w1) w2 := make(chan Response, 1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w2) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, &streamState, w2) mustBlock(t, w2) c.SetResources(map[string]types.Resource{ "a": testResource("a"), @@ -287,9 +287,9 @@ func TestLinearSetResources(t *testing.T) { verifyResponse(t, w2, "1", 2) // the version was only incremented once for all resources // Add another element and update the first, response should be different - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, streamState, w1) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, &streamState, w1) mustBlock(t, w1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, streamState, w2) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, &streamState, w2) mustBlock(t, w2) c.SetResources(map[string]types.Resource{ "a": testResource("aa"), @@ -300,9 +300,9 @@ func TestLinearSetResources(t *testing.T) { verifyResponse(t, w2, "2", 3) // Delete resource - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "2"}, streamState, w1) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "2"}, &streamState, w1) mustBlock(t, w1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "2"}, streamState, w2) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "2"}, &streamState, w2) mustBlock(t, w2) c.SetResources(map[string]types.Resource{ "b": testResource("b"), @@ -334,14 +334,14 @@ func TestLinearVersionPrefix(t *testing.T) { c := NewLinearCache(testType, WithVersionPrefix("instance1-")) w := make(chan Response, 1) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, &streamState, w) verifyResponse(t, w, "instance1-0", 0) require.NoError(t, c.UpdateResource("a", testResource("a"))) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, &streamState, w) verifyResponse(t, w, "instance1-1", 1) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "instance1-1"}, streamState, w) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "instance1-1"}, &streamState, w) mustBlock(t, w) checkWatchCount(t, c, "a", 1) } @@ -350,17 +350,17 @@ func TestLinearDeletion(t *testing.T) { streamState := stream.NewStreamState(false, map[string]string{}) c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")})) w := make(chan Response, 1) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, &streamState, w) mustBlock(t, w) checkWatchCount(t, c, "a", 1) require.NoError(t, c.DeleteResource("a")) verifyResponse(t, w, "1", 0) checkWatchCount(t, c, "a", 0) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, &streamState, w) verifyResponse(t, w, "1", 1) checkWatchCount(t, c, "b", 0) require.NoError(t, c.DeleteResource("b")) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, streamState, w) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, &streamState, w) verifyResponse(t, w, "2", 0) checkWatchCount(t, c, "b", 0) } @@ -369,10 +369,10 @@ func TestLinearWatchTwo(t *testing.T) { streamState := stream.NewStreamState(false, map[string]string{}) c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")})) w := make(chan Response, 1) - c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w) + c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: "0"}, &streamState, w) mustBlock(t, w) w1 := make(chan Response, 1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w1) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, &streamState, w1) mustBlock(t, w1) require.NoError(t, c.UpdateResource("a", testResource("aa"))) // should only get the modified resource @@ -387,14 +387,14 @@ func TestLinearCancel(t *testing.T) { // cancel watch-all w := make(chan Response, 1) - cancel := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, streamState, w) + cancel := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, &streamState, w) mustBlock(t, w) checkWatchCount(t, c, "a", 1) cancel() checkWatchCount(t, c, "a", 0) // cancel watch for "a" - cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, streamState, w) + cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, &streamState, w) mustBlock(t, w) checkWatchCount(t, c, "a", 1) cancel() @@ -404,10 +404,10 @@ func TestLinearCancel(t *testing.T) { w2 := make(chan Response, 1) w3 := make(chan Response, 1) w4 := make(chan Response, 1) - cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, streamState, w) - cancel2 := c.CreateWatch(&Request{ResourceNames: []string{"b"}, TypeUrl: testType, VersionInfo: "1"}, streamState, w2) - cancel3 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, streamState, w3) - cancel4 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, streamState, w4) + cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, &streamState, w) + cancel2 := c.CreateWatch(&Request{ResourceNames: []string{"b"}, TypeUrl: testType, VersionInfo: "1"}, &streamState, w2) + cancel3 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, &streamState, w3) + cancel4 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, &streamState, w4) mustBlock(t, w) mustBlock(t, w2) mustBlock(t, w3) @@ -449,7 +449,7 @@ func TestLinearConcurrentSetWatch(t *testing.T) { ResourceNames: []string{id, id2}, VersionInfo: "0", TypeUrl: testType, - }, streamState, value) + }, &streamState, value) // wait until all updates apply verifyResponse(t, value, "", 1) } @@ -462,11 +462,11 @@ func TestLinearDeltaWildcard(t *testing.T) { c := NewLinearCache(testType) state1 := stream.NewStreamState(true, map[string]string{}) w1 := make(chan DeltaResponse, 1) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state1, w1) + c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state1, w1) mustBlockDelta(t, w1) state2 := stream.NewStreamState(true, map[string]string{}) w2 := make(chan DeltaResponse, 1) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state2, w2) + c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state2, w2) mustBlockDelta(t, w1) checkDeltaWatchCount(t, c, 2) @@ -491,16 +491,16 @@ func TestLinearDeltaExistingResources(t *testing.T) { assert.NoError(t, err) state := stream.NewStreamState(false, nil) - state.SetSubscribedResourceNames(map[string]struct{}{"b": {}, "c": {}}) // watching b and c - not interested in a + state.SetSubscribedResources(map[string]struct{}{"b": {}, "c": {}}) // watching b and c - not interested in a w := make(chan DeltaResponse, 1) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w) checkDeltaWatchCount(t, c, 0) verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}}, []string{}) state = stream.NewStreamState(false, nil) - state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) + state.SetSubscribedResources(map[string]struct{}{"a": {}, "b": {}}) w = make(chan DeltaResponse, 1) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w) checkDeltaWatchCount(t, c, 0) verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}, {"a", hashA}}, nil) } @@ -517,16 +517,16 @@ func TestLinearDeltaInitialResourcesVersionSet(t *testing.T) { assert.NoError(t, err) state := stream.NewStreamState(false, map[string]string{"b": hashB}) - state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) + state.SetSubscribedResources(map[string]struct{}{"a": {}, "b": {}}) w := make(chan DeltaResponse, 1) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w) checkDeltaWatchCount(t, c, 0) verifyDeltaResponse(t, w, []resourceInfo{{"a", hashA}}, nil) // b is up to date and shouldn't be returned state = stream.NewStreamState(false, map[string]string{"a": hashA, "b": hashB}) - state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) + state.SetSubscribedResources(map[string]struct{}{"a": {}, "b": {}}) w = make(chan DeltaResponse, 1) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w) mustBlockDelta(t, w) checkDeltaWatchCount(t, c, 1) b = &endpoint.ClusterLoadAssignment{ClusterName: "b", Endpoints: []*endpoint.LocalityLbEndpoints{{Priority: 10}}} // new version of b @@ -551,17 +551,17 @@ func TestLinearDeltaResourceUpdate(t *testing.T) { checkVersionMapNotSet(t, c) state := stream.NewStreamState(false, nil) - state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) + state.SetSubscribedResources(map[string]struct{}{"a": {}, "b": {}}) w := make(chan DeltaResponse, 1) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w) checkDeltaWatchCount(t, c, 0) verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}, {"a", hashA}}, nil) checkVersionMapSet(t, c) state = stream.NewStreamState(false, map[string]string{"a": hashA, "b": hashB}) - state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) + state.SetSubscribedResources(map[string]struct{}{"a": {}, "b": {}}) w = make(chan DeltaResponse, 1) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w) mustBlockDelta(t, w) checkDeltaWatchCount(t, c, 1) @@ -587,16 +587,16 @@ func TestLinearDeltaResourceDelete(t *testing.T) { assert.NoError(t, err) state := stream.NewStreamState(false, nil) - state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) + state.SetSubscribedResources(map[string]struct{}{"a": {}, "b": {}}) w := make(chan DeltaResponse, 1) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w) checkDeltaWatchCount(t, c, 0) verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}, {"a", hashA}}, nil) state = stream.NewStreamState(false, map[string]string{"a": hashA, "b": hashB}) - state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) + state.SetSubscribedResources(map[string]struct{}{"a": {}, "b": {}}) w = make(chan DeltaResponse, 1) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w) mustBlockDelta(t, w) checkDeltaWatchCount(t, c, 1) @@ -612,13 +612,13 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { c := NewLinearCache(testType) state := stream.NewStreamState(false, nil) - state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) + state.SetSubscribedResources(map[string]struct{}{"a": {}, "b": {}}) w := make(chan DeltaResponse, 1) checkVersionMapNotSet(t, c) assert.Equal(t, 0, c.NumResources()) // Initial update - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w) mustBlockDelta(t, w) checkDeltaWatchCount(t, c, 1) // The version map should now be created, even if empty @@ -636,7 +636,7 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { state.SetResourceVersions(resp.GetNextVersionMap()) // Multiple updates - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w) mustBlockDelta(t, w) checkDeltaWatchCount(t, c, 1) a = &endpoint.ClusterLoadAssignment{ClusterName: "a", Endpoints: []*endpoint.LocalityLbEndpoints{ //resource update @@ -656,7 +656,7 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { state.SetResourceVersions(resp.GetNextVersionMap()) // Update/add/delete - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w) mustBlockDelta(t, w) checkDeltaWatchCount(t, c, 1) a = &endpoint.ClusterLoadAssignment{ClusterName: "a", Endpoints: []*endpoint.LocalityLbEndpoints{ //resource update @@ -675,7 +675,7 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { state.SetResourceVersions(resp.GetNextVersionMap()) // Re-add previously deleted watched resource - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w) mustBlockDelta(t, w) checkDeltaWatchCount(t, c, 1) b = &endpoint.ClusterLoadAssignment{ClusterName: "b", Endpoints: []*endpoint.LocalityLbEndpoints{}} // recreate watched resource @@ -738,7 +738,7 @@ func TestLinearMixedWatches(t *testing.T) { sotwState := stream.NewStreamState(false, nil) w := make(chan Response, 1) - c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: c.getVersion()}, sotwState, w) + c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: c.getVersion()}, &sotwState, w) mustBlock(t, w) checkVersionMapNotSet(t, c) @@ -752,16 +752,16 @@ func TestLinearMixedWatches(t *testing.T) { verifyResponse(t, w, c.getVersion(), 1) checkVersionMapNotSet(t, c) - c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: c.getVersion()}, sotwState, w) + c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: c.getVersion()}, &sotwState, w) mustBlock(t, w) checkVersionMapNotSet(t, c) deltaState := stream.NewStreamState(false, map[string]string{"a": hashA, "b": hashB}) - deltaState.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) + deltaState.SetSubscribedResources(map[string]struct{}{"a": {}, "b": {}}) wd := make(chan DeltaResponse, 1) // Initial update - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, deltaState, wd) + c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &deltaState, wd) mustBlockDelta(t, wd) checkDeltaWatchCount(t, c, 1) checkVersionMapSet(t, c) diff --git a/pkg/cache/v3/mux.go b/pkg/cache/v3/mux.go index db5a65d0a7..5b5db4b489 100644 --- a/pkg/cache/v3/mux.go +++ b/pkg/cache/v3/mux.go @@ -17,8 +17,6 @@ package cache import ( "context" "errors" - - "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" ) // MuxCache multiplexes across several caches using a classification function. @@ -37,7 +35,7 @@ type MuxCache struct { var _ Cache = &MuxCache{} -func (mux *MuxCache) CreateWatch(request *Request, state stream.StreamState, value chan Response) func() { +func (mux *MuxCache) CreateWatch(request *Request, state ClientState, value chan Response) func() { key := mux.Classify(request) cache, exists := mux.Caches[key] if !exists { @@ -47,7 +45,7 @@ func (mux *MuxCache) CreateWatch(request *Request, state stream.StreamState, val return cache.CreateWatch(request, state, value) } -func (mux *MuxCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState, value chan DeltaResponse) func() { +func (mux *MuxCache) CreateDeltaWatch(request *DeltaRequest, state ClientState, value chan DeltaResponse) func() { key := mux.ClassifyDelta(request) cache, exists := mux.Caches[key] if !exists { diff --git a/pkg/cache/v3/simple.go b/pkg/cache/v3/simple.go index 5fe36c5b9e..bb2c5a313a 100644 --- a/pkg/cache/v3/simple.go +++ b/pkg/cache/v3/simple.go @@ -23,7 +23,6 @@ import ( "github.com/envoyproxy/go-control-plane/pkg/cache/types" "github.com/envoyproxy/go-control-plane/pkg/log" - "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" ) // ResourceSnapshot is an abstract snapshot of a collection of resources that @@ -265,7 +264,7 @@ func (cache *snapshotCache) SetSnapshot(ctx context.Context, node string, snapsh snapshot, watch.Request, watch.Response, - watch.StreamState, + watch.clientState, ) if err != nil { return err @@ -322,7 +321,7 @@ func superset(names map[string]bool, resources map[string]types.ResourceWithTTL) } // CreateWatch returns a watch for an xDS request. -func (cache *snapshotCache) CreateWatch(request *Request, streamState stream.StreamState, value chan Response) func() { +func (cache *snapshotCache) CreateWatch(request *Request, clientState ClientState, value chan Response) func() { nodeID := cache.hash.ID(request.Node) cache.mu.Lock() @@ -347,7 +346,7 @@ func (cache *snapshotCache) CreateWatch(request *Request, streamState stream.Str } if exists { - knownResourceNames := streamState.GetKnownResourceNames(request.TypeUrl) + knownResourceNames := clientState.GetKnownResources() diff := []string{} for _, r := range request.ResourceNames { if _, ok := knownResourceNames[r]; !ok { @@ -461,7 +460,7 @@ func createResponse(ctx context.Context, request *Request, resources map[string] } // CreateDeltaWatch returns a watch for a delta xDS request which implements the Simple SnapshotCache. -func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState, value chan DeltaResponse) func() { +func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, clientState ClientState, value chan DeltaResponse) func() { nodeID := cache.hash.ID(request.Node) t := request.GetTypeUrl() @@ -490,7 +489,7 @@ func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream if err != nil { cache.log.Errorf("failed to compute version for snapshot resources inline: %s", err) } - response, err := cache.respondDelta(context.Background(), snapshot, request, value, state) + response, err := cache.respondDelta(context.Background(), snapshot, request, value, clientState) if err != nil { cache.log.Errorf("failed to respond with delta response: %s", err) } @@ -502,12 +501,12 @@ func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream watchID := cache.nextDeltaWatchID() if exists { - cache.log.Infof("open delta watch ID:%d for %s Resources:%v from nodeID: %q, version %q", watchID, t, state.GetSubscribedResourceNames(), nodeID, snapshot.GetVersion(t)) + cache.log.Infof("open delta watch ID:%d for %s Resources:%v from nodeID: %q, version %q", watchID, t, clientState.GetSubscribedResources(), nodeID, snapshot.GetVersion(t)) } else { - cache.log.Infof("open delta watch ID:%d for %s Resources:%v from nodeID: %q", watchID, t, state.GetSubscribedResourceNames(), nodeID) + cache.log.Infof("open delta watch ID:%d for %s Resources:%v from nodeID: %q", watchID, t, clientState.GetSubscribedResources(), nodeID) } - info.setDeltaResponseWatch(watchID, DeltaResponseWatch{Request: request, Response: value, StreamState: state}) + info.setDeltaResponseWatch(watchID, DeltaResponseWatch{Request: request, Response: value, clientState: clientState}) return cache.cancelDeltaWatch(nodeID, watchID) } @@ -515,20 +514,20 @@ func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream } // Respond to a delta watch with the provided snapshot value. If the response is nil, there has been no state change. -func (cache *snapshotCache) respondDelta(ctx context.Context, snapshot ResourceSnapshot, request *DeltaRequest, value chan DeltaResponse, state stream.StreamState) (*RawDeltaResponse, error) { - resp := createDeltaResponse(ctx, request, state, resourceContainer{ +func (cache *snapshotCache) respondDelta(ctx context.Context, snapshot ResourceSnapshot, request *DeltaRequest, value chan DeltaResponse, clientState ClientState) (*RawDeltaResponse, error) { + resp := createDeltaResponse(ctx, request, clientState, resourceContainer{ resourceMap: snapshot.GetResources(request.TypeUrl), versionMap: snapshot.GetVersionMap(request.TypeUrl), systemVersion: snapshot.GetVersion(request.TypeUrl), }) // Only send a response if there were changes - // We want to respond immediately for the first wildcard request in a stream, even if the response is empty + // We want to respond immediately for the first request in a stream if it is wildcard, even if the response is empty // otherwise, envoy won't complete initialization - if len(resp.Resources) > 0 || len(resp.RemovedResources) > 0 || (state.IsWildcard() && state.IsFirst()) { + if len(resp.Resources) > 0 || len(resp.RemovedResources) > 0 || (clientState.IsWildcard() && request.ResponseNonce == "") { if cache.log != nil { cache.log.Debugf("node: %s, sending delta response with resources: %v removed resources %v wildcard: %t", - request.GetNode().GetId(), resp.Resources, resp.RemovedResources, state.IsWildcard()) + request.GetNode().GetId(), resp.Resources, resp.RemovedResources, clientState.IsWildcard()) } select { case value <- resp: diff --git a/pkg/cache/v3/simple_test.go b/pkg/cache/v3/simple_test.go index d5e1f92711..1f5a626239 100644 --- a/pkg/cache/v3/simple_test.go +++ b/pkg/cache/v3/simple_test.go @@ -125,7 +125,7 @@ func TestSnapshotCacheWithTTL(t *testing.T) { t.Run(typ, func(t *testing.T) { defer wg.Done() value := make(chan cache.Response, 1) - c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, streamState, value) + c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, &streamState, value) select { case out := <-value: if gotVersion, _ := out.GetVersion(); gotVersion != fixture.version { @@ -135,7 +135,9 @@ func TestSnapshotCacheWithTTL(t *testing.T) { t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshotWithTTL.GetResourcesAndTTL(typ)) } // Update streamState - streamState.SetKnownResourceNamesAsList(typ, out.GetRequest().GetResourceNames()) + for _, resource := range out.GetRequest().GetResourceNames() { + streamState.GetKnownResources()[resource] = fixture.version + } case <-time.After(2 * time.Second): t.Errorf("failed to receive snapshot response") } @@ -155,7 +157,7 @@ func TestSnapshotCacheWithTTL(t *testing.T) { for { value := make(chan cache.Response, 1) cancel := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ], VersionInfo: fixture.version}, - streamState, value) + &streamState, value) select { case out := <-value: @@ -172,7 +174,9 @@ func TestSnapshotCacheWithTTL(t *testing.T) { updatesByType[typ]++ - streamState.SetKnownResourceNamesAsList(typ, out.GetRequest().ResourceNames) + for _, resource := range out.GetRequest().GetResourceNames() { + streamState.GetKnownResources()[resource] = fixture.version + } case <-end: cancel() return @@ -216,7 +220,7 @@ func TestSnapshotCache(t *testing.T) { value := make(chan cache.Response, 1) streamState := stream.NewStreamState(false, map[string]string{}) c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, ResourceNames: []string{"none"}}, - streamState, value) + &streamState, value) select { case out := <-value: t.Errorf("watch for endpoints and mismatched names => got %v, want none", out) @@ -228,7 +232,7 @@ func TestSnapshotCache(t *testing.T) { value := make(chan cache.Response, 1) streamState := stream.NewStreamState(false, map[string]string{}) c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, - streamState, value) + &streamState, value) select { case out := <-value: snapshot := fixture.snapshot() @@ -282,7 +286,7 @@ func TestSnapshotCacheWatch(t *testing.T) { streamState := stream.NewStreamState(false, map[string]string{}) for _, typ := range testTypes { watches[typ] = make(chan cache.Response, 1) - c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, streamState, watches[typ]) + c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, &streamState, watches[typ]) } if err := c.SetSnapshot(context.Background(), key, fixture.snapshot()); err != nil { t.Fatal(err) @@ -298,7 +302,9 @@ func TestSnapshotCacheWatch(t *testing.T) { if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshot.GetResourcesAndTTL(typ)) { t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshot.GetResourcesAndTTL(typ)) } - streamState.SetKnownResourceNamesAsList(typ, out.GetRequest().GetResourceNames()) + for _, resource := range out.GetRequest().GetResourceNames() { + streamState.GetKnownResources()[resource] = fixture.version + } case <-time.After(time.Second): t.Fatal("failed to receive snapshot response") } @@ -309,7 +315,7 @@ func TestSnapshotCacheWatch(t *testing.T) { for _, typ := range testTypes { watches[typ] = make(chan cache.Response, 1) c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ], VersionInfo: fixture.version}, - streamState, watches[typ]) + &streamState, watches[typ]) } if count := c.GetStatusInfo(key).GetNumWatches(); count != len(testTypes) { t.Errorf("watches should be created for the latest version: %d", count) @@ -357,7 +363,7 @@ func TestConcurrentSetWatch(t *testing.T) { cancel := c.CreateWatch(&discovery.DiscoveryRequest{ Node: &core.Node{Id: id}, TypeUrl: rsrc.EndpointType, - }, streamState, value) + }, &streamState, value) defer cancel() } @@ -370,7 +376,7 @@ func TestSnapshotCacheWatchCancel(t *testing.T) { streamState := stream.NewStreamState(false, map[string]string{}) for _, typ := range testTypes { value := make(chan cache.Response, 1) - cancel := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, streamState, value) + cancel := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, &streamState, value) cancel() } // should be status info for the node @@ -396,7 +402,7 @@ func TestSnapshotCacheWatchTimeout(t *testing.T) { watchCh := make(chan cache.Response) streamState := stream.NewStreamState(false, map[string]string{}) c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, ResourceNames: names[rsrc.EndpointType]}, - streamState, watchCh) + &streamState, watchCh) // The first time we set the snapshot without consuming from the blocking channel, so this should time out. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) @@ -450,8 +456,9 @@ func TestSnapshotCreateWatchWithResourcePreviouslyNotRequested(t *testing.T) { // Request resource with name=ClusterName go func() { + state := stream.NewStreamState(false, map[string]string{}) c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, ResourceNames: []string{clusterName}}, - stream.NewStreamState(false, map[string]string{}), watch) + &state, watch) }() select { @@ -470,9 +477,9 @@ func TestSnapshotCreateWatchWithResourcePreviouslyNotRequested(t *testing.T) { // Request additional resource with name=clusterName2 for same version go func() { state := stream.NewStreamState(false, map[string]string{}) - state.SetKnownResourceNames(rsrc.EndpointType, map[string]struct{}{clusterName: {}}) + state.SetResourceVersions(map[string]string{clusterName: fixture.version}) c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, VersionInfo: fixture.version, - ResourceNames: []string{clusterName, clusterName2}}, state, watch) + ResourceNames: []string{clusterName, clusterName2}}, &state, watch) }() select { @@ -489,9 +496,9 @@ func TestSnapshotCreateWatchWithResourcePreviouslyNotRequested(t *testing.T) { // Repeat request for with same version and make sure a watch is created state := stream.NewStreamState(false, map[string]string{}) - state.SetKnownResourceNames(rsrc.EndpointType, map[string]struct{}{clusterName: {}, clusterName2: {}}) + state.SetResourceVersions(map[string]string{clusterName: fixture.version, clusterName2: fixture.version}) if cancel := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, VersionInfo: fixture.version, - ResourceNames: []string{clusterName, clusterName2}}, state, watch); cancel == nil { + ResourceNames: []string{clusterName, clusterName2}}, &state, watch); cancel == nil { t.Fatal("Should create a watch") } else { cancel() @@ -617,7 +624,7 @@ func TestAvertPanicForWatchOnNonExistentSnapshot(t *testing.T) { } ss := stream.NewStreamState(false, map[string]string{"cluster": "abcdef"}) responder := make(chan cache.Response) - c.CreateWatch(req, ss, responder) + c.CreateWatch(req, &ss, responder) go func() { // Wait for at least one heartbeat to occur, then set snapshot. diff --git a/pkg/cache/v3/status.go b/pkg/cache/v3/status.go index 84db1f9821..43c6d109db 100644 --- a/pkg/cache/v3/status.go +++ b/pkg/cache/v3/status.go @@ -19,7 +19,6 @@ import ( "time" core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" - "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" ) // NodeHash computes string identifiers for Envoy nodes. @@ -99,7 +98,23 @@ type DeltaResponseWatch struct { Response chan DeltaResponse // VersionMap for the stream - StreamState stream.StreamState + clientState ClientState +} + +// WatchesResources returns whether at least one of the resource provided is currently watch by the stream +// It is currently only applicable to delta-xds +// If the request is wildcard, it will always return true +// Otherwise it will compare the provided resources to the list of resources currently subscribed +func (w *DeltaResponseWatch) WatchesResources(resourceNames map[string]struct{}) bool { + if w.clientState.IsWildcard() { + return true + } + for resourceName := range resourceNames { + if _, ok := w.clientState.GetSubscribedResources()[resourceName]; ok { + return true + } + } + return false } // newStatusInfo initializes a status info data structure. diff --git a/pkg/server/delta/v3/server.go b/pkg/server/delta/v3/server.go index 5f10266aba..e556e54a74 100644 --- a/pkg/server/delta/v3/server.go +++ b/pkg/server/delta/v3/server.go @@ -174,7 +174,7 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De s.unsubscribe(req.GetResourceNamesUnsubscribe(), &watch.state) watch.responses = make(chan cache.DeltaResponse, 1) - watch.cancel = s.cache.CreateDeltaWatch(req, watch.state, watch.responses) + watch.cancel = s.cache.CreateDeltaWatch(req, &watch.state, watch.responses) watches.deltaWatches[typeURL] = watch go func() { @@ -216,7 +216,7 @@ func (s *server) DeltaStreamHandler(str stream.DeltaStream, typeURL string) erro // When we subscribe, we just want to make the cache know we are subscribing to a resource. // Even if the stream is wildcard, we keep the list of explicitly subscribed resources as the wildcard subscription can be discarded later on. func (s *server) subscribe(resources []string, streamState *stream.StreamState) { - sv := streamState.GetSubscribedResourceNames() + sv := streamState.GetSubscribedResources() for _, resource := range resources { if resource == "*" { streamState.SetWildcard(true) @@ -229,7 +229,7 @@ func (s *server) subscribe(resources []string, streamState *stream.StreamState) // Unsubscriptions remove resources from the stream's subscribed resource list. // If a client explicitly unsubscribes from a wildcard request, the stream is updated and now watches only subscribed resources. func (s *server) unsubscribe(resources []string, streamState *stream.StreamState) { - sv := streamState.GetSubscribedResourceNames() + sv := streamState.GetSubscribedResources() for _, resource := range resources { if resource == "*" { streamState.SetWildcard(false) @@ -244,7 +244,7 @@ func (s *server) unsubscribe(resources []string, streamState *stream.StreamState // To achieve that, we mark the resource as having been returned with an empty version. While creating the response, the cache will either: // * detect the version change, and return the resource (as an update) // * detect the resource deletion, and set it as removed in the response - streamState.GetResourceVersions()[resource] = "" + streamState.GetKnownResources()[resource] = "" } delete(sv, resource) } diff --git a/pkg/server/sotw/v3/server.go b/pkg/server/sotw/v3/server.go index 91681237c9..fcde7d89b2 100644 --- a/pkg/server/sotw/v3/server.go +++ b/pkg/server/sotw/v3/server.go @@ -69,7 +69,7 @@ type server struct { // regardless current snapshot version (even if it is not changed yet) type lastDiscoveryResponse struct { nonce string - resources map[string]struct{} + resources map[string]string } // process handles a bi-di stream request @@ -81,7 +81,7 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq // ignores stale nonces. nonce is only modified within send() function. var streamNonce int64 - streamState := stream.NewStreamState(false, map[string]string{}) + streamStates := map[string]stream.StreamState{} lastDiscoveryResponses := map[string]lastDiscoveryResponse{} // a collection of stack allocated watches per request type @@ -112,12 +112,17 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq streamNonce = streamNonce + 1 out.Nonce = strconv.FormatInt(streamNonce, 10) + version, err := resp.GetVersion() + if err != nil { + return "", err + } + lastResponse := lastDiscoveryResponse{ nonce: out.Nonce, - resources: make(map[string]struct{}), + resources: make(map[string]string), } for _, r := range resp.GetRequest().ResourceNames { - lastResponse.resources[r] = struct{}{} + lastResponse.resources[r] = version } lastDiscoveryResponses[resp.GetRequest().TypeUrl] = lastResponse @@ -183,14 +188,15 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq } } + typeURL := req.GetTypeUrl() + state := streamStates[typeURL] if lastResponse, ok := lastDiscoveryResponses[req.TypeUrl]; ok { if lastResponse.nonce == "" || lastResponse.nonce == nonce { // Let's record Resource names that a client has received. - streamState.SetKnownResourceNames(req.TypeUrl, lastResponse.resources) + state.SetResourceVersions(lastResponse.resources) } } - typeURL := req.GetTypeUrl() responder := make(chan cache.Response, 1) if w, ok := watches.responders[typeURL]; ok { // We've found a pre-existing watch, lets check and update if needed. @@ -199,7 +205,7 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq w.close() watches.addWatch(typeURL, &watch{ - cancel: s.cache.CreateWatch(req, streamState, responder), + cancel: s.cache.CreateWatch(req, &state, responder), response: responder, }) } @@ -207,11 +213,13 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq // No pre-existing watch exists, let's create one. // We need to precompute the watches first then open a watch in the cache. watches.addWatch(typeURL, &watch{ - cancel: s.cache.CreateWatch(req, streamState, responder), + cancel: s.cache.CreateWatch(req, &state, responder), response: responder, }) } + streamStates[typeURL] = state + // Recompute the dynamic select cases for this stream. watches.recompute(s.ctx, reqCh) default: diff --git a/pkg/server/stream/v3/stream.go b/pkg/server/stream/v3/stream.go index b5832b7d58..2147dc46e0 100644 --- a/pkg/server/stream/v3/stream.go +++ b/pkg/server/stream/v3/stream.go @@ -33,57 +33,30 @@ type StreamState struct { // nolint:golint,revive // ResourceVersions contains a hash of the resource as the value and the resource name as the key. // This field stores the last state sent to the client. resourceVersions map[string]string - - // knownResourceNames contains resource names that a client has received previously - knownResourceNames map[string]map[string]struct{} - - // indicates whether the object has been modified since its creation - first bool } // GetSubscribedResourceNames returns the list of resources currently explicitly subscribed to // If the request is set to wildcard it may be empty // Currently populated only when using delta-xds -func (s *StreamState) GetSubscribedResourceNames() map[string]struct{} { +func (s *StreamState) GetSubscribedResources() map[string]struct{} { return s.subscribedResourceNames } // SetSubscribedResourceNames is setting the list of resources currently explicitly subscribed to // It is decorrelated from the wildcard state of the stream // Currently used only when using delta-xds -func (s *StreamState) SetSubscribedResourceNames(subscribedResourceNames map[string]struct{}) { +func (s *StreamState) SetSubscribedResources(subscribedResourceNames map[string]struct{}) { s.subscribedResourceNames = subscribedResourceNames } -// WatchesResources returns whether at least one of the resource provided is currently watch by the stream -// It is currently only applicable to delta-xds -// If the request is wildcard, it will always return true -// Otherwise it will compare the provided resources to the list of resources currently subscribed -func (s *StreamState) WatchesResources(resourceNames map[string]struct{}) bool { - if s.IsWildcard() { - return true - } - for resourceName := range resourceNames { - if _, ok := s.subscribedResourceNames[resourceName]; ok { - return true - } - } - return false -} - -func (s *StreamState) GetResourceVersions() map[string]string { +func (s *StreamState) GetKnownResources() map[string]string { return s.resourceVersions } func (s *StreamState) SetResourceVersions(resourceVersions map[string]string) { - s.first = false s.resourceVersions = resourceVersions } -func (s *StreamState) IsFirst() bool { - return s.first -} - func (s *StreamState) SetWildcard(wildcard bool) { s.wildcard = wildcard } @@ -92,30 +65,12 @@ func (s *StreamState) IsWildcard() bool { return s.wildcard } -func (s *StreamState) SetKnownResourceNames(url string, names map[string]struct{}) { - s.knownResourceNames[url] = names -} - -func (s *StreamState) SetKnownResourceNamesAsList(url string, names []string) { - m := map[string]struct{}{} - for _, name := range names { - m[name] = struct{}{} - } - s.knownResourceNames[url] = m -} - -func (s *StreamState) GetKnownResourceNames(url string) map[string]struct{} { - return s.knownResourceNames[url] -} - // NewStreamState initializes a stream state. func NewStreamState(wildcard bool, initialResourceVersions map[string]string) StreamState { state := StreamState{ wildcard: wildcard, subscribedResourceNames: map[string]struct{}{}, resourceVersions: initialResourceVersions, - first: true, - knownResourceNames: map[string]map[string]struct{}{}, } if initialResourceVersions == nil { diff --git a/pkg/server/v3/delta_test.go b/pkg/server/v3/delta_test.go index 204599613c..66ddc5bd1a 100644 --- a/pkg/server/v3/delta_test.go +++ b/pkg/server/v3/delta_test.go @@ -15,12 +15,11 @@ import ( "github.com/envoyproxy/go-control-plane/pkg/cache/types" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" rsrc "github.com/envoyproxy/go-control-plane/pkg/resource/v3" - "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" "github.com/envoyproxy/go-control-plane/pkg/server/v3" "github.com/envoyproxy/go-control-plane/pkg/test/resource/v3" ) -func (config *mockConfigWatcher) CreateDeltaWatch(req *discovery.DeltaDiscoveryRequest, state stream.StreamState, out chan cache.DeltaResponse) func() { +func (config *mockConfigWatcher) CreateDeltaWatch(req *discovery.DeltaDiscoveryRequest, state cache.ClientState, out chan cache.DeltaResponse) func() { config.deltaCounts[req.TypeUrl] = config.deltaCounts[req.TypeUrl] + 1 // This is duplicated from pkg/cache/v3/delta.go as private there @@ -37,7 +36,7 @@ func (config *mockConfigWatcher) CreateDeltaWatch(req *discovery.DeltaDiscoveryR // If we are handling a wildcard request, we want to respond with all resources switch { case state.IsWildcard(): - if len(state.GetResourceVersions()) == 0 { + if len(state.GetKnownResources()) == 0 { filtered = make([]types.Resource, 0, len(resourceMap)) } nextVersionMap = make(map[string]string, len(resourceMap)) @@ -46,24 +45,24 @@ func (config *mockConfigWatcher) CreateDeltaWatch(req *discovery.DeltaDiscoveryR // we can just set it here to be used for comparison later version := versionMap[name] nextVersionMap[name] = version - prevVersion, found := state.GetResourceVersions()[name] + prevVersion, found := state.GetKnownResources()[name] if !found || (prevVersion != version) { filtered = append(filtered, r) } } // Compute resources for removal - for name := range state.GetResourceVersions() { + for name := range state.GetKnownResources() { if _, ok := resourceMap[name]; !ok { toRemove = append(toRemove, name) } } default: - nextVersionMap = make(map[string]string, len(state.GetSubscribedResourceNames())) + nextVersionMap = make(map[string]string, len(state.GetSubscribedResources())) // state.GetResourceVersions() may include resources no longer subscribed // In the current code this gets silently cleaned when updating the version map - for name := range state.GetSubscribedResourceNames() { - prevVersion, found := state.GetResourceVersions()[name] + for name := range state.GetSubscribedResources() { + prevVersion, found := state.GetKnownResources()[name] if r, ok := resourceMap[name]; ok { nextVersion := versionMap[name] if prevVersion != nextVersion { @@ -155,36 +154,36 @@ func makeMockDeltaStream(t *testing.T) *mockDeltaStream { func makeDeltaResources() map[string]map[string]types.Resource { return map[string]map[string]types.Resource{ - rsrc.EndpointType: map[string]types.Resource{ + rsrc.EndpointType: { endpoint.GetClusterName(): endpoint, }, - rsrc.ClusterType: map[string]types.Resource{ + rsrc.ClusterType: { cluster.Name: cluster, }, - rsrc.RouteType: map[string]types.Resource{ + rsrc.RouteType: { route.Name: route, }, - rsrc.ScopedRouteType: map[string]types.Resource{ + rsrc.ScopedRouteType: { scopedRoute.Name: scopedRoute, }, - rsrc.VirtualHostType: map[string]types.Resource{ + rsrc.VirtualHostType: { virtualHost.Name: virtualHost, }, - rsrc.ListenerType: map[string]types.Resource{ + rsrc.ListenerType: { httpListener.Name: httpListener, httpScopedListener.Name: httpScopedListener, }, - rsrc.SecretType: map[string]types.Resource{ + rsrc.SecretType: { secret.Name: secret, }, - rsrc.RuntimeType: map[string]types.Resource{ + rsrc.RuntimeType: { runtime.Name: runtime, }, - rsrc.ExtensionConfigType: map[string]types.Resource{ + rsrc.ExtensionConfigType: { extensionConfig.Name: extensionConfig, }, // Pass-through type (types without explicit handling) - opaqueType: map[string]types.Resource{ + opaqueType: { "opaque": opaque, }, } @@ -460,7 +459,7 @@ func TestDeltaCallbackError(t *testing.T) { func TestDeltaWildcardSubscriptions(t *testing.T) { config := makeMockConfigWatcher() config.deltaResources = map[string]map[string]types.Resource{ - rsrc.EndpointType: map[string]types.Resource{ + rsrc.EndpointType: { "endpoints0": resource.MakeEndpoint("endpoints0", 1234), "endpoints1": resource.MakeEndpoint("endpoints1", 1234), "endpoints2": resource.MakeEndpoint("endpoints2", 1234), diff --git a/pkg/server/v3/server.go b/pkg/server/v3/server.go index ed0b0cb6be..56c683a2de 100644 --- a/pkg/server/v3/server.go +++ b/pkg/server/v3/server.go @@ -29,7 +29,6 @@ import ( core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" clusterservice "github.com/envoyproxy/go-control-plane/envoy/service/cluster/v3" discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" - discoverygrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" endpointservice "github.com/envoyproxy/go-control-plane/envoy/service/endpoint/v3" extensionconfigservice "github.com/envoyproxy/go-control-plane/envoy/service/extension/v3" listenerservice "github.com/envoyproxy/go-control-plane/envoy/service/listener/v3" @@ -48,7 +47,7 @@ type Server interface { routeservice.ScopedRoutesDiscoveryServiceServer routeservice.VirtualHostDiscoveryServiceServer listenerservice.ListenerDiscoveryServiceServer - discoverygrpc.AggregatedDiscoveryServiceServer + discovery.AggregatedDiscoveryServiceServer secretservice.SecretDiscoveryServiceServer runtimeservice.RuntimeDiscoveryServiceServer extensionconfigservice.ExtensionConfigDiscoveryServiceServer @@ -184,7 +183,7 @@ func (s *server) StreamHandler(stream stream.Stream, typeURL string) error { return s.sotw.StreamHandler(stream, typeURL) } -func (s *server) StreamAggregatedResources(stream discoverygrpc.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error { +func (s *server) StreamAggregatedResources(stream discovery.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error { return s.StreamHandler(stream, resource.AnyType) } @@ -297,7 +296,7 @@ func (s *server) DeltaStreamHandler(stream stream.DeltaStream, typeURL string) e return s.delta.DeltaStreamHandler(stream, typeURL) } -func (s *server) DeltaAggregatedResources(stream discoverygrpc.AggregatedDiscoveryService_DeltaAggregatedResourcesServer) error { +func (s *server) DeltaAggregatedResources(stream discovery.AggregatedDiscoveryService_DeltaAggregatedResourcesServer) error { return s.DeltaStreamHandler(stream, resource.AnyType) } diff --git a/pkg/server/v3/server_test.go b/pkg/server/v3/server_test.go index 02078bc3ad..8668e46296 100644 --- a/pkg/server/v3/server_test.go +++ b/pkg/server/v3/server_test.go @@ -32,7 +32,6 @@ import ( "github.com/envoyproxy/go-control-plane/pkg/cache/types" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" rsrc "github.com/envoyproxy/go-control-plane/pkg/resource/v3" - "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" "github.com/envoyproxy/go-control-plane/pkg/server/v3" "github.com/envoyproxy/go-control-plane/pkg/test/resource/v3" ) @@ -48,7 +47,7 @@ type mockConfigWatcher struct { mu *sync.RWMutex } -func (config *mockConfigWatcher) CreateWatch(req *discovery.DiscoveryRequest, state stream.StreamState, out chan cache.Response) func() { +func (config *mockConfigWatcher) CreateWatch(req *discovery.DiscoveryRequest, state cache.ClientState, out chan cache.Response) func() { config.counts[req.TypeUrl] = config.counts[req.TypeUrl] + 1 if len(config.responses[req.TypeUrl]) > 0 { out <- config.responses[req.TypeUrl][0]