Skip to content

Commit

Permalink
Export changes to OSS.
Browse files Browse the repository at this point in the history
  • Loading branch information
MakMukhi committed Apr 3, 2018
1 parent 2eae9d0 commit 8dd7549
Show file tree
Hide file tree
Showing 10 changed files with 1,942 additions and 1,848 deletions.
13 changes: 13 additions & 0 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1241,7 +1241,20 @@ func (ac *addrConn) transportMonitor() {
// Block until we receive a goaway or an error occurs.
select {
case <-t.GoAway():
done := t.Error()
cleanup := t.Close
// Since this transport will be orphaned (won't have a transportMonitor)
// we need to launch a goroutine to keep track of clientConn.Close()
// happening since it might not be noticed by any other goroutine for a while.
go func() {
<-done
cleanup()
}()
case <-t.Error():
// In case this is triggered because clientConn.Close()
// was called, we want to immeditately close the transport
// since no other goroutine might notice it for a while.
t.Close()
case <-cdeadline:
ac.mu.Lock()
// This implies that client received server preface.
Expand Down
83 changes: 40 additions & 43 deletions test/end2end_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -748,7 +748,6 @@ type lazyConn struct {

func (l *lazyConn) Write(b []byte) (int, error) {
if atomic.LoadInt32(&(l.beLazy)) == 1 {
// The sleep duration here needs to less than the leakCheck deadline.
time.Sleep(time.Second)
}
return l.Conn.Write(b)
Expand Down Expand Up @@ -963,7 +962,7 @@ func testServerGoAwayPendingRPC(t *testing.T, e env) {
}
// The existing RPC should be still good to proceed.
if err := stream.Send(req); err != nil {
t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
t.Fatalf("%v.Send(_) = %v, want <nil>", stream, err)
}
if _, err := stream.Recv(); err != nil {
t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
Expand Down Expand Up @@ -3053,7 +3052,6 @@ func testMultipleSetHeaderStreamingRPCError(t *testing.T, e env) {
if !reflect.DeepEqual(header, expectedHeader) {
t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
}

if err := stream.CloseSend(); err != nil {
t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
}
Expand Down Expand Up @@ -3156,44 +3154,6 @@ func testRetry(t *testing.T, e env) {
}
}

func TestRPCTimeout(t *testing.T) {
defer leakcheck.Check(t)
for _, e := range listTestEnv() {
testRPCTimeout(t, e)
}
}

// TODO(zhaoq): Have a better test coverage of timeout and cancellation mechanism.
func testRPCTimeout(t *testing.T, e env) {
te := newTest(t, e)
te.startServer(&testServer{security: e.security, unaryCallSleepTime: 50 * time.Millisecond})
defer te.tearDown()

cc := te.clientConn()
tc := testpb.NewTestServiceClient(cc)

const argSize = 2718
const respSize = 314

payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
if err != nil {
t.Fatal(err)
}

req := &testpb.SimpleRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE,
ResponseSize: respSize,
Payload: payload,
}
for i := -1; i <= 10; i++ {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(i)*time.Millisecond)
if _, err := tc.UnaryCall(ctx, req); status.Code(err) != codes.DeadlineExceeded {
t.Fatalf("TestService/UnaryCallv(_, _) = _, %v; want <nil>, error code: %s", err, codes.DeadlineExceeded)
}
cancel()
}
}

func TestCancel(t *testing.T) {
defer leakcheck.Check(t)
for _, e := range listTestEnv() {
Expand Down Expand Up @@ -3687,7 +3647,7 @@ func testClientStreaming(t *testing.T, e env, sizes []int) {
Payload: payload,
}
if err := stream.Send(req); err != nil {
t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
t.Fatalf("%v.Send(_) = %v, want <nil>", stream, err)
}
sum += s
}
Expand Down Expand Up @@ -5078,7 +5038,7 @@ func TestTapTimeout(t *testing.T) {
ss := &stubServer{
emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
<-ctx.Done()
return &testpb.Empty{}, nil
return nil, status.Errorf(codes.Canceled, ctx.Err().Error())
},
}
if err := ss.Start(sopts); err != nil {
Expand Down Expand Up @@ -6218,3 +6178,40 @@ func TestFailFastRPCErrorOnBadCertificates(t *testing.T) {
}
te.t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want err.Error() contains %q", err, clientAlwaysFailCredErrorMsg)
}

func TestRPCTimeout(t *testing.T) {
defer leakcheck.Check(t)
for _, e := range listTestEnv() {
testRPCTimeout(t, e)
}
}

func testRPCTimeout(t *testing.T, e env) {
te := newTest(t, e)
te.startServer(&testServer{security: e.security, unaryCallSleepTime: 500 * time.Millisecond})
defer te.tearDown()

cc := te.clientConn()
tc := testpb.NewTestServiceClient(cc)

const argSize = 2718
const respSize = 314

payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
if err != nil {
t.Fatal(err)
}

req := &testpb.SimpleRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE,
ResponseSize: respSize,
Payload: payload,
}
for i := -1; i <= 10; i++ {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(i)*time.Millisecond)
if _, err := tc.UnaryCall(ctx, req); status.Code(err) != codes.DeadlineExceeded {
t.Fatalf("TestService/UnaryCallv(_, _) = _, %v; want <nil>, error code: %s", err, codes.DeadlineExceeded)
}
cancel()
}
}
Loading

0 comments on commit 8dd7549

Please sign in to comment.