Skip to content

Commit

Permalink
http2: rewrite inbound flow control tracking
Browse files Browse the repository at this point in the history
Add a new inflow type for tracking inbound flow control.
An inflow tracks both the window sent to the peer, and the
window we are willing to send. Updates are accumulated and
sent in a batch when the unsent window update is large
enough.

This change makes both the client and server use the same
algorithm to decide when to send window updates. This should
slightly reduce the rate of updates sent by the client, and
significantly reduce the rate sent by the server.

Fix a client flow control tracking bug: When processing data
for a canceled stream, the record of flow control consumed
by the peer was not updated to account for the discard
stream.

Fixes golang/go#28732
Fixes golang/go#56558

Change-Id: Id119d17b84b46f3dc2719f28a86758d9a10085d9
Reviewed-on: https://go-review.googlesource.com/c/net/+/448155
Reviewed-by: Brad Fitzpatrick <bradfitz@golang.org>
TryBot-Result: Gopher Robot <gobot@golang.org>
Reviewed-by: Heschi Kreinick <heschi@google.com>
Run-TryBot: Damien Neil <dneil@google.com>
  • Loading branch information
neild committed Jan 3, 2023
1 parent 2aa8215 commit 7805fdc
Show file tree
Hide file tree
Showing 6 changed files with 403 additions and 206 deletions.
88 changes: 78 additions & 10 deletions http2/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,31 +6,99 @@

package http2

