From 25fcc158ea413b49339dfc57969aea80cbec5129 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Wed, 2 Oct 2024 13:18:42 +0000 Subject: [PATCH 1/4] xdsclient: switch more transport tests to e2e style --- xds/internal/xdsclient/authority.go | 2 + xds/internal/xdsclient/client_new.go | 16 +- xds/internal/xdsclient/client_refcounted.go | 7 +- xds/internal/xdsclient/clientimpl.go | 1 + .../xdsclient/clientimpl_authority.go | 1 + .../tests/ads_stream_backoff_test.go | 438 +++++++++++++++++ .../xdsclient/tests/resource_update_test.go | 30 +- .../xdsclient/transport/loadreport_test.go | 11 + .../transport/transport_backoff_test.go | 449 ------------------ .../transport/transport_resource_test.go | 422 ---------------- .../xdsclient/transport/transport_test.go | 22 + 11 files changed, 503 insertions(+), 896 deletions(-) create mode 100644 xds/internal/xdsclient/tests/ads_stream_backoff_test.go delete mode 100644 xds/internal/xdsclient/transport/transport_backoff_test.go delete mode 100644 xds/internal/xdsclient/transport/transport_resource_test.go diff --git a/xds/internal/xdsclient/authority.go b/xds/internal/xdsclient/authority.go index 7c6766a77f00..668c436fb5fb 100644 --- a/xds/internal/xdsclient/authority.go +++ b/xds/internal/xdsclient/authority.go @@ -104,6 +104,7 @@ type authorityArgs struct { serializer *grpcsync.CallbackSerializer resourceTypeGetter func(string) xdsresource.Type watchExpiryTimeout time.Duration + backoff func(int) time.Duration // Backoff for ADS and LRS stream failures. logger *grpclog.PrefixLogger } @@ -123,6 +124,7 @@ func newAuthority(args authorityArgs) (*authority, error) { OnRecvHandler: ret.handleResourceUpdate, OnErrorHandler: ret.newConnectionError, OnSendHandler: ret.transportOnSendHandler, + Backoff: args.backoff, Logger: args.logger, NodeProto: args.bootstrapCfg.Node(), }) diff --git a/xds/internal/xdsclient/client_new.go b/xds/internal/xdsclient/client_new.go index 6097e86925e6..5e11f557b234 100644 --- a/xds/internal/xdsclient/client_new.go +++ b/xds/internal/xdsclient/client_new.go @@ -25,6 +25,7 @@ import ( "time" "google.golang.org/grpc/internal" + "google.golang.org/grpc/internal/backoff" "google.golang.org/grpc/internal/cache" "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/xds/bootstrap" @@ -53,16 +54,17 @@ const NameForServer = "#server" // only when all references are released, and it is safe for the caller to // invoke this close function multiple times. func New(name string) (XDSClient, func(), error) { - return newRefCounted(name, defaultWatchExpiryTimeout, defaultIdleAuthorityDeleteTimeout) + return newRefCounted(name, defaultWatchExpiryTimeout, defaultIdleAuthorityDeleteTimeout, backoff.DefaultExponential.Backoff) } // newClientImpl returns a new xdsClient with the given config. -func newClientImpl(config *bootstrap.Config, watchExpiryTimeout time.Duration, idleAuthorityDeleteTimeout time.Duration) (*clientImpl, error) { +func newClientImpl(config *bootstrap.Config, watchExpiryTimeout time.Duration, idleAuthorityDeleteTimeout time.Duration, streamBackoff func(int) time.Duration) (*clientImpl, error) { ctx, cancel := context.WithCancel(context.Background()) c := &clientImpl{ done: grpcsync.NewEvent(), config: config, watchExpiryTimeout: watchExpiryTimeout, + backoff: streamBackoff, serializer: grpcsync.NewCallbackSerializer(ctx), serializerClose: cancel, resourceTypes: newResourceTypeRegistry(), @@ -90,6 +92,11 @@ type OptionsForTesting struct { // AuthorityIdleTimeout is the timeout before idle authorities are deleted. // If unspecified, uses the default value used in non-test code. AuthorityIdleTimeout time.Duration + + // StreamBackoffAfterFailure is the backoff function used to determine the + // backoff duration after stream failures. If unspecified, uses the default + // value used in non-test code. + StreamBackoffAfterFailure func(int) time.Duration } // NewForTesting returns an xDS client configured with the provided options. @@ -111,11 +118,14 @@ func NewForTesting(opts OptionsForTesting) (XDSClient, func(), error) { if opts.AuthorityIdleTimeout == 0 { opts.AuthorityIdleTimeout = defaultIdleAuthorityDeleteTimeout } + if opts.StreamBackoffAfterFailure == nil { + opts.StreamBackoffAfterFailure = defaultStreamBackoffFunc + } if err := bootstrap.SetFallbackBootstrapConfig(opts.Contents); err != nil { return nil, nil, err } - client, cancel, err := newRefCounted(opts.Name, opts.WatchExpiryTimeout, opts.AuthorityIdleTimeout) + client, cancel, err := newRefCounted(opts.Name, opts.WatchExpiryTimeout, opts.AuthorityIdleTimeout, opts.StreamBackoffAfterFailure) return client, func() { bootstrap.UnsetFallbackBootstrapConfigForTesting(); cancel() }, err } diff --git a/xds/internal/xdsclient/client_refcounted.go b/xds/internal/xdsclient/client_refcounted.go index 1efb4de42eb2..a8c7213aea8e 100644 --- a/xds/internal/xdsclient/client_refcounted.go +++ b/xds/internal/xdsclient/client_refcounted.go @@ -23,6 +23,7 @@ import ( "sync/atomic" "time" + "google.golang.org/grpc/internal/backoff" "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/xds/bootstrap" ) @@ -37,6 +38,8 @@ var ( // overridden in tests to give them visibility into certain events. xdsClientImplCreateHook = func(string) {} xdsClientImplCloseHook = func(string) {} + + defaultStreamBackoffFunc = backoff.DefaultExponential.Backoff ) func clientRefCountedClose(name string) { @@ -60,7 +63,7 @@ func clientRefCountedClose(name string) { // newRefCounted creates a new reference counted xDS client implementation for // name, if one does not exist already. If an xDS client for the given name // exists, it gets a reference to it and returns it. -func newRefCounted(name string, watchExpiryTimeout, idleAuthorityTimeout time.Duration) (XDSClient, func(), error) { +func newRefCounted(name string, watchExpiryTimeout, idleAuthorityTimeout time.Duration, streamBackoff func(int) time.Duration) (XDSClient, func(), error) { clientsMu.Lock() defer clientsMu.Unlock() @@ -74,7 +77,7 @@ func newRefCounted(name string, watchExpiryTimeout, idleAuthorityTimeout time.Du if err != nil { return nil, nil, fmt.Errorf("xds: failed to get xDS bootstrap config: %v", err) } - c, err := newClientImpl(config, watchExpiryTimeout, idleAuthorityTimeout) + c, err := newClientImpl(config, watchExpiryTimeout, idleAuthorityTimeout, streamBackoff) if err != nil { return nil, nil, err } diff --git a/xds/internal/xdsclient/clientimpl.go b/xds/internal/xdsclient/clientimpl.go index 9f619016a08e..715b1d61548b 100644 --- a/xds/internal/xdsclient/clientimpl.go +++ b/xds/internal/xdsclient/clientimpl.go @@ -37,6 +37,7 @@ type clientImpl struct { config *bootstrap.Config logger *grpclog.PrefixLogger watchExpiryTimeout time.Duration + backoff func(int) time.Duration // Backoff for ADS and LRS stream failures. serializer *grpcsync.CallbackSerializer serializerClose func() resourceTypes *resourceTypeRegistry diff --git a/xds/internal/xdsclient/clientimpl_authority.go b/xds/internal/xdsclient/clientimpl_authority.go index 1ce20fabdf83..56c26b81754c 100644 --- a/xds/internal/xdsclient/clientimpl_authority.go +++ b/xds/internal/xdsclient/clientimpl_authority.go @@ -114,6 +114,7 @@ func (c *clientImpl) newAuthorityLocked(config *bootstrap.ServerConfig) (_ *auth serializer: c.serializer, resourceTypeGetter: c.resourceTypes.get, watchExpiryTimeout: c.watchExpiryTimeout, + backoff: c.backoff, logger: grpclog.NewPrefixLogger(logger, authorityPrefix(c, config.ServerURI())), }) if err != nil { diff --git a/xds/internal/xdsclient/tests/ads_stream_backoff_test.go b/xds/internal/xdsclient/tests/ads_stream_backoff_test.go new file mode 100644 index 000000000000..80b5d9309313 --- /dev/null +++ b/xds/internal/xdsclient/tests/ads_stream_backoff_test.go @@ -0,0 +1,438 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package xdsclient_test + +import ( + "context" + "errors" + "fmt" + "strings" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/google/uuid" + "google.golang.org/grpc" + "google.golang.org/grpc/internal/testutils" + "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/xds/internal/xdsclient" + "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" + "google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version" + "google.golang.org/protobuf/testing/protocmp" + + v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" + v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" +) + +// Creates an xDS client with the given bootstrap contents and backoff function. +func createXDSClientWithBackoff(t *testing.T, bootstrapContents []byte, streamBackoff func(int) time.Duration) xdsclient.XDSClient { + t.Helper() + + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + StreamBackoffAfterFailure: streamBackoff, + Contents: bootstrapContents, + }) + if err != nil { + t.Fatalf("Failed to create xDS client: %v", err) + } + t.Cleanup(close) + return client +} + +// Tests the case where the management server returns an error in the ADS +// streaming RPC. Verifies that the ADS stream is restarted after a backoff +// period, and that the previously requested resources are re-requested on the +// new stream. +func (s) TestADS_BackoffAfterStreamFailure(t *testing.T) { + // Channels used for verifying different events in the test. + streamCloseCh := make(chan struct{}, 1) // ADS stream is closed. + ldsResourcesCh := make(chan []string, 1) // Listener resource names in the discovery request. + backoffCh := make(chan struct{}, 1) // Backoff after stream failure. + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Create an xDS management server that returns RPC errors. + streamErr := errors.New("ADS stream error") + mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ + OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error { + // Push the requested resource names on to a channel. + if req.GetTypeUrl() == version.V3ListenerURL { + t.Logf("Received LDS request for resources: %v", req.GetResourceNames()) + select { + case ldsResourcesCh <- req.GetResourceNames(): + default: + } + } + // Return an error everytime a request is sent on the stream. This + // should cause the transport to backoff before attempting to + // recreate the stream. + return streamErr + }, + // Push on a channel whenever the stream is closed. + OnStreamClosed: func(int64, *v3corepb.Node) { + select { + case streamCloseCh <- struct{}{}: + default: + } + }, + }) + + // Override the backoff implementation to push on a channel that is read by + // the test goroutine. + streamBackoff := func(v int) time.Duration { + select { + case backoffCh <- struct{}{}: + default: + } + return 0 + } + + // Create an xDS client with bootstrap pointing to the above server. + nodeID := uuid.New().String() + bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) + testutils.CreateBootstrapFileForTesting(t, bc) + client := createXDSClientWithBackoff(t, bc, streamBackoff) + + // Register a watch for a listener resource. + const listenerName = "listener" + lw := newListenerWatcher() + ldsCancel := xdsresource.WatchListener(client, listenerName, lw) + defer ldsCancel() + + // Verify that an ADS stream is created and an LDS request with the above + // resource name is sent. + if err := waitForResourceNames(ctx, t, ldsResourcesCh, []string{listenerName}); err != nil { + t.Fatal(err) + } + + // Verify that the received stream error is reported to the watcher. + u, err := lw.updateCh.Receive(ctx) + if err != nil { + t.Fatal("Timeout when waiting for an error callback on the listener watcher") + } + gotErr := u.(listenerUpdateErrTuple).err + if !strings.Contains(gotErr.Error(), streamErr.Error()) { + t.Fatalf("Received stream error: %v, wantErr: %v", gotErr, streamErr) + } + + // Verify that the stream is closed. + select { + case <-streamCloseCh: + case <-ctx.Done(): + t.Fatalf("Timeout waiting for stream to be closed after an error") + } + + // Verify that the ADS stream backs off before recreating the stream. + select { + case <-backoffCh: + case <-ctx.Done(): + t.Fatalf("Timeout waiting for ADS stream to backoff after stream failure") + } + + // Verify that the same resource name is re-requested on the new stream. + if err := waitForResourceNames(ctx, t, ldsResourcesCh, []string{listenerName}); err != nil { + t.Fatal(err) + } +} + +// Tests the case where a stream breaks because the server goes down. Verifies +// that when the server comes back up, the same resources are re-requested, this +// time with the previously acked version and an empty nonce. +func (s) TestADS_RetriesAfterBrokenStream(t *testing.T) { + // Channels used for verifying different events in the test. + streamRequestCh := make(chan *v3discoverypb.DiscoveryRequest, 1) // Discovery request is received. + streamResponseCh := make(chan *v3discoverypb.DiscoveryResponse, 1) // Discovery response is received. + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Create an xDS management server listening on a local port. + l, err := testutils.LocalTCPListener() + if err != nil { + t.Fatalf("Failed to create a local listener for the xDS management server: %v", err) + } + lis := testutils.NewRestartableListener(l) + mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ + Listener: lis, + // Push the received request on to a channel for the test goroutine to + // verify that it matches expectations. + OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error { + select { + case streamRequestCh <- req: + default: + } + return nil + }, + // Push the response that the management server is about to send on to a + // channel. The test goroutine to uses this to extract the version and + // nonce, expected on subsequent requests. + OnStreamResponse: func(_ context.Context, _ int64, _ *v3discoverypb.DiscoveryRequest, resp *v3discoverypb.DiscoveryResponse) { + select { + case streamResponseCh <- resp: + default: + } + }, + }) + + // Create a listener resource on the management server. + const listenerName = "listener" + const routeConfigName = "route-config" + nodeID := uuid.New().String() + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(listenerName, routeConfigName)}, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Override the backoff implementation to always return 0, to reduce test + // run time. Instead control when the backoff returns by blocking on a + // channel, that the test closes. + backoffCh := make(chan struct{}, 1) + unblockBackoffCh := make(chan struct{}) + streamBackoff := func(v int) time.Duration { + select { + case backoffCh <- struct{}{}: + default: + } + <-unblockBackoffCh + return 0 + } + + // Create an xDS client with bootstrap pointing to the above server. + bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) + testutils.CreateBootstrapFileForTesting(t, bc) + client := createXDSClientWithBackoff(t, bc, streamBackoff) + + // Register a watch for a listener resource. + lw := newListenerWatcher() + ldsCancel := xdsresource.WatchListener(client, listenerName, lw) + defer ldsCancel() + + // Verify that the initial discovery request matches expectation. + var gotReq *v3discoverypb.DiscoveryRequest + select { + case gotReq = <-streamRequestCh: + case <-ctx.Done(): + t.Fatalf("Timeout waiting for discovery request on the stream") + } + wantReq := &v3discoverypb.DiscoveryRequest{ + VersionInfo: "", + Node: &v3corepb.Node{ + Id: nodeID, + UserAgentName: "gRPC Go", + UserAgentVersionType: &v3corepb.Node_UserAgentVersion{UserAgentVersion: grpc.Version}, + ClientFeatures: []string{"envoy.lb.does_not_support_overprovisioning", "xds.config.resource-in-sotw"}, + }, + ResourceNames: []string{listenerName}, + TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener", + ResponseNonce: "", + } + if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" { + t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff) + } + + // Capture the version and nonce from the response. + var gotResp *v3discoverypb.DiscoveryResponse + select { + case gotResp = <-streamResponseCh: + case <-ctx.Done(): + t.Fatalf("Timeout waiting for discovery response on the stream") + } + version := gotResp.GetVersionInfo() + nonce := gotResp.GetNonce() + + // Verify that the ACK contains the appropriate version and nonce. + wantReq.VersionInfo = version + wantReq.ResponseNonce = nonce + select { + case gotReq = <-streamRequestCh: + case <-ctx.Done(): + t.Fatalf("Timeout waiting for the discovery request ACK on the stream") + } + if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" { + t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff) + } + + // Verify the update received by the watcher. + wantUpdate := listenerUpdateErrTuple{ + update: xdsresource.ListenerUpdate{ + RouteConfigName: routeConfigName, + HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, + }, + } + if err := verifyListenerUpdate(ctx, lw.updateCh, wantUpdate); err != nil { + t.Fatal(err) + } + + // Bring down the management server to simulate a broken stream. + lis.Stop() + + // Verify that the error callback on the watcher is not invoked. + verifyNoListenerUpdate(ctx, lw.updateCh) + + // Wait for backoff to kick in. + select { + case <-backoffCh: + case <-ctx.Done(): + t.Fatal("Timeout waiting for stream backoff") + } + + // Bring up the connection to the management server, and unblock the backoff + // implementation. + lis.Restart() + close(unblockBackoffCh) + + // Verify that the transport creates a new stream and sends out a new + // request which contains the previously acked version, but an empty nonce. + wantReq.ResponseNonce = "" + select { + case gotReq = <-streamRequestCh: + case <-ctx.Done(): + t.Fatalf("Timeout waiting for the discovery request ACK on the stream") + } + if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" { + t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff) + } +} + +// Tests the case where a resource is requested before the a valid ADS stream +// exists. Verifies that the a discovery request is sent out for the previously +// requested resource once a valid stream is created. +func (s) TestADS_ResourceRequestedBeforeStreamCreation(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Channels used for verifying different events in the test. + streamRequestCh := make(chan *v3discoverypb.DiscoveryRequest, 1) // Discovery request is received. + + // Create an xDS management server listening on a local port. + l, err := testutils.LocalTCPListener() + if err != nil { + t.Fatalf("Failed to create a local listener for the xDS management server: %v", err) + } + lis := testutils.NewRestartableListener(l) + streamErr := errors.New("ADS stream error") + + mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ + Listener: lis, + + // Return an error everytime a request is sent on the stream. This + // should cause the transport to backoff before attempting to recreate + // the stream. + OnStreamRequest: func(id int64, req *v3discoverypb.DiscoveryRequest) error { + select { + case streamRequestCh <- req: + default: + } + return streamErr + }, + }) + + // Bring down the management server before creating the transport. This + // allows us to test the case where SendRequest() is called when there is no + // stream to the management server. + lis.Stop() + + // Override the backoff implementation to always return 0, to reduce test + // run time. Instead control when the backoff returns by blocking on a + // channel, that the test closes. + backoffCh := make(chan struct{}, 1) + unblockBackoffCh := make(chan struct{}) + streamBackoff := func(v int) time.Duration { + select { + case backoffCh <- struct{}{}: + default: + } + <-unblockBackoffCh + return 0 + } + + // Create an xDS client with bootstrap pointing to the above server. + nodeID := uuid.New().String() + bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) + testutils.CreateBootstrapFileForTesting(t, bc) + client := createXDSClientWithBackoff(t, bc, streamBackoff) + + // Register a watch for a listener resource. + const listenerName = "listener" + lw := newListenerWatcher() + ldsCancel := xdsresource.WatchListener(client, listenerName, lw) + defer ldsCancel() + + // The above watch results in an attempt to create a new stream, which will + // fail, and will result in backoff. Wait for backoff to kick in. + select { + case <-backoffCh: + case <-ctx.Done(): + t.Fatal("Timeout waiting for stream backoff") + } + + // Bring up the connection to the management server, and unblock the backoff + // implementation. + lis.Restart() + close(unblockBackoffCh) + + // Verify that the initial discovery request matches expectation. + var gotReq *v3discoverypb.DiscoveryRequest + select { + case gotReq = <-streamRequestCh: + case <-ctx.Done(): + t.Fatalf("Timeout waiting for discovery request on the stream") + } + wantReq := &v3discoverypb.DiscoveryRequest{ + VersionInfo: "", + Node: &v3corepb.Node{ + Id: nodeID, + UserAgentName: "gRPC Go", + UserAgentVersionType: &v3corepb.Node_UserAgentVersion{UserAgentVersion: grpc.Version}, + ClientFeatures: []string{"envoy.lb.does_not_support_overprovisioning", "xds.config.resource-in-sotw"}, + }, + ResourceNames: []string{listenerName}, + TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener", + ResponseNonce: "", + } + if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" { + t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff) + } +} + +// waitForResourceNames waits for the wantNames to be received on namesCh. +// Returns a non-nil error if the context expires before that. +func waitForResourceNames(ctx context.Context, t *testing.T, namesCh chan []string, wantNames []string) error { + t.Helper() + + for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) { + select { + case <-ctx.Done(): + case gotNames := <-namesCh: + if cmp.Equal(gotNames, wantNames, cmpopts.EquateEmpty(), cmpopts.SortSlices(func(s1, s2 string) bool { return s1 < s2 })) { + return nil + } + t.Logf("Received resource names %v, want %v", gotNames, wantNames) + } + } + return fmt.Errorf("timeout waiting for resource to be requested from the management server") +} diff --git a/xds/internal/xdsclient/tests/resource_update_test.go b/xds/internal/xdsclient/tests/resource_update_test.go index 43a655a7f959..b493c820c774 100644 --- a/xds/internal/xdsclient/tests/resource_update_test.go +++ b/xds/internal/xdsclient/tests/resource_update_test.go @@ -54,15 +54,17 @@ import ( _ "google.golang.org/grpc/xds/internal/httpfilter/router" // Register the router filter. ) -// startFakeManagementServer starts a fake xDS management server and returns a +// startFakeManagementServer starts a fake xDS management server and registers a // cleanup function to close the fake server. -func startFakeManagementServer(t *testing.T) (*fakeserver.Server, func()) { +func startFakeManagementServer(t *testing.T) *fakeserver.Server { t.Helper() - fs, sCleanup, err := fakeserver.StartServer(nil) + fs, cleanup, err := fakeserver.StartServer(nil) if err != nil { t.Fatalf("Failed to start fake xDS server: %v", err) } - return fs, sCleanup + t.Logf("Started xDS management server on %s", fs.Address) + t.Cleanup(cleanup) + return fs } func compareUpdateMetadata(ctx context.Context, dumpFunc func() *v3statuspb.ClientStatusResponse, want []*v3statuspb.ClientConfig_GenericXdsConfig) error { @@ -276,9 +278,7 @@ func (s) TestHandleListenerResponseFromManagementServer(t *testing.T) { t.Run(test.desc, func(t *testing.T) { // Create a fake xDS management server listening on a local port, // and set it up with the response to send. - mgmtServer, cleanup := startFakeManagementServer(t) - defer cleanup() - t.Logf("Started xDS management server on %s", mgmtServer.Address) + mgmtServer := startFakeManagementServer(t) // Create an xDS client talking to the above management server. nodeID := uuid.New().String() @@ -292,7 +292,6 @@ func (s) TestHandleListenerResponseFromManagementServer(t *testing.T) { t.Fatalf("Failed to create an xDS client: %v", err) } defer close() - t.Logf("Created xDS client to %s", mgmtServer.Address) // Register a watch, and push the results on to a channel. lw := newListenerWatcher() @@ -555,9 +554,7 @@ func (s) TestHandleRouteConfigResponseFromManagementServer(t *testing.T) { t.Run(test.desc, func(t *testing.T) { // Create a fake xDS management server listening on a local port, // and set it up with the response to send. - mgmtServer, cleanup := startFakeManagementServer(t) - defer cleanup() - t.Logf("Started xDS management server on %s", mgmtServer.Address) + mgmtServer := startFakeManagementServer(t) // Create an xDS client talking to the above management server. nodeID := uuid.New().String() @@ -571,7 +568,6 @@ func (s) TestHandleRouteConfigResponseFromManagementServer(t *testing.T) { t.Fatalf("Failed to create an xDS client: %v", err) } defer close() - t.Logf("Created xDS client to %s", mgmtServer.Address) // Register a watch, and push the results on to a channel. rw := newRouteConfigWatcher() @@ -795,9 +791,7 @@ func (s) TestHandleClusterResponseFromManagementServer(t *testing.T) { t.Run(test.desc, func(t *testing.T) { // Create a fake xDS management server listening on a local port, // and set it up with the response to send. - mgmtServer, cleanup := startFakeManagementServer(t) - defer cleanup() - t.Logf("Started xDS management server on %s", mgmtServer.Address) + mgmtServer := startFakeManagementServer(t) // Create an xDS client talking to the above management server. nodeID := uuid.New().String() @@ -811,7 +805,6 @@ func (s) TestHandleClusterResponseFromManagementServer(t *testing.T) { t.Fatalf("Failed to create an xDS client: %v", err) } defer close() - t.Logf("Created xDS client to %s", mgmtServer.Address) // Register a watch, and push the results on to a channel. cw := newClusterWatcher() @@ -1147,9 +1140,7 @@ func (s) TestHandleEndpointsResponseFromManagementServer(t *testing.T) { t.Run(test.desc, func(t *testing.T) { // Create a fake xDS management server listening on a local port, // and set it up with the response to send. - mgmtServer, cleanup := startFakeManagementServer(t) - defer cleanup() - t.Logf("Started xDS management server on %s", mgmtServer.Address) + mgmtServer := startFakeManagementServer(t) // Create an xDS client talking to the above management server. nodeID := uuid.New().String() @@ -1163,7 +1154,6 @@ func (s) TestHandleEndpointsResponseFromManagementServer(t *testing.T) { t.Fatalf("Failed to create an xDS client: %v", err) } defer close() - t.Logf("Created xDS client to %s", mgmtServer.Address) // Register a watch, and push the results on to a channel. ew := newEndpointsWatcher() diff --git a/xds/internal/xdsclient/transport/loadreport_test.go b/xds/internal/xdsclient/transport/loadreport_test.go index c359025c0b1c..9ed9b12d1792 100644 --- a/xds/internal/xdsclient/transport/loadreport_test.go +++ b/xds/internal/xdsclient/transport/loadreport_test.go @@ -52,6 +52,17 @@ var ( ) ) +// startFakeManagementServer starts a fake xDS management server and returns a +// cleanup function to close the fake server. +func startFakeManagementServer(t *testing.T) (*fakeserver.Server, func()) { + t.Helper() + fs, sCleanup, err := fakeserver.StartServer(nil) + if err != nil { + t.Fatalf("Failed to start fake xDS server: %v", err) + } + return fs, sCleanup +} + func (s) TestReportLoad(t *testing.T) { // Create a fake xDS management server listening on a local port. mgmtServer, cleanup := startFakeManagementServer(t) diff --git a/xds/internal/xdsclient/transport/transport_backoff_test.go b/xds/internal/xdsclient/transport/transport_backoff_test.go deleted file mode 100644 index f85c10b5466d..000000000000 --- a/xds/internal/xdsclient/transport/transport_backoff_test.go +++ /dev/null @@ -1,449 +0,0 @@ -/* - * - * Copyright 2022 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package transport_test - -import ( - "context" - "errors" - "strings" - "testing" - "time" - - "github.com/google/go-cmp/cmp" - "github.com/google/go-cmp/cmp/cmpopts" - "github.com/google/uuid" - "google.golang.org/grpc/connectivity" - "google.golang.org/grpc/internal/testutils" - "google.golang.org/grpc/internal/testutils/xds/e2e" - "google.golang.org/grpc/internal/xds/bootstrap" - "google.golang.org/grpc/xds/internal/xdsclient/transport" - "google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version" - "google.golang.org/protobuf/testing/protocmp" - "google.golang.org/protobuf/types/known/anypb" - - v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" - v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" - v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3" - v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" -) - -var strSort = func(s1, s2 string) bool { return s1 < s2 } - -var noopRecvHandler = func(_ transport.ResourceUpdate, onDone func()) error { - onDone() - return nil -} - -// TestTransport_BackoffAfterStreamFailure tests the case where the management -// server returns an error in the ADS streaming RPC. The test verifies the -// following: -// 1. Initial discovery request matches expectation. -// 2. RPC error is propagated via the stream error handler. -// 3. When the stream is closed, the transport backs off. -// 4. The same discovery request is sent on the newly created stream. -func (s) TestTransport_BackoffAfterStreamFailure(t *testing.T) { - // Channels used for verifying different events in the test. - streamCloseCh := make(chan struct{}, 1) // ADS stream is closed. - streamRequestCh := make(chan *v3discoverypb.DiscoveryRequest, 1) // Discovery request is received. - backoffCh := make(chan struct{}, 1) // Transport backoff after stream failure. - streamErrCh := make(chan error, 1) // Stream error seen by the transport. - - // Create an xDS management server listening on a local port. - streamErr := errors.New("ADS stream error") - mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ - // Push on a channel whenever the stream is closed. - OnStreamClosed: func(int64, *v3corepb.Node) { - select { - case streamCloseCh <- struct{}{}: - default: - } - }, - - // Return an error everytime a request is sent on the stream. This - // should cause the transport to backoff before attempting to recreate - // the stream. - OnStreamRequest: func(id int64, req *v3discoverypb.DiscoveryRequest) error { - select { - case streamRequestCh <- req: - default: - } - return streamErr - }, - }) - - // Override the backoff implementation to push on a channel that is read by - // the test goroutine. - transportBackoff := func(v int) time.Duration { - select { - case backoffCh <- struct{}{}: - default: - } - return 0 - } - - serverCfg, err := bootstrap.ServerConfigForTesting(bootstrap.ServerConfigTestingOptions{URI: mgmtServer.Address}) - if err != nil { - t.Fatalf("Failed to create server config for testing: %v", err) - } - - // Create a new transport. Since we are only testing backoff behavior here, - // we can pass a no-op data model layer implementation. - nodeID := uuid.New().String() - tr, err := transport.New(transport.Options{ - ServerCfg: serverCfg, - OnRecvHandler: noopRecvHandler, // No data model layer validation. - OnErrorHandler: func(err error) { - select { - case streamErrCh <- err: - default: - } - }, - OnSendHandler: func(*transport.ResourceSendInfo) {}, - Backoff: transportBackoff, - NodeProto: &v3corepb.Node{Id: nodeID}, - }) - if err != nil { - t.Fatalf("Failed to create xDS transport: %v", err) - } - defer tr.Close() - - // Send a discovery request through the transport. - const resourceName = "resource name" - tr.SendRequest(version.V3ListenerURL, []string{resourceName}) - - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - - // Verify that the initial discovery request matches expectation. - var gotReq *v3discoverypb.DiscoveryRequest - select { - case gotReq = <-streamRequestCh: - case <-ctx.Done(): - t.Fatalf("Timeout waiting for discovery request on the stream") - } - wantReq := &v3discoverypb.DiscoveryRequest{ - VersionInfo: "", - Node: &v3corepb.Node{Id: nodeID}, - ResourceNames: []string{resourceName}, - TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener", - ResponseNonce: "", - } - if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" { - t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff) - } - - // Verify that the received stream error is reported to the user. - var gotErr error - select { - case gotErr = <-streamErrCh: - case <-ctx.Done(): - t.Fatalf("Timeout waiting for stream error to be reported to the user") - } - if !strings.Contains(gotErr.Error(), streamErr.Error()) { - t.Fatalf("Received stream error: %v, wantErr: %v", gotErr, streamErr) - } - - // Verify that the stream is closed. - select { - case <-streamCloseCh: - case <-ctx.Done(): - t.Fatalf("Timeout waiting for stream to be closed after an error") - } - - // Verify that the transport backs off before recreating the stream. - select { - case <-backoffCh: - case <-ctx.Done(): - t.Fatalf("Timeout waiting for transport to backoff after stream failure") - } - - // Verify that the same discovery request is resent on the new stream. - select { - case gotReq = <-streamRequestCh: - case <-ctx.Done(): - t.Fatalf("Timeout waiting for discovery request on the stream") - } - if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" { - t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff) - } -} - -// TestTransport_RetriesAfterBrokenStream tests the case where a stream breaks -// because the server goes down. The test verifies the following: -// 1. Initial discovery request matches expectation. -// 2. Good response from the server leads to an ACK with appropriate version. -// 3. Management server going down, leads to stream failure. -// 4. Once the management server comes back up, the same resources are -// re-requested, this time with an empty nonce. -func (s) TestTransport_RetriesAfterBrokenStream(t *testing.T) { - // Channels used for verifying different events in the test. - streamRequestCh := make(chan *v3discoverypb.DiscoveryRequest, 1) // Discovery request is received. - streamResponseCh := make(chan *v3discoverypb.DiscoveryResponse, 1) // Discovery response is received. - streamErrCh := make(chan error, 1) // Stream error seen by the transport. - - // Create an xDS management server listening on a local port. - l, err := testutils.LocalTCPListener() - if err != nil { - t.Fatalf("Failed to create a local listener for the xDS management server: %v", err) - } - lis := testutils.NewRestartableListener(l) - mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ - Listener: lis, - // Push the received request on to a channel for the test goroutine to - // verify that it matches expectations. - OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error { - select { - case streamRequestCh <- req: - default: - } - return nil - }, - // Push the response that the management server is about to send on to a - // channel. The test goroutine to uses this to extract the version and - // nonce, expected on subsequent requests. - OnStreamResponse: func(_ context.Context, _ int64, _ *v3discoverypb.DiscoveryRequest, resp *v3discoverypb.DiscoveryResponse) { - select { - case streamResponseCh <- resp: - default: - } - }, - }) - - // Configure the management server with appropriate resources. - apiListener := &v3listenerpb.ApiListener{ - ApiListener: func() *anypb.Any { - return testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{ - RouteSpecifier: &v3httppb.HttpConnectionManager_Rds{ - Rds: &v3httppb.Rds{ - ConfigSource: &v3corepb.ConfigSource{ - ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{Ads: &v3corepb.AggregatedConfigSource{}}, - }, - RouteConfigName: "route-configuration-name", - }, - }, - }) - }(), - } - const resourceName1 = "resource name 1" - const resourceName2 = "resource name 2" - listenerResource1 := &v3listenerpb.Listener{ - Name: resourceName1, - ApiListener: apiListener, - } - listenerResource2 := &v3listenerpb.Listener{ - Name: resourceName2, - ApiListener: apiListener, - } - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - nodeID := uuid.New().String() - mgmtServer.Update(ctx, e2e.UpdateOptions{ - NodeID: nodeID, - Listeners: []*v3listenerpb.Listener{listenerResource1, listenerResource2}, - SkipValidation: true, - }) - - serverCfg, err := bootstrap.ServerConfigForTesting(bootstrap.ServerConfigTestingOptions{URI: mgmtServer.Address}) - if err != nil { - t.Fatalf("Failed to create server config for testing: %v", err) - } - - // Create a new transport. Since we are only testing backoff behavior here, - // we can pass a no-op data model layer implementation. - tr, err := transport.New(transport.Options{ - ServerCfg: serverCfg, - OnRecvHandler: noopRecvHandler, // No data model layer validation. - OnErrorHandler: func(err error) { - select { - case streamErrCh <- err: - default: - } - }, - OnSendHandler: func(*transport.ResourceSendInfo) {}, - Backoff: func(int) time.Duration { return time.Duration(0) }, // No backoff. - NodeProto: &v3corepb.Node{Id: nodeID}, - }) - if err != nil { - t.Fatalf("Failed to create xDS transport: %v", err) - } - defer tr.Close() - - // Send a discovery request through the transport. - tr.SendRequest(version.V3ListenerURL, []string{resourceName1, resourceName2}) - - // Verify that the initial discovery request matches expectation. - var gotReq *v3discoverypb.DiscoveryRequest - select { - case gotReq = <-streamRequestCh: - case <-ctx.Done(): - t.Fatalf("Timeout waiting for discovery request on the stream") - } - wantReq := &v3discoverypb.DiscoveryRequest{ - VersionInfo: "", - Node: &v3corepb.Node{Id: nodeID}, - ResourceNames: []string{resourceName1, resourceName2}, - TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener", - ResponseNonce: "", - } - if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform(), cmpopts.SortSlices(strSort)); diff != "" { - t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff) - } - - // Capture the version and nonce from the response. - var gotResp *v3discoverypb.DiscoveryResponse - select { - case gotResp = <-streamResponseCh: - case <-ctx.Done(): - t.Fatalf("Timeout waiting for discovery response on the stream") - } - version := gotResp.GetVersionInfo() - nonce := gotResp.GetNonce() - - // Verify that the ACK contains the appropriate version and nonce. - wantReq.VersionInfo = version - wantReq.ResponseNonce = nonce - select { - case gotReq = <-streamRequestCh: - case <-ctx.Done(): - t.Fatalf("Timeout waiting for the discovery request ACK on the stream") - } - if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform(), cmpopts.SortSlices(strSort)); diff != "" { - t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff) - } - - // Bring down the management server to simulate a broken stream. - lis.Stop() - - // We don't care about the exact error here and it can vary based on which - // error gets reported first, the Recv() failure or the new stream creation - // failure. So, all we check here is whether we get an error or not. - select { - case <-streamErrCh: - case <-ctx.Done(): - t.Fatalf("Timeout waiting for stream error to be reported to the user") - } - - // Bring up the connection to the management server. - lis.Restart() - - // Verify that the transport creates a new stream and sends out a new - // request which contains the previously acked version, but an empty nonce. - wantReq.ResponseNonce = "" - select { - case gotReq = <-streamRequestCh: - case <-ctx.Done(): - t.Fatalf("Timeout waiting for the discovery request ACK on the stream") - } - if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform(), cmpopts.SortSlices(strSort)); diff != "" { - t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff) - } -} - -// TestTransport_ResourceRequestedBeforeStreamCreation tests the case where a -// resource is requested before the transport has a valid stream. Verifies that -// the transport sends out the request once it has a valid stream. -func (s) TestTransport_ResourceRequestedBeforeStreamCreation(t *testing.T) { - // Channels used for verifying different events in the test. - streamRequestCh := make(chan *v3discoverypb.DiscoveryRequest, 1) // Discovery request is received. - - // Create an xDS management server listening on a local port. - l, err := testutils.LocalTCPListener() - if err != nil { - t.Fatalf("Failed to create a local listener for the xDS management server: %v", err) - } - lis := testutils.NewRestartableListener(l) - streamErr := errors.New("ADS stream error") - - mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ - Listener: lis, - - // Return an error everytime a request is sent on the stream. This - // should cause the transport to backoff before attempting to recreate - // the stream. - OnStreamRequest: func(id int64, req *v3discoverypb.DiscoveryRequest) error { - select { - case streamRequestCh <- req: - default: - } - return streamErr - }, - }) - - // Bring down the management server before creating the transport. This - // allows us to test the case where SendRequest() is called when there is no - // stream to the management server. - lis.Stop() - - serverCfg, err := bootstrap.ServerConfigForTesting(bootstrap.ServerConfigTestingOptions{URI: mgmtServer.Address}) - if err != nil { - t.Fatalf("Failed to create server config for testing: %v", err) - } - - // Create a new transport. Since we are only testing backoff behavior here, - // we can pass a no-op data model layer implementation. - nodeID := uuid.New().String() - tr, err := transport.New(transport.Options{ - ServerCfg: serverCfg, - OnRecvHandler: noopRecvHandler, // No data model layer validation. - OnErrorHandler: func(error) {}, // No stream error handling. - OnSendHandler: func(*transport.ResourceSendInfo) {}, // No on send handler - Backoff: func(int) time.Duration { return time.Duration(0) }, // No backoff. - NodeProto: &v3corepb.Node{Id: nodeID}, - }) - if err != nil { - t.Fatalf("Failed to create xDS transport: %v", err) - } - defer tr.Close() - - // Send a discovery request through the transport. - const resourceName = "resource name" - tr.SendRequest(version.V3ListenerURL, []string{resourceName}) - - // Wait until the transport has attempted to connect to the management - // server and has seen the connection fail. In this case, since the - // connection is down, and the transport creates streams with WaitForReady() - // set to true, stream creation will never fail (unless the context - // expires), and therefore we cannot rely on the stream error handler. - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) { - if tr.ChannelConnectivityStateForTesting() == connectivity.TransientFailure { - break - } - } - - lis.Restart() - - // Verify that the initial discovery request matches expectation. - var gotReq *v3discoverypb.DiscoveryRequest - select { - case gotReq = <-streamRequestCh: - case <-ctx.Done(): - t.Fatalf("Timeout waiting for discovery request on the stream") - } - wantReq := &v3discoverypb.DiscoveryRequest{ - VersionInfo: "", - Node: &v3corepb.Node{Id: nodeID}, - ResourceNames: []string{resourceName}, - TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener", - ResponseNonce: "", - } - if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" { - t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff) - } -} diff --git a/xds/internal/xdsclient/transport/transport_resource_test.go b/xds/internal/xdsclient/transport/transport_resource_test.go deleted file mode 100644 index 70874c0c6e97..000000000000 --- a/xds/internal/xdsclient/transport/transport_resource_test.go +++ /dev/null @@ -1,422 +0,0 @@ -/* - * - * Copyright 2022 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// Package transport_test contains e2e style tests for the xDS transport -// implementation. It uses the envoy-go-control-plane as the management server. -package transport_test - -import ( - "context" - "errors" - "testing" - "time" - - "github.com/google/go-cmp/cmp" - "github.com/google/uuid" - "google.golang.org/grpc/internal/grpctest" - "google.golang.org/grpc/internal/testutils" - "google.golang.org/grpc/internal/testutils/xds/fakeserver" - "google.golang.org/grpc/internal/xds/bootstrap" - "google.golang.org/grpc/xds/internal/xdsclient/transport" - "google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version" - "google.golang.org/protobuf/testing/protocmp" - "google.golang.org/protobuf/types/known/anypb" - - v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" - v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" - v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3" - v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" -) - -type s struct { - grpctest.Tester -} - -func Test(t *testing.T) { - grpctest.RunSubTests(t, s{}) -} - -const ( - defaultTestTimeout = 5 * time.Second - defaultTestShortTimeout = 10 * time.Millisecond -) - -// startFakeManagementServer starts a fake xDS management server and returns a -// cleanup function to close the fake server. -func startFakeManagementServer(t *testing.T) (*fakeserver.Server, func()) { - t.Helper() - fs, sCleanup, err := fakeserver.StartServer(nil) - if err != nil { - t.Fatalf("Failed to start fake xDS server: %v", err) - } - return fs, sCleanup -} - -// resourcesWithTypeURL wraps resources and type URL received from server. -type resourcesWithTypeURL struct { - resources []*anypb.Any - url string -} - -// TestHandleResponseFromManagementServer covers different scenarios of the -// transport receiving a response from the management server. In all scenarios, -// the transport is expected to pass the received responses as-is to the data -// model layer for validation and not perform any validation on its own. -func (s) TestHandleResponseFromManagementServer(t *testing.T) { - const ( - resourceName1 = "resource-name-1" - resourceName2 = "resource-name-2" - ) - var ( - badlyMarshaledResource = &anypb.Any{ - TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener", - Value: []byte{1, 2, 3, 4}, - } - apiListener = &v3listenerpb.ApiListener{ - ApiListener: func() *anypb.Any { - return testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{ - RouteSpecifier: &v3httppb.HttpConnectionManager_Rds{ - Rds: &v3httppb.Rds{ - ConfigSource: &v3corepb.ConfigSource{ - ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{Ads: &v3corepb.AggregatedConfigSource{}}, - }, - RouteConfigName: "route-configuration-name", - }, - }, - }) - }(), - } - resource1 = &v3listenerpb.Listener{ - Name: resourceName1, - ApiListener: apiListener, - } - resource2 = &v3listenerpb.Listener{ - Name: resourceName2, - ApiListener: apiListener, - } - ) - - tests := []struct { - desc string - resourceNamesToRequest []string - managementServerResponse *v3discoverypb.DiscoveryResponse - wantURL string - wantResources []*anypb.Any - }{ - { - desc: "badly marshaled response", - resourceNamesToRequest: []string{resourceName1}, - managementServerResponse: &v3discoverypb.DiscoveryResponse{ - TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener", - Resources: []*anypb.Any{badlyMarshaledResource}, - }, - wantURL: "type.googleapis.com/envoy.config.listener.v3.Listener", - wantResources: []*anypb.Any{badlyMarshaledResource}, - }, - { - desc: "empty response", - resourceNamesToRequest: []string{resourceName1}, - managementServerResponse: &v3discoverypb.DiscoveryResponse{}, - wantURL: "", - wantResources: nil, - }, - { - desc: "one good resource", - resourceNamesToRequest: []string{resourceName1}, - managementServerResponse: &v3discoverypb.DiscoveryResponse{ - TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener", - Resources: []*anypb.Any{testutils.MarshalAny(t, resource1)}, - }, - wantURL: "type.googleapis.com/envoy.config.listener.v3.Listener", - wantResources: []*anypb.Any{testutils.MarshalAny(t, resource1)}, - }, - { - desc: "two good resources", - resourceNamesToRequest: []string{resourceName1, resourceName2}, - managementServerResponse: &v3discoverypb.DiscoveryResponse{ - TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener", - Resources: []*anypb.Any{testutils.MarshalAny(t, resource1), testutils.MarshalAny(t, resource2)}, - }, - wantURL: "type.googleapis.com/envoy.config.listener.v3.Listener", - wantResources: []*anypb.Any{testutils.MarshalAny(t, resource1), testutils.MarshalAny(t, resource2)}, - }, - { - desc: "two resources when we requested one", - resourceNamesToRequest: []string{resourceName1}, - managementServerResponse: &v3discoverypb.DiscoveryResponse{ - TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener", - Resources: []*anypb.Any{testutils.MarshalAny(t, resource1), testutils.MarshalAny(t, resource2)}, - }, - wantURL: "type.googleapis.com/envoy.config.listener.v3.Listener", - wantResources: []*anypb.Any{testutils.MarshalAny(t, resource1), testutils.MarshalAny(t, resource2)}, - }, - } - - for _, test := range tests { - t.Run(test.desc, func(t *testing.T) { - // Create a fake xDS management server listening on a local port, - // and set it up with the response to send. - mgmtServer, cleanup := startFakeManagementServer(t) - defer cleanup() - t.Logf("Started xDS management server on %s", mgmtServer.Address) - mgmtServer.XDSResponseChan <- &fakeserver.Response{Resp: test.managementServerResponse} - - serverCfg, err := bootstrap.ServerConfigForTesting(bootstrap.ServerConfigTestingOptions{URI: mgmtServer.Address}) - if err != nil { - t.Fatalf("Failed to create server config for testing: %v", err) - } - - // Create a new transport. - resourcesCh := testutils.NewChannel() - tr, err := transport.New(transport.Options{ - ServerCfg: serverCfg, - // No validation. Simply push received resources on a channel. - OnRecvHandler: func(update transport.ResourceUpdate, onDone func()) error { - resourcesCh.Send(&resourcesWithTypeURL{ - resources: update.Resources, - url: update.URL, - // Ignore resource version here. - }) - onDone() - return nil - }, - OnSendHandler: func(*transport.ResourceSendInfo) {}, // No onSend handling. - OnErrorHandler: func(error) {}, // No stream error handling. - Backoff: func(int) time.Duration { return time.Duration(0) }, // No backoff. - NodeProto: &v3corepb.Node{Id: uuid.New().String()}, - }) - if err != nil { - t.Fatalf("Failed to create xDS transport: %v", err) - } - defer tr.Close() - - // Send the request, and validate that the response sent by the - // management server is propagated to the data model layer. - tr.SendRequest(version.V3ListenerURL, test.resourceNamesToRequest) - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - v, err := resourcesCh.Receive(ctx) - if err != nil { - t.Fatalf("Failed to receive resources at the data model layer: %v", err) - } - gotURL := v.(*resourcesWithTypeURL).url - gotResources := v.(*resourcesWithTypeURL).resources - if gotURL != test.wantURL { - t.Fatalf("Received resource URL in response: %s, want %s", gotURL, test.wantURL) - } - if diff := cmp.Diff(gotResources, test.wantResources, protocmp.Transform()); diff != "" { - t.Fatalf("Received unexpected resources. Diff (-got, +want):\n%s", diff) - } - }) - } -} - -func (s) TestEmptyListenerResourceOnStreamRestart(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - - mgmtServer, cleanup := startFakeManagementServer(t) - defer cleanup() - t.Logf("Started xDS management server on %s", mgmtServer.Address) - - serverCfg, err := bootstrap.ServerConfigForTesting(bootstrap.ServerConfigTestingOptions{URI: mgmtServer.Address}) - if err != nil { - t.Fatalf("Failed to create server config for testing: %v", err) - } - nodeProto := &v3corepb.Node{Id: uuid.New().String()} - tr, err := transport.New(transport.Options{ - ServerCfg: serverCfg, - OnRecvHandler: noopRecvHandler, // No data model layer validation. - OnSendHandler: func(*transport.ResourceSendInfo) {}, // No onSend handling. - OnErrorHandler: func(error) {}, // No stream error handling. - Backoff: func(int) time.Duration { return time.Duration(0) }, // No backoff. - NodeProto: nodeProto, - }) - if err != nil { - t.Fatalf("Failed to create xDS transport: %v", err) - } - defer tr.Close() - - // Send a request for a listener resource. - const resource = "some-resource" - tr.SendRequest(version.V3ListenerURL, []string{resource}) - - // Ensure the proper request was sent. - val, err := mgmtServer.XDSRequestChan.Receive(ctx) - if err != nil { - t.Fatalf("Error waiting for mgmt server response: %v", err) - } - wantReq := &fakeserver.Request{Req: &v3discoverypb.DiscoveryRequest{ - Node: nodeProto, - ResourceNames: []string{resource}, - TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener", - }} - gotReq := val.(*fakeserver.Request) - if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" { - t.Fatalf("Discovery request received at management server is %+v, want %+v", gotReq, wantReq) - } - - // Remove the subscription by requesting an empty list. - tr.SendRequest(version.V3ListenerURL, []string{}) - - // Ensure the proper request was sent. - val, err = mgmtServer.XDSRequestChan.Receive(ctx) - if err != nil { - t.Fatalf("Error waiting for mgmt server response: %v", err) - } - wantReq = &fakeserver.Request{Req: &v3discoverypb.DiscoveryRequest{ - ResourceNames: []string{}, - TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener", - }} - gotReq = val.(*fakeserver.Request) - if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" { - t.Fatalf("Discovery request received at management server is %+v, want %+v", gotReq, wantReq) - } - - // Cause the stream to restart. - mgmtServer.XDSResponseChan <- &fakeserver.Response{Err: errors.New("go away")} - - // Ensure no request is sent since there are no resources. - ctxShort, cancel := context.WithTimeout(ctx, defaultTestShortTimeout) - defer cancel() - if got, err := mgmtServer.XDSRequestChan.Receive(ctxShort); !errors.Is(err, context.DeadlineExceeded) { - t.Fatalf("mgmt server received request: %v; wanted DeadlineExceeded error", got) - } - - tr.SendRequest(version.V3ListenerURL, []string{resource}) - - // Ensure the proper request was sent with the node proto. - val, err = mgmtServer.XDSRequestChan.Receive(ctx) - if err != nil { - t.Fatalf("Error waiting for mgmt server response: %v", err) - } - wantReq = &fakeserver.Request{Req: &v3discoverypb.DiscoveryRequest{ - Node: nodeProto, - ResourceNames: []string{resource}, - TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener", - }} - gotReq = val.(*fakeserver.Request) - if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" { - t.Fatalf("Discovery request received at management server is %+v, want %+v", gotReq, wantReq) - } - -} - -func (s) TestEmptyClusterResourceOnStreamRestartWithListener(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - - mgmtServer, cleanup := startFakeManagementServer(t) - defer cleanup() - t.Logf("Started xDS management server on %s", mgmtServer.Address) - - serverCfg, err := bootstrap.ServerConfigForTesting(bootstrap.ServerConfigTestingOptions{URI: mgmtServer.Address}) - if err != nil { - t.Fatalf("Failed to create server config for testing: %v", err) - } - nodeProto := &v3corepb.Node{Id: uuid.New().String()} - tr, err := transport.New(transport.Options{ - ServerCfg: serverCfg, - OnRecvHandler: noopRecvHandler, // No data model layer validation. - OnSendHandler: func(*transport.ResourceSendInfo) {}, // No onSend handling. - OnErrorHandler: func(error) {}, // No stream error handling. - Backoff: func(int) time.Duration { return time.Duration(0) }, // No backoff. - NodeProto: nodeProto, - }) - if err != nil { - t.Fatalf("Failed to create xDS transport: %v", err) - } - defer tr.Close() - - // Send a request for a listener resource. - const resource = "some-resource" - tr.SendRequest(version.V3ListenerURL, []string{resource}) - - // Ensure the proper request was sent. - val, err := mgmtServer.XDSRequestChan.Receive(ctx) - if err != nil { - t.Fatalf("Error waiting for mgmt server response: %v", err) - } - wantReq := &fakeserver.Request{Req: &v3discoverypb.DiscoveryRequest{ - Node: nodeProto, - ResourceNames: []string{resource}, - TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener", - }} - gotReq := val.(*fakeserver.Request) - if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" { - t.Fatalf("Discovery request received at management server is %+v, want %+v", gotReq, wantReq) - } - - // Send a request for a cluster resource. - tr.SendRequest(version.V3ClusterURL, []string{resource}) - - // Ensure the proper request was sent. - val, err = mgmtServer.XDSRequestChan.Receive(ctx) - if err != nil { - t.Fatalf("Error waiting for mgmt server response: %v", err) - } - wantReq = &fakeserver.Request{Req: &v3discoverypb.DiscoveryRequest{ - ResourceNames: []string{resource}, - TypeUrl: "type.googleapis.com/envoy.config.cluster.v3.Cluster", - }} - gotReq = val.(*fakeserver.Request) - if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" { - t.Fatalf("Discovery request received at management server is %+v, want %+v", gotReq, wantReq) - } - - // Remove the cluster subscription by requesting an empty list. - tr.SendRequest(version.V3ClusterURL, []string{}) - - // Ensure the proper request was sent. - val, err = mgmtServer.XDSRequestChan.Receive(ctx) - if err != nil { - t.Fatalf("Error waiting for mgmt server response: %v", err) - } - wantReq = &fakeserver.Request{Req: &v3discoverypb.DiscoveryRequest{ - ResourceNames: []string{}, - TypeUrl: "type.googleapis.com/envoy.config.cluster.v3.Cluster", - }} - gotReq = val.(*fakeserver.Request) - if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" { - t.Fatalf("Discovery request received at management server is %+v, want %+v", gotReq, wantReq) - } - - // Cause the stream to restart. - mgmtServer.XDSResponseChan <- &fakeserver.Response{Err: errors.New("go away")} - - // Ensure the proper LDS request was sent. - val, err = mgmtServer.XDSRequestChan.Receive(ctx) - if err != nil { - t.Fatalf("Error waiting for mgmt server response: %v", err) - } - wantReq = &fakeserver.Request{Req: &v3discoverypb.DiscoveryRequest{ - Node: nodeProto, - ResourceNames: []string{resource}, - TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener", - }} - gotReq = val.(*fakeserver.Request) - if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" { - t.Fatalf("Discovery request received at management server is %+v, want %+v", gotReq, wantReq) - } - - // Ensure no cluster request is sent since there are no cluster resources. - ctxShort, cancel := context.WithTimeout(ctx, defaultTestShortTimeout) - defer cancel() - if got, err := mgmtServer.XDSRequestChan.Receive(ctxShort); !errors.Is(err, context.DeadlineExceeded) { - t.Fatalf("mgmt server received request: %v; wanted DeadlineExceeded error", got) - } -} diff --git a/xds/internal/xdsclient/transport/transport_test.go b/xds/internal/xdsclient/transport/transport_test.go index 6c2c1f2835e2..c98d42d8affe 100644 --- a/xds/internal/xdsclient/transport/transport_test.go +++ b/xds/internal/xdsclient/transport/transport_test.go @@ -22,10 +22,12 @@ import ( "encoding/json" "net" "testing" + "time" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal/grpctest" internalbootstrap "google.golang.org/grpc/internal/xds/bootstrap" "google.golang.org/grpc/xds/bootstrap" "google.golang.org/grpc/xds/internal/xdsclient/transport" @@ -34,6 +36,26 @@ import ( v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" ) +type s struct { + grpctest.Tester +} + +func Test(t *testing.T) { + grpctest.RunSubTests(t, s{}) +} + +const ( + defaultTestTimeout = 5 * time.Second + defaultTestShortTimeout = 10 * time.Millisecond +) + +var strSort = func(s1, s2 string) bool { return s1 < s2 } + +var noopRecvHandler = func(_ transport.ResourceUpdate, onDone func()) error { + onDone() + return nil +} + func (s) TestNewWithGRPCDial(t *testing.T) { // Override the dialer with a custom one. customDialerCalled := false From cf72d2e34550e8aaab95ea9c0affcfd8c0f5ff6a Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Fri, 4 Oct 2024 21:55:34 +0000 Subject: [PATCH 2/4] fix an error string --- xds/internal/xdsclient/tests/ads_stream_backoff_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xds/internal/xdsclient/tests/ads_stream_backoff_test.go b/xds/internal/xdsclient/tests/ads_stream_backoff_test.go index 80b5d9309313..c466f43b14d6 100644 --- a/xds/internal/xdsclient/tests/ads_stream_backoff_test.go +++ b/xds/internal/xdsclient/tests/ads_stream_backoff_test.go @@ -331,7 +331,7 @@ func (s) TestADS_ResourceRequestedBeforeStreamCreation(t *testing.T) { // Create an xDS management server listening on a local port. l, err := testutils.LocalTCPListener() if err != nil { - t.Fatalf("Failed to create a local listener for the xDS management server: %v", err) + t.Fatalf("Failed to create a local listener: %v", err) } lis := testutils.NewRestartableListener(l) streamErr := errors.New("ADS stream error") From 2604316808aa7b191c92e64ab11516e0487a6459 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Mon, 7 Oct 2024 21:59:49 +0000 Subject: [PATCH 3/4] remove default case from select when writing on the channel --- .../tests/ads_stream_backoff_test.go | 34 ++++++++++++------- 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/xds/internal/xdsclient/tests/ads_stream_backoff_test.go b/xds/internal/xdsclient/tests/ads_stream_backoff_test.go index c466f43b14d6..c94945321ba5 100644 --- a/xds/internal/xdsclient/tests/ads_stream_backoff_test.go +++ b/xds/internal/xdsclient/tests/ads_stream_backoff_test.go @@ -80,7 +80,7 @@ func (s) TestADS_BackoffAfterStreamFailure(t *testing.T) { t.Logf("Received LDS request for resources: %v", req.GetResourceNames()) select { case ldsResourcesCh <- req.GetResourceNames(): - default: + case <-ctx.Done(): } } // Return an error everytime a request is sent on the stream. This @@ -92,7 +92,7 @@ func (s) TestADS_BackoffAfterStreamFailure(t *testing.T) { OnStreamClosed: func(int64, *v3corepb.Node) { select { case streamCloseCh <- struct{}{}: - default: + case <-ctx.Done(): } }, }) @@ -102,7 +102,7 @@ func (s) TestADS_BackoffAfterStreamFailure(t *testing.T) { streamBackoff := func(v int) time.Duration { select { case backoffCh <- struct{}{}: - default: + case <-ctx.Done(): } return 0 } @@ -179,7 +179,7 @@ func (s) TestADS_RetriesAfterBrokenStream(t *testing.T) { OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error { select { case streamRequestCh <- req: - default: + case <-ctx.Done(): } return nil }, @@ -189,7 +189,7 @@ func (s) TestADS_RetriesAfterBrokenStream(t *testing.T) { OnStreamResponse: func(_ context.Context, _ int64, _ *v3discoverypb.DiscoveryRequest, resp *v3discoverypb.DiscoveryResponse) { select { case streamResponseCh <- resp: - default: + case <-ctx.Done(): } }, }) @@ -210,14 +210,12 @@ func (s) TestADS_RetriesAfterBrokenStream(t *testing.T) { // Override the backoff implementation to always return 0, to reduce test // run time. Instead control when the backoff returns by blocking on a // channel, that the test closes. - backoffCh := make(chan struct{}, 1) - unblockBackoffCh := make(chan struct{}) + backoffCh := make(chan struct{}) streamBackoff := func(v int) time.Duration { select { case backoffCh <- struct{}{}: - default: + case <-ctx.Done(): } - <-unblockBackoffCh return 0 } @@ -293,17 +291,29 @@ func (s) TestADS_RetriesAfterBrokenStream(t *testing.T) { // Verify that the error callback on the watcher is not invoked. verifyNoListenerUpdate(ctx, lw.updateCh) - // Wait for backoff to kick in. + // Wait for backoff to kick in, and unblock the first backoff attempt. select { case <-backoffCh: case <-ctx.Done(): t.Fatal("Timeout waiting for stream backoff") } - // Bring up the connection to the management server, and unblock the backoff + // Bring up the management server. The test does not have prcecise control + // over when new streams to the management server will start succeeding. The + // ADS stream implementation will backoff as many times as required before + // it can successfully create a new stream. Therefore, we need to receive on + // the backoffCh as many times as required, and unblock the backoff // implementation. lis.Restart() - close(unblockBackoffCh) + go func() { + for { + select { + case <-backoffCh: + case <-ctx.Done(): + return + } + } + }() // Verify that the transport creates a new stream and sends out a new // request which contains the previously acked version, but an empty nonce. From 182188ef331f01a8abca82f3ab8a2b7e6386827e Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Mon, 7 Oct 2024 22:38:33 +0000 Subject: [PATCH 4/4] make vet happy --- xds/internal/xdsclient/transport/transport_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/xds/internal/xdsclient/transport/transport_test.go b/xds/internal/xdsclient/transport/transport_test.go index c98d42d8affe..61a3dbefa4fb 100644 --- a/xds/internal/xdsclient/transport/transport_test.go +++ b/xds/internal/xdsclient/transport/transport_test.go @@ -49,8 +49,6 @@ const ( defaultTestShortTimeout = 10 * time.Millisecond ) -var strSort = func(s1, s2 string) bool { return s1 < s2 } - var noopRecvHandler = func(_ transport.ResourceUpdate, onDone func()) error { onDone() return nil