Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

clusterresolver: switch a couple of tests to e2e style #6394

Merged
merged 3 commits into from
Jun 23, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
300 changes: 1 addition & 299 deletions xds/internal/balancer/clusterresolver/clusterresolver_test.go
Original file line number Diff line number Diff line change
@@ -19,26 +19,10 @@
package clusterresolver

import (
"context"
"fmt"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/grpctest"
iserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/resolver"
xdsinternal "google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/balancer/clusterimpl"
"google.golang.org/grpc/xds/internal/balancer/outlierdetection"
"google.golang.org/grpc/xds/internal/balancer/priority"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)

const (
@@ -47,295 +31,13 @@ const (
testEDSService = "test-eds-service-name"
testClusterName = "test-cluster-name"
testClusterName2 = "google_cfe_some-name"
testBalancerNameFooBar = "foo.bar"
)

var (
// A non-empty endpoints update which is expected to be accepted by the EDS
// LB policy.
defaultEndpointsUpdate = xdsresource.EndpointsUpdate{
Localities: []xdsresource.Locality{
{
Endpoints: []xdsresource.Endpoint{{Address: "endpoint1"}},
ID: xdsinternal.LocalityID{Zone: "zone"},
Priority: 1,
Weight: 100,
},
},
}
)

func init() {
balancer.Register(bb{})
}

type s struct {
grpctest.Tester

cleanup func()
}

func (ss s) Teardown(t *testing.T) {
xdsclient.ClearAllCountersForTesting()
ss.Tester.Teardown(t)
if ss.cleanup != nil {
ss.cleanup()
}
}

func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}

const testBalancerNameFooBar = "foo.bar"

func newNoopTestClientConn() *noopTestClientConn {
return &noopTestClientConn{}
}

// noopTestClientConn is used in EDS balancer config update tests that only
// cover the config update handling, but not SubConn/load-balancing.
type noopTestClientConn struct {
balancer.ClientConn
}

func (t *noopTestClientConn) NewSubConn([]resolver.Address, balancer.NewSubConnOptions) (balancer.SubConn, error) {
return nil, nil
}

func (noopTestClientConn) Target() string { return testEDSService }

type scStateChange struct {
sc balancer.SubConn
state balancer.SubConnState
}

type fakeChildBalancer struct {
cc balancer.ClientConn
subConnState *testutils.Channel
clientConnState *testutils.Channel
resolverError *testutils.Channel
}

func (f *fakeChildBalancer) UpdateClientConnState(state balancer.ClientConnState) error {
f.clientConnState.Send(state)
return nil
}

func (f *fakeChildBalancer) ResolverError(err error) {
f.resolverError.Send(err)
}

func (f *fakeChildBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
f.subConnState.Send(&scStateChange{sc: sc, state: state})
}

func (f *fakeChildBalancer) Close() {}

func (f *fakeChildBalancer) ExitIdle() {}

func (f *fakeChildBalancer) waitForClientConnStateChangeVerifyBalancerConfig(ctx context.Context, wantCCS balancer.ClientConnState) error {
ccs, err := f.clientConnState.Receive(ctx)
if err != nil {
return err
}
gotCCS := ccs.(balancer.ClientConnState)
if diff := cmp.Diff(gotCCS, wantCCS, cmpopts.IgnoreFields(balancer.ClientConnState{}, "ResolverState")); diff != "" {
return fmt.Errorf("received unexpected ClientConnState, diff (-got +want): %v", diff)
}
return nil
}

func (f *fakeChildBalancer) waitForSubConnStateChange(ctx context.Context, wantState *scStateChange) error {
val, err := f.subConnState.Receive(ctx)
if err != nil {
return err
}
gotState := val.(*scStateChange)
if !cmp.Equal(gotState, wantState, cmp.AllowUnexported(scStateChange{})) {
return fmt.Errorf("got subconnStateChange %v, want %v", gotState, wantState)
}
return nil
}

func newFakeChildBalancer(cc balancer.ClientConn) balancer.Balancer {
return &fakeChildBalancer{
cc: cc,
subConnState: testutils.NewChannelWithSize(10),
clientConnState: testutils.NewChannelWithSize(10),
resolverError: testutils.NewChannelWithSize(10),
}
}

type fakeSubConn struct{}

func (*fakeSubConn) UpdateAddresses([]resolver.Address) { panic("implement me") }
func (*fakeSubConn) Connect() { panic("implement me") }
func (*fakeSubConn) GetOrBuildProducer(balancer.ProducerBuilder) (balancer.Producer, func()) {
panic("implement me")
}

