From 532e3113228009d82a9f1773ba0ffddc4511ecdf Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Wed, 2 Oct 2024 18:08:04 +0000 Subject: [PATCH 1/3] xdsclient: e2e style tests for ads stream restart --- .../tests/ads_stream_restart_test.go | 254 ++++++++++++++++++ 1 file changed, 254 insertions(+) create mode 100644 xds/internal/xdsclient/tests/ads_stream_restart_test.go diff --git a/xds/internal/xdsclient/tests/ads_stream_restart_test.go b/xds/internal/xdsclient/tests/ads_stream_restart_test.go new file mode 100644 index 000000000000..c1dd88a939ff --- /dev/null +++ b/xds/internal/xdsclient/tests/ads_stream_restart_test.go @@ -0,0 +1,254 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package xdsclient_test + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/google/uuid" + "google.golang.org/grpc/internal/testutils" + "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/xds/internal/xdsclient" + "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" + "google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version" + + v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" + v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" + v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" +) + +// Tests that an ADS stream is restarted after a connection failure. Also +// verifies that if there were any watches registered before the connection +// failed, those resources are re-requested after the stream is restarted. +func (s) TestADS_ResourcesAreRequestedAfterStreamRestart(t *testing.T) { + // Create a restartable listener that can simulate a broken ADS stream. + l, err := testutils.LocalTCPListener() + if err != nil { + t.Fatalf("net.Listen() failed: %v", err) + } + lis := testutils.NewRestartableListener(l) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Start an xDS management server that uses a couple of channels to inform + // the test about the specific LDS and CDS resource names being requested. + ldsResourcesCh := make(chan []string, 1) + cdsResourcesCh := make(chan []string, 1) + streamOpened := make(chan struct{}, 1) + streamClosed := make(chan struct{}, 1) + mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ + Listener: lis, + OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error { + switch req.GetTypeUrl() { + case version.V3ClusterURL: + select { + case cdsResourcesCh <- req.GetResourceNames(): + default: + } + case version.V3ListenerURL: + t.Logf("Received LDS request for resources: %v", req.GetResourceNames()) + select { + case ldsResourcesCh <- req.GetResourceNames(): + default: + } + } + return nil + }, + OnStreamClosed: func(int64, *v3corepb.Node) { + select { + case streamClosed <- struct{}{}: + default: + } + + }, + OnStreamOpen: func(context.Context, int64, string) error { + select { + case streamOpened <- struct{}{}: + default: + } + return nil + }, + }) + + // Create a listener resource on the management server. + const listenerName = "listener" + const routeConfigName = "route-config" + nodeID := uuid.New().String() + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(listenerName, routeConfigName)}, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Create bootstrap configuration pointing to the above management server. + bootstrapContents := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) + + // Create an xDS client with the above bootstrap configuration. + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bootstrapContents, + }) + if err != nil { + t.Fatalf("Failed to create xDS client: %v", err) + } + defer close() + + // Register a watch for a listener resource. + lw := newListenerWatcher() + ldsCancel := xdsresource.WatchListener(client, listenerName, lw) + + // Verify that an ADS stream is opened and an LDS request with the above + // resource name is sent. + select { + case <-streamOpened: + case <-ctx.Done(): + t.Fatal("Timeout when waiting for ADS stream to open") + } + if err := waitForResourceNames(ctx, t, ldsResourcesCh, []string{listenerName}); err != nil { + t.Fatal(err) + } + + // Cancel the watch for the above listener resource, and verify that an LDS + // request with no resource names is sent. + ldsCancel() + if err := waitForResourceNames(ctx, t, ldsResourcesCh, []string{}); err != nil { + t.Fatal(err) + } + + // Stop the restartable listener and wait for the stream to close. + lis.Stop() + select { + case <-streamClosed: + case <-ctx.Done(): + t.Fatal("Timeout when waiting for ADS stream to close") + } + + // Restart the restartable listener and wait for the stream to open. + lis.Restart() + select { + case <-streamOpened: + case <-ctx.Done(): + t.Fatal("Timeout when waiting for ADS stream to open") + } + + // Wait for a short duration and verify that no LDS request is sent, since + // there are no resources being watched. + sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) + defer sCancel() + select { + case <-sCtx.Done(): + case names := <-ldsResourcesCh: + t.Fatalf("LDS request sent for resource names %v, when expecting no request", names) + } + + // Register another watch for the same listener resource, and verify that an + // LDS request with the above resource name is sent. + ldsCancel = xdsresource.WatchListener(client, listenerName, lw) + if err := waitForResourceNames(ctx, t, ldsResourcesCh, []string{listenerName}); err != nil { + t.Fatal(err) + } + + // Create a cluster resource on the management server, in addition to the + // existing listener resource. + const clusterName = "cluster" + resources = e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(listenerName, routeConfigName)}, + Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, clusterName, e2e.SecurityLevelNone)}, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Register a watch for a cluster resource, and verify that a CDS request + // with the above resource name is sent. + cw := newClusterWatcher() + cdsCancel := xdsresource.WatchCluster(client, clusterName, cw) + if err := waitForResourceNames(ctx, t, cdsResourcesCh, []string{clusterName}); err != nil { + t.Fatal(err) + } + + // Cancel the watch for the above cluster resource, and verify that a CDS + // request with no resource names is sent. + cdsCancel() + if err := waitForResourceNames(ctx, t, cdsResourcesCh, []string{}); err != nil { + t.Fatal(err) + } + + // Stop the restartable listener and wait for the stream to close. + lis.Stop() + select { + case <-streamClosed: + case <-ctx.Done(): + t.Fatal("Timeout when waiting for ADS stream to close") + } + + // Restart the restartable listener and wait for the stream to open. + lis.Restart() + select { + case <-streamOpened: + case <-ctx.Done(): + t.Fatal("Timeout when waiting for ADS stream to open") + } + + // Verify that the listener resource is requested again. + if err := waitForResourceNames(ctx, t, ldsResourcesCh, []string{listenerName}); err != nil { + t.Fatal(err) + } + + // Wait for a short duration and verify that no CDS request is sent, since + // there are no resources being watched. + sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout) + defer sCancel() + select { + case <-sCtx.Done(): + case names := <-cdsResourcesCh: + t.Fatalf("LDS request sent for resource names %v, when expecting no request", names) + } + +} + +// waitForResourceNames waits for the wantNames to be received on namesCh. +// Returns a non-nil error if the context expires before that. +func waitForResourceNames(ctx context.Context, t *testing.T, namesCh chan []string, wantNames []string) error { + t.Helper() + + for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) { + select { + case <-ctx.Done(): + case gotNames := <-namesCh: + if cmp.Equal(gotNames, wantNames, cmpopts.EquateEmpty(), cmpopts.SortSlices(func(s1, s2 string) bool { return s1 < s2 })) { + return nil + } + t.Logf("Received resource names %v, want %v", gotNames, wantNames) + } + } + return fmt.Errorf("timeout waiting for resource to be requested from the management server") +} From 41a903c2231930638e61b2ab32dd701af5ef1349 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Fri, 4 Oct 2024 22:34:30 +0000 Subject: [PATCH 2/3] first round of review comments from zasweq --- xds/internal/xdsclient/tests/ads_stream_restart_test.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/xds/internal/xdsclient/tests/ads_stream_restart_test.go b/xds/internal/xdsclient/tests/ads_stream_restart_test.go index c1dd88a939ff..07e07f34a6ee 100644 --- a/xds/internal/xdsclient/tests/ads_stream_restart_test.go +++ b/xds/internal/xdsclient/tests/ads_stream_restart_test.go @@ -230,7 +230,7 @@ func (s) TestADS_ResourcesAreRequestedAfterStreamRestart(t *testing.T) { select { case <-sCtx.Done(): case names := <-cdsResourcesCh: - t.Fatalf("LDS request sent for resource names %v, when expecting no request", names) + t.Fatalf("CDS request sent for resource names %v, when expecting no request", names) } } @@ -240,15 +240,16 @@ func (s) TestADS_ResourcesAreRequestedAfterStreamRestart(t *testing.T) { func waitForResourceNames(ctx context.Context, t *testing.T, namesCh chan []string, wantNames []string) error { t.Helper() + var gotNames []string for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) { select { case <-ctx.Done(): - case gotNames := <-namesCh: + case gotNames = <-namesCh: if cmp.Equal(gotNames, wantNames, cmpopts.EquateEmpty(), cmpopts.SortSlices(func(s1, s2 string) bool { return s1 < s2 })) { return nil } t.Logf("Received resource names %v, want %v", gotNames, wantNames) } } - return fmt.Errorf("timeout waiting for resource to be requested from the management server") + return fmt.Errorf("timeout waiting expected resources to be requested. Last requested: %v, want: %v", gotNames, wantNames) } From ff256e8ad4d3652b3877f48f6dd159de2dc76fba Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Fri, 4 Oct 2024 22:42:17 +0000 Subject: [PATCH 3/3] make vet happy --- xds/internal/xdsclient/tests/ads_stream_restart_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xds/internal/xdsclient/tests/ads_stream_restart_test.go b/xds/internal/xdsclient/tests/ads_stream_restart_test.go index 07e07f34a6ee..c610ada91421 100644 --- a/xds/internal/xdsclient/tests/ads_stream_restart_test.go +++ b/xds/internal/xdsclient/tests/ads_stream_restart_test.go @@ -173,6 +173,7 @@ func (s) TestADS_ResourcesAreRequestedAfterStreamRestart(t *testing.T) { if err := waitForResourceNames(ctx, t, ldsResourcesCh, []string{listenerName}); err != nil { t.Fatal(err) } + defer ldsCancel() // Create a cluster resource on the management server, in addition to the // existing listener resource. @@ -232,7 +233,6 @@ func (s) TestADS_ResourcesAreRequestedAfterStreamRestart(t *testing.T) { case names := <-cdsResourcesCh: t.Fatalf("CDS request sent for resource names %v, when expecting no request", names) } - } // waitForResourceNames waits for the wantNames to be received on namesCh.