Skip to content

Commit

Permalink
xds: unify xDS client creation APIs meant for testing
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars committed May 28, 2024
1 parent 6e59dd1 commit f27e13a
Show file tree
Hide file tree
Showing 30 changed files with 656 additions and 492 deletions.
13 changes: 3 additions & 10 deletions internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,16 +193,9 @@ var (

ChannelzTurnOffForTesting func()

// TriggerXDSResourceNameNotFoundForTesting triggers the resource-not-found
// error for a given resource type and name. This is usually triggered when
// the associated watch timer fires. For testing purposes, having this
// function makes events more predictable than relying on timer events.
TriggerXDSResourceNameNotFoundForTesting any // func(func(xdsresource.Type, string), string, string) error

// TriggerXDSResourceNameNotFoundClient invokes the testing xDS Client
// singleton to invoke resource not found for a resource type name and
// resource name.
TriggerXDSResourceNameNotFoundClient any // func(string, string) error
// TriggerXDSResourceNotFoundForTesting causes the provided xDS Client to
// invoke resource-not-found error for the given resource type and name.
TriggerXDSResourceNotFoundForTesting any // func(xdsclient.XDSClient, xdsresource.Type, string) error

// FromOutgoingContextRaw returns the un-merged, intermediary contents of
// metadata.rawMD.
Expand Down
16 changes: 11 additions & 5 deletions internal/testutils/xds/bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,17 @@ func Contents(opts Options) ([]byte, error) {
// resources with empty authority.
auths := map[string]authority{"": {}}
for n, auURI := range opts.Authorities {
auths[n] = authority{XdsServers: []server{{
ServerURI: auURI,
ChannelCreds: []creds{{Type: "insecure"}},
ServerFeatures: cfg.XdsServers[0].ServerFeatures,
}}}
// If the authority server URI is empty, set it to an empty authority
// config, resulting in it using the top-level xds server config.
a := authority{}
if auURI != "" {
a = authority{XdsServers: []server{{
ServerURI: auURI,
ChannelCreds: []creds{{Type: "insecure"}},
ServerFeatures: cfg.XdsServers[0].ServerFeatures,
}}}
}
auths[n] = a
}
cfg.Authorities = auths

Expand Down
11 changes: 11 additions & 0 deletions internal/xds/bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,9 @@ func (a *Authority) UnmarshalJSON(data []byte) error {
// Config provides the xDS client with several key bits of information that it
// requires in its interaction with the management server. The Config is
// initialized from the bootstrap file.
//
// Users must use one of the NewXxx() functions to create a Config instance, and
// not initialize it manually.
type Config struct {
// XDSServer is the management server to connect to.
//
Expand Down Expand Up @@ -415,6 +418,14 @@ func NewConfigFromContents(data []byte) (*Config, error) {
}

func newConfigFromContents(data []byte) (*Config, error) {
// Normalize the input configuration.
buf := bytes.Buffer{}
err := json.Indent(&buf, data, "", "")
if err != nil {
return nil, fmt.Errorf("xds: error normalizing JSON bootstrap configuration: %v", err)
}
data = bytes.TrimSpace(buf.Bytes())

config := &Config{}

var jsonData map[string]json.RawMessage
Expand Down
199 changes: 0 additions & 199 deletions test/xds/xds_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/xds/e2e"
Expand Down Expand Up @@ -224,204 +223,6 @@ func (s) TestRDSNack(t *testing.T) {
waitForFailedRPCWithStatus(ctx, t, cc, status.New(codes.Unavailable, "error from xDS configuration for matched route configuration"))
}

// TestResourceNotFoundRDS tests the case where an LDS points to an RDS which
// returns resource not found. Before getting the resource not found, the xDS
// Server has not received all configuration needed, so it should Accept and
// Close any new connections. After it has received the resource not found
// error, the server should move to serving, successfully Accept Connections,
// and fail at the L7 level with resource not found specified.
func (s) TestResourceNotFoundRDS(t *testing.T) {
managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
defer cleanup()
lis, err := testutils.LocalTCPListener()
if err != nil {
t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
}
// Setup the management server to respond with a listener resource that
// specifies a route name to watch, and no RDS resource corresponding to
// this route name.
host, port, err := hostPortFromListener(lis)
if err != nil {
t.Fatalf("failed to retrieve host and port of server: %v", err)
}

listener := e2e.DefaultServerListenerWithRouteConfigName(host, port, e2e.SecurityLevelNone, "routeName")
resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{listener},
SkipValidation: true,
}

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
serving := grpcsync.NewEvent()
modeChangeOpt := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) {
t.Logf("serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err)
if args.Mode == connectivity.ServingModeServing {
serving.Fire()
}
})

server, err := xds.NewGRPCServer(grpc.Creds(insecure.NewCredentials()), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
if err != nil {
t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
}
defer server.Stop()
testgrpc.RegisterTestServiceServer(server, &testService{})
go func() {
if err := server.Serve(lis); err != nil {
t.Errorf("Serve() failed: %v", err)
}
}()

cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("failed to dial local test server: %v", err)
}
defer cc.Close()

waitForFailedRPCWithStatus(ctx, t, cc, errAcceptAndClose)

// Invoke resource not found - this should result in L7 RPC error with
// unavailable receive on serving as a result, should trigger it to go
// serving. Poll as watch might not be started yet to trigger resource not
// found.
loop:
for {
if err := internal.TriggerXDSResourceNameNotFoundClient.(func(string, string) error)("RouteConfigResource", "routeName"); err != nil {
t.Fatalf("Failed to trigger resource name not found for testing: %v", err)
}
select {
case <-serving.Done():
break loop
case <-ctx.Done():
t.Fatalf("timed out waiting for serving mode to go serving")
case <-time.After(time.Millisecond):
}
}
waitForFailedRPCWithStatus(ctx, t, cc, status.New(codes.Unavailable, "error from xDS configuration for matched route configuration"))
}

// TestServingModeChanges tests the Server's logic as it transitions from Not
// Ready to Ready, then to Not Ready. Before it goes Ready, connections should
// be accepted and closed. After it goes ready, RPC's should proceed as normal
// according to matched route configuration. After it transitions back into not
// ready (through an explicit LDS Resource Not Found), previously running RPC's
// should be gracefully closed and still work, and new RPC's should fail.
func (s) TestServingModeChanges(t *testing.T) {
managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
defer cleanup()
lis, err := testutils.LocalTCPListener()
if err != nil {
t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
}
// Setup the management server to respond with a listener resource that
// specifies a route name to watch. Due to not having received the full
// configuration, this should cause the server to be in mode Serving.
host, port, err := hostPortFromListener(lis)
if err != nil {
t.Fatalf("failed to retrieve host and port of server: %v", err)
}

listener := e2e.DefaultServerListenerWithRouteConfigName(host, port, e2e.SecurityLevelNone, "routeName")
resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{listener},
SkipValidation: true,
}

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}

serving := grpcsync.NewEvent()
modeChangeOpt := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) {
t.Logf("serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err)
if args.Mode == connectivity.ServingModeServing {
serving.Fire()
}
})

server, err := xds.NewGRPCServer(grpc.Creds(insecure.NewCredentials()), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
if err != nil {
t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
}
defer server.Stop()
testgrpc.RegisterTestServiceServer(server, &testService{})
go func() {
if err := server.Serve(lis); err != nil {
t.Errorf("Serve() failed: %v", err)
}
}()
cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("failed to dial local test server: %v", err)
}
defer cc.Close()

waitForFailedRPCWithStatus(ctx, t, cc, errAcceptAndClose)
routeConfig := e2e.RouteConfigNonForwardingAction("routeName")
resources = e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{listener},
Routes: []*v3routepb.RouteConfiguration{routeConfig},
}
defer cancel()
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}