// waitForNewChildLB makes sure that a new child LB is created by the top-level
// clusterResolverBalancer.
func waitForNewChildLB(ctx context.Context, ch *testutils.Channel) (*fakeChildBalancer, error) {
val, err := ch.Receive(ctx)
if err != nil {
return nil, fmt.Errorf("error when waiting for a new edsLB: %v", err)
}
return val.(*fakeChildBalancer), nil
}

// setup overrides the functions which are used to create the xdsClient and the
// edsLB, creates fake version of them and makes them available on the provided
// channels. The returned cancel function should be called by the test for
// cleanup.
func setup(childLBCh *testutils.Channel) (*fakeclient.Client, func()) {
xdsC := fakeclient.NewClientWithName(testBalancerNameFooBar)

origNewChildBalancer := newChildBalancer
newChildBalancer = func(_ balancer.Builder, cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer {
childLB := newFakeChildBalancer(cc)
defer func() { childLBCh.Send(childLB) }()
return childLB
}
return xdsC, func() { newChildBalancer = origNewChildBalancer }
}

// TestSubConnStateChange verifies if the top-level clusterResolverBalancer passes on
// the subConnState to appropriate child balancer.
func (s) TestSubConnStateChange(t *testing.T) {
edsLBCh := testutils.NewChannel()
xdsC, cleanup := setup(edsLBCh)
defer cleanup()

builder := balancer.Get(Name)
edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{})
if edsB == nil {
t.Fatalf("builder.Build(%s) failed and returned nil", Name)
}
defer edsB.Close()

if err := edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: newLBConfigWithOneEDS(testEDSService),
}); err != nil {
t.Fatalf("edsB.UpdateClientConnState() failed: %v", err)
}

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if _, err := xdsC.WaitForWatchEDS(ctx); err != nil {
t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err)
}
xdsC.InvokeWatchEDSCallback("", defaultEndpointsUpdate, nil)
edsLB, err := waitForNewChildLB(ctx, edsLBCh)
if err != nil {
t.Fatal(err)
}

fsc := &fakeSubConn{}
state := balancer.SubConnState{ConnectivityState: connectivity.Ready}
edsB.UpdateSubConnState(fsc, state)
if err := edsLB.waitForSubConnStateChange(ctx, &scStateChange{sc: fsc, state: state}); err != nil {
t.Fatal(err)
}
}

func newLBConfigWithOneEDS(edsServiceName string) *LBConfig {
return &LBConfig{
DiscoveryMechanisms: []DiscoveryMechanism{{
Cluster: testClusterName,
Type: DiscoveryMechanismTypeEDS,
EDSServiceName: edsServiceName,
}},
xdsLBPolicy: iserviceconfig.BalancerConfig{
Name: "ROUND_ROBIN",
Config: nil,
},
}
}

func newLBConfigWithOneEDSAndOutlierDetection(edsServiceName string, odCfg outlierdetection.LBConfig) *LBConfig {
lbCfg := newLBConfigWithOneEDS(edsServiceName)
lbCfg.DiscoveryMechanisms[0].outlierDetection = odCfg
return lbCfg
}

// TestOutlierDetection tests the Balancer Config sent down to the child
// priority balancer when Outlier Detection is turned on. The Priority
// Configuration sent downward should have a top level Outlier Detection Policy
// for each priority.
func (s) TestOutlierDetection(t *testing.T) {
edsLBCh := testutils.NewChannel()
xdsC, cleanup := setup(edsLBCh)
defer cleanup()
builder := balancer.Get(Name)
edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{})
if edsB == nil {
t.Fatalf("builder.Build(%s) failed and returned nil", Name)
}
defer edsB.Close()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

// Update Cluster Resolver with Client Conn State with Outlier Detection
// configuration present. This is what will be passed down to this balancer,
// as CDS Balancer gets the Cluster Update and converts the Outlier
// Detection data to an Outlier Detection configuration and sends it to this
// level.
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: newLBConfigWithOneEDSAndOutlierDetection(testEDSService, noopODCfg),
}); err != nil {
t.Fatal(err)
}
if _, err := xdsC.WaitForWatchEDS(ctx); err != nil {
t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err)
}

// Invoke EDS Callback - causes child balancer to be built and then
// UpdateClientConnState called on it with Outlier Detection as a direct
// child.
xdsC.InvokeWatchEDSCallback("", defaultEndpointsUpdate, nil)
edsLB, err := waitForNewChildLB(ctx, edsLBCh)
if err != nil {
t.Fatal(err)
}

