Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

idle: use LB policy close event as a proxy for channel idleness #6628

Merged
merged 2 commits into from
Sep 13, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 79 additions & 37 deletions internal/idle/idle_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,19 @@ package idle_test

import (
"context"
"errors"
"fmt"
"io"
"strings"
"testing"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/stubserver"
Expand Down Expand Up @@ -83,26 +85,41 @@ func channelzTraceEventFound(ctx context.Context, wantDesc string) error {
return fmt.Errorf("when looking for channelz trace event with description %q, %w", wantDesc, ctx.Err())
}

// channelzTraceEventNotFound looks up the top-channels in channelz (expects a
// single one), and verifies that there is no trace event on the channel
// matching the provided description string.
func channelzTraceEventNotFound(ctx context.Context, wantDesc string) error {
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
// Registers a wrapped round_robin LB policy for the duration of this test that
// retains all the functionality of the round_robin LB policy and makes the
// balancer close event available for inspection by the test.
//
// Returns a channel that gets pinged when the balancer is closed.
func registerWrappedRoundRobinPolicy(t *testing.T) chan struct{} {
rrBuilder := balancer.Get(roundrobin.Name)
closeCh := make(chan struct{}, 1)
stub.Register(roundrobin.Name, stub.BalancerFuncs{
Init: func(bd *stub.BalancerData) {
bd.Data = rrBuilder.Build(bd.ClientConn, bd.BuildOptions)
},
UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
bal := bd.Data.(balancer.Balancer)
return bal.UpdateClientConnState(ccs)
},
Close: func(bd *stub.BalancerData) {
select {
case closeCh <- struct{}{}:
default:
}
bal := bd.Data.(balancer.Balancer)
bal.Close()
},
})
t.Cleanup(func() { balancer.Register(rrBuilder) })

err := channelzTraceEventFound(sCtx, wantDesc)
if err == nil {
return fmt.Errorf("found channelz trace event with description %q, when expected not to", wantDesc)
}
if !errors.Is(err, context.DeadlineExceeded) {
return err
}
return nil
return closeCh
}

// Tests the case where channel idleness is disabled by passing an idle_timeout
// of 0. Verifies that a READY channel with no RPCs does not move to IDLE.
func (s) TestChannelIdleness_Disabled_NoActivity(t *testing.T) {
closeCh := registerWrappedRoundRobinPolicy(t)

// Create a ClientConn with idle_timeout set to 0.
r := manual.NewBuilderWithScheme("whatever")
dopts := []grpc.DialOption{
Expand All @@ -127,24 +144,28 @@ func (s) TestChannelIdleness_Disabled_NoActivity(t *testing.T) {
defer cancel()
testutils.AwaitState(ctx, t, cc, connectivity.Ready)

// Verify that the ClientConn stay in READY.
// Verify that the ClientConn stays in READY.
sCtx, sCancel := context.WithTimeout(ctx, 3*defaultTestShortIdleTimeout)
defer sCancel()
testutils.AwaitNoStateChange(sCtx, t, cc, connectivity.Ready)

// Verify that there are no idleness related channelz events.
if err := channelzTraceEventNotFound(ctx, "entering idle mode"); err != nil {
t.Fatal(err)
}
if err := channelzTraceEventNotFound(ctx, "exiting idle mode"); err != nil {
t.Fatal(err)
// Verify that the LB policy is not closed which is expected to happen when
// the channel enters IDLE.
sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortIdleTimeout)
defer sCancel()
select {
case <-sCtx.Done():
case <-closeCh:
t.Fatal("LB policy closed when expected not to")
}
}

// Tests the case where channel idleness is enabled by passing a small value for
// idle_timeout. Verifies that a READY channel with no RPCs moves to IDLE, and
// the connection to the backend is closed.
func (s) TestChannelIdleness_Enabled_NoActivity(t *testing.T) {
closeCh := registerWrappedRoundRobinPolicy(t)

// Create a ClientConn with a short idle_timeout.
r := manual.NewBuilderWithScheme("whatever")
dopts := []grpc.DialOption{
Expand Down Expand Up @@ -189,6 +210,13 @@ func (s) TestChannelIdleness_Enabled_NoActivity(t *testing.T) {
if _, err := conn.CloseCh.Receive(ctx); err != nil {
t.Fatalf("Failed when waiting for connection to be closed after channel entered IDLE: %v", err)
}

// Verify that the LB policy is closed.
select {
case <-ctx.Done():
t.Fatal("Timeout waiting for LB policy to be closed after the channel enters IDLE")
case <-closeCh:
}
}

// Tests the case where channel idleness is enabled by passing a small value for
Expand Down Expand Up @@ -224,6 +252,8 @@ func (s) TestChannelIdleness_Enabled_OngoingCall(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
closeCh := registerWrappedRoundRobinPolicy(t)

// Create a ClientConn with a short idle_timeout.
r := manual.NewBuilderWithScheme("whatever")
dopts := []grpc.DialOption{
Expand Down Expand Up @@ -277,15 +307,16 @@ func (s) TestChannelIdleness_Enabled_OngoingCall(t *testing.T) {
return
}

// Verify that there are no idleness related channelz events.
//
// TODO: Improve the checks here. If these log strings are
// changed in the code, these checks will continue to pass.
if err := channelzTraceEventNotFound(ctx, "entering idle mode"); err != nil {
errCh <- err
return
// Verify that the LB policy is not closed which is expected to happen when
// the channel enters IDLE.
sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortIdleTimeout)
defer sCancel()
select {
case <-sCtx.Done():
case <-closeCh:
errCh <- fmt.Errorf("LB policy closed when expected not to")
}
errCh <- channelzTraceEventNotFound(ctx, "exiting idle mode")
errCh <- nil
}()

if err := test.makeRPC(ctx, testgrpc.NewTestServiceClient(cc)); err != nil {
Expand All @@ -308,6 +339,8 @@ func (s) TestChannelIdleness_Enabled_OngoingCall(t *testing.T) {
// idle_timeout. Verifies that activity on a READY channel (frequent and short
// RPCs) keeps it from moving to IDLE.
func (s) TestChannelIdleness_Enabled_ActiveSinceLastCheck(t *testing.T) {
closeCh := registerWrappedRoundRobinPolicy(t)

// Create a ClientConn with a short idle_timeout.
r := manual.NewBuilderWithScheme("whatever")
dopts := []grpc.DialOption{
Expand Down Expand Up @@ -352,22 +385,24 @@ func (s) TestChannelIdleness_Enabled_ActiveSinceLastCheck(t *testing.T) {
}
}()

// Verify that the ClientConn stay in READY.
// Verify that the ClientConn stays in READY.
testutils.AwaitNoStateChange(sCtx, t, cc, connectivity.Ready)

// Verify that there are no idleness related channelz events.
if err := channelzTraceEventNotFound(ctx, "entering idle mode"); err != nil {
t.Fatal(err)
}
if err := channelzTraceEventNotFound(ctx, "exiting idle mode"); err != nil {
t.Fatal(err)
// Verify that the LB policy is not closed which is expected to happen when
// the channel enters IDLE.
select {
case <-sCtx.Done():
case <-closeCh:
t.Fatal("LB policy closed when expected not to")
}
}

// Tests the case where channel idleness is enabled by passing a small value for
// idle_timeout. Verifies that a READY channel with no RPCs moves to IDLE. Also
// verifies that a subsequent RPC on the IDLE channel kicks it out of IDLE.
func (s) TestChannelIdleness_Enabled_ExitIdleOnRPC(t *testing.T) {
closeCh := registerWrappedRoundRobinPolicy(t)

// Start a test backend and set the bootstrap state of the resolver to
// include this address. This will ensure that when the resolver is
// restarted when exiting idle, it will push the same address to grpc again.
Expand Down Expand Up @@ -402,6 +437,13 @@ func (s) TestChannelIdleness_Enabled_ExitIdleOnRPC(t *testing.T) {
t.Fatal(err)
}

// Verify that the LB policy is closed.
select {
case <-ctx.Done():
t.Fatal("Timeout waiting for LB policy to be closed after the channel enters IDLE")
case <-closeCh:
}

// Make an RPC and ensure that it succeeds and moves the channel back to
// READY.
client := testgrpc.NewTestServiceClient(cc)
Expand Down