Skip to content

Commit

Permalink
completely delete WatchListener and WatchRouteConfig APIs (#6849)
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars authored Dec 15, 2023
1 parent 836e5de commit 6e6914a
Show file tree
Hide file tree
Showing 16 changed files with 421 additions and 636 deletions.
153 changes: 0 additions & 153 deletions xds/internal/testutils/fakeclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/grpc/xds/internal/xdsclient/load"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)

// Client is a fake implementation of an xds client. It exposes a bunch of
Expand All @@ -38,151 +37,10 @@ type Client struct {
xdsclient.XDSClient

name string
ldsWatchCh *testutils.Channel
rdsWatchCh *testutils.Channel
cdsWatchCh *testutils.Channel
edsWatchCh *testutils.Channel
ldsCancelCh *testutils.Channel
rdsCancelCh *testutils.Channel
cdsCancelCh *testutils.Channel
edsCancelCh *testutils.Channel
loadReportCh *testutils.Channel
lrsCancelCh *testutils.Channel
loadStore *load.Store
bootstrapCfg *bootstrap.Config

ldsCb func(xdsresource.ListenerUpdate, error)
rdsCbs map[string]func(xdsresource.RouteConfigUpdate, error)
cdsCbs map[string]func(xdsresource.ClusterUpdate, error)
edsCbs map[string]func(xdsresource.EndpointsUpdate, error)
}

// WatchListener registers a LDS watch.
func (xdsC *Client) WatchListener(serviceName string, callback func(xdsresource.ListenerUpdate, error)) func() {
xdsC.ldsCb = callback
xdsC.ldsWatchCh.Send(serviceName)
return func() {
xdsC.ldsCancelCh.Send(nil)
}
}

// WaitForWatchListener waits for WatchCluster to be invoked on this client and
// returns the serviceName being watched.
func (xdsC *Client) WaitForWatchListener(ctx context.Context) (string, error) {
val, err := xdsC.ldsWatchCh.Receive(ctx)
if err != nil {
return "", err
}
return val.(string), err
}

// InvokeWatchListenerCallback invokes the registered ldsWatch callback.
//
// Not thread safe with WatchListener. Only call this after
// WaitForWatchListener.
func (xdsC *Client) InvokeWatchListenerCallback(update xdsresource.ListenerUpdate, err error) {
xdsC.ldsCb(update, err)
}

// WaitForCancelListenerWatch waits for a LDS watch to be cancelled and returns
// context.DeadlineExceeded otherwise.
func (xdsC *Client) WaitForCancelListenerWatch(ctx context.Context) error {
_, err := xdsC.ldsCancelCh.Receive(ctx)
return err
}

// WatchRouteConfig registers a RDS watch.
func (xdsC *Client) WatchRouteConfig(routeName string, callback func(xdsresource.RouteConfigUpdate, error)) func() {
xdsC.rdsCbs[routeName] = callback
xdsC.rdsWatchCh.Send(routeName)
return func() {
xdsC.rdsCancelCh.Send(routeName)
}
}

// WaitForWatchRouteConfig waits for WatchCluster to be invoked on this client and
// returns the routeName being watched.
func (xdsC *Client) WaitForWatchRouteConfig(ctx context.Context) (string, error) {
val, err := xdsC.rdsWatchCh.Receive(ctx)
if err != nil {
return "", err
}
return val.(string), err
}

// InvokeWatchRouteConfigCallback invokes the registered rdsWatch callback.
//
// Not thread safe with WatchRouteConfig. Only call this after
// WaitForWatchRouteConfig.
func (xdsC *Client) InvokeWatchRouteConfigCallback(name string, update xdsresource.RouteConfigUpdate, err error) {
if len(xdsC.rdsCbs) != 1 {
xdsC.rdsCbs[name](update, err)
return
}
// Keeps functionality with previous usage of this on client side, if single
// callback call that callback.
var routeName string
for route := range xdsC.rdsCbs {
routeName = route
}
xdsC.rdsCbs[routeName](update, err)
}

// WaitForCancelRouteConfigWatch waits for a RDS watch to be cancelled and returns
// context.DeadlineExceeded otherwise.
func (xdsC *Client) WaitForCancelRouteConfigWatch(ctx context.Context) (string, error) {
val, err := xdsC.rdsCancelCh.Receive(ctx)
if err != nil {
return "", err
}
return val.(string), err
}

// WatchEndpoints registers an EDS watch for provided clusterName.
func (xdsC *Client) WatchEndpoints(clusterName string, callback func(xdsresource.EndpointsUpdate, error)) (cancel func()) {
xdsC.edsCbs[clusterName] = callback
xdsC.edsWatchCh.Send(clusterName)
return func() {
xdsC.edsCancelCh.Send(clusterName)
}
}

// WaitForWatchEDS waits for WatchEndpoints to be invoked on this client and
// returns the clusterName being watched.
func (xdsC *Client) WaitForWatchEDS(ctx context.Context) (string, error) {
val, err := xdsC.edsWatchCh.Receive(ctx)
if err != nil {
return "", err
}
return val.(string), err
}

// InvokeWatchEDSCallback invokes the registered edsWatch callback.
//
// Not thread safe with WatchEndpoints. Only call this after
// WaitForWatchEDS.
func (xdsC *Client) InvokeWatchEDSCallback(name string, update xdsresource.EndpointsUpdate, err error) {
if len(xdsC.edsCbs) != 1 {
// This may panic if name isn't found. But it's fine for tests.
xdsC.edsCbs[name](update, err)
return
}
// Keeps functionality with previous usage of this, if single callback call
// that callback.
for n := range xdsC.edsCbs {
name = n
}
xdsC.edsCbs[name](update, err)
}

// WaitForCancelEDSWatch waits for a EDS watch to be cancelled and returns
// context.DeadlineExceeded otherwise.
func (xdsC *Client) WaitForCancelEDSWatch(ctx context.Context) (string, error) {
edsNameReceived, err := xdsC.edsCancelCh.Receive(ctx)
if err != nil {
return "", err
}
return edsNameReceived.(string), err
}

// ReportLoadArgs wraps the arguments passed to ReportLoad.
Expand Down Expand Up @@ -247,20 +105,9 @@ func NewClient() *Client {
func NewClientWithName(name string) *Client {
return &Client{
name: name,
ldsWatchCh: testutils.NewChannelWithSize(10),
rdsWatchCh: testutils.NewChannelWithSize(10),
cdsWatchCh: testutils.NewChannelWithSize(10),
edsWatchCh: testutils.NewChannelWithSize(10),
ldsCancelCh: testutils.NewChannelWithSize(10),
rdsCancelCh: testutils.NewChannelWithSize(10),
cdsCancelCh: testutils.NewChannelWithSize(10),
edsCancelCh: testutils.NewChannelWithSize(10),
loadReportCh: testutils.NewChannel(),
lrsCancelCh: testutils.NewChannel(),
loadStore: load.NewStore(),
bootstrapCfg: &bootstrap.Config{ClientDefaultListenerResourceNameTemplate: "%s"},
rdsCbs: make(map[string]func(xdsresource.RouteConfigUpdate, error)),
cdsCbs: make(map[string]func(xdsresource.ClusterUpdate, error)),
edsCbs: make(map[string]func(xdsresource.EndpointsUpdate, error)),
}
}
2 changes: 1 addition & 1 deletion xds/internal/testutils/resource_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import "google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
// used to receive updates on watches registered with the xDS client, when using
// the resource-type agnostic WatchResource API.
//
// Tests can the channels provided by this tyep to get access to updates and
// Tests can use the channels provided by this type to get access to updates and
// errors sent by the xDS client.
type TestResourceWatcher struct {
// UpdateCh is the channel on which xDS client updates are delivered.
Expand Down
6 changes: 0 additions & 6 deletions xds/internal/xdsclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@ import (
// (collectively termed as xDS) on a remote management server, to discover
// various dynamic resources.
type XDSClient interface {
WatchListener(string, func(xdsresource.ListenerUpdate, error)) func()
WatchRouteConfig(string, func(xdsresource.RouteConfigUpdate, error)) func()

// WatchResource uses xDS to discover the resource associated with the
// provided resource name. The resource type implementation determines how
// xDS requests are sent out and how responses are deserialized and
Expand All @@ -47,9 +44,6 @@ type XDSClient interface {
// During a race (e.g. an xDS response is received while the user is calling
// cancel()), there's a small window where the callback can be called after
// the watcher is canceled. Callers need to handle this case.
//
// TODO: Once this generic client API is fully implemented and integrated,
// delete the resource type specific watch APIs on this interface.
WatchResource(rType xdsresource.Type, resourceName string, watcher xdsresource.ResourceWatcher) (cancel func())

// DumpResources returns the status of the xDS resources. Returns a map of
Expand Down
56 changes: 0 additions & 56 deletions xds/internal/xdsclient/clientimpl_watchers.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,62 +25,6 @@ import (
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)

// This is only required temporarily, while we modify the
// clientImpl.WatchListener API to be implemented via the wrapper
// WatchListener() API which calls the WatchResource() API.
type listenerWatcher struct {
resourceName string
cb func(xdsresource.ListenerUpdate, error)
}

func (l *listenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData) {
l.cb(update.Resource, nil)
}

func (l *listenerWatcher) OnError(err error) {
l.cb(xdsresource.ListenerUpdate{}, err)
}

func (l *listenerWatcher) OnResourceDoesNotExist() {
err := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource name %q of type Listener not found in received response", l.resourceName)
l.cb(xdsresource.ListenerUpdate{}, err)
}

// WatchListener uses LDS to discover information about the Listener resource
// identified by resourceName.
func (c *clientImpl) WatchListener(resourceName string, cb func(xdsresource.ListenerUpdate, error)) (cancel func()) {
watcher := &listenerWatcher{resourceName: resourceName, cb: cb}
return xdsresource.WatchListener(c, resourceName, watcher)
}

// This is only required temporarily, while we modify the
// clientImpl.WatchRouteConfig API to be implemented via the wrapper
// WatchRouteConfig() API which calls the WatchResource() API.
type routeConfigWatcher struct {
resourceName string
cb func(xdsresource.RouteConfigUpdate, error)
}

func (r *routeConfigWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData) {
r.cb(update.Resource, nil)
}

func (r *routeConfigWatcher) OnError(err error) {
r.cb(xdsresource.RouteConfigUpdate{}, err)
}

func (r *routeConfigWatcher) OnResourceDoesNotExist() {
err := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource name %q of type RouteConfiguration not found in received response", r.resourceName)
r.cb(xdsresource.RouteConfigUpdate{}, err)
}

// WatchRouteConfig uses RDS to discover information about the
// RouteConfiguration resource identified by resourceName.
func (c *clientImpl) WatchRouteConfig(resourceName string, cb func(xdsresource.RouteConfigUpdate, error)) (cancel func()) {
watcher := &routeConfigWatcher{resourceName: resourceName, cb: cb}
return xdsresource.WatchRouteConfig(c, resourceName, watcher)
}

// WatchResource uses xDS to discover the resource associated with the provided
// resource name. The resource type implementation determines how xDS requests
// are sent out and how responses are deserialized and validated. Upon receipt
Expand Down
6 changes: 0 additions & 6 deletions xds/internal/xdsclient/tests/authority_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,6 @@ func setupForAuthorityTests(ctx context.Context, t *testing.T, idleTimeout time.
return lisDefault, lisNonDefault, client, close
}

type noopClusterWatcher struct{}

func (noopClusterWatcher) OnUpdate(update *xdsresource.ClusterResourceData) {}
func (noopClusterWatcher) OnError(err error) {}
func (noopClusterWatcher) OnResourceDoesNotExist() {}

// TestAuthorityShare tests the authority sharing logic. The test verifies the
// following scenarios:
// - A watch for a resource name with an authority matching an existing watch
Expand Down
Loading

0 comments on commit 6e6914a

Please sign in to comment.