From 176a440d1e44ca0b9135d73e1a4bd5d32ff9d7db Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Tue, 3 Oct 2023 23:31:37 +0000 Subject: [PATCH] xds/resolver: move service watching tests to resolver_test package --- .../resolver/cluster_specifier_plugin_test.go | 230 ++-------- xds/internal/resolver/helpers_test.go | 254 +++++++++++ xds/internal/resolver/watch_service_test.go | 415 ++++++------------ xds/internal/resolver/xds_resolver_test.go | 4 + 4 files changed, 428 insertions(+), 475 deletions(-) create mode 100644 xds/internal/resolver/helpers_test.go diff --git a/xds/internal/resolver/cluster_specifier_plugin_test.go b/xds/internal/resolver/cluster_specifier_plugin_test.go index eb9f90350c93..f704e6818c1e 100644 --- a/xds/internal/resolver/cluster_specifier_plugin_test.go +++ b/xds/internal/resolver/cluster_specifier_plugin_test.go @@ -23,24 +23,18 @@ import ( "encoding/json" "fmt" "testing" - "time" "github.com/golang/protobuf/proto" - "github.com/google/go-cmp/cmp" "github.com/google/uuid" "google.golang.org/grpc/balancer" - "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/envconfig" - "google.golang.org/grpc/internal/grpctest" iresolver "google.golang.org/grpc/internal/resolver" "google.golang.org/grpc/internal/testutils" - xdsbootstrap "google.golang.org/grpc/internal/testutils/xds/bootstrap" "google.golang.org/grpc/internal/testutils/xds/e2e" "google.golang.org/grpc/resolver" "google.golang.org/grpc/serviceconfig" "google.golang.org/grpc/xds/internal/balancer/clustermanager" "google.golang.org/grpc/xds/internal/clusterspecifier" - xdsresolver "google.golang.org/grpc/xds/internal/resolver" protov2 "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" "google.golang.org/protobuf/types/known/wrapperspb" @@ -49,80 +43,6 @@ import ( v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" ) -const ( - defaultTestTimeout = 10 * time.Second - defaultTestShortTimeout = 100 * time.Microsecond -) - -type s struct { - grpctest.Tester -} - -func Test(t *testing.T) { - grpctest.RunSubTests(t, s{}) -} - -// verifyUpdateFromResolver waits for the resolver to push an update to the fake -// resolver.ClientConn and verifies that update matches the provided service -// config. -// -// Tests that want to skip verifying the contents of the service config can pass -// an empty string. -// -// Returns the config selector from the state update pushed by the resolver. -// Tests that don't need the config selector can ignore the return value. -func verifyUpdateFromResolver(ctx context.Context, t *testing.T, stateCh chan resolver.State, wantSC string) iresolver.ConfigSelector { - t.Helper() - - var state resolver.State - select { - case <-ctx.Done(): - t.Fatalf("Timeout waiting for an update from the resolver: %v", ctx.Err()) - case state = <-stateCh: - if err := state.ServiceConfig.Err; err != nil { - t.Fatalf("Received error in service config: %v", state.ServiceConfig.Err) - } - if wantSC == "" { - break - } - wantSCParsed := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(wantSC) - if !internal.EqualServiceConfigForTesting(state.ServiceConfig.Config, wantSCParsed.Config) { - t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, state.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config)) - } - } - cs := iresolver.GetConfigSelector(state) - if cs == nil { - t.Fatal("Received nil config selector in update from resolver") - } - return cs -} - -// buildResolverForTarget builds an xDS resolver for the given target. It -// returns the following: -// - a channel to read updates from the resolver -// - the newly created xDS resolver -func buildResolverForTarget(t *testing.T, target resolver.Target) (chan resolver.State, resolver.Resolver) { - t.Helper() - - builder := resolver.Get(xdsresolver.Scheme) - if builder == nil { - t.Fatalf("Scheme %q is not registered", xdsresolver.Scheme) - } - - stateCh := make(chan resolver.State, 1) - updateStateF := func(s resolver.State) error { - stateCh <- s - return nil - } - tcc := &testutils.ResolverClientConn{Logger: t, UpdateStateF: updateStateF} - r, err := builder.Build(target, tcc, resolver.BuildOptions{}) - if err != nil { - t.Fatalf("Failed to build xDS resolver for target %q: %v", target, err) - } - t.Cleanup(r.Close) - return stateCh, r -} - func init() { balancer.Register(cspBalancerBuilder{}) clusterspecifier.Register(testClusterSpecifierPlugin{}) @@ -201,46 +121,24 @@ func (s) TestResolverClusterSpecifierPlugin(t *testing.T) { envconfig.XDSRLS = oldRLS }() - mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{}) - if err != nil { - t.Fatalf("Failed to start xDS management server: %v", err) - } - defer mgmtServer.Stop() - - // Create a bootstrap configuration specifying the above management server. - nodeID := uuid.New().String() - cleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{ - NodeID: nodeID, - ServerURI: mgmtServer.Address, - }) - if err != nil { - t.Fatal(err) - } - defer cleanup() - - // Configure listener and route configuration resources on the management - // server. - const serviceName = "my-service-client-side-xds" - rdsName := "route-" + serviceName - resources := e2e.UpdateOptions{ - NodeID: nodeID, - Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, rdsName)}, - Routes: []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{ - RouteConfigName: rdsName, - ListenerName: serviceName, - ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeClusterSpecifierPlugin, - ClusterSpecifierPluginName: "cspA", - ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "anything"}), - })}, - SkipValidation: true, - } + // Spin up an xDS management server for the test. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - if err := mgmtServer.Update(ctx, resources); err != nil { - t.Fatal(err) - } + nodeID := uuid.New().String() + mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID) + + // Configure resources on the management server. + listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)} + routes := []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{ + RouteConfigName: defaultTestRouteConfigName, + ListenerName: defaultTestServiceName, + ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeClusterSpecifierPlugin, + ClusterSpecifierPluginName: "cspA", + ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "anything"}), + })} + configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes) - stateCh, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)}) + stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)}) // Wait for an update from the resolver, and verify the service config. wantSC := ` @@ -276,21 +174,14 @@ func (s) TestResolverClusterSpecifierPlugin(t *testing.T) { } // Change the cluster specifier plugin configuration. - resources = e2e.UpdateOptions{ - NodeID: nodeID, - Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, rdsName)}, - Routes: []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{ - RouteConfigName: rdsName, - ListenerName: serviceName, - ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeClusterSpecifierPlugin, - ClusterSpecifierPluginName: "cspA", - ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "changed"}), - })}, - SkipValidation: true, - } - if err := mgmtServer.Update(ctx, resources); err != nil { - t.Fatal(err) - } + routes = []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{ + RouteConfigName: defaultTestRouteConfigName, + ListenerName: defaultTestServiceName, + ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeClusterSpecifierPlugin, + ClusterSpecifierPluginName: "cspA", + ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "changed"}), + })} + configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes) // Wait for an update from the resolver, and verify the service config. wantSC = ` @@ -328,46 +219,24 @@ func (s) TestXDSResolverDelayedOnCommittedCSP(t *testing.T) { envconfig.XDSRLS = oldRLS }() - mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{}) - if err != nil { - t.Fatalf("Failed to start xDS management server: %v", err) - } - defer mgmtServer.Stop() - - // Create a bootstrap configuration specifying the above management server. - nodeID := uuid.New().String() - cleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{ - NodeID: nodeID, - ServerURI: mgmtServer.Address, - }) - if err != nil { - t.Fatal(err) - } - defer cleanup() - - // Configure listener and route configuration resources on the management - // server. - const serviceName = "my-service-client-side-xds" - rdsName := "route-" + serviceName - resources := e2e.UpdateOptions{ - NodeID: nodeID, - Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, rdsName)}, - Routes: []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{ - RouteConfigName: rdsName, - ListenerName: serviceName, - ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeClusterSpecifierPlugin, - ClusterSpecifierPluginName: "cspA", - ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "anythingA"}), - })}, - SkipValidation: true, - } + // Spin up an xDS management server for the test. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - if err := mgmtServer.Update(ctx, resources); err != nil { - t.Fatal(err) - } + nodeID := uuid.New().String() + mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID) + + // Configure resources on the management server. + listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)} + routes := []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{ + RouteConfigName: defaultTestRouteConfigName, + ListenerName: defaultTestServiceName, + ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeClusterSpecifierPlugin, + ClusterSpecifierPluginName: "cspA", + ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "anythingA"}), + })} + configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes) - stateCh, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)}) + stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)}) // Wait for an update from the resolver, and verify the service config. wantSC := ` @@ -407,21 +276,14 @@ func (s) TestXDSResolverDelayedOnCommittedCSP(t *testing.T) { // clusters, they still appear in the service config. // Change the cluster specifier plugin configuration. - resources = e2e.UpdateOptions{ - NodeID: nodeID, - Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, rdsName)}, - Routes: []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{ - RouteConfigName: rdsName, - ListenerName: serviceName, - ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeClusterSpecifierPlugin, - ClusterSpecifierPluginName: "cspB", - ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "anythingB"}), - })}, - SkipValidation: true, - } - if err := mgmtServer.Update(ctx, resources); err != nil { - t.Fatal(err) - } + routes = []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{ + RouteConfigName: defaultTestRouteConfigName, + ListenerName: defaultTestServiceName, + ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeClusterSpecifierPlugin, + ClusterSpecifierPluginName: "cspB", + ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "anythingB"}), + })} + configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes) // Wait for an update from the resolver, and verify the service config. wantSC = ` diff --git a/xds/internal/resolver/helpers_test.go b/xds/internal/resolver/helpers_test.go new file mode 100644 index 000000000000..9058d5f223b1 --- /dev/null +++ b/xds/internal/resolver/helpers_test.go @@ -0,0 +1,254 @@ +/* + * + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package resolver_test + +import ( + "context" + "fmt" + "testing" + "time" + + v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" + v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" + v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "google.golang.org/grpc/internal" + "google.golang.org/grpc/internal/grpctest" + iresolver "google.golang.org/grpc/internal/resolver" + "google.golang.org/grpc/internal/testutils" + xdsbootstrap "google.golang.org/grpc/internal/testutils/xds/bootstrap" + "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/serviceconfig" + xdsresolver "google.golang.org/grpc/xds/internal/resolver" + "google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version" +) + +type s struct { + grpctest.Tester +} + +func Test(t *testing.T) { + grpctest.RunSubTests(t, s{}) +} + +const ( + defaultTestTimeout = 10 * time.Second + defaultTestShortTimeout = 100 * time.Microsecond + + defaultTestServiceName = "service-name" + defaultTestRouteConfigName = "route-config-name" + defaultTestClusterName = "cluster-name" +) + +// This is the expected service config when using default listener and route +// configuration resources from the e2e package using the above resource names. +var wantDefaultServiceConfig = fmt.Sprintf(`{ + "loadBalancingConfig": [{ + "xds_cluster_manager_experimental": { + "children": { + "cluster:%s": { + "childPolicy": [{ + "cds_experimental": { + "cluster": "%s" + } + }] + } + } + } + }] + }`, defaultTestClusterName, defaultTestClusterName) + +// buildResolverForTarget builds an xDS resolver for the given target. It +// returns the following: +// - a channel to read updates from the resolver +// - a channel to read errors from the resolver +// - the newly created xDS resolver +func buildResolverForTarget(t *testing.T, target resolver.Target) (chan resolver.State, chan error, resolver.Resolver) { + t.Helper() + + builder := resolver.Get(xdsresolver.Scheme) + if builder == nil { + t.Fatalf("Scheme %q is not registered", xdsresolver.Scheme) + } + + stateCh := make(chan resolver.State, 1) + updateStateF := func(s resolver.State) error { + stateCh <- s + return nil + } + errCh := make(chan error, 1) + reportErrorF := func(err error) { + select { + case errCh <- err: + default: + } + } + tcc := &testutils.ResolverClientConn{Logger: t, UpdateStateF: updateStateF, ReportErrorF: reportErrorF} + r, err := builder.Build(target, tcc, resolver.BuildOptions{}) + if err != nil { + t.Fatalf("Failed to build xDS resolver for target %q: %v", target, err) + } + t.Cleanup(r.Close) + return stateCh, errCh, r +} + +// verifyUpdateFromResolver waits for the resolver to push an update to the fake +// resolver.ClientConn and verifies that update matches the provided service +// config. +// +// Tests that want to skip verifying the contents of the service config can pass +// an empty string. +// +// Returns the config selector from the state update pushed by the resolver. +// Tests that don't need the config selector can ignore the return value. +func verifyUpdateFromResolver(ctx context.Context, t *testing.T, stateCh chan resolver.State, wantSC string) iresolver.ConfigSelector { + t.Helper() + + var state resolver.State + select { + case <-ctx.Done(): + t.Fatalf("Timeout waiting for an update from the resolver: %v", ctx.Err()) + case state = <-stateCh: + if err := state.ServiceConfig.Err; err != nil { + t.Fatalf("Received error in service config: %v", state.ServiceConfig.Err) + } + if wantSC == "" { + break + } + wantSCParsed := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(wantSC) + if !internal.EqualServiceConfigForTesting(state.ServiceConfig.Config, wantSCParsed.Config) { + t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, state.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config)) + } + } + cs := iresolver.GetConfigSelector(state) + if cs == nil { + t.Fatal("Received nil config selector in update from resolver") + } + return cs +} + +// verifyNoUpdateFromResolver verifies that no update is pushed on stateCh. +// Calls t.Fatal() if an update is received before defaultTestShortTimeout +// expires. +func verifyNoUpdateFromResolver(ctx context.Context, t *testing.T, stateCh chan resolver.State) { + t.Helper() + + sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) + defer sCancel() + select { + case <-sCtx.Done(): + case u := <-stateCh: + t.Fatalf("Received update from resolver %v when none expected", u) + } +} + +// Spins up an xDS management server and sets up an xDS bootstrap configuration +// file that points to it. +// +// Returns the following: +// - A reference to the xDS management server +// - A channel to read requested Listener resource names +// - A channel to read requested RouteConfiguration resource names +func setupManagementServerForTest(ctx context.Context, t *testing.T, nodeID string) (*e2e.ManagementServer, chan []string, chan []string) { + t.Helper() + + listenerResourceNamesCh := make(chan []string, 1) + routeConfigResourceNamesCh := make(chan []string, 1) + + // Setup the management server to push the requested listener and route + // configuration resource names on to separate channels for the test to + // inspect. + mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{ + OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error { + switch req.GetTypeUrl() { + case version.V3ListenerURL: + select { + case <-listenerResourceNamesCh: + default: + } + select { + case listenerResourceNamesCh <- req.GetResourceNames(): + default: + } + case version.V3RouteConfigURL: + select { + case <-routeConfigResourceNamesCh: + default: + } + select { + case routeConfigResourceNamesCh <- req.GetResourceNames(): + default: + } + } + return nil + }, + AllowResourceSubset: true, + }) + if err != nil { + t.Fatalf("Failed to start xDS management server: %v", err) + } + t.Cleanup(mgmtServer.Stop) + + // Create a bootstrap configuration specifying the above management server. + cleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{ + NodeID: nodeID, + ServerURI: mgmtServer.Address, + }) + if err != nil { + t.Fatal(err) + } + t.Cleanup(cleanup) + return mgmtServer, listenerResourceNamesCh, routeConfigResourceNamesCh +} + +// Spins up an xDS management server and configures it with a default listener +// and route configuration resource. It also sets up an xDS bootstrap +// configuration file that points to the above management server. +func configureResourcesOnManagementServer(ctx context.Context, t *testing.T, mgmtServer *e2e.ManagementServer, nodeID string, listeners []*v3listenerpb.Listener, routes []*v3routepb.RouteConfiguration) { + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: listeners, + Routes: routes, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } +} + +// waitForResourceNames waits for the wantNames to be pushed on to namesCh. +// Fails the test by calling t.Fatal if the context expires before that. +func waitForResourceNames(ctx context.Context, t *testing.T, namesCh chan []string, wantNames []string) { + t.Helper() + + for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) { + t.Logf("easwars: executing the for loop waiting for resource names") + select { + case <-ctx.Done(): + case gotNames := <-namesCh: + if cmp.Equal(gotNames, wantNames, cmpopts.EquateEmpty()) { + return + } + t.Logf("Received resource names %v, want %v", gotNames, wantNames) + } + } + t.Fatalf("Timeout waiting for resource to be requested from the management server") +} diff --git a/xds/internal/resolver/watch_service_test.go b/xds/internal/resolver/watch_service_test.go index fdc23e712708..908562722775 100644 --- a/xds/internal/resolver/watch_service_test.go +++ b/xds/internal/resolver/watch_service_test.go @@ -16,324 +16,157 @@ * */ -package resolver +package resolver_test import ( "context" - "fmt" "testing" - "time" - "github.com/google/go-cmp/cmp" - "github.com/google/go-cmp/cmp/cmpopts" + "github.com/envoyproxy/go-control-plane/pkg/wellknown" + "github.com/google/uuid" "google.golang.org/grpc/internal/testutils" - "google.golang.org/grpc/xds/internal/testutils/fakeclient" - "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" + "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/resolver" + "google.golang.org/protobuf/types/known/wrapperspb" + + v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" + v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" + v3routerpb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/router/v3" + v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3" ) -type serviceUpdateErr struct { - u serviceUpdate - err error -} - -func verifyServiceUpdate(ctx context.Context, updateCh *testutils.Channel, wantUpdate serviceUpdate) error { - u, err := updateCh.Receive(ctx) - if err != nil { - return fmt.Errorf("timeout when waiting for service update: %v", err) - } - gotUpdate := u.(serviceUpdateErr) - if gotUpdate.err != nil || !cmp.Equal(gotUpdate.u, wantUpdate, cmpopts.EquateEmpty(), cmp.AllowUnexported(serviceUpdate{}, ldsConfig{})) { - return fmt.Errorf("unexpected service update: (%v, %v), want: (%v, nil), diff (-want +got):\n%s", gotUpdate.u, gotUpdate.err, wantUpdate, cmp.Diff(gotUpdate.u, wantUpdate, cmpopts.EquateEmpty(), cmp.AllowUnexported(serviceUpdate{}, ldsConfig{}))) - } - return nil -} - -func newStringP(s string) *string { - return &s -} - -// TestServiceWatch covers the cases: -// - an update is received after a watch() -// - an update with routes received -func (s) TestServiceWatch(t *testing.T) { - serviceUpdateCh := testutils.NewChannel() - xdsC := fakeclient.NewClient() - cancelWatch := watchService(xdsC, targetStr, func(update serviceUpdate, err error) { - serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err}) - }, nil) - defer cancelWatch() - +// Tests the case where the listener resource starts pointing to a new route +// configuration resource after the xDS resolver has successfully resolved the +// service name and pushed an update on the channel. The test verifies that the +// resolver stops requesting the old route configuration resource and requests +// the new resource, and once successfully resolved, sends an update on the +// channel. +func (s) TestServiceWatch_ListenerPointsToNewRouteConfiguration(t *testing.T) { + // Spin up an xDS management server for the test. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - waitForWatchListener(ctx, t, xdsC, targetStr) - xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{RouteConfigName: routeStr}, nil) - waitForWatchRouteConfig(ctx, t, xdsC, routeStr) - - wantUpdate := serviceUpdate{virtualHost: &xdsresource.VirtualHost{Domains: []string{"target"}, Routes: []*xdsresource.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsresource.WeightedCluster{cluster: {Weight: 1}}}}}} - xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{ - VirtualHosts: []*xdsresource.VirtualHost{ - { - Domains: []string{targetStr}, - Routes: []*xdsresource.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsresource.WeightedCluster{cluster: {Weight: 1}}}}, - }, + nodeID := uuid.New().String() + mgmtServer, lisCh, routeCfgCh := setupManagementServerForTest(ctx, t, nodeID) + + // Configure resources on the management server. + listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)} + routes := []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName)} + configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes) + + stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)}) + + // Verify initial update from the resolver. + waitForResourceNames(ctx, t, lisCh, []string{defaultTestServiceName}) + waitForResourceNames(ctx, t, routeCfgCh, []string{defaultTestRouteConfigName}) + verifyUpdateFromResolver(ctx, t, stateCh, wantDefaultServiceConfig) + + // Update the listener resource to point to a new route configuration name. + // Leave the old route configuration resource unchanged. + newTestRouteConfigName := defaultTestRouteConfigName + "-new" + listeners = []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, newTestRouteConfigName)} + configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes) + + // Verify that the new route configuration resource is requested. + waitForResourceNames(ctx, t, routeCfgCh, []string{newTestRouteConfigName}) + + // Update the old route configuration resource by adding a new route. + routes[0].VirtualHosts[0].Routes = append(routes[0].VirtualHosts[0].Routes, &v3routepb.Route{ + Match: &v3routepb.RouteMatch{ + PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/foo/bar"}, + CaseSensitive: &wrapperspb.BoolValue{Value: false}, }, - }, nil) - if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate); err != nil { - t.Fatal(err) - } - - wantUpdate2 := serviceUpdate{virtualHost: &xdsresource.VirtualHost{Domains: []string{"target"}, - Routes: []*xdsresource.Route{{ - Path: newStringP(""), - WeightedClusters: map[string]xdsresource.WeightedCluster{cluster: {Weight: 1}}, - }}, - }} - xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{ - VirtualHosts: []*xdsresource.VirtualHost{ - { - Domains: []string{targetStr}, - Routes: []*xdsresource.Route{{Path: newStringP(""), WeightedClusters: map[string]xdsresource.WeightedCluster{cluster: {Weight: 1}}}}, - }, - { - // Another virtual host, with different domains. - Domains: []string{"random"}, - Routes: []*xdsresource.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsresource.WeightedCluster{cluster: {Weight: 1}}}}, + Action: &v3routepb.Route_Route{ + Route: &v3routepb.RouteAction{ + ClusterSpecifier: &v3routepb.RouteAction_Cluster{Cluster: "some-random-cluster"}, }, }, - }, nil) - if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate2); err != nil { - t.Fatal(err) - } -} + }) + configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes) -// TestServiceWatchLDSUpdate covers the case that after first LDS and first RDS -// response, the second LDS response trigger an new RDS watch, and an update of -// the old RDS watch doesn't trigger update to service callback. -func (s) TestServiceWatchLDSUpdate(t *testing.T) { - serviceUpdateCh := testutils.NewChannel() - xdsC := fakeclient.NewClient() - cancelWatch := watchService(xdsC, targetStr, func(update serviceUpdate, err error) { - serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err}) - }, nil) - defer cancelWatch() + // Wait for no update from the resolver. + verifyNoUpdateFromResolver(ctx, t, stateCh) - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - waitForWatchListener(ctx, t, xdsC, targetStr) - xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{RouteConfigName: routeStr}, nil) - waitForWatchRouteConfig(ctx, t, xdsC, routeStr) + // Update the management server with the new route configuration resource. + routes = append(routes, e2e.DefaultRouteConfig(newTestRouteConfigName, defaultTestServiceName, defaultTestClusterName)) + configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes) - wantUpdate := serviceUpdate{virtualHost: &xdsresource.VirtualHost{Domains: []string{"target"}, Routes: []*xdsresource.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsresource.WeightedCluster{cluster: {Weight: 1}}}}}} - xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{ - VirtualHosts: []*xdsresource.VirtualHost{ - { - Domains: []string{targetStr}, - Routes: []*xdsresource.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsresource.WeightedCluster{cluster: {Weight: 1}}}}, - }, - }, - }, nil) - if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate); err != nil { - t.Fatal(err) - } - - // Another LDS update with a different RDS_name. - xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{RouteConfigName: routeStr + "2"}, nil) - if _, err := xdsC.WaitForCancelRouteConfigWatch(ctx); err != nil { - t.Fatalf("wait for cancel route watch failed: %v, want nil", err) - } - waitForWatchRouteConfig(ctx, t, xdsC, routeStr+"2") - - // RDS update for the new name. - wantUpdate2 := serviceUpdate{virtualHost: &xdsresource.VirtualHost{Domains: []string{"target"}, Routes: []*xdsresource.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsresource.WeightedCluster{cluster + "2": {Weight: 1}}}}}} - xdsC.InvokeWatchRouteConfigCallback(routeStr+"2", xdsresource.RouteConfigUpdate{ - VirtualHosts: []*xdsresource.VirtualHost{ - { - Domains: []string{targetStr}, - Routes: []*xdsresource.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsresource.WeightedCluster{cluster + "2": {Weight: 1}}}}, - }, - }, - }, nil) - if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate2); err != nil { - t.Fatal(err) - } + // Ensure update from the resolver. + verifyUpdateFromResolver(ctx, t, stateCh, wantDefaultServiceConfig) } -// TestServiceWatchLDSUpdate covers the case that after first LDS and first RDS -// response, the second LDS response includes a new MaxStreamDuration. It also -// verifies this is reported in subsequent RDS updates. -func (s) TestServiceWatchLDSUpdateMaxStreamDuration(t *testing.T) { - serviceUpdateCh := testutils.NewChannel() - xdsC := fakeclient.NewClient() - cancelWatch := watchService(xdsC, targetStr, func(update serviceUpdate, err error) { - serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err}) - }, nil) - defer cancelWatch() - +// Tests the case where the listener resource changes to contain an inline route +// configuration and changes back to having a route configuration resource name. +// Verifies that the expected xDS resource names are requested by the resolver +// and the update pushed to the channel contais the expected service config. +func (s) TestServiceWatch_ListenerPointsToInlineRouteConfiguration(t *testing.T) { + // Spin up an xDS management server for the test. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - waitForWatchListener(ctx, t, xdsC, targetStr) - xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{RouteConfigName: routeStr, MaxStreamDuration: time.Second}, nil) - waitForWatchRouteConfig(ctx, t, xdsC, routeStr) - - wantUpdate := serviceUpdate{virtualHost: &xdsresource.VirtualHost{Domains: []string{"target"}, Routes: []*xdsresource.Route{{ - Prefix: newStringP(""), - WeightedClusters: map[string]xdsresource.WeightedCluster{cluster: {Weight: 1}}}}}, - ldsConfig: ldsConfig{maxStreamDuration: time.Second}, - } - xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{ - VirtualHosts: []*xdsresource.VirtualHost{ - { - Domains: []string{targetStr}, - Routes: []*xdsresource.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsresource.WeightedCluster{cluster: {Weight: 1}}}}, + nodeID := uuid.New().String() + mgmtServer, lisCh, routeCfgCh := setupManagementServerForTest(ctx, t, nodeID) + + // Configure resources on the management server. + listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)} + routes := []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName)} + configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes) + + stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)}) + + // Verify initial update from the resolver. + waitForResourceNames(ctx, t, lisCh, []string{defaultTestServiceName}) + waitForResourceNames(ctx, t, routeCfgCh, []string{defaultTestRouteConfigName}) + verifyUpdateFromResolver(ctx, t, stateCh, wantDefaultServiceConfig) + + // Update listener to contain an inline route configuration. + hcm := testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{ + RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{ + RouteConfig: &v3routepb.RouteConfiguration{ + Name: defaultTestRouteConfigName, + VirtualHosts: []*v3routepb.VirtualHost{{ + Domains: []string{defaultTestServiceName}, + Routes: []*v3routepb.Route{{ + Match: &v3routepb.RouteMatch{ + PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}, + }, + Action: &v3routepb.Route_Route{ + Route: &v3routepb.RouteAction{ + ClusterSpecifier: &v3routepb.RouteAction_Cluster{Cluster: defaultTestClusterName}, + }, + }, + }}, + }}, }, }, - }, nil) - if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate); err != nil { - t.Fatal(err) - } - - // Another LDS update with the same RDS_name but different MaxStreamDuration (zero in this case). - wantUpdate2 := serviceUpdate{virtualHost: &xdsresource.VirtualHost{Domains: []string{"target"}, Routes: []*xdsresource.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsresource.WeightedCluster{cluster: {Weight: 1}}}}}} - xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{RouteConfigName: routeStr}, nil) - if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate2); err != nil { - t.Fatal(err) - } - - // RDS update. - wantUpdate3 := serviceUpdate{virtualHost: &xdsresource.VirtualHost{Domains: []string{"target"}, Routes: []*xdsresource.Route{{ - Prefix: newStringP(""), - WeightedClusters: map[string]xdsresource.WeightedCluster{cluster + "2": {Weight: 1}}}}, + HttpFilters: []*v3httppb.HttpFilter{e2e.HTTPFilter("router", &v3routerpb.Router{})}, + }) + listeners = []*v3listenerpb.Listener{{ + Name: defaultTestServiceName, + ApiListener: &v3listenerpb.ApiListener{ApiListener: hcm}, + FilterChains: []*v3listenerpb.FilterChain{{ + Name: "filter-chain-name", + Filters: []*v3listenerpb.Filter{{ + Name: wellknown.HTTPConnectionManager, + ConfigType: &v3listenerpb.Filter_TypedConfig{TypedConfig: hcm}, + }}, + }}, }} - xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{ - VirtualHosts: []*xdsresource.VirtualHost{ - { - Domains: []string{targetStr}, - Routes: []*xdsresource.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsresource.WeightedCluster{cluster + "2": {Weight: 1}}}}, - }, - }, - }, nil) - if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate3); err != nil { - t.Fatal(err) - } -} + configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, nil) -// TestServiceNotCancelRDSOnSameLDSUpdate covers the case that if the second LDS -// update contains the same RDS name as the previous, the RDS watch isn't -// canceled and restarted. -func (s) TestServiceNotCancelRDSOnSameLDSUpdate(t *testing.T) { - serviceUpdateCh := testutils.NewChannel() - xdsC := fakeclient.NewClient() - cancelWatch := watchService(xdsC, targetStr, func(update serviceUpdate, err error) { - serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err}) - }, nil) - defer cancelWatch() + // Verify that the old route configuration is not requested anymore. + waitForResourceNames(ctx, t, routeCfgCh, []string{}) + verifyUpdateFromResolver(ctx, t, stateCh, wantDefaultServiceConfig) - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - waitForWatchListener(ctx, t, xdsC, targetStr) - xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{RouteConfigName: routeStr}, nil) - waitForWatchRouteConfig(ctx, t, xdsC, routeStr) - - wantUpdate := serviceUpdate{virtualHost: &xdsresource.VirtualHost{Domains: []string{"target"}, Routes: []*xdsresource.Route{{ - Prefix: newStringP(""), - WeightedClusters: map[string]xdsresource.WeightedCluster{cluster: {Weight: 1}}}}, - }} - xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{ - VirtualHosts: []*xdsresource.VirtualHost{ - { - Domains: []string{targetStr}, - Routes: []*xdsresource.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsresource.WeightedCluster{cluster: {Weight: 1}}}}, - }, - }, - }, nil) + // Update listener back to contain a route configuration name. + listeners = []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)} + configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes) - if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate); err != nil { - t.Fatal(err) - } + // Verify that that route configuration resource is requested. + waitForResourceNames(ctx, t, routeCfgCh, []string{defaultTestRouteConfigName}) - // Another LDS update with a the same RDS_name. - xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{RouteConfigName: routeStr}, nil) - sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) - defer sCancel() - if _, err := xdsC.WaitForCancelRouteConfigWatch(sCtx); err != context.DeadlineExceeded { - t.Fatalf("wait for cancel route watch failed: %v, want nil", err) - } + // Verify that appropriate SC is pushed on the channel. + verifyUpdateFromResolver(ctx, t, stateCh, wantDefaultServiceConfig) } -// TestServiceWatchInlineRDS covers the cases switching between: -// - LDS update contains RDS name to watch -// - LDS update contains inline RDS resource -func (s) TestServiceWatchInlineRDS(t *testing.T) { - serviceUpdateCh := testutils.NewChannel() - xdsC := fakeclient.NewClient() - cancelWatch := watchService(xdsC, targetStr, func(update serviceUpdate, err error) { - serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err}) - }, nil) - defer cancelWatch() - - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - - // First LDS update is LDS with RDS name to watch. - waitForWatchListener(ctx, t, xdsC, targetStr) - xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{RouteConfigName: routeStr}, nil) - waitForWatchRouteConfig(ctx, t, xdsC, routeStr) - wantUpdate := serviceUpdate{virtualHost: &xdsresource.VirtualHost{Domains: []string{"target"}, Routes: []*xdsresource.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsresource.WeightedCluster{cluster: {Weight: 1}}}}}} - xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{ - VirtualHosts: []*xdsresource.VirtualHost{ - { - Domains: []string{targetStr}, - Routes: []*xdsresource.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsresource.WeightedCluster{cluster: {Weight: 1}}}}, - }, - }, - }, nil) - if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate); err != nil { - t.Fatal(err) - } - - // Switch LDS resp to a LDS with inline RDS resource - wantVirtualHosts2 := &xdsresource.VirtualHost{Domains: []string{"target"}, - Routes: []*xdsresource.Route{{ - Path: newStringP(""), - WeightedClusters: map[string]xdsresource.WeightedCluster{cluster: {Weight: 1}}, - }}, - } - wantUpdate2 := serviceUpdate{virtualHost: wantVirtualHosts2} - xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{InlineRouteConfig: &xdsresource.RouteConfigUpdate{ - VirtualHosts: []*xdsresource.VirtualHost{wantVirtualHosts2}, - }}, nil) - // This inline RDS resource should cause the RDS watch to be canceled. - if _, err := xdsC.WaitForCancelRouteConfigWatch(ctx); err != nil { - t.Fatalf("wait for cancel route watch failed: %v, want nil", err) - } - if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate2); err != nil { - t.Fatal(err) - } - - // Switch LDS update back to LDS with RDS name to watch. - xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{RouteConfigName: routeStr}, nil) - waitForWatchRouteConfig(ctx, t, xdsC, routeStr) - xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{ - VirtualHosts: []*xdsresource.VirtualHost{ - { - Domains: []string{targetStr}, - Routes: []*xdsresource.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsresource.WeightedCluster{cluster: {Weight: 1}}}}, - }, - }, - }, nil) - if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate); err != nil { - t.Fatal(err) - } - - // Switch LDS resp to a LDS with inline RDS resource again. - xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{InlineRouteConfig: &xdsresource.RouteConfigUpdate{ - VirtualHosts: []*xdsresource.VirtualHost{wantVirtualHosts2}, - }}, nil) - // This inline RDS resource should cause the RDS watch to be canceled. - if _, err := xdsC.WaitForCancelRouteConfigWatch(ctx); err != nil { - t.Fatalf("wait for cancel route watch failed: %v, want nil", err) - } - if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate2); err != nil { - t.Fatal(err) - } +func newStringP(s string) *string { + return &s } diff --git a/xds/internal/resolver/xds_resolver_test.go b/xds/internal/resolver/xds_resolver_test.go index 9ed77f81513a..c38ab8cb9dfb 100644 --- a/xds/internal/resolver/xds_resolver_test.go +++ b/xds/internal/resolver/xds_resolver_test.go @@ -2046,3 +2046,7 @@ func (s) TestXDSResolverHTTPFilters(t *testing.T) { func newDurationP(d time.Duration) *time.Duration { return &d } + +func newStringP(s string) *string { + return &s +}