// The priority configuration generated should have Outlier Detection as a
// direct child due to Outlier Detection being turned on.
pCfgWant := &priority.LBConfig{
Children: map[string]*priority.Child{
"priority-0-0": {
Config: &iserviceconfig.BalancerConfig{
Name: outlierdetection.Name,
Config: &outlierdetection.LBConfig{
Interval: iserviceconfig.Duration(10 * time.Second), // default interval
BaseEjectionTime: iserviceconfig.Duration(30 * time.Second),
MaxEjectionTime: iserviceconfig.Duration(300 * time.Second),
MaxEjectionPercent: 10,
ChildPolicy: &iserviceconfig.BalancerConfig{
Name: clusterimpl.Name,
Config: &clusterimpl.LBConfig{
Cluster: testClusterName,
EDSServiceName: "test-eds-service-name",
ChildPolicy: &iserviceconfig.BalancerConfig{
Name: "ROUND_ROBIN",
Config: nil,
},
},
},
},
},
IgnoreReresolutionRequests: true,
},
},
Priorities: []string{"priority-0-0"},
}

if err := edsLB.waitForClientConnStateChangeVerifyBalancerConfig(ctx, balancer.ClientConnState{
BalancerConfig: pCfgWant,
}); err != nil {
t.Fatalf("EDS impl got unexpected update: %v", err)
}
}
223 changes: 223 additions & 0 deletions xds/internal/balancer/clusterresolver/e2e_test/balancer_test.go
Original file line number Diff line number Diff line change
@@ -23,20 +23,31 @@ import (
"testing"
"time"

"github.com/google/go-cmp/cmp"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/buffer"
iserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/status"
"google.golang.org/grpc/xds/internal/balancer/clusterimpl"
"google.golang.org/grpc/xds/internal/balancer/outlierdetection"
"google.golang.org/grpc/xds/internal/balancer/priority"
"google.golang.org/grpc/xds/internal/balancer/wrrlocality"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/wrapperspb"

v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
@@ -302,3 +313,215 @@ func (s) TestErrorFromParentLB_ResourceNotFound(t *testing.T) {
t.Fatalf("RPCs did not fail after removal of Cluster resource")
}
}

// wrappedPriorityBuilder implements the balancer.Builder interface and builds
// an LB policy which is a thin wrapper around the priority LB policy. The built
// LB policy and makes certain events available to the test (SubConn state
// changes and LB config updates).
type wrappedPriorityBuilder struct {
balancer.Builder
balancer.ConfigParser
// We use an unbounded buffer instead of a vanilla channel to ensure that no
// state updates are lost *and* pushing to the channel is non-blocking (to
// ensure that the sending goroutine does not block if the test is not
// reading from the channel).
scStateCh *buffer.Unbounded
lbCfgCh chan serviceconfig.LoadBalancingConfig
}

func newWrappedPriorityBuilder(b balancer.Builder) *wrappedPriorityBuilder {
return &wrappedPriorityBuilder{
scStateCh: buffer.NewUnbounded(),
lbCfgCh: make(chan serviceconfig.LoadBalancingConfig, 1),
Builder: b,
ConfigParser: b.(balancer.ConfigParser),
}
}

func (b *wrappedPriorityBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
priorityLB := b.Builder.Build(cc, opts)
return &wrappedPriorityBalancer{
Balancer: priorityLB,
scStateCh: b.scStateCh,
lbCfgCh: b.lbCfgCh,
}
}

type wrappedPriorityBalancer struct {
balancer.Balancer
scStateCh *buffer.Unbounded
lbCfgCh chan serviceconfig.LoadBalancingConfig
}

func (b *wrappedPriorityBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
b.scStateCh.Put(state)
b.Balancer.UpdateSubConnState(sc, state)
}

func (b *wrappedPriorityBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
select {
case b.lbCfgCh <- ccs.BalancerConfig:
default:
}
return b.Balancer.UpdateClientConnState(ccs)
}

func (b *wrappedPriorityBalancer) Close() {
b.scStateCh.Close()
b.Balancer.Close()
}

// Test verifies that SubConn state changes are propagated to the child policy
// by the cluster resolver LB policy.
func (s) TestSubConnStateChangePropagationToChildPolicy(t *testing.T) {
zasweq marked this conversation as resolved.
Show resolved Hide resolved
// Unregister the priority balancer builder for the duration of this test,
// and register a policy under the same name that makes SubConn state
// changes pushed to it available to the test.
priorityBuilder := balancer.Get(priority.Name)
internal.BalancerUnregister(priorityBuilder.Name())
testChildPolicy := newWrappedPriorityBuilder(priorityBuilder)
balancer.Register(testChildPolicy)
defer balancer.Register(priorityBuilder)

managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
defer cleanup()

server := stubserver.StartTestService(t, nil)
defer server.Stop()

// Configure cluster and endpoints resources in the management server.
resources := e2e.UpdateOptions{
NodeID: nodeID,
Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, edsServiceName, e2e.SecurityLevelNone)},
Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(edsServiceName, "localhost", []uint32{testutils.ParsePort(t, server.Address)})},
SkipValidation: true,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}

