Skip to content

Commit

Permalink
Move test to internal and trigger internal algo directly
Browse files Browse the repository at this point in the history
  • Loading branch information
arjan-bal committed Dec 20, 2024
1 parent e3879a5 commit 4c60234
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 67 deletions.
1 change: 0 additions & 1 deletion xds/internal/balancer/outlierdetection/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,6 @@ func (b *outlierDetectionBalancer) UpdateClientConnState(s balancer.ClientConnSt
// the balancer.Balancer API, so it is guaranteed to be called in a
// synchronous manner, so it cannot race with this read.
if b.cfg == nil || b.cfg.ChildPolicy.Name != lbCfg.ChildPolicy.Name {

if err := b.child.switchTo(bb); err != nil {
return fmt.Errorf("outlier detection: error switching to child of type %q: %v", lbCfg.ChildPolicy.Name, err)
}
Expand Down
96 changes: 96 additions & 0 deletions xds/internal/balancer/outlierdetection/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,24 @@ import (

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/pickfirst/pickfirstleaf"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/grpctest"
iserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/balancer/clusterimpl"

testgrpc "google.golang.org/grpc/interop/grpc_testing"
testpb "google.golang.org/grpc/interop/grpc_testing"
)

var (
Expand Down Expand Up @@ -1568,3 +1575,92 @@ func (s) TestConcurrentOperations(t *testing.T) {
close(finished)
wg.Wait()
}

// Test verifies that outlier detection doesn't eject subchannels created by
// the new pickfirst balancer when pickfirst is a non-leaf policy, i.e. not
// under a petiole policy. When pickfirst is not under a petiole policy, it will
// not register a health listener. pickfirst will still set the address
// attribute to disable ejection through the raw connectivity listener. When
// Outlier Detection processes a health update and sees the health listener is
// enabled but a health listener is not registered, it will drop the ejection
// update.
func (s) TestPickFirstHealthListenerDisabled(t *testing.T) {
backend := &stubserver.StubServer{
EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) {
return nil, errors.New("some error")
},
}
if err := backend.StartServer(); err != nil {
t.Fatalf("Failed to start backend: %v", err)
}
defer backend.Stop()
t.Logf("Started bad TestService backend at: %q", backend.Address)

// The interval is intentionally kept very large, the interval algorithm
// will be triggered manually.
odCfg := &LBConfig{
Interval: iserviceconfig.Duration(300 * time.Second),
BaseEjectionTime: iserviceconfig.Duration(300 * time.Second),
MaxEjectionTime: iserviceconfig.Duration(500 * time.Second),
FailurePercentageEjection: &FailurePercentageEjection{
Threshold: 50,
EnforcementPercentage: 100,
MinimumHosts: 0,
RequestVolume: 2,
},
MaxEjectionPercent: 100,
ChildPolicy: &iserviceconfig.BalancerConfig{
Name: pickfirstleaf.Name,
},
}

lbChan := make(chan *outlierDetectionBalancer, 1)
bf := stub.BalancerFuncs{
Init: func(bd *stub.BalancerData) {
bd.Data = balancer.Get(Name).Build(bd.ClientConn, bd.BuildOptions)
lbChan <- bd.Data.(*outlierDetectionBalancer)
},
Close: func(bd *stub.BalancerData) {
bd.Data.(balancer.Balancer).Close()
},
UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
ccs.BalancerConfig = odCfg
return bd.Data.(balancer.Balancer).UpdateClientConnState(ccs)
},
}

stub.Register(t.Name(), bf)

opts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{ "loadBalancingConfig": [{%q: {}}] }`, t.Name())),
}
cc, err := grpc.NewClient(backend.Address, opts...)
if err != nil {
t.Fatalf("grpc.NewClient() failed: %v", err)
}
defer cc.Close()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
testServiceClient := testgrpc.NewTestServiceClient(cc)
testServiceClient.EmptyCall(ctx, &testpb.Empty{})
testutils.AwaitState(ctx, t, cc, connectivity.Ready)

// Failing request should not cause ejection.
testServiceClient.EmptyCall(ctx, &testpb.Empty{})
testServiceClient.EmptyCall(ctx, &testpb.Empty{})
testServiceClient.EmptyCall(ctx, &testpb.Empty{})
testServiceClient.EmptyCall(ctx, &testpb.Empty{})

// Run the interval algorithm.
select {
case <-ctx.Done():
t.Fatal("Timed out waiting for the outlier detection LB policy to be built.")
case od := <-lbChan:
od.intervalTimerAlgorithm()
}

shortCtx, shortCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer shortCancel()
testutils.AwaitNoStateChange(shortCtx, t, cc, connectivity.Ready)
}
Original file line number Diff line number Diff line change
Expand Up @@ -383,69 +383,3 @@ func (s) TestNoopConfiguration(t *testing.T) {
t.Fatalf("error in expected round robin: %v", err)
}
}

// Test verifies that outlier detection doesn't eject subchannels created by
// the new pickfirst balancer when pickfirst is a non-leaf policy, i.e. not
// under a petiole policy. When pickfirst is not under a petiole policy, it will
// not register a health listener. pickfirst will still set the address
// attribute to disable ejection through the raw connectivity listener. When
// Outlier Detection processes a health update and sees the health listener is
// enabled but a health listener is not registered, it will drop the ejection
// update.
func (s) TestPickFirstHealthListenerDisabled(t *testing.T) {
backend := &stubserver.StubServer{
EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
return nil, errors.New("some error")
},
}
if err := backend.StartServer(); err != nil {
t.Fatalf("Failed to start backend: %v", err)
}
defer backend.Stop()
t.Logf("Started bad TestService backend at: %q", backend.Address)

countingODServiceConfigJSON := fmt.Sprintf(`
{
"loadBalancingConfig": [
{
"outlier_detection_experimental": {
"interval": "0.025s",
"baseEjectionTime": "0.100s",
"maxEjectionTime": "300s",
"failurePercentageEjection": {
"threshold": 50,
"enforcementPercentage": 100,
"minimumHosts": 0,
"requestVolume": 2
},
"childPolicy": [{"%s": {}}]
}
}
]
}`, pickfirstleaf.Name)
sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(countingODServiceConfigJSON)

mr := manual.NewBuilderWithScheme("od-e2e")

mr.InitialState(resolver.State{
Addresses: []resolver.Address{{Addr: backend.Address}},
ServiceConfig: sc,
})
cc, err := grpc.NewClient(mr.Scheme()+":///", grpc.WithResolvers(mr), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("grpc.NewClient() failed: %v", err)
}
defer cc.Close()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
testServiceClient := testgrpc.NewTestServiceClient(cc)

// Failing request should not cause ejection.
testServiceClient.EmptyCall(ctx, &testpb.Empty{})
testServiceClient.EmptyCall(ctx, &testpb.Empty{})
testServiceClient.EmptyCall(ctx, &testpb.Empty{})
// Wait for the failure rate algorithm to run once.
shortCtx, shortCancel := context.WithTimeout(ctx, 50*time.Millisecond)
defer shortCancel()
testutils.AwaitNoStateChange(shortCtx, t, cc, connectivity.Ready)
}

0 comments on commit 4c60234

Please sign in to comment.