Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 13 additions & 11 deletions internal/transport/client_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,25 +29,27 @@ import (

// ClientStream implements streaming functionality for a gRPC client.
type ClientStream struct {
*Stream // Embed for common stream functionality.
Stream // Embed for common stream functionality.

ct *http2Client
done chan struct{} // closed at the end of stream to unblock writers.
doneFunc func() // invoked at the end of stream.

headerChan chan struct{} // closed to indicate the end of header metadata.
headerChanClosed uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times.
headerChan chan struct{} // closed to indicate the end of header metadata.
header metadata.MD // the received header metadata

status *status.Status // the status error received from the server

// Non-pointer fields are at the end to optimize GC allocations.

// headerValid indicates whether a valid header was received. Only
// meaningful after headerChan is closed (always call waitOnHeader() before
// reading its value).
headerValid bool
header metadata.MD // the received header metadata
noHeaders bool // set if the client never received headers (set only after the stream is done).

bytesReceived atomic.Bool // indicates whether any bytes have been received on this stream
unprocessed atomic.Bool // set if the server sends a refused stream or GOAWAY including this stream

status *status.Status // the status error received from the server
headerValid bool
noHeaders bool // set if the client never received headers (set only after the stream is done).
headerChanClosed uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times.
bytesReceived atomic.Bool // indicates whether any bytes have been received on this stream
unprocessed atomic.Bool // set if the server sends a refused stream or GOAWAY including this stream
}

// Read reads an n byte message from the input stream.
Expand Down
17 changes: 9 additions & 8 deletions internal/transport/flowcontrol.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
// writeQuota is a soft limit on the amount of data a stream can
// schedule before some of it is written out.
type writeQuota struct {
quota int32
_ noCopy
// get waits on read from when quota goes less than or equal to zero.
// replenish writes on it when quota goes positive again.
ch chan struct{}
Expand All @@ -38,16 +38,17 @@ type writeQuota struct {
// It is implemented as a field so that it can be updated
// by tests.
replenish func(n int)
quota int32
}

func newWriteQuota(sz int32, done <-chan struct{}) *writeQuota {
w := &writeQuota{
quota: sz,
ch: make(chan struct{}, 1),
done: done,
}
// init allows a writeQuota to be initialized in-place, which is useful for
// resetting a buffer or for avoiding a heap allocation when the buffer is
// embedded in another struct.
func (w *writeQuota) init(sz int32, done <-chan struct{}) {
w.quota = sz
w.ch = make(chan struct{}, 1)
w.done = done
w.replenish = w.realReplenish
return w
}

func (w *writeQuota) get(sz int32) error {
Expand Down
8 changes: 4 additions & 4 deletions internal/transport/handler_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,11 +411,10 @@ func (ht *serverHandlerTransport) HandleStreams(ctx context.Context, startStream
ctx = metadata.NewIncomingContext(ctx, ht.headerMD)
req := ht.req
s := &ServerStream{
Stream: &Stream{
Stream: Stream{
id: 0, // irrelevant
ctx: ctx,
requestRead: func(int) {},
buf: newRecvBuffer(),
method: req.URL.Path,
recvCompress: req.Header.Get("grpc-encoding"),
contentSubtype: ht.contentSubtype,
Expand All @@ -424,8 +423,9 @@ func (ht *serverHandlerTransport) HandleStreams(ctx context.Context, startStream
st: ht,
headerWireLength: 0, // won't have access to header wire length until golang/go#18997.
}
s.trReader = &transportReader{
reader: &recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: s.buf},
s.Stream.buf.init()
s.trReader = transportReader{
reader: recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: &s.buf},
windowHandler: func(int) {},
}

Expand Down
16 changes: 8 additions & 8 deletions internal/transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,30 +481,30 @@ func NewHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *ClientStream {
// TODO(zhaoq): Handle uint32 overflow of Stream.id.
s := &ClientStream{
Stream: &Stream{
Stream: Stream{
method: callHdr.Method,
sendCompress: callHdr.SendCompress,
buf: newRecvBuffer(),
contentSubtype: callHdr.ContentSubtype,
},
ct: t,
done: make(chan struct{}),
headerChan: make(chan struct{}),
doneFunc: callHdr.DoneFunc,
}
s.wq = newWriteQuota(defaultWriteQuota, s.done)
s.Stream.buf.init()
s.Stream.wq.init(defaultWriteQuota, s.done)
s.requestRead = func(n int) {
t.adjustWindow(s, uint32(n))
}
// The client side stream context should have exactly the same life cycle with the user provided context.
// That means, s.ctx should be read-only. And s.ctx is done iff ctx is done.
// So we use the original context here instead of creating a copy.
s.ctx = ctx
s.trReader = &transportReader{
reader: &recvBufferReader{
s.trReader = transportReader{
reader: recvBufferReader{
ctx: s.ctx,
ctxDone: s.ctx.Done(),
recv: s.buf,
recv: &s.buf,
closeStream: func(err error) {
s.Close(err)
},
Expand Down Expand Up @@ -823,7 +823,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*ClientS
return nil
},
onOrphaned: cleanup,
wq: s.wq,
wq: &s.wq,
}
firstTry := true
var ch chan struct{}
Expand Down Expand Up @@ -854,7 +854,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*ClientS
transportDrainRequired = t.nextID > MaxStreamID

s.id = hdr.streamID
s.fc = &inFlow{limit: uint32(t.initialWindowSize)}
s.fc = inFlow{limit: uint32(t.initialWindowSize)}
t.activeStreams[s.id] = s
t.mu.Unlock()

Expand Down
19 changes: 9 additions & 10 deletions internal/transport/http2_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,16 +390,15 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade
}
t.maxStreamID = streamID

buf := newRecvBuffer()
s := &ServerStream{
Stream: &Stream{
id: streamID,
buf: buf,
fc: &inFlow{limit: uint32(t.initialWindowSize)},
Stream: Stream{
id: streamID,
fc: inFlow{limit: uint32(t.initialWindowSize)},
},
st: t,
headerWireLength: int(frame.Header().Length),
}
s.Stream.buf.init()
var (
// if false, content-type was missing or invalid
isGRPC = false
Expand Down Expand Up @@ -644,12 +643,12 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade
t.adjustWindow(s, uint32(n))
}
s.ctxDone = s.ctx.Done()
s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
s.trReader = &transportReader{
reader: &recvBufferReader{
s.Stream.wq.init(defaultWriteQuota, s.ctxDone)
s.trReader = transportReader{
reader: recvBufferReader{
ctx: s.ctx,
ctxDone: s.ctxDone,
recv: s.buf,
recv: &s.buf,
},
windowHandler: func(n int) {
t.updateWindow(s, uint32(n))
Expand All @@ -658,7 +657,7 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade
// Register the stream with loopy.
t.controlBuf.put(&registerStream{
streamID: s.id,
wq: s.wq,
wq: &s.wq,
})
handle(s)
return nil
Expand Down
5 changes: 3 additions & 2 deletions internal/transport/server_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (

// ServerStream implements streaming functionality for a gRPC server.
type ServerStream struct {
*Stream // Embed for common stream functionality.
Stream // Embed for common stream functionality.

st internalServerTransport
ctxDone <-chan struct{} // closed at the end of stream. Cache of ctx.Done() (for performance)
Expand All @@ -43,12 +43,13 @@ type ServerStream struct {
// Holds compressor names passed in grpc-accept-encoding metadata from the
// client.
clientAdvertisedCompressors string
headerWireLength int

// hdrMu protects outgoing header and trailer metadata.
hdrMu sync.Mutex
header metadata.MD // the outgoing header metadata. Updated by WriteHeader.
headerSent atomic.Bool // atomically set when the headers are sent out.

headerWireLength int
}

// Read reads an n byte message from the input stream.
Expand Down
40 changes: 27 additions & 13 deletions internal/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,11 @@ type recvBuffer struct {
err error
}

func newRecvBuffer() *recvBuffer {
b := &recvBuffer{
c: make(chan recvMsg, 1),
}
return b
// init allows a recvBuffer to be initialized in-place, which is useful
// for resetting a buffer or for avoiding a heap allocation when the buffer
// is embedded in another struct.
func (b *recvBuffer) init() {
b.c = make(chan recvMsg, 1)
}

func (b *recvBuffer) put(r recvMsg) {
Expand Down Expand Up @@ -123,6 +123,7 @@ func (b *recvBuffer) get() <-chan recvMsg {
// recvBufferReader implements io.Reader interface to read the data from
// recvBuffer.
type recvBufferReader struct {
_ noCopy
closeStream func(error) // Closes the client transport stream with the given error and nil trailer metadata.
ctx context.Context
ctxDone <-chan struct{} // cache of ctx.Done() (for performance).
Expand Down Expand Up @@ -285,27 +286,28 @@ const (

// Stream represents an RPC in the transport layer.
type Stream struct {
id uint32
ctx context.Context // the associated context of the stream
method string // the associated RPC method of the stream
recvCompress string
sendCompress string
buf *recvBuffer
trReader *transportReader
fc *inFlow
wq *writeQuota

// Callback to state application's intentions to read data. This
// is used to adjust flow control, if needed.
requestRead func(int)

state streamState

// contentSubtype is the content-subtype for requests.
// this must be lowercase or the behavior is undefined.
contentSubtype string

trailer metadata.MD // the key-value map of trailer metadata.

// Non-pointer fields are at the end to optimize GC performance.
state streamState
id uint32
buf recvBuffer
trReader transportReader
fc inFlow
wq writeQuota
}

func (s *Stream) swapState(st streamState) streamState {
Expand Down Expand Up @@ -401,16 +403,28 @@ func (s *Stream) read(n int) (data mem.BufferSlice, err error) {
return data, nil
}

// noCopy may be embedded into structs which must not be copied
// after the first use.
//
// See https://golang.org/issues/8005#issuecomment-190753527
// for details.
type noCopy struct {
}

func (*noCopy) Lock() {}
func (*noCopy) Unlock() {}

// transportReader reads all the data available for this Stream from the transport and
// passes them into the decoder, which converts them into a gRPC message stream.
// The error is io.EOF when the stream is done or another non-nil error if
// the stream broke.
type transportReader struct {
reader *recvBufferReader
_ noCopy
// The handler to control the window update procedure for both this
// particular stream and the associated transport.
windowHandler func(int)
er error
reader recvBufferReader
}

func (t *transportReader) ReadMessageHeader(header []byte) (int, error) {
Expand Down
Loading