Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds/internal/server: switch to generic xDS API for LDS/RDS #6726

Merged
merged 2 commits into from
Oct 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 52 additions & 44 deletions xds/internal/server/listener_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,7 @@
// XDSClient wraps the methods on the XDSClient which are required by
// the listenerWrapper.
type XDSClient interface {
WatchListener(string, func(xdsresource.ListenerUpdate, error)) func()
WatchRouteConfig(string, func(xdsresource.RouteConfigUpdate, error)) func()
WatchResource(rType xdsresource.Type, resourceName string, watcher xdsresource.ResourceWatcher) (cancel func())
BootstrapConfig() *bootstrap.Config
}

Expand Down Expand Up @@ -110,7 +109,6 @@
mode: connectivity.ServingModeStarting,
closed: grpcsync.NewEvent(),
goodUpdate: grpcsync.NewEvent(),
ldsUpdateCh: make(chan ldsUpdateWithError, 1),
rdsUpdateCh: make(chan rdsHandlerUpdate, 1),
}
lw.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[xds-server-listener %p] ", lw))
Expand All @@ -120,17 +118,16 @@
lisAddr := lw.Listener.Addr().String()
lw.addr, lw.port, _ = net.SplitHostPort(lisAddr)

lw.rdsHandler = newRDSHandler(lw.xdsC, lw.rdsUpdateCh)
lw.cancelWatch = lw.xdsC.WatchListener(lw.name, lw.handleListenerUpdate)
lw.rdsHandler = newRDSHandler(lw.xdsC, lw.logger, lw.rdsUpdateCh)
lw.cancelWatch = xdsresource.WatchListener(lw.xdsC, lw.name, &ldsWatcher{
parent: lw,
logger: lw.logger,
name: lw.name,
})
go lw.run()
return lw, lw.goodUpdate.Done()
}

type ldsUpdateWithError struct {
update xdsresource.ListenerUpdate
err error
}

// listenerWrapper wraps the net.Listener associated with the listening address
// passed to Serve(). It also contains all other state associated with this
// particular invocation of Serve().
Expand Down Expand Up @@ -181,8 +178,6 @@
// rdsUpdates are the RDS resources received from the management
// server, keyed on the RouteName of the RDS resource.
rdsUpdates unsafe.Pointer // map[string]xdsclient.RouteConfigUpdate
// ldsUpdateCh is a channel for XDSClient LDS updates.
ldsUpdateCh chan ldsUpdateWithError
// rdsUpdateCh is a channel for XDSClient RDS updates.
rdsUpdateCh chan rdsHandlerUpdate
}
Expand Down Expand Up @@ -320,31 +315,12 @@
select {
case <-l.closed.Done():
return
case u := <-l.ldsUpdateCh:
l.handleLDSUpdate(u)
case u := <-l.rdsUpdateCh:
l.handleRDSUpdate(u)
}
}
}

// handleLDSUpdate is the callback which handles LDS Updates. It writes the
// received update to the update channel, which is picked up by the run
// goroutine.
func (l *listenerWrapper) handleListenerUpdate(update xdsresource.ListenerUpdate, err error) {
if l.closed.HasFired() {
l.logger.Warningf("Resource %q received update: %v with error: %v, after listener was closed", l.name, update, err)
return
}
// Remove any existing entry in ldsUpdateCh and replace with the new one, as the only update
// listener cares about is most recent update.
select {
case <-l.ldsUpdateCh:
default:
}
l.ldsUpdateCh <- ldsUpdateWithError{update: update, err: err}
}

// handleRDSUpdate handles a full rds update from rds handler. On a successful
// update, the server will switch to ServingModeServing as the full
// configuration (both LDS and RDS) has been received.
Expand All @@ -354,7 +330,6 @@
return
}
if update.err != nil {
l.logger.Warningf("Received error for rds names specified in resource %q: %+v", l.name, update.err)
if xdsresource.ErrType(update.err) == xdsresource.ErrorTypeResourceNotFound {
l.switchMode(nil, connectivity.ServingModeNotServing, update.err)
}
Expand All @@ -368,17 +343,7 @@
l.goodUpdate.Fire()
}

