Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
valerian-roche committed Feb 16, 2024
1 parent c4e6e8c commit 11e1362
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 23 deletions.
36 changes: 23 additions & 13 deletions pkg/cache/v3/linear.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,9 @@ type LinearCache struct {
versionPrefix string

// useStableVersionsInSotw switches to a new version model for sotw watches.
// The version is then encoding the known resources of the subscription
// When activated, versions are stored in subscriptions using stable versions, and the response version
// is an hash of the returned versions to allow watch resumptions when reconnecting to the cache with a
// new subscription.
useStableVersionsInSotw bool

log log.Logger
Expand Down Expand Up @@ -279,21 +281,29 @@ func (cache *LinearCache) computeResourceChange(sub Subscription, alwaysConsider
}

func computeSotwStableVersion(versionMap map[string]string) string {
// To enforce a stable hash we need to have an ordered vision of the map.
keys := make([]string, 0, len(versionMap))
for key := range versionMap {
keys = append(keys, key)
}
sort.Strings(keys)

// Reuse the hash used on resources.
hasher := fnv.New64a()
mapHasher := fnv.New64()

buffer := make([]byte, 0, 8)
itemHasher := fnv.New64()
for _, key := range keys {
hasher.Write([]byte(key))
hasher.Write([]byte("/"))
hasher.Write([]byte(versionMap[key]))
hasher.Write([]byte("^"))
buffer = buffer[:0]
itemHasher.Reset()
itemHasher.Write([]byte(key))
mapHasher.Write(itemHasher.Sum(buffer))
buffer = buffer[:0]
itemHasher.Reset()
itemHasher.Write([]byte(versionMap[key]))
mapHasher.Write(itemHasher.Sum(buffer))
}
return hex.EncodeToString(hasher.Sum(nil))
buffer = buffer[:0]
return hex.EncodeToString(mapHasher.Sum(buffer))
}

func (cache *LinearCache) computeSotwResponse(watch ResponseWatch, alwaysConsiderAllResources bool) (*RawResponse, error) {
Expand All @@ -307,7 +317,7 @@ func (cache *LinearCache) computeSotwResponse(watch ResponseWatch, alwaysConside
}

// In sotw the list of resources to actually return depends on:
// - whether the type requires full-state in each reply (e.g. lds, cds).
// - whether the type requires full-state in each reply (lds and cds).
// - whether the request is wildcard.
// resourcesToReturn will include all the resource names to reply based on the changes detected.
var resourcesToReturn []string
Expand Down Expand Up @@ -449,7 +459,7 @@ func (cache *LinearCache) notifyAll(modified []string) error {
watch.Response <- response
cache.removeWatch(watchID, watch.subscription)
} else {
cache.log.Warnf("[Linear cache] Watch %d detected as triggered but no change was found", watchID)
cache.log.Infof("[Linear cache] Watch %d detected as triggered but no change was found", watchID)
}
}

Expand All @@ -463,7 +473,7 @@ func (cache *LinearCache) notifyAll(modified []string) error {
watch.Response <- response
delete(cache.wildcardWatches.sotw, watchID)
} else {
cache.log.Warnf("[Linear cache] Wildcard watch %d detected as triggered but no change was found", watchID)
cache.log.Infof("[Linear cache] Wildcard watch %d detected as triggered but no change was found", watchID)
}
}

Expand All @@ -478,7 +488,7 @@ func (cache *LinearCache) notifyAll(modified []string) error {
watch.Response <- response
cache.removeDeltaWatch(watchID, watch.subscription)
} else {
cache.log.Warnf("[Linear cache] Delta watch %d detected as triggered but no change was found", watchID)
cache.log.Infof("[Linear cache] Delta watch %d detected as triggered but no change was found", watchID)
}
}

Expand All @@ -492,7 +502,7 @@ func (cache *LinearCache) notifyAll(modified []string) error {
watch.Response <- response
delete(cache.wildcardWatches.delta, watchID)
} else {
cache.log.Warnf("[Linear cache] Wildcard delta watch %d detected as triggered but no change was found", watchID)
cache.log.Infof("[Linear cache] Wildcard delta watch %d detected as triggered but no change was found", watchID)
}
}

Expand Down
16 changes: 8 additions & 8 deletions pkg/cache/v3/linear_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -918,7 +918,7 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) {
err = c.UpdateResources(map[string]types.Resource{"b": b, "d": d}, nil)
require.NoError(t, err)
verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}, {"d", hashD}}, nil)
// d is now watched and shoudl be returned
// d is now watched and should be returned
checkStableVersionsAreComputed(t, c, "b", "d")
assert.Equal(t, 3, c.NumResources())

Expand Down Expand Up @@ -1476,7 +1476,7 @@ func TestLinearSotwVersion(t *testing.T) {
w := make(chan Response, 1)
_, err := cache.CreateWatch(req, subFromRequest(req), w)
require.NoError(t, err)
resp := verifyResponseResources(t, w, resource.EndpointType, "1d0dadc055487bf8", "a", "b")
resp := verifyResponseResources(t, w, resource.EndpointType, "f8fac96556140daa", "a", "b")
lastVersion, err = resp.GetVersion()
require.NoError(t, err)
assert.NotEmpty(t, lastVersion)
Expand Down Expand Up @@ -1513,7 +1513,7 @@ func TestLinearSotwVersion(t *testing.T) {
w := make(chan Response, 1)
_, err := cache.CreateWatch(req, subFromRequest(req), w)
require.NoError(t, err)
verifyResponseResources(t, w, resource.EndpointType, "test-prefix-1d0dadc055487bf8", "a", "b")
verifyResponseResources(t, w, resource.EndpointType, "test-prefix-"+lastVersion, "a", "b")
})