// Create xDS client, configure cds_experimental LB policy with a manual
// resolver, and dial the test backends.
cc, cleanup := setupAndDial(t, bootstrapContents)
defer cleanup()

client := testgrpc.NewTestServiceClient(cc)
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
t.Fatalf("EmptyCall() failed: %v", err)
}

for {
select {
case <-ctx.Done():
t.Fatal("Timeout when waiting for child policy to see a READY SubConn")
case s := <-testChildPolicy.scStateCh.Get():
testChildPolicy.scStateCh.Load()
state := s.(balancer.SubConnState)
if state.ConnectivityState == connectivity.Ready {
return
}
}
}
}

// Test verifies that when the received Cluster resource contains outlier
// detection configuration, the LB config pushed to the child policy contains
// the appropriate configuration for the outlier detection LB policy.
func (s) TestOutlierDetectionConfigPropagationToChildPolicy(t *testing.T) {
zasweq marked this conversation as resolved.
Show resolved Hide resolved
// Unregister the priority balancer builder for the duration of this test,
// and register a policy under the same name that makes the LB config
// pushed to it available to the test.
priorityBuilder := balancer.Get(priority.Name)
internal.BalancerUnregister(priorityBuilder.Name())
testChildPolicy := newWrappedPriorityBuilder(priorityBuilder)
balancer.Register(testChildPolicy)
defer balancer.Register(priorityBuilder)

managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
defer cleanup()

server := stubserver.StartTestService(t, nil)
defer server.Stop()

// Configure cluster and endpoints resources in the management server.
cluster := e2e.DefaultCluster(clusterName, edsServiceName, e2e.SecurityLevelNone)
cluster.OutlierDetection = &v3clusterpb.OutlierDetection{
Interval: durationpb.New(10 * time.Second),
BaseEjectionTime: durationpb.New(30 * time.Second),
MaxEjectionTime: durationpb.New(300 * time.Second),
MaxEjectionPercent: wrapperspb.UInt32(10),
SuccessRateStdevFactor: wrapperspb.UInt32(2000),
EnforcingSuccessRate: wrapperspb.UInt32(50),
SuccessRateMinimumHosts: wrapperspb.UInt32(10),
SuccessRateRequestVolume: wrapperspb.UInt32(50),
}
resources := e2e.UpdateOptions{
NodeID: nodeID,
Clusters: []*v3clusterpb.Cluster{cluster},
Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(edsServiceName, "localhost", []uint32{testutils.ParsePort(t, server.Address)})},
SkipValidation: true,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}

// Create xDS client, configure cds_experimental LB policy with a manual
// resolver, and dial the test backends.
_, cleanup = setupAndDial(t, bootstrapContents)
defer cleanup()

// The priority configuration generated should have Outlier Detection as a
// direct child due to Outlier Detection being turned on.
wantCfg := &priority.LBConfig{
Children: map[string]*priority.Child{
"priority-0-0": {
Config: &iserviceconfig.BalancerConfig{
Name: outlierdetection.Name,
Config: &outlierdetection.LBConfig{
Interval: iserviceconfig.Duration(10 * time.Second), // default interval
BaseEjectionTime: iserviceconfig.Duration(30 * time.Second),
MaxEjectionTime: iserviceconfig.Duration(300 * time.Second),
MaxEjectionPercent: 10,
SuccessRateEjection: &outlierdetection.SuccessRateEjection{
StdevFactor: 2000,
EnforcementPercentage: 50,
MinimumHosts: 10,
RequestVolume: 50,
},
ChildPolicy: &iserviceconfig.BalancerConfig{
Name: clusterimpl.Name,
Config: &clusterimpl.LBConfig{
Cluster: clusterName,
EDSServiceName: edsServiceName,
ChildPolicy: &iserviceconfig.BalancerConfig{
Name: wrrlocality.Name,
Config: &wrrlocality.LBConfig{
ChildPolicy: &iserviceconfig.BalancerConfig{
Name: roundrobin.Name,
},
},
},
},
},
},
},
IgnoreReresolutionRequests: true,
},
},
Priorities: []string{"priority-0-0"},
}

select {
case lbCfg := <-testChildPolicy.lbCfgCh:
gotCfg := lbCfg.(*priority.LBConfig)
if diff := cmp.Diff(wantCfg, gotCfg); diff != "" {
t.Fatalf("Child policy received unexpected diff in config (-want +got):\n%s", diff)
}
case <-ctx.Done():
t.Fatalf("Timeout when waiting for child policy to receive its configuration")
}
}