Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

internal/idle: add a test that invokes ClientConn methods concurrently #6659

Merged
merged 3 commits into from
Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,8 @@ func (cc *ClientConn) exitIdleMode() error {
return errConnClosing
}
if cc.idlenessState != ccIdlenessStateIdle {
cc.mu.Unlock()
channelz.Infof(logger, cc.channelzID, "ClientConn asked to exit idle mode, current mode is %v", cc.idlenessState)
cc.mu.Unlock()
return nil
}

Expand Down Expand Up @@ -404,13 +404,13 @@ func (cc *ClientConn) exitIdleMode() error {
// name resolver, load balancer and any subchannels.
func (cc *ClientConn) enterIdleMode() error {
cc.mu.Lock()
defer cc.mu.Unlock()

if cc.conns == nil {
cc.mu.Unlock()
return ErrClientConnClosing
}
if cc.idlenessState != ccIdlenessStateActive {
channelz.Errorf(logger, cc.channelzID, "ClientConn asked to enter idle mode, current mode is %v", cc.idlenessState)
cc.mu.Unlock()
channelz.Warningf(logger, cc.channelzID, "ClientConn asked to enter idle mode, current mode is %v", cc.idlenessState)
return nil
}

Expand All @@ -431,14 +431,14 @@ func (cc *ClientConn) enterIdleMode() error {
cc.balancerWrapper.enterIdleMode()
cc.csMgr.updateState(connectivity.Idle)
cc.idlenessState = ccIdlenessStateIdle
cc.mu.Unlock()
cc.addTraceEvent("entering idle mode")

go func() {
cc.addTraceEvent("entering idle mode")
for ac := range conns {
ac.tearDown(errConnIdling)
}
}()

return nil
}

Expand Down Expand Up @@ -804,6 +804,12 @@ func init() {
internal.SubscribeToConnectivityStateChanges = func(cc *ClientConn, s grpcsync.Subscriber) func() {
return cc.csMgr.pubSub.Subscribe(s)
}
internal.EnterIdleModeForTesting = func(cc *ClientConn) error {
return cc.enterIdleMode()
}
internal.ExitIdleModeForTesting = func(cc *ClientConn) error {
return cc.exitIdleMode()
}
}

func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) {
Expand Down
116 changes: 97 additions & 19 deletions internal/idle/idle_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"io"
"strings"
"sync"
"testing"
"time"

Expand All @@ -32,6 +33,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpctest"
Expand Down Expand Up @@ -132,11 +134,11 @@ func (s) TestChannelIdleness_Disabled_NoActivity(t *testing.T) {
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
t.Cleanup(func() { cc.Close() })
defer cc.Close()

// Start a test backend and push an address update via the resolver.
backend := stubserver.StartTestService(t, nil)
t.Cleanup(backend.Stop)
defer backend.Stop()
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})

// Verify that the ClientConn moves to READY.
Expand Down Expand Up @@ -178,12 +180,12 @@ func (s) TestChannelIdleness_Enabled_NoActivity(t *testing.T) {
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
t.Cleanup(func() { cc.Close() })
defer cc.Close()

// Start a test backend and push an address update via the resolver.
lis := testutils.NewListenerWrapper(t, nil)
backend := stubserver.StartTestService(t, &stubserver.StubServer{Listener: lis})
t.Cleanup(backend.Stop)
defer backend.Stop()
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})

// Verify that the ClientConn moves to READY.
Expand Down Expand Up @@ -266,11 +268,10 @@ func (s) TestChannelIdleness_Enabled_OngoingCall(t *testing.T) {
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
t.Cleanup(func() { cc.Close() })
defer cc.Close()

// Start a test backend which keeps a unary RPC call active by blocking on a
// channel that is closed by the test later on. Also push an address update
// via the resolver.
// Start a test backend that keeps the RPC call active by blocking
// on a channel that is closed by the test later on.
blockCh := make(chan struct{})
backend := &stubserver.StubServer{
EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
Expand All @@ -285,16 +286,19 @@ func (s) TestChannelIdleness_Enabled_OngoingCall(t *testing.T) {
if err := backend.StartServer(); err != nil {
t.Fatalf("Failed to start backend: %v", err)
}
t.Cleanup(backend.Stop)
defer backend.Stop()

// Push an address update containing the address of the above
// backend via the manual resolver.
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})

// Verify that the ClientConn moves to READY.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
testutils.AwaitState(ctx, t, cc, connectivity.Ready)

// Spawn a goroutine which checks expected state transitions and idleness
// channelz trace events.
// Spawn a goroutine to check for expected behavior while a blocking
// RPC all is made from the main test goroutine.
errCh := make(chan error, 1)
go func() {
defer close(blockCh)
Expand Down Expand Up @@ -353,11 +357,11 @@ func (s) TestChannelIdleness_Enabled_ActiveSinceLastCheck(t *testing.T) {
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
t.Cleanup(func() { cc.Close() })
defer cc.Close()

// Start a test backend and push an address update via the resolver.
backend := stubserver.StartTestService(t, nil)
t.Cleanup(backend.Stop)
defer backend.Stop()
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})

// Verify that the ClientConn moves to READY.
Expand Down Expand Up @@ -408,7 +412,7 @@ func (s) TestChannelIdleness_Enabled_ExitIdleOnRPC(t *testing.T) {
// restarted when exiting idle, it will push the same address to grpc again.
r := manual.NewBuilderWithScheme("whatever")
backend := stubserver.StartTestService(t, nil)
t.Cleanup(backend.Stop)
defer backend.Stop()
r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})

