diff --git a/internal/transport/client_stream.go b/internal/transport/client_stream.go index ccc0e017e5e5..b73a9d2285fe 100644 --- a/internal/transport/client_stream.go +++ b/internal/transport/client_stream.go @@ -29,25 +29,27 @@ import ( // ClientStream implements streaming functionality for a gRPC client. type ClientStream struct { - *Stream // Embed for common stream functionality. + Stream // Embed for common stream functionality. ct *http2Client done chan struct{} // closed at the end of stream to unblock writers. doneFunc func() // invoked at the end of stream. - headerChan chan struct{} // closed to indicate the end of header metadata. - headerChanClosed uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times. + headerChan chan struct{} // closed to indicate the end of header metadata. + header metadata.MD // the received header metadata + + status *status.Status // the status error received from the server + + // Non-pointer fields are at the end to optimize GC allocations. + // headerValid indicates whether a valid header was received. Only // meaningful after headerChan is closed (always call waitOnHeader() before // reading its value). - headerValid bool - header metadata.MD // the received header metadata - noHeaders bool // set if the client never received headers (set only after the stream is done). - - bytesReceived atomic.Bool // indicates whether any bytes have been received on this stream - unprocessed atomic.Bool // set if the server sends a refused stream or GOAWAY including this stream - - status *status.Status // the status error received from the server + headerValid bool + noHeaders bool // set if the client never received headers (set only after the stream is done). + headerChanClosed uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times. + bytesReceived atomic.Bool // indicates whether any bytes have been received on this stream + unprocessed atomic.Bool // set if the server sends a refused stream or GOAWAY including this stream } // Read reads an n byte message from the input stream. diff --git a/internal/transport/flowcontrol.go b/internal/transport/flowcontrol.go index b4ca86d79eb3..7cfbc9637b88 100644 --- a/internal/transport/flowcontrol.go +++ b/internal/transport/flowcontrol.go @@ -28,7 +28,7 @@ import ( // writeQuota is a soft limit on the amount of data a stream can // schedule before some of it is written out. type writeQuota struct { - quota int32 + _ noCopy // get waits on read from when quota goes less than or equal to zero. // replenish writes on it when quota goes positive again. ch chan struct{} @@ -38,16 +38,17 @@ type writeQuota struct { // It is implemented as a field so that it can be updated // by tests. replenish func(n int) + quota int32 } -func newWriteQuota(sz int32, done <-chan struct{}) *writeQuota { - w := &writeQuota{ - quota: sz, - ch: make(chan struct{}, 1), - done: done, - } +// init allows a writeQuota to be initialized in-place, which is useful for +// resetting a buffer or for avoiding a heap allocation when the buffer is +// embedded in another struct. +func (w *writeQuota) init(sz int32, done <-chan struct{}) { + w.quota = sz + w.ch = make(chan struct{}, 1) + w.done = done w.replenish = w.realReplenish - return w } func (w *writeQuota) get(sz int32) error { diff --git a/internal/transport/handler_server.go b/internal/transport/handler_server.go index d954a64c38f4..aadc5d81c0ea 100644 --- a/internal/transport/handler_server.go +++ b/internal/transport/handler_server.go @@ -411,11 +411,10 @@ func (ht *serverHandlerTransport) HandleStreams(ctx context.Context, startStream ctx = metadata.NewIncomingContext(ctx, ht.headerMD) req := ht.req s := &ServerStream{ - Stream: &Stream{ + Stream: Stream{ id: 0, // irrelevant ctx: ctx, requestRead: func(int) {}, - buf: newRecvBuffer(), method: req.URL.Path, recvCompress: req.Header.Get("grpc-encoding"), contentSubtype: ht.contentSubtype, @@ -424,8 +423,9 @@ func (ht *serverHandlerTransport) HandleStreams(ctx context.Context, startStream st: ht, headerWireLength: 0, // won't have access to header wire length until golang/go#18997. } - s.trReader = &transportReader{ - reader: &recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: s.buf}, + s.Stream.buf.init() + s.trReader = transportReader{ + reader: recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: &s.buf}, windowHandler: func(int) {}, } diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index e06771c3397f..fd5f05828fd0 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -481,10 +481,9 @@ func NewHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *ClientStream { // TODO(zhaoq): Handle uint32 overflow of Stream.id. s := &ClientStream{ - Stream: &Stream{ + Stream: Stream{ method: callHdr.Method, sendCompress: callHdr.SendCompress, - buf: newRecvBuffer(), contentSubtype: callHdr.ContentSubtype, }, ct: t, @@ -492,7 +491,8 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *ClientSt headerChan: make(chan struct{}), doneFunc: callHdr.DoneFunc, } - s.wq = newWriteQuota(defaultWriteQuota, s.done) + s.Stream.buf.init() + s.Stream.wq.init(defaultWriteQuota, s.done) s.requestRead = func(n int) { t.adjustWindow(s, uint32(n)) } @@ -500,11 +500,11 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *ClientSt // That means, s.ctx should be read-only. And s.ctx is done iff ctx is done. // So we use the original context here instead of creating a copy. s.ctx = ctx - s.trReader = &transportReader{ - reader: &recvBufferReader{ + s.trReader = transportReader{ + reader: recvBufferReader{ ctx: s.ctx, ctxDone: s.ctx.Done(), - recv: s.buf, + recv: &s.buf, closeStream: func(err error) { s.Close(err) }, @@ -823,7 +823,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*ClientS return nil }, onOrphaned: cleanup, - wq: s.wq, + wq: &s.wq, } firstTry := true var ch chan struct{} @@ -854,7 +854,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*ClientS transportDrainRequired = t.nextID > MaxStreamID s.id = hdr.streamID - s.fc = &inFlow{limit: uint32(t.initialWindowSize)} + s.fc = inFlow{limit: uint32(t.initialWindowSize)} t.activeStreams[s.id] = s t.mu.Unlock() diff --git a/internal/transport/http2_server.go b/internal/transport/http2_server.go index 83cee314c8f4..14fb4e34cac3 100644 --- a/internal/transport/http2_server.go +++ b/internal/transport/http2_server.go @@ -390,16 +390,15 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade } t.maxStreamID = streamID - buf := newRecvBuffer() s := &ServerStream{ - Stream: &Stream{ - id: streamID, - buf: buf, - fc: &inFlow{limit: uint32(t.initialWindowSize)}, + Stream: Stream{ + id: streamID, + fc: inFlow{limit: uint32(t.initialWindowSize)}, }, st: t, headerWireLength: int(frame.Header().Length), } + s.Stream.buf.init() var ( // if false, content-type was missing or invalid isGRPC = false @@ -644,12 +643,12 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade t.adjustWindow(s, uint32(n)) } s.ctxDone = s.ctx.Done() - s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone) - s.trReader = &transportReader{ - reader: &recvBufferReader{ + s.Stream.wq.init(defaultWriteQuota, s.ctxDone) + s.trReader = transportReader{ + reader: recvBufferReader{ ctx: s.ctx, ctxDone: s.ctxDone, - recv: s.buf, + recv: &s.buf, }, windowHandler: func(n int) { t.updateWindow(s, uint32(n)) @@ -658,7 +657,7 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade // Register the stream with loopy. t.controlBuf.put(®isterStream{ streamID: s.id, - wq: s.wq, + wq: &s.wq, }) handle(s) return nil diff --git a/internal/transport/server_stream.go b/internal/transport/server_stream.go index cf8da0b52d0a..b203568fc349 100644 --- a/internal/transport/server_stream.go +++ b/internal/transport/server_stream.go @@ -32,7 +32,7 @@ import ( // ServerStream implements streaming functionality for a gRPC server. type ServerStream struct { - *Stream // Embed for common stream functionality. + Stream // Embed for common stream functionality. st internalServerTransport ctxDone <-chan struct{} // closed at the end of stream. Cache of ctx.Done() (for performance) @@ -43,12 +43,13 @@ type ServerStream struct { // Holds compressor names passed in grpc-accept-encoding metadata from the // client. clientAdvertisedCompressors string - headerWireLength int // hdrMu protects outgoing header and trailer metadata. hdrMu sync.Mutex header metadata.MD // the outgoing header metadata. Updated by WriteHeader. headerSent atomic.Bool // atomically set when the headers are sent out. + + headerWireLength int } // Read reads an n byte message from the input stream. diff --git a/internal/transport/transport.go b/internal/transport/transport.go index 7dd53e80a75b..617f5ae04bcd 100644 --- a/internal/transport/transport.go +++ b/internal/transport/transport.go @@ -68,11 +68,11 @@ type recvBuffer struct { err error } -func newRecvBuffer() *recvBuffer { - b := &recvBuffer{ - c: make(chan recvMsg, 1), - } - return b +// init allows a recvBuffer to be initialized in-place, which is useful +// for resetting a buffer or for avoiding a heap allocation when the buffer +// is embedded in another struct. +func (b *recvBuffer) init() { + b.c = make(chan recvMsg, 1) } func (b *recvBuffer) put(r recvMsg) { @@ -123,6 +123,7 @@ func (b *recvBuffer) get() <-chan recvMsg { // recvBufferReader implements io.Reader interface to read the data from // recvBuffer. type recvBufferReader struct { + _ noCopy closeStream func(error) // Closes the client transport stream with the given error and nil trailer metadata. ctx context.Context ctxDone <-chan struct{} // cache of ctx.Done() (for performance). @@ -285,27 +286,28 @@ const ( // Stream represents an RPC in the transport layer. type Stream struct { - id uint32 ctx context.Context // the associated context of the stream method string // the associated RPC method of the stream recvCompress string sendCompress string - buf *recvBuffer - trReader *transportReader - fc *inFlow - wq *writeQuota // Callback to state application's intentions to read data. This // is used to adjust flow control, if needed. requestRead func(int) - state streamState - // contentSubtype is the content-subtype for requests. // this must be lowercase or the behavior is undefined. contentSubtype string trailer metadata.MD // the key-value map of trailer metadata. + + // Non-pointer fields are at the end to optimize GC performance. + state streamState + id uint32 + buf recvBuffer + trReader transportReader + fc inFlow + wq writeQuota } func (s *Stream) swapState(st streamState) streamState { @@ -401,16 +403,28 @@ func (s *Stream) read(n int) (data mem.BufferSlice, err error) { return data, nil } +// noCopy may be embedded into structs which must not be copied +// after the first use. +// +// See https://golang.org/issues/8005#issuecomment-190753527 +// for details. +type noCopy struct { +} + +func (*noCopy) Lock() {} +func (*noCopy) Unlock() {} + // transportReader reads all the data available for this Stream from the transport and // passes them into the decoder, which converts them into a gRPC message stream. // The error is io.EOF when the stream is done or another non-nil error if // the stream broke. type transportReader struct { - reader *recvBufferReader + _ noCopy // The handler to control the window update procedure for both this // particular stream and the associated transport. windowHandler func(int) er error + reader recvBufferReader } func (t *transportReader) ReadMessageHeader(header []byte) (int, error) { diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index 0b0396b0ab37..bd9e663aa38a 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -88,9 +88,6 @@ func (s *Stream) readTo(p []byte) (int, error) { } if data.Len() != len(p) { - if err == nil { - err = io.ErrUnexpectedEOF - } return 0, err } @@ -1856,17 +1853,16 @@ func waitWhileTrue(t *testing.T, condition func() (bool, error)) { func (s) TestReadGivesSameErrorAfterAnyErrorOccurs(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - testRecvBuffer := newRecvBuffer() s := &Stream{ ctx: ctx, - buf: testRecvBuffer, requestRead: func(int) {}, } - s.trReader = &transportReader{ - reader: &recvBufferReader{ + s.buf.init() + s.trReader = transportReader{ + reader: recvBufferReader{ ctx: s.ctx, ctxDone: s.ctx.Done(), - recv: s.buf, + recv: &s.buf, }, windowHandler: func(int) {}, } @@ -2591,8 +2587,8 @@ func (s) TestClientHandshakeInfoDialer(t *testing.T) { func (s) TestClientDecodeHeaderStatusErr(t *testing.T) { testStream := func() *ClientStream { return &ClientStream{ - Stream: &Stream{ - buf: &recvBuffer{ + Stream: Stream{ + buf: recvBuffer{ c: make(chan recvMsg), mu: sync.Mutex{}, }, @@ -3071,22 +3067,24 @@ func (s) TestCloseSetsConnectionDeadlines(t *testing.T) { // number of bytes read for flow control is correct. func (s) TestReadMessageHeaderMultipleBuffers(t *testing.T) { headerLen := 5 - recvBuffer := newRecvBuffer() - recvBuffer.put(recvMsg{buffer: make(mem.SliceBuffer, 3)}) - recvBuffer.put(recvMsg{buffer: make(mem.SliceBuffer, headerLen-3)}) bytesRead := 0 s := Stream{ requestRead: func(int) {}, - trReader: &transportReader{ - reader: &recvBufferReader{ - recv: recvBuffer, - }, - windowHandler: func(i int) { - bytesRead += i - }, + } + s.buf.init() + recvBuffer := &s.buf + s.trReader = transportReader{ + reader: recvBufferReader{ + recv: recvBuffer, + }, + windowHandler: func(i int) { + bytesRead += i }, } + recvBuffer.put(recvMsg{buffer: make(mem.SliceBuffer, 3)}) + recvBuffer.put(recvMsg{buffer: make(mem.SliceBuffer, headerLen-3)}) + header := make([]byte, headerLen) err := s.ReadMessageHeader(header) if err != nil { @@ -3189,8 +3187,8 @@ func (s) TestServerSendsRSTAfterDeadlineToMisbehavedClient(t *testing.T) { func (s) TestClientTransport_Handle1xxHeaders(t *testing.T) { testStream := func() *ClientStream { return &ClientStream{ - Stream: &Stream{ - buf: &recvBuffer{ + Stream: Stream{ + buf: recvBuffer{ c: make(chan recvMsg), mu: sync.Mutex{}, }, diff --git a/rpc_util.go b/rpc_util.go index 47ea09f5c9bc..7255585e6755 100644 --- a/rpc_util.go +++ b/rpc_util.go @@ -657,8 +657,20 @@ type streamReader interface { Read(n int) (mem.BufferSlice, error) } +// noCopy may be embedded into structs which must not be copied +// after the first use. +// +// See https://golang.org/issues/8005#issuecomment-190753527 +// for details. +type noCopy struct { +} + +func (*noCopy) Lock() {} +func (*noCopy) Unlock() {} + // parser reads complete gRPC messages from the underlying reader. type parser struct { + _ noCopy // r is the underlying reader. // See the comment on recvMsg for the permissible // error types. diff --git a/server.go b/server.go index 1da2a542acde..ded3c3442982 100644 --- a/server.go +++ b/server.go @@ -1596,7 +1596,7 @@ func (s *Server) processStreamingRPC(ctx context.Context, stream *transport.Serv ss := &serverStream{ ctx: ctx, s: stream, - p: &parser{r: stream, bufferPool: s.opts.bufferPool}, + p: parser{r: stream, bufferPool: s.opts.bufferPool}, codec: s.getCodec(stream.ContentSubtype()), desc: sd, maxReceiveMessageSize: s.opts.maxReceiveMessageSize, diff --git a/stream.go b/stream.go index 0a0af8961f05..f74a68752c27 100644 --- a/stream.go +++ b/stream.go @@ -529,7 +529,7 @@ func (a *csAttempt) newStream() error { } a.transportStream = s a.ctx = s.Context() - a.parser = &parser{r: s, bufferPool: a.cs.cc.dopts.copts.BufferPool} + a.parser = parser{r: s, bufferPool: a.cs.cc.dopts.copts.BufferPool} return nil } @@ -601,7 +601,7 @@ type csAttempt struct { cs *clientStream transport transport.ClientTransport transportStream *transport.ClientStream - parser *parser + parser parser pickResult balancer.PickResult finished bool @@ -1141,7 +1141,7 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) { // Only initialize this state once per stream. a.decompressorSet = true } - if err := recv(a.parser, cs.codec, a.transportStream, a.decompressorV0, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decompressorV1, false); err != nil { + if err := recv(&a.parser, cs.codec, a.transportStream, a.decompressorV0, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decompressorV1, false); err != nil { if err == io.EOF { if statusErr := a.transportStream.Status().Err(); statusErr != nil { return statusErr @@ -1179,7 +1179,7 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) { } // Special handling for non-server-stream rpcs. // This recv expects EOF or errors, so we don't collect inPayload. - if err := recv(a.parser, cs.codec, a.transportStream, a.decompressorV0, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decompressorV1, false); err == io.EOF { + if err := recv(&a.parser, cs.codec, a.transportStream, a.decompressorV0, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decompressorV1, false); err == io.EOF { return a.transportStream.Status().Err() // non-server streaming Recv returns nil on success } else if err != nil { return toRPCErr(err) @@ -1331,7 +1331,7 @@ func newNonRetryClientStream(ctx context.Context, desc *StreamDesc, method strin return nil, err } as.transportStream = s - as.parser = &parser{r: s, bufferPool: ac.dopts.copts.BufferPool} + as.parser = parser{r: s, bufferPool: ac.dopts.copts.BufferPool} ac.incrCallsStarted() if desc != unaryStreamDesc { // Listen on stream context to cleanup when the stream context is @@ -1374,7 +1374,7 @@ type addrConnStream struct { decompressorSet bool decompressorV0 Decompressor decompressorV1 encoding.Compressor - parser *parser + parser parser // mu guards finished and is held for the entire finish method. mu sync.Mutex @@ -1487,7 +1487,7 @@ func (as *addrConnStream) RecvMsg(m any) (err error) { // Only initialize this state once per stream. as.decompressorSet = true } - if err := recv(as.parser, as.codec, as.transportStream, as.decompressorV0, m, *as.callInfo.maxReceiveMessageSize, nil, as.decompressorV1, false); err != nil { + if err := recv(&as.parser, as.codec, as.transportStream, as.decompressorV0, m, *as.callInfo.maxReceiveMessageSize, nil, as.decompressorV1, false); err != nil { if err == io.EOF { if statusErr := as.transportStream.Status().Err(); statusErr != nil { return statusErr @@ -1509,7 +1509,7 @@ func (as *addrConnStream) RecvMsg(m any) (err error) { // Special handling for non-server-stream rpcs. // This recv expects EOF or errors, so we don't collect inPayload. - if err := recv(as.parser, as.codec, as.transportStream, as.decompressorV0, m, *as.callInfo.maxReceiveMessageSize, nil, as.decompressorV1, false); err == io.EOF { + if err := recv(&as.parser, as.codec, as.transportStream, as.decompressorV0, m, *as.callInfo.maxReceiveMessageSize, nil, as.decompressorV1, false); err == io.EOF { return as.transportStream.Status().Err() // non-server streaming Recv returns nil on success } else if err != nil { return toRPCErr(err) @@ -1597,7 +1597,7 @@ type ServerStream interface { type serverStream struct { ctx context.Context s *transport.ServerStream - p *parser + p parser codec baseCodec desc *StreamDesc @@ -1788,7 +1788,7 @@ func (ss *serverStream) RecvMsg(m any) (err error) { payInfo = &payloadInfo{} defer payInfo.free() } - if err := recv(ss.p, ss.codec, ss.s, ss.decompressorV0, m, ss.maxReceiveMessageSize, payInfo, ss.decompressorV1, true); err != nil { + if err := recv(&ss.p, ss.codec, ss.s, ss.decompressorV0, m, ss.maxReceiveMessageSize, payInfo, ss.decompressorV1, true); err != nil { if err == io.EOF { if len(ss.binlogs) != 0 { chc := &binarylog.ClientHalfClose{} @@ -1834,7 +1834,7 @@ func (ss *serverStream) RecvMsg(m any) (err error) { } // Special handling for non-client-stream rpcs. // This recv expects EOF or errors, so we don't collect inPayload. - if err := recv(ss.p, ss.codec, ss.s, ss.decompressorV0, m, ss.maxReceiveMessageSize, nil, ss.decompressorV1, true); err == io.EOF { + if err := recv(&ss.p, ss.codec, ss.s, ss.decompressorV0, m, ss.maxReceiveMessageSize, nil, ss.decompressorV1, true); err == io.EOF { return nil } else if err != nil { return err