Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xdsclient: completely remove the old WatchCluster API #6621

Merged
merged 2 commits into from
Sep 18, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 0 additions & 51 deletions xds/internal/testutils/fakeclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,57 +138,6 @@ func (xdsC *Client) WaitForCancelRouteConfigWatch(ctx context.Context) (string,
return val.(string), err
}

// WatchCluster registers a CDS watch.
func (xdsC *Client) WatchCluster(clusterName string, callback func(xdsresource.ClusterUpdate, error)) func() {
// Due to the tree like structure of aggregate clusters, there can be multiple callbacks persisted for each cluster
// node. However, the client doesn't care about the parent child relationship between the nodes, only that it invokes
// the right callback for a particular cluster.
xdsC.cdsCbs[clusterName] = callback
xdsC.cdsWatchCh.Send(clusterName)
return func() {
xdsC.cdsCancelCh.Send(clusterName)
}
}

// WaitForWatchCluster waits for WatchCluster to be invoked on this client and
// returns the clusterName being watched.
func (xdsC *Client) WaitForWatchCluster(ctx context.Context) (string, error) {
val, err := xdsC.cdsWatchCh.Receive(ctx)
if err != nil {
return "", err
}
return val.(string), err
}

// InvokeWatchClusterCallback invokes the registered cdsWatch callback.
//
// Not thread safe with WatchCluster. Only call this after
// WaitForWatchCluster.
func (xdsC *Client) InvokeWatchClusterCallback(update xdsresource.ClusterUpdate, err error) {
// Keeps functionality with previous usage of this, if single callback call that callback.
if len(xdsC.cdsCbs) == 1 {
var clusterName string
for cluster := range xdsC.cdsCbs {
clusterName = cluster
}
xdsC.cdsCbs[clusterName](update, err)
} else {
// Have what callback you call with the update determined by the service name in the ClusterUpdate. Left up to the
// caller to make sure the cluster update matches with a persisted callback.
xdsC.cdsCbs[update.ClusterName](update, err)
}
}

// WaitForCancelClusterWatch waits for a CDS watch to be cancelled and returns
// context.DeadlineExceeded otherwise.
func (xdsC *Client) WaitForCancelClusterWatch(ctx context.Context) (string, error) {
clusterNameReceived, err := xdsC.cdsCancelCh.Receive(ctx)
if err != nil {
return "", err
}
return clusterNameReceived.(string), err
}