t.Run("watch opened with the same last version including prefix", func(t *testing.T) {
Expand Down Expand Up @@ -1543,7 +1543,7 @@ func TestLinearSotwVersion(t *testing.T) {

_ = cache.UpdateResource("e", &endpoint.ClusterLoadAssignment{ClusterName: "e"})
// Resources a and b are still at the proper version, so not returned
resp := verifyResponseResources(t, w, resource.EndpointType, "ef89d29eb6398b90", "e")
resp := verifyResponseResources(t, w, resource.EndpointType, "6ae65ee0b0c2bfa8", "e")
updateFromSotwResponse(resp, &sub, req)

w = make(chan Response, 1)
Expand All @@ -1558,12 +1558,12 @@ func TestLinearSotwVersion(t *testing.T) {
EndpointStaleAfter: durationpb.New(5 * time.Second),
}})
// Resources a and b are still at the proper version, so not returned
verifyResponseResources(t, w, resource.EndpointType, "51d88a339c93515b", "e")
verifyResponseResources(t, w, resource.EndpointType, "633e4f7cb4f55524", "e")

_ = cache.UpdateResource("e", &endpoint.ClusterLoadAssignment{ClusterName: "e"})

// Another watch created with the proper version does not trigger
req2 := buildRequest([]string{"a", "b", "e"}, "ef89d29eb6398b90")
req2 := buildRequest([]string{"a", "b", "e"}, "6ae65ee0b0c2bfa8")
sub2 := subFromRequest(req2)
w = make(chan Response, 1)
_, err = cache.CreateWatch(req2, sub2, w)
Expand All @@ -1577,7 +1577,7 @@ func TestLinearSotwVersion(t *testing.T) {
w := make(chan Response, 1)
_, err := cache.CreateWatch(req, subFromRequest(req), w)
require.NoError(t, err)
verifyResponseResources(t, w, resource.EndpointType, "be5530af715f4980", "a", "b", "c", "e")
verifyResponseResources(t, w, resource.EndpointType, "68113a35fda99df9", "a", "b", "c", "e")
})

t.Run("watch opened with the same last version and returning less resources", func(t *testing.T) {
Expand All @@ -1586,6 +1586,6 @@ func TestLinearSotwVersion(t *testing.T) {
w := make(chan Response, 1)
_, err := cache.CreateWatch(req, subFromRequest(req), w)
require.NoError(t, err)
verifyResponseResources(t, w, resource.EndpointType, "0aa479b0bd7e5474", "a")
verifyResponseResources(t, w, resource.EndpointType, "55876f045443ee06", "a")
})
}
17 changes: 15 additions & 2 deletions pkg/client/sotw/v3/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@ func TestFetch(t *testing.T) {
defer cancel()

snapCache := cache.NewSnapshotCache(true, cache.IDHash{}, nil)

wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
err := startAdsServer(ctx, snapCache)
require.NoError(t, err)
}()
Expand All @@ -45,6 +49,10 @@ func TestFetch(t *testing.T) {

t.Run("Test initial fetch", testInitialFetch(ctx, snapCache, c))
t.Run("Test next fetch", testNextFetch(ctx, snapCache, c))

// Ensure the ADS server is properly shutdown and has released the port
cancel()
wg.Wait()
}

func testInitialFetch(ctx context.Context, snapCache cache.SnapshotCache, c client.ADSClient) func(t *testing.T) {
Expand All @@ -53,6 +61,7 @@ func testInitialFetch(ctx context.Context, snapCache cache.SnapshotCache, c clie
wg.Add(1)

go func() {
defer wg.Done()
// watch for configs
resp, err := c.Fetch()
require.NoError(t, err)
Expand All @@ -66,7 +75,6 @@ func testInitialFetch(ctx context.Context, snapCache cache.SnapshotCache, c clie

err = c.Ack()
require.NoError(t, err)
wg.Done()
}()

snapshot, err := cache.NewSnapshot("1", map[resource.Type][]types.Resource{
Expand All @@ -92,6 +100,7 @@ func testNextFetch(ctx context.Context, snapCache cache.SnapshotCache, c client.
wg.Add(1)

go func() {
defer wg.Done()
// watch for configs
resp, err := c.Fetch()
require.NoError(t, err)
Expand All @@ -105,7 +114,6 @@ func testNextFetch(ctx context.Context, snapCache cache.SnapshotCache, c client.

err = c.Ack()
require.NoError(t, err)
wg.Done()
}()

snapshot, err := cache.NewSnapshot("2", map[resource.Type][]types.Resource{
Expand Down Expand Up @@ -134,6 +142,11 @@ func startAdsServer(ctx context.Context, snapCache cache.SnapshotCache) error {
s := server.NewServer(ctx, snapCache, nil)
discovery.RegisterAggregatedDiscoveryServiceServer(grpcServer, s)

go func() {
<-ctx.Done()
grpcServer.Stop()
}()

if e := grpcServer.Serve(lis); e != nil {
err = e
}
Expand Down

0 comments on commit 11e1362

Please sign in to comment.