select {
case <-ctx.Done():
t.Fatal("timeout waiting for the xDS Server to go Serving")
case <-serving.Done():
}

// A unary RPC should work once it transitions into serving. (need this same
// assertion from LDS resource not found triggering it).
waitForSuccessfulRPC(ctx, t, cc)

// Start a stream before switching the server to not serving. Due to the
// stream being created before the graceful stop of the underlying
// connection, it should be able to continue even after the server switches
// to not serving.
c := testgrpc.NewTestServiceClient(cc)
stream, err := c.FullDuplexCall(ctx)
if err != nil {
t.Fatalf("cc.FullDuplexCall failed: %f", err)
}

// Invoke the lds resource not found - this should cause the server to
// switch to not serving. This should gracefully drain connections, and fail
// RPC's after. (how to assert accepted + closed) does this make it's way to
// application layer? (should work outside of resource not found...

// Invoke LDS Resource not found here (tests graceful close)
if err := internal.TriggerXDSResourceNameNotFoundClient.(func(string, string) error)("ListenerResource", listener.GetName()); err != nil {
t.Fatalf("Failed to trigger resource name not found for testing: %v", err)
}

// New RPCs on that connection should eventually start failing. Due to
// Graceful Stop any started streams continue to work.
if err = stream.Send(&testpb.StreamingOutputCallRequest{}); err != nil {
t.Fatalf("stream.Send() failed: %v, should continue to work due to graceful stop", err)
}
if err = stream.CloseSend(); err != nil {
t.Fatalf("stream.CloseSend() failed: %v, should continue to work due to graceful stop", err)
}
if _, err = stream.Recv(); err != io.EOF {
t.Fatalf("unexpected error: %v, expected an EOF error", err)
}

