From 4e00420ad55245446655ec66b739039ab7c06aa5 Mon Sep 17 00:00:00 2001 From: Valerian Roche Date: Wed, 24 Aug 2022 23:55:33 -0400 Subject: [PATCH] [#583][Sotw] Ensure resources are properly sent again if envoy unsubscribes then subscribes again to a resource Fix potential deadlock in sotw-ads related to improper cleanup of watches in Linear cache when using delta in non-wildcard Fix improper request set on sotw responses in Linear cache Replaced lastResponse in sotw server by staged resources pending ACK Signed-off-by: Valerian Roche --- pkg/cache/v3/cache.go | 20 ++++ pkg/cache/v3/linear.go | 85 +++++++++---- pkg/cache/v3/linear_test.go | 162 +++++++++++++++++++++---- pkg/cache/v3/simple.go | 16 ++- pkg/server/sotw/v3/server.go | 74 +++++++----- pkg/server/stream/v3/stream.go | 29 ++++- pkg/server/v3/server_test.go | 210 ++++++++++++++++++++++++++++----- 7 files changed, 480 insertions(+), 116 deletions(-) diff --git a/pkg/cache/v3/cache.go b/pkg/cache/v3/cache.go index be124cb484..5f382ecb1e 100644 --- a/pkg/cache/v3/cache.go +++ b/pkg/cache/v3/cache.go @@ -105,6 +105,9 @@ type Response interface { // Get the version in the Response. GetVersion() (string, error) + // Get the list of resources part of the response without having to cast resources + GetResourceNames() []string + // Get the context provided during response creation. GetContext() context.Context } @@ -142,6 +145,9 @@ type RawResponse struct { // Resources to be included in the response. Resources []types.ResourceWithTTL + // Names of the resources included in the response + ResourceNames []string + // Whether this is a heartbeat response. For xDS versions that support TTL, this // will be converted into a response that doesn't contain the actual resource protobuf. // This allows for more lightweight updates that server only to update the TTL timer. @@ -191,6 +197,9 @@ type PassthroughResponse struct { // The discovery response that needs to be sent as is, without any marshaling transformations. 
DiscoveryResponse *discovery.DiscoveryResponse + // Names of the resources set in the response + ResourceNames []string + ctx context.Context } @@ -288,6 +297,12 @@ func (r *RawDeltaResponse) GetDeltaDiscoveryResponse() (*discovery.DeltaDiscover return marshaledResponse.(*discovery.DeltaDiscoveryResponse), nil } +// GetResourceNames returns the list of resources returned within the response +// without having to decode the resources +func (r *RawResponse) GetResourceNames() []string { + return r.ResourceNames +} + // GetRequest returns the original Discovery Request. func (r *RawResponse) GetRequest() *discovery.DiscoveryRequest { return r.Request @@ -350,6 +365,11 @@ func (r *PassthroughResponse) GetDiscoveryResponse() (*discovery.DiscoveryRespon return r.DiscoveryResponse, nil } +// GetResourceNames returns the list of resources included within the response +func (r *PassthroughResponse) GetResourceNames() []string { + return r.ResourceNames +} + // GetDeltaDiscoveryResponse returns the final passthrough Delta Discovery Response. func (r *DeltaPassthroughResponse) GetDeltaDiscoveryResponse() (*discovery.DeltaDiscoveryResponse, error) { return r.DeltaDiscoveryResponse, nil diff --git a/pkg/cache/v3/linear.go b/pkg/cache/v3/linear.go index 8b4b4b1652..2a334e5472 100644 --- a/pkg/cache/v3/linear.go +++ b/pkg/cache/v3/linear.go @@ -26,7 +26,7 @@ import ( "github.com/envoyproxy/go-control-plane/pkg/log" ) -type watches = map[chan Response]struct{} +type watches = map[ResponseWatch]struct{} // LinearCache supports collections of opaque resources. 
This cache has a // single collection indexed by resource names and manages resource versions @@ -112,45 +112,57 @@ func NewLinearCache(typeURL string, opts ...LinearCacheOption) *LinearCache { return out } -func (cache *LinearCache) respond(value chan Response, staleResources []string) { +func (cache *LinearCache) respond(req *Request, value chan Response, staleResources []string) { var resources []types.ResourceWithTTL + var resourceNames []string // TODO: optimize the resources slice creations across different clients if len(staleResources) == 0 { resources = make([]types.ResourceWithTTL, 0, len(cache.resources)) - for _, resource := range cache.resources { + resourceNames = make([]string, 0, len(cache.resources)) + for name, resource := range cache.resources { resources = append(resources, types.ResourceWithTTL{Resource: resource}) + resourceNames = append(resourceNames, name) } } else { resources = make([]types.ResourceWithTTL, 0, len(staleResources)) + resourceNames = make([]string, 0, len(staleResources)) for _, name := range staleResources { resource := cache.resources[name] if resource != nil { resources = append(resources, types.ResourceWithTTL{Resource: resource}) + resourceNames = append(resourceNames, name) } } } value <- &RawResponse{ - Request: &Request{TypeUrl: cache.typeURL}, - Resources: resources, - Version: cache.getVersion(), - Ctx: context.Background(), + Request: &Request{TypeUrl: cache.typeURL}, + Resources: resources, + ResourceNames: resourceNames, + Version: cache.getVersion(), + Ctx: context.Background(), } } func (cache *LinearCache) notifyAll(modified map[string]struct{}) { // de-duplicate watches that need to be responded - notifyList := make(map[chan Response][]string) + notifyList := make(map[ResponseWatch][]string) for name := range modified { for watch := range cache.watches[name] { notifyList[watch] = append(notifyList[watch], name) + + // Make sure we clean the watch for ALL resources it might be associated with, + // as the 
channel will no longer be listened to + for _, resource := range watch.Request.ResourceNames { + delete(cache.watches[resource], watch) + } } delete(cache.watches, name) } - for value, stale := range notifyList { - cache.respond(value, stale) + for watch, stale := range notifyList { + cache.respond(watch.Request, watch.Response, stale) } - for value := range cache.watchAll { - cache.respond(value, nil) + for watch := range cache.watchAll { + cache.respond(watch.Request, watch.Response, nil) } cache.watchAll = make(watches) @@ -306,7 +318,7 @@ func (cache *LinearCache) CreateWatch(request *Request, clientState ClientState, // been updated between the last version and the current version. This avoids the problem // of sending empty updates whenever an irrelevant resource changes. stale := false - staleResources := []string{} // empty means all + var staleResources []string // empty means all // strip version prefix if it is present var lastVersion uint64 @@ -323,37 +335,66 @@ func (cache *LinearCache) CreateWatch(request *Request, clientState ClientState, if err != nil { stale = true staleResources = request.ResourceNames - } else if len(request.ResourceNames) == 0 { + if cache.log != nil { + cache.log.Debugf("Watch is stale as version failed to parse %s", err.Error()) + } + } else if clientState.IsWildcard() { stale = lastVersion != cache.version + if cache.log != nil { + cache.log.Debugf("Watch is stale as version differs for wildcard watch") + } } else { + // Non wildcard case, we only reply resources that have effectively changed since the version set in the request + // This is used for instance in EDS for _, name := range request.ResourceNames { - // When a resource is removed, its version defaults 0 and it is not considered stale. 
- if lastVersion < cache.versionVector[name] { + // The resource does not exist currently, we won't reply for it + if resourceVersion, ok := cache.versionVector[name]; !ok { + continue + } else if lastVersion < resourceVersion { + // The version of the request is older than the last change for the resource, return it + stale = true + staleResources = append(staleResources, name) + } else if _, ok := clientState.GetKnownResources()[name]; !ok { + // Resource is not currently known by the client (e.g. a resource is added in the resourceNames) stale = true staleResources = append(staleResources, name) } } + if cache.log != nil && stale { + cache.log.Debugf("Watch is stale with stale resources %v", staleResources) + } } if stale { - cache.respond(value, staleResources) + cache.respond(request, value, staleResources) return nil } // Create open watches since versions are up to date. - if len(request.ResourceNames) == 0 { - cache.watchAll[value] = struct{}{} + watch := ResponseWatch{request, value} + if clientState.IsWildcard() { + if cache.log != nil { + cache.log.Infof("[linear cache] open watch for %s all resources, system version %q", + cache.typeURL, cache.getVersion()) + } + cache.watchAll[watch] = struct{}{} return func() { cache.mu.Lock() defer cache.mu.Unlock() - delete(cache.watchAll, value) + delete(cache.watchAll, watch) } } + + // Non-wildcard case + if cache.log != nil { + cache.log.Infof("[linear cache] open watch for %s resources %v, system version %q", + cache.typeURL, request.ResourceNames, cache.getVersion()) + } for _, name := range request.ResourceNames { set, exists := cache.watches[name] if !exists { set = make(watches) cache.watches[name] = set } - set[value] = struct{}{} + set[watch] = struct{}{} } return func() { cache.mu.Lock() @@ -361,7 +402,7 @@ func (cache *LinearCache) CreateWatch(request *Request, clientState ClientState, for _, name := range request.ResourceNames { set, exists := cache.watches[name] if exists { - delete(set, value) + 
delete(set, watch) } if len(set) == 0 { delete(cache.watches, name) diff --git a/pkg/cache/v3/linear_test.go b/pkg/cache/v3/linear_test.go index 8f8db7fe04..0653054bd9 100644 --- a/pkg/cache/v3/linear_test.go +++ b/pkg/cache/v3/linear_test.go @@ -34,6 +34,39 @@ const ( testType = "google.protobuf.StringValue" ) +type testLogger struct { + t *testing.T +} + +func newTestLogger(t *testing.T) testLogger { + return testLogger{t} +} + +func (l testLogger) log(level string, format string, args ...interface{}) { + l.t.Helper() + l.t.Logf("["+level+"] "+format, args...) +} + +func (l testLogger) Debugf(format string, args ...interface{}) { + l.t.Helper() + l.log("INFO", format, args...) +} + +func (l testLogger) Infof(format string, args ...interface{}) { + l.t.Helper() + l.log("INFO", format, args...) +} + +func (l testLogger) Warnf(format string, args ...interface{}) { + l.t.Helper() + l.log("INFO", format, args...) +} + +func (l testLogger) Errorf(format string, args ...interface{}) { + l.t.Helper() + l.log("INFO", format, args...) 
+} + func testResource(s string) types.Resource { return wrapperspb.String(s) } @@ -162,6 +195,7 @@ func checkVersionMapSet(t *testing.T, c *LinearCache) { } func mustBlock(t *testing.T, w <-chan Response) { + t.Helper() select { case <-w: t.Error("watch must block") @@ -170,6 +204,7 @@ func mustBlock(t *testing.T, w <-chan Response) { } func mustBlockDelta(t *testing.T, w <-chan DeltaResponse) { + t.Helper() select { case <-w: t.Error("watch must block") @@ -178,6 +213,7 @@ func mustBlockDelta(t *testing.T, w <-chan DeltaResponse) { } func hashResource(t *testing.T, resource types.Resource) string { + t.Helper() marshaledResource, err := MarshalResource(resource) if err != nil { t.Fatal(err) @@ -229,18 +265,29 @@ func TestLinearCornerCases(t *testing.T) { } func TestLinearBasic(t *testing.T) { - streamState := stream.NewStreamState(false, map[string]string{}) c := NewLinearCache(testType) + c.log = newTestLogger(t) // Create watches before a resource is ready + stream1 := stream.NewStreamState(false, map[string]string{}) w1 := make(chan Response, 1) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, &streamState, w1) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: ""}, &stream1, w1) + verifyResponse(t, w1, "0", 0) + stream1.GetKnownResources()["a"] = "0" + + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, &stream1, w1) mustBlock(t, w1) checkVersionMapNotSet(t, c) + stream := stream.NewStreamState(true, map[string]string{}) w := make(chan Response, 1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, &streamState, w) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: ""}, &stream, w) + verifyResponse(t, w, "0", 0) + + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, &stream, w) mustBlock(t, w) + checkVersionMapNotSet(t, c) + checkWatchCount(t, c, "a", 2) checkWatchCount(t, c, "b", 1) require.NoError(t, 
c.UpdateResource("a", testResource("a"))) @@ -248,36 +295,49 @@ func TestLinearBasic(t *testing.T) { checkWatchCount(t, c, "b", 0) verifyResponse(t, w1, "1", 1) verifyResponse(t, w, "1", 1) + stream.GetKnownResources()["a"] = "1" // Request again, should get same response - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, &streamState, w) + stream.SetWildcard(false) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, &stream, w) checkWatchCount(t, c, "a", 0) verifyResponse(t, w, "1", 1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, &streamState, w) + + // Version is old, so should return it + stream.SetWildcard(true) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, &stream, w) checkWatchCount(t, c, "a", 0) verifyResponse(t, w, "1", 1) // Add another element and update the first, response should be different require.NoError(t, c.UpdateResource("b", testResource("b"))) require.NoError(t, c.UpdateResource("a", testResource("aa"))) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, &streamState, w) + + stream.SetWildcard(false) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, &stream, w) verifyResponse(t, w, "3", 1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, &streamState, w) + + stream.SetWildcard(true) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, &stream, w) verifyResponse(t, w, "3", 2) // Ensure the version map was not created as we only ever used stow watches checkVersionMapNotSet(t, c) } func TestLinearSetResources(t *testing.T) { - streamState := stream.NewStreamState(false, map[string]string{}) c := NewLinearCache(testType) + c.log = newTestLogger(t) + state1 := stream.NewStreamState(false, map[string]string{}) + state1.GetKnownResources()["a"] = "0" // Create new resources w1 := make(chan Response, 1) - 
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, &streamState, w1) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, &state1, w1) mustBlock(t, w1) + + state2 := stream.NewStreamState(true, map[string]string{}) w2 := make(chan Response, 1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, &streamState, w2) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, &state2, w2) mustBlock(t, w2) c.SetResources(map[string]types.Resource{ "a": testResource("a"), @@ -287,9 +347,9 @@ func TestLinearSetResources(t *testing.T) { verifyResponse(t, w2, "1", 2) // the version was only incremented once for all resources // Add another element and update the first, response should be different - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, &streamState, w1) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, &state1, w1) mustBlock(t, w1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, &streamState, w2) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, &state2, w2) mustBlock(t, w2) c.SetResources(map[string]types.Resource{ "a": testResource("aa"), @@ -300,9 +360,9 @@ func TestLinearSetResources(t *testing.T) { verifyResponse(t, w2, "2", 3) // Delete resource - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "2"}, &streamState, w1) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "2"}, &state1, w1) mustBlock(t, w1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "2"}, &streamState, w2) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "2"}, &state2, w2) mustBlock(t, w2) c.SetResources(map[string]types.Resource{ "b": testResource("b"), @@ -314,6 +374,7 @@ func TestLinearSetResources(t *testing.T) { func TestLinearGetResources(t *testing.T) { c := NewLinearCache(testType) + c.log 
= newTestLogger(t) expectedResources := map[string]types.Resource{ "a": testResource("a"), @@ -332,14 +393,16 @@ func TestLinearGetResources(t *testing.T) { func TestLinearVersionPrefix(t *testing.T) { streamState := stream.NewStreamState(false, map[string]string{}) c := NewLinearCache(testType, WithVersionPrefix("instance1-")) + c.log = newTestLogger(t) w := make(chan Response, 1) c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, &streamState, w) verifyResponse(t, w, "instance1-0", 0) require.NoError(t, c.UpdateResource("a", testResource("a"))) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, &streamState, w) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "instance1-0"}, &streamState, w) verifyResponse(t, w, "instance1-1", 1) + streamState.GetKnownResources()["a"] = "instance1-1" c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "instance1-1"}, &streamState, w) mustBlock(t, w) @@ -347,42 +410,73 @@ func TestLinearVersionPrefix(t *testing.T) { } func TestLinearDeletion(t *testing.T) { - streamState := stream.NewStreamState(false, map[string]string{}) c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")})) + c.log = newTestLogger(t) + + streamState := stream.NewStreamState(false, map[string]string{}) w := make(chan Response, 1) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: ""}, &streamState, w) + verifyResponse(t, w, "0", 1) + streamState.GetKnownResources()["a"] = "0" + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, &streamState, w) mustBlock(t, w) checkWatchCount(t, c, "a", 1) require.NoError(t, c.DeleteResource("a")) verifyResponse(t, w, "1", 0) checkWatchCount(t, c, "a", 0) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, &streamState, w) 
- verifyResponse(t, w, "1", 1) - checkWatchCount(t, c, "b", 0) - require.NoError(t, c.DeleteResource("b")) + streamState.SetWildcard(true) c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, &streamState, w) + mustBlock(t, w) + checkWatchCount(t, c, "b", 1) + + require.NoError(t, c.DeleteResource("b")) verifyResponse(t, w, "2", 0) checkWatchCount(t, c, "b", 0) } func TestLinearWatchTwo(t *testing.T) { - streamState := stream.NewStreamState(false, map[string]string{}) c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")})) + c.log = newTestLogger(t) + + // Default case, stream starts with no version + state := stream.NewStreamState(false, map[string]string{}) w := make(chan Response, 1) - c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: "0"}, &streamState, w) + c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: ""}, &state, w) + verifyResponse(t, w, "0", 2) + state.GetKnownResources()["a"] = "0" + state.GetKnownResources()["b"] = "0" + + c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: "0"}, &state, w) mustBlock(t, w) + + // Wildcard should be able to start with a version + // It will only return if the version is not up-to-date + state1 := stream.NewStreamState(true, map[string]string{}) w1 := make(chan Response, 1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, &streamState, w1) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, &state1, w1) mustBlock(t, w1) + require.NoError(t, c.UpdateResource("a", testResource("aa"))) // should only get the modified resource verifyResponse(t, w, "1", 1) + // Receive an update for all resources verifyResponse(t, w1, "1", 2) } -func TestLinearCancel(t *testing.T) { +func TestLinearResourceSubscription(t *testing.T) { streamState := stream.NewStreamState(false, map[string]string{}) + c := 
NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")})) + w := make(chan Response, 1) + + c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: "1"}, &streamState, w) + verifyResponse(t, w, "0", 2) +} + +func TestLinearCancel(t *testing.T) { + streamState := stream.NewStreamState(true, map[string]string{}) c := NewLinearCache(testType) + c.log = newTestLogger(t) require.NoError(t, c.UpdateResource("a", testResource("a"))) // cancel watch-all @@ -392,8 +486,10 @@ func TestLinearCancel(t *testing.T) { checkWatchCount(t, c, "a", 1) cancel() checkWatchCount(t, c, "a", 0) + streamState.GetKnownResources()["a"] = "1" // cancel watch for "a" + streamState.SetWildcard(false) cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, &streamState, w) mustBlock(t, w) checkWatchCount(t, c, "a", 1) @@ -406,6 +502,7 @@ func TestLinearCancel(t *testing.T) { w4 := make(chan Response, 1) cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, &streamState, w) cancel2 := c.CreateWatch(&Request{ResourceNames: []string{"b"}, TypeUrl: testType, VersionInfo: "1"}, &streamState, w2) + streamState.SetWildcard(true) cancel3 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, &streamState, w3) cancel4 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, &streamState, w4) mustBlock(t, w) @@ -431,6 +528,7 @@ func TestLinearCancel(t *testing.T) { func TestLinearConcurrentSetWatch(t *testing.T) { streamState := stream.NewStreamState(false, map[string]string{}) c := NewLinearCache(testType) + c.log = newTestLogger(t) n := 50 for i := 0; i < 2*n; i++ { func(i int) { @@ -460,6 +558,7 @@ func TestLinearConcurrentSetWatch(t *testing.T) { func TestLinearDeltaWildcard(t *testing.T) { c := NewLinearCache(testType) + c.log = newTestLogger(t) state1 := stream.NewStreamState(true, 
map[string]string{}) w1 := make(chan DeltaResponse, 1) c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state1, w1) @@ -481,6 +580,7 @@ func TestLinearDeltaWildcard(t *testing.T) { func TestLinearDeltaExistingResources(t *testing.T) { c := NewLinearCache(testType) + c.log = newTestLogger(t) a := &endpoint.ClusterLoadAssignment{ClusterName: "a"} hashA := hashResource(t, a) err := c.UpdateResource("a", a) @@ -507,6 +607,7 @@ func TestLinearDeltaExistingResources(t *testing.T) { func TestLinearDeltaInitialResourcesVersionSet(t *testing.T) { c := NewLinearCache(testType) + c.log = newTestLogger(t) a := &endpoint.ClusterLoadAssignment{ClusterName: "a"} hashA := hashResource(t, a) err := c.UpdateResource("a", a) @@ -539,6 +640,7 @@ func TestLinearDeltaInitialResourcesVersionSet(t *testing.T) { func TestLinearDeltaResourceUpdate(t *testing.T) { c := NewLinearCache(testType) + c.log = newTestLogger(t) a := &endpoint.ClusterLoadAssignment{ClusterName: "a"} hashA := hashResource(t, a) err := c.UpdateResource("a", a) @@ -577,6 +679,7 @@ func TestLinearDeltaResourceUpdate(t *testing.T) { func TestLinearDeltaResourceDelete(t *testing.T) { c := NewLinearCache(testType) + c.log = newTestLogger(t) a := &endpoint.ClusterLoadAssignment{ClusterName: "a"} hashA := hashResource(t, a) err := c.UpdateResource("a", a) @@ -610,6 +713,7 @@ func TestLinearDeltaResourceDelete(t *testing.T) { func TestLinearDeltaMultiResourceUpdates(t *testing.T) { c := NewLinearCache(testType) + c.log = newTestLogger(t) state := stream.NewStreamState(false, nil) state.SetSubscribedResources(map[string]struct{}{"a": {}, "b": {}}) @@ -727,6 +831,7 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { func TestLinearMixedWatches(t *testing.T) { c := NewLinearCache(testType) + c.log = newTestLogger(t) a := &endpoint.ClusterLoadAssignment{ClusterName: "a"} err := c.UpdateResource("a", a) assert.NoError(t, err) @@ -738,6 +843,11 @@ func TestLinearMixedWatches(t *testing.T) { sotwState := 
stream.NewStreamState(false, nil) w := make(chan Response, 1) + c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: c.getVersion()}, &sotwState, w) + verifyResponse(t, w, c.getVersion(), 2) + sotwState.GetKnownResources()["a"] = c.getVersion() + sotwState.GetKnownResources()["b"] = c.getVersion() + c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: c.getVersion()}, &sotwState, w) mustBlock(t, w) checkVersionMapNotSet(t, c) @@ -748,8 +858,10 @@ func TestLinearMixedWatches(t *testing.T) { hashA := hashResource(t, a) err = c.UpdateResources(map[string]types.Resource{"a": a}, nil) assert.NoError(t, err) - // This behavior is currently invalid for cds and lds, but due to a current limitation of linear cache sotw implementation + // For non-wildcard, we only return the state of the resource that has been updated + // This is non-applicable for cds/lds that will always send wildcard requests verifyResponse(t, w, c.getVersion(), 1) + sotwState.GetKnownResources()["a"] = c.getVersion() checkVersionMapNotSet(t, c) c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: c.getVersion()}, &sotwState, w) diff --git a/pkg/cache/v3/simple.go b/pkg/cache/v3/simple.go index bb2c5a313a..57a022d51e 100644 --- a/pkg/cache/v3/simple.go +++ b/pkg/cache/v3/simple.go @@ -433,6 +433,7 @@ func (cache *snapshotCache) respond(ctx context.Context, request *Request, value func createResponse(ctx context.Context, request *Request, resources map[string]types.ResourceWithTTL, version string, heartbeat bool) Response { filtered := make([]types.ResourceWithTTL, 0, len(resources)) + resourceNames := make([]string, 0, len(resources)) // Reply only with the requested resources. Envoy may ask each resource // individually in a separate stream. 
It is ok to reply with the same version @@ -442,20 +443,23 @@ func createResponse(ctx context.Context, request *Request, resources map[string] for name, resource := range resources { if set[name] { filtered = append(filtered, resource) + resourceNames = append(resourceNames, name) } } } else { - for _, resource := range resources { + for name, resource := range resources { filtered = append(filtered, resource) + resourceNames = append(resourceNames, name) } } return &RawResponse{ - Request: request, - Version: version, - Resources: filtered, - Heartbeat: heartbeat, - Ctx: ctx, + Request: request, + Version: version, + Resources: filtered, + ResourceNames: resourceNames, + Heartbeat: heartbeat, + Ctx: ctx, } } diff --git a/pkg/server/sotw/v3/server.go b/pkg/server/sotw/v3/server.go index fcde7d89b2..8a30ce420a 100644 --- a/pkg/server/sotw/v3/server.go +++ b/pkg/server/sotw/v3/server.go @@ -63,15 +63,6 @@ type server struct { streamCount int64 } -// Discovery response that is sent over GRPC stream -// We need to record what resource names are already sent to a client -// So if the client requests a new name we can respond back -// regardless current snapshot version (even if it is not changed yet) -type lastDiscoveryResponse struct { - nonce string - resources map[string]string -} - // process handles a bi-di stream request func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryRequest, defaultTypeURL string) error { // increment stream count @@ -82,7 +73,6 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq var streamNonce int64 streamStates := map[string]stream.StreamState{} - lastDiscoveryResponses := map[string]lastDiscoveryResponse{} // a collection of stack allocated watches per request type watches := newWatches() @@ -112,20 +102,6 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq streamNonce = streamNonce + 1 out.Nonce = strconv.FormatInt(streamNonce, 10) - version, err := 
resp.GetVersion() - if err != nil { - return "", err - } - - lastResponse := lastDiscoveryResponse{ - nonce: out.Nonce, - resources: make(map[string]string), - } - for _, r := range resp.GetRequest().ResourceNames { - lastResponse.resources[r] = version - } - lastDiscoveryResponses[resp.GetRequest().TypeUrl] = lastResponse - if s.callbacks != nil { s.callbacks.OnStreamResponse(resp.GetContext(), streamID, resp.GetRequest(), out) } @@ -190,13 +166,25 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq typeURL := req.GetTypeUrl() state := streamStates[typeURL] - if lastResponse, ok := lastDiscoveryResponses[req.TypeUrl]; ok { - if lastResponse.nonce == "" || lastResponse.nonce == nonce { - // Let's record Resource names that a client has received. - state.SetResourceVersions(lastResponse.resources) + // Check if we're pending an ACK + if responder, ok := watches.responders[typeURL]; ok { + if responder.nonce != "" && responder.nonce == nonce { + // The nonce is matching, this is an ACK from the client + state.CommitPendingResources() } } + // Remove resources no longer subscribed from the stream state + // This ensures we will send a resource if it is unsubscribed then subscribed again + // without a cache version change + knownResources := state.GetKnownResources() + unsubscribedResources := getUnsubscribedResources(req.ResourceNames, knownResources) + for _, resourceName := range unsubscribedResources { + delete(knownResources, resourceName) + } + // Remove from pending resources to ensure we won't lose this state when committing + state.RemovePendingResources(unsubscribedResources) + responder := make(chan cache.Response, 1) if w, ok := watches.responders[typeURL]; ok { // We've found a pre-existing watch, lets check and update if needed. 
@@ -235,6 +223,23 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq return err } + // Track the resources returned in the response + // Those are staged pending the client ACK + // The protocol clearly states that if we send another response prior to an ACK + // the previous one is to be considered as discarded + version, err := res.GetVersion() + if err != nil { + return err + } + + state := streamStates[res.GetRequest().TypeUrl] + resources := make(map[string]string, len(res.GetResourceNames())) + for _, name := range res.GetResourceNames() { + resources[name] = version + } + state.SetPendingResources(resources) + streamStates[res.GetRequest().TypeUrl] = state + watches.responders[res.GetRequest().TypeUrl].nonce = nonce } } @@ -263,3 +268,16 @@ func (s *server) StreamHandler(stream stream.Stream, typeURL string) error { return s.process(stream, reqCh, typeURL) } + +func getUnsubscribedResources(newResources []string, knownResources map[string]string) (removedResources []string) { + newResourcesMap := make(map[string]struct{}, len(newResources)) + for _, resourceName := range newResources { + newResourcesMap[resourceName] = struct{}{} + } + for resourceName := range knownResources { + if _, ok := newResourcesMap[resourceName]; !ok { + removedResources = append(removedResources, resourceName) + } + } + return +} diff --git a/pkg/server/stream/v3/stream.go b/pkg/server/stream/v3/stream.go index 2147dc46e0..6e2e10e511 100644 --- a/pkg/server/stream/v3/stream.go +++ b/pkg/server/stream/v3/stream.go @@ -33,6 +33,10 @@ type StreamState struct { // nolint:golint,revive // ResourceVersions contains a hash of the resource as the value and the resource name as the key. // This field stores the last state sent to the client. 
resourceVersions map[string]string + + // Provides the list of resources (and their version) that have been sent to the client + // but not ACKed yet + pendingResources map[string]string } // GetSubscribedResourceNames returns the list of resources currently explicitly subscribed to @@ -49,7 +53,28 @@ func (s *StreamState) SetSubscribedResources(subscribedResourceNames map[string] s.subscribedResourceNames = subscribedResourceNames } +func (s *StreamState) SetPendingResources(resources map[string]string) { + s.pendingResources = resources +} + +func (s *StreamState) RemovePendingResources(resources []string) { + for _, resource := range resources { + delete(s.pendingResources, resource) + } +} + +func (s *StreamState) CommitPendingResources() { + clientVersions := s.GetKnownResources() + for name, version := range s.pendingResources { + clientVersions[name] = version + delete(s.pendingResources, name) + } +} + func (s *StreamState) GetKnownResources() map[string]string { + if s.resourceVersions == nil { + s.resourceVersions = make(map[string]string) + } return s.resourceVersions } @@ -73,9 +98,5 @@ func NewStreamState(wildcard bool, initialResourceVersions map[string]string) St resourceVersions: initialResourceVersions, } - if initialResourceVersions == nil { - state.resourceVersions = make(map[string]string) - } - return state } diff --git a/pkg/server/v3/server_test.go b/pkg/server/v3/server_test.go index 8668e46296..2e2f0b0d00 100644 --- a/pkg/server/v3/server_test.go +++ b/pkg/server/v3/server_test.go @@ -50,7 +50,9 @@ type mockConfigWatcher struct { func (config *mockConfigWatcher) CreateWatch(req *discovery.DiscoveryRequest, state cache.ClientState, out chan cache.Response) func() { config.counts[req.TypeUrl] = config.counts[req.TypeUrl] + 1 if len(config.responses[req.TypeUrl]) > 0 { - out <- config.responses[req.TypeUrl][0] + resp := config.responses[req.TypeUrl][0].(*cache.RawResponse) + resp.Request = req + out <- resp config.responses[req.TypeUrl] = 
config.responses[req.TypeUrl][1:] } else { config.watches++ @@ -177,73 +179,83 @@ func makeResponses() map[string][]cache.Response { return map[string][]cache.Response{ rsrc.EndpointType: { &cache.RawResponse{ - Version: "1", - Resources: []types.ResourceWithTTL{{Resource: endpoint}}, - Request: &discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType}, + Version: "1", + Resources: []types.ResourceWithTTL{{Resource: endpoint}}, + ResourceNames: []string{clusterName}, + Request: &discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType}, }, }, rsrc.ClusterType: { &cache.RawResponse{ - Version: "2", - Resources: []types.ResourceWithTTL{{Resource: cluster}}, - Request: &discovery.DiscoveryRequest{TypeUrl: rsrc.ClusterType}, + Version: "2", + Resources: []types.ResourceWithTTL{{Resource: cluster}}, + ResourceNames: []string{clusterName}, + Request: &discovery.DiscoveryRequest{TypeUrl: rsrc.ClusterType}, }, }, rsrc.RouteType: { &cache.RawResponse{ - Version: "3", - Resources: []types.ResourceWithTTL{{Resource: route}}, - Request: &discovery.DiscoveryRequest{TypeUrl: rsrc.RouteType}, + Version: "3", + Resources: []types.ResourceWithTTL{{Resource: route}}, + ResourceNames: []string{routeName}, + Request: &discovery.DiscoveryRequest{TypeUrl: rsrc.RouteType}, }, }, rsrc.ScopedRouteType: { &cache.RawResponse{ - Version: "4", - Resources: []types.ResourceWithTTL{{Resource: scopedRoute}}, - Request: &discovery.DiscoveryRequest{TypeUrl: rsrc.ScopedRouteType}, + Version: "4", + Resources: []types.ResourceWithTTL{{Resource: scopedRoute}}, + ResourceNames: []string{routeName}, + Request: &discovery.DiscoveryRequest{TypeUrl: rsrc.ScopedRouteType}, }, }, rsrc.VirtualHostType: { &cache.RawResponse{ - Version: "5", - Resources: []types.ResourceWithTTL{{Resource: virtualHost}}, - Request: &discovery.DiscoveryRequest{TypeUrl: rsrc.VirtualHostType}, + Version: "5", + Resources: []types.ResourceWithTTL{{Resource: virtualHost}}, + ResourceNames: []string{virtualHostName}, + Request: 
&discovery.DiscoveryRequest{TypeUrl: rsrc.VirtualHostType}, }, }, rsrc.ListenerType: { &cache.RawResponse{ - Version: "6", - Resources: []types.ResourceWithTTL{{Resource: httpListener}, {Resource: httpScopedListener}}, - Request: &discovery.DiscoveryRequest{TypeUrl: rsrc.ListenerType}, + Version: "6", + Resources: []types.ResourceWithTTL{{Resource: httpListener}, {Resource: httpScopedListener}}, + ResourceNames: []string{listenerName, scopedListenerName}, + Request: &discovery.DiscoveryRequest{TypeUrl: rsrc.ListenerType}, }, }, rsrc.SecretType: { &cache.RawResponse{ - Version: "7", - Resources: []types.ResourceWithTTL{{Resource: secret}}, - Request: &discovery.DiscoveryRequest{TypeUrl: rsrc.SecretType}, + Version: "7", + Resources: []types.ResourceWithTTL{{Resource: secret}}, + ResourceNames: []string{secretName}, + Request: &discovery.DiscoveryRequest{TypeUrl: rsrc.SecretType}, }, }, rsrc.RuntimeType: { &cache.RawResponse{ - Version: "8", - Resources: []types.ResourceWithTTL{{Resource: runtime}}, - Request: &discovery.DiscoveryRequest{TypeUrl: rsrc.RuntimeType}, + Version: "8", + Resources: []types.ResourceWithTTL{{Resource: runtime}}, + ResourceNames: []string{runtimeName}, + Request: &discovery.DiscoveryRequest{TypeUrl: rsrc.RuntimeType}, }, }, rsrc.ExtensionConfigType: { &cache.RawResponse{ - Version: "9", - Resources: []types.ResourceWithTTL{{Resource: extensionConfig}}, - Request: &discovery.DiscoveryRequest{TypeUrl: rsrc.ExtensionConfigType}, + Version: "9", + Resources: []types.ResourceWithTTL{{Resource: extensionConfig}}, + ResourceNames: []string{extensionConfigName}, + Request: &discovery.DiscoveryRequest{TypeUrl: rsrc.ExtensionConfigType}, }, }, // Pass-through type (xDS does not exist for this type) opaqueType: { &cache.RawResponse{ - Version: "10", - Resources: []types.ResourceWithTTL{{Resource: opaque}}, - Request: &discovery.DiscoveryRequest{TypeUrl: opaqueType}, + Version: "10", + Resources: []types.ResourceWithTTL{{Resource: opaque}}, + 
ResourceNames: []string{"opaque"}, + Request: &discovery.DiscoveryRequest{TypeUrl: opaqueType}, }, }, } @@ -681,3 +693,139 @@ func TestCallbackError(t *testing.T) { }) } } + +type LinearCacheMock struct { + // name -> version + assert func(req *discovery.DiscoveryRequest, state cache.ClientState) + resources []types.ResourceWithTTL + resourceNames []string + version string +} + +func (mock *LinearCacheMock) CreateWatch(req *discovery.DiscoveryRequest, state cache.ClientState, out chan cache.Response) func() { + if mock.assert != nil { + mock.assert(req, state) + } + if mock.version != "" { + out <- &cache.RawResponse{ + Request: req, + Version: mock.version, + Resources: mock.resources, + ResourceNames: mock.resourceNames, + } + } + return func() {} +} + +func (mock *LinearCacheMock) CreateDeltaWatch(req *discovery.DeltaDiscoveryRequest, state cache.ClientState, out chan cache.DeltaResponse) func() { + return nil +} +func (mock *LinearCacheMock) Fetch(ctx context.Context, req *discovery.DiscoveryRequest) (cache.Response, error) { + return nil, errors.New("unimplemented") +} + +func TestSubscriptionsThroughLinearCache(t *testing.T) { + resp := makeMockStream(t) + linearCache := LinearCacheMock{} + defer close(resp.recv) + + go func() { + s := server.NewServer(context.Background(), &linearCache, server.CallbackFuncs{}) + assert.NoError(t, s.StreamAggregatedResources(resp)) + }() + + linearCache.version = "1" + linearCache.resources = []types.ResourceWithTTL{ + {Resource: endpoint}, + } + linearCache.resourceNames = []string{clusterName} + linearCache.assert = func(req *discovery.DiscoveryRequest, state cache.ClientState) { + assert.Equal(t, []string{clusterName}, req.ResourceNames) + assert.Empty(t, state.GetKnownResources()) + } + + var nonce string + resp.recv <- &discovery.DiscoveryRequest{ + Node: node, + ResponseNonce: nonce, + TypeUrl: rsrc.EndpointType, + ResourceNames: []string{clusterName}, + } + + select { + case epResponse := <-resp.sent: + assert.Len(t, 
epResponse.Resources, 1) + nonce = epResponse.Nonce + case <-time.After(100 * time.Millisecond): + assert.Fail(t, "no response received") + } + + linearCache.version = "" + linearCache.assert = func(req *discovery.DiscoveryRequest, state cache.ClientState) { + assert.Equal(t, []string{}, req.ResourceNames) + // This should also be empty + assert.Empty(t, state.GetKnownResources()) + } + + // No longer listen to this resource + resp.recv <- &discovery.DiscoveryRequest{ + Node: node, + ResponseNonce: nonce, + TypeUrl: rsrc.EndpointType, + ResourceNames: []string{}, + } + + select { + case epResponse := <-resp.sent: + assert.Fail(t, "unexpected response") + nonce = epResponse.Nonce + case <-time.After(100 * time.Millisecond): + // go on + } + + // Cache version did not change + linearCache.version = "1" + linearCache.assert = func(req *discovery.DiscoveryRequest, state cache.ClientState) { + assert.Equal(t, []string{clusterName}, req.ResourceNames) + // This should also be empty + assert.Empty(t, state.GetKnownResources()) + } + + // Subscribe to it again + resp.recv <- &discovery.DiscoveryRequest{ + Node: node, + ResponseNonce: nonce, + TypeUrl: rsrc.EndpointType, + ResourceNames: []string{clusterName}, + } + + select { + case epResponse := <-resp.sent: + assert.Len(t, epResponse.Resources, 1) + nonce = epResponse.Nonce + case <-time.After(100 * time.Millisecond): + assert.Fail(t, "no response received") + } + + // Cache version did not change + linearCache.version = "" + linearCache.assert = func(req *discovery.DiscoveryRequest, state cache.ClientState) { + assert.Equal(t, []string{clusterName}, req.ResourceNames) + // The ACK commits the pending resource, so it is now known + assert.Equal(t, map[string]string{clusterName: "1"}, state.GetKnownResources()) + } + + // Don't change anything, simply ack the current one + resp.recv <- &discovery.DiscoveryRequest{ + Node: node, + ResponseNonce: nonce, + TypeUrl: rsrc.EndpointType, + ResourceNames: []string{clusterName}, + } + select { + case 
<-resp.sent: + assert.Fail(t, "unexpected response") + case <-time.After(100 * time.Millisecond): + // go on + } +}