From 28993119b0a90cf05e3d1bbb121e2d939efe5897 Mon Sep 17 00:00:00 2001 From: Pasupuleti Sravani Date: Thu, 19 Dec 2024 06:34:44 +0000 Subject: [PATCH 01/19] Modify tests to use stubserver --- test/creds_test.go | 20 ++++++---- test/goaway_test.go | 76 +++++++++++++++++++++++-------------- test/healthcheck_test.go | 24 +++++++----- test/insecure_creds_test.go | 58 +++++++++++++--------------- test/local_creds_test.go | 30 +++++++-------- 5 files changed, 115 insertions(+), 93 deletions(-) diff --git a/test/creds_test.go b/test/creds_test.go index ff3818e0ad54..2f0f5457b0b2 100644 --- a/test/creds_test.go +++ b/test/creds_test.go @@ -32,6 +32,7 @@ import ( "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/metadata" "google.golang.org/grpc/resolver" @@ -431,10 +432,12 @@ func (s) TestCredsHandshakeAuthority(t *testing.T) { t.Fatal(err) } cred := &authorityCheckCreds{} - s := grpc.NewServer() - go s.Serve(lis) - defer s.Stop() - + stub := &stubserver.StubServer{ + Listener: lis, + S: grpc.NewServer(), + } + stubserver.StartTestService(t, stub) + defer stub.S.Stop() r := manual.NewBuilderWithScheme("whatever") cc, err := grpc.Dial(r.Scheme()+":///"+testAuthority, grpc.WithTransportCredentials(cred), grpc.WithResolvers(r)) @@ -463,10 +466,13 @@ func (s) TestCredsHandshakeServerNameAuthority(t *testing.T) { t.Fatal(err) } cred := &authorityCheckCreds{} - s := grpc.NewServer() - go s.Serve(lis) - defer s.Stop() + stub := &stubserver.StubServer{ + Listener: lis, + S: grpc.NewServer(), + } + stubserver.StartTestService(t, stub) + defer stub.S.Stop() r := manual.NewBuilderWithScheme("whatever") cc, err := grpc.Dial(r.Scheme()+":///"+testAuthority, grpc.WithTransportCredentials(cred), grpc.WithResolvers(r)) diff --git a/test/goaway_test.go b/test/goaway_test.go index 65d2cc02d05a..fc9c8ebda143 100644 --- a/test/goaway_test.go +++ b/test/goaway_test.go @@ -57,21 +57,20 @@ func (s) TestGracefulClientOnGoAway(t *testing.T) { const maxConnAge = 100 * time.Millisecond const testTime = maxConnAge * 10 + lis, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("Failed to create listener: %v", err) + } + ss := &stubserver.StubServer{ + Listener: lis, EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil }, + S: grpc.NewServer(grpc.KeepaliveParams(keepalive.ServerParameters{MaxConnectionAge: maxConnAge})), } - - s := grpc.NewServer(grpc.KeepaliveParams(keepalive.ServerParameters{MaxConnectionAge: maxConnAge})) - defer s.Stop() - testgrpc.RegisterTestServiceServer(s, ss) - - lis, err := net.Listen("tcp", "localhost:0") - if err != nil { - t.Fatalf("Failed to create listener: %v", err) - } - go s.Serve(lis) + defer ss.S.Stop() + stubserver.StartTestService(t, ss) cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { @@ -551,13 +550,13 @@ func (s) TestGoAwayThenClose(t *testing.T) { if err != nil { t.Fatalf("Error while listening. Err: %v", err) } - s1 := grpc.NewServer() - defer s1.Stop() - ts := &funcServer{ - unaryCall: func(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + + ss := &stubserver.StubServer{ + Listener: lis1, + UnaryCallF: func(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { return &testpb.SimpleResponse{}, nil }, - fullDuplexCall: func(stream testgrpc.TestService_FullDuplexCallServer) error { + FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error { if err := stream.Send(&testpb.StreamingOutputCallResponse{}); err != nil { t.Errorf("unexpected error from send: %v", err) return err @@ -569,18 +568,37 @@ func (s) TestGoAwayThenClose(t *testing.T) { } return err }, + S: grpc.NewServer(), } - testgrpc.RegisterTestServiceServer(s1, ts) - go s1.Serve(lis1) + defer ss.S.Stop() + stubserver.StartTestService(t, ss) conn2Established := grpcsync.NewEvent() lis2, err := listenWithNotifyingListener("tcp", "localhost:0", conn2Established) if err != nil { t.Fatalf("Error while listening. Err: %v", err) } - s2 := grpc.NewServer() - defer s2.Stop() - testgrpc.RegisterTestServiceServer(s2, ts) + ss2 := &stubserver.StubServer{ + Listener: lis2, + UnaryCallF: func(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + return &testpb.SimpleResponse{}, nil + }, + FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error { + if err := stream.Send(&testpb.StreamingOutputCallResponse{}); err != nil { + t.Errorf("unexpected error from send: %v", err) + return err + } + // Wait forever. + _, err := stream.Recv() + if err == nil { + t.Error("expected to never receive any message") + } + return err + }, + S: grpc.NewServer(), + } + defer ss2.S.Stop() + stubserver.StartTestService(t, ss2) r := manual.NewBuilderWithScheme("whatever") r.InitialState(resolver.State{Addresses: []resolver.Address{ @@ -613,10 +631,8 @@ func (s) TestGoAwayThenClose(t *testing.T) { t.Fatalf("unexpected error from first recv: %v", err) } - go s2.Serve(lis2) - t.Log("Gracefully stopping server 1.") - go s1.GracefulStop() + go ss.S.GracefulStop() t.Log("Waiting for the ClientConn to enter IDLE state.") testutils.AwaitState(ctx, t, cc, connectivity.Idle) @@ -637,7 +653,7 @@ func (s) TestGoAwayThenClose(t *testing.T) { lis2.Close() t.Log("Hard closing connection 1.") - s1.Stop() + ss.S.Stop() t.Log("Waiting for the first stream to error.") if _, err = stream.Recv(); err == nil { @@ -716,10 +732,12 @@ func (s) TestTwoGoAwayPingFrames(t *testing.T) { t.Fatalf("Failed to listen: %v", err) } defer lis.Close() - s := grpc.NewServer() - defer s.Stop() - go s.Serve(lis) - + ss := &stubserver.StubServer{ + Listener: lis, + S: grpc.NewServer(), + } + stubserver.StartTestService(t, ss) + defer ss.S.Stop() conn, err := net.DialTimeout("tcp", lis.Addr().String(), defaultTestTimeout) if err != nil { t.Fatalf("Failed to dial: %v", err) @@ -745,7 +763,7 @@ func (s) TestTwoGoAwayPingFrames(t *testing.T) { }() gsDone := testutils.NewChannel() go func() { - s.GracefulStop() + ss.S.GracefulStop() gsDone.Send(nil) }() ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) diff --git a/test/healthcheck_test.go b/test/healthcheck_test.go index 424682d09625..ceab68fd80b3 100644 --- a/test/healthcheck_test.go +++ b/test/healthcheck_test.go @@ -35,6 +35,7 @@ import ( "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/channelz" "google.golang.org/grpc/internal/grpctest" + "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver/manual" @@ -150,12 +151,15 @@ func setupServer(t *testing.T, watchFunc healthWatchFunc) (*grpc.Server, net.Lis } else { ts = newTestHealthServer() } - s := grpc.NewServer() - healthgrpc.RegisterHealthServer(s, ts) - testgrpc.RegisterTestServiceServer(s, &testServer{}) - go s.Serve(lis) - t.Cleanup(func() { s.Stop() }) - return s, lis, ts + + stub := &stubserver.StubServer{ + Listener: lis, + S: grpc.NewServer(), + } + healthgrpc.RegisterHealthServer(stub.S, ts) + stubserver.StartTestService(t, stub) + t.Cleanup(func() { stub.Stop() }) + return stub.S.(*grpc.Server), lis, ts } type clientConfig struct { @@ -250,13 +254,15 @@ func (s) TestHealthCheckWatchStateChange(t *testing.T) { // If Watch returns Unimplemented, then the ClientConn should go into READY state. func (s) TestHealthCheckHealthServerNotRegistered(t *testing.T) { grpctest.TLogger.ExpectError("Subchannel health check is unimplemented at server side, thus health check is disabled") - s := grpc.NewServer() lis, err := net.Listen("tcp", "localhost:0") if err != nil { t.Fatalf("failed to listen due to err: %v", err) } - go s.Serve(lis) - defer s.Stop() + stub := &stubserver.StubServer{ + Listener: lis, + S: grpc.NewServer(), + } + defer stub.S.Stop() cc, r := setupClient(t, nil) r.UpdateState(resolver.State{ diff --git a/test/insecure_creds_test.go b/test/insecure_creds_test.go index 9f6c8b594708..3052c8fdad2a 100644 --- a/test/insecure_creds_test.go +++ b/test/insecure_creds_test.go @@ -83,7 +83,13 @@ func (s) TestInsecureCreds(t *testing.T) { for _, test := range tests { t.Run(test.desc, func(t *testing.T) { + lis, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("net.Listen(tcp, localhost:0) failed: %v", err) + } + ss := &stubserver.StubServer{ + Listener: lis, EmptyCallF: func(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) { if !test.serverInsecureCreds { return &testpb.Empty{}, nil @@ -104,22 +110,14 @@ func (s) TestInsecureCreds(t *testing.T) { return &testpb.Empty{}, nil }, } - - sOpts := []grpc.ServerOption{} if test.serverInsecureCreds { - sOpts = append(sOpts, grpc.Creds(insecure.NewCredentials())) - } - s := grpc.NewServer(sOpts...) - defer s.Stop() - - testgrpc.RegisterTestServiceServer(s, ss) - - lis, err := net.Listen("tcp", "localhost:0") - if err != nil { - t.Fatalf("net.Listen(tcp, localhost:0) failed: %v", err) + ss.S = grpc.NewServer(grpc.Creds(insecure.NewCredentials())) + } else { + ss.S = grpc.NewServer() } + defer ss.S.Stop() - go s.Serve(lis) + stubserver.StartTestService(t, ss) addr := lis.Addr().String() opts := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())} @@ -143,21 +141,21 @@ func (s) TestInsecureCreds(t *testing.T) { } func (s) TestInsecureCreds_WithPerRPCCredentials_AsCallOption(t *testing.T) { + lis, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("net.Listen(tcp, localhost:0) failed: %v", err) + } + ss := &stubserver.StubServer{ + Listener: lis, EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil }, + S: grpc.NewServer(grpc.Creds(insecure.NewCredentials())), } - s := grpc.NewServer(grpc.Creds(insecure.NewCredentials())) - defer s.Stop() - testgrpc.RegisterTestServiceServer(s, ss) - - lis, err := net.Listen("tcp", "localhost:0") - if err != nil { - t.Fatalf("net.Listen(tcp, localhost:0) failed: %v", err) - } - go s.Serve(lis) + defer ss.S.Stop() + stubserver.StartTestService(t, ss) addr := lis.Addr().String() ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) @@ -179,21 +177,19 @@ func (s) TestInsecureCreds_WithPerRPCCredentials_AsCallOption(t *testing.T) { } func (s) TestInsecureCreds_WithPerRPCCredentials_AsDialOption(t *testing.T) { + lis, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("net.Listen(tcp, localhost:0) failed: %v", err) + } ss := &stubserver.StubServer{ EmptyCallF: func(_ context.Context, _ *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil }, + S: grpc.NewServer(grpc.Creds(insecure.NewCredentials())), } - s := grpc.NewServer(grpc.Creds(insecure.NewCredentials())) - defer s.Stop() - testgrpc.RegisterTestServiceServer(s, ss) - - lis, err := net.Listen("tcp", "localhost:0") - if err != nil { - t.Fatalf("net.Listen(tcp, localhost:0) failed: %v", err) - } - go s.Serve(lis) + defer ss.S.Stop() + stubserver.StartTestService(t, ss) addr := lis.Addr().String() dopts := []grpc.DialOption{ diff --git a/test/local_creds_test.go b/test/local_creds_test.go index 241f6306e2b4..e936c4448290 100644 --- a/test/local_creds_test.go +++ b/test/local_creds_test.go @@ -41,7 +41,13 @@ import ( ) func testLocalCredsE2ESucceed(network, address string) error { + + lis, err := net.Listen(network, address) + if err != nil { + return fmt.Errorf("Failed to create listener: %v", err) + } ss := &stubserver.StubServer{ + Listener: lis, EmptyCallF: func(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) { pr, ok := peer.FromContext(ctx) if !ok { @@ -69,20 +75,12 @@ func testLocalCredsE2ESucceed(network, address string) error { } return &testpb.Empty{}, nil }, + S: grpc.NewServer(grpc.Creds(local.NewCredentials())), } - sopts := []grpc.ServerOption{grpc.Creds(local.NewCredentials())} - s := grpc.NewServer(sopts...) - defer s.Stop() - - testgrpc.RegisterTestServiceServer(s, ss) + defer ss.S.Stop() - lis, err := net.Listen(network, address) - if err != nil { - return fmt.Errorf("Failed to create listener: %v", err) - } - - go s.Serve(lis) + stubserver.StartTestService(nil, ss) var cc *grpc.ClientConn lisAddr := lis.Addr().String() @@ -167,13 +165,11 @@ func testLocalCredsE2EFail(dopts []grpc.DialOption) error { EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil }, + S: grpc.NewServer(grpc.Creds(local.NewCredentials())), } - sopts := []grpc.ServerOption{grpc.Creds(local.NewCredentials())} - s := grpc.NewServer(sopts...) - defer s.Stop() - - testgrpc.RegisterTestServiceServer(s, ss) + defer ss.S.Stop() + stubserver.StartTestService(nil, ss) lis, err := net.Listen("tcp", "localhost:0") if err != nil { @@ -190,7 +186,7 @@ func testLocalCredsE2EFail(dopts []grpc.DialOption) error { Zone: "", } - go s.Serve(spoofListener(lis, fakeClientAddr)) + go ss.S.Serve(spoofListener(lis, fakeClientAddr)) cc, err := grpc.NewClient(lis.Addr().String(), append(dopts, grpc.WithDialer(spoofDialer(fakeServerAddr)))...) if err != nil { From a2ae0e4bda54e782d37e1affee6e159a8b08f6f9 Mon Sep 17 00:00:00 2001 From: Pasupuleti Sravani Date: Thu, 19 Dec 2024 10:55:42 +0000 Subject: [PATCH 02/19] Added emptycall to stubserver --- test/creds_test.go | 10 ++++++++-- test/goaway_test.go | 5 ++++- test/healthcheck_test.go | 10 ++++++++-- 3 files changed, 20 insertions(+), 5 deletions(-) diff --git a/test/creds_test.go b/test/creds_test.go index 2f0f5457b0b2..b4ea0e13d4cb 100644 --- a/test/creds_test.go +++ b/test/creds_test.go @@ -434,7 +434,10 @@ func (s) TestCredsHandshakeAuthority(t *testing.T) { cred := &authorityCheckCreds{} stub := &stubserver.StubServer{ Listener: lis, - S: grpc.NewServer(), + EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { + return &testpb.Empty{}, nil + }, + S: grpc.NewServer(), } stubserver.StartTestService(t, stub) defer stub.S.Stop() @@ -469,7 +472,10 @@ func (s) TestCredsHandshakeServerNameAuthority(t *testing.T) { stub := &stubserver.StubServer{ Listener: lis, - S: grpc.NewServer(), + EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { + return &testpb.Empty{}, nil + }, + S: grpc.NewServer(), } stubserver.StartTestService(t, stub) defer stub.S.Stop() diff --git a/test/goaway_test.go b/test/goaway_test.go index fc9c8ebda143..ce28af21b692 100644 --- a/test/goaway_test.go +++ b/test/goaway_test.go @@ -734,7 +734,10 @@ func (s) TestTwoGoAwayPingFrames(t *testing.T) { defer lis.Close() ss := &stubserver.StubServer{ Listener: lis, - S: grpc.NewServer(), + EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { + return &testpb.Empty{}, nil + }, + S: grpc.NewServer(), } stubserver.StartTestService(t, ss) defer ss.S.Stop() diff --git a/test/healthcheck_test.go b/test/healthcheck_test.go index ceab68fd80b3..a49a159af0b8 100644 --- a/test/healthcheck_test.go +++ b/test/healthcheck_test.go @@ -154,7 +154,10 @@ func setupServer(t *testing.T, watchFunc healthWatchFunc) (*grpc.Server, net.Lis stub := &stubserver.StubServer{ Listener: lis, - S: grpc.NewServer(), + EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { + return &testpb.Empty{}, nil + }, + S: grpc.NewServer(), } healthgrpc.RegisterHealthServer(stub.S, ts) stubserver.StartTestService(t, stub) @@ -260,7 +263,10 @@ func (s) TestHealthCheckHealthServerNotRegistered(t *testing.T) { } stub := &stubserver.StubServer{ Listener: lis, - S: grpc.NewServer(), + EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { + return &testpb.Empty{}, nil + }, + S: grpc.NewServer(), } defer stub.S.Stop() From 71cfc96c4e0427dfe484a4f8048ffd06efa7fe74 Mon Sep 17 00:00:00 2001 From: Pasupuleti Sravani Date: Thu, 19 Dec 2024 11:12:32 +0000 Subject: [PATCH 03/19] fixing vet issues --- test/creds_test.go | 4 ++-- test/goaway_test.go | 2 +- test/healthcheck_test.go | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/test/creds_test.go b/test/creds_test.go index b4ea0e13d4cb..7d45d5e07ab5 100644 --- a/test/creds_test.go +++ b/test/creds_test.go @@ -434,7 +434,7 @@ func (s) TestCredsHandshakeAuthority(t *testing.T) { cred := &authorityCheckCreds{} stub := &stubserver.StubServer{ Listener: lis, - EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { + EmptyCallF: func(_ context.Context, _ *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil }, S: grpc.NewServer(), @@ -472,7 +472,7 @@ func (s) TestCredsHandshakeServerNameAuthority(t *testing.T) { stub := &stubserver.StubServer{ Listener: lis, - EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { + EmptyCallF: func(_ context.Context, _ *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil }, S: grpc.NewServer(), diff --git a/test/goaway_test.go b/test/goaway_test.go index ce28af21b692..7217d867876a 100644 --- a/test/goaway_test.go +++ b/test/goaway_test.go @@ -734,7 +734,7 @@ func (s) TestTwoGoAwayPingFrames(t *testing.T) { defer lis.Close() ss := &stubserver.StubServer{ Listener: lis, - EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { + EmptyCallF: func(_ context.Context, _ *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil }, S: grpc.NewServer(), diff --git a/test/healthcheck_test.go b/test/healthcheck_test.go index a49a159af0b8..b3714daf03dd 100644 --- a/test/healthcheck_test.go +++ b/test/healthcheck_test.go @@ -154,7 +154,7 @@ func setupServer(t *testing.T, watchFunc healthWatchFunc) (*grpc.Server, net.Lis stub := &stubserver.StubServer{ Listener: lis, - EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { + EmptyCallF: func(_ context.Context, _ *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil }, S: grpc.NewServer(), @@ -263,7 +263,7 @@ func (s) TestHealthCheckHealthServerNotRegistered(t *testing.T) { } stub := &stubserver.StubServer{ Listener: lis, - EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { + EmptyCallF: func(_ context.Context, _ *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil }, S: grpc.NewServer(), From 2a93d5bbd9883b8a70805b63a362a4082db4e22b Mon Sep 17 00:00:00 2001 From: Pasupuleti Sravani Date: Thu, 19 Dec 2024 12:18:19 +0000 Subject: [PATCH 04/19] removing empty spaces --- test/local_creds_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/test/local_creds_test.go b/test/local_creds_test.go index e936c4448290..c087db08c374 100644 --- a/test/local_creds_test.go +++ b/test/local_creds_test.go @@ -167,7 +167,6 @@ func testLocalCredsE2EFail(dopts []grpc.DialOption) error { }, S: grpc.NewServer(grpc.Creds(local.NewCredentials())), } - defer ss.S.Stop() stubserver.StartTestService(nil, ss) From a57bfbddf677c64fab55b650e2b7bb8914b0015e Mon Sep 17 00:00:00 2001 From: Pasupuleti Sravani Date: Fri, 20 Dec 2024 10:30:53 +0000 Subject: [PATCH 05/19] reverting fullduplexcall --- test/goaway_test.go | 79 +++++++++++++++++---------------------------- 1 file changed, 29 insertions(+), 50 deletions(-) diff --git a/test/goaway_test.go b/test/goaway_test.go index 7217d867876a..65d2cc02d05a 100644 --- a/test/goaway_test.go +++ b/test/goaway_test.go @@ -57,20 +57,21 @@ func (s) TestGracefulClientOnGoAway(t *testing.T) { const maxConnAge = 100 * time.Millisecond const testTime = maxConnAge * 10 - lis, err := net.Listen("tcp", "localhost:0") - if err != nil { - t.Fatalf("Failed to create listener: %v", err) - } - ss := &stubserver.StubServer{ - Listener: lis, EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil }, - S: grpc.NewServer(grpc.KeepaliveParams(keepalive.ServerParameters{MaxConnectionAge: maxConnAge})), } - defer ss.S.Stop() - stubserver.StartTestService(t, ss) + + s := grpc.NewServer(grpc.KeepaliveParams(keepalive.ServerParameters{MaxConnectionAge: maxConnAge})) + defer s.Stop() + testgrpc.RegisterTestServiceServer(s, ss) + + lis, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("Failed to create listener: %v", err) + } + go s.Serve(lis) cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { @@ -550,13 +551,13 @@ func (s) TestGoAwayThenClose(t *testing.T) { if err != nil { t.Fatalf("Error while listening. Err: %v", err) } - - ss := &stubserver.StubServer{ - Listener: lis1, - UnaryCallF: func(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + s1 := grpc.NewServer() + defer s1.Stop() + ts := &funcServer{ + unaryCall: func(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { return &testpb.SimpleResponse{}, nil }, - FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error { + fullDuplexCall: func(stream testgrpc.TestService_FullDuplexCallServer) error { if err := stream.Send(&testpb.StreamingOutputCallResponse{}); err != nil { t.Errorf("unexpected error from send: %v", err) return err @@ -568,37 +569,18 @@ func (s) TestGoAwayThenClose(t *testing.T) { } return err }, - S: grpc.NewServer(), } - defer ss.S.Stop() - stubserver.StartTestService(t, ss) + testgrpc.RegisterTestServiceServer(s1, ts) + go s1.Serve(lis1) conn2Established := grpcsync.NewEvent() lis2, err := listenWithNotifyingListener("tcp", "localhost:0", conn2Established) if err != nil { t.Fatalf("Error while listening. Err: %v", err) } - ss2 := &stubserver.StubServer{ - Listener: lis2, - UnaryCallF: func(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { - return &testpb.SimpleResponse{}, nil - }, - FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error { - if err := stream.Send(&testpb.StreamingOutputCallResponse{}); err != nil { - t.Errorf("unexpected error from send: %v", err) - return err - } - // Wait forever. - _, err := stream.Recv() - if err == nil { - t.Error("expected to never receive any message") - } - return err - }, - S: grpc.NewServer(), - } - defer ss2.S.Stop() - stubserver.StartTestService(t, ss2) + s2 := grpc.NewServer() + defer s2.Stop() + testgrpc.RegisterTestServiceServer(s2, ts) r := manual.NewBuilderWithScheme("whatever") r.InitialState(resolver.State{Addresses: []resolver.Address{ @@ -631,8 +613,10 @@ func (s) TestGoAwayThenClose(t *testing.T) { t.Fatalf("unexpected error from first recv: %v", err) } + go s2.Serve(lis2) + t.Log("Gracefully stopping server 1.") - go ss.S.GracefulStop() + go s1.GracefulStop() t.Log("Waiting for the ClientConn to enter IDLE state.") testutils.AwaitState(ctx, t, cc, connectivity.Idle) @@ -653,7 +637,7 @@ func (s) TestGoAwayThenClose(t *testing.T) { lis2.Close() t.Log("Hard closing connection 1.") - ss.S.Stop() + s1.Stop() t.Log("Waiting for the first stream to error.") if _, err = stream.Recv(); err == nil { @@ -732,15 +716,10 @@ func (s) TestTwoGoAwayPingFrames(t *testing.T) { t.Fatalf("Failed to listen: %v", err) } defer lis.Close() - ss := &stubserver.StubServer{ - Listener: lis, - EmptyCallF: func(_ context.Context, _ *testpb.Empty) (*testpb.Empty, error) { - return &testpb.Empty{}, nil - }, - S: grpc.NewServer(), - } - stubserver.StartTestService(t, ss) - defer ss.S.Stop() + s := grpc.NewServer() + defer s.Stop() + go s.Serve(lis) + conn, err := net.DialTimeout("tcp", lis.Addr().String(), defaultTestTimeout) if err != nil { t.Fatalf("Failed to dial: %v", err) @@ -766,7 +745,7 @@ func (s) TestTwoGoAwayPingFrames(t *testing.T) { }() gsDone := testutils.NewChannel() go func() { - ss.S.GracefulStop() + s.GracefulStop() gsDone.Send(nil) }() ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) From 5a8228bd72c328b9e75f79bb08450c3eb898487e Mon Sep 17 00:00:00 2001 From: Pasupuleti Sravani Date: Mon, 30 Dec 2024 08:35:31 +0000 Subject: [PATCH 06/19] resolving conflicts --- test/goaway_test.go | 69 ++++++--- test/healthcheck_test.go | 316 ++++++++++++++++++++++++++++++--------- test/local_creds_test.go | 18 +-- 3 files changed, 298 insertions(+), 105 deletions(-) diff --git a/test/goaway_test.go b/test/goaway_test.go index 65d2cc02d05a..f8fab36e445a 100644 --- a/test/goaway_test.go +++ b/test/goaway_test.go @@ -57,21 +57,20 @@ func (s) TestGracefulClientOnGoAway(t *testing.T) { const maxConnAge = 100 * time.Millisecond const testTime = maxConnAge * 10 + lis, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("Failed to create listener: %v", err) + } + ss := &stubserver.StubServer{ + Listener: lis, EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil }, + S: grpc.NewServer(grpc.KeepaliveParams(keepalive.ServerParameters{MaxConnectionAge: maxConnAge})), } - - s := grpc.NewServer(grpc.KeepaliveParams(keepalive.ServerParameters{MaxConnectionAge: maxConnAge})) - defer s.Stop() - testgrpc.RegisterTestServiceServer(s, ss) - - lis, err := net.Listen("tcp", "localhost:0") - if err != nil { - t.Fatalf("Failed to create listener: %v", err) - } - go s.Serve(lis) + defer ss.S.Stop() + stubserver.StartTestService(t, ss) cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { @@ -551,13 +550,15 @@ func (s) TestGoAwayThenClose(t *testing.T) { if err != nil { t.Fatalf("Error while listening. Err: %v", err) } - s1 := grpc.NewServer() - defer s1.Stop() - ts := &funcServer{ - unaryCall: func(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + //s1 := grpc.NewServer() + //defer s1.Stop() + //ts := &funcServer{ + ss1 := &stubserver.StubServer{ + Listener: lis1, + UnaryCallF: func(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { return &testpb.SimpleResponse{}, nil }, - fullDuplexCall: func(stream testgrpc.TestService_FullDuplexCallServer) error { + FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error { if err := stream.Send(&testpb.StreamingOutputCallResponse{}); err != nil { t.Errorf("unexpected error from send: %v", err) return err @@ -569,18 +570,42 @@ func (s) TestGoAwayThenClose(t *testing.T) { } return err }, + S: grpc.NewServer(), } - testgrpc.RegisterTestServiceServer(s1, ts) - go s1.Serve(lis1) + stubserver.StartTestService(t, ss1) + defer ss1.S.Stop() + //testgrpc.RegisterTestServiceServer(s1, ts) + //go s1.Serve(lis1) conn2Established := grpcsync.NewEvent() lis2, err := listenWithNotifyingListener("tcp", "localhost:0", conn2Established) if err != nil { t.Fatalf("Error while listening. Err: %v", err) } - s2 := grpc.NewServer() + /*s2 := grpc.NewServer() defer s2.Stop() - testgrpc.RegisterTestServiceServer(s2, ts) + testgrpc.RegisterTestServiceServer(s2, ts)*/ + ss2 := &stubserver.StubServer{ + Listener: lis2, + UnaryCallF: func(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + return &testpb.SimpleResponse{}, nil + }, + FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error { + if err := stream.Send(&testpb.StreamingOutputCallResponse{}); err != nil { + t.Errorf("unexpected error from send: %v", err) + return err + } + // Wait forever. + _, err := stream.Recv() + if err == nil { + t.Error("expected to never receive any message") + } + return err + }, + S: grpc.NewServer(), + } + stubserver.StartTestService(t, ss2) + defer ss2.S.Stop() r := manual.NewBuilderWithScheme("whatever") r.InitialState(resolver.State{Addresses: []resolver.Address{ @@ -613,10 +638,8 @@ func (s) TestGoAwayThenClose(t *testing.T) { t.Fatalf("unexpected error from first recv: %v", err) } - go s2.Serve(lis2) - t.Log("Gracefully stopping server 1.") - go s1.GracefulStop() + go ss1.S.GracefulStop() t.Log("Waiting for the ClientConn to enter IDLE state.") testutils.AwaitState(ctx, t, cc, connectivity.Idle) @@ -637,7 +660,7 @@ func (s) TestGoAwayThenClose(t *testing.T) { lis2.Close() t.Log("Hard closing connection 1.") - s1.Stop() + ss1.S.Stop() t.Log("Waiting for the first stream to error.") if _, err = stream.Recv(); err == nil { diff --git a/test/healthcheck_test.go b/test/healthcheck_test.go index b3714daf03dd..c16aeb4fbddf 100644 --- a/test/healthcheck_test.go +++ b/test/healthcheck_test.go @@ -22,18 +22,24 @@ import ( "context" "errors" "fmt" + "io" "net" "sync" "testing" "time" "google.golang.org/grpc" + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/balancer/pickfirst" + "google.golang.org/grpc/balancer/pickfirst/pickfirstleaf" "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/health" "google.golang.org/grpc/internal" + "google.golang.org/grpc/internal/balancer/stub" "google.golang.org/grpc/internal/channelz" + "google.golang.org/grpc/internal/envconfig" "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/internal/testutils" @@ -47,6 +53,47 @@ import ( testpb "google.golang.org/grpc/interop/grpc_testing" ) +const healthCheckingPetiolePolicyName = "health_checking_petiole_policy" + +var ( + // healthCheckTestPolicyName is the LB policy used for testing the health check + // service. + healthCheckTestPolicyName = "round_robin" +) + +func init() { + balancer.Register(&healthCheckingPetiolePolicyBuilder{}) + // Till dualstack changes are not implemented and round_robin doesn't + // delegate to pickfirst, test a fake petiole policy that delegates to + // the new pickfirst balancer. + // TODO: https://github.com/grpc/grpc-go/issues/7906 - Remove the fake + // petiole policy one round robin starts delegating to pickfirst. + if envconfig.NewPickFirstEnabled { + healthCheckTestPolicyName = healthCheckingPetiolePolicyName + } +} + +type healthCheckingPetiolePolicyBuilder struct{} + +func (bb *healthCheckingPetiolePolicyBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { + return &healthCheckingPetiolePolicy{ + Balancer: balancer.Get(pickfirstleaf.Name).Build(cc, opts), + } +} + +func (bb *healthCheckingPetiolePolicyBuilder) Name() string { + return healthCheckingPetiolePolicyName +} + +func (b *healthCheckingPetiolePolicy) UpdateClientConnState(state balancer.ClientConnState) error { + state.ResolverState = pickfirstleaf.EnableHealthListener(state.ResolverState) + return b.Balancer.UpdateClientConnState(state) +} + +type healthCheckingPetiolePolicy struct { + balancer.Balancer +} + func newTestHealthServer() *testHealthServer { return newTestHealthServerWithWatchFunc(defaultWatchFunc) } @@ -157,6 +204,25 @@ func setupServer(t *testing.T, watchFunc healthWatchFunc) (*grpc.Server, net.Lis EmptyCallF: func(_ context.Context, _ *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil }, + FullDuplexCallF: func(stream testpb.TestService_FullDuplexCallServer) error { + for { + req, err := stream.Recv() + if err == io.EOF { + return nil + } + if err != nil { + return fmt.Errorf("error receiving from stream: %v", err) + } + t.Logf("Received message: %v", req) + resp := &testpb.StreamingOutputCallResponse{ + Payload: req.Payload, + } + if err := stream.Send(resp); err != nil { + return fmt.Errorf("error sending to stream: %v", err) + } + t.Logf("Sent response: %v", resp) + } + }, S: grpc.NewServer(), } healthgrpc.RegisterHealthServer(stub.S, ts) @@ -257,28 +323,24 @@ func (s) TestHealthCheckWatchStateChange(t *testing.T) { // If Watch returns Unimplemented, then the ClientConn should go into READY state. func (s) TestHealthCheckHealthServerNotRegistered(t *testing.T) { grpctest.TLogger.ExpectError("Subchannel health check is unimplemented at server side, thus health check is disabled") + + s := grpc.NewServer() lis, err := net.Listen("tcp", "localhost:0") if err != nil { t.Fatalf("failed to listen due to err: %v", err) } - stub := &stubserver.StubServer{ - Listener: lis, - EmptyCallF: func(_ context.Context, _ *testpb.Empty) (*testpb.Empty, error) { - return &testpb.Empty{}, nil - }, - S: grpc.NewServer(), - } - defer stub.S.Stop() + go s.Serve(lis) + defer s.Stop() cc, r := setupClient(t, nil) r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, - ServiceConfig: parseServiceConfig(t, r, `{ - "healthCheckConfig": { - "serviceName": "foo" - }, - "loadBalancingConfig": [{"round_robin":{}}] -}`)}) + ServiceConfig: parseServiceConfig(t, r, fmt.Sprintf(`{ + "healthCheckConfig": { + "serviceName": "foo" + }, + "loadBalancingConfig": [{"%s":{}}] + }`, healthCheckTestPolicyName))}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -300,12 +362,12 @@ func (s) TestHealthCheckWithGoAway(t *testing.T) { tc := testgrpc.NewTestServiceClient(cc) r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, - ServiceConfig: parseServiceConfig(t, r, `{ - "healthCheckConfig": { - "serviceName": "foo" - }, - "loadBalancingConfig": [{"round_robin":{}}] -}`)}) + ServiceConfig: parseServiceConfig(t, r, fmt.Sprintf(`{ + "healthCheckConfig": { + "serviceName": "foo" + }, + "loadBalancingConfig": [{"%s":{}}] + }`, healthCheckTestPolicyName))}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -378,12 +440,12 @@ func (s) TestHealthCheckWithConnClose(t *testing.T) { tc := testgrpc.NewTestServiceClient(cc) r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, - ServiceConfig: parseServiceConfig(t, r, `{ - "healthCheckConfig": { - "serviceName": "foo" - }, - "loadBalancingConfig": [{"round_robin":{}}] -}`)}) + ServiceConfig: parseServiceConfig(t, r, fmt.Sprintf(`{ + "healthCheckConfig": { + "serviceName": "foo" + }, + "loadBalancingConfig": [{"%s":{}}] + }`, healthCheckTestPolicyName))}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -426,12 +488,12 @@ func (s) TestHealthCheckWithAddrConnDrain(t *testing.T) { hcEnterChan, hcExitChan := setupHealthCheckWrapper(t) cc, r := setupClient(t, &clientConfig{}) tc := testgrpc.NewTestServiceClient(cc) - sc := parseServiceConfig(t, r, `{ - "healthCheckConfig": { - "serviceName": "foo" - }, - "loadBalancingConfig": [{"round_robin":{}}] -}`) + sc := parseServiceConfig(t, r, fmt.Sprintf(`{ + "healthCheckConfig": { + "serviceName": "foo" + }, + "loadBalancingConfig": [{"%s":{}}] + }`, healthCheckTestPolicyName)) r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, ServiceConfig: sc, @@ -508,12 +570,12 @@ func (s) TestHealthCheckWithClientConnClose(t *testing.T) { tc := testgrpc.NewTestServiceClient(cc) r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, - ServiceConfig: parseServiceConfig(t, r, `{ - "healthCheckConfig": { - "serviceName": "foo" - }, - "loadBalancingConfig": [{"round_robin":{}}] -}`)}) + ServiceConfig: parseServiceConfig(t, r, (fmt.Sprintf(`{ + "healthCheckConfig": { + "serviceName": "foo" + }, + "loadBalancingConfig": [{"%s":{}}] + }`, healthCheckTestPolicyName)))}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -575,12 +637,12 @@ func (s) TestHealthCheckWithoutSetConnectivityStateCalledAddrConnShutDown(t *tes // The serviceName "delay" is specially handled at server side, where response will not be sent // back to client immediately upon receiving the request (client should receive no response until // test ends). - sc := parseServiceConfig(t, r, `{ - "healthCheckConfig": { - "serviceName": "delay" - }, - "loadBalancingConfig": [{"round_robin":{}}] -}`) + sc := parseServiceConfig(t, r, fmt.Sprintf(`{ + "healthCheckConfig": { + "serviceName": "delay" + }, + "loadBalancingConfig": [{"%s":{}}] + }`, healthCheckTestPolicyName)) r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, ServiceConfig: sc, @@ -640,12 +702,12 @@ func (s) TestHealthCheckWithoutSetConnectivityStateCalled(t *testing.T) { // test ends). r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, - ServiceConfig: parseServiceConfig(t, r, `{ - "healthCheckConfig": { - "serviceName": "delay" - }, - "loadBalancingConfig": [{"round_robin":{}}] -}`)}) + ServiceConfig: parseServiceConfig(t, r, fmt.Sprintf(`{ + "healthCheckConfig": { + "serviceName": "delay" + }, + "loadBalancingConfig": [{"%s":{}}] + }`, healthCheckTestPolicyName))}) select { case <-hcExitChan: @@ -678,12 +740,12 @@ func testHealthCheckDisableWithDialOption(t *testing.T, addr string) { tc := testgrpc.NewTestServiceClient(cc) r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: addr}}, - ServiceConfig: parseServiceConfig(t, r, `{ - "healthCheckConfig": { - "serviceName": "foo" - }, - "loadBalancingConfig": [{"round_robin":{}}] -}`)}) + ServiceConfig: parseServiceConfig(t, r, fmt.Sprintf(`{ + "healthCheckConfig": { + "serviceName": "foo" + }, + "loadBalancingConfig": [{"%s":{}}] + }`, healthCheckTestPolicyName))}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -784,12 +846,12 @@ func (s) TestHealthCheckChannelzCountingCallSuccess(t *testing.T) { _, r := setupClient(t, nil) r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, - ServiceConfig: parseServiceConfig(t, r, `{ - "healthCheckConfig": { - "serviceName": "channelzSuccess" - }, - "loadBalancingConfig": [{"round_robin":{}}] -}`)}) + ServiceConfig: parseServiceConfig(t, r, fmt.Sprintf(`{ + "healthCheckConfig": { + "serviceName": "channelzSuccess" + }, + "loadBalancingConfig": [{"%s":{}}] + }`, healthCheckTestPolicyName))}) if err := verifyResultWithDelay(func() (bool, error) { cm, _ := channelz.GetTopChannels(0, 0) @@ -833,12 +895,12 @@ func (s) TestHealthCheckChannelzCountingCallFailure(t *testing.T) { _, r := setupClient(t, nil) r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, - ServiceConfig: parseServiceConfig(t, r, `{ - "healthCheckConfig": { - "serviceName": "channelzFailure" - }, - "loadBalancingConfig": [{"round_robin":{}}] -}`)}) + ServiceConfig: parseServiceConfig(t, r, fmt.Sprintf(`{ + "healthCheckConfig": { + "serviceName": "channelzFailure" + }, + "loadBalancingConfig": [{"%s":{}}] + }`, healthCheckTestPolicyName))}) if err := verifyResultWithDelay(func() (bool, error) { cm, _ := channelz.GetTopChannels(0, 0) @@ -947,12 +1009,12 @@ func testHealthCheckSuccess(t *testing.T, e env) { // TestHealthCheckFailure invokes the unary Check() RPC on the health server // with an expired context and expects the RPC to fail. func (s) TestHealthCheckFailure(t *testing.T) { - for _, e := range listTestEnv() { - testHealthCheckFailure(t, e) + e := env{ + name: "tcp-tls", + network: "tcp", + security: "tls", + balancer: healthCheckTestPolicyName, } -} - -func testHealthCheckFailure(t *testing.T, e env) { te := newTest(t, e) te.declareLogNoise( "Failed to dial ", @@ -1178,3 +1240,111 @@ func testHealthCheckServingStatus(t *testing.T, e env) { te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_NOT_SERVING) verifyHealthCheckStatus(t, 1*time.Second, cc, defaultHealthService, healthpb.HealthCheckResponse_NOT_SERVING) } + +// Test verifies that registering a nil health listener closes the health +// client. +func (s) TestHealthCheckUnregisterHealthListener(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + hcEnterChan, hcExitChan := setupHealthCheckWrapper(t) + scChan := make(chan balancer.SubConn, 1) + readyUpdateReceivedCh := make(chan struct{}) + bf := stub.BalancerFuncs{ + Init: func(bd *stub.BalancerData) { + cc := bd.ClientConn + ccw := &subConnStoringCCWrapper{ + ClientConn: cc, + scChan: scChan, + stateListener: func(scs balancer.SubConnState) { + if scs.ConnectivityState != connectivity.Ready { + return + } + close(readyUpdateReceivedCh) + }, + } + bd.Data = balancer.Get(pickfirst.Name).Build(ccw, bd.BuildOptions) + }, + Close: func(bd *stub.BalancerData) { + bd.Data.(balancer.Balancer).Close() + }, + UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error { + return bd.Data.(balancer.Balancer).UpdateClientConnState(ccs) + }, + } + + stub.Register(t.Name(), bf) + _, lis, ts := setupServer(t, nil) + ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING) + + _, r := setupClient(t, nil) + svcCfg := fmt.Sprintf(`{ + "healthCheckConfig": { + "serviceName": "foo" + }, + "loadBalancingConfig": [{"%s":{}}] + }`, t.Name()) + r.UpdateState(resolver.State{ + Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, + ServiceConfig: parseServiceConfig(t, r, svcCfg)}) + + var sc balancer.SubConn + select { + case sc = <-scChan: + case <-ctx.Done(): + t.Fatal("Context timed out waiting for SubConn creation") + } + + // Wait for the SubConn to enter READY. + select { + case <-readyUpdateReceivedCh: + case <-ctx.Done(): + t.Fatalf("Context timed out waiting for SubConn to enter READY") + } + + // Health check should start only after a health listener is registered. + select { + case <-hcEnterChan: + t.Fatalf("Health service client created prematurely.") + case <-time.After(defaultTestShortTimeout): + } + + // Register a health listener and verify it receives updates. + healthChan := make(chan balancer.SubConnState, 1) + sc.RegisterHealthListener(func(scs balancer.SubConnState) { + healthChan <- scs + }) + + select { + case <-hcEnterChan: + case <-ctx.Done(): + t.Fatalf("Context timed out waiting for health check to begin.") + } + + for readyReceived := false; !readyReceived; { + select { + case scs := <-healthChan: + t.Logf("Received health update: %v", scs) + readyReceived = scs.ConnectivityState == connectivity.Ready + case <-ctx.Done(): + t.Fatalf("Context timed out waiting for healthy backend.") + } + } + + // Registering a nil listener should invalidate the previously registered + // listener and close the health service client. + sc.RegisterHealthListener(nil) + select { + case <-hcExitChan: + case <-ctx.Done(): + t.Fatalf("Context timed out waiting for the health client to close.") + } + + ts.SetServingStatus("foo", healthpb.HealthCheckResponse_NOT_SERVING) + + // No updates should be received on the listener. + select { + case scs := <-healthChan: + t.Fatalf("Received unexpected health update on the listener: %v", scs) + case <-time.After(defaultTestShortTimeout): + } +} diff --git a/test/local_creds_test.go b/test/local_creds_test.go index c087db08c374..460e86d81756 100644 --- a/test/local_creds_test.go +++ b/test/local_creds_test.go @@ -40,7 +40,7 @@ import ( testpb "google.golang.org/grpc/interop/grpc_testing" ) -func testLocalCredsE2ESucceed(network, address string) error { +func testLocalCredsE2ESucceed(t *testing.T, network, address string) error { lis, err := net.Listen(network, address) if err != nil { @@ -80,7 +80,7 @@ func testLocalCredsE2ESucceed(network, address string) error { defer ss.S.Stop() - stubserver.StartTestService(nil, ss) + stubserver.StartTestService(t, ss) var cc *grpc.ClientConn lisAddr := lis.Addr().String() @@ -112,14 +112,14 @@ func testLocalCredsE2ESucceed(network, address string) error { } func (s) TestLocalCredsLocalhost(t *testing.T) { - if err := testLocalCredsE2ESucceed("tcp", "localhost:0"); err != nil { + if err := testLocalCredsE2ESucceed(t, "tcp", "localhost:0"); err != nil { t.Fatalf("Failed e2e test for localhost: %v", err) } } func (s) TestLocalCredsUDS(t *testing.T) { addr := fmt.Sprintf("/tmp/grpc_fullstck_test%d", time.Now().UnixNano()) - if err := testLocalCredsE2ESucceed("unix", addr); err != nil { + if err := testLocalCredsE2ESucceed(t, "unix", addr); err != nil { t.Fatalf("Failed e2e test for UDS: %v", err) } } @@ -160,7 +160,7 @@ func spoofDialer(addr net.Addr) func(target string, t time.Duration) (net.Conn, } } -func testLocalCredsE2EFail(dopts []grpc.DialOption) error { +func testLocalCredsE2EFail(t *testing.T, dopts []grpc.DialOption) error { ss := &stubserver.StubServer{ EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil @@ -168,7 +168,7 @@ func testLocalCredsE2EFail(dopts []grpc.DialOption) error { S: grpc.NewServer(grpc.Creds(local.NewCredentials())), } defer ss.S.Stop() - stubserver.StartTestService(nil, ss) + stubserver.StartTestService(t, ss) lis, err := net.Listen("tcp", "localhost:0") if err != nil { @@ -205,11 +205,11 @@ func isExpected(got, want error) bool { return status.Code(got) == status.Code(want) && strings.Contains(status.Convert(got).Message(), status.Convert(want).Message()) } -func (s) TestLocalCredsClientFail(t *testing.T) { +func TestLocalCredsClientFail(t *testing.T) { // Use local creds at client-side which should lead to client-side failure. opts := []grpc.DialOption{grpc.WithTransportCredentials(local.NewCredentials())} want := status.Error(codes.Unavailable, "transport: authentication handshake failed: local credentials rejected connection to non-local address") - if err := testLocalCredsE2EFail(opts); !isExpected(err, want) { + if err := testLocalCredsE2EFail(t, opts); !isExpected(err, want) { t.Fatalf("testLocalCredsE2EFail() = %v; want %v", err, want) } } @@ -217,7 +217,7 @@ func (s) TestLocalCredsClientFail(t *testing.T) { func (s) TestLocalCredsServerFail(t *testing.T) { // Use insecure at client-side which should lead to server-side failure. opts := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())} - if err := testLocalCredsE2EFail(opts); status.Code(err) != codes.Unavailable { + if err := testLocalCredsE2EFail(t, opts); status.Code(err) != codes.Unavailable { t.Fatalf("testLocalCredsE2EFail() = %v; want %v", err, codes.Unavailable) } } From bab2a274352eca86af6bcf1a36712383709e74c3 Mon Sep 17 00:00:00 2001 From: Pasupuleti Sravani Date: Mon, 30 Dec 2024 10:03:05 +0000 Subject: [PATCH 07/19] fixing vet issues --- test/healthcheck_test.go | 170 ++++++++++++++++----------------------- test/local_creds_test.go | 2 +- 2 files changed, 72 insertions(+), 100 deletions(-) diff --git a/test/healthcheck_test.go b/test/healthcheck_test.go index c16aeb4fbddf..a28a3f4ce9d5 100644 --- a/test/healthcheck_test.go +++ b/test/healthcheck_test.go @@ -22,7 +22,6 @@ import ( "context" "errors" "fmt" - "io" "net" "sync" "testing" @@ -41,7 +40,6 @@ import ( "google.golang.org/grpc/internal/channelz" "google.golang.org/grpc/internal/envconfig" "google.golang.org/grpc/internal/grpctest" - "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver/manual" @@ -198,37 +196,12 @@ func setupServer(t *testing.T, watchFunc healthWatchFunc) (*grpc.Server, net.Lis } else { ts = newTestHealthServer() } - - stub := &stubserver.StubServer{ - Listener: lis, - EmptyCallF: func(_ context.Context, _ *testpb.Empty) (*testpb.Empty, error) { - return &testpb.Empty{}, nil - }, - FullDuplexCallF: func(stream testpb.TestService_FullDuplexCallServer) error { - for { - req, err := stream.Recv() - if err == io.EOF { - return nil - } - if err != nil { - return fmt.Errorf("error receiving from stream: %v", err) - } - t.Logf("Received message: %v", req) - resp := &testpb.StreamingOutputCallResponse{ - Payload: req.Payload, - } - if err := stream.Send(resp); err != nil { - return fmt.Errorf("error sending to stream: %v", err) - } - t.Logf("Sent response: %v", resp) - } - }, - S: grpc.NewServer(), - } - healthgrpc.RegisterHealthServer(stub.S, ts) - stubserver.StartTestService(t, stub) - t.Cleanup(func() { stub.Stop() }) - return stub.S.(*grpc.Server), lis, ts + s := grpc.NewServer() + healthgrpc.RegisterHealthServer(s, ts) + testgrpc.RegisterTestServiceServer(s, &testServer{}) + go s.Serve(lis) + t.Cleanup(func() { s.Stop() }) + return s, lis, ts } type clientConfig struct { @@ -280,11 +253,11 @@ func (s) TestHealthCheckWatchStateChange(t *testing.T) { r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, ServiceConfig: parseServiceConfig(t, r, `{ - "healthCheckConfig": { - "serviceName": "foo" - }, - "loadBalancingConfig": [{"round_robin":{}}] -}`)}) + "healthCheckConfig": { + "serviceName": "foo" + }, + "loadBalancingConfig": [{"round_robin":{}}] + }`)}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -323,7 +296,6 @@ func (s) TestHealthCheckWatchStateChange(t *testing.T) { // If Watch returns Unimplemented, then the ClientConn should go into READY state. func (s) TestHealthCheckHealthServerNotRegistered(t *testing.T) { grpctest.TLogger.ExpectError("Subchannel health check is unimplemented at server side, thus health check is disabled") - s := grpc.NewServer() lis, err := net.Listen("tcp", "localhost:0") if err != nil { @@ -336,11 +308,11 @@ func (s) TestHealthCheckHealthServerNotRegistered(t *testing.T) { r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, ServiceConfig: parseServiceConfig(t, r, fmt.Sprintf(`{ - "healthCheckConfig": { - "serviceName": "foo" - }, - "loadBalancingConfig": [{"%s":{}}] - }`, healthCheckTestPolicyName))}) + "healthCheckConfig": { + "serviceName": "foo" + }, + "loadBalancingConfig": [{"%s":{}}] + }`, healthCheckTestPolicyName))}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -363,11 +335,11 @@ func (s) TestHealthCheckWithGoAway(t *testing.T) { r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, ServiceConfig: parseServiceConfig(t, r, fmt.Sprintf(`{ - "healthCheckConfig": { - "serviceName": "foo" - }, - "loadBalancingConfig": [{"%s":{}}] - }`, healthCheckTestPolicyName))}) + "healthCheckConfig": { + "serviceName": "foo" + }, + "loadBalancingConfig": [{"%s":{}}] + }`, healthCheckTestPolicyName))}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -441,11 +413,11 @@ func (s) TestHealthCheckWithConnClose(t *testing.T) { r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, ServiceConfig: parseServiceConfig(t, r, fmt.Sprintf(`{ - "healthCheckConfig": { - "serviceName": "foo" - }, - "loadBalancingConfig": [{"%s":{}}] - }`, healthCheckTestPolicyName))}) + "healthCheckConfig": { + "serviceName": "foo" + }, + "loadBalancingConfig": [{"%s":{}}] + }`, healthCheckTestPolicyName))}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -489,11 +461,11 @@ func (s) TestHealthCheckWithAddrConnDrain(t *testing.T) { cc, r := setupClient(t, &clientConfig{}) tc := testgrpc.NewTestServiceClient(cc) sc := parseServiceConfig(t, r, fmt.Sprintf(`{ - "healthCheckConfig": { - "serviceName": "foo" - }, - "loadBalancingConfig": [{"%s":{}}] - }`, healthCheckTestPolicyName)) + "healthCheckConfig": { + "serviceName": "foo" + }, + "loadBalancingConfig": [{"%s":{}}] + }`, healthCheckTestPolicyName)) r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, ServiceConfig: sc, @@ -571,11 +543,11 @@ func (s) TestHealthCheckWithClientConnClose(t *testing.T) { r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, ServiceConfig: parseServiceConfig(t, r, (fmt.Sprintf(`{ - "healthCheckConfig": { - "serviceName": "foo" - }, - "loadBalancingConfig": [{"%s":{}}] - }`, healthCheckTestPolicyName)))}) + "healthCheckConfig": { + "serviceName": "foo" + }, + "loadBalancingConfig": [{"%s":{}}] + }`, healthCheckTestPolicyName)))}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -638,11 +610,11 @@ func (s) TestHealthCheckWithoutSetConnectivityStateCalledAddrConnShutDown(t *tes // back to client immediately upon receiving the request (client should receive no response until // test ends). sc := parseServiceConfig(t, r, fmt.Sprintf(`{ - "healthCheckConfig": { - "serviceName": "delay" - }, - "loadBalancingConfig": [{"%s":{}}] - }`, healthCheckTestPolicyName)) + "healthCheckConfig": { + "serviceName": "delay" + }, + "loadBalancingConfig": [{"%s":{}}] + }`, healthCheckTestPolicyName)) r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, ServiceConfig: sc, @@ -703,11 +675,11 @@ func (s) TestHealthCheckWithoutSetConnectivityStateCalled(t *testing.T) { r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, ServiceConfig: parseServiceConfig(t, r, fmt.Sprintf(`{ - "healthCheckConfig": { - "serviceName": "delay" - }, - "loadBalancingConfig": [{"%s":{}}] - }`, healthCheckTestPolicyName))}) + "healthCheckConfig": { + "serviceName": "delay" + }, + "loadBalancingConfig": [{"%s":{}}] + }`, healthCheckTestPolicyName))}) select { case <-hcExitChan: @@ -741,11 +713,11 @@ func testHealthCheckDisableWithDialOption(t *testing.T, addr string) { r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: addr}}, ServiceConfig: parseServiceConfig(t, r, fmt.Sprintf(`{ - "healthCheckConfig": { - "serviceName": "foo" - }, - "loadBalancingConfig": [{"%s":{}}] - }`, healthCheckTestPolicyName))}) + "healthCheckConfig": { + "serviceName": "foo" + }, + "loadBalancingConfig": [{"%s":{}}] + }`, healthCheckTestPolicyName))}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -773,11 +745,11 @@ func testHealthCheckDisableWithBalancer(t *testing.T, addr string) { r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: addr}}, ServiceConfig: parseServiceConfig(t, r, `{ - "healthCheckConfig": { - "serviceName": "foo" - }, - "loadBalancingConfig": [{"pick_first":{}}] -}`)}) + "healthCheckConfig": { + "serviceName": "foo" + }, + "loadBalancingConfig": [{"pick_first":{}}] + }`)}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -847,11 +819,11 @@ func (s) TestHealthCheckChannelzCountingCallSuccess(t *testing.T) { r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, ServiceConfig: parseServiceConfig(t, r, fmt.Sprintf(`{ - "healthCheckConfig": { - "serviceName": "channelzSuccess" - }, - "loadBalancingConfig": [{"%s":{}}] - }`, healthCheckTestPolicyName))}) + "healthCheckConfig": { + "serviceName": "channelzSuccess" + }, + "loadBalancingConfig": [{"%s":{}}] + }`, healthCheckTestPolicyName))}) if err := verifyResultWithDelay(func() (bool, error) { cm, _ := channelz.GetTopChannels(0, 0) @@ -896,11 +868,11 @@ func (s) TestHealthCheckChannelzCountingCallFailure(t *testing.T) { r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, ServiceConfig: parseServiceConfig(t, r, fmt.Sprintf(`{ - "healthCheckConfig": { - "serviceName": "channelzFailure" - }, - "loadBalancingConfig": [{"%s":{}}] - }`, healthCheckTestPolicyName))}) + "healthCheckConfig": { + "serviceName": "channelzFailure" + }, + "loadBalancingConfig": [{"%s":{}}] + }`, healthCheckTestPolicyName))}) if err := verifyResultWithDelay(func() (bool, error) { cm, _ := channelz.GetTopChannels(0, 0) @@ -1278,11 +1250,11 @@ func (s) TestHealthCheckUnregisterHealthListener(t *testing.T) { _, r := setupClient(t, nil) svcCfg := fmt.Sprintf(`{ - "healthCheckConfig": { - "serviceName": "foo" - }, - "loadBalancingConfig": [{"%s":{}}] - }`, t.Name()) + "healthCheckConfig": { + "serviceName": "foo" + }, + "loadBalancingConfig": [{"%s":{}}] + }`, t.Name()) r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, ServiceConfig: parseServiceConfig(t, r, svcCfg)}) diff --git a/test/local_creds_test.go b/test/local_creds_test.go index 460e86d81756..2e2dc0ba317d 100644 --- a/test/local_creds_test.go +++ b/test/local_creds_test.go @@ -205,7 +205,7 @@ func isExpected(got, want error) bool { return status.Code(got) == status.Code(want) && strings.Contains(status.Convert(got).Message(), status.Convert(want).Message()) } -func TestLocalCredsClientFail(t *testing.T) { +func (s) TestLocalCredsClientFail(t *testing.T) { // Use local creds at client-side which should lead to client-side failure. opts := []grpc.DialOption{grpc.WithTransportCredentials(local.NewCredentials())} want := status.Error(codes.Unavailable, "transport: authentication handshake failed: local credentials rejected connection to non-local address") From 307ae58d51f0fef5088cd3ed2034334af5414bf7 Mon Sep 17 00:00:00 2001 From: Pasupuleti Sravani Date: Mon, 30 Dec 2024 10:21:53 +0000 Subject: [PATCH 08/19] reverting healthcheck --- test/healthcheck_test.go | 284 +++++++++------------------------------ 1 file changed, 65 insertions(+), 219 deletions(-) diff --git a/test/healthcheck_test.go b/test/healthcheck_test.go index a28a3f4ce9d5..bae77c7e7dcb 100644 --- a/test/healthcheck_test.go +++ b/test/healthcheck_test.go @@ -28,17 +28,12 @@ import ( "time" "google.golang.org/grpc" - "google.golang.org/grpc/balancer" - "google.golang.org/grpc/balancer/pickfirst" - "google.golang.org/grpc/balancer/pickfirst/pickfirstleaf" "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/health" "google.golang.org/grpc/internal" - "google.golang.org/grpc/internal/balancer/stub" "google.golang.org/grpc/internal/channelz" - "google.golang.org/grpc/internal/envconfig" "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/resolver" @@ -51,47 +46,6 @@ import ( testpb "google.golang.org/grpc/interop/grpc_testing" ) -const healthCheckingPetiolePolicyName = "health_checking_petiole_policy" - -var ( - // healthCheckTestPolicyName is the LB policy used for testing the health check - // service. - healthCheckTestPolicyName = "round_robin" -) - -func init() { - balancer.Register(&healthCheckingPetiolePolicyBuilder{}) - // Till dualstack changes are not implemented and round_robin doesn't - // delegate to pickfirst, test a fake petiole policy that delegates to - // the new pickfirst balancer. - // TODO: https://github.com/grpc/grpc-go/issues/7906 - Remove the fake - // petiole policy one round robin starts delegating to pickfirst. - if envconfig.NewPickFirstEnabled { - healthCheckTestPolicyName = healthCheckingPetiolePolicyName - } -} - -type healthCheckingPetiolePolicyBuilder struct{} - -func (bb *healthCheckingPetiolePolicyBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { - return &healthCheckingPetiolePolicy{ - Balancer: balancer.Get(pickfirstleaf.Name).Build(cc, opts), - } -} - -func (bb *healthCheckingPetiolePolicyBuilder) Name() string { - return healthCheckingPetiolePolicyName -} - -func (b *healthCheckingPetiolePolicy) UpdateClientConnState(state balancer.ClientConnState) error { - state.ResolverState = pickfirstleaf.EnableHealthListener(state.ResolverState) - return b.Balancer.UpdateClientConnState(state) -} - -type healthCheckingPetiolePolicy struct { - balancer.Balancer -} - func newTestHealthServer() *testHealthServer { return newTestHealthServerWithWatchFunc(defaultWatchFunc) } @@ -307,12 +261,12 @@ func (s) TestHealthCheckHealthServerNotRegistered(t *testing.T) { cc, r := setupClient(t, nil) r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, - ServiceConfig: parseServiceConfig(t, r, fmt.Sprintf(`{ - "healthCheckConfig": { - "serviceName": "foo" - }, - "loadBalancingConfig": [{"%s":{}}] - }`, healthCheckTestPolicyName))}) + ServiceConfig: parseServiceConfig(t, r, `{ + "healthCheckConfig": { + "serviceName": "foo" + }, + "loadBalancingConfig": [{"round_robin":{}}] + }`)}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -334,12 +288,12 @@ func (s) TestHealthCheckWithGoAway(t *testing.T) { tc := testgrpc.NewTestServiceClient(cc) r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, - ServiceConfig: parseServiceConfig(t, r, fmt.Sprintf(`{ - "healthCheckConfig": { - "serviceName": "foo" - }, - "loadBalancingConfig": [{"%s":{}}] - }`, healthCheckTestPolicyName))}) + ServiceConfig: parseServiceConfig(t, r, `{ + "healthCheckConfig": { + "serviceName": "foo" + }, + "loadBalancingConfig": [{"round_robin":{}}] + }`)}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -412,12 +366,12 @@ func (s) TestHealthCheckWithConnClose(t *testing.T) { tc := testgrpc.NewTestServiceClient(cc) r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, - ServiceConfig: parseServiceConfig(t, r, fmt.Sprintf(`{ - "healthCheckConfig": { - "serviceName": "foo" - }, - "loadBalancingConfig": [{"%s":{}}] - }`, healthCheckTestPolicyName))}) + ServiceConfig: parseServiceConfig(t, r, `{ + "healthCheckConfig": { + "serviceName": "foo" + }, + "loadBalancingConfig": [{"round_robin":{}}] + }`)}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -460,12 +414,12 @@ func (s) TestHealthCheckWithAddrConnDrain(t *testing.T) { hcEnterChan, hcExitChan := setupHealthCheckWrapper(t) cc, r := setupClient(t, &clientConfig{}) tc := testgrpc.NewTestServiceClient(cc) - sc := parseServiceConfig(t, r, fmt.Sprintf(`{ - "healthCheckConfig": { - "serviceName": "foo" - }, - "loadBalancingConfig": [{"%s":{}}] - }`, healthCheckTestPolicyName)) + sc := parseServiceConfig(t, r, `{ + "healthCheckConfig": { + "serviceName": "foo" + }, + "loadBalancingConfig": [{"round_robin":{}}] + }`) r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, ServiceConfig: sc, @@ -542,12 +496,12 @@ func (s) TestHealthCheckWithClientConnClose(t *testing.T) { tc := testgrpc.NewTestServiceClient(cc) r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, - ServiceConfig: parseServiceConfig(t, r, (fmt.Sprintf(`{ - "healthCheckConfig": { - "serviceName": "foo" - }, - "loadBalancingConfig": [{"%s":{}}] - }`, healthCheckTestPolicyName)))}) + ServiceConfig: parseServiceConfig(t, r, `{ + "healthCheckConfig": { + "serviceName": "foo" + }, + "loadBalancingConfig": [{"round_robin":{}}] + }`)}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -609,12 +563,12 @@ func (s) TestHealthCheckWithoutSetConnectivityStateCalledAddrConnShutDown(t *tes // The serviceName "delay" is specially handled at server side, where response will not be sent // back to client immediately upon receiving the request (client should receive no response until // test ends). - sc := parseServiceConfig(t, r, fmt.Sprintf(`{ - "healthCheckConfig": { - "serviceName": "delay" - }, - "loadBalancingConfig": [{"%s":{}}] - }`, healthCheckTestPolicyName)) + sc := parseServiceConfig(t, r, `{ + "healthCheckConfig": { + "serviceName": "delay" + }, + "loadBalancingConfig": [{"round_robin":{}}] + }`) r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, ServiceConfig: sc, @@ -674,12 +628,12 @@ func (s) TestHealthCheckWithoutSetConnectivityStateCalled(t *testing.T) { // test ends). r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, - ServiceConfig: parseServiceConfig(t, r, fmt.Sprintf(`{ - "healthCheckConfig": { - "serviceName": "delay" - }, - "loadBalancingConfig": [{"%s":{}}] - }`, healthCheckTestPolicyName))}) + ServiceConfig: parseServiceConfig(t, r, `{ + "healthCheckConfig": { + "serviceName": "delay" + }, + "loadBalancingConfig": [{"round_robin":{}}] + }`)}) select { case <-hcExitChan: @@ -712,12 +666,12 @@ func testHealthCheckDisableWithDialOption(t *testing.T, addr string) { tc := testgrpc.NewTestServiceClient(cc) r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: addr}}, - ServiceConfig: parseServiceConfig(t, r, fmt.Sprintf(`{ - "healthCheckConfig": { - "serviceName": "foo" - }, - "loadBalancingConfig": [{"%s":{}}] - }`, healthCheckTestPolicyName))}) + ServiceConfig: parseServiceConfig(t, r, `{ + "healthCheckConfig": { + "serviceName": "foo" + }, + "loadBalancingConfig": [{"round_robin":{}}] + }`)}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -818,12 +772,12 @@ func (s) TestHealthCheckChannelzCountingCallSuccess(t *testing.T) { _, r := setupClient(t, nil) r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, - ServiceConfig: parseServiceConfig(t, r, fmt.Sprintf(`{ - "healthCheckConfig": { - "serviceName": "channelzSuccess" - }, - "loadBalancingConfig": [{"%s":{}}] - }`, healthCheckTestPolicyName))}) + ServiceConfig: parseServiceConfig(t, r, `{ + "healthCheckConfig": { + "serviceName": "channelzSuccess" + }, + "loadBalancingConfig": [{"round_robin":{}}] + }`)}) if err := verifyResultWithDelay(func() (bool, error) { cm, _ := channelz.GetTopChannels(0, 0) @@ -867,12 +821,12 @@ func (s) TestHealthCheckChannelzCountingCallFailure(t *testing.T) { _, r := setupClient(t, nil) r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, - ServiceConfig: parseServiceConfig(t, r, fmt.Sprintf(`{ - "healthCheckConfig": { - "serviceName": "channelzFailure" - }, - "loadBalancingConfig": [{"%s":{}}] - }`, healthCheckTestPolicyName))}) + ServiceConfig: parseServiceConfig(t, r, `{ + "healthCheckConfig": { + "serviceName": "channelzFailure" + }, + "loadBalancingConfig": [{"round_robin":{}}] + }`)}) if err := verifyResultWithDelay(func() (bool, error) { cm, _ := channelz.GetTopChannels(0, 0) @@ -981,12 +935,12 @@ func testHealthCheckSuccess(t *testing.T, e env) { // TestHealthCheckFailure invokes the unary Check() RPC on the health server // with an expired context and expects the RPC to fail. func (s) TestHealthCheckFailure(t *testing.T) { - e := env{ - name: "tcp-tls", - network: "tcp", - security: "tls", - balancer: healthCheckTestPolicyName, + for _, e := range listTestEnv() { + testHealthCheckFailure(t, e) } +} + +func testHealthCheckFailure(t *testing.T, e env) { te := newTest(t, e) te.declareLogNoise( "Failed to dial ", @@ -1212,111 +1166,3 @@ func testHealthCheckServingStatus(t *testing.T, e env) { te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_NOT_SERVING) verifyHealthCheckStatus(t, 1*time.Second, cc, defaultHealthService, healthpb.HealthCheckResponse_NOT_SERVING) } - -// Test verifies that registering a nil health listener closes the health -// client. -func (s) TestHealthCheckUnregisterHealthListener(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - hcEnterChan, hcExitChan := setupHealthCheckWrapper(t) - scChan := make(chan balancer.SubConn, 1) - readyUpdateReceivedCh := make(chan struct{}) - bf := stub.BalancerFuncs{ - Init: func(bd *stub.BalancerData) { - cc := bd.ClientConn - ccw := &subConnStoringCCWrapper{ - ClientConn: cc, - scChan: scChan, - stateListener: func(scs balancer.SubConnState) { - if scs.ConnectivityState != connectivity.Ready { - return - } - close(readyUpdateReceivedCh) - }, - } - bd.Data = balancer.Get(pickfirst.Name).Build(ccw, bd.BuildOptions) - }, - Close: func(bd *stub.BalancerData) { - bd.Data.(balancer.Balancer).Close() - }, - UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error { - return bd.Data.(balancer.Balancer).UpdateClientConnState(ccs) - }, - } - - stub.Register(t.Name(), bf) - _, lis, ts := setupServer(t, nil) - ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING) - - _, r := setupClient(t, nil) - svcCfg := fmt.Sprintf(`{ - "healthCheckConfig": { - "serviceName": "foo" - }, - "loadBalancingConfig": [{"%s":{}}] - }`, t.Name()) - r.UpdateState(resolver.State{ - Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, - ServiceConfig: parseServiceConfig(t, r, svcCfg)}) - - var sc balancer.SubConn - select { - case sc = <-scChan: - case <-ctx.Done(): - t.Fatal("Context timed out waiting for SubConn creation") - } - - // Wait for the SubConn to enter READY. - select { - case <-readyUpdateReceivedCh: - case <-ctx.Done(): - t.Fatalf("Context timed out waiting for SubConn to enter READY") - } - - // Health check should start only after a health listener is registered. - select { - case <-hcEnterChan: - t.Fatalf("Health service client created prematurely.") - case <-time.After(defaultTestShortTimeout): - } - - // Register a health listener and verify it receives updates. - healthChan := make(chan balancer.SubConnState, 1) - sc.RegisterHealthListener(func(scs balancer.SubConnState) { - healthChan <- scs - }) - - select { - case <-hcEnterChan: - case <-ctx.Done(): - t.Fatalf("Context timed out waiting for health check to begin.") - } - - for readyReceived := false; !readyReceived; { - select { - case scs := <-healthChan: - t.Logf("Received health update: %v", scs) - readyReceived = scs.ConnectivityState == connectivity.Ready - case <-ctx.Done(): - t.Fatalf("Context timed out waiting for healthy backend.") - } - } - - // Registering a nil listener should invalidate the previously registered - // listener and close the health service client. - sc.RegisterHealthListener(nil) - select { - case <-hcExitChan: - case <-ctx.Done(): - t.Fatalf("Context timed out waiting for the health client to close.") - } - - ts.SetServingStatus("foo", healthpb.HealthCheckResponse_NOT_SERVING) - - // No updates should be received on the listener. - select { - case scs := <-healthChan: - t.Fatalf("Received unexpected health update on the listener: %v", scs) - case <-time.After(defaultTestShortTimeout): - } -} From 634a9711dbd7e2113fbeadf068a8527ef505912a Mon Sep 17 00:00:00 2001 From: Pasupuleti Sravani Date: Mon, 30 Dec 2024 10:50:40 +0000 Subject: [PATCH 09/19] removing whitespaces --- test/healthcheck_test.go | 110 +++++++++++++++++++-------------------- 1 file changed, 55 insertions(+), 55 deletions(-) diff --git a/test/healthcheck_test.go b/test/healthcheck_test.go index bae77c7e7dcb..f56d8cfbfa65 100644 --- a/test/healthcheck_test.go +++ b/test/healthcheck_test.go @@ -207,11 +207,11 @@ func (s) TestHealthCheckWatchStateChange(t *testing.T) { r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, ServiceConfig: parseServiceConfig(t, r, `{ - "healthCheckConfig": { - "serviceName": "foo" - }, - "loadBalancingConfig": [{"round_robin":{}}] - }`)}) + "healthCheckConfig": { + "serviceName": "foo" + }, + "loadBalancingConfig": [{"round_robin":{}}] + }`)}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -289,11 +289,11 @@ func (s) TestHealthCheckWithGoAway(t *testing.T) { r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, ServiceConfig: parseServiceConfig(t, r, `{ - "healthCheckConfig": { - "serviceName": "foo" - }, - "loadBalancingConfig": [{"round_robin":{}}] - }`)}) + "healthCheckConfig": { + "serviceName": "foo" + }, + "loadBalancingConfig": [{"round_robin":{}}] + }`)}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -367,11 +367,11 @@ func (s) TestHealthCheckWithConnClose(t *testing.T) { r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, ServiceConfig: parseServiceConfig(t, r, `{ - "healthCheckConfig": { - "serviceName": "foo" - }, - "loadBalancingConfig": [{"round_robin":{}}] - }`)}) + "healthCheckConfig": { + "serviceName": "foo" + }, + "loadBalancingConfig": [{"round_robin":{}}] + }`)}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -415,11 +415,11 @@ func (s) TestHealthCheckWithAddrConnDrain(t *testing.T) { cc, r := setupClient(t, &clientConfig{}) tc := testgrpc.NewTestServiceClient(cc) sc := parseServiceConfig(t, r, `{ - "healthCheckConfig": { - "serviceName": "foo" - }, - "loadBalancingConfig": [{"round_robin":{}}] - }`) + "healthCheckConfig": { + "serviceName": "foo" + }, + "loadBalancingConfig": [{"round_robin":{}}] +}`) r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, ServiceConfig: sc, @@ -497,11 +497,11 @@ func (s) TestHealthCheckWithClientConnClose(t *testing.T) { r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, ServiceConfig: parseServiceConfig(t, r, `{ - "healthCheckConfig": { - "serviceName": "foo" - }, - "loadBalancingConfig": [{"round_robin":{}}] - }`)}) + "healthCheckConfig": { + "serviceName": "foo" + }, + "loadBalancingConfig": [{"round_robin":{}}] +}`)}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -564,11 +564,11 @@ func (s) TestHealthCheckWithoutSetConnectivityStateCalledAddrConnShutDown(t *tes // back to client immediately upon receiving the request (client should receive no response until // test ends). sc := parseServiceConfig(t, r, `{ - "healthCheckConfig": { - "serviceName": "delay" - }, - "loadBalancingConfig": [{"round_robin":{}}] - }`) + "healthCheckConfig": { + "serviceName": "delay" + }, + "loadBalancingConfig": [{"round_robin":{}}] +}`) r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, ServiceConfig: sc, @@ -629,11 +629,11 @@ func (s) TestHealthCheckWithoutSetConnectivityStateCalled(t *testing.T) { r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, ServiceConfig: parseServiceConfig(t, r, `{ - "healthCheckConfig": { - "serviceName": "delay" - }, - "loadBalancingConfig": [{"round_robin":{}}] - }`)}) + "healthCheckConfig": { + "serviceName": "delay" + }, + "loadBalancingConfig": [{"round_robin":{}}] +}`)}) select { case <-hcExitChan: @@ -667,11 +667,11 @@ func testHealthCheckDisableWithDialOption(t *testing.T, addr string) { r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: addr}}, ServiceConfig: parseServiceConfig(t, r, `{ - "healthCheckConfig": { - "serviceName": "foo" - }, - "loadBalancingConfig": [{"round_robin":{}}] - }`)}) + "healthCheckConfig": { + "serviceName": "foo" + }, + "loadBalancingConfig": [{"round_robin":{}}] +}`)}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -699,11 +699,11 @@ func testHealthCheckDisableWithBalancer(t *testing.T, addr string) { r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: addr}}, ServiceConfig: parseServiceConfig(t, r, `{ - "healthCheckConfig": { - "serviceName": "foo" - }, - "loadBalancingConfig": [{"pick_first":{}}] - }`)}) + "healthCheckConfig": { + "serviceName": "foo" + }, + "loadBalancingConfig": [{"pick_first":{}}] +}`)}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -773,11 +773,11 @@ func (s) TestHealthCheckChannelzCountingCallSuccess(t *testing.T) { r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, ServiceConfig: parseServiceConfig(t, r, `{ - "healthCheckConfig": { - "serviceName": "channelzSuccess" - }, - "loadBalancingConfig": [{"round_robin":{}}] - }`)}) + "healthCheckConfig": { + "serviceName": "channelzSuccess" + }, + "loadBalancingConfig": [{"round_robin":{}}] +}`)}) if err := verifyResultWithDelay(func() (bool, error) { cm, _ := channelz.GetTopChannels(0, 0) @@ -822,11 +822,11 @@ func (s) TestHealthCheckChannelzCountingCallFailure(t *testing.T) { r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, ServiceConfig: parseServiceConfig(t, r, `{ - "healthCheckConfig": { - "serviceName": "channelzFailure" - }, - "loadBalancingConfig": [{"round_robin":{}}] - }`)}) + "healthCheckConfig": { + "serviceName": "channelzFailure" + }, + "loadBalancingConfig": [{"round_robin":{}}] +}`)}) if err := verifyResultWithDelay(func() (bool, error) { cm, _ := channelz.GetTopChannels(0, 0) From d8c709ae28fc64b2fbc223f5793d4d5a58ea4148 Mon Sep 17 00:00:00 2001 From: Pasupuleti Sravani Date: Mon, 30 Dec 2024 10:56:25 +0000 Subject: [PATCH 10/19] removing whitespaces --- test/healthcheck_test.go | 40 ++++++++++++++++++++-------------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/test/healthcheck_test.go b/test/healthcheck_test.go index f56d8cfbfa65..424682d09625 100644 --- a/test/healthcheck_test.go +++ b/test/healthcheck_test.go @@ -207,11 +207,11 @@ func (s) TestHealthCheckWatchStateChange(t *testing.T) { r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, ServiceConfig: parseServiceConfig(t, r, `{ - "healthCheckConfig": { - "serviceName": "foo" - }, - "loadBalancingConfig": [{"round_robin":{}}] - }`)}) + "healthCheckConfig": { + "serviceName": "foo" + }, + "loadBalancingConfig": [{"round_robin":{}}] +}`)}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -262,11 +262,11 @@ func (s) TestHealthCheckHealthServerNotRegistered(t *testing.T) { r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, ServiceConfig: parseServiceConfig(t, r, `{ - "healthCheckConfig": { - "serviceName": "foo" - }, - "loadBalancingConfig": [{"round_robin":{}}] - }`)}) + "healthCheckConfig": { + "serviceName": "foo" + }, + "loadBalancingConfig": [{"round_robin":{}}] +}`)}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -289,11 +289,11 @@ func (s) TestHealthCheckWithGoAway(t *testing.T) { r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, ServiceConfig: parseServiceConfig(t, r, `{ - "healthCheckConfig": { - "serviceName": "foo" - }, - "loadBalancingConfig": [{"round_robin":{}}] - }`)}) + "healthCheckConfig": { + "serviceName": "foo" + }, + "loadBalancingConfig": [{"round_robin":{}}] +}`)}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -367,11 +367,11 @@ func (s) TestHealthCheckWithConnClose(t *testing.T) { r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, ServiceConfig: parseServiceConfig(t, r, `{ - "healthCheckConfig": { - "serviceName": "foo" - }, - "loadBalancingConfig": [{"round_robin":{}}] - }`)}) + "healthCheckConfig": { + "serviceName": "foo" + }, + "loadBalancingConfig": [{"round_robin":{}}] +}`)}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() From c493f943feea3d49979d281a16f9dfb9cb0c468e Mon Sep 17 00:00:00 2001 From: Pasupuleti Sravani Date: Tue, 31 Dec 2024 06:10:27 +0000 Subject: [PATCH 11/19] Adding listener to stubserver and removing few commented lines --- test/goaway_test.go | 35 ++++++++++++++--------------------- test/insecure_creds_test.go | 1 + test/local_creds_test.go | 10 +++++----- 3 files changed, 20 insertions(+), 26 deletions(-) diff --git a/test/goaway_test.go b/test/goaway_test.go index f8fab36e445a..ff4e5c3ca225 100644 --- a/test/goaway_test.go +++ b/test/goaway_test.go @@ -62,15 +62,15 @@ func (s) TestGracefulClientOnGoAway(t *testing.T) { t.Fatalf("Failed to create listener: %v", err) } - ss := &stubserver.StubServer{ + ss1 := &stubserver.StubServer{ Listener: lis, EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil }, S: grpc.NewServer(grpc.KeepaliveParams(keepalive.ServerParameters{MaxConnectionAge: maxConnAge})), } - defer ss.S.Stop() - stubserver.StartTestService(t, ss) + defer ss1.S.Stop() + stubserver.StartTestService(t, ss1) cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { @@ -550,10 +550,8 @@ func (s) TestGoAwayThenClose(t *testing.T) { if err != nil { t.Fatalf("Error while listening. Err: %v", err) } - //s1 := grpc.NewServer() - //defer s1.Stop() - //ts := &funcServer{ - ss1 := &stubserver.StubServer{ + + ss2 := &stubserver.StubServer{ Listener: lis1, UnaryCallF: func(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { return &testpb.SimpleResponse{}, nil @@ -563,7 +561,7 @@ func (s) TestGoAwayThenClose(t *testing.T) { t.Errorf("unexpected error from send: %v", err) return err } - // Wait forever. + // Wait until a message is received from client _, err := stream.Recv() if err == nil { t.Error("expected to never receive any message") @@ -572,20 +570,15 @@ func (s) TestGoAwayThenClose(t *testing.T) { }, S: grpc.NewServer(), } - stubserver.StartTestService(t, ss1) - defer ss1.S.Stop() - //testgrpc.RegisterTestServiceServer(s1, ts) - //go s1.Serve(lis1) + stubserver.StartTestService(t, ss2) + defer ss2.S.Stop() conn2Established := grpcsync.NewEvent() lis2, err := listenWithNotifyingListener("tcp", "localhost:0", conn2Established) if err != nil { t.Fatalf("Error while listening. Err: %v", err) } - /*s2 := grpc.NewServer() - defer s2.Stop() - testgrpc.RegisterTestServiceServer(s2, ts)*/ - ss2 := &stubserver.StubServer{ + ss3 := &stubserver.StubServer{ Listener: lis2, UnaryCallF: func(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { return &testpb.SimpleResponse{}, nil @@ -595,7 +588,7 @@ func (s) TestGoAwayThenClose(t *testing.T) { t.Errorf("unexpected error from send: %v", err) return err } - // Wait forever. + // Wait until a message is received from client _, err := stream.Recv() if err == nil { t.Error("expected to never receive any message") @@ -604,8 +597,8 @@ func (s) TestGoAwayThenClose(t *testing.T) { }, S: grpc.NewServer(), } - stubserver.StartTestService(t, ss2) - defer ss2.S.Stop() + stubserver.StartTestService(t, ss3) + defer ss3.S.Stop() r := manual.NewBuilderWithScheme("whatever") r.InitialState(resolver.State{Addresses: []resolver.Address{ @@ -639,7 +632,7 @@ func (s) TestGoAwayThenClose(t *testing.T) { } t.Log("Gracefully stopping server 1.") - go ss1.S.GracefulStop() + go ss2.S.GracefulStop() t.Log("Waiting for the ClientConn to enter IDLE state.") testutils.AwaitState(ctx, t, cc, connectivity.Idle) @@ -660,7 +653,7 @@ func (s) TestGoAwayThenClose(t *testing.T) { lis2.Close() t.Log("Hard closing connection 1.") - ss1.S.Stop() + ss2.S.Stop() t.Log("Waiting for the first stream to error.") if _, err = stream.Recv(); err == nil { diff --git a/test/insecure_creds_test.go b/test/insecure_creds_test.go index 3052c8fdad2a..ae5e2fbc80ae 100644 --- a/test/insecure_creds_test.go +++ b/test/insecure_creds_test.go @@ -182,6 +182,7 @@ func (s) TestInsecureCreds_WithPerRPCCredentials_AsDialOption(t *testing.T) { t.Fatalf("net.Listen(tcp, localhost:0) failed: %v", err) } ss := &stubserver.StubServer{ + Listener: lis, EmptyCallF: func(_ context.Context, _ *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil }, diff --git a/test/local_creds_test.go b/test/local_creds_test.go index 2e2dc0ba317d..5de98cecec59 100644 --- a/test/local_creds_test.go +++ b/test/local_creds_test.go @@ -161,7 +161,12 @@ func spoofDialer(addr net.Addr) func(target string, t time.Duration) (net.Conn, } func testLocalCredsE2EFail(t *testing.T, dopts []grpc.DialOption) error { + lis, err := net.Listen("tcp", "localhost:0") + if err != nil { + return fmt.Errorf("Failed to create listener: %v", err) + } ss := &stubserver.StubServer{ + Listener: lis, EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil }, @@ -170,11 +175,6 @@ func testLocalCredsE2EFail(t *testing.T, dopts []grpc.DialOption) error { defer ss.S.Stop() stubserver.StartTestService(t, ss) - lis, err := net.Listen("tcp", "localhost:0") - if err != nil { - return fmt.Errorf("Failed to create listener: %v", err) - } - var fakeClientAddr, fakeServerAddr net.Addr fakeClientAddr = &net.IPAddr{ IP: netip.MustParseAddr("10.8.9.10").AsSlice(), From ecc756144c389fcde38904350938b062e5f6023b Mon Sep 17 00:00:00 2001 From: Pasupuleti Sravani Date: Tue, 31 Dec 2024 07:41:48 +0000 Subject: [PATCH 12/19] Adding listener to stubserver --- test/local_creds_test.go | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/test/local_creds_test.go b/test/local_creds_test.go index 5de98cecec59..4a2931fec7d1 100644 --- a/test/local_creds_test.go +++ b/test/local_creds_test.go @@ -161,12 +161,23 @@ func spoofDialer(addr net.Addr) func(target string, t time.Duration) (net.Conn, } func testLocalCredsE2EFail(t *testing.T, dopts []grpc.DialOption) error { + lis, err := net.Listen("tcp", "localhost:0") if err != nil { return fmt.Errorf("Failed to create listener: %v", err) } + + var fakeClientAddr, fakeServerAddr net.Addr + fakeClientAddr = &net.IPAddr{ + IP: netip.MustParseAddr("10.8.9.10").AsSlice(), + Zone: "", + } + fakeServerAddr = &net.IPAddr{ + IP: netip.MustParseAddr("10.8.9.11").AsSlice(), + Zone: "", + } ss := &stubserver.StubServer{ - Listener: lis, + Listener: spoofListener(lis, fakeClientAddr), EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil }, @@ -175,7 +186,7 @@ func testLocalCredsE2EFail(t *testing.T, dopts []grpc.DialOption) error { defer ss.S.Stop() stubserver.StartTestService(t, ss) - var fakeClientAddr, fakeServerAddr net.Addr + /*var fakeClientAddr, fakeServerAddr net.Addr fakeClientAddr = &net.IPAddr{ IP: netip.MustParseAddr("10.8.9.10").AsSlice(), Zone: "", @@ -183,9 +194,9 @@ func testLocalCredsE2EFail(t *testing.T, dopts []grpc.DialOption) error { fakeServerAddr = &net.IPAddr{ IP: netip.MustParseAddr("10.8.9.11").AsSlice(), Zone: "", - } + }*/ - go ss.S.Serve(spoofListener(lis, fakeClientAddr)) + //go ss.S.Serve(spoofListener(lis, fakeClientAddr)) cc, err := grpc.NewClient(lis.Addr().String(), append(dopts, grpc.WithDialer(spoofDialer(fakeServerAddr)))...) if err != nil { From 87a8136e3748701da48950360740eb7df0cac768 Mon Sep 17 00:00:00 2001 From: Pasupuleti Sravani Date: Tue, 31 Dec 2024 07:45:05 +0000 Subject: [PATCH 13/19] Adding listener to stubserver --- test/local_creds_test.go | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/test/local_creds_test.go b/test/local_creds_test.go index 4a2931fec7d1..94b953cddabf 100644 --- a/test/local_creds_test.go +++ b/test/local_creds_test.go @@ -161,12 +161,10 @@ func spoofDialer(addr net.Addr) func(target string, t time.Duration) (net.Conn, } func testLocalCredsE2EFail(t *testing.T, dopts []grpc.DialOption) error { - lis, err := net.Listen("tcp", "localhost:0") if err != nil { return fmt.Errorf("Failed to create listener: %v", err) } - var fakeClientAddr, fakeServerAddr net.Addr fakeClientAddr = &net.IPAddr{ IP: netip.MustParseAddr("10.8.9.10").AsSlice(), @@ -186,18 +184,6 @@ func testLocalCredsE2EFail(t *testing.T, dopts []grpc.DialOption) error { defer ss.S.Stop() stubserver.StartTestService(t, ss) - /*var fakeClientAddr, fakeServerAddr net.Addr - fakeClientAddr = &net.IPAddr{ - IP: netip.MustParseAddr("10.8.9.10").AsSlice(), - Zone: "", - } - fakeServerAddr = &net.IPAddr{ - IP: netip.MustParseAddr("10.8.9.11").AsSlice(), - Zone: "", - }*/ - - //go ss.S.Serve(spoofListener(lis, fakeClientAddr)) - cc, err := grpc.NewClient(lis.Addr().String(), append(dopts, grpc.WithDialer(spoofDialer(fakeServerAddr)))...) if err != nil { return fmt.Errorf("Failed to dial server: %v, %v", err, lis.Addr().String()) From a82cae329846693f95657a985a87842f1b411a3d Mon Sep 17 00:00:00 2001 From: Pasupuleti Sravani Date: Mon, 6 Jan 2025 06:05:03 +0000 Subject: [PATCH 14/19] Removing empty lines --- test/goaway_test.go | 23 +++++++++++------------ test/local_creds_test.go | 1 - 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/test/goaway_test.go b/test/goaway_test.go index ff4e5c3ca225..092df4e51c96 100644 --- a/test/goaway_test.go +++ b/test/goaway_test.go @@ -57,13 +57,13 @@ func (s) TestGracefulClientOnGoAway(t *testing.T) { const maxConnAge = 100 * time.Millisecond const testTime = maxConnAge * 10 - lis, err := net.Listen("tcp", "localhost:0") + lis1, err := net.Listen("tcp", "localhost:0") if err != nil { t.Fatalf("Failed to create listener: %v", err) } ss1 := &stubserver.StubServer{ - Listener: lis, + Listener: lis1, EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil }, @@ -72,7 +72,7 @@ func (s) TestGracefulClientOnGoAway(t *testing.T) { defer ss1.S.Stop() stubserver.StartTestService(t, ss1) - cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) + cc, err := grpc.NewClient(lis1.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { t.Fatalf("Failed to dial server: %v", err) } @@ -81,7 +81,6 @@ func (s) TestGracefulClientOnGoAway(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - endTime := time.Now().Add(testTime) for time.Now().Before(endTime) { if _, err := c.EmptyCall(ctx, &testpb.Empty{}); err != nil { @@ -551,7 +550,7 @@ func (s) TestGoAwayThenClose(t *testing.T) { t.Fatalf("Error while listening. Err: %v", err) } - ss2 := &stubserver.StubServer{ + ss1 := &stubserver.StubServer{ Listener: lis1, UnaryCallF: func(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { return &testpb.SimpleResponse{}, nil @@ -570,15 +569,15 @@ func (s) TestGoAwayThenClose(t *testing.T) { }, S: grpc.NewServer(), } - stubserver.StartTestService(t, ss2) - defer ss2.S.Stop() + stubserver.StartTestService(t, ss1) + defer ss1.S.Stop() conn2Established := grpcsync.NewEvent() lis2, err := listenWithNotifyingListener("tcp", "localhost:0", conn2Established) if err != nil { t.Fatalf("Error while listening. Err: %v", err) } - ss3 := &stubserver.StubServer{ + ss2 := &stubserver.StubServer{ Listener: lis2, UnaryCallF: func(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { return &testpb.SimpleResponse{}, nil @@ -597,8 +596,8 @@ func (s) TestGoAwayThenClose(t *testing.T) { }, S: grpc.NewServer(), } - stubserver.StartTestService(t, ss3) - defer ss3.S.Stop() + stubserver.StartTestService(t, ss2) + defer ss2.S.Stop() r := manual.NewBuilderWithScheme("whatever") r.InitialState(resolver.State{Addresses: []resolver.Address{ @@ -632,7 +631,7 @@ func (s) TestGoAwayThenClose(t *testing.T) { } t.Log("Gracefully stopping server 1.") - go ss2.S.GracefulStop() + go ss1.S.GracefulStop() t.Log("Waiting for the ClientConn to enter IDLE state.") testutils.AwaitState(ctx, t, cc, connectivity.Idle) @@ -653,7 +652,7 @@ func (s) TestGoAwayThenClose(t *testing.T) { lis2.Close() t.Log("Hard closing connection 1.") - ss2.S.Stop() + ss1.S.Stop() t.Log("Waiting for the first stream to error.") if _, err = stream.Recv(); err == nil { diff --git a/test/local_creds_test.go b/test/local_creds_test.go index 94b953cddabf..ea2af275640e 100644 --- a/test/local_creds_test.go +++ b/test/local_creds_test.go @@ -41,7 +41,6 @@ import ( ) func testLocalCredsE2ESucceed(t *testing.T, network, address string) error { - lis, err := net.Listen(network, address) if err != nil { return fmt.Errorf("Failed to create listener: %v", err) From 565c52456195cff98209e52015f30cea34e7c8ae Mon Sep 17 00:00:00 2001 From: Pasupuleti Sravani Date: Tue, 7 Jan 2025 05:05:29 +0000 Subject: [PATCH 15/19] Placing defer stub.S.Stop right after stubserver.StartTestService --- test/goaway_test.go | 12 ++++++------ test/insecure_creds_test.go | 6 ++---- test/local_creds_test.go | 6 ++---- 3 files changed, 10 insertions(+), 14 deletions(-) diff --git a/test/goaway_test.go b/test/goaway_test.go index 092df4e51c96..9cbc9817de4b 100644 --- a/test/goaway_test.go +++ b/test/goaway_test.go @@ -57,22 +57,22 @@ func (s) TestGracefulClientOnGoAway(t *testing.T) { const maxConnAge = 100 * time.Millisecond const testTime = maxConnAge * 10 - lis1, err := net.Listen("tcp", "localhost:0") + lis, err := net.Listen("tcp", "localhost:0") if err != nil { t.Fatalf("Failed to create listener: %v", err) } - ss1 := &stubserver.StubServer{ - Listener: lis1, + ss := &stubserver.StubServer{ + Listener: lis, EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil }, S: grpc.NewServer(grpc.KeepaliveParams(keepalive.ServerParameters{MaxConnectionAge: maxConnAge})), } - defer ss1.S.Stop() - stubserver.StartTestService(t, ss1) + stubserver.StartTestService(t, ss) + defer ss.S.Stop() - cc, err := grpc.NewClient(lis1.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) + cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { t.Fatalf("Failed to dial server: %v", err) } diff --git a/test/insecure_creds_test.go b/test/insecure_creds_test.go index ae5e2fbc80ae..f8f332790e64 100644 --- a/test/insecure_creds_test.go +++ b/test/insecure_creds_test.go @@ -153,9 +153,8 @@ func (s) TestInsecureCreds_WithPerRPCCredentials_AsCallOption(t *testing.T) { }, S: grpc.NewServer(grpc.Creds(insecure.NewCredentials())), } - - defer ss.S.Stop() stubserver.StartTestService(t, ss) + defer ss.S.Stop() addr := lis.Addr().String() ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) @@ -188,9 +187,8 @@ func (s) TestInsecureCreds_WithPerRPCCredentials_AsDialOption(t *testing.T) { }, S: grpc.NewServer(grpc.Creds(insecure.NewCredentials())), } - - defer ss.S.Stop() stubserver.StartTestService(t, ss) + defer ss.S.Stop() addr := lis.Addr().String() dopts := []grpc.DialOption{ diff --git a/test/local_creds_test.go b/test/local_creds_test.go index ea2af275640e..678fc0d63cba 100644 --- a/test/local_creds_test.go +++ b/test/local_creds_test.go @@ -76,10 +76,8 @@ func testLocalCredsE2ESucceed(t *testing.T, network, address string) error { }, S: grpc.NewServer(grpc.Creds(local.NewCredentials())), } - - defer ss.S.Stop() - stubserver.StartTestService(t, ss) + defer ss.S.Stop() var cc *grpc.ClientConn lisAddr := lis.Addr().String() @@ -180,8 +178,8 @@ func testLocalCredsE2EFail(t *testing.T, dopts []grpc.DialOption) error { }, S: grpc.NewServer(grpc.Creds(local.NewCredentials())), } - defer ss.S.Stop() stubserver.StartTestService(t, ss) + defer ss.S.Stop() cc, err := grpc.NewClient(lis.Addr().String(), append(dopts, grpc.WithDialer(spoofDialer(fakeServerAddr)))...) if err != nil { From 431794ae6300d9b2085351b13642fb8b10f899da Mon Sep 17 00:00:00 2001 From: Pasupuleti Sravani Date: Fri, 10 Jan 2025 06:15:34 +0000 Subject: [PATCH 16/19] Moving defer ss.S.Stop right after stubserver.startTestService --- test/insecure_creds_test.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/test/insecure_creds_test.go b/test/insecure_creds_test.go index f8f332790e64..f05faf356c23 100644 --- a/test/insecure_creds_test.go +++ b/test/insecure_creds_test.go @@ -115,10 +115,8 @@ func (s) TestInsecureCreds(t *testing.T) { } else { ss.S = grpc.NewServer() } - defer ss.S.Stop() - stubserver.StartTestService(t, ss) - + defer ss.S.Stop() addr := lis.Addr().String() opts := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())} if test.clientInsecureCreds { From 84cbd206bdae2933aa3d0ff9acd2a88f10d31cba Mon Sep 17 00:00:00 2001 From: Pasupuleti Sravani Date: Mon, 20 Jan 2025 06:42:25 +0000 Subject: [PATCH 17/19] removing redundant dail option --- test/insecure_creds_test.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/test/insecure_creds_test.go b/test/insecure_creds_test.go index f05faf356c23..c71a5dc6c735 100644 --- a/test/insecure_creds_test.go +++ b/test/insecure_creds_test.go @@ -110,18 +110,16 @@ func (s) TestInsecureCreds(t *testing.T) { return &testpb.Empty{}, nil }, } + sOpts := []grpc.ServerOption{} if test.serverInsecureCreds { ss.S = grpc.NewServer(grpc.Creds(insecure.NewCredentials())) } else { - ss.S = grpc.NewServer() + ss.S = grpc.NewServer(sOpts...) } stubserver.StartTestService(t, ss) defer ss.S.Stop() addr := lis.Addr().String() opts := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())} - if test.clientInsecureCreds { - opts = []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())} - } cc, err := grpc.NewClient(addr, opts...) if err != nil { t.Fatalf("grpc.NewClient(%q) failed: %v", addr, err) From 8a8051057674adaefe8ae38d45e43b2b421037b4 Mon Sep 17 00:00:00 2001 From: Pasupuleti Sravani Date: Thu, 23 Jan 2025 07:35:45 +0000 Subject: [PATCH 18/19] reverting changes for creds_test --- test/creds_test.go | 26 ++++++-------------------- 1 file changed, 6 insertions(+), 20 deletions(-) diff --git a/test/creds_test.go b/test/creds_test.go index f0e5c506e6a4..915deccd7be8 100644 --- a/test/creds_test.go +++ b/test/creds_test.go @@ -32,7 +32,6 @@ import ( "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" - "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/metadata" "google.golang.org/grpc/resolver" @@ -433,15 +432,9 @@ func (s) TestCredsHandshakeAuthority(t *testing.T) { t.Fatal(err) } cred := &authorityCheckCreds{} - stub := &stubserver.StubServer{ - Listener: lis, - EmptyCallF: func(_ context.Context, _ *testpb.Empty) (*testpb.Empty, error) { - return &testpb.Empty{}, nil - }, - S: grpc.NewServer(), - } - stubserver.StartTestService(t, stub) - defer stub.S.Stop() + s := grpc.NewServer() + go s.Serve(lis) + defer s.Stop() r := manual.NewBuilderWithScheme("whatever") cc, err := grpc.NewClient(r.Scheme()+":///"+testAuthority, grpc.WithTransportCredentials(cred), grpc.WithResolvers(r)) @@ -471,16 +464,9 @@ func (s) TestCredsHandshakeServerNameAuthority(t *testing.T) { t.Fatal(err) } cred := &authorityCheckCreds{} - - stub := &stubserver.StubServer{ - Listener: lis, - EmptyCallF: func(_ context.Context, _ *testpb.Empty) (*testpb.Empty, error) { - return &testpb.Empty{}, nil - }, - S: grpc.NewServer(), - } - stubserver.StartTestService(t, stub) - defer stub.S.Stop() + s := grpc.NewServer() + go s.Serve(lis) + defer s.Stop() r := manual.NewBuilderWithScheme("whatever") cc, err := grpc.NewClient(r.Scheme()+":///"+testAuthority, grpc.WithTransportCredentials(cred), grpc.WithResolvers(r)) From 4f11245cfceb621fde916a86307c58cd1e8ca3ad Mon Sep 17 00:00:00 2001 From: Pasupuleti Sravani Date: Thu, 23 Jan 2025 07:37:23 +0000 Subject: [PATCH 19/19] reverting changes for creds_test --- test/creds_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/creds_test.go b/test/creds_test.go index 915deccd7be8..bedafa5b7272 100644 --- a/test/creds_test.go +++ b/test/creds_test.go @@ -435,6 +435,7 @@ func (s) TestCredsHandshakeAuthority(t *testing.T) { s := grpc.NewServer() go s.Serve(lis) defer s.Stop() + r := manual.NewBuilderWithScheme("whatever") cc, err := grpc.NewClient(r.Scheme()+":///"+testAuthority, grpc.WithTransportCredentials(cred), grpc.WithResolvers(r)) @@ -467,6 +468,7 @@ func (s) TestCredsHandshakeServerNameAuthority(t *testing.T) { s := grpc.NewServer() go s.Serve(lis) defer s.Stop() + r := manual.NewBuilderWithScheme("whatever") cc, err := grpc.NewClient(r.Scheme()+":///"+testAuthority, grpc.WithTransportCredentials(cred), grpc.WithResolvers(r))