Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds/server: Fix nil panic in xDS Server when received LDS with no inline RDS #6747

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions internal/testutils/xds/e2e/clientresources.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,26 @@ func DefaultRouteConfig(routeName, ldsTarget, clusterName string) *v3routepb.Rou
}
}

// RouteConfigNonForwardingTarget returns an xDS RouteConfig resource which
// specifies to route to a route specfying non forwarding action. This is
// intended to be used on the server side for RDS requests, and corresponds to
// the inline route configuration in DefaultServerListener.
func RouteConfigNonForwardingTarget(routeName string) *v3routepb.RouteConfiguration {
return &v3routepb.RouteConfiguration{
Name: routeName,
VirtualHosts: []*v3routepb.VirtualHost{{
// This "*" string matches on any incoming authority. This is to ensure any
// incoming RPC matches to Route_NonForwardingAction and will proceed as
// normal.
Domains: []string{"*"},
Routes: []*v3routepb.Route{{
Match: &v3routepb.RouteMatch{
PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"},
},
Action: &v3routepb.Route_NonForwardingAction{},
}}}}}
}

// RouteConfigClusterSpecifierType determines the cluster specifier type for the
// route actions configured in the returned RouteConfiguration resource.
type RouteConfigClusterSpecifierType int
Expand Down
106 changes: 106 additions & 0 deletions test/xds/xds_server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package xds_test

import (
"context"
"net"
"testing"

"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/xds/e2e"
testgrpc "google.golang.org/grpc/interop/grpc_testing"
"google.golang.org/grpc/xds"

v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
)

// TestServeLDSRDS tests the case where a server receives LDS resource which
// specifies RDS. LDS and RDS resources are configured on the management server,
// which the server should pick up. The server should successfully accept
// connections and RPCs should work on these accepted connections.
func (s) TestServeLDSRDS(t *testing.T) {
managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
defer cleanup()
lis, err := testutils.LocalTCPListener()
if err != nil {
t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
}
// Setup the management server to respond with a listener resource that
// specifies a route name to watch, and a RDS resource corresponding to this
// route name.
host, port, err := hostPortFromListener(lis)
if err != nil {
t.Fatalf("failed to retrieve host and port of server: %v", err)
}
listener := e2e.DefaultServerListenerWithRouteConfigName(host, port, e2e.SecurityLevelNone, "routeName")

routeConfig := e2e.RouteConfigNonForwardingTarget("routeName")

resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{listener},
Routes: []*v3routepb.RouteConfiguration{routeConfig},
}

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
serving := grpcsync.NewEvent()

// In order to successfully Dial and make an RPC, the server should be in
// state Serving (successfully proceed the good LDS and RDS update
// configured on the management server). If you do not wait for Serving
// here, a Dial could potentially trigger an Accept on the server which
// would immediately close the accepted connection.
modeChangeOpt := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this necessary? If so, why? Once the listener is created, I think it should be safe to call Dial?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a comment as mentioned.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a comment.

t.Logf("serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err)
if args.Mode == connectivity.ServingModeServing {
serving.Fire()
}
})

server, err := xds.NewGRPCServer(grpc.Creds(insecure.NewCredentials()), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
if err != nil {
t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
}
defer server.Stop()
testgrpc.RegisterTestServiceServer(server, &testService{})
go func() {
if err := server.Serve(lis); err != nil {
t.Errorf("Serve() failed: %v", err)
}
}()
<-serving.Done()

cc, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("failed to dial local test server: %v", err)
}
defer cc.Close()

