Skip to content

Commit

Permalink
xdsclient: completely remove the old WatchCluster API (#6621)
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars authored Sep 18, 2023
1 parent 94d8074 commit 92f5ba9
Show file tree
Hide file tree
Showing 9 changed files with 129 additions and 221 deletions.
51 changes: 0 additions & 51 deletions xds/internal/testutils/fakeclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,57 +138,6 @@ func (xdsC *Client) WaitForCancelRouteConfigWatch(ctx context.Context) (string,
return val.(string), err
}

// WatchCluster registers a CDS watch.
func (xdsC *Client) WatchCluster(clusterName string, callback func(xdsresource.ClusterUpdate, error)) func() {
// Due to the tree like structure of aggregate clusters, there can be multiple callbacks persisted for each cluster
// node. However, the client doesn't care about the parent child relationship between the nodes, only that it invokes
// the right callback for a particular cluster.
xdsC.cdsCbs[clusterName] = callback
xdsC.cdsWatchCh.Send(clusterName)
return func() {
xdsC.cdsCancelCh.Send(clusterName)
}
}

// WaitForWatchCluster waits for WatchCluster to be invoked on this client and
// returns the clusterName being watched.
func (xdsC *Client) WaitForWatchCluster(ctx context.Context) (string, error) {
val, err := xdsC.cdsWatchCh.Receive(ctx)
if err != nil {
return "", err
}
return val.(string), err
}

// InvokeWatchClusterCallback invokes the registered cdsWatch callback.
//
// Not thread safe with WatchCluster. Only call this after
// WaitForWatchCluster.
func (xdsC *Client) InvokeWatchClusterCallback(update xdsresource.ClusterUpdate, err error) {
// Keeps functionality with previous usage of this, if single callback call that callback.
if len(xdsC.cdsCbs) == 1 {
var clusterName string
for cluster := range xdsC.cdsCbs {
clusterName = cluster
}
xdsC.cdsCbs[clusterName](update, err)
} else {
// Have what callback you call with the update determined by the service name in the ClusterUpdate. Left up to the
// caller to make sure the cluster update matches with a persisted callback.
xdsC.cdsCbs[update.ClusterName](update, err)
}
}

// WaitForCancelClusterWatch waits for a CDS watch to be cancelled and returns
// context.DeadlineExceeded otherwise.
func (xdsC *Client) WaitForCancelClusterWatch(ctx context.Context) (string, error) {
clusterNameReceived, err := xdsC.cdsCancelCh.Receive(ctx)
if err != nil {
return "", err
}
return clusterNameReceived.(string), err
}

// WatchEndpoints registers an EDS watch for provided clusterName.
func (xdsC *Client) WatchEndpoints(clusterName string, callback func(xdsresource.EndpointsUpdate, error)) (cancel func()) {
xdsC.edsCbs[clusterName] = callback
Expand Down
1 change: 0 additions & 1 deletion xds/internal/xdsclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
type XDSClient interface {
WatchListener(string, func(xdsresource.ListenerUpdate, error)) func()
WatchRouteConfig(string, func(xdsresource.RouteConfigUpdate, error)) func()
WatchCluster(string, func(xdsresource.ClusterUpdate, error)) func()

// WatchResource uses xDS to discover the resource associated with the
// provided resource name. The resource type implementation determines how
Expand Down
31 changes: 0 additions & 31 deletions xds/internal/xdsclient/clientimpl_watchers.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,37 +81,6 @@ func (c *clientImpl) WatchRouteConfig(resourceName string, cb func(xdsresource.R
return xdsresource.WatchRouteConfig(c, resourceName, watcher)
}

// This is only required temporarily, while we modify the
// clientImpl.WatchCluster API to be implemented via the wrapper WatchCluster()
// API which calls the WatchResource() API.
type clusterWatcher struct {
resourceName string
cb func(xdsresource.ClusterUpdate, error)
}

func (c *clusterWatcher) OnUpdate(update *xdsresource.ClusterResourceData) {
c.cb(update.Resource, nil)
}

func (c *clusterWatcher) OnError(err error) {
c.cb(xdsresource.ClusterUpdate{}, err)
}

func (c *clusterWatcher) OnResourceDoesNotExist() {
err := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource name %q of type Cluster not found in received response", c.resourceName)
c.cb(xdsresource.ClusterUpdate{}, err)
}

// WatchCluster uses CDS to discover information about the Cluster resource
// identified by resourceName.
//
// WatchCluster can be called multiple times, with same or different
// clusterNames. Each call will start an independent watcher for the resource.
func (c *clientImpl) WatchCluster(resourceName string, cb func(xdsresource.ClusterUpdate, error)) (cancel func()) {
watcher := &clusterWatcher{resourceName: resourceName, cb: cb}
return xdsresource.WatchCluster(c, resourceName, watcher)
}

// WatchResource uses xDS to discover the resource associated with the provided
// resource name. The resource type implementation determines how xDS requests
// are sent out and how responses are deserialized and validated. Upon receipt
Expand Down
28 changes: 19 additions & 9 deletions xds/internal/xdsclient/tests/authority_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ func setupForAuthorityTests(ctx context.Context, t *testing.T, idleTimeout time.
return lisDefault, lisNonDefault, client, close
}

type noopClusterWatcher struct{}

func (noopClusterWatcher) OnUpdate(update *xdsresource.ClusterResourceData) {}
func (noopClusterWatcher) OnError(err error) {}
func (noopClusterWatcher) OnResourceDoesNotExist() {}

// TestAuthorityShare tests the authority sharing logic. The test verifies the
// following scenarios:
// - A watch for a resource name with an authority matching an existing watch
Expand All @@ -143,14 +149,15 @@ func (s) TestAuthorityShare(t *testing.T) {
}

// Request the first resource. Verify that a new transport is created.
cdsCancel1 := client.WatchCluster(authorityTestResourceName11, func(u xdsresource.ClusterUpdate, err error) {})
watcher := noopClusterWatcher{}
cdsCancel1 := xdsresource.WatchCluster(client, authorityTestResourceName11, watcher)
defer cdsCancel1()
if _, err := lis.NewConnCh.Receive(ctx); err != nil {
t.Fatalf("Timed out when waiting for a new transport to be created to the management server: %v", err)
}

// Request the second resource. Verify that no new transport is created.
cdsCancel2 := client.WatchCluster(authorityTestResourceName12, func(u xdsresource.ClusterUpdate, err error) {})
cdsCancel2 := xdsresource.WatchCluster(client, authorityTestResourceName12, watcher)
defer cdsCancel2()
sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
Expand All @@ -159,7 +166,7 @@ func (s) TestAuthorityShare(t *testing.T) {
}

// Request the third resource. Verify that no new transport is created.
cdsCancel3 := client.WatchCluster(authorityTestResourceName2, func(u xdsresource.ClusterUpdate, err error) {})
cdsCancel3 := xdsresource.WatchCluster(client, authorityTestResourceName2, watcher)
defer cdsCancel3()
sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
Expand All @@ -179,15 +186,16 @@ func (s) TestAuthorityIdleTimeout(t *testing.T) {
defer close()

// Request the first resource. Verify that a new transport is created.
cdsCancel1 := client.WatchCluster(authorityTestResourceName11, func(u xdsresource.ClusterUpdate, err error) {})
watcher := noopClusterWatcher{}
cdsCancel1 := xdsresource.WatchCluster(client, authorityTestResourceName11, watcher)
val, err := lis.NewConnCh.Receive(ctx)
if err != nil {
t.Fatalf("Timed out when waiting for a new transport to be created to the management server: %v", err)
}
conn := val.(*testutils.ConnWrapper)

// Request the second resource. Verify that no new transport is created.
cdsCancel2 := client.WatchCluster(authorityTestResourceName12, func(u xdsresource.ClusterUpdate, err error) {})
cdsCancel2 := xdsresource.WatchCluster(client, authorityTestResourceName12, watcher)
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
if _, err := lis.NewConnCh.Receive(sCtx); err != context.DeadlineExceeded {
Expand Down Expand Up @@ -225,7 +233,8 @@ func (s) TestAuthorityClientClose(t *testing.T) {

// Request the first resource. Verify that a new transport is created to the
// default management server.
cdsCancel1 := client.WatchCluster(authorityTestResourceName11, func(u xdsresource.ClusterUpdate, err error) {})
watcher := noopClusterWatcher{}
cdsCancel1 := xdsresource.WatchCluster(client, authorityTestResourceName11, watcher)
val, err := lisDefault.NewConnCh.Receive(ctx)
if err != nil {
t.Fatalf("Timed out when waiting for a new transport to be created to the management server: %v", err)
Expand All @@ -235,7 +244,7 @@ func (s) TestAuthorityClientClose(t *testing.T) {
// Request another resource which is served by the non-default authority.
// Verify that a new transport is created to the non-default management
// server.
client.WatchCluster(authorityTestResourceName3, func(u xdsresource.ClusterUpdate, err error) {})
xdsresource.WatchCluster(client, authorityTestResourceName3, watcher)
val, err = lisNonDefault.NewConnCh.Receive(ctx)
if err != nil {
t.Fatalf("Timed out when waiting for a new transport to be created to the management server: %v", err)
Expand Down Expand Up @@ -272,7 +281,8 @@ func (s) TestAuthorityRevive(t *testing.T) {
defer close()

// Request the first resource. Verify that a new transport is created.
cdsCancel1 := client.WatchCluster(authorityTestResourceName11, func(u xdsresource.ClusterUpdate, err error) {})
watcher := noopClusterWatcher{}
cdsCancel1 := xdsresource.WatchCluster(client, authorityTestResourceName11, watcher)
val, err := lis.NewConnCh.Receive(ctx)
if err != nil {
t.Fatalf("Timed out when waiting for a new transport to be created to the management server: %v", err)
Expand All @@ -284,7 +294,7 @@ func (s) TestAuthorityRevive(t *testing.T) {

// Request the second resource. Verify that no new transport is created.
// This should move the authority out of the idle cache.
cdsCancel2 := client.WatchCluster(authorityTestResourceName12, func(u xdsresource.ClusterUpdate, err error) {})
cdsCancel2 := xdsresource.WatchCluster(client, authorityTestResourceName12, watcher)
defer cdsCancel2()
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
Expand Down
Loading

0 comments on commit 92f5ba9

Please sign in to comment.