diff --git a/pkg/cache/v3/linear.go b/pkg/cache/v3/linear.go index 89a873b8d3..2416aa118d 100644 --- a/pkg/cache/v3/linear.go +++ b/pkg/cache/v3/linear.go @@ -119,7 +119,9 @@ type LinearCache struct { versionPrefix string // useStableVersionsInSotw switches to a new version model for sotw watches. - // The version is then encoding the known resources of the subscription + // When activated, versions are stored in subscriptions using stable versions, and the response version + // is an hash of the returned versions to allow watch resumptions when reconnecting to the cache with a + // new subscription. useStableVersionsInSotw bool log log.Logger @@ -279,21 +281,29 @@ func (cache *LinearCache) computeResourceChange(sub Subscription, alwaysConsider } func computeSotwStableVersion(versionMap map[string]string) string { + // To enforce a stable hash we need to have an ordered vision of the map. keys := make([]string, 0, len(versionMap)) for key := range versionMap { keys = append(keys, key) } sort.Strings(keys) - // Reuse the hash used on resources. - hasher := fnv.New64a() + mapHasher := fnv.New64() + + buffer := make([]byte, 0, 8) + itemHasher := fnv.New64() for _, key := range keys { - hasher.Write([]byte(key)) - hasher.Write([]byte("/")) - hasher.Write([]byte(versionMap[key])) - hasher.Write([]byte("^")) + buffer = buffer[:0] + itemHasher.Reset() + itemHasher.Write([]byte(key)) + mapHasher.Write(itemHasher.Sum(buffer)) + buffer = buffer[:0] + itemHasher.Reset() + itemHasher.Write([]byte(versionMap[key])) + mapHasher.Write(itemHasher.Sum(buffer)) } - return hex.EncodeToString(hasher.Sum(nil)) + buffer = buffer[:0] + return hex.EncodeToString(mapHasher.Sum(buffer)) } func (cache *LinearCache) computeSotwResponse(watch ResponseWatch, alwaysConsiderAllResources bool) (*RawResponse, error) { @@ -307,7 +317,7 @@ func (cache *LinearCache) computeSotwResponse(watch ResponseWatch, alwaysConside } // In sotw the list of resources to actually return depends on: - // - whether the type requires full-state in each reply (e.g. lds, cds). + // - whether the type requires full-state in each reply (lds and cds). // - whether the request is wildcard. // resourcesToReturn will include all the resource names to reply based on the changes detected. var resourcesToReturn []string @@ -449,7 +459,7 @@ func (cache *LinearCache) notifyAll(modified []string) error { watch.Response <- response cache.removeWatch(watchID, watch.subscription) } else { - cache.log.Warnf("[Linear cache] Watch %d detected as triggered but no change was found", watchID) + cache.log.Infof("[Linear cache] Watch %d detected as triggered but no change was found", watchID) } } @@ -463,7 +473,7 @@ func (cache *LinearCache) notifyAll(modified []string) error { watch.Response <- response delete(cache.wildcardWatches.sotw, watchID) } else { - cache.log.Warnf("[Linear cache] Wildcard watch %d detected as triggered but no change was found", watchID) + cache.log.Infof("[Linear cache] Wildcard watch %d detected as triggered but no change was found", watchID) } } @@ -478,7 +488,7 @@ func (cache *LinearCache) notifyAll(modified []string) error { watch.Response <- response cache.removeDeltaWatch(watchID, watch.subscription) } else { - cache.log.Warnf("[Linear cache] Delta watch %d detected as triggered but no change was found", watchID) + cache.log.Infof("[Linear cache] Delta watch %d detected as triggered but no change was found", watchID) } } @@ -492,7 +502,7 @@ func (cache *LinearCache) notifyAll(modified []string) error { watch.Response <- response delete(cache.wildcardWatches.delta, watchID) } else { - cache.log.Warnf("[Linear cache] Wildcard delta watch %d detected as triggered but no change was found", watchID) + cache.log.Infof("[Linear cache] Wildcard delta watch %d detected as triggered but no change was found", watchID) } } diff --git a/pkg/cache/v3/linear_test.go b/pkg/cache/v3/linear_test.go index 7311499b63..600dde9c28 100644 --- a/pkg/cache/v3/linear_test.go +++ b/pkg/cache/v3/linear_test.go @@ -918,7 +918,7 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { err = c.UpdateResources(map[string]types.Resource{"b": b, "d": d}, nil) require.NoError(t, err) verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}, {"d", hashD}}, nil) - // d is now watched and shoudl be returned + // d is now watched and should be returned checkStableVersionsAreComputed(t, c, "b", "d") assert.Equal(t, 3, c.NumResources()) @@ -1476,7 +1476,7 @@ func TestLinearSotwVersion(t *testing.T) { w := make(chan Response, 1) _, err := cache.CreateWatch(req, subFromRequest(req), w) require.NoError(t, err) - resp := verifyResponseResources(t, w, resource.EndpointType, "1d0dadc055487bf8", "a", "b") + resp := verifyResponseResources(t, w, resource.EndpointType, "f8fac96556140daa", "a", "b") lastVersion, err = resp.GetVersion() require.NoError(t, err) assert.NotEmpty(t, lastVersion) @@ -1513,7 +1513,7 @@ func TestLinearSotwVersion(t *testing.T) { w := make(chan Response, 1) _, err := cache.CreateWatch(req, subFromRequest(req), w) require.NoError(t, err) - verifyResponseResources(t, w, resource.EndpointType, "test-prefix-1d0dadc055487bf8", "a", "b") + verifyResponseResources(t, w, resource.EndpointType, "test-prefix-"+lastVersion, "a", "b") }) t.Run("watch opened with the same last version including prefix", func(t *testing.T) { @@ -1543,7 +1543,7 @@ func TestLinearSotwVersion(t *testing.T) { _ = cache.UpdateResource("e", &endpoint.ClusterLoadAssignment{ClusterName: "e"}) // Resources a and b are still at the proper version, so not returned - resp := verifyResponseResources(t, w, resource.EndpointType, "ef89d29eb6398b90", "e") + resp := verifyResponseResources(t, w, resource.EndpointType, "6ae65ee0b0c2bfa8", "e") updateFromSotwResponse(resp, &sub, req) w = make(chan Response, 1) @@ -1558,12 +1558,12 @@ func TestLinearSotwVersion(t *testing.T) { EndpointStaleAfter: durationpb.New(5 * time.Second), }}) // Resources a and b are still at the proper version, so not returned - verifyResponseResources(t, w, resource.EndpointType, "51d88a339c93515b", "e") + verifyResponseResources(t, w, resource.EndpointType, "633e4f7cb4f55524", "e") _ = cache.UpdateResource("e", &endpoint.ClusterLoadAssignment{ClusterName: "e"}) // Another watch created with the proper version does not trigger - req2 := buildRequest([]string{"a", "b", "e"}, "ef89d29eb6398b90") + req2 := buildRequest([]string{"a", "b", "e"}, "6ae65ee0b0c2bfa8") sub2 := subFromRequest(req2) w = make(chan Response, 1) _, err = cache.CreateWatch(req2, sub2, w) @@ -1577,7 +1577,7 @@ func TestLinearSotwVersion(t *testing.T) { w := make(chan Response, 1) _, err := cache.CreateWatch(req, subFromRequest(req), w) require.NoError(t, err) - verifyResponseResources(t, w, resource.EndpointType, "be5530af715f4980", "a", "b", "c", "e") + verifyResponseResources(t, w, resource.EndpointType, "68113a35fda99df9", "a", "b", "c", "e") }) t.Run("watch opened with the same last version and returning less resources", func(t *testing.T) { @@ -1586,6 +1586,6 @@ func TestLinearSotwVersion(t *testing.T) { w := make(chan Response, 1) _, err := cache.CreateWatch(req, subFromRequest(req), w) require.NoError(t, err) - verifyResponseResources(t, w, resource.EndpointType, "0aa479b0bd7e5474", "a") + verifyResponseResources(t, w, resource.EndpointType, "55876f045443ee06", "a") }) } diff --git a/pkg/client/sotw/v3/client_test.go b/pkg/client/sotw/v3/client_test.go index bf942d7fbf..ece115b689 100644 --- a/pkg/client/sotw/v3/client_test.go +++ b/pkg/client/sotw/v3/client_test.go @@ -30,7 +30,11 @@ func TestFetch(t *testing.T) { defer cancel() snapCache := cache.NewSnapshotCache(true, cache.IDHash{}, nil) + + wg := sync.WaitGroup{} + wg.Add(1) go func() { + defer wg.Done() err := startAdsServer(ctx, snapCache) require.NoError(t, err) }() @@ -45,6 +49,10 @@ func TestFetch(t *testing.T) { t.Run("Test initial fetch", testInitialFetch(ctx, snapCache, c)) t.Run("Test next fetch", testNextFetch(ctx, snapCache, c)) + + // Ensure the ADS server is properly shutdown and has released the port + cancel() + wg.Wait() } func testInitialFetch(ctx context.Context, snapCache cache.SnapshotCache, c client.ADSClient) func(t *testing.T) { @@ -53,6 +61,7 @@ func testInitialFetch(ctx context.Context, snapCache cache.SnapshotCache, c clie wg.Add(1) go func() { + defer wg.Done() // watch for configs resp, err := c.Fetch() require.NoError(t, err) @@ -66,7 +75,6 @@ func testInitialFetch(ctx context.Context, snapCache cache.SnapshotCache, c clie err = c.Ack() require.NoError(t, err) - wg.Done() }() snapshot, err := cache.NewSnapshot("1", map[resource.Type][]types.Resource{ @@ -92,6 +100,7 @@ func testNextFetch(ctx context.Context, snapCache cache.SnapshotCache, c client. wg.Add(1) go func() { + defer wg.Done() // watch for configs resp, err := c.Fetch() require.NoError(t, err) @@ -105,7 +114,6 @@ func testNextFetch(ctx context.Context, snapCache cache.SnapshotCache, c client. err = c.Ack() require.NoError(t, err) - wg.Done() }() snapshot, err := cache.NewSnapshot("2", map[resource.Type][]types.Resource{ @@ -134,6 +142,11 @@ func startAdsServer(ctx context.Context, snapCache cache.SnapshotCache) error { s := server.NewServer(ctx, snapCache, nil) discovery.RegisterAggregatedDiscoveryServiceServer(grpcServer, s) + go func() { + <-ctx.Done() + grpcServer.Stop() + }() + if e := grpcServer.Serve(lis); e != nil { err = e }