Skip to content

Commit

Permalink
[Linear cache] Enable SotwStableVersion subscription resumption only …
Browse files Browse the repository at this point in the history
…when using wildcard watches (#16)

During testing with grpc-xds, it was noticed that a specific client-side behavior is not compatible with SotW (state-of-the-world) subscription resumption.
When the last channel is closed, the client disconnects from the control-plane. If the same channel is reopened later, the connection is re-established with the same resource subscription, and the last version received before the disconnect is provided. In this case the control-plane currently does not return a response because the version matches, whereas grpc expects the control-plane to reply: it treats the reconnection as an "unsubscription followed by a resubscription" event, for which the resource should be sent again.
In the context of wildcard watches this is not an issue, so the behavior is kept.

More context on the grpc-xds discussions in this [thread](grpc/grpc-go#7013 (comment))

Signed-off-by: Valerian Roche <valerian.roche@datadoghq.com>
  • Loading branch information
valerian-roche committed Aug 8, 2024
1 parent 35df9d3 commit 19f9668
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 35 deletions.
10 changes: 6 additions & 4 deletions pkg/cache/v3/linear.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,9 +519,7 @@ func (cache *LinearCache) CreateWatch(request *Request, sub Subscription, value
// We could optimize the reconnection case here if:
// - we take the assumption that clients will not start requesting wildcard while providing a version. We could then ignore requests providing the resources.
// - we use the version as some form of hash of resources known, and we can then consider it as a way to correctly verify whether all resources are unchanged.
// For now it is not done as:
// - for the first case, while the protocol documentation does not explicitly mention the case, it does not mark it impossible and explicitly references unsubscribing from wildcard.
// - for the second one we could likely do it with little difficulty if need be, but if users rely on the current monotonic version it could impact their callbacks implementations.
// When using the `WithSotwStableVersions` option, this optimization is activated and avoids resending all the dataset on wildcard watch resumption if no change has occurred.
watch := ResponseWatch{
Request: request,
Response: value,
Expand All @@ -541,10 +539,14 @@ func (cache *LinearCache) CreateWatch(request *Request, sub Subscription, value
if response != nil {
// If the request
// - is the first
// - is wildcard
// - provides a non-empty version, matching the version prefix
// and the cache uses stable versions, if the generated versions are the same as the previous one, we do not return the response.
// This avoids resending all data if the new subscription is just a resumption of the previous one.
if cache.useStableVersionsInSotw && request.GetResponseNonce() == "" && !replyEvenIfEmpty {
// This optimization is only done on wildcard as we cannot track if a subscription is "new" at this stage and needs to be returned.
// In the context of wildcard it could be incorrect if the subscription is newly wildcard, and we already returned all objects,
// but as of Q1-2024 there are no known usecases of a subscription becoming wildcard (in envoy or xds-grpc).
if cache.useStableVersionsInSotw && sub.IsWildcard() && request.GetResponseNonce() == "" && !replyEvenIfEmpty {
if request.GetVersionInfo() != response.GetResponseVersion() {
// The response has a different returned version map than the request
shouldReply = true
Expand Down
67 changes: 36 additions & 31 deletions pkg/cache/v3/linear_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1498,51 +1498,44 @@ func TestLinearSotwVersion(t *testing.T) {
var lastVersion string
t.Run("watch without any version is replied to", func(t *testing.T) {
cache.log = log.NewTestLogger(t)
req := buildRequest([]string{"a", "b", "d"}, "")
req := buildRequest([]string{"a", "b", "c", "d"}, "")
w := make(chan Response, 1)
_, err := cache.CreateWatch(req, subFromRequest(req), w)
require.NoError(t, err)
resp := verifyResponseResources(t, w, resource.EndpointType, "f8fac96556140daa", "a", "b")
resp := verifyResponseResources(t, w, resource.EndpointType, "9430c853ac129267", "a", "b", "c")
lastVersion, err = resp.GetVersion()
require.NoError(t, err)
assert.NotEmpty(t, lastVersion)
})

t.Run("watch opened with the same last version", func(t *testing.T) {
t.Run("watch opened with the same last version but not wildcard is replied to", func(t *testing.T) {
cache.log = log.NewTestLogger(t)
req := buildRequest([]string{"a", "b", "d"}, lastVersion)
req := buildRequest([]string{"a", "b", "c", "d"}, lastVersion)
w := make(chan Response, 1)
_, err := cache.CreateWatch(req, subFromRequest(req), w)
require.NoError(t, err)
mustBlock(t, w)
verifyResponseResources(t, w, resource.EndpointType, lastVersion, "a", "b", "c")
})

t.Run("watch opened with the same last version and different prefix", func(t *testing.T) {
t.Run("watch opened with the same last version and wildcard is not replied to", func(t *testing.T) {
cache.log = log.NewTestLogger(t)
req := buildRequest([]string{"a", "b", "d"}, "test-prefix-"+lastVersion)
req := buildRequest([]string{}, lastVersion)
w := make(chan Response, 1)
_, err := cache.CreateWatch(req, subFromRequest(req), w)
require.NoError(t, err)
verifyResponseResources(t, w, resource.EndpointType, lastVersion, "a", "b")
mustBlock(t, w)
})

t.Run("watch opened with the same last version missing prefix", func(t *testing.T) {
cache := NewLinearCache(resource.EndpointType, WithLogger(log.NewTestLogger(t)), WithInitialResources(
map[string]types.Resource{
"a": &endpoint.ClusterLoadAssignment{ClusterName: "a"},
"b": &endpoint.ClusterLoadAssignment{ClusterName: "b"},
"c": &endpoint.ClusterLoadAssignment{ClusterName: "c"},
},
), WithSotwStableVersions(), WithVersionPrefix("test-prefix-"))

req := buildRequest([]string{"a", "b", "d"}, lastVersion)
t.Run("watch opened with the same last version and different prefix", func(t *testing.T) {
cache.log = log.NewTestLogger(t)
req := buildRequest([]string{}, "test-prefix-"+lastVersion)
w := make(chan Response, 1)
_, err := cache.CreateWatch(req, subFromRequest(req), w)
require.NoError(t, err)
verifyResponseResources(t, w, resource.EndpointType, "test-prefix-"+lastVersion, "a", "b")
verifyResponseResources(t, w, resource.EndpointType, lastVersion, "a", "b", "c")
})

t.Run("watch opened with the same last version including prefix", func(t *testing.T) {
t.Run("cache with prefix", func(t *testing.T) {
cache := NewLinearCache(resource.EndpointType, WithLogger(log.NewTestLogger(t)), WithInitialResources(
map[string]types.Resource{
"a": &endpoint.ClusterLoadAssignment{ClusterName: "a"},
Expand All @@ -1551,16 +1544,26 @@ func TestLinearSotwVersion(t *testing.T) {
},
), WithSotwStableVersions(), WithVersionPrefix("test-prefix-"))

req := buildRequest([]string{"a", "b", "d"}, "test-prefix-"+lastVersion)
w := make(chan Response, 1)
_, err := cache.CreateWatch(req, subFromRequest(req), w)
require.NoError(t, err)
mustBlock(t, w)
t.Run("watch opened with the same last version missing prefix", func(t *testing.T) {
req := buildRequest([]string{}, lastVersion)
w := make(chan Response, 1)
_, err := cache.CreateWatch(req, subFromRequest(req), w)
require.NoError(t, err)
verifyResponseResources(t, w, resource.EndpointType, "test-prefix-"+lastVersion, "a", "b", "c")
})

t.Run("watch opened with the same last version including prefix", func(t *testing.T) {
req := buildRequest([]string{}, "test-prefix-"+lastVersion)
w := make(chan Response, 1)
_, err := cache.CreateWatch(req, subFromRequest(req), w)
require.NoError(t, err)
mustBlock(t, w)
})
})

t.Run("watch opened with the same last version, different resource not changing the response", func(t *testing.T) {
cache.log = log.NewTestLogger(t)
req := buildRequest([]string{"a", "b", "e"}, lastVersion)
req := buildRequest([]string{}, lastVersion)
sub := subFromRequest(req)
w := make(chan Response, 1)
_, err := cache.CreateWatch(req, sub, w)
Expand All @@ -1569,7 +1572,7 @@ func TestLinearSotwVersion(t *testing.T) {

_ = cache.UpdateResource("e", &endpoint.ClusterLoadAssignment{ClusterName: "e"})
// Resources a and b are still at the proper version, so not returned
resp := verifyResponseResources(t, w, resource.EndpointType, "6ae65ee0b0c2bfa8", "e")
resp := verifyResponseResources(t, w, resource.EndpointType, "68113a35fda99df9", "e")
updateFromSotwResponse(resp, &sub, req)

w = make(chan Response, 1)
Expand All @@ -1584,12 +1587,12 @@ func TestLinearSotwVersion(t *testing.T) {
EndpointStaleAfter: durationpb.New(5 * time.Second),
}})
// Resources a and b are still at the proper version, so not returned
verifyResponseResources(t, w, resource.EndpointType, "633e4f7cb4f55524", "e")
verifyResponseResources(t, w, resource.EndpointType, "7587f9c195c96581", "e")

_ = cache.UpdateResource("e", &endpoint.ClusterLoadAssignment{ClusterName: "e"})

// Another watch created with the proper version does not trigger
req2 := buildRequest([]string{"a", "b", "e"}, "6ae65ee0b0c2bfa8")
req2 := buildRequest([]string{}, "68113a35fda99df9")
sub2 := subFromRequest(req2)
w = make(chan Response, 1)
_, err = cache.CreateWatch(req2, sub2, w)
Expand All @@ -1606,12 +1609,14 @@ func TestLinearSotwVersion(t *testing.T) {
verifyResponseResources(t, w, resource.EndpointType, "68113a35fda99df9", "a", "b", "c", "e")
})

_ = cache.DeleteResource("e")

t.Run("watch opened with the same last version and returning less resources", func(t *testing.T) {
cache.log = log.NewTestLogger(t)
req := buildRequest([]string{"a", "d"}, lastVersion)
req := buildRequest([]string{}, "68113a35fda99df9")
w := make(chan Response, 1)
_, err := cache.CreateWatch(req, subFromRequest(req), w)
require.NoError(t, err)
verifyResponseResources(t, w, resource.EndpointType, "55876f045443ee06", "a")
verifyResponseResources(t, w, resource.EndpointType, lastVersion, "a", "b", "c")
})
}

0 comments on commit 19f9668

Please sign in to comment.