waitForSuccessfulRPC(ctx, t, cc)
}
38 changes: 24 additions & 14 deletions xds/internal/server/listener_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,15 +331,19 @@
}
if update.err != nil {
if xdsresource.ErrType(update.err) == xdsresource.ErrorTypeResourceNotFound {
l.switchMode(nil, connectivity.ServingModeNotServing, update.err)
l.mu.Lock()

Check warning on line 334 in xds/internal/server/listener_wrapper.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/server/listener_wrapper.go#L334

Added line #L334 was not covered by tests
l.filterChains = nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is wrong; we should not throw away the LDS data when we get a bad RDS. A good RDS needs to be able to work again.

l.switchModeLocked(connectivity.ServingModeNotServing, update.err)
l.mu.Unlock()

Check warning on line 337 in xds/internal/server/listener_wrapper.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/server/listener_wrapper.go#L337

Added line #L337 was not covered by tests
}
// For errors which are anything other than "resource-not-found", we
// continue to use the old configuration.
return
}
atomic.StorePointer(&l.rdsUpdates, unsafe.Pointer(&update.updates))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to verify somewhere that we are still watching this RDS update. Probably at the top?


l.switchMode(l.filterChains, connectivity.ServingModeServing, nil)
l.mu.Lock()
l.switchModeLocked(connectivity.ServingModeServing, nil)
l.mu.Unlock()
l.goodUpdate.Fire()
}

Expand All @@ -358,7 +362,10 @@
// what we have decided to do. See gRPC A36 for more details.
ilc := update.InboundListenerCfg
if ilc.Address != l.addr || ilc.Port != l.port {
l.switchMode(nil, connectivity.ServingModeNotServing, fmt.Errorf("address (%s:%s) in Listener update does not match listening address: (%s:%s)", ilc.Address, ilc.Port, l.addr, l.port))
l.mu.Lock()
l.filterChains = nil

Check warning on line 366 in xds/internal/server/listener_wrapper.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/server/listener_wrapper.go#L365-L366

Added lines #L365 - L366 were not covered by tests
l.switchModeLocked(connectivity.ServingModeNotServing, fmt.Errorf("address (%s:%s) in Listener update does not match listening address: (%s:%s)", ilc.Address, ilc.Port, l.addr, l.port))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This also needs to stop the RDS watch if there was one, such that subsequent RDS updates are ignored.

l.mu.Unlock()
return
}

Expand All @@ -378,20 +385,20 @@
// If there are no dynamic RDS Configurations still needed to be received
// from the management server, this listener has all the configuration
// needed, and is ready to serve.
l.mu.Lock()
defer l.mu.Unlock()
l.filterChains = ilc.FilterChains // write uncondtionally
if len(ilc.FilterChains.RouteConfigNames) == 0 {
l.switchMode(ilc.FilterChains, connectivity.ServingModeServing, nil)
l.switchModeLocked(connectivity.ServingModeServing, nil)
l.goodUpdate.Fire()
}
}

// switchMode updates the value of serving mode and filter chains stored in the
// listenerWrapper. And if the serving mode has changed, it invokes the
// registered mode change callback.
func (l *listenerWrapper) switchMode(fcs *xdsresource.FilterChainManager, newMode connectivity.ServingMode, err error) {
l.mu.Lock()
defer l.mu.Unlock()

l.filterChains = fcs
// switchMode updates the value of serving mode of the listenerWrapper. And if
// the serving mode has changed, it invokes the registered mode change callback.
//
// Caller must hold l.mu.
func (l *listenerWrapper) switchModeLocked(newMode connectivity.ServingMode, err error) {
if l.mode == newMode && l.mode == connectivity.ServingModeServing {
// Redundant updates are suppressed only when we are SERVING and the new
// mode is also SERVING. In the other case (where we are NOT_SERVING and the
Expand Down Expand Up @@ -446,5 +453,8 @@
lw.logger.Infof("LDS watch for resource %q reported resource-does-not-exist error: %v", lw.name)
}
err := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource name %q of type Listener not found in received response", lw.name)
lw.parent.switchMode(nil, connectivity.ServingModeNotServing, err)
lw.parent.mu.Lock()
defer lw.parent.mu.Unlock()
lw.parent.filterChains = nil
lw.parent.switchModeLocked(connectivity.ServingModeNotServing, err)
}
Loading