// Create a ClientConn with a short idle_timeout.
Expand All @@ -422,7 +426,7 @@ func (s) TestChannelIdleness_Enabled_ExitIdleOnRPC(t *testing.T) {
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
t.Cleanup(func() { cc.Close() })
defer cc.Close()

// Verify that the ClientConn moves to READY.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
Expand Down Expand Up @@ -473,7 +477,7 @@ func (s) TestChannelIdleness_Enabled_IdleTimeoutRacesWithRPCs(t *testing.T) {
// restarted when exiting idle, it will push the same address to grpc again.
r := manual.NewBuilderWithScheme("whatever")
backend := stubserver.StartTestService(t, nil)
t.Cleanup(backend.Stop)
defer backend.Stop()
r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})

// Create a ClientConn with a short idle_timeout.
Expand All @@ -487,7 +491,7 @@ func (s) TestChannelIdleness_Enabled_IdleTimeoutRacesWithRPCs(t *testing.T) {
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
t.Cleanup(func() { cc.Close() })
defer cc.Close()

// Verify that the ClientConn moves to READY.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
Expand Down Expand Up @@ -516,7 +520,7 @@ func (s) TestChannelIdleness_Connect(t *testing.T) {
// restarted when exiting idle, it will push the same address to grpc again.
r := manual.NewBuilderWithScheme("whatever")
backend := stubserver.StartTestService(t, nil)
t.Cleanup(backend.Stop)
defer backend.Stop()
r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})

// Create a ClientConn with a short idle_timeout.
Expand All @@ -530,7 +534,7 @@ func (s) TestChannelIdleness_Connect(t *testing.T) {
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
t.Cleanup(func() { cc.Close() })
defer cc.Close()

// Verify that the ClientConn moves to IDLE.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
Expand All @@ -544,3 +548,77 @@ func (s) TestChannelIdleness_Connect(t *testing.T) {
// Verify that the ClientConn moves back to READY.
testutils.AwaitState(ctx, t, cc, connectivity.Ready)
}

// runFunc runs f repeatedly until the context expires.
func runFunc(ctx context.Context, f func()) {
for {
select {
case <-ctx.Done():
return
case <-time.After(10 * time.Millisecond):
f()
}
}
}

// Tests the scenario where there are concurrent calls to exit and enter idle
// mode on the ClientConn. Verifies that there is no race under this scenario.
func (s) TestChannelIdleness_RaceBetweenEnterAndExitIdleMode(t *testing.T) {
// Start a test backend and set the bootstrap state of the resolver to
// include this address. This will ensure that when the resolver is
// restarted when exiting idle, it will push the same address to grpc again.
r := manual.NewBuilderWithScheme("whatever")
backend := stubserver.StartTestService(t, nil)
defer backend.Stop()
r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})

// Create a ClientConn with a long idle_timeout. We will explicitly trigger
// entering and exiting IDLE mode from the test.
dopts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithResolvers(r),
grpc.WithIdleTimeout(30 * time.Minute),
grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"pick_first":{}}]}`),
}
cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
defer cc.Close()

enterIdle := internal.EnterIdleModeForTesting.(func(*grpc.ClientConn) error)
enterIdleFunc := func() {
if err := enterIdle(cc); err != nil {
t.Errorf("Failed to enter idle mode: %v", err)
}
}
exitIdle := internal.ExitIdleModeForTesting.(func(*grpc.ClientConn) error)
exitIdleFunc := func() {
if err := exitIdle(cc); err != nil {
t.Errorf("Failed to exit idle mode: %v", err)
}
}
// Spawn goroutines that call methods on the ClientConn to enter and exit
// idle mode concurrently for one second.
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
var wg sync.WaitGroup
wg.Add(4)
go func() {
runFunc(ctx, enterIdleFunc)
wg.Done()
}()
Comment on lines +607 to +610
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe a for loop around this instead of repeating? Then you can use a const for 4.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was able to save only a couple of lines because I need two for loops, one for the enterIdleFunc and one for the exitIdleFunc. So, decided to leave it as is.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You weren't code golfing hard enough, then!:

for _, f := range []func(){enterIdleFunc, enterIdleFunc, exitIdleFunc, exitIdleFunc} {
	go func() {
		runFunc(ctx, f)
		wg.Done
	}()
}

go func() {
runFunc(ctx, enterIdleFunc)
wg.Done()
}()
go func() {
runFunc(ctx, exitIdleFunc)
wg.Done()
}()
go func() {
runFunc(ctx, exitIdleFunc)
wg.Done()
}()
wg.Wait()
}
6 changes: 6 additions & 0 deletions internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,12 @@ var (
// GRPCResolverSchemeExtraMetadata determines when gRPC will add extra
// metadata to RPCs.
GRPCResolverSchemeExtraMetadata string = "xds"

// EnterIdleModeForTesting gets the ClientConn to enter IDLE mode.
EnterIdleModeForTesting any // func(*grpc.ClientConn) error

// ExitIdleModeForTesting gets the ClientConn to exit IDLE mode.
ExitIdleModeForTesting any // func(*grpc.ClientConn) error
)

// HealthChecker defines the signature of the client-side LB channel health checking function.
Expand Down