Skip to content

Commit

Permalink
xds/resolver: move service watching tests to resolver_test package
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars committed Oct 12, 2023
1 parent 5a6773c commit 176a440
Show file tree
Hide file tree
Showing 4 changed files with 428 additions and 475 deletions.
230 changes: 46 additions & 184 deletions xds/internal/resolver/cluster_specifier_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,18 @@ import (
"encoding/json"
"fmt"
"testing"
"time"

"github.com/golang/protobuf/proto"
"github.com/google/go-cmp/cmp"
"github.com/google/uuid"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/grpctest"
iresolver "google.golang.org/grpc/internal/resolver"
"google.golang.org/grpc/internal/testutils"
xdsbootstrap "google.golang.org/grpc/internal/testutils/xds/bootstrap"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/balancer/clustermanager"
"google.golang.org/grpc/xds/internal/clusterspecifier"
xdsresolver "google.golang.org/grpc/xds/internal/resolver"
protov2 "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/wrapperspb"
Expand All @@ -49,80 +43,6 @@ import (
v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
)

const (
defaultTestTimeout = 10 * time.Second
defaultTestShortTimeout = 100 * time.Microsecond
)

type s struct {
grpctest.Tester
}

func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}

// verifyUpdateFromResolver waits for the resolver to push an update to the fake
// resolver.ClientConn and verifies that update matches the provided service
// config.
//
// Tests that want to skip verifying the contents of the service config can pass
// an empty string.
//
// Returns the config selector from the state update pushed by the resolver.
// Tests that don't need the config selector can ignore the return value.
func verifyUpdateFromResolver(ctx context.Context, t *testing.T, stateCh chan resolver.State, wantSC string) iresolver.ConfigSelector {
t.Helper()

var state resolver.State
select {
case <-ctx.Done():
t.Fatalf("Timeout waiting for an update from the resolver: %v", ctx.Err())
case state = <-stateCh:
if err := state.ServiceConfig.Err; err != nil {
t.Fatalf("Received error in service config: %v", state.ServiceConfig.Err)
}
if wantSC == "" {
break
}
wantSCParsed := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(wantSC)
if !internal.EqualServiceConfigForTesting(state.ServiceConfig.Config, wantSCParsed.Config) {
t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, state.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config))
}
}
cs := iresolver.GetConfigSelector(state)
if cs == nil {
t.Fatal("Received nil config selector in update from resolver")
}
return cs
}

// buildResolverForTarget builds an xDS resolver for the given target. It
// returns the following:
// - a channel to read updates from the resolver
// - the newly created xDS resolver
func buildResolverForTarget(t *testing.T, target resolver.Target) (chan resolver.State, resolver.Resolver) {
t.Helper()

builder := resolver.Get(xdsresolver.Scheme)
if builder == nil {
t.Fatalf("Scheme %q is not registered", xdsresolver.Scheme)
}

stateCh := make(chan resolver.State, 1)
updateStateF := func(s resolver.State) error {
stateCh <- s
return nil
}
tcc := &testutils.ResolverClientConn{Logger: t, UpdateStateF: updateStateF}
r, err := builder.Build(target, tcc, resolver.BuildOptions{})
if err != nil {
t.Fatalf("Failed to build xDS resolver for target %q: %v", target, err)
}
t.Cleanup(r.Close)
return stateCh, r
}

func init() {
balancer.Register(cspBalancerBuilder{})
clusterspecifier.Register(testClusterSpecifierPlugin{})
Expand Down Expand Up @@ -201,46 +121,24 @@ func (s) TestResolverClusterSpecifierPlugin(t *testing.T) {
envconfig.XDSRLS = oldRLS
}()

mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{})
if err != nil {
t.Fatalf("Failed to start xDS management server: %v", err)
}
defer mgmtServer.Stop()

// Create a bootstrap configuration specifying the above management server.
nodeID := uuid.New().String()
cleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{
NodeID: nodeID,
ServerURI: mgmtServer.Address,
})
if err != nil {
t.Fatal(err)
}
defer cleanup()