// WatchEndpoints registers an EDS watch for provided clusterName.
func (xdsC *Client) WatchEndpoints(clusterName string, callback func(xdsresource.EndpointsUpdate, error)) (cancel func()) {
xdsC.edsCbs[clusterName] = callback
Expand Down
1 change: 0 additions & 1 deletion xds/internal/xdsclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
type XDSClient interface {
WatchListener(string, func(xdsresource.ListenerUpdate, error)) func()
WatchRouteConfig(string, func(xdsresource.RouteConfigUpdate, error)) func()
WatchCluster(string, func(xdsresource.ClusterUpdate, error)) func()

// WatchResource uses xDS to discover the resource associated with the
// provided resource name. The resource type implementation determines how
Expand Down
31 changes: 0 additions & 31 deletions xds/internal/xdsclient/clientimpl_watchers.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,37 +81,6 @@ func (c *clientImpl) WatchRouteConfig(resourceName string, cb func(xdsresource.R
return xdsresource.WatchRouteConfig(c, resourceName, watcher)
}

// This is only required temporarily, while we modify the
// clientImpl.WatchCluster API to be implemented via the wrapper WatchCluster()
// API which calls the WatchResource() API.
type clusterWatcher struct {
resourceName string
cb func(xdsresource.ClusterUpdate, error)
}

func (c *clusterWatcher) OnUpdate(update *xdsresource.ClusterResourceData) {
c.cb(update.Resource, nil)
}

func (c *clusterWatcher) OnError(err error) {
c.cb(xdsresource.ClusterUpdate{}, err)
}

func (c *clusterWatcher) OnResourceDoesNotExist() {
err := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource name %q of type Cluster not found in received response", c.resourceName)
c.cb(xdsresource.ClusterUpdate{}, err)
}

// WatchCluster uses CDS to discover information about the Cluster resource
// identified by resourceName.
//
// WatchCluster can be called multiple times, with same or different
// clusterNames. Each call will start an independent watcher for the resource.
func (c *clientImpl) WatchCluster(resourceName string, cb func(xdsresource.ClusterUpdate, error)) (cancel func()) {
watcher := &clusterWatcher{resourceName: resourceName, cb: cb}
return xdsresource.WatchCluster(c, resourceName, watcher)
}

// WatchResource uses xDS to discover the resource associated with the provided
// resource name. The resource type implementation determines how xDS requests
// are sent out and how responses are deserialized and validated. Upon receipt
Expand Down
28 changes: 19 additions & 9 deletions xds/internal/xdsclient/tests/authority_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ func setupForAuthorityTests(ctx context.Context, t *testing.T, idleTimeout time.
return lisDefault, lisNonDefault, client, close
}

type noopClusterWatcher struct{}

func (noopClusterWatcher) OnUpdate(update *xdsresource.ClusterResourceData) {}
func (noopClusterWatcher) OnError(err error) {}
func (noopClusterWatcher) OnResourceDoesNotExist() {}

// TestAuthorityShare tests the authority sharing logic. The test verifies the
// following scenarios:
// - A watch for a resource name with an authority matching an existing watch
Expand All @@ -143,14 +149,15 @@ func (s) TestAuthorityShare(t *testing.T) {
}

// Request the first resource. Verify that a new transport is created.
cdsCancel1 := client.WatchCluster(authorityTestResourceName11, func(u xdsresource.ClusterUpdate, err error) {})
watcher := noopClusterWatcher{}
cdsCancel1 := xdsresource.WatchCluster(client, authorityTestResourceName11, watcher)
defer cdsCancel1()
if _, err := lis.NewConnCh.Receive(ctx); err != nil {
t.Fatalf("Timed out when waiting for a new transport to be created to the management server: %v", err)
}

// Request the second resource. Verify that no new transport is created.
cdsCancel2 := client.WatchCluster(authorityTestResourceName12, func(u xdsresource.ClusterUpdate, err error) {})
cdsCancel2 := xdsresource.WatchCluster(client, authorityTestResourceName12, watcher)
defer cdsCancel2()
sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
Expand All @@ -159,7 +166,7 @@ func (s) TestAuthorityShare(t *testing.T) {
}

// Request the third resource. Verify that no new transport is created.
cdsCancel3 := client.WatchCluster(authorityTestResourceName2, func(u xdsresource.ClusterUpdate, err error) {})
cdsCancel3 := xdsresource.WatchCluster(client, authorityTestResourceName2, watcher)
defer cdsCancel3()
sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
Expand All @@ -179,15 +186,16 @@ func (s) TestAuthorityIdleTimeout(t *testing.T) {
defer close()

// Request the first resource. Verify that a new transport is created.
cdsCancel1 := client.WatchCluster(authorityTestResourceName11, func(u xdsresource.ClusterUpdate, err error) {})
watcher := noopClusterWatcher{}
cdsCancel1 := xdsresource.WatchCluster(client, authorityTestResourceName11, watcher)
val, err := lis.NewConnCh.Receive(ctx)
if err != nil {
t.Fatalf("Timed out when waiting for a new transport to be created to the management server: %v", err)
}
conn := val.(*testutils.ConnWrapper)

// Request the second resource. Verify that no new transport is created.
cdsCancel2 := client.WatchCluster(authorityTestResourceName12, func(u xdsresource.ClusterUpdate, err error) {})
cdsCancel2 := xdsresource.WatchCluster(client, authorityTestResourceName12, watcher)
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
if _, err := lis.NewConnCh.Receive(sCtx); err != context.DeadlineExceeded {
Expand Down Expand Up @@ -225,7 +233,8 @@ func (s) TestAuthorityClientClose(t *testing.T) {

// Request the first resource. Verify that a new transport is created to the
// default management server.
cdsCancel1 := client.WatchCluster(authorityTestResourceName11, func(u xdsresource.ClusterUpdate, err error) {})
watcher := noopClusterWatcher{}
cdsCancel1 := xdsresource.WatchCluster(client, authorityTestResourceName11, watcher)
val, err := lisDefault.NewConnCh.Receive(ctx)
if err != nil {
t.Fatalf("Timed out when waiting for a new transport to be created to the management server: %v", err)
Expand All @@ -235,7 +244,7 @@ func (s) TestAuthorityClientClose(t *testing.T) {
// Request another resource which is served by the non-default authority.
// Verify that a new transport is created to the non-default management
// server.
client.WatchCluster(authorityTestResourceName3, func(u xdsresource.ClusterUpdate, err error) {})
xdsresource.WatchCluster(client, authorityTestResourceName3, watcher)
val, err = lisNonDefault.NewConnCh.Receive(ctx)
if err != nil {
t.Fatalf("Timed out when waiting for a new transport to be created to the management server: %v", err)
Expand Down Expand Up @@ -272,7 +281,8 @@ func (s) TestAuthorityRevive(t *testing.T) {
defer close()

// Request the first resource. Verify that a new transport is created.
cdsCancel1 := client.WatchCluster(authorityTestResourceName11, func(u xdsresource.ClusterUpdate, err error) {})
watcher := noopClusterWatcher{}
cdsCancel1 := xdsresource.WatchCluster(client, authorityTestResourceName11, watcher)
val, err := lis.NewConnCh.Receive(ctx)
if err != nil {
t.Fatalf("Timed out when waiting for a new transport to be created to the management server: %v", err)
Expand All @@ -284,7 +294,7 @@ func (s) TestAuthorityRevive(t *testing.T) {

// Request the second resource. Verify that no new transport is created.
// This should move the authority out of the idle cache.
cdsCancel2 := client.WatchCluster(authorityTestResourceName12, func(u xdsresource.ClusterUpdate, err error) {})
cdsCancel2 := xdsresource.WatchCluster(client, authorityTestResourceName12, watcher)
defer cdsCancel2()
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
Expand Down
Loading