// New RPCs on that connection should eventually start failing.
waitForFailedRPCWithStatus(ctx, t, cc, errAcceptAndClose)
}

// TestMultipleUpdatesImmediatelySwitch tests the case where you get an LDS
// specifying RDS A, B, and C (with A being matched to). The Server should be in
// not serving until it receives all 3 RDS Configurations, and then transition
Expand Down
8 changes: 1 addition & 7 deletions xds/googledirectpath/googlec2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/envconfig"
Expand Down Expand Up @@ -232,14 +231,9 @@ func TestBuildXDS(t *testing.T) {
if tt.tdURI != "" {
wantConfig.XDSServer.ServerURI = tt.tdURI
}
cmpOpts := cmp.Options{
cmpopts.IgnoreFields(bootstrap.ServerConfig{}, "Creds"),
cmp.AllowUnexported(bootstrap.ServerConfig{}),
protocmp.Transform(),
}
select {
case gotConfig := <-configCh:
if diff := cmp.Diff(wantConfig, gotConfig, cmpOpts); diff != "" {
if diff := cmp.Diff(wantConfig, gotConfig, protocmp.Transform()); diff != "" {
t.Fatalf("Unexpected diff in bootstrap config (-want +got):\n%s", diff)
}
case <-time.After(time.Second):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func registerWrappedCDSPolicyWithNewSubConnOverride(t *testing.T, ch chan *xdscr
func setupForSecurityTests(t *testing.T, bootstrapContents []byte, clientCreds, serverCreds credentials.TransportCredentials) (*grpc.ClientConn, string) {
t.Helper()

xdsClient, xdsClose, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
xdsClient, xdsClose, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents})
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions xds/internal/balancer/cdsbalancer/cdsbalancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func setupWithManagementServer(t *testing.T) (*e2e.ManagementServer, string, *gr
})
t.Cleanup(cleanup)

xdsC, xdsClose, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
xdsC, xdsClose, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents})
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
Expand Down Expand Up @@ -344,7 +344,7 @@ func (s) TestConfigurationUpdate_EmptyCluster(t *testing.T) {
// Setup a management server and an xDS client to talk to it.
_, _, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
t.Cleanup(cleanup)
xdsClient, xdsClose, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
xdsClient, xdsClose, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents})
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,16 @@ import (
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils/pickfirst"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/internal/xds/bootstrap"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/status"
xdstestutils "google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
"google.golang.org/protobuf/types/known/wrapperspb"

v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
testgrpc "google.golang.org/grpc/interop/grpc_testing"
Expand Down Expand Up @@ -1107,12 +1104,13 @@ func (s) TestAggregateCluster_Fallback_EDS_ResourceNotFound(t *testing.T) {

// Create an xDS client talking to the above management server, configured
// with a short watch expiry timeout.
xdsClient, close, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{
XDSServer: xdstestutils.ServerConfigForAddress(t, mgmtServer.Address),
NodeProto: &v3corepb.Node{Id: nodeID},
}, defaultTestWatchExpiryTimeout, time.Duration(0))
bc, err := e2e.DefaultBootstrapContents(nodeID, mgmtServer.Address)
if err != nil {
t.Fatalf("failed to create xds client: %v", err)
t.Fatalf("Failed to create bootstrap configuration: %v", err)
}
xdsClient, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc, WatchExpiryTimeout: defaultTestWatchExpiryTimeout})
if err != nil {
t.Fatalf("Failed to create an xDS client: %v", err)
}
defer close()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func setupAndDial(t *testing.T, bootstrapContents []byte) (*grpc.ClientConn, fun
t.Helper()

// Create an xDS client for use by the cluster_resolver LB policy.
xdsC, xdsClose, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
xdsC, xdsClose, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents})
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
Expand Down
Loading

0 comments on commit f27e13a

Please sign in to comment.