// Configure listener and route configuration resources on the management
// server.
const serviceName = "my-service-client-side-xds"
rdsName := "route-" + serviceName
resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, rdsName)},
Routes: []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
RouteConfigName: rdsName,
ListenerName: serviceName,
ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeClusterSpecifierPlugin,
ClusterSpecifierPluginName: "cspA",
ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "anything"}),
})},
SkipValidation: true,
}
// Spin up an xDS management server for the test.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
nodeID := uuid.New().String()
mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID)

// Configure resources on the management server.
listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)}
routes := []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
RouteConfigName: defaultTestRouteConfigName,
ListenerName: defaultTestServiceName,
ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeClusterSpecifierPlugin,
ClusterSpecifierPluginName: "cspA",
ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "anything"}),
})}
configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)

stateCh, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)})
stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)})

// Wait for an update from the resolver, and verify the service config.
wantSC := `
Expand Down Expand Up @@ -276,21 +174,14 @@ func (s) TestResolverClusterSpecifierPlugin(t *testing.T) {
}

// Change the cluster specifier plugin configuration.
resources = e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, rdsName)},
Routes: []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
RouteConfigName: rdsName,
ListenerName: serviceName,
ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeClusterSpecifierPlugin,
ClusterSpecifierPluginName: "cspA",
ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "changed"}),
})},
SkipValidation: true,
}
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
routes = []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
RouteConfigName: defaultTestRouteConfigName,
ListenerName: defaultTestServiceName,
ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeClusterSpecifierPlugin,
ClusterSpecifierPluginName: "cspA",
ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "changed"}),
})}
configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)

// Wait for an update from the resolver, and verify the service config.
wantSC = `
Expand Down Expand Up @@ -328,46 +219,24 @@ func (s) TestXDSResolverDelayedOnCommittedCSP(t *testing.T) {
envconfig.XDSRLS = oldRLS
}()

mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{})
if err != nil {
t.Fatalf("Failed to start xDS management server: %v", err)
}
defer mgmtServer.Stop()

// Create a bootstrap configuration specifying the above management server.
nodeID := uuid.New().String()
cleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{
NodeID: nodeID,
ServerURI: mgmtServer.Address,
})
if err != nil {
t.Fatal(err)
}
defer cleanup()

// Configure listener and route configuration resources on the management
// server.
const serviceName = "my-service-client-side-xds"
rdsName := "route-" + serviceName
resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, rdsName)},
Routes: []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
RouteConfigName: rdsName,
ListenerName: serviceName,
ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeClusterSpecifierPlugin,
ClusterSpecifierPluginName: "cspA",
ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "anythingA"}),
})},
SkipValidation: true,
}
// Spin up an xDS management server for the test.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
nodeID := uuid.New().String()
mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID)

// Configure resources on the management server.
listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)}
routes := []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
RouteConfigName: defaultTestRouteConfigName,
ListenerName: defaultTestServiceName,
ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeClusterSpecifierPlugin,
ClusterSpecifierPluginName: "cspA",
ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "anythingA"}),
})}
configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)

stateCh, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)})
stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)})

// Wait for an update from the resolver, and verify the service config.
wantSC := `
Expand Down Expand Up @@ -407,21 +276,14 @@ func (s) TestXDSResolverDelayedOnCommittedCSP(t *testing.T) {
// clusters, they still appear in the service config.

// Change the cluster specifier plugin configuration.
resources = e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, rdsName)},
Routes: []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
RouteConfigName: rdsName,
ListenerName: serviceName,
ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeClusterSpecifierPlugin,
ClusterSpecifierPluginName: "cspB",
ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "anythingB"}),
})},
SkipValidation: true,
}
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
routes = []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
RouteConfigName: defaultTestRouteConfigName,
ListenerName: defaultTestServiceName,
ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeClusterSpecifierPlugin,
ClusterSpecifierPluginName: "cspB",
ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "anythingB"}),
})}
configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)

// Wait for an update from the resolver, and verify the service config.
wantSC = `
Expand Down
Loading

0 comments on commit 176a440

Please sign in to comment.