Skip to content

Commit

Permalink
stream: fix calloption.After() race in finish (#3672)
Browse files Browse the repository at this point in the history
  • Loading branch information
menghanl authored Jun 11, 2020
1 parent d5bc6ec commit 9aa97f9
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 37 deletions.
45 changes: 19 additions & 26 deletions rpc_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,6 @@ func (d *gzipDecompressor) Type() string {
type callInfo struct {
compressorType string
failFast bool
stream ClientStream
maxReceiveMessageSize *int
maxSendMessageSize *int
creds credentials.PerRPCCredentials
Expand All @@ -180,16 +179,16 @@ type CallOption interface {

// after is called after the call has completed. after cannot return an
// error, so any failures should be reported via output parameters.
after(*callInfo)
after(*callInfo, *csAttempt)
}

// EmptyCallOption does not alter the Call configuration.
// It can be embedded in another structure to carry satellite data for use
// by interceptors.
type EmptyCallOption struct{}

func (EmptyCallOption) before(*callInfo) error { return nil }
func (EmptyCallOption) after(*callInfo) {}
func (EmptyCallOption) before(*callInfo) error { return nil }
func (EmptyCallOption) after(*callInfo, *csAttempt) {}

// Header returns a CallOptions that retrieves the header metadata
// for a unary RPC.
Expand All @@ -205,10 +204,8 @@ type HeaderCallOption struct {
}

func (o HeaderCallOption) before(c *callInfo) error { return nil }
func (o HeaderCallOption) after(c *callInfo) {
if c.stream != nil {
*o.HeaderAddr, _ = c.stream.Header()
}
func (o HeaderCallOption) after(c *callInfo, attempt *csAttempt) {
*o.HeaderAddr, _ = attempt.s.Header()
}

// Trailer returns a CallOptions that retrieves the trailer metadata
Expand All @@ -225,10 +222,8 @@ type TrailerCallOption struct {
}

func (o TrailerCallOption) before(c *callInfo) error { return nil }
func (o TrailerCallOption) after(c *callInfo) {
if c.stream != nil {
*o.TrailerAddr = c.stream.Trailer()
}
func (o TrailerCallOption) after(c *callInfo, attempt *csAttempt) {
*o.TrailerAddr = attempt.s.Trailer()
}

// Peer returns a CallOption that retrieves peer information for a unary RPC.
Expand All @@ -245,11 +240,9 @@ type PeerCallOption struct {
}

func (o PeerCallOption) before(c *callInfo) error { return nil }
func (o PeerCallOption) after(c *callInfo) {
if c.stream != nil {
if x, ok := peer.FromContext(c.stream.Context()); ok {
*o.PeerAddr = *x
}
func (o PeerCallOption) after(c *callInfo, attempt *csAttempt) {
if x, ok := peer.FromContext(attempt.s.Context()); ok {
*o.PeerAddr = *x
}
}

Expand Down Expand Up @@ -285,7 +278,7 @@ func (o FailFastCallOption) before(c *callInfo) error {
c.failFast = o.FailFast
return nil
}
func (o FailFastCallOption) after(c *callInfo) {}
func (o FailFastCallOption) after(c *callInfo, attempt *csAttempt) {}

// MaxCallRecvMsgSize returns a CallOption which sets the maximum message size
// in bytes the client can receive.
Expand All @@ -304,7 +297,7 @@ func (o MaxRecvMsgSizeCallOption) before(c *callInfo) error {
c.maxReceiveMessageSize = &o.MaxRecvMsgSize
return nil
}
func (o MaxRecvMsgSizeCallOption) after(c *callInfo) {}
func (o MaxRecvMsgSizeCallOption) after(c *callInfo, attempt *csAttempt) {}

// MaxCallSendMsgSize returns a CallOption which sets the maximum message size
// in bytes the client can send.
Expand All @@ -323,7 +316,7 @@ func (o MaxSendMsgSizeCallOption) before(c *callInfo) error {
c.maxSendMessageSize = &o.MaxSendMsgSize
return nil
}
func (o MaxSendMsgSizeCallOption) after(c *callInfo) {}
func (o MaxSendMsgSizeCallOption) after(c *callInfo, attempt *csAttempt) {}

// PerRPCCredentials returns a CallOption that sets credentials.PerRPCCredentials
// for a call.
Expand All @@ -342,7 +335,7 @@ func (o PerRPCCredsCallOption) before(c *callInfo) error {
c.creds = o.Creds
return nil
}
func (o PerRPCCredsCallOption) after(c *callInfo) {}
func (o PerRPCCredsCallOption) after(c *callInfo, attempt *csAttempt) {}

// UseCompressor returns a CallOption which sets the compressor used when
// sending the request. If WithCompressor is also set, UseCompressor has
Expand All @@ -363,7 +356,7 @@ func (o CompressorCallOption) before(c *callInfo) error {
c.compressorType = o.CompressorType
return nil
}
func (o CompressorCallOption) after(c *callInfo) {}
func (o CompressorCallOption) after(c *callInfo, attempt *csAttempt) {}

// CallContentSubtype returns a CallOption that will set the content-subtype
// for a call. For example, if content-subtype is "json", the Content-Type over
Expand Down Expand Up @@ -396,7 +389,7 @@ func (o ContentSubtypeCallOption) before(c *callInfo) error {
c.contentSubtype = o.ContentSubtype
return nil
}
func (o ContentSubtypeCallOption) after(c *callInfo) {}
func (o ContentSubtypeCallOption) after(c *callInfo, attempt *csAttempt) {}

// ForceCodec returns a CallOption that will set the given Codec to be
// used for all request and response messages for a call. The result of calling
Expand Down Expand Up @@ -428,7 +421,7 @@ func (o ForceCodecCallOption) before(c *callInfo) error {
c.codec = o.Codec
return nil
}
func (o ForceCodecCallOption) after(c *callInfo) {}
func (o ForceCodecCallOption) after(c *callInfo, attempt *csAttempt) {}

// CallCustomCodec behaves like ForceCodec, but accepts a grpc.Codec instead of
// an encoding.Codec.
Expand All @@ -450,7 +443,7 @@ func (o CustomCodecCallOption) before(c *callInfo) error {
c.codec = o.Codec
return nil
}
func (o CustomCodecCallOption) after(c *callInfo) {}
func (o CustomCodecCallOption) after(c *callInfo, attempt *csAttempt) {}

// MaxRetryRPCBufferSize returns a CallOption that limits the amount of memory
// used for buffering this RPC's requests for retry purposes.
Expand All @@ -471,7 +464,7 @@ func (o MaxRetryRPCBufferSizeCallOption) before(c *callInfo) error {
c.maxRetryRPCBufferSize = o.MaxRetryRPCBufferSize
return nil
}
func (o MaxRetryRPCBufferSizeCallOption) after(c *callInfo) {}
func (o MaxRetryRPCBufferSizeCallOption) after(c *callInfo, attempt *csAttempt) {}

// The format of the payload: compressed or not?
type payloadFormat uint8
Expand Down
20 changes: 9 additions & 11 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,6 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
}
cs.binlog = binarylog.GetMethodLogger(method)

cs.callInfo.stream = cs
// Only this initial attempt has stats/tracing.
// TODO(dfawley): move to newAttempt when per-attempt stats are implemented.
if err := cs.newAttemptLocked(sh, trInfo); err != nil {
Expand Down Expand Up @@ -799,6 +798,15 @@ func (cs *clientStream) finish(err error) {
}
cs.finished = true
cs.commitAttemptLocked()
if cs.attempt != nil {
cs.attempt.finish(err)
// after functions all rely upon having a stream.
if cs.attempt.s != nil {
for _, o := range cs.opts {
o.after(cs.callInfo, cs.attempt)
}
}
}
cs.mu.Unlock()
// For binary logging. only log cancel in finish (could be caused by RPC ctx
// canceled or ClientConn closed). Trailer will be logged in RecvMsg.
Expand All @@ -820,15 +828,6 @@ func (cs *clientStream) finish(err error) {
cs.cc.incrCallsSucceeded()
}
}
if cs.attempt != nil {
cs.attempt.finish(err)
// after functions all rely upon having a stream.
if cs.attempt.s != nil {
for _, o := range cs.opts {
o.after(cs.callInfo)
}
}
}
cs.cancel()
}

Expand Down Expand Up @@ -1066,7 +1065,6 @@ func newNonRetryClientStream(ctx context.Context, desc *StreamDesc, method strin
t: t,
}

as.callInfo.stream = as
s, err := as.t.NewStream(as.ctx, as.callHdr)
if err != nil {
err = toRPCErr(err)
Expand Down
59 changes: 59 additions & 0 deletions test/end2end_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7128,3 +7128,62 @@ func (s) TestGzipBadChecksum(t *testing.T) {
t.Errorf("ss.client.UnaryCall(_) = _, %v\n\twant: _, status(codes.Internal, contains %q)", err, gzip.ErrChecksum)
}
}

// When an RPC is canceled, it's possible that the last Recv() returns before
// all call options' after are executed.
func (s) TestCanceledRPCCallOptionRace(t *testing.T) {
ss := &stubServer{
fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error {
err := stream.Send(&testpb.StreamingOutputCallResponse{})
if err != nil {
return err
}
<-stream.Context().Done()
return nil
},
}
if err := ss.Start(nil); err != nil {
t.Fatalf("Error starting endpoint server: %v", err)
}
defer ss.Stop()

const count = 1000
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

var wg sync.WaitGroup
wg.Add(count)
for i := 0; i < count; i++ {
go func() {
defer wg.Done()
var p peer.Peer
ctx, cancel := context.WithCancel(ctx)
defer cancel()
stream, err := ss.client.FullDuplexCall(ctx, grpc.Peer(&p))
if err != nil {
t.Errorf("_.FullDuplexCall(_) = _, %v", err)
return
}
if err := stream.Send(&testpb.StreamingOutputCallRequest{}); err != nil {
t.Errorf("_ has error %v while sending", err)
return
}
if _, err := stream.Recv(); err != nil {
t.Errorf("%v.Recv() = %v", stream, err)
return
}
cancel()
if _, err := stream.Recv(); status.Code(err) != codes.Canceled {
t.Errorf("%v compleled with error %v, want %s", stream, err, codes.Canceled)
return
}
// If recv returns before call options are executed, peer.Addr is not set,
// fail the test.
if p.Addr == nil {
t.Errorf("peer.Addr is nil, want non-nil")
return
}
}()
}
wg.Wait()
}

0 comments on commit 9aa97f9

Please sign in to comment.