-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Avoid unnecessary flushing during unary response processing #1373
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -647,7 +647,7 @@ func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) { | |
} | ||
} | ||
|
||
func (t *http2Server) writeHeaders(s *Stream, b *bytes.Buffer, endStream bool) error { | ||
func (t *http2Server) writeHeaders(s *Stream, b *bytes.Buffer, endStream, flush bool) error { | ||
first := true | ||
endHeaders := false | ||
var err error | ||
|
@@ -673,10 +673,10 @@ func (t *http2Server) writeHeaders(s *Stream, b *bytes.Buffer, endStream bool) e | |
EndStream: endStream, | ||
EndHeaders: endHeaders, | ||
} | ||
err = t.framer.writeHeaders(endHeaders, p) | ||
err = t.framer.writeHeaders(endHeaders && flush, p) | ||
first = false | ||
} else { | ||
err = t.framer.writeContinuation(endHeaders, s.id, endHeaders, b.Next(size)) | ||
err = t.framer.writeContinuation(endHeaders && flush, s.id, endHeaders, b.Next(size)) | ||
} | ||
if err != nil { | ||
t.Close() | ||
|
@@ -688,6 +688,10 @@ func (t *http2Server) writeHeaders(s *Stream, b *bytes.Buffer, endStream bool) e | |
|
||
// WriteHeader sends the header metedata md back to the client. | ||
func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error { | ||
return t.writeHeader(s, md, true /* flush */) | ||
} | ||
|
||
func (t *http2Server) writeHeader(s *Stream, md metadata.MD, flush bool) error { | ||
s.mu.Lock() | ||
if s.headerOk || s.state == streamDone { | ||
s.mu.Unlock() | ||
|
@@ -722,7 +726,7 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error { | |
} | ||
} | ||
bufLen := t.hBuf.Len() | ||
if err := t.writeHeaders(s, t.hBuf, false); err != nil { | ||
if err := t.writeHeaders(s, t.hBuf, false, flush); err != nil { | ||
return err | ||
} | ||
if t.stats != nil { | ||
|
@@ -755,7 +759,7 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error { | |
s.mu.Unlock() | ||
|
||
if !headersSent && hasHeader { | ||
t.WriteHeader(s, nil) | ||
t.writeHeader(s, nil, false) | ||
headersSent = true | ||
} | ||
|
||
|
@@ -795,7 +799,7 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error { | |
} | ||
} | ||
bufLen := t.hBuf.Len() | ||
if err := t.writeHeaders(s, t.hBuf, true); err != nil { | ||
if err := t.writeHeaders(s, t.hBuf, true, true); err != nil { | ||
t.Close() | ||
return err | ||
} | ||
|
@@ -825,7 +829,8 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) (err error) { | |
} | ||
s.mu.Unlock() | ||
if writeHeaderFrame { | ||
t.WriteHeader(s, nil) | ||
flush := !opts.Delay && len(data) == 0 | ||
t.writeHeader(s, nil, flush) | ||
} | ||
r := bytes.NewBuffer(data) | ||
for { | ||
|
@@ -893,11 +898,11 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) (err error) { | |
// Reset ping strikes when sending data since this might cause | ||
// the peer to send ping. | ||
atomic.StoreUint32(&t.resetPingStrikes, 1) | ||
if err := t.framer.writeData(forceFlush, s.id, false, p); err != nil { | ||
if err := t.framer.writeData(forceFlush && !opts.Delay, s.id, false, p); err != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Might be wrong, but for the scenario in which the client has an inbound flow control window which is less than the size of the server's response message, IIUC this change could cause deadlock, with the server running out of flow control and the client not having yet received anything. This looks similar to the change in #973, which ran into that issue. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @apolcyn I agree that this could lead to deadlock. Perhaps it would be sufficient to flush the buffered data whenever |
||
t.Close() | ||
return connectionErrorf(true, err, "transport: %v", err) | ||
} | ||
if t.framer.adjustNumWriters(-1) == 0 { | ||
if t.framer.adjustNumWriters(-1) == 0 && !opts.Delay { | ||
t.framer.flushWrite() | ||
} | ||
t.writableChan <- 0 | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO "Delay" here should be interpreted by sendResponse as applying to the response as a whole -- header, payload, and trailer -- not each piece independently.
So, I believe this actually should still be "false", and it shouldn't be factored into the logic in sendResponse until the final step (where looking for other writers before flushing).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree. It seems like the calls to
adjustNumWriter
and the associated flushing are at too low a level. For unary RPCs, the number of writers should be incremented before calling sendResponse
and decremented after the WriteStatus
calls. The flushing logic currently in sendResponse
is a bit difficult to reason about given the interaction with flow control and it isn't obvious to me how to pull it to a higher level. Perhaps this is the subtle bug you're referring to. I'm happy to leave this work in your hands. Consider this PR more an indication of the importance of this work. The unnecessary flushes introduce a significant amount of latency!