From 3a1b63c6ad1a6b9f3ff719b44d935a0a4f220176 Mon Sep 17 00:00:00 2001 From: Quirin Schweigert Date: Wed, 26 Jul 2023 15:22:24 +0200 Subject: [PATCH] Fix parent channel `read()` call on `HTTP2StreamChannel` initialization causing incorrect order of inbound `HTTP2Frame`s Motivation: Make sure the order of `HTTP2Frame`s that are fired through the pipeline of a `HTTP2StreamChannel` by `HTTP2CommonInboundStreamMultiplexer` is correct when a `read()` call on the parent channel synchronously causes further frames to be processed. Modifications: Reorder calls in `HTTP2CommonInboundStreamMultiplexer.receivedFrame(frame:context:multiplexer)` so that initial header frame is buffered and processed first. Result: Resolves #410 and makes newly added test case `HTTP2StreamMultiplexerTests.testMultiplexerFiresInitialFramesInCorrectOrder()` pass. --- .../HTTP2CommonInboundStreamMultiplexer.swift | 5 +- .../HTTP2StreamMultiplexerTests.swift | 47 +++++++++++++++++++ 2 files changed, 51 insertions(+), 1 deletion(-) diff --git a/Sources/NIOHTTP2/HTTP2CommonInboundStreamMultiplexer.swift b/Sources/NIOHTTP2/HTTP2CommonInboundStreamMultiplexer.swift index 0160a009..01e21f7e 100644 --- a/Sources/NIOHTTP2/HTTP2CommonInboundStreamMultiplexer.swift +++ b/Sources/NIOHTTP2/HTTP2CommonInboundStreamMultiplexer.swift @@ -109,8 +109,11 @@ extension HTTP2CommonInboundStreamMultiplexer { // does no actual work. self.streamChannelContinuation?.yield(channel: channel.baseChannel) - channel.configureInboundStream(initializer: self.inboundStreamStateInitializer) + // Note: Firing the initial (header) frame before calling `HTTP2StreamChannel.configureInboundStream(initializer:)` + // is crucial to preserve frame order, since the initialization process might trigger another read on the parent + // channel which in turn might cause further frames to be processed synchronously. channel.receiveInboundFrame(frame) + channel.configureInboundStream(initializer: self.inboundStreamStateInitializer) if !channel.inList { self.didReadChannels.append(channel) diff --git a/Tests/NIOHTTP2Tests/HTTP2StreamMultiplexerTests.swift b/Tests/NIOHTTP2Tests/HTTP2StreamMultiplexerTests.swift index 55e45306..1afe3700 100644 --- a/Tests/NIOHTTP2Tests/HTTP2StreamMultiplexerTests.swift +++ b/Tests/NIOHTTP2Tests/HTTP2StreamMultiplexerTests.swift @@ -1927,4 +1927,51 @@ final class HTTP2StreamMultiplexerTests: XCTestCase { XCTAssertNoThrow(try channel.finish(acceptAlreadyClosed: false)) } + @available(*, deprecated, message: "Deprecated so deprecated functionality can be tested without warnings") + func testMultiplexerFiresInitialFramesInCorrectOrder() throws { + final class BufferingChannelHandler: ChannelDuplexHandler { + typealias InboundIn = Element + typealias InboundOut = Element + typealias OutboundIn = Element + typealias OutboundOut = Element + + let elementsToBufferCount: Int = 1 + var bufferedElements: CircularBuffer = [] + + func read(context: ChannelHandlerContext) { + while let bufferedElement = bufferedElements.popFirst() { + context.fireChannelRead(wrapInboundOut(bufferedElement)) + } + + context.read() + } + + func channelRead(context: ChannelHandlerContext, data: NIOAny) { + bufferedElements.append(unwrapInboundIn(data)) + + if bufferedElements.count > elementsToBufferCount { + context.fireChannelRead(wrapInboundOut(bufferedElements.removeFirst())) + } + } + } + + XCTAssertNoThrow(try self.channel.connect(to: SocketAddress(unixDomainSocketPath: "/whatever"), promise: nil)) + let streamID = HTTP2StreamID(1) + let headerFrame = HTTP2Frame(streamID: streamID, payload: .headers(.init(headers: HPACKHeaders()))) + let payloadBuffer = channel.allocator.buffer(capacity: 0) + let dataFrame = HTTP2Frame(streamID: streamID, payload: .data(.init(data: .byteBuffer(payloadBuffer)))) + + let multiplexer = HTTP2StreamMultiplexer(mode: .server, channel: self.channel) { (channel, _) in + channel.pipeline.addHandler(FrameExpecter(expectedFrames: [headerFrame, dataFrame])) + } + + XCTAssertNoThrow(try self.channel.pipeline.addHandler(BufferingChannelHandler()).wait()) + XCTAssertNoThrow(try self.channel.pipeline.addHandler(multiplexer).wait()) + + XCTAssertNoThrow(try self.channel.writeInbound(headerFrame)) + XCTAssertNoThrow(try self.channel.writeInbound(dataFrame)) + self.activateStream(streamID) + (self.channel.eventLoop as! EmbeddedEventLoop).run() + XCTAssertNoThrow(try self.channel.finish()) + } }