Skip to content

Commit

Permalink
Move END_STREAM check earlier. (#237)
Browse files Browse the repository at this point in the history
Motivation:

In an earlier patch (1e68e51) we attempted to avoid emitting
WINDOW_UPDATE frames on streams that were already closed on their
inbound side. This was largely a reasonable fix, but we missed an
important area: END_STREAM in a sequence of frames.

This would be triggered if we received multiple DATA frames in a row,
where the last one contained END_STREAM but an earlier one triggered a
WINDOW_UPDATE. The stream state machine would have seen the END_STREAM
already and will forbid the WINDOW_UPDATE, but the stream channel won't
yet have seen it and will try to send it.

The fix is easy enough: look for END_STREAM earlier.

Modifications:

- Check for END_STREAM when the stream channel receives the frame, not
  when it passes it into the pipeline.

Result:

Fewer spurious connection errors.
  • Loading branch information
Lukasa authored Sep 10, 2020
1 parent 7d366b1 commit c7ad256
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 11 deletions.
19 changes: 8 additions & 11 deletions Sources/NIOHTTP2/HTTP2StreamChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -630,26 +630,15 @@ private extension HTTP2StreamChannel {
while self.pendingReads.count > 0 {
let frame = self.pendingReads.removeFirst()

let endStream: Bool
let dataLength: Int?

switch frame.payload {
case .data(let data):
endStream = data.endStream
dataLength = data.data.readableBytes
case .headers(let headers):
endStream = headers.endStream
dataLength = nil
default:
endStream = false
dataLength = nil
}

// We've seen end stream: close the window manager to avoid emitting extraneous WINDOW_UPDATE frames.
if endStream {
self.windowManager.closed = true
}

self.pipeline.fireChannelRead(NIOAny(frame))

if let size = dataLength, let increment = self.windowManager.bufferedFrameEmitted(size: size) {
Expand Down Expand Up @@ -712,6 +701,14 @@ internal extension HTTP2StreamChannel {
// actually delivered into the pipeline.
if case .data(let dataPayload) = frame.payload {
self.windowManager.bufferedFrameReceived(size: dataPayload.data.readableBytes)

// No further window update frames should be sent.
if dataPayload.endStream {
self.windowManager.closed = true
}
} else if case .headers(let headersPayload) = frame.payload, headersPayload.endStream {
// No further window update frames should be sent.
self.windowManager.closed = true
}
self.pendingReads.append(message)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ extension HTTP2FramePayloadStreamMultiplexerTests {
("testWeCanCreateFrameAndPayloadBasedStreamsOnAMultiplexer", testWeCanCreateFrameAndPayloadBasedStreamsOnAMultiplexer),
("testReadWhenUsingAutoreadOnChildChannel", testReadWhenUsingAutoreadOnChildChannel),
("testWindowUpdateIsNotEmittedAfterStreamIsClosed", testWindowUpdateIsNotEmittedAfterStreamIsClosed),
("testWindowUpdateIsNotEmittedAfterStreamIsClosedEvenOnLaterFrame", testWindowUpdateIsNotEmittedAfterStreamIsClosedEvenOnLaterFrame),
]
}
}
Expand Down
50 changes: 50 additions & 0 deletions Tests/NIOHTTP2Tests/HTTP2FramePayloadStreamMultiplexerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1848,4 +1848,54 @@ final class HTTP2FramePayloadStreamMultiplexerTests: XCTestCase {
// the stream has closed we don't expect to read anything out.
XCTAssertNil(try self.channel.readOutbound(as: HTTP2Frame.self))
}

func testWindowUpdateIsNotEmittedAfterStreamIsClosedEvenOnLaterFrame() throws {
let targetWindowSize = 128
let multiplexer = HTTP2StreamMultiplexer(mode: .client,
channel: self.channel,
targetWindowSize: targetWindowSize) { channel in
return channel.eventLoop.makeSucceededFuture(())
}
XCTAssertNoThrow(try self.channel.pipeline.addHandler(multiplexer).wait())

// We need to activate the underlying channel here.
XCTAssertNoThrow(try self.channel.connect(to: SocketAddress(ipAddress: "127.0.0.1", port: 80)).wait())

// Write a headers frame.
let headers = HPACKHeaders([(":path", "/"), (":method", "GET"), (":authority", "localhost"), (":scheme", "https")])
let headersFrame = HTTP2Frame(streamID: 1, payload: .headers(.init(headers: headers)))
self.channel.pipeline.fireChannelRead(NIOAny(headersFrame))

// Activate the stream.
self.activateStream(1)

// Send a window updated event.
var windowUpdated = NIOHTTP2WindowUpdatedEvent(streamID: 1, inboundWindowSize: 128, outboundWindowSize: nil)
self.channel.pipeline.fireUserInboundEventTriggered(windowUpdated)
self.channel.pipeline.fireChannelReadComplete()

// The inbound window size should now be our target: 128. Write enough bytes to consume the
// inbound window as two frames: a 127-byte frame, followed by a 1-byte with END_STREAM set.
let bytes = ByteBuffer(repeating: 0, count: targetWindowSize)
let firstData = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(bytes.getSlice(at: bytes.readerIndex, length: targetWindowSize - 1)!)))
let secondData = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(bytes.getSlice(at: bytes.readerIndex, length: 1)!), endStream: true))
let firstDataFrame = HTTP2Frame(streamID: 1, payload: firstData)
let secondDataFrame = HTTP2Frame(streamID: 1, payload: secondData)

self.channel.pipeline.fireChannelRead(NIOAny(firstDataFrame))
windowUpdated = NIOHTTP2WindowUpdatedEvent(streamID: 1, inboundWindowSize: 1, outboundWindowSize: nil)
self.channel.pipeline.fireUserInboundEventTriggered(windowUpdated)

self.channel.pipeline.fireChannelRead(NIOAny(secondDataFrame))
// This is nil here for a reason: it reflects what would actually be sent in the real code. Relevantly, the nil currently
// does not actually propagate into the handler, which matters a lot.
windowUpdated = NIOHTTP2WindowUpdatedEvent(streamID: 1, inboundWindowSize: nil, outboundWindowSize: nil)
self.channel.pipeline.fireUserInboundEventTriggered(windowUpdated)

self.channel.pipeline.fireChannelReadComplete()

// We've consumed the inbound window: normally we'd expect a WINDOW_UPDATE frame but since
// the stream has closed we don't expect to read anything out.
XCTAssertNil(try self.channel.readOutbound(as: HTTP2Frame.self))
}
}

0 comments on commit c7ad256

Please sign in to comment.