diff --git a/xds/internal/testutils/fakeclient/client.go b/xds/internal/testutils/fakeclient/client.go index 9794425c501f..a898d5f4e859 100644 --- a/xds/internal/testutils/fakeclient/client.go +++ b/xds/internal/testutils/fakeclient/client.go @@ -138,57 +138,6 @@ func (xdsC *Client) WaitForCancelRouteConfigWatch(ctx context.Context) (string, return val.(string), err } -// WatchCluster registers a CDS watch. -func (xdsC *Client) WatchCluster(clusterName string, callback func(xdsresource.ClusterUpdate, error)) func() { - // Due to the tree like structure of aggregate clusters, there can be multiple callbacks persisted for each cluster - // node. However, the client doesn't care about the parent child relationship between the nodes, only that it invokes - // the right callback for a particular cluster. - xdsC.cdsCbs[clusterName] = callback - xdsC.cdsWatchCh.Send(clusterName) - return func() { - xdsC.cdsCancelCh.Send(clusterName) - } -} - -// WaitForWatchCluster waits for WatchCluster to be invoked on this client and -// returns the clusterName being watched. -func (xdsC *Client) WaitForWatchCluster(ctx context.Context) (string, error) { - val, err := xdsC.cdsWatchCh.Receive(ctx) - if err != nil { - return "", err - } - return val.(string), err -} - -// InvokeWatchClusterCallback invokes the registered cdsWatch callback. -// -// Not thread safe with WatchCluster. Only call this after -// WaitForWatchCluster. -func (xdsC *Client) InvokeWatchClusterCallback(update xdsresource.ClusterUpdate, err error) { - // Keeps functionality with previous usage of this, if single callback call that callback. - if len(xdsC.cdsCbs) == 1 { - var clusterName string - for cluster := range xdsC.cdsCbs { - clusterName = cluster - } - xdsC.cdsCbs[clusterName](update, err) - } else { - // Have what callback you call with the update determined by the service name in the ClusterUpdate. Left up to the - // caller to make sure the cluster update matches with a persisted callback. - xdsC.cdsCbs[update.ClusterName](update, err) - } -} - -// WaitForCancelClusterWatch waits for a CDS watch to be cancelled and returns -// context.DeadlineExceeded otherwise. -func (xdsC *Client) WaitForCancelClusterWatch(ctx context.Context) (string, error) { - clusterNameReceived, err := xdsC.cdsCancelCh.Receive(ctx) - if err != nil { - return "", err - } - return clusterNameReceived.(string), err -} - // WatchEndpoints registers an EDS watch for provided clusterName. func (xdsC *Client) WatchEndpoints(clusterName string, callback func(xdsresource.EndpointsUpdate, error)) (cancel func()) { xdsC.edsCbs[clusterName] = callback diff --git a/xds/internal/xdsclient/client.go b/xds/internal/xdsclient/client.go index 44f6d3bc0a1c..542c5e025fd1 100644 --- a/xds/internal/xdsclient/client.go +++ b/xds/internal/xdsclient/client.go @@ -32,7 +32,6 @@ import ( type XDSClient interface { WatchListener(string, func(xdsresource.ListenerUpdate, error)) func() WatchRouteConfig(string, func(xdsresource.RouteConfigUpdate, error)) func() - WatchCluster(string, func(xdsresource.ClusterUpdate, error)) func() // WatchResource uses xDS to discover the resource associated with the // provided resource name. The resource type implementation determines how diff --git a/xds/internal/xdsclient/clientimpl_watchers.go b/xds/internal/xdsclient/clientimpl_watchers.go index e503349dbc29..5866221e2696 100644 --- a/xds/internal/xdsclient/clientimpl_watchers.go +++ b/xds/internal/xdsclient/clientimpl_watchers.go @@ -81,37 +81,6 @@ func (c *clientImpl) WatchRouteConfig(resourceName string, cb func(xdsresource.R return xdsresource.WatchRouteConfig(c, resourceName, watcher) } -// This is only required temporarily, while we modify the -// clientImpl.WatchCluster API to be implemented via the wrapper WatchCluster() -// API which calls the WatchResource() API. -type clusterWatcher struct { - resourceName string - cb func(xdsresource.ClusterUpdate, error) -} - -func (c *clusterWatcher) OnUpdate(update *xdsresource.ClusterResourceData) { - c.cb(update.Resource, nil) -} - -func (c *clusterWatcher) OnError(err error) { - c.cb(xdsresource.ClusterUpdate{}, err) -} - -func (c *clusterWatcher) OnResourceDoesNotExist() { - err := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource name %q of type Cluster not found in received response", c.resourceName) - c.cb(xdsresource.ClusterUpdate{}, err) -} - -// WatchCluster uses CDS to discover information about the Cluster resource -// identified by resourceName. -// -// WatchCluster can be called multiple times, with same or different -// clusterNames. Each call will start an independent watcher for the resource. -func (c *clientImpl) WatchCluster(resourceName string, cb func(xdsresource.ClusterUpdate, error)) (cancel func()) { - watcher := &clusterWatcher{resourceName: resourceName, cb: cb} - return xdsresource.WatchCluster(c, resourceName, watcher) -} - // WatchResource uses xDS to discover the resource associated with the provided // resource name. The resource type implementation determines how xDS requests // are sent out and how responses are deserialized and validated. Upon receipt diff --git a/xds/internal/xdsclient/tests/authority_test.go b/xds/internal/xdsclient/tests/authority_test.go index 0345f4a4040c..e1777446c805 100644 --- a/xds/internal/xdsclient/tests/authority_test.go +++ b/xds/internal/xdsclient/tests/authority_test.go @@ -120,6 +120,12 @@ func setupForAuthorityTests(ctx context.Context, t *testing.T, idleTimeout time. return lisDefault, lisNonDefault, client, close } +type noopClusterWatcher struct{} + +func (noopClusterWatcher) OnUpdate(update *xdsresource.ClusterResourceData) {} +func (noopClusterWatcher) OnError(err error) {} +func (noopClusterWatcher) OnResourceDoesNotExist() {} + // TestAuthorityShare tests the authority sharing logic. The test verifies the // following scenarios: // - A watch for a resource name with an authority matching an existing watch @@ -143,14 +149,15 @@ func (s) TestAuthorityShare(t *testing.T) { } // Request the first resource. Verify that a new transport is created. - cdsCancel1 := client.WatchCluster(authorityTestResourceName11, func(u xdsresource.ClusterUpdate, err error) {}) + watcher := noopClusterWatcher{} + cdsCancel1 := xdsresource.WatchCluster(client, authorityTestResourceName11, watcher) defer cdsCancel1() if _, err := lis.NewConnCh.Receive(ctx); err != nil { t.Fatalf("Timed out when waiting for a new transport to be created to the management server: %v", err) } // Request the second resource. Verify that no new transport is created. - cdsCancel2 := client.WatchCluster(authorityTestResourceName12, func(u xdsresource.ClusterUpdate, err error) {}) + cdsCancel2 := xdsresource.WatchCluster(client, authorityTestResourceName12, watcher) defer cdsCancel2() sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout) defer sCancel() @@ -159,7 +166,7 @@ func (s) TestAuthorityShare(t *testing.T) { } // Request the third resource. Verify that no new transport is created. - cdsCancel3 := client.WatchCluster(authorityTestResourceName2, func(u xdsresource.ClusterUpdate, err error) {}) + cdsCancel3 := xdsresource.WatchCluster(client, authorityTestResourceName2, watcher) defer cdsCancel3() sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout) defer sCancel() @@ -179,7 +186,8 @@ func (s) TestAuthorityIdleTimeout(t *testing.T) { defer close() // Request the first resource. Verify that a new transport is created. - cdsCancel1 := client.WatchCluster(authorityTestResourceName11, func(u xdsresource.ClusterUpdate, err error) {}) + watcher := noopClusterWatcher{} + cdsCancel1 := xdsresource.WatchCluster(client, authorityTestResourceName11, watcher) val, err := lis.NewConnCh.Receive(ctx) if err != nil { t.Fatalf("Timed out when waiting for a new transport to be created to the management server: %v", err) @@ -187,7 +195,7 @@ func (s) TestAuthorityIdleTimeout(t *testing.T) { conn := val.(*testutils.ConnWrapper) // Request the second resource. Verify that no new transport is created. - cdsCancel2 := client.WatchCluster(authorityTestResourceName12, func(u xdsresource.ClusterUpdate, err error) {}) + cdsCancel2 := xdsresource.WatchCluster(client, authorityTestResourceName12, watcher) sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) defer sCancel() if _, err := lis.NewConnCh.Receive(sCtx); err != context.DeadlineExceeded { @@ -225,7 +233,8 @@ func (s) TestAuthorityClientClose(t *testing.T) { // Request the first resource. Verify that a new transport is created to the // default management server. - cdsCancel1 := client.WatchCluster(authorityTestResourceName11, func(u xdsresource.ClusterUpdate, err error) {}) + watcher := noopClusterWatcher{} + cdsCancel1 := xdsresource.WatchCluster(client, authorityTestResourceName11, watcher) val, err := lisDefault.NewConnCh.Receive(ctx) if err != nil { t.Fatalf("Timed out when waiting for a new transport to be created to the management server: %v", err) @@ -235,7 +244,7 @@ func (s) TestAuthorityClientClose(t *testing.T) { // Request another resource which is served by the non-default authority. // Verify that a new transport is created to the non-default management // server. - client.WatchCluster(authorityTestResourceName3, func(u xdsresource.ClusterUpdate, err error) {}) + xdsresource.WatchCluster(client, authorityTestResourceName3, watcher) val, err = lisNonDefault.NewConnCh.Receive(ctx) if err != nil { t.Fatalf("Timed out when waiting for a new transport to be created to the management server: %v", err) @@ -272,7 +281,8 @@ func (s) TestAuthorityRevive(t *testing.T) { defer close() // Request the first resource. Verify that a new transport is created. - cdsCancel1 := client.WatchCluster(authorityTestResourceName11, func(u xdsresource.ClusterUpdate, err error) {}) + watcher := noopClusterWatcher{} + cdsCancel1 := xdsresource.WatchCluster(client, authorityTestResourceName11, watcher) val, err := lis.NewConnCh.Receive(ctx) if err != nil { t.Fatalf("Timed out when waiting for a new transport to be created to the management server: %v", err) @@ -284,7 +294,7 @@ func (s) TestAuthorityRevive(t *testing.T) { // Request the second resource. Verify that no new transport is created. // This should move the authority out of the idle cache. - cdsCancel2 := client.WatchCluster(authorityTestResourceName12, func(u xdsresource.ClusterUpdate, err error) {}) + cdsCancel2 := xdsresource.WatchCluster(client, authorityTestResourceName12, watcher) defer cdsCancel2() sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) defer sCancel() diff --git a/xds/internal/xdsclient/tests/cds_watchers_test.go b/xds/internal/xdsclient/tests/cds_watchers_test.go index 9670caaca0a6..9db7b934f8b9 100644 --- a/xds/internal/xdsclient/tests/cds_watchers_test.go +++ b/xds/internal/xdsclient/tests/cds_watchers_test.go @@ -41,6 +41,30 @@ import ( v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" ) +type clusterWatcher struct { + updateCh *testutils.Channel +} + +func newClusterWatcher() *clusterWatcher { + return &clusterWatcher{updateCh: testutils.NewChannel()} +} + +func (ew *clusterWatcher) OnUpdate(update *xdsresource.ClusterResourceData) { + ew.updateCh.Send(xdsresource.ClusterUpdateErrTuple{Update: update.Resource}) +} + +func (ew *clusterWatcher) OnError(err error) { + // When used with a go-control-plane management server that continuously + // resends resources which are NACKed by the xDS client, using a `Replace()` + // here and in OnResourceDoesNotExist() simplifies tests which will have + // access to the most recently received error. + ew.updateCh.Replace(xdsresource.ClusterUpdateErrTuple{Err: err}) +} + +func (ew *clusterWatcher) OnResourceDoesNotExist() { + ew.updateCh.Replace(xdsresource.ClusterUpdateErrTuple{Err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Cluster not found in received response")}) +} + // badClusterResource returns a cluster resource for the given name which // contains a config_source_specifier for the `lrs_server` field which is not // set to `self`, and hence is expected to be NACKed by the client. @@ -154,10 +178,8 @@ func (s) TestCDSWatch(t *testing.T) { // Register a watch for a cluster resource and have the watch // callback push the received update on to a channel. - updateCh := testutils.NewChannel() - cdsCancel := client.WatchCluster(test.resourceName, func(u xdsresource.ClusterUpdate, err error) { - updateCh.Send(xdsresource.ClusterUpdateErrTuple{Update: u, Err: err}) - }) + cw := newClusterWatcher() + cdsCancel := xdsresource.WatchCluster(client, test.resourceName, cw) // Configure the management server to return a single cluster // resource, corresponding to the one we registered a watch for. @@ -173,7 +195,7 @@ func (s) TestCDSWatch(t *testing.T) { } // Verify the contents of the received update. - if err := verifyClusterUpdate(ctx, updateCh, test.wantUpdate); err != nil { + if err := verifyClusterUpdate(ctx, cw.updateCh, test.wantUpdate); err != nil { t.Fatal(err) } @@ -187,7 +209,7 @@ func (s) TestCDSWatch(t *testing.T) { if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err) } - if err := verifyNoClusterUpdate(ctx, updateCh); err != nil { + if err := verifyNoClusterUpdate(ctx, cw.updateCh); err != nil { t.Fatal(err) } @@ -202,7 +224,7 @@ func (s) TestCDSWatch(t *testing.T) { if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err) } - if err := verifyNoClusterUpdate(ctx, updateCh); err != nil { + if err := verifyNoClusterUpdate(ctx, cw.updateCh); err != nil { t.Fatal(err) } }) @@ -284,15 +306,11 @@ func (s) TestCDSWatch_TwoWatchesForSameResourceName(t *testing.T) { // Register two watches for the same cluster resource and have the // callbacks push the received updates on to a channel. - updateCh1 := testutils.NewChannel() - cdsCancel1 := client.WatchCluster(test.resourceName, func(u xdsresource.ClusterUpdate, err error) { - updateCh1.Send(xdsresource.ClusterUpdateErrTuple{Update: u, Err: err}) - }) + cw1 := newClusterWatcher() + cdsCancel1 := xdsresource.WatchCluster(client, test.resourceName, cw1) defer cdsCancel1() - updateCh2 := testutils.NewChannel() - cdsCancel2 := client.WatchCluster(test.resourceName, func(u xdsresource.ClusterUpdate, err error) { - updateCh2.Send(xdsresource.ClusterUpdateErrTuple{Update: u, Err: err}) - }) + cw2 := newClusterWatcher() + cdsCancel2 := xdsresource.WatchCluster(client, test.resourceName, cw2) // Configure the management server to return a single cluster // resource, corresponding to the one we registered watches for. @@ -308,10 +326,10 @@ func (s) TestCDSWatch_TwoWatchesForSameResourceName(t *testing.T) { } // Verify the contents of the received update. - if err := verifyClusterUpdate(ctx, updateCh1, test.wantUpdateV1); err != nil { + if err := verifyClusterUpdate(ctx, cw1.updateCh, test.wantUpdateV1); err != nil { t.Fatal(err) } - if err := verifyClusterUpdate(ctx, updateCh2, test.wantUpdateV1); err != nil { + if err := verifyClusterUpdate(ctx, cw2.updateCh, test.wantUpdateV1); err != nil { t.Fatal(err) } @@ -322,10 +340,10 @@ func (s) TestCDSWatch_TwoWatchesForSameResourceName(t *testing.T) { if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err) } - if err := verifyNoClusterUpdate(ctx, updateCh1); err != nil { + if err := verifyNoClusterUpdate(ctx, cw1.updateCh); err != nil { t.Fatal(err) } - if err := verifyNoClusterUpdate(ctx, updateCh2); err != nil { + if err := verifyNoClusterUpdate(ctx, cw2.updateCh); err != nil { t.Fatal(err) } @@ -339,10 +357,10 @@ func (s) TestCDSWatch_TwoWatchesForSameResourceName(t *testing.T) { if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err) } - if err := verifyClusterUpdate(ctx, updateCh1, test.wantUpdateV2); err != nil { + if err := verifyClusterUpdate(ctx, cw1.updateCh, test.wantUpdateV2); err != nil { t.Fatal(err) } - if err := verifyNoClusterUpdate(ctx, updateCh2); err != nil { + if err := verifyNoClusterUpdate(ctx, cw2.updateCh); err != nil { t.Fatal(err) } }) @@ -369,23 +387,17 @@ func (s) TestCDSWatch_ThreeWatchesForDifferentResourceNames(t *testing.T) { // Register two watches for the same cluster resource and have the // callbacks push the received updates on to a channel. - updateCh1 := testutils.NewChannel() - cdsCancel1 := client.WatchCluster(cdsName, func(u xdsresource.ClusterUpdate, err error) { - updateCh1.Send(xdsresource.ClusterUpdateErrTuple{Update: u, Err: err}) - }) + cw1 := newClusterWatcher() + cdsCancel1 := xdsresource.WatchCluster(client, cdsName, cw1) defer cdsCancel1() - updateCh2 := testutils.NewChannel() - cdsCancel2 := client.WatchCluster(cdsName, func(u xdsresource.ClusterUpdate, err error) { - updateCh2.Send(xdsresource.ClusterUpdateErrTuple{Update: u, Err: err}) - }) + cw2 := newClusterWatcher() + cdsCancel2 := xdsresource.WatchCluster(client, cdsName, cw2) defer cdsCancel2() // Register the third watch for a different cluster resource, and push the // received updates onto a channel. - updateCh3 := testutils.NewChannel() - cdsCancel3 := client.WatchCluster(cdsNameNewStyle, func(u xdsresource.ClusterUpdate, err error) { - updateCh3.Send(xdsresource.ClusterUpdateErrTuple{Update: u, Err: err}) - }) + cw3 := newClusterWatcher() + cdsCancel3 := xdsresource.WatchCluster(client, cdsNameNewStyle, cw3) defer cdsCancel3() // Configure the management server to return two cluster resources, @@ -417,13 +429,13 @@ func (s) TestCDSWatch_ThreeWatchesForDifferentResourceNames(t *testing.T) { EDSServiceName: edsNameNewStyle, }, } - if err := verifyClusterUpdate(ctx, updateCh1, wantUpdate12); err != nil { + if err := verifyClusterUpdate(ctx, cw1.updateCh, wantUpdate12); err != nil { t.Fatal(err) } - if err := verifyClusterUpdate(ctx, updateCh2, wantUpdate12); err != nil { + if err := verifyClusterUpdate(ctx, cw2.updateCh, wantUpdate12); err != nil { t.Fatal(err) } - if err := verifyClusterUpdate(ctx, updateCh3, wantUpdate3); err != nil { + if err := verifyClusterUpdate(ctx, cw3.updateCh, wantUpdate3); err != nil { t.Fatal(err) } } @@ -466,10 +478,8 @@ func (s) TestCDSWatch_ResourceCaching(t *testing.T) { // Register a watch for a cluster resource and have the watch // callback push the received update on to a channel. - updateCh1 := testutils.NewChannel() - cdsCancel1 := client.WatchCluster(cdsName, func(u xdsresource.ClusterUpdate, err error) { - updateCh1.Send(xdsresource.ClusterUpdateErrTuple{Update: u, Err: err}) - }) + cw1 := newClusterWatcher() + cdsCancel1 := xdsresource.WatchCluster(client, cdsName, cw1) defer cdsCancel1() // Configure the management server to return a single cluster @@ -492,7 +502,7 @@ func (s) TestCDSWatch_ResourceCaching(t *testing.T) { EDSServiceName: edsName, }, } - if err := verifyClusterUpdate(ctx, updateCh1, wantUpdate); err != nil { + if err := verifyClusterUpdate(ctx, cw1.updateCh, wantUpdate); err != nil { t.Fatal(err) } select { @@ -503,12 +513,10 @@ func (s) TestCDSWatch_ResourceCaching(t *testing.T) { // Register another watch for the same resource. This should get the update // from the cache. - updateCh2 := testutils.NewChannel() - cdsCancel2 := client.WatchCluster(cdsName, func(u xdsresource.ClusterUpdate, err error) { - updateCh2.Send(xdsresource.ClusterUpdateErrTuple{Update: u, Err: err}) - }) + cw2 := newClusterWatcher() + cdsCancel2 := xdsresource.WatchCluster(client, cdsName, cw2) defer cdsCancel2() - if err := verifyClusterUpdate(ctx, updateCh2, wantUpdate); err != nil { + if err := verifyClusterUpdate(ctx, cw2.updateCh, wantUpdate); err != nil { t.Fatal(err) } // No request should get sent out as part of this watch. @@ -544,10 +552,8 @@ func (s) TestCDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) { // Register a watch for a resource which is expected to be invoked with an // error after the watch expiry timer fires. - updateCh := testutils.NewChannel() - cdsCancel := client.WatchCluster(cdsName, func(u xdsresource.ClusterUpdate, err error) { - updateCh.Send(xdsresource.ClusterUpdateErrTuple{Update: u, Err: err}) - }) + cw := newClusterWatcher() + cdsCancel := xdsresource.WatchCluster(client, cdsName, cw) defer cdsCancel() // Wait for the watch expiry timer to fire. @@ -557,7 +563,7 @@ func (s) TestCDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() wantErr := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "") - if err := verifyClusterUpdate(ctx, updateCh, xdsresource.ClusterUpdateErrTuple{Err: wantErr}); err != nil { + if err := verifyClusterUpdate(ctx, cw.updateCh, xdsresource.ClusterUpdateErrTuple{Err: wantErr}); err != nil { t.Fatal(err) } } @@ -587,10 +593,8 @@ func (s) TestCDSWatch_ValidResponseCancelsExpiryTimerBehavior(t *testing.T) { // Register a watch for a cluster resource and have the watch // callback push the received update on to a channel. - updateCh := testutils.NewChannel() - cdsCancel := client.WatchCluster(cdsName, func(u xdsresource.ClusterUpdate, err error) { - updateCh.Send(xdsresource.ClusterUpdateErrTuple{Update: u, Err: err}) - }) + cw := newClusterWatcher() + cdsCancel := xdsresource.WatchCluster(client, cdsName, cw) defer cdsCancel() // Configure the management server to return a single cluster resource, @@ -613,14 +617,14 @@ func (s) TestCDSWatch_ValidResponseCancelsExpiryTimerBehavior(t *testing.T) { EDSServiceName: edsName, }, } - if err := verifyClusterUpdate(ctx, updateCh, wantUpdate); err != nil { + if err := verifyClusterUpdate(ctx, cw.updateCh, wantUpdate); err != nil { t.Fatal(err) } // Wait for the watch expiry timer to fire, and verify that the callback is // not invoked. <-time.After(defaultTestWatchExpiryTimeout) - if err := verifyNoClusterUpdate(ctx, updateCh); err != nil { + if err := verifyNoClusterUpdate(ctx, cw.updateCh); err != nil { t.Fatal(err) } } @@ -650,16 +654,12 @@ func (s) TesCDSWatch_ResourceRemoved(t *testing.T) { // Register two watches for two cluster resources and have the // callbacks push the received updates on to a channel. resourceName1 := cdsName - updateCh1 := testutils.NewChannel() - cdsCancel1 := client.WatchCluster(resourceName1, func(u xdsresource.ClusterUpdate, err error) { - updateCh1.Send(xdsresource.ClusterUpdateErrTuple{Update: u, Err: err}) - }) + cw1 := newClusterWatcher() + cdsCancel1 := xdsresource.WatchCluster(client, resourceName1, cw1) defer cdsCancel1() resourceName2 := cdsNameNewStyle - updateCh2 := testutils.NewChannel() - cdsCancel2 := client.WatchCluster(resourceName2, func(u xdsresource.ClusterUpdate, err error) { - updateCh2.Send(xdsresource.ClusterUpdateErrTuple{Update: u, Err: err}) - }) + cw2 := newClusterWatcher() + cdsCancel2 := xdsresource.WatchCluster(client, resourceName1, cw2) defer cdsCancel2() // Configure the management server to return two cluster resources, @@ -691,10 +691,10 @@ func (s) TesCDSWatch_ResourceRemoved(t *testing.T) { EDSServiceName: edsNameNewStyle, }, } - if err := verifyClusterUpdate(ctx, updateCh1, wantUpdate1); err != nil { + if err := verifyClusterUpdate(ctx, cw1.updateCh, wantUpdate1); err != nil { t.Fatal(err) } - if err := verifyClusterUpdate(ctx, updateCh2, wantUpdate2); err != nil { + if err := verifyClusterUpdate(ctx, cw2.updateCh, wantUpdate2); err != nil { t.Fatal(err) } @@ -710,10 +710,10 @@ func (s) TesCDSWatch_ResourceRemoved(t *testing.T) { // The first watcher should receive a resource removed error, while the // second watcher should not receive an update. - if err := verifyClusterUpdate(ctx, updateCh1, xdsresource.ClusterUpdateErrTuple{Err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "")}); err != nil { + if err := verifyClusterUpdate(ctx, cw1.updateCh, xdsresource.ClusterUpdateErrTuple{Err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "")}); err != nil { t.Fatal(err) } - if err := verifyNoClusterUpdate(ctx, updateCh2); err != nil { + if err := verifyNoClusterUpdate(ctx, cw2.updateCh); err != nil { t.Fatal(err) } @@ -727,7 +727,7 @@ func (s) TesCDSWatch_ResourceRemoved(t *testing.T) { if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err) } - if err := verifyNoClusterUpdate(ctx, updateCh1); err != nil { + if err := verifyNoClusterUpdate(ctx, cw1.updateCh); err != nil { t.Fatal(err) } wantUpdate := xdsresource.ClusterUpdateErrTuple{ @@ -736,7 +736,7 @@ func (s) TesCDSWatch_ResourceRemoved(t *testing.T) { EDSServiceName: "new-eds-resource", }, } - if err := verifyClusterUpdate(ctx, updateCh2, wantUpdate); err != nil { + if err := verifyClusterUpdate(ctx, cw2.updateCh, wantUpdate); err != nil { t.Fatal(err) } } @@ -758,12 +758,8 @@ func (s) TestCDSWatch_NACKError(t *testing.T) { // Register a watch for a cluster resource and have the watch // callback push the received update on to a channel. - updateCh := testutils.NewChannel() - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - cdsCancel := client.WatchCluster(cdsName, func(u xdsresource.ClusterUpdate, err error) { - updateCh.SendContext(ctx, xdsresource.ClusterUpdateErrTuple{Update: u, Err: err}) - }) + cw := newClusterWatcher() + cdsCancel := xdsresource.WatchCluster(client, cdsName, cw) defer cdsCancel() // Configure the management server to return a single cluster resource @@ -773,12 +769,14 @@ func (s) TestCDSWatch_NACKError(t *testing.T) { Clusters: []*v3clusterpb.Cluster{badClusterResource(cdsName, edsName, e2e.SecurityLevelNone)}, SkipValidation: true, } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err) } // Verify that the expected error is propagated to the watcher. - u, err := updateCh.Receive(ctx) + u, err := cw.updateCh.Receive(ctx) if err != nil { t.Fatalf("timeout when waiting for a cluster resource from the management server: %v", err) } @@ -808,19 +806,13 @@ func (s) TestCDSWatch_PartialValid(t *testing.T) { // Register two watches for cluster resources. The first watch is expected // to receive an error because the received resource is NACK'ed. The second // watch is expected to get a good update. - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() badResourceName := cdsName - updateCh1 := testutils.NewChannel() - cdsCancel1 := client.WatchCluster(badResourceName, func(u xdsresource.ClusterUpdate, err error) { - updateCh1.SendContext(ctx, xdsresource.ClusterUpdateErrTuple{Update: u, Err: err}) - }) + cw1 := newClusterWatcher() + cdsCancel1 := xdsresource.WatchCluster(client, badResourceName, cw1) defer cdsCancel1() goodResourceName := cdsNameNewStyle - updateCh2 := testutils.NewChannel() - cdsCancel2 := client.WatchCluster(goodResourceName, func(u xdsresource.ClusterUpdate, err error) { - updateCh2.SendContext(ctx, xdsresource.ClusterUpdateErrTuple{Update: u, Err: err}) - }) + cw2 := newClusterWatcher() + cdsCancel2 := xdsresource.WatchCluster(client, goodResourceName, cw2) defer cdsCancel2() // Configure the management server with two cluster resources. One of these @@ -832,13 +824,15 @@ func (s) TestCDSWatch_PartialValid(t *testing.T) { e2e.DefaultCluster(goodResourceName, edsName, e2e.SecurityLevelNone)}, SkipValidation: true, } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err) } // Verify that the expected error is propagated to the watcher which is // watching the bad resource. - u, err := updateCh1.Receive(ctx) + u, err := cw1.updateCh.Receive(ctx) if err != nil { t.Fatalf("timeout when waiting for a cluster resource from the management server: %v", err) } @@ -855,7 +849,7 @@ func (s) TestCDSWatch_PartialValid(t *testing.T) { EDSServiceName: edsName, }, } - if err := verifyClusterUpdate(ctx, updateCh2, wantUpdate); err != nil { + if err := verifyClusterUpdate(ctx, cw2.updateCh, wantUpdate); err != nil { t.Fatal(err) } } @@ -882,16 +876,12 @@ func (s) TestCDSWatch_PartialResponse(t *testing.T) { // Register two watches for two cluster resources and have the // callbacks push the received updates on to a channel. resourceName1 := cdsName - updateCh1 := testutils.NewChannel() - cdsCancel1 := client.WatchCluster(resourceName1, func(u xdsresource.ClusterUpdate, err error) { - updateCh1.Send(xdsresource.ClusterUpdateErrTuple{Update: u, Err: err}) - }) + cw1 := newClusterWatcher() + cdsCancel1 := xdsresource.WatchCluster(client, resourceName1, cw1) defer cdsCancel1() resourceName2 := cdsNameNewStyle - updateCh2 := testutils.NewChannel() - cdsCancel2 := client.WatchCluster(resourceName2, func(u xdsresource.ClusterUpdate, err error) { - updateCh2.Send(xdsresource.ClusterUpdateErrTuple{Update: u, Err: err}) - }) + cw2 := newClusterWatcher() + cdsCancel2 := xdsresource.WatchCluster(client, resourceName2, cw2) defer cdsCancel2() // Configure the management server to return only one of the two cluster @@ -914,12 +904,12 @@ func (s) TestCDSWatch_PartialResponse(t *testing.T) { EDSServiceName: edsName, }, } - if err := verifyClusterUpdate(ctx, updateCh1, wantUpdate1); err != nil { + if err := verifyClusterUpdate(ctx, cw1.updateCh, wantUpdate1); err != nil { t.Fatal(err) } // Verify that the second watcher does not get an update with an error. - if err := verifyNoClusterUpdate(ctx, updateCh2); err != nil { + if err := verifyNoClusterUpdate(ctx, cw2.updateCh); err != nil { t.Fatal(err) } @@ -944,13 +934,13 @@ func (s) TestCDSWatch_PartialResponse(t *testing.T) { EDSServiceName: edsNameNewStyle, }, } - if err := verifyClusterUpdate(ctx, updateCh2, wantUpdate2); err != nil { + if err := verifyClusterUpdate(ctx, cw2.updateCh, wantUpdate2); err != nil { t.Fatal(err) } // Verify that the first watcher gets no update, as the first resource did // not change. - if err := verifyNoClusterUpdate(ctx, updateCh1); err != nil { + if err := verifyNoClusterUpdate(ctx, cw1.updateCh); err != nil { t.Fatal(err) } } diff --git a/xds/internal/xdsclient/tests/dump_test.go b/xds/internal/xdsclient/tests/dump_test.go index 5f2c5e05e4dd..ef9a05c87fbf 100644 --- a/xds/internal/xdsclient/tests/dump_test.go +++ b/xds/internal/xdsclient/tests/dump_test.go @@ -125,7 +125,7 @@ func (s) TestDumpResources(t *testing.T) { client.WatchRouteConfig(target, func(xdsresource.RouteConfigUpdate, error) {}) } for _, target := range cdsTargets { - client.WatchCluster(target, func(xdsresource.ClusterUpdate, error) {}) + xdsresource.WatchCluster(client, target, noopClusterWatcher{}) } for _, target := range edsTargets { xdsresource.WatchEndpoints(client, target, noopEndpointsWatcher{}) diff --git a/xds/internal/xdsclient/tests/eds_watchers_test.go b/xds/internal/xdsclient/tests/eds_watchers_test.go index 0d81c3848c8d..aca227ed4582 100644 --- a/xds/internal/xdsclient/tests/eds_watchers_test.go +++ b/xds/internal/xdsclient/tests/eds_watchers_test.go @@ -70,11 +70,15 @@ func (ew *endpointsWatcher) OnUpdate(update *xdsresource.EndpointsResourceData) } func (ew *endpointsWatcher) OnError(err error) { - ew.updateCh.SendOrFail(endpointsUpdateErrTuple{err: err}) + // When used with a go-control-plane management server that continuously + // resends resources which are NACKed by the xDS client, using a `Replace()` + // here and in OnResourceDoesNotExist() simplifies tests which will have + // access to the most recently received error. + ew.updateCh.Replace(endpointsUpdateErrTuple{err: err}) } func (ew *endpointsWatcher) OnResourceDoesNotExist() { - ew.updateCh.SendOrFail(endpointsUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Endpoints not found in received response")}) + ew.updateCh.Replace(endpointsUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Endpoints not found in received response")}) } // badEndpointsResource returns a endpoints resource for the given diff --git a/xds/internal/xdsclient/tests/federation_watchers_test.go b/xds/internal/xdsclient/tests/federation_watchers_test.go index ed59b63ac794..cbfca26af3d1 100644 --- a/xds/internal/xdsclient/tests/federation_watchers_test.go +++ b/xds/internal/xdsclient/tests/federation_watchers_test.go @@ -219,15 +219,11 @@ func (s) TestFederation_ClusterResourceContextParamOrder(t *testing.T) { // Register two watches for cluster resources with the same query string, // but context parameters in different order. - updateCh1 := testutils.NewChannel() - cdsCancel1 := client.WatchCluster(resourceName1, func(u xdsresource.ClusterUpdate, err error) { - updateCh1.Send(xdsresource.ClusterUpdateErrTuple{Update: u, Err: err}) - }) + cw1 := newClusterWatcher() + cdsCancel1 := xdsresource.WatchCluster(client, resourceName1, cw1) defer cdsCancel1() - updateCh2 := testutils.NewChannel() - cdsCancel2 := client.WatchCluster(resourceName2, func(u xdsresource.ClusterUpdate, err error) { - updateCh2.Send(xdsresource.ClusterUpdateErrTuple{Update: u, Err: err}) - }) + cw2 := newClusterWatcher() + cdsCancel2 := xdsresource.WatchCluster(client, resourceName2, cw2) defer cdsCancel2() // Configure the management server for the non-default authority to return a @@ -250,10 +246,10 @@ func (s) TestFederation_ClusterResourceContextParamOrder(t *testing.T) { }, } // Verify the contents of the received update. - if err := verifyClusterUpdate(ctx, updateCh1, wantUpdate); err != nil { + if err := verifyClusterUpdate(ctx, cw1.updateCh, wantUpdate); err != nil { t.Fatal(err) } - if err := verifyClusterUpdate(ctx, updateCh2, wantUpdate); err != nil { + if err := verifyClusterUpdate(ctx, cw2.updateCh, wantUpdate); err != nil { t.Fatal(err) } } diff --git a/xds/internal/xdsclient/tests/resource_update_test.go b/xds/internal/xdsclient/tests/resource_update_test.go index 3a2ccc114e8d..d83f9d73a89c 100644 --- a/xds/internal/xdsclient/tests/resource_update_test.go +++ b/xds/internal/xdsclient/tests/resource_update_test.go @@ -738,18 +738,9 @@ func (s) TestHandleClusterResponseFromManagementServer(t *testing.T) { defer close() t.Logf("Created xDS client to %s", mgmtServer.Address) - // A wrapper struct to wrap the update and the associated error, as - // received by the resource watch callback. - type updateAndErr struct { - update xdsresource.ClusterUpdate - err error - } - updateAndErrCh := testutils.NewChannel() - // Register a watch, and push the results on to a channel. - client.WatchCluster(test.resourceName, func(update xdsresource.ClusterUpdate, err error) { - updateAndErrCh.Send(updateAndErr{update: update, err: err}) - }) + cw := newClusterWatcher() + xdsresource.WatchCluster(client, test.resourceName, cw) t.Logf("Registered a watch for Cluster %q", test.resourceName) // Wait for the discovery request to be sent out. @@ -775,12 +766,12 @@ func (s) TestHandleClusterResponseFromManagementServer(t *testing.T) { // Wait for an update from the xDS client and compare with expected // update. - val, err = updateAndErrCh.Receive(ctx) + val, err = cw.updateCh.Receive(ctx) if err != nil { t.Fatalf("Timeout when waiting for watch callback to invoked after response from management server: %v", err) } - gotUpdate := val.(updateAndErr).update - gotErr := val.(updateAndErr).err + gotUpdate := val.(xdsresource.ClusterUpdateErrTuple).Update + gotErr := val.(xdsresource.ClusterUpdateErrTuple).Err if (gotErr != nil) != (test.wantErr != "") { t.Fatalf("Got error from handling update: %v, want %v", gotErr, test.wantErr) }