// flow is the flow control window's size.
type flow struct {
// inflowMinRefresh is the minimum number of bytes we'll send for a
// flow control window update.
const inflowMinRefresh = 4 << 10

// inflow accounts for an inbound flow control window.
// It tracks both the latest window sent to the peer (used for enforcement)
// and the accumulated unsent window.
type inflow struct {
avail int32
unsent int32
}

// set sets the initial window.
func (f *inflow) init(n int32) {
f.avail = n
}

// add adds n bytes to the window, with a maximum window size of max,
// indicating that the peer can now send us more data.
// For example, the user read from a {Request,Response} body and consumed
// some of the buffered data, so the peer can now send more.
// It returns the number of bytes to send in a WINDOW_UPDATE frame to the peer.
// Window updates are accumulated and sent when the unsent capacity
// is at least inflowMinRefresh or will at least double the peer's available window.
func (f *inflow) add(n int) (connAdd int32) {
if n < 0 {
panic("negative update")
}
unsent := int64(f.unsent) + int64(n)
// "A sender MUST NOT allow a flow-control window to exceed 2^31-1 octets."
// RFC 7540 Section 6.9.1.
const maxWindow = 1<<31 - 1
if unsent+int64(f.avail) > maxWindow {
panic("flow control update exceeds maximum window size")
}
f.unsent = int32(unsent)
if f.unsent < inflowMinRefresh && f.unsent < f.avail {
// If there aren't at least inflowMinRefresh bytes of window to send,
// and this update won't at least double the window, buffer the update for later.
return 0
}
f.avail += f.unsent
f.unsent = 0
return int32(unsent)
}

// take attempts to take n bytes from the peer's flow control window.
// It reports whether the window has available capacity.
func (f *inflow) take(n uint32) bool {
if n > uint32(f.avail) {
return false
}
f.avail -= int32(n)
return true
}

// takeInflows attempts to take n bytes from two inflows,
// typically connection-level and stream-level flows.
// It reports whether both windows have available capacity.
func takeInflows(f1, f2 *inflow, n uint32) bool {
if n > uint32(f1.avail) || n > uint32(f2.avail) {
return false
}
f1.avail -= int32(n)
f2.avail -= int32(n)
return true
}

// outflow is the outbound flow control window's size.
type outflow struct {
_ incomparable

// n is the number of DATA bytes we're allowed to send.
// A flow is kept both on a conn and a per-stream.
// An outflow is kept both on a conn and a per-stream.
n int32

// conn points to the shared connection-level flow that is
// shared by all streams on that conn. It is nil for the flow
// conn points to the shared connection-level outflow that is
// shared by all streams on that conn. It is nil for the outflow
// that's on the conn directly.
conn *flow
conn *outflow
}

func (f *flow) setConnFlow(cf *flow) { f.conn = cf }
func (f *outflow) setConnFlow(cf *outflow) { f.conn = cf }

func (f *flow) available() int32 {
func (f *outflow) available() int32 {
n := f.n
if f.conn != nil && f.conn.n < n {
n = f.conn.n
}
return n
}

func (f *flow) take(n int32) {
func (f *outflow) take(n int32) {
if n > f.available() {
panic("internal error: took too much")
}
Expand All @@ -42,7 +110,7 @@ func (f *flow) take(n int32) {

// add adds n bytes (positive or negative) to the flow control window.
// It returns false if the sum would exceed 2^31-1.
func (f *flow) add(n int32) bool {
func (f *outflow) add(n int32) bool {
sum := f.n + n
if (sum > n) == (f.n > 0) {
f.n = sum
Expand Down
66 changes: 59 additions & 7 deletions http2/flow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,61 @@ package http2

import "testing"

func TestFlow(t *testing.T) {
var st flow
var conn flow
func TestInFlowTake(t *testing.T) {
var f inflow
f.init(100)
if !f.take(40) {
t.Fatalf("f.take(40) from 100: got false, want true")
}
if !f.take(40) {
t.Fatalf("f.take(40) from 60: got false, want true")
}
if f.take(40) {
t.Fatalf("f.take(40) from 20: got true, want false")
}
if !f.take(20) {
t.Fatalf("f.take(20) from 20: got false, want true")
}
}

func TestInflowAddSmall(t *testing.T) {
var f inflow
f.init(0)
// Adding even a small amount when there is no flow causes an immediate send.
if got, want := f.add(1), int32(1); got != want {
t.Fatalf("f.add(1) to 1 = %v, want %v", got, want)
}
}

func TestInflowAdd(t *testing.T) {
var f inflow
f.init(10 * inflowMinRefresh)
if got, want := f.add(inflowMinRefresh-1), int32(0); got != want {
t.Fatalf("f.add(minRefresh - 1) = %v, want %v", got, want)
}
if got, want := f.add(1), int32(inflowMinRefresh); got != want {
t.Fatalf("f.add(minRefresh) = %v, want %v", got, want)
}
}

func TestTakeInflows(t *testing.T) {
var a, b inflow
a.init(10)
b.init(20)
if !takeInflows(&a, &b, 5) {
t.Fatalf("takeInflows(a, b, 5) from 10, 20: got false, want true")
}
if takeInflows(&a, &b, 6) {
t.Fatalf("takeInflows(a, b, 6) from 5, 15: got true, want false")
}
if !takeInflows(&a, &b, 5) {
t.Fatalf("takeInflows(a, b, 5) from 5, 15: got false, want true")
}
}

func TestOutFlow(t *testing.T) {
var st outflow
var conn outflow
st.add(3)
conn.add(2)

Expand All @@ -29,8 +81,8 @@ func TestFlow(t *testing.T) {
}
}

func TestFlowAdd(t *testing.T) {
var f flow
func TestOutFlowAdd(t *testing.T) {
var f outflow
if !f.add(1) {
t.Fatal("failed to add 1")
}
Expand All @@ -51,8 +103,8 @@ func TestFlowAdd(t *testing.T) {
}
}

func TestFlowAddOverflow(t *testing.T) {
var f flow
func TestOutFlowAddOverflow(t *testing.T) {
var f outflow
if !f.add(0) {
t.Fatal("failed to add 0")
}
Expand Down
85 changes: 30 additions & 55 deletions http2/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ func (s *Server) ServeConn(c net.Conn, opts *ServeConnOpts) {
// configured value for inflow, that will be updated when we send a
// WINDOW_UPDATE shortly after sending SETTINGS.
sc.flow.add(initialWindowSize)
sc.inflow.add(initialWindowSize)
sc.inflow.init(initialWindowSize)
sc.hpackEncoder = hpack.NewEncoder(&sc.headerWriteBuf)
sc.hpackEncoder.SetMaxDynamicTableSizeLimit(s.maxEncoderHeaderTableSize())

Expand Down Expand Up @@ -563,8 +563,8 @@ type serverConn struct {
wroteFrameCh chan frameWriteResult // from writeFrameAsync -> serve, tickles more frame writes
bodyReadCh chan bodyReadMsg // from handlers -> serve
serveMsgCh chan interface{} // misc messages & code to send to / run on the serve loop
flow flow // conn-wide (not stream-specific) outbound flow control
inflow flow // conn-wide inbound flow control
flow outflow // conn-wide (not stream-specific) outbound flow control
inflow inflow // conn-wide inbound flow control
tlsState *tls.ConnectionState // shared by all handlers, like net/http
remoteAddrStr string
writeSched WriteScheduler
Expand Down Expand Up @@ -641,10 +641,10 @@ type stream struct {
cancelCtx func()

// owned by serverConn's serve loop:
bodyBytes int64 // body bytes seen so far
declBodyBytes int64 // or -1 if undeclared
flow flow // limits writing from Handler to client
inflow flow // what the client is allowed to POST/etc to us
bodyBytes int64 // body bytes seen so far
declBodyBytes int64 // or -1 if undeclared
flow outflow // limits writing from Handler to client
inflow inflow // what the client is allowed to POST/etc to us
state streamState
resetQueued bool // RST_STREAM queued for write; set by sc.resetStream
gotTrailerHeader bool // HEADER frame for trailers was seen
Expand Down Expand Up @@ -1503,7 +1503,7 @@ func (sc *serverConn) processFrame(f Frame) error {
if sc.inGoAway && (sc.goAwayCode != ErrCodeNo || f.Header().StreamID > sc.maxClientStreamID) {

if f, ok := f.(*DataFrame); ok {
if sc.inflow.available() < int32(f.Length) {
if !sc.inflow.take(f.Length) {
return sc.countError("data_flow", streamError(f.Header().StreamID, ErrCodeFlowControl))
}
sc.sendWindowUpdate(nil, int(f.Length)) // conn-level
Expand Down Expand Up @@ -1775,14 +1775,9 @@ func (sc *serverConn) processData(f *DataFrame) error {
// But still enforce their connection-level flow control,
// and return any flow control bytes since we're not going
// to consume them.
if sc.inflow.available() < int32(f.Length) {
if !sc.inflow.take(f.Length) {
return sc.countError("data_flow", streamError(id, ErrCodeFlowControl))
}
// Deduct the flow control from inflow, since we're
// going to immediately add it back in
// sendWindowUpdate, which also schedules sending the
// frames.
sc.inflow.take(int32(f.Length))
sc.sendWindowUpdate(nil, int(f.Length)) // conn-level

if st != nil && st.resetQueued {
Expand All @@ -1797,10 +1792,9 @@ func (sc *serverConn) processData(f *DataFrame) error {

// Sender sending more than they'd declared?
if st.declBodyBytes != -1 && st.bodyBytes+int64(len(data)) > st.declBodyBytes {
if sc.inflow.available() < int32(f.Length) {
if !sc.inflow.take(f.Length) {
return sc.countError("data_flow", streamError(id, ErrCodeFlowControl))
}
sc.inflow.take(int32(f.Length))
sc.sendWindowUpdate(nil, int(f.Length)) // conn-level

st.body.CloseWithError(fmt.Errorf("sender tried to send more than declared Content-Length of %d bytes", st.declBodyBytes))
Expand All @@ -1811,10 +1805,9 @@ func (sc *serverConn) processData(f *DataFrame) error {
}
if f.Length > 0 {
// Check whether the client has flow control quota.
if st.inflow.available() < int32(f.Length) {
if !takeInflows(&sc.inflow, &st.inflow, f.Length) {
return sc.countError("flow_on_data_length", streamError(id, ErrCodeFlowControl))
}
st.inflow.take(int32(f.Length))

if len(data) > 0 {
wrote, err := st.body.Write(data)
Expand All @@ -1830,10 +1823,12 @@ func (sc *serverConn) processData(f *DataFrame) error {

// Return any padded flow control now, since we won't
// refund it later on body reads.
if pad := int32(f.Length) - int32(len(data)); pad > 0 {
sc.sendWindowUpdate32(nil, pad)
sc.sendWindowUpdate32(st, pad)
}
// Call sendWindowUpdate even if there is no padding,
// to return buffered flow control credit if the sent
// window has shrunk.
pad := int32(f.Length) - int32(len(data))
sc.sendWindowUpdate32(nil, pad)
sc.sendWindowUpdate32(st, pad)
}
if f.StreamEnded() {
st.endStream()
Expand Down Expand Up @@ -2105,8 +2100,7 @@ func (sc *serverConn) newStream(id, pusherID uint32, state streamState) *stream
st.cw.Init()
st.flow.conn = &sc.flow // link to conn-level counter
st.flow.add(sc.initialStreamSendWindowSize)
st.inflow.conn = &sc.inflow // link to conn-level counter
st.inflow.add(sc.srv.initialStreamRecvWindowSize())
st.inflow.init(sc.srv.initialStreamRecvWindowSize())
if sc.hs.WriteTimeout != 0 {
st.writeDeadline = time.AfterFunc(sc.hs.WriteTimeout, st.onWriteTimeout)
}
Expand Down Expand Up @@ -2388,47 +2382,28 @@ func (sc *serverConn) noteBodyRead(st *stream, n int) {
}

// st may be nil for conn-level
func (sc *serverConn) sendWindowUpdate(st *stream, n int) {
sc.serveG.check()
// "The legal range for the increment to the flow control
// window is 1 to 2^31-1 (2,147,483,647) octets."
// A Go Read call on 64-bit machines could in theory read
// a larger Read than this. Very unlikely, but we handle it here
// rather than elsewhere for now.
const maxUint31 = 1<<31 - 1
for n > maxUint31 {
sc.sendWindowUpdate32(st, maxUint31)
n -= maxUint31
}
sc.sendWindowUpdate32(st, int32(n))
func (sc *serverConn) sendWindowUpdate32(st *stream, n int32) {
sc.sendWindowUpdate(st, int(n))
}

// st may be nil for conn-level
func (sc *serverConn) sendWindowUpdate32(st *stream, n int32) {
func (sc *serverConn) sendWindowUpdate(st *stream, n int) {
sc.serveG.check()
if n == 0 {
return
}
if n < 0 {
panic("negative update")
}
var streamID uint32
if st != nil {
var send int32
if st == nil {
send = sc.inflow.add(n)
} else {
streamID = st.id
send = st.inflow.add(n)
}
if send == 0 {
return
}
sc.writeFrame(FrameWriteRequest{
write: writeWindowUpdate{streamID: streamID, n: uint32(n)},
write: writeWindowUpdate{streamID: streamID, n: uint32(send)},
stream: st,
})
var ok bool
if st == nil {
ok = sc.inflow.add(n)
} else {
ok = st.inflow.add(n)
}
if !ok {
panic("internal error; sent too many window updates without decrements?")
}
}

// requestBody is the Handler's Request.Body type.
Expand Down
Loading

0 comments on commit 7805fdc

Please sign in to comment.