func (l *listenerWrapper) handleLDSUpdate(update ldsUpdateWithError) {
if update.err != nil {
l.logger.Warningf("Received error for resource %q: %+v", l.name, update.err)
if xdsresource.ErrType(update.err) == xdsresource.ErrorTypeResourceNotFound {
l.switchMode(nil, connectivity.ServingModeNotServing, update.err)
}
// For errors which are anything other than "resource-not-found", we
// continue to use the old configuration.
return
}

func (l *listenerWrapper) handleLDSUpdate(update xdsresource.ListenerUpdate) {
// Make sure that the socket address on the received Listener resource
// matches the address of the net.Listener passed to us by the user. This
// check is done here instead of at the XDSClient layer because of the
Expand All @@ -391,7 +356,7 @@
// What this means is that the XDSClient has ACKed a resource which can push
// the server into a "not serving" mode. This is not ideal, but this is
// what we have decided to do. See gRPC A36 for more details.
ilc := update.update.InboundListenerCfg
ilc := update.InboundListenerCfg
if ilc.Address != l.addr || ilc.Port != l.port {
l.switchMode(nil, connectivity.ServingModeNotServing, fmt.Errorf("address (%s:%s) in Listener update does not match listening address: (%s:%s)", ilc.Address, ilc.Port, l.addr, l.port))
return
Expand Down Expand Up @@ -440,3 +405,46 @@
l.modeCallback(l.Listener.Addr(), newMode, err)
}
}

// ldsWatcher implements the xdsresource.ListenerWatcher interface and is
// passed to the WatchListener API.
type ldsWatcher struct {
parent *listenerWrapper
logger *internalgrpclog.PrefixLogger
name string
}

func (lw *ldsWatcher) OnUpdate(update *xdsresource.ListenerResourceData) {
if lw.parent.closed.HasFired() {
lw.logger.Warningf("Resource %q received update: %#v after listener was closed", lw.name, update)
return
}

Check warning on line 421 in xds/internal/server/listener_wrapper.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/server/listener_wrapper.go#L419-L421

Added lines #L419 - L421 were not covered by tests
if lw.logger.V(2) {
lw.logger.Infof("LDS watch for resource %q received update: %#v", lw.name, update.Resource)
}

Check warning on line 424 in xds/internal/server/listener_wrapper.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/server/listener_wrapper.go#L423-L424

Added lines #L423 - L424 were not covered by tests
lw.parent.handleLDSUpdate(update.Resource)
}

func (lw *ldsWatcher) OnError(err error) {
if lw.parent.closed.HasFired() {
lw.logger.Warningf("Resource %q received error: %v after listener was closed", lw.name, err)
return
}
if lw.logger.V(2) {
lw.logger.Infof("LDS watch for resource %q reported error: %#v", lw.name, err)
}

Check warning on line 435 in xds/internal/server/listener_wrapper.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/server/listener_wrapper.go#L434-L435

Added lines #L434 - L435 were not covered by tests
// For errors which are anything other than "resource-not-found", we
// continue to use the old configuration.
}

func (lw *ldsWatcher) OnResourceDoesNotExist() {
if lw.parent.closed.HasFired() {
lw.logger.Warningf("Resource %q received resource-not-found-error after listener was closed", lw.name)
return
}

Check warning on line 444 in xds/internal/server/listener_wrapper.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/server/listener_wrapper.go#L442-L444

Added lines #L442 - L444 were not covered by tests
if lw.logger.V(2) {
lw.logger.Infof("LDS watch for resource %q reported resource-does-not-exist error: %v", lw.name)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does the formatting directive %v correspond to?

}

Check warning on line 447 in xds/internal/server/listener_wrapper.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/server/listener_wrapper.go#L446-L447

Added lines #L446 - L447 were not covered by tests
err := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource name %q of type Listener not found in received response", lw.name)
zasweq marked this conversation as resolved.
Show resolved Hide resolved
lw.parent.switchMode(nil, connectivity.ServingModeNotServing, err)
}
53 changes: 41 additions & 12 deletions xds/internal/server/rds_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import (
"sync"

igrpclog "google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)

Expand All @@ -34,7 +35,8 @@
// rdsHandler handles any RDS queries that need to be started for a given server
// side listeners Filter Chains (i.e. not inline).
type rdsHandler struct {
xdsC XDSClient
xdsC XDSClient
logger *igrpclog.PrefixLogger

mu sync.Mutex
updates map[string]xdsresource.RouteConfigUpdate
Expand All @@ -49,9 +51,10 @@
// newRDSHandler creates a new rdsHandler to watch for RDS resources.
// listenerWrapper updates the list of route names to watch by calling
// updateRouteNamesToWatch() upon receipt of new Listener configuration.
func newRDSHandler(xdsC XDSClient, ch chan rdsHandlerUpdate) *rdsHandler {
func newRDSHandler(xdsC XDSClient, logger *igrpclog.PrefixLogger, ch chan rdsHandlerUpdate) *rdsHandler {
return &rdsHandler{
xdsC: xdsC,
logger: logger,
updateChannel: ch,
updates: make(map[string]xdsresource.RouteConfigUpdate),
cancels: make(map[string]func()),
Expand All @@ -69,11 +72,11 @@
// routeNamesToWatch.
for routeName := range routeNamesToWatch {
if _, ok := rh.cancels[routeName]; !ok {
func(routeName string) {
rh.cancels[routeName] = rh.xdsC.WatchRouteConfig(routeName, func(update xdsresource.RouteConfigUpdate, err error) {
rh.handleRouteUpdate(routeName, update, err)
})
}(routeName)
// The xDS client keeps a reference to the watcher until the cancel
// func is invoked. So, we don't need to keep a reference for fear
// of it being garbage collected.
w := &rdsWatcher{parent: rh, routeName: routeName}
rh.cancels[routeName] = xdsresource.WatchRouteConfig(rh.xdsC, routeName, w)
}
}

Expand All @@ -97,11 +100,7 @@
// handleRouteUpdate persists the route config for a given route name, and also
// sends an update to the Listener Wrapper on an error received or if the rds
// handler has a full collection of updates.
func (rh *rdsHandler) handleRouteUpdate(routeName string, update xdsresource.RouteConfigUpdate, err error) {
if err != nil {
drainAndPush(rh.updateChannel, rdsHandlerUpdate{err: err})
return
}
func (rh *rdsHandler) handleRouteUpdate(routeName string, update xdsresource.RouteConfigUpdate) {
rh.mu.Lock()
defer rh.mu.Unlock()
rh.updates[routeName] = update
Expand Down Expand Up @@ -131,3 +130,33 @@
cancel()
}
}

// rdsWatcher implements the xdsresource.RouteConfigWatcher interface and is
// passed to the WatchRouteConfig API.
type rdsWatcher struct {
parent *rdsHandler
logger *igrpclog.PrefixLogger
routeName string
}

func (rw *rdsWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData) {
if rw.logger.V(2) {
rw.logger.Infof("RDS watch for resource %q received update: %#v", rw.routeName, update.Resource)
}

Check warning on line 145 in xds/internal/server/rds_handler.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/server/rds_handler.go#L144-L145

Added lines #L144 - L145 were not covered by tests
rw.parent.handleRouteUpdate(rw.routeName, update.Resource)
}

func (rw *rdsWatcher) OnError(err error) {
if rw.logger.V(2) {
rw.logger.Infof("RDS watch for resource %q reported error: %v", rw.routeName, err)
}

Check warning on line 152 in xds/internal/server/rds_handler.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/server/rds_handler.go#L151-L152

Added lines #L151 - L152 were not covered by tests
drainAndPush(rw.parent.updateChannel, rdsHandlerUpdate{err: err})
}

func (rw *rdsWatcher) OnResourceDoesNotExist() {
if rw.logger.V(2) {
rw.logger.Infof("RDS watch for resource %q reported resource-does-not-exist error: %v", rw.routeName)
}
err := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource name %q of type RouteConfiguration not found in received response", rw.routeName)
drainAndPush(rw.parent.updateChannel, rdsHandlerUpdate{err: err})

Check warning on line 161 in xds/internal/server/rds_handler.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/server/rds_handler.go#L156-L161

Added lines #L156 - L161 were not covered by tests
}
12 changes: 6 additions & 6 deletions xds/internal/server/rds_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func (s) TestRDSHandler_SuccessCaseOneRDSWatch(t *testing.T) {

// Create an rds handler and give it a single route to watch.
updateCh := make(chan rdsHandlerUpdate, 1)
rh := newRDSHandler(xdsC, updateCh)
rh := newRDSHandler(xdsC, nil, updateCh)
rh.updateRouteNamesToWatch(map[string]bool{route1: true})

// Verify that the given route is requested.
Expand All @@ -211,7 +211,7 @@ func (s) TestRDSHandler_SuccessCaseTwoUpdates(t *testing.T) {

// Create an rds handler and give it a single route to watch.
updateCh := make(chan rdsHandlerUpdate, 1)
rh := newRDSHandler(xdsC, updateCh)
rh := newRDSHandler(xdsC, nil, updateCh)
rh.updateRouteNamesToWatch(map[string]bool{route1: true})

// Verify that the given route is requested.
Expand Down Expand Up @@ -273,7 +273,7 @@ func (s) TestRDSHandler_SuccessCaseDeletedRoute(t *testing.T) {

// Create an rds handler and give it two routes to watch.
updateCh := make(chan rdsHandlerUpdate, 1)
rh := newRDSHandler(xdsC, updateCh)
rh := newRDSHandler(xdsC, nil, updateCh)
rh.updateRouteNamesToWatch(map[string]bool{route1: true, route2: true})

// Verify that the given routes are requested.
Expand Down Expand Up @@ -329,7 +329,7 @@ func (s) TestRDSHandler_SuccessCaseTwoUpdatesAddAndDeleteRoute(t *testing.T) {

// Create an rds handler and give it two routes to watch.
updateCh := make(chan rdsHandlerUpdate, 1)
rh := newRDSHandler(xdsC, updateCh)
rh := newRDSHandler(xdsC, nil, updateCh)
rh.updateRouteNamesToWatch(map[string]bool{route1: true, route2: true})

// Verify that the given routes are requested.
Expand Down Expand Up @@ -400,7 +400,7 @@ func (s) TestRDSHandler_SuccessCaseSecondUpdateMakesRouteFull(t *testing.T) {

// Create an rds handler and give it three routes to watch.
updateCh := make(chan rdsHandlerUpdate, 1)
rh := newRDSHandler(xdsC, updateCh)
rh := newRDSHandler(xdsC, nil, updateCh)
rh.updateRouteNamesToWatch(map[string]bool{route1: true, route2: true, route3: true})

// Verify that the given routes are requested.
Expand Down Expand Up @@ -455,7 +455,7 @@ func (s) TestErrorReceived(t *testing.T) {

// Create an rds handler and give it a single route to watch.
updateCh := make(chan rdsHandlerUpdate, 1)
rh := newRDSHandler(xdsC, updateCh)
rh := newRDSHandler(xdsC, nil, updateCh)
rh.updateRouteNamesToWatch(map[string]bool{route1: true})

// Verify that the given route is requested.
Expand Down
Loading