Avoid unnecessary flushing during unary response processing #1373

Closed
2 changes: 1 addition & 1 deletion server.go
@@ -800,7 +800,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
}
opts := &transport.Options{
Last: true,
- Delay: false,
+ Delay: true,
Member

IMO "Delay" here should be interpreted by sendResponse as applying to the response as a whole -- header, payload, and trailer -- not each piece independently.

So, I believe this actually should still be "false", and it shouldn't be factored into the logic in sendResponse until the final step (where we look for other writers before flushing).

Contributor Author

I agree. It seems like the calls to adjustNumWriters and the associated flushing are at too low a level. For unary RPCs, the number of writers should be incremented before calling sendResponse and decremented after the WriteStatus calls. The flushing logic currently in sendResponse is difficult to reason about given its interaction with flow control, and it isn't obvious to me how to pull it up to a higher level. Perhaps this is the subtle bug you're referring to.

I'm happy to leave this work in your hands. Consider this PR more an indication of the importance of this work. The unnecessary flushes introduce a significant amount of latency!

}
if err := s.sendResponse(t, stream, reply, s.opts.cp, opts); err != nil {
if err == io.EOF {
23 changes: 14 additions & 9 deletions transport/http2_server.go
@@ -647,7 +647,7 @@ func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
}
}

-func (t *http2Server) writeHeaders(s *Stream, b *bytes.Buffer, endStream bool) error {
+func (t *http2Server) writeHeaders(s *Stream, b *bytes.Buffer, endStream, flush bool) error {
first := true
endHeaders := false
var err error
@@ -673,10 +673,10 @@ func (t *http2Server) writeHeaders(s *Stream, b *bytes.Buffer, endStream bool) e
EndStream: endStream,
EndHeaders: endHeaders,
}
- err = t.framer.writeHeaders(endHeaders, p)
+ err = t.framer.writeHeaders(endHeaders && flush, p)
first = false
} else {
- err = t.framer.writeContinuation(endHeaders, s.id, endHeaders, b.Next(size))
+ err = t.framer.writeContinuation(endHeaders && flush, s.id, endHeaders, b.Next(size))
}
if err != nil {
t.Close()
@@ -688,6 +688,10 @@ func (t *http2Server) writeHeaders(s *Stream, b *bytes.Buffer, endStream bool) e

// WriteHeader sends the header metedata md back to the client.
func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
+ return t.writeHeader(s, md, true /* flush */)
+}
+
+func (t *http2Server) writeHeader(s *Stream, md metadata.MD, flush bool) error {
s.mu.Lock()
if s.headerOk || s.state == streamDone {
s.mu.Unlock()
@@ -722,7 +726,7 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
}
}
bufLen := t.hBuf.Len()
- if err := t.writeHeaders(s, t.hBuf, false); err != nil {
+ if err := t.writeHeaders(s, t.hBuf, false, flush); err != nil {
return err
}
if t.stats != nil {
@@ -755,7 +759,7 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
s.mu.Unlock()

if !headersSent && hasHeader {
- t.WriteHeader(s, nil)
+ t.writeHeader(s, nil, false)
headersSent = true
}

@@ -795,7 +799,7 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
}
}
bufLen := t.hBuf.Len()
- if err := t.writeHeaders(s, t.hBuf, true); err != nil {
+ if err := t.writeHeaders(s, t.hBuf, true, true); err != nil {
t.Close()
return err
}
@@ -825,7 +829,7 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) (err error) {
}
s.mu.Unlock()
if writeHeaderFrame {
- t.WriteHeader(s, nil)
+ flush := !opts.Delay && len(data) == 0
+ t.writeHeader(s, nil, flush)
}
r := bytes.NewBuffer(data)
for {
@@ -893,11 +898,11 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) (err error) {
// Reset ping strikes when sending data since this might cause
// the peer to send ping.
atomic.StoreUint32(&t.resetPingStrikes, 1)
- if err := t.framer.writeData(forceFlush, s.id, false, p); err != nil {
+ if err := t.framer.writeData(forceFlush && !opts.Delay, s.id, false, p); err != nil {
Contributor

Might be wrong, but in the scenario where the client's inbound flow-control window is smaller than the server's response message, if I understand correctly this change could cause a deadlock, with the server running out of flow-control window and the client not yet having received anything.

This looks similar to the change in #973, which ran into that issue.

Contributor Author

@apolcyn I agree that this could lead to deadlock. Perhaps it would be sufficient to flush the buffered data whenever Write needs to loop. Also, I'm curious what the forceFlush logic is doing, given that there is another check a few lines below that flushes if we were the last writer.

t.Close()
return connectionErrorf(true, err, "transport: %v", err)
}
- if t.framer.adjustNumWriters(-1) == 0 {
+ if t.framer.adjustNumWriters(-1) == 0 && !opts.Delay {
t.framer.flushWrite()
}
t.writableChan <- 0
2 changes: 1 addition & 1 deletion transport/transport_test.go
@@ -162,7 +162,7 @@ func (h *testStreamHandler) handleStreamInvalidHeaderField(t *testing.T, s *Stre
<-h.t.writableChan
h.t.hBuf.Reset()
h.t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: expectedInvalidHeaderField})
- if err := h.t.writeHeaders(s, h.t.hBuf, false); err != nil {
+ if err := h.t.writeHeaders(s, h.t.hBuf, false, true); err != nil {
t.Fatalf("Failed to write headers: %v", err)
}
h.t.writableChan <- 0