-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
grpc: eliminate panics in server worker implementation #6856
Changes from 2 commits
e9ea41d
e1d3fab
fe69cb2
b31d719
afc5e88
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,14 +21,20 @@ package grpc_test | |
import ( | ||
"context" | ||
"io" | ||
"runtime" | ||
"sync" | ||
"testing" | ||
"time" | ||
|
||
"google.golang.org/grpc" | ||
"google.golang.org/grpc/codes" | ||
"google.golang.org/grpc/credentials/insecure" | ||
"google.golang.org/grpc/internal/grpcsync" | ||
"google.golang.org/grpc/internal/stubserver" | ||
"google.golang.org/grpc/status" | ||
|
||
testgrpc "google.golang.org/grpc/interop/grpc_testing" | ||
testpb "google.golang.org/grpc/interop/grpc_testing" | ||
) | ||
|
||
// TestServer_MaxHandlers ensures that no more than MaxConcurrentStreams server | ||
|
@@ -97,3 +103,90 @@ func (s) TestServer_MaxHandlers(t *testing.T) { | |
t.Fatal("Received unexpected RPC error:", err) | ||
} | ||
} | ||
|
||
// Tests the case where the stream worker goroutine option is enabled, and a | ||
// number of RPCs are initiated around the same time that Stop() is called. This | ||
// used to result in a write to a closed channel. This test verifies that there | ||
// is no panic. | ||
func (s) TestStreamWorkers_RPCsAndStop(t *testing.T) { | ||
ss := stubserver.StartTestService(t, nil, grpc.NumStreamWorkers(uint32(runtime.NumCPU()))) | ||
// This deferred stop takes care of stopping the server when one of the | ||
// below grpc.Dials fail, and the test exits early. | ||
defer ss.Stop() | ||
|
||
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) | ||
defer cancel() | ||
const numChannels = 20 | ||
const numRPCLoops = 20 | ||
|
||
// Create a bunch of clientconns and ensure that they are READY by making an | ||
// RPC on them. | ||
ccs := make([]*grpc.ClientConn, numChannels) | ||
for i := 0; i < numChannels; i++ { | ||
var err error | ||
ccs[i], err = grpc.Dial(ss.Address, grpc.WithTransportCredentials(insecure.NewCredentials())) | ||
if err != nil { | ||
t.Fatalf("[iteration: %d] grpc.Dial(%s) failed: %v", i, ss.Address, err) | ||
} | ||
defer ccs[i].Close() | ||
client := testgrpc.NewTestServiceClient(ccs[i]) | ||
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { | ||
t.Fatalf("EmptyCall() failed: %v", err) | ||
} | ||
} | ||
|
||
// Make a bunch of concurrent RPCs on the above clientconns. These will | ||
// eventually race with Stop(), and will start to fail. | ||
var wg sync.WaitGroup | ||
for i := 0; i < numChannels; i++ { | ||
client := testgrpc.NewTestServiceClient(ccs[i]) | ||
for j := 0; j < numRPCLoops; j++ { | ||
wg.Add(1) | ||
go func(client testgrpc.TestServiceClient) { | ||
defer wg.Done() | ||
for { | ||
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) | ||
defer sCancel() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is probably why you're getting the deadline exceeded errors. Just leave these using the base ctx and it should be fine I think? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh crap, didn't see this comment. But figure out the same anyways :). Could have saved 30m if I had looked earlier, but I think the investigation helped. |
||
_, err := client.EmptyCall(sCtx, &testpb.Empty{}) | ||
if err == nil { | ||
continue | ||
} | ||
if code := status.Code(err); code == codes.DeadlineExceeded || code == codes.Unavailable { | ||
// Once Stop() has been called on the server, we expect | ||
// subsequent calls to fail with Unavailable or | ||
// DeadlineExceeded, the latter happens when the client | ||
// channel moves to TRANSIENT_FAILURE and will never | ||
// recover from there. | ||
return | ||
} | ||
t.Errorf("EmptyCall() failed: %v", err) | ||
return | ||
} | ||
}(client) | ||
} | ||
} | ||
|
||
// Call Stop() concurrently with the above RPC attempts. | ||
ss.Stop() | ||
wg.Wait() | ||
} | ||
|
||
// Tests the case where the stream worker goroutine option is enabled, and both | ||
// Stop() and GracefulStop() care called. This used to result in a close of a | ||
// closed channel. This test verifies that there is no panic. | ||
func (s) TestStreamWorkers_GracefulStopAndStop(t *testing.T) { | ||
ss := stubserver.StartTestService(t, nil, grpc.NumStreamWorkers(uint32(runtime.NumCPU()))) | ||
defer ss.Stop() | ||
|
||
if err := ss.StartClient(grpc.WithTransportCredentials(insecure.NewCredentials())); err != nil { | ||
t.Fatalf("Failed to create client to stub server: %v", err) | ||
} | ||
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) | ||
defer cancel() | ||
client := testgrpc.NewTestServiceClient(ss.CC) | ||
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil { | ||
t.Fatalf("EmptyCall() failed: %v", err) | ||
} | ||
|
||
ss.S.GracefulStop() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: delete this
WaitForReady
as it should not be necessary and could persist a mistaken understanding that it might be necessary. (We used to have a bug waaay back when that would have required it.)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.