Skip to content

Commit

Permalink
Removing testservice impl and using stubserver support for RPC calls
Browse files Browse the repository at this point in the history
  • Loading branch information
janardhankrishna-sai committed Oct 8, 2024
1 parent 53bdf3c commit cf44fe4
Show file tree
Hide file tree
Showing 8 changed files with 229 additions and 126 deletions.
74 changes: 37 additions & 37 deletions orca/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/orca"
"google.golang.org/grpc/orca/internal"
Expand All @@ -44,37 +45,6 @@ import (

const requestsMetricKey = "test-service-requests"

// An implementation of grpc_testing.TestService for the purpose of this test.
// We cannot use the StubServer approach here because we need to register the
// OpenRCAService as well on the same gRPC server.
type testServiceImpl struct {
mu sync.Mutex
requests int64

testgrpc.TestServiceServer
smr orca.ServerMetricsRecorder
}

func (t *testServiceImpl) UnaryCall(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
t.mu.Lock()
t.requests++
t.mu.Unlock()

t.smr.SetNamedUtilization(requestsMetricKey, float64(t.requests)*0.01)
t.smr.SetCPUUtilization(50.0)
t.smr.SetMemoryUtilization(0.9)
t.smr.SetApplicationUtilization(1.2)
return &testpb.SimpleResponse{}, nil
}

func (t *testServiceImpl) EmptyCall(context.Context, *testpb.Empty) (*testpb.Empty, error) {
t.smr.DeleteNamedUtilization(requestsMetricKey)
t.smr.SetCPUUtilization(0)
t.smr.SetMemoryUtilization(0)
t.smr.DeleteApplicationUtilization()
return &testpb.Empty{}, nil
}

// TestE2E_CustomBackendMetrics_OutOfBand tests the injection of out-of-band
// custom backend metrics from the server application, and verifies that
// expected load reports are received at the client.
Expand All @@ -94,13 +64,36 @@ func (s) TestE2E_CustomBackendMetrics_OutOfBand(t *testing.T) {
internal.AllowAnyMinReportingInterval.(func(*orca.ServiceOptions))(&opts)

// Register the OpenRCAService with a very short metrics reporting interval.
var mu sync.Mutex
var requests int

stub := &stubserver.StubServer{
EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
smr.DeleteNamedUtilization(requestsMetricKey)
smr.SetCPUUtilization(0)
smr.SetMemoryUtilization(0)
smr.DeleteApplicationUtilization()
return &testpb.Empty{}, nil
},
UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
mu.Lock()
requests++
smr.SetNamedUtilization(requestsMetricKey, float64(requests)*0.01)
smr.SetCPUUtilization(50.0)
smr.SetMemoryUtilization(0.9)
smr.SetApplicationUtilization(1.2)
mu.Unlock()
return &testpb.SimpleResponse{}, nil
},
}

s := grpc.NewServer()
if err := orca.Register(s, opts); err != nil {
t.Fatalf("orca.EnableOutOfBandMetricsReportingForTesting() failed: %v", err)
}

// Register the test service implementation on the same grpc server, and start serving.
testgrpc.RegisterTestServiceServer(s, &testServiceImpl{smr: smr})
testgrpc.RegisterTestServiceServer(s, stub)
go s.Serve(lis)
defer s.Stop()
t.Logf("Started gRPC server at %s...", lis.Addr().String())
Expand All @@ -112,11 +105,11 @@ func (s) TestE2E_CustomBackendMetrics_OutOfBand(t *testing.T) {
}
defer cc.Close()

// Spawn a goroutine which sends 20 unary RPCs to the test server. This
// Spawn a goroutine which sends 20 unary RPCs to the stub server. This
// will trigger the injection of custom backend metrics from the
// testServiceImpl.
// stubserver.
const numRequests = 20
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
testStub := testgrpc.NewTestServiceClient(cc)
errCh := make(chan error, 1)
Expand All @@ -126,7 +119,7 @@ func (s) TestE2E_CustomBackendMetrics_OutOfBand(t *testing.T) {
errCh <- fmt.Errorf("UnaryCall failed: %v", err)
return
}
time.Sleep(time.Millisecond)
time.Sleep(shortReportingInterval)
}
errCh <- nil
}()
Expand All @@ -151,11 +144,18 @@ func (s) TestE2E_CustomBackendMetrics_OutOfBand(t *testing.T) {
default:
}

mu.Lock()
if requests == numRequests {
mu.Unlock()
break
}
mu.Unlock()

wantProto := &v3orcapb.OrcaLoadReport{
CpuUtilization: 50.0,
MemUtilization: 0.9,
ApplicationUtilization: 1.2,
Utilization: map[string]float64{requestsMetricKey: numRequests * 0.01},
Utilization: map[string]float64{requestsMetricKey: float64(requests) * 0.01},
}
gotProto, err := stream.Recv()
if err != nil {
Expand Down
28 changes: 20 additions & 8 deletions test/xds/xds_client_ignore_resource_deletion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,15 +263,15 @@ func generateBootstrapContents(t *testing.T, serverURI string, ignoreResourceDel
var serverCfgs json.RawMessage
if ignoreResourceDeletion {
serverCfgs = []byte(fmt.Sprintf(`[{
"server_uri": %q,
"channel_creds": [{"type": "insecure"}],
"server_features": ["ignore_resource_deletion"]
}]`, serverURI))
"server_uri": %q,
"channel_creds": [{"type": "insecure"}],
"server_features": ["ignore_resource_deletion"]
}]`, serverURI))
} else {
serverCfgs = []byte(fmt.Sprintf(`[{
"server_uri": %q,
"channel_creds": [{"type": "insecure"}]
}]`, serverURI))
"server_uri": %q,
"channel_creds": [{"type": "insecure"}]
}]`, serverURI))

}
bootstrapContents, err := bootstrap.NewContentsForTesting(bootstrap.ConfigOptionsForTesting{
Expand Down Expand Up @@ -309,12 +309,24 @@ func setupGRPCServerWithModeChangeChannelAndServe(t *testing.T, bootstrapContent
t.Logf("Serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err)
updateCh <- args.Mode
})

stub := &stubserver.StubServer{
EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
return &testpb.Empty{}, nil
},
UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{}, nil
},
}
server, err := xds.NewGRPCServer(grpc.Creds(insecure.NewCredentials()), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
if err != nil {
t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
}
t.Cleanup(server.Stop)
testgrpc.RegisterTestServiceServer(server, &testService{})

// Set the server in the stub and start the test service.
stub.S = server
stubserver.StartTestService(t, stub)

// Serve.
go func() {
Expand Down
27 changes: 20 additions & 7 deletions test/xds/xds_server_certificate_providers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
xdscreds "google.golang.org/grpc/credentials/xds"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/internal/testutils/xds/e2e/setup"
Expand Down Expand Up @@ -131,9 +132,9 @@ func (s) TestServerSideXDS_WithNoCertificateProvidersInBootstrap_Failure(t *test
nodeID := uuid.New().String()
bs, err := bootstrap.NewContentsForTesting(bootstrap.ConfigOptionsForTesting{
Servers: []byte(fmt.Sprintf(`[{
"server_uri": %q,
"channel_creds": [{"type": "insecure"}]
}]`, mgmtServer.Address)),
"server_uri": %q,
"channel_creds": [{"type": "insecure"}]
}]`, mgmtServer.Address)),
Node: []byte(fmt.Sprintf(`{"id": "%s"}`, nodeID)),
ServerListenerResourceNameTemplate: e2e.ServerListenerResourceNameTemplate,
})
Expand Down Expand Up @@ -161,9 +162,12 @@ func (s) TestServerSideXDS_WithNoCertificateProvidersInBootstrap_Failure(t *test
if err != nil {
t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
}
testgrpc.RegisterTestServiceServer(server, &testService{})
defer server.Stop()

stub := &stubserver.StubServer{}
stub.S = server
stubserver.StartTestService(t, stub)

// Create a local listener and pass it to Serve().
lis, err := testutils.LocalTCPListener()
if err != nil {
Expand Down Expand Up @@ -269,8 +273,8 @@ func (s) TestServerSideXDS_WithValidAndInvalidSecurityConfiguration(t *testing.T
}

// Create an xDS-enabled grpc server that is configured to use xDS
// credentials, and register the test service on it. Configure a mode change
// option that closes a channel when listener2 enter serving mode.
// credentials, and configure a mode change option that closes a channel
// when listener2 enters serving mode.
creds, err := xdscreds.NewServerCredentials(xdscreds.ServerOptions{FallbackCreds: insecure.NewCredentials()})
if err != nil {
t.Fatal(err)
Expand All @@ -283,11 +287,20 @@ func (s) TestServerSideXDS_WithValidAndInvalidSecurityConfiguration(t *testing.T
}
}
})
stub := &stubserver.StubServer{
EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
return &testpb.Empty{}, nil
},
}
server, err := xds.NewGRPCServer(grpc.Creds(creds), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
if err != nil {
t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
}
testgrpc.RegisterTestServiceServer(server, &testService{})

// Set the server in the stub and start the test service.
stub.S = server
stubserver.StartTestService(t, stub)

defer server.Stop()

go func() {
Expand Down
46 changes: 22 additions & 24 deletions test/xds/xds_server_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"google.golang.org/grpc/credentials/insecure"
xdscreds "google.golang.org/grpc/credentials/xds"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/internal/testutils/xds/e2e/setup"
Expand All @@ -43,27 +44,6 @@ import (
testpb "google.golang.org/grpc/interop/grpc_testing"
)

type testService struct {
testgrpc.TestServiceServer
}

func (*testService) EmptyCall(context.Context, *testpb.Empty) (*testpb.Empty, error) {
return &testpb.Empty{}, nil
}

func (*testService) UnaryCall(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{}, nil
}

func (*testService) FullDuplexCall(stream testgrpc.TestService_FullDuplexCallServer) error {
for {
_, err := stream.Recv() // hangs here forever if stream doesn't shut down...doesn't receive EOF without any errors
if err == io.EOF {
return nil
}
}
}

func testModeChangeServerOption(t *testing.T) grpc.ServerOption {
// Create a server option to get notified about serving mode changes. We don't
// do anything other than throwing a log entry here. But this is required,
Expand Down Expand Up @@ -96,13 +76,31 @@ func setupGRPCServer(t *testing.T, bootstrapContents []byte) (net.Listener, func
if err != nil {
t.Fatal(err)
}

// Initialize an xDS-enabled gRPC server and register the stubServer on it.
stub := &stubserver.StubServer{
EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
return &testpb.Empty{}, nil
},
UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{}, nil
},
FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
for {
_, err := stream.Recv() // hangs here forever if stream doesn't shut down...doesn't receive EOF without any errors
if err == io.EOF {
return nil
}
}
},
}
// Initialize an xDS-enabled gRPC server and use the helper to start the test service.
server, err := xds.NewGRPCServer(grpc.Creds(creds), testModeChangeServerOption(t), xds.BootstrapContentsForTesting(bootstrapContents))
if err != nil {
t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
}
testgrpc.RegisterTestServiceServer(server, &testService{})

// Set the server in the stub and start the test service.
stub.S = server
stubserver.StartTestService(t, stub)

// Create a local listener and pass it to Serve().
lis, err := testutils.LocalTCPListener()
Expand Down
25 changes: 22 additions & 3 deletions test/xds/xds_server_serving_mode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
xdscreds "google.golang.org/grpc/credentials/xds"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/internal/testutils/xds/e2e/setup"
Expand Down Expand Up @@ -62,13 +63,22 @@ func (s) TestServerSideXDS_RedundantUpdateSuppression(t *testing.T) {
updateCh <- args.Mode
})

// Initialize an xDS-enabled gRPC server and register the stubServer on it.
stub := &stubserver.StubServer{
EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
return &testpb.Empty{}, nil
},
}
// Initialize an xDS-enabled gRPC server.
server, err := xds.NewGRPCServer(grpc.Creds(creds), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
if err != nil {
t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
}

// Set the server in the stub and start the test service.
stub.S = server
stubserver.StartTestService(t, stub)

defer server.Stop()
testgrpc.RegisterTestServiceServer(server, &testService{})

// Setup the management server to respond with the listener resources.
host, port, err := hostPortFromListener(lis)
Expand Down Expand Up @@ -206,13 +216,22 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) {
}
})

stub := &stubserver.StubServer{
EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
return &testpb.Empty{}, nil
},
}
// Initialize an xDS-enabled gRPC server and register the stubServer on it.
server, err := xds.NewGRPCServer(grpc.Creds(creds), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
if err != nil {
t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
}

// Set the server in the stub and start the test service.
stub.S = server
stubserver.StartTestService(t, stub)

defer server.Stop()
testgrpc.RegisterTestServiceServer(server, &testService{})

// Setup the management server to respond with server-side Listener
// resources for both listeners.
Expand Down
Loading

0 comments on commit cf44fe4

Please sign in to comment.