From 01c96bc46052eebbba5f32c842551b863007f9ee Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Tue, 24 Oct 2023 19:41:09 -0400 Subject: [PATCH 1/5] Fix nil panic --- xds/internal/server/listener_wrapper.go | 39 +++++++++++++------- xds/internal/server/listener_wrapper_test.go | 2 +- 2 files changed, 26 insertions(+), 15 deletions(-) diff --git a/xds/internal/server/listener_wrapper.go b/xds/internal/server/listener_wrapper.go index 94598df80c41..ed4501da3725 100644 --- a/xds/internal/server/listener_wrapper.go +++ b/xds/internal/server/listener_wrapper.go @@ -331,15 +331,20 @@ func (l *listenerWrapper) handleRDSUpdate(update rdsHandlerUpdate) { } if update.err != nil { if xdsresource.ErrType(update.err) == xdsresource.ErrorTypeResourceNotFound { - l.switchMode(nil, connectivity.ServingModeNotServing, update.err) + l.mu.Lock() + l.filterChains = nil + l.switchModeLocked(connectivity.ServingModeNotServing, update.err) + l.mu.Unlock() } // For errors which are anything other than "resource-not-found", we // continue to use the old configuration. return } atomic.StorePointer(&l.rdsUpdates, unsafe.Pointer(&update.updates)) - - l.switchMode(l.filterChains, connectivity.ServingModeServing, nil) + l.mu.Lock() + l.filterChains = nil + l.switchModeLocked(connectivity.ServingModeServing, nil) + l.mu.Unlock() l.goodUpdate.Fire() } @@ -358,7 +363,10 @@ func (l *listenerWrapper) handleLDSUpdate(update xdsresource.ListenerUpdate) { // what we have decided to do. See gRPC A36 for more details. ilc := update.InboundListenerCfg if ilc.Address != l.addr || ilc.Port != l.port { - l.switchMode(nil, connectivity.ServingModeNotServing, fmt.Errorf("address (%s:%s) in Listener update does not match listening address: (%s:%s)", ilc.Address, ilc.Port, l.addr, l.port)) + l.mu.Lock() + l.filterChains = ilc.FilterChains + l.switchModeLocked(connectivity.ServingModeNotServing, fmt.Errorf("address (%s:%s) in Listener update does not match listening address: (%s:%s)", ilc.Address, ilc.Port, l.addr, l.port)) + l.mu.Unlock() return } @@ -378,20 +386,20 @@ func (l *listenerWrapper) handleLDSUpdate(update xdsresource.ListenerUpdate) { // If there are no dynamic RDS Configurations still needed to be received // from the management server, this listener has all the configuration // needed, and is ready to serve. + l.mu.Lock() + defer l.mu.Unlock() + l.filterChains = ilc.FilterChains // write uncondtionally if len(ilc.FilterChains.RouteConfigNames) == 0 { - l.switchMode(ilc.FilterChains, connectivity.ServingModeServing, nil) + l.switchModeLocked(connectivity.ServingModeServing, nil) l.goodUpdate.Fire() } } -// switchMode updates the value of serving mode and filter chains stored in the -// listenerWrapper. And if the serving mode has changed, it invokes the -// registered mode change callback. -func (l *listenerWrapper) switchMode(fcs *xdsresource.FilterChainManager, newMode connectivity.ServingMode, err error) { - l.mu.Lock() - defer l.mu.Unlock() - - l.filterChains = fcs +// switchMode updates the value of serving mode of the listenerWrapper. And if +// the serving mode has changed, it invokes the registered mode change callback. +// +// Caller must hold l.mu. +func (l *listenerWrapper) switchModeLocked(newMode connectivity.ServingMode, err error) { // try this with tests, and then see if I can write a reproducible test case if l.mode == newMode && l.mode == connectivity.ServingModeServing { // Redundant updates are suppressed only when we are SERVING and the new // mode is also SERVING. In the other case (where we are NOT_SERVING and the @@ -446,5 +454,8 @@ func (lw *ldsWatcher) OnResourceDoesNotExist() { lw.logger.Infof("LDS watch for resource %q reported resource-does-not-exist error: %v", lw.name) } err := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource name %q of type Listener not found in received response", lw.name) - lw.parent.switchMode(nil, connectivity.ServingModeNotServing, err) + lw.parent.mu.Lock() + defer lw.parent.mu.Unlock() + lw.parent.filterChains = nil + lw.parent.switchModeLocked(connectivity.ServingModeNotServing, err) } diff --git a/xds/internal/server/listener_wrapper_test.go b/xds/internal/server/listener_wrapper_test.go index f85efdda2c4d..a20e734f5941 100644 --- a/xds/internal/server/listener_wrapper_test.go +++ b/xds/internal/server/listener_wrapper_test.go @@ -194,7 +194,7 @@ func (s) TestListenerWrapper_InlineRouteConfig(t *testing.T) { // resource. The test verifies that the listenerWrapper does not become ready // when waiting for the Route Configuration resource and becomes ready once it // receives the Route Configuration resource. -func (s) TestListenerWrapper_RouteNames(t *testing.T) { +func (s) TestListenerWrapper_RouteNames(t *testing.T) { // This is what I need, LDS + RDS and then accept a conn that looks up mgmtServer, nodeID, ldsResourceNamesCh, rdsResourceNamesCh, xdsC := xdsSetupFoTests(t) readyCh, host, port, lisResourceName := createListenerWrapper(t, xdsC) From fb5e65106b401dc21794e57f7ffda347af2719df Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Mon, 30 Oct 2023 19:32:37 -0400 Subject: [PATCH 2/5] Add test for fix --- internal/testutils/xds/e2e/clientresources.go | 20 ++++ test/xds/xds_server_test.go | 100 ++++++++++++++++++ xds/internal/server/listener_wrapper.go | 3 +- 3 files changed, 121 insertions(+), 2 deletions(-) create mode 100644 test/xds/xds_server_test.go diff --git a/internal/testutils/xds/e2e/clientresources.go b/internal/testutils/xds/e2e/clientresources.go index b9f6e409f454..0cddbac2d648 100644 --- a/internal/testutils/xds/e2e/clientresources.go +++ b/internal/testutils/xds/e2e/clientresources.go @@ -330,6 +330,26 @@ func DefaultRouteConfig(routeName, ldsTarget, clusterName string) *v3routepb.Rou } } +// RouteConfigNonForwardingTarget returns an xDS RouteConfig resource which +// specifies to route to a route specfying non forwarding action. This is +// intended to be used on the server side for RDS requests, and corresponds to +// the inline route configuration in DefaultServerListener. +func RouteConfigNonForwardingTarget(routeName string) *v3routepb.RouteConfiguration { + return &v3routepb.RouteConfiguration{ + Name: routeName, + VirtualHosts: []*v3routepb.VirtualHost{{ + // This "*" string matches on any incoming authority. This is to ensure any + // incoming RPC matches to Route_NonForwardingAction and will proceed as + // normal. + Domains: []string{"*"}, + Routes: []*v3routepb.Route{{ + Match: &v3routepb.RouteMatch{ + PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}, + }, + Action: &v3routepb.Route_NonForwardingAction{}, + }}}}} +} + // RouteConfigClusterSpecifierType determines the cluster specifier type for the // route actions configured in the returned RouteConfiguration resource. type RouteConfigClusterSpecifierType int diff --git a/test/xds/xds_server_test.go b/test/xds/xds_server_test.go new file mode 100644 index 000000000000..cca13fcc7f43 --- /dev/null +++ b/test/xds/xds_server_test.go @@ -0,0 +1,100 @@ +/* + * + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package xds_test + +import ( + "context" + "net" + "testing" + + "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal/grpcsync" + "google.golang.org/grpc/internal/testutils" + "google.golang.org/grpc/internal/testutils/xds/e2e" + testgrpc "google.golang.org/grpc/interop/grpc_testing" + "google.golang.org/grpc/xds" + + v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" + v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" +) + +// TestServeLDSRDS tests the case where a server receives LDS resource which +// specifies RDS. LDS and RDS resources are configured on the management server, +// which the server should pick up. The server should successfully accept +// connections and RPCs should work on these accepted connections. +func (s) TestServeLDSRDS(t *testing.T) { + managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{}) + defer cleanup() + lis, err := testutils.LocalTCPListener() + if err != nil { + t.Fatalf("testutils.LocalTCPListener() failed: %v", err) + } + // Setup the management server to respond with a listener resource that + // specifies a route name to watch, and a RDS resource corresponding to this + // route name. + host, port, err := hostPortFromListener(lis) + if err != nil { + t.Fatalf("failed to retrieve host and port of server: %v", err) + } + listener := e2e.DefaultServerListenerWithRouteConfigName(host, port, e2e.SecurityLevelNone, "routeName") + + routeConfig := e2e.RouteConfigNonForwardingTarget("routeName") + + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{listener}, + Routes: []*v3routepb.RouteConfiguration{routeConfig}, + } + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := managementServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + serving := grpcsync.NewEvent() + modeChangeOpt := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) { + t.Logf("serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err) + if args.Mode == connectivity.ServingModeServing { + serving.Fire() + } + }) + + server, err := xds.NewGRPCServer(grpc.Creds(insecure.NewCredentials()), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents)) + if err != nil { + t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err) + } + defer server.Stop() + testgrpc.RegisterTestServiceServer(server, &testService{}) + go func() { + if err := server.Serve(lis); err != nil { + t.Errorf("Serve() failed: %v", err) + } + }() + <-serving.Done() + + cc, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("failed to dial local test server: %v", err) + } + defer cc.Close() + + waitForSuccessfulRPC(ctx, t, cc) +} diff --git a/xds/internal/server/listener_wrapper.go b/xds/internal/server/listener_wrapper.go index ed4501da3725..dd3415f9a6e5 100644 --- a/xds/internal/server/listener_wrapper.go +++ b/xds/internal/server/listener_wrapper.go @@ -342,7 +342,6 @@ func (l *listenerWrapper) handleRDSUpdate(update rdsHandlerUpdate) { } atomic.StorePointer(&l.rdsUpdates, unsafe.Pointer(&update.updates)) l.mu.Lock() - l.filterChains = nil l.switchModeLocked(connectivity.ServingModeServing, nil) l.mu.Unlock() l.goodUpdate.Fire() @@ -364,7 +363,7 @@ func (l *listenerWrapper) handleLDSUpdate(update xdsresource.ListenerUpdate) { ilc := update.InboundListenerCfg if ilc.Address != l.addr || ilc.Port != l.port { l.mu.Lock() - l.filterChains = ilc.FilterChains + l.filterChains = nil l.switchModeLocked(connectivity.ServingModeNotServing, fmt.Errorf("address (%s:%s) in Listener update does not match listening address: (%s:%s)", ilc.Address, ilc.Port, l.addr, l.port)) l.mu.Unlock() return From 7da7a4a524caf19e5de6a78480f490dd3bee0130 Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Mon, 30 Oct 2023 20:40:02 -0400 Subject: [PATCH 3/5] Remove dangling comment --- xds/internal/server/listener_wrapper_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xds/internal/server/listener_wrapper_test.go b/xds/internal/server/listener_wrapper_test.go index a20e734f5941..f85efdda2c4d 100644 --- a/xds/internal/server/listener_wrapper_test.go +++ b/xds/internal/server/listener_wrapper_test.go @@ -194,7 +194,7 @@ func (s) TestListenerWrapper_InlineRouteConfig(t *testing.T) { // resource. The test verifies that the listenerWrapper does not become ready // when waiting for the Route Configuration resource and becomes ready once it // receives the Route Configuration resource. -func (s) TestListenerWrapper_RouteNames(t *testing.T) { // This is what I need, LDS + RDS and then accept a conn that looks up +func (s) TestListenerWrapper_RouteNames(t *testing.T) { mgmtServer, nodeID, ldsResourceNamesCh, rdsResourceNamesCh, xdsC := xdsSetupFoTests(t) readyCh, host, port, lisResourceName := createListenerWrapper(t, xdsC) From 6efe8dfcc983c56b4e24a9068043ed43f7462950 Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Mon, 30 Oct 2023 20:52:34 -0400 Subject: [PATCH 4/5] Remove one more trailing comment --- xds/internal/server/listener_wrapper.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xds/internal/server/listener_wrapper.go b/xds/internal/server/listener_wrapper.go index dd3415f9a6e5..bea501d4cc03 100644 --- a/xds/internal/server/listener_wrapper.go +++ b/xds/internal/server/listener_wrapper.go @@ -398,7 +398,7 @@ func (l *listenerWrapper) handleLDSUpdate(update xdsresource.ListenerUpdate) { // the serving mode has changed, it invokes the registered mode change callback. // // Caller must hold l.mu. -func (l *listenerWrapper) switchModeLocked(newMode connectivity.ServingMode, err error) { // try this with tests, and then see if I can write a reproducible test case +func (l *listenerWrapper) switchModeLocked(newMode connectivity.ServingMode, err error) { if l.mode == newMode && l.mode == connectivity.ServingModeServing { // Redundant updates are suppressed only when we are SERVING and the new // mode is also SERVING. In the other case (where we are NOT_SERVING and the From 16d7f911a560f3ef1b156307a195b385bf412311 Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Tue, 31 Oct 2023 17:41:10 -0400 Subject: [PATCH 5/5] Responded to Doug's comments --- test/xds/xds_server_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/test/xds/xds_server_test.go b/test/xds/xds_server_test.go index cca13fcc7f43..17fb1eee3cef 100644 --- a/test/xds/xds_server_test.go +++ b/test/xds/xds_server_test.go @@ -70,6 +70,12 @@ func (s) TestServeLDSRDS(t *testing.T) { t.Fatal(err) } serving := grpcsync.NewEvent() + + // In order to successfully Dial and make an RPC, the server should be in + // state Serving (successfully proceed the good LDS and RDS update + // configured on the management server). If you do not wait for Serving + // here, a Dial could potentially trigger an Accept on the server which + // would immediately close the accepted connection. modeChangeOpt := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) { t.Logf("serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err) if args.Mode == connectivity.ServingModeServing {