Skip to content

Commit

Permalink
internal/idle: add a test that invokes ClientConn methods concurrently (
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars authored Sep 29, 2023
1 parent fd9ef72 commit 1466283
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 25 deletions.
18 changes: 12 additions & 6 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,8 @@ func (cc *ClientConn) exitIdleMode() error {
return errConnClosing
}
if cc.idlenessState != ccIdlenessStateIdle {
cc.mu.Unlock()
channelz.Infof(logger, cc.channelzID, "ClientConn asked to exit idle mode, current mode is %v", cc.idlenessState)
cc.mu.Unlock()
return nil
}

Expand Down Expand Up @@ -404,13 +404,13 @@ func (cc *ClientConn) exitIdleMode() error {
// name resolver, load balancer and any subchannels.
func (cc *ClientConn) enterIdleMode() error {
cc.mu.Lock()
defer cc.mu.Unlock()

if cc.conns == nil {
cc.mu.Unlock()
return ErrClientConnClosing
}
if cc.idlenessState != ccIdlenessStateActive {
channelz.Errorf(logger, cc.channelzID, "ClientConn asked to enter idle mode, current mode is %v", cc.idlenessState)
cc.mu.Unlock()
channelz.Warningf(logger, cc.channelzID, "ClientConn asked to enter idle mode, current mode is %v", cc.idlenessState)
return nil
}

Expand All @@ -431,14 +431,14 @@ func (cc *ClientConn) enterIdleMode() error {
cc.balancerWrapper.enterIdleMode()
cc.csMgr.updateState(connectivity.Idle)
cc.idlenessState = ccIdlenessStateIdle
cc.mu.Unlock()
cc.addTraceEvent("entering idle mode")

go func() {
cc.addTraceEvent("entering idle mode")
for ac := range conns {
ac.tearDown(errConnIdling)
}
}()

return nil
}

Expand Down Expand Up @@ -804,6 +804,12 @@ func init() {
internal.SubscribeToConnectivityStateChanges = func(cc *ClientConn, s grpcsync.Subscriber) func() {
return cc.csMgr.pubSub.Subscribe(s)
}
internal.EnterIdleModeForTesting = func(cc *ClientConn) error {
return cc.enterIdleMode()
}
internal.ExitIdleModeForTesting = func(cc *ClientConn) error {
return cc.exitIdleMode()
}
}

func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) {
Expand Down
116 changes: 97 additions & 19 deletions internal/idle/idle_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"io"
"strings"
"sync"
"testing"
"time"

Expand All @@ -32,6 +33,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpctest"
Expand Down Expand Up @@ -132,11 +134,11 @@ func (s) TestChannelIdleness_Disabled_NoActivity(t *testing.T) {
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
t.Cleanup(func() { cc.Close() })
defer cc.Close()

// Start a test backend and push an address update via the resolver.
backend := stubserver.StartTestService(t, nil)
t.Cleanup(backend.Stop)
defer backend.Stop()
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})

// Verify that the ClientConn moves to READY.
Expand Down Expand Up @@ -178,12 +180,12 @@ func (s) TestChannelIdleness_Enabled_NoActivity(t *testing.T) {
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
t.Cleanup(func() { cc.Close() })
defer cc.Close()

// Start a test backend and push an address update via the resolver.
lis := testutils.NewListenerWrapper(t, nil)
backend := stubserver.StartTestService(t, &stubserver.StubServer{Listener: lis})
t.Cleanup(backend.Stop)
defer backend.Stop()
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})

// Verify that the ClientConn moves to READY.
Expand Down Expand Up @@ -266,11 +268,10 @@ func (s) TestChannelIdleness_Enabled_OngoingCall(t *testing.T) {
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
t.Cleanup(func() { cc.Close() })
defer cc.Close()

// Start a test backend which keeps a unary RPC call active by blocking on a
// channel that is closed by the test later on. Also push an address update
// via the resolver.
// Start a test backend that keeps the RPC call active by blocking
// on a channel that is closed by the test later on.
blockCh := make(chan struct{})
backend := &stubserver.StubServer{
EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
Expand All @@ -285,16 +286,19 @@ func (s) TestChannelIdleness_Enabled_OngoingCall(t *testing.T) {
if err := backend.StartServer(); err != nil {
t.Fatalf("Failed to start backend: %v", err)
}
t.Cleanup(backend.Stop)
defer backend.Stop()

// Push an address update containing the address of the above
// backend via the manual resolver.
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})

// Verify that the ClientConn moves to READY.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
testutils.AwaitState(ctx, t, cc, connectivity.Ready)

// Spawn a goroutine which checks expected state transitions and idleness
// channelz trace events.
// Spawn a goroutine to check for expected behavior while a blocking
// RPC all is made from the main test goroutine.
errCh := make(chan error, 1)
go func() {
defer close(blockCh)
Expand Down Expand Up @@ -353,11 +357,11 @@ func (s) TestChannelIdleness_Enabled_ActiveSinceLastCheck(t *testing.T) {
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
t.Cleanup(func() { cc.Close() })
defer cc.Close()

// Start a test backend and push an address update via the resolver.
backend := stubserver.StartTestService(t, nil)
t.Cleanup(backend.Stop)
defer backend.Stop()
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})

// Verify that the ClientConn moves to READY.
Expand Down Expand Up @@ -408,7 +412,7 @@ func (s) TestChannelIdleness_Enabled_ExitIdleOnRPC(t *testing.T) {
// restarted when exiting idle, it will push the same address to grpc again.
r := manual.NewBuilderWithScheme("whatever")
backend := stubserver.StartTestService(t, nil)
t.Cleanup(backend.Stop)
defer backend.Stop()
r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})

// Create a ClientConn with a short idle_timeout.
Expand All @@ -422,7 +426,7 @@ func (s) TestChannelIdleness_Enabled_ExitIdleOnRPC(t *testing.T) {
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
t.Cleanup(func() { cc.Close() })
defer cc.Close()

// Verify that the ClientConn moves to READY.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
Expand Down Expand Up @@ -473,7 +477,7 @@ func (s) TestChannelIdleness_Enabled_IdleTimeoutRacesWithRPCs(t *testing.T) {
// restarted when exiting idle, it will push the same address to grpc again.
r := manual.NewBuilderWithScheme("whatever")
backend := stubserver.StartTestService(t, nil)
t.Cleanup(backend.Stop)
defer backend.Stop()
r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})

// Create a ClientConn with a short idle_timeout.
Expand All @@ -487,7 +491,7 @@ func (s) TestChannelIdleness_Enabled_IdleTimeoutRacesWithRPCs(t *testing.T) {
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
t.Cleanup(func() { cc.Close() })
defer cc.Close()

// Verify that the ClientConn moves to READY.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
Expand Down Expand Up @@ -516,7 +520,7 @@ func (s) TestChannelIdleness_Connect(t *testing.T) {
// restarted when exiting idle, it will push the same address to grpc again.
r := manual.NewBuilderWithScheme("whatever")
backend := stubserver.StartTestService(t, nil)
t.Cleanup(backend.Stop)
defer backend.Stop()
r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})

// Create a ClientConn with a short idle_timeout.
Expand All @@ -530,7 +534,7 @@ func (s) TestChannelIdleness_Connect(t *testing.T) {
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
t.Cleanup(func() { cc.Close() })
defer cc.Close()

// Verify that the ClientConn moves to IDLE.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
Expand All @@ -544,3 +548,77 @@ func (s) TestChannelIdleness_Connect(t *testing.T) {
// Verify that the ClientConn moves back to READY.
testutils.AwaitState(ctx, t, cc, connectivity.Ready)
}

// runFunc runs f repeatedly until the context expires.
func runFunc(ctx context.Context, f func()) {
for {
select {
case <-ctx.Done():
return
case <-time.After(10 * time.Millisecond):
f()
}
}
}

// Tests the scenario where there are concurrent calls to exit and enter idle
// mode on the ClientConn. Verifies that there is no race under this scenario.
func (s) TestChannelIdleness_RaceBetweenEnterAndExitIdleMode(t *testing.T) {
// Start a test backend and set the bootstrap state of the resolver to
// include this address. This will ensure that when the resolver is
// restarted when exiting idle, it will push the same address to grpc again.
r := manual.NewBuilderWithScheme("whatever")
backend := stubserver.StartTestService(t, nil)
defer backend.Stop()
r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})

// Create a ClientConn with a long idle_timeout. We will explicitly trigger
// entering and exiting IDLE mode from the test.
dopts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithResolvers(r),
grpc.WithIdleTimeout(30 * time.Minute),
grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"pick_first":{}}]}`),
}
cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
defer cc.Close()

enterIdle := internal.EnterIdleModeForTesting.(func(*grpc.ClientConn) error)
enterIdleFunc := func() {
if err := enterIdle(cc); err != nil {
t.Errorf("Failed to enter idle mode: %v", err)
}
}
exitIdle := internal.ExitIdleModeForTesting.(func(*grpc.ClientConn) error)
exitIdleFunc := func() {
if err := exitIdle(cc); err != nil {
t.Errorf("Failed to exit idle mode: %v", err)
}
}
// Spawn goroutines that call methods on the ClientConn to enter and exit
// idle mode concurrently for one second.
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
var wg sync.WaitGroup
wg.Add(4)
go func() {
runFunc(ctx, enterIdleFunc)
wg.Done()
}()
go func() {
runFunc(ctx, enterIdleFunc)
wg.Done()
}()
go func() {
runFunc(ctx, exitIdleFunc)
wg.Done()
}()
go func() {
runFunc(ctx, exitIdleFunc)
wg.Done()
}()
wg.Wait()
}
6 changes: 6 additions & 0 deletions internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,12 @@ var (
// GRPCResolverSchemeExtraMetadata determines when gRPC will add extra
// metadata to RPCs.
GRPCResolverSchemeExtraMetadata string = "xds"

// EnterIdleModeForTesting gets the ClientConn to enter IDLE mode.
EnterIdleModeForTesting any // func(*grpc.ClientConn) error

// ExitIdleModeForTesting gets the ClientConn to exit IDLE mode.
ExitIdleModeForTesting any // func(*grpc.ClientConn) error
)

// HealthChecker defines the signature of the client-side LB channel health checking function.
Expand Down

0 comments on commit 1466283

Please sign in to comment.