Skip to content

Commit

Permalink
Inline http2 fixups (#402)
Browse files Browse the repository at this point in the history
There are a few places where bugs / poorly structured code crept in during the inline multiplexer work. This attempts to address some of them:

- overly long lines
- mis-named variables
- unnecessary use of generics in one case
  • Loading branch information
rnro authored Jul 3, 2023
1 parent 83c04db commit b9fe8f5
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 30 deletions.
75 changes: 53 additions & 22 deletions Sources/NIOHTTP2/HTTP2PipelineHelpers.swift
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,25 @@ extension Channel {
inboundStreamInitializer: @escaping NIOHTTP2Handler.StreamInitializer) -> EventLoopFuture<NIOHTTP2Handler.StreamMultiplexer> {
if self.eventLoop.inEventLoop {
return self.eventLoop.makeCompletedFuture {
return try self.pipeline.syncOperations.configureHTTP2Pipeline(mode: mode, connectionConfiguration: connectionConfiguration, streamConfiguration: streamConfiguration, streamDelegate: streamDelegate, position: position, inboundStreamInitializer: inboundStreamInitializer)
return try self.pipeline.syncOperations.configureHTTP2Pipeline(
mode: mode,
connectionConfiguration: connectionConfiguration,
streamConfiguration: streamConfiguration,
streamDelegate: streamDelegate,
position: position,
inboundStreamInitializer: inboundStreamInitializer
)
}
} else {
return self.eventLoop.submit {
return try self.pipeline.syncOperations.configureHTTP2Pipeline(mode: mode, connectionConfiguration: connectionConfiguration, streamConfiguration: streamConfiguration, streamDelegate: streamDelegate, position: position, inboundStreamInitializer: inboundStreamInitializer)
return try self.pipeline.syncOperations.configureHTTP2Pipeline(
mode: mode,
connectionConfiguration: connectionConfiguration,
streamConfiguration: streamConfiguration,
streamDelegate: streamDelegate,
position: position,
inboundStreamInitializer: inboundStreamInitializer
)
}
}
}
Expand Down Expand Up @@ -307,7 +321,8 @@ extension Channel {
/// - returns: `EventLoopFuture<Void>` that completes when the channel is ready.
public func configureCommonHTTPServerPipeline(
h2ConnectionChannelConfigurator: ((Channel) -> EventLoopFuture<Void>)? = nil,
_ configurator: @escaping (Channel) -> EventLoopFuture<Void>) -> EventLoopFuture<Void> {
_ configurator: @escaping (Channel) -> EventLoopFuture<Void>
) -> EventLoopFuture<Void> {
return self.configureCommonHTTPServerPipeline(h2ConnectionChannelConfigurator: h2ConnectionChannelConfigurator, targetWindowSize: 65535, configurator)
}

Expand All @@ -331,21 +346,24 @@ extension Channel {
public func configureCommonHTTPServerPipeline(
h2ConnectionChannelConfigurator: ((Channel) -> EventLoopFuture<Void>)? = nil,
targetWindowSize: Int,
_ configurator: @escaping (Channel) -> EventLoopFuture<Void>) -> EventLoopFuture<Void> {
return self._commonHTTPServerPipeline(configurator: configurator, h2ConnectionChannelConfigurator: h2ConnectionChannelConfigurator) { channel in
channel.configureHTTP2Pipeline(mode: .server, targetWindowSize: targetWindowSize) { streamChannel -> EventLoopFuture<Void> in
streamChannel.pipeline.addHandler(HTTP2FramePayloadToHTTP1ServerCodec()).flatMap { () -> EventLoopFuture<Void> in
configurator(streamChannel)
}
_ configurator: @escaping (Channel) -> EventLoopFuture<Void>
) -> EventLoopFuture<Void> {
return self._commonHTTPServerPipeline(configurator: configurator, h2ConnectionChannelConfigurator: h2ConnectionChannelConfigurator) { channel in
channel.configureHTTP2Pipeline(mode: .server, targetWindowSize: targetWindowSize) { streamChannel -> EventLoopFuture<Void> in
streamChannel.pipeline.addHandler(HTTP2FramePayloadToHTTP1ServerCodec()).flatMap { () -> EventLoopFuture<Void> in
configurator(streamChannel)
}
}
}.map { _ in () }
}
}

private func _commonHTTPServerPipeline<T>(configurator: @escaping (Channel) -> EventLoopFuture<Void>,
h2ConnectionChannelConfigurator: ((Channel) -> EventLoopFuture<Void>)?,
configureHTTP2Pipeline: @escaping (Channel) -> EventLoopFuture<T>) -> EventLoopFuture<Void> {
private func _commonHTTPServerPipeline(
configurator: @escaping (Channel) -> EventLoopFuture<Void>,
h2ConnectionChannelConfigurator: ((Channel) -> EventLoopFuture<Void>)?,
configureHTTP2Pipeline: @escaping (Channel) -> EventLoopFuture<Void>
) -> EventLoopFuture<Void> {
let h2ChannelConfigurator = { (channel: Channel) -> EventLoopFuture<Void> in
configureHTTP2Pipeline(channel).flatMap { (_: T) in
configureHTTP2Pipeline(channel).flatMap { _ in
if let h2ConnectionChannelConfigurator = h2ConnectionChannelConfigurator {
return h2ConnectionChannelConfigurator(channel)
} else {
Expand Down Expand Up @@ -385,14 +403,20 @@ extension Channel {
streamConfiguration: NIOHTTP2Handler.StreamConfiguration,
streamDelegate: NIOHTTP2StreamDelegate? = nil,
h2ConnectionChannelConfigurator: ((Channel) -> EventLoopFuture<Void>)? = nil,
configurator: @escaping NIOHTTP2Handler.StreamInitializer) -> EventLoopFuture<Void> {
return self._commonHTTPServerPipeline(configurator: configurator, h2ConnectionChannelConfigurator: h2ConnectionChannelConfigurator) { channel in
channel.configureHTTP2Pipeline(mode: .server, connectionConfiguration: connectionConfiguration, streamConfiguration: streamConfiguration, streamDelegate: streamDelegate) { streamChannel -> EventLoopFuture<Void> in
streamChannel.pipeline.addHandler(HTTP2FramePayloadToHTTP1ServerCodec()).flatMap { () -> EventLoopFuture<Void> in
configurator(streamChannel)
}
configurator: @escaping NIOHTTP2Handler.StreamInitializer
) -> EventLoopFuture<Void> {
return self._commonHTTPServerPipeline(configurator: configurator, h2ConnectionChannelConfigurator: h2ConnectionChannelConfigurator) { channel in
channel.configureHTTP2Pipeline(
mode: .server,
connectionConfiguration: connectionConfiguration,
streamConfiguration: streamConfiguration,
streamDelegate: streamDelegate
) { streamChannel -> EventLoopFuture<Void> in
streamChannel.pipeline.addHandler(HTTP2FramePayloadToHTTP1ServerCodec()).flatMap { () -> EventLoopFuture<Void> in
configurator(streamChannel)
}
}
}.map { _ in () }
}
}
}

Expand Down Expand Up @@ -420,7 +444,14 @@ extension ChannelPipeline.SynchronousOperations {
streamDelegate: NIOHTTP2StreamDelegate? = nil,
position: ChannelPipeline.Position = .last,
inboundStreamInitializer: @escaping NIOHTTP2Handler.StreamInitializer) throws -> NIOHTTP2Handler.StreamMultiplexer {
let handler = NIOHTTP2Handler(mode: mode, eventLoop: self.eventLoop, connectionConfiguration: connectionConfiguration, streamConfiguration: streamConfiguration, streamDelegate: streamDelegate, inboundStreamInitializer: inboundStreamInitializer)
let handler = NIOHTTP2Handler(
mode: mode,
eventLoop: self.eventLoop,
connectionConfiguration: connectionConfiguration,
streamConfiguration: streamConfiguration,
streamDelegate: streamDelegate,
inboundStreamInitializer: inboundStreamInitializer
)

try self.addHandler(handler, position: position)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class ConfiguringPipelineInlineMultiplexerTests: XCTestCase {

func testBasicPipelineCommunicates() throws {
let serverRecorder = InboundFramePayloadRecorder()
let clientHandler = try assertNoThrowWithValue(self.clientChannel.configureHTTP2Pipeline(mode: .client, connectionConfiguration: .init(), streamConfiguration: .init(), inboundStreamInitializer: {_ in self.clientChannel.eventLoop.makeSucceededVoidFuture()} ).wait())
let clientMultiplexer = try assertNoThrowWithValue(self.clientChannel.configureHTTP2Pipeline(mode: .client, connectionConfiguration: .init(), streamConfiguration: .init(), inboundStreamInitializer: {_ in self.clientChannel.eventLoop.makeSucceededVoidFuture()} ).wait())
XCTAssertNoThrow(try self.serverChannel.configureHTTP2Pipeline(mode: .server, connectionConfiguration: .init(), streamConfiguration: .init()) { channel in
return channel.pipeline.addHandler(serverRecorder)
}.wait())
Expand All @@ -48,7 +48,7 @@ class ConfiguringPipelineInlineMultiplexerTests: XCTestCase {
let requestPromise = self.clientChannel.eventLoop.makePromise(of: Void.self)
let reqFrame = HTTP2Frame(streamID: 1, payload: .headers(.init(headers: HPACKHeaders([(":method", "GET"), (":authority", "localhost"), (":scheme", "https"), (":path", "/")]), endStream: true)))

clientHandler.createStreamChannel(promise: nil) { channel in
clientMultiplexer.createStreamChannel(promise: nil) { channel in
channel.writeAndFlush(reqFrame.payload).whenComplete { _ in channel.close(promise: requestPromise) }
return channel.eventLoop.makeSucceededFuture(())
}
Expand Down Expand Up @@ -76,7 +76,7 @@ class ConfiguringPipelineInlineMultiplexerTests: XCTestCase {

func testBasicPipelineCommunicatesWithTargetWindowSize() throws {
let serverRecorder = InboundFramePayloadRecorder()
let clientHandler = try assertNoThrowWithValue(self.clientChannel.configureHTTP2Pipeline(mode: .client, connectionConfiguration: .init(), streamConfiguration: .init(), inboundStreamInitializer: {_ in self.clientChannel.eventLoop.makeSucceededVoidFuture()} ).wait())
let clientMultiplexer = try assertNoThrowWithValue(self.clientChannel.configureHTTP2Pipeline(mode: .client, connectionConfiguration: .init(), streamConfiguration: .init(), inboundStreamInitializer: {_ in self.clientChannel.eventLoop.makeSucceededVoidFuture()} ).wait())
XCTAssertNoThrow(try self.serverChannel.configureHTTP2Pipeline(mode: .server, connectionConfiguration: .init(), streamConfiguration: .init()) { channel in
return channel.pipeline.addHandler(serverRecorder)
}.wait())
Expand All @@ -87,7 +87,7 @@ class ConfiguringPipelineInlineMultiplexerTests: XCTestCase {
let requestPromise = self.clientChannel.eventLoop.makePromise(of: Void.self)
let reqFrame = HTTP2Frame(streamID: 1, payload: .headers(.init(headers: HPACKHeaders([(":method", "GET"), (":authority", "localhost"), (":scheme", "https"), (":path", "/")]), endStream: true)))

clientHandler.createStreamChannel(promise: nil) { channel in
clientMultiplexer.createStreamChannel(promise: nil) { channel in
channel.writeAndFlush(reqFrame.payload).whenComplete { _ in channel.close(promise: requestPromise) }
return channel.eventLoop.makeSucceededFuture(())
}
Expand Down Expand Up @@ -320,7 +320,7 @@ class ConfiguringPipelineInlineMultiplexerTests: XCTestCase {
}

let serverRecorder = HTTP1ServerRequestRecorderHandler()
let clientHandler = try assertNoThrowWithValue(self.clientChannel.configureHTTP2Pipeline(mode: .client, connectionConfiguration: .init(), streamConfiguration: .init(), inboundStreamInitializer: {_ in self.clientChannel.eventLoop.makeSucceededVoidFuture()} ).wait())
let clientMultiplexer = try assertNoThrowWithValue(self.clientChannel.configureHTTP2Pipeline(mode: .client, connectionConfiguration: .init(), streamConfiguration: .init(), inboundStreamInitializer: {_ in self.clientChannel.eventLoop.makeSucceededVoidFuture()} ).wait())

XCTAssertNoThrow(try self.serverChannel.configureCommonHTTPServerPipeline(
connectionConfiguration: .init(),
Expand All @@ -340,7 +340,7 @@ class ConfiguringPipelineInlineMultiplexerTests: XCTestCase {
let requestPromise = self.clientChannel.eventLoop.makePromise(of: Void.self)
let reqFrame = HTTP2Frame(streamID: 1, payload: .headers(.init(headers: HPACKHeaders([(":method", "GET"), (":authority", "localhost"), (":scheme", "https"), (":path", "/testH2toHTTP1")]), endStream: true)))

clientHandler.createStreamChannel(promise: nil) { channel in
clientMultiplexer.createStreamChannel(promise: nil) { channel in
channel.writeAndFlush(reqFrame.payload).whenComplete { _ in channel.close(promise: requestPromise) }
return channel.eventLoop.makeSucceededFuture(())
}
Expand Down Expand Up @@ -386,7 +386,7 @@ class ConfiguringPipelineInlineMultiplexerTests: XCTestCase {
}

let serverRecorder = HTTP1ServerRequestRecorderHandler()
let clientHandler = try assertNoThrowWithValue(self.clientChannel.configureHTTP2Pipeline(mode: .client, connectionConfiguration: .init(), streamConfiguration: .init(), inboundStreamInitializer: {_ in self.clientChannel.eventLoop.makeSucceededVoidFuture()} ).wait())
let clientMultiplexer = try assertNoThrowWithValue(self.clientChannel.configureHTTP2Pipeline(mode: .client, connectionConfiguration: .init(), streamConfiguration: .init(), inboundStreamInitializer: {_ in self.clientChannel.eventLoop.makeSucceededVoidFuture()} ).wait())

XCTAssertNoThrow(try self.serverChannel.configureCommonHTTPServerPipeline(
connectionConfiguration: .init(),
Expand All @@ -406,7 +406,7 @@ class ConfiguringPipelineInlineMultiplexerTests: XCTestCase {
let requestPromise = self.clientChannel.eventLoop.makePromise(of: Void.self)
let reqFrame = HTTP2Frame(streamID: 1, payload: .headers(.init(headers: HPACKHeaders([(":method", "GET"), (":authority", "localhost"), (":scheme", "https"), (":path", "/testH2toHTTP1")]), endStream: true)))

clientHandler.createStreamChannel(promise: nil) { channel in
clientMultiplexer.createStreamChannel(promise: nil) { channel in
channel.writeAndFlush(reqFrame.payload).whenComplete { _ in channel.close(promise: requestPromise) }
return channel.eventLoop.makeSucceededFuture(())
}
Expand Down

0 comments on commit b9fe8f5

Please sign in to comment.