From 16007d618a54e22196ca1f1a76f732eeb26aaab2 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Fri, 15 Sep 2023 17:48:48 +0000 Subject: [PATCH 1/3] internal/idle: add a test that invokes ClientConn methods concurrently --- clientconn.go | 18 ++++--- internal/idle/idle_e2e_test.go | 88 ++++++++++++++++++++++++++++++++-- internal/internal.go | 6 +++ 3 files changed, 101 insertions(+), 11 deletions(-) diff --git a/clientconn.go b/clientconn.go index ff7fea102288..429c389e4730 100644 --- a/clientconn.go +++ b/clientconn.go @@ -337,8 +337,8 @@ func (cc *ClientConn) exitIdleMode() error { return errConnClosing } if cc.idlenessState != ccIdlenessStateIdle { - cc.mu.Unlock() channelz.Infof(logger, cc.channelzID, "ClientConn asked to exit idle mode, current mode is %v", cc.idlenessState) + cc.mu.Unlock() return nil } @@ -404,13 +404,13 @@ func (cc *ClientConn) exitIdleMode() error { // name resolver, load balancer and any subchannels. func (cc *ClientConn) enterIdleMode() error { cc.mu.Lock() + defer cc.mu.Unlock() + if cc.conns == nil { - cc.mu.Unlock() return ErrClientConnClosing } if cc.idlenessState != ccIdlenessStateActive { - channelz.Errorf(logger, cc.channelzID, "ClientConn asked to enter idle mode, current mode is %v", cc.idlenessState) - cc.mu.Unlock() + channelz.Warningf(logger, cc.channelzID, "ClientConn asked to enter idle mode, current mode is %v", cc.idlenessState) return nil } @@ -431,14 +431,14 @@ func (cc *ClientConn) enterIdleMode() error { cc.balancerWrapper.enterIdleMode() cc.csMgr.updateState(connectivity.Idle) cc.idlenessState = ccIdlenessStateIdle - cc.mu.Unlock() + cc.addTraceEvent("entering idle mode") go func() { - cc.addTraceEvent("entering idle mode") for ac := range conns { ac.tearDown(errConnIdling) } }() + return nil } @@ -804,6 +804,12 @@ func init() { internal.SubscribeToConnectivityStateChanges = func(cc *ClientConn, s grpcsync.Subscriber) func() { return cc.csMgr.pubSub.Subscribe(s) } + internal.EnterIdleModeForTesting = func(cc 
*ClientConn) error { + return cc.enterIdleMode() + } + internal.ExitIdleModeForTesting = func(cc *ClientConn) error { + return cc.exitIdleMode() + } } func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) { diff --git a/internal/idle/idle_e2e_test.go b/internal/idle/idle_e2e_test.go index 9c606dd04f28..67c3e7d0ab95 100644 --- a/internal/idle/idle_e2e_test.go +++ b/internal/idle/idle_e2e_test.go @@ -23,6 +23,7 @@ import ( "fmt" "io" "strings" + "sync" "testing" "time" @@ -32,6 +33,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/balancer/stub" "google.golang.org/grpc/internal/channelz" "google.golang.org/grpc/internal/grpctest" @@ -268,9 +270,8 @@ func (s) TestChannelIdleness_Enabled_OngoingCall(t *testing.T) { } t.Cleanup(func() { cc.Close() }) - // Start a test backend which keeps a unary RPC call active by blocking on a - // channel that is closed by the test later on. Also push an address update - // via the resolver. + // Start a test backend that keeps the RPC call active by blocking + // on a channel that is closed by the test later on. blockCh := make(chan struct{}) backend := &stubserver.StubServer{ EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { @@ -286,6 +287,9 @@ func (s) TestChannelIdleness_Enabled_OngoingCall(t *testing.T) { t.Fatalf("Failed to start backend: %v", err) } t.Cleanup(backend.Stop) + + // Push an address update containing the address of the above + // backend via the manual resolver. r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}}) // Verify that the ClientConn moves to READY. 
@@ -293,8 +297,8 @@ func (s) TestChannelIdleness_Enabled_OngoingCall(t *testing.T) { defer cancel() testutils.AwaitState(ctx, t, cc, connectivity.Ready) - // Spawn a goroutine which checks expected state transitions and idleness - // channelz trace events. + // Spawn a goroutine to check for expected behavior while a blocking + // RPC call is made from the main test goroutine. errCh := make(chan error, 1) go func() { defer close(blockCh) @@ -544,3 +548,77 @@ func (s) TestChannelIdleness_Connect(t *testing.T) { // Verify that the ClientConn moves back to READY. testutils.AwaitState(ctx, t, cc, connectivity.Ready) } + +// runFunc runs f repeatedly until the context expires. +func runFunc(ctx context.Context, f func()) { + for { + select { + case <-ctx.Done(): + return + case <-time.After(10 * time.Millisecond): + f() + } + } +} + +// Tests the scenario where there are concurrent calls to exit and enter idle +// mode on the ClientConn. Verifies that there is no race under this scenario. +func (s) TestChannelIdleness_RaceBetweenEnterAndExitIdleMode(t *testing.T) { + // Start a test backend and set the bootstrap state of the resolver to + // include this address. This will ensure that when the resolver is + // restarted when exiting idle, it will push the same address to grpc again. + r := manual.NewBuilderWithScheme("whatever") + backend := stubserver.StartTestService(t, nil) + t.Cleanup(backend.Stop) + r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}}) + + // Create a ClientConn with a long idle_timeout. We will explicitly trigger + // entering and exiting IDLE mode from the test. + dopts := []grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithResolvers(r), + grpc.WithIdleTimeout(30 * time.Minute), + grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"pick_first":{}}]}`), + } + cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
+ if err != nil { + t.Fatalf("grpc.Dial() failed: %v", err) + } + t.Cleanup(func() { cc.Close() }) + + var wg sync.WaitGroup + wg.Add(4) + enterIdle := internal.EnterIdleModeForTesting.(func(*grpc.ClientConn) error) + enterIdleFunc := func() { + if err := enterIdle(cc); err != nil { + t.Errorf("Failed to enter idle mode: %v", err) + } + } + exitIdle := internal.ExitIdleModeForTesting.(func(*grpc.ClientConn) error) + exitIdleFunc := func() { + if err := exitIdle(cc); err != nil { + t.Errorf("Failed to eixt idle mode: %v", err) + } + } + // Spawn goroutines that call methods on the ClientConn to enter and exit + // idle mode concurrently for one second. + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + go func() { + defer wg.Done() + runFunc(ctx, enterIdleFunc) + }() + go func() { + defer wg.Done() + runFunc(ctx, enterIdleFunc) + }() + go func() { + defer wg.Done() + runFunc(ctx, exitIdleFunc) + }() + go func() { + defer wg.Done() + runFunc(ctx, exitIdleFunc) + }() + wg.Wait() +} diff --git a/internal/internal.go b/internal/internal.go index c8a8c76d628c..0d94c63e06e2 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -175,6 +175,12 @@ var ( // GRPCResolverSchemeExtraMetadata determines when gRPC will add extra // metadata to RPCs. GRPCResolverSchemeExtraMetadata string = "xds" + + // EnterIdleModeForTesting gets the ClientConn to enter IDLE mode. + EnterIdleModeForTesting any // func(*grpc.ClientConn) error + + // ExitIdleModeForTesting gets the ClientConn to exit IDLE mode. + ExitIdleModeForTesting any // func(*grpc.ClientConn) error ) // HealthChecker defines the signature of the client-side LB channel health checking function. 
From 3a996a66da67ad30fbbd47ded9dfec7d8ca5c8ae Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Mon, 25 Sep 2023 23:04:40 +0000 Subject: [PATCH 2/3] review comments first round --- internal/idle/idle_e2e_test.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/internal/idle/idle_e2e_test.go b/internal/idle/idle_e2e_test.go index 67c3e7d0ab95..87d37b039de3 100644 --- a/internal/idle/idle_e2e_test.go +++ b/internal/idle/idle_e2e_test.go @@ -586,8 +586,6 @@ func (s) TestChannelIdleness_RaceBetweenEnterAndExitIdleMode(t *testing.T) { } t.Cleanup(func() { cc.Close() }) - var wg sync.WaitGroup - wg.Add(4) enterIdle := internal.EnterIdleModeForTesting.(func(*grpc.ClientConn) error) enterIdleFunc := func() { if err := enterIdle(cc); err != nil { @@ -597,28 +595,30 @@ func (s) TestChannelIdleness_RaceBetweenEnterAndExitIdleMode(t *testing.T) { exitIdle := internal.ExitIdleModeForTesting.(func(*grpc.ClientConn) error) exitIdleFunc := func() { if err := exitIdle(cc); err != nil { - t.Errorf("Failed to eixt idle mode: %v", err) + t.Errorf("Failed to exit idle mode: %v", err) } } // Spawn goroutines that call methods on the ClientConn to enter and exit // idle mode concurrently for one second. 
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() + var wg sync.WaitGroup + wg.Add(4) go func() { - defer wg.Done() runFunc(ctx, enterIdleFunc) + wg.Done() }() go func() { - defer wg.Done() runFunc(ctx, enterIdleFunc) + wg.Done() }() go func() { - defer wg.Done() runFunc(ctx, exitIdleFunc) + wg.Done() }() go func() { - defer wg.Done() runFunc(ctx, exitIdleFunc) + wg.Done() }() wg.Wait() } From 5de9b23ad0025d4e8a183066f61db0892a090b12 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Fri, 29 Sep 2023 20:06:32 +0000 Subject: [PATCH 3/3] switch to defers instead of t.Cleanup from the main test goroutine --- internal/idle/idle_e2e_test.go | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/internal/idle/idle_e2e_test.go b/internal/idle/idle_e2e_test.go index 87d37b039de3..d2cd9d3e3752 100644 --- a/internal/idle/idle_e2e_test.go +++ b/internal/idle/idle_e2e_test.go @@ -134,11 +134,11 @@ func (s) TestChannelIdleness_Disabled_NoActivity(t *testing.T) { if err != nil { t.Fatalf("grpc.Dial() failed: %v", err) } - t.Cleanup(func() { cc.Close() }) + defer cc.Close() // Start a test backend and push an address update via the resolver. backend := stubserver.StartTestService(t, nil) - t.Cleanup(backend.Stop) + defer backend.Stop() r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}}) // Verify that the ClientConn moves to READY. @@ -180,12 +180,12 @@ func (s) TestChannelIdleness_Enabled_NoActivity(t *testing.T) { if err != nil { t.Fatalf("grpc.Dial() failed: %v", err) } - t.Cleanup(func() { cc.Close() }) + defer cc.Close() // Start a test backend and push an address update via the resolver. 
lis := testutils.NewListenerWrapper(t, nil) backend := stubserver.StartTestService(t, &stubserver.StubServer{Listener: lis}) - t.Cleanup(backend.Stop) + defer backend.Stop() r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}}) // Verify that the ClientConn moves to READY. @@ -268,7 +268,7 @@ func (s) TestChannelIdleness_Enabled_OngoingCall(t *testing.T) { if err != nil { t.Fatalf("grpc.Dial() failed: %v", err) } - t.Cleanup(func() { cc.Close() }) + defer cc.Close() // Start a test backend that keeps the RPC call active by blocking // on a channel that is closed by the test later on. @@ -286,7 +286,7 @@ func (s) TestChannelIdleness_Enabled_OngoingCall(t *testing.T) { if err := backend.StartServer(); err != nil { t.Fatalf("Failed to start backend: %v", err) } - t.Cleanup(backend.Stop) + defer backend.Stop() // Push an address update containing the address of the above // backend via the manual resolver. @@ -357,11 +357,11 @@ func (s) TestChannelIdleness_Enabled_ActiveSinceLastCheck(t *testing.T) { if err != nil { t.Fatalf("grpc.Dial() failed: %v", err) } - t.Cleanup(func() { cc.Close() }) + defer cc.Close() // Start a test backend and push an address update via the resolver. backend := stubserver.StartTestService(t, nil) - t.Cleanup(backend.Stop) + defer backend.Stop() r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}}) // Verify that the ClientConn moves to READY. @@ -412,7 +412,7 @@ func (s) TestChannelIdleness_Enabled_ExitIdleOnRPC(t *testing.T) { // restarted when exiting idle, it will push the same address to grpc again. r := manual.NewBuilderWithScheme("whatever") backend := stubserver.StartTestService(t, nil) - t.Cleanup(backend.Stop) + defer backend.Stop() r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}}) // Create a ClientConn with a short idle_timeout. 
@@ -426,7 +426,7 @@ func (s) TestChannelIdleness_Enabled_ExitIdleOnRPC(t *testing.T) { if err != nil { t.Fatalf("grpc.Dial() failed: %v", err) } - t.Cleanup(func() { cc.Close() }) + defer cc.Close() // Verify that the ClientConn moves to READY. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) @@ -477,7 +477,7 @@ func (s) TestChannelIdleness_Enabled_IdleTimeoutRacesWithRPCs(t *testing.T) { // restarted when exiting idle, it will push the same address to grpc again. r := manual.NewBuilderWithScheme("whatever") backend := stubserver.StartTestService(t, nil) - t.Cleanup(backend.Stop) + defer backend.Stop() r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}}) // Create a ClientConn with a short idle_timeout. @@ -491,7 +491,7 @@ func (s) TestChannelIdleness_Enabled_IdleTimeoutRacesWithRPCs(t *testing.T) { if err != nil { t.Fatalf("grpc.Dial() failed: %v", err) } - t.Cleanup(func() { cc.Close() }) + defer cc.Close() // Verify that the ClientConn moves to READY. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) @@ -520,7 +520,7 @@ func (s) TestChannelIdleness_Connect(t *testing.T) { // restarted when exiting idle, it will push the same address to grpc again. r := manual.NewBuilderWithScheme("whatever") backend := stubserver.StartTestService(t, nil) - t.Cleanup(backend.Stop) + defer backend.Stop() r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}}) // Create a ClientConn with a short idle_timeout. @@ -534,7 +534,7 @@ func (s) TestChannelIdleness_Connect(t *testing.T) { if err != nil { t.Fatalf("grpc.Dial() failed: %v", err) } - t.Cleanup(func() { cc.Close() }) + defer cc.Close() // Verify that the ClientConn moves to IDLE. 
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) @@ -569,7 +569,7 @@ func (s) TestChannelIdleness_RaceBetweenEnterAndExitIdleMode(t *testing.T) { // restarted when exiting idle, it will push the same address to grpc again. r := manual.NewBuilderWithScheme("whatever") backend := stubserver.StartTestService(t, nil) - t.Cleanup(backend.Stop) + defer backend.Stop() r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}}) // Create a ClientConn with a long idle_timeout. We will explicitly trigger @@ -584,7 +584,7 @@ func (s) TestChannelIdleness_RaceBetweenEnterAndExitIdleMode(t *testing.T) { if err != nil { t.Fatalf("grpc.Dial() failed: %v", err) } - t.Cleanup(func() { cc.Close() }) + defer cc.Close() enterIdle := internal.EnterIdleModeForTesting.(func(*grpc.ClientConn) error) enterIdleFunc := func() {