diff --git a/Sources/NIOHTTP2/HTTP2PipelineHelpers.swift b/Sources/NIOHTTP2/HTTP2PipelineHelpers.swift index 3bae69d1..83a266df 100644 --- a/Sources/NIOHTTP2/HTTP2PipelineHelpers.swift +++ b/Sources/NIOHTTP2/HTTP2PipelineHelpers.swift @@ -705,6 +705,55 @@ extension Channel { (HTTP2ConnectionOutput, NIOHTTP2Handler.AsyncStreamMultiplexer) > > + > { + self.configureAsyncHTTPServerPipeline( + streamDelegate: nil, + http2Configuration: http2Configuration, + http1ConnectionInitializer: http1ConnectionInitializer, + http2ConnectionInitializer: http2ConnectionInitializer, + http2StreamInitializer: http2StreamInitializer + ) + } + + /// Configures a `ChannelPipeline` to speak either HTTP/1.1 or HTTP/2 according to what can be negotiated with the client. + /// + /// This helper takes care of configuring the server pipeline such that it negotiates whether to + /// use HTTP/1.1 or HTTP/2. + /// + /// This function doesn't configure the TLS handler. Callers of this function need to add a TLS + /// handler appropriately configured to perform protocol negotiation. + /// + /// - Parameters: + /// - streamDelegate: A delegate which is called when streams are created and closed. + /// - http2Configuration: The settings that will be used when establishing the HTTP/2 connections and new HTTP/2 streams. + /// - http1ConnectionInitializer: An optional callback that will be invoked only when the negotiated protocol + /// is HTTP/1.1 to configure the connection channel. + /// - http2ConnectionInitializer: An optional callback that will be invoked only when the negotiated protocol + /// is HTTP/2 to configure the connection channel. The channel has an `ChannelOutboundHandler/OutboundIn` type of ``HTTP2Frame``. + /// - http2StreamInitializer: A closure that will be called whenever the remote peer initiates a new stream. + /// The output of this closure is the element type of the returned multiplexer + /// - Returns: An `EventLoopFuture` containing a `NIOTypedApplicationProtocolNegotiationHandler` that completes when the channel + /// is ready to negotiate. This can then be used to access the `NIOProtocolNegotiationResult` which may itself + /// be waited on to retrieve the result of the negotiation. + @inlinable + @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) + public func configureAsyncHTTPServerPipeline< + HTTP1ConnectionOutput: Sendable, + HTTP2ConnectionOutput: Sendable, + HTTP2StreamOutput: Sendable + >( + streamDelegate: NIOHTTP2StreamDelegate?, + http2Configuration: NIOHTTP2Handler.Configuration = .init(), + http1ConnectionInitializer: @escaping NIOChannelInitializerWithOutput, + http2ConnectionInitializer: @escaping NIOChannelInitializerWithOutput, + http2StreamInitializer: @escaping NIOChannelInitializerWithOutput + ) -> EventLoopFuture< + EventLoopFuture< + NIONegotiatedHTTPVersion< + HTTP1ConnectionOutput, + (HTTP2ConnectionOutput, NIOHTTP2Handler.AsyncStreamMultiplexer) + > + > > { let http2ConnectionInitializer: NIOChannelInitializerWithOutput< @@ -712,6 +761,7 @@ extension Channel { > = { channel in channel.configureAsyncHTTP2Pipeline( mode: .server, + streamDelegate: streamDelegate, configuration: http2Configuration, streamInitializer: http2StreamInitializer ).flatMap { multiplexer in diff --git a/Tests/NIOHTTP2Tests/ConfiguringPipelineAsyncMultiplexerTests.swift b/Tests/NIOHTTP2Tests/ConfiguringPipelineAsyncMultiplexerTests.swift index 6243716b..684e418f 100644 --- a/Tests/NIOHTTP2Tests/ConfiguringPipelineAsyncMultiplexerTests.swift +++ b/Tests/NIOHTTP2Tests/ConfiguringPipelineAsyncMultiplexerTests.swift @@ -532,6 +532,112 @@ final class ConfiguringPipelineAsyncMultiplexerTests: XCTestCase { try await assertNoThrow(try await self.serverChannel.finish()) } + // `testNegotiatedHTTP2BasicPipelineStreamDelegate` ensures that a client-server system set up to use async stream abstractions + // with a NIOHTTP2StreamDelegate calls the delegate methods . + func testNegotiatedHTTP2BasicPipelineStreamDelegate() async throws { + final class TestStreamDelegate: NIOHTTP2StreamDelegate { + let streamCount: NIOLockedValueBox + let streamsCreated: NIOLockedValueBox + + init() { + self.streamCount = .init(0) + self.streamsCreated = .init(0) + } + + func streamCreated(_ id: NIOHTTP2.HTTP2StreamID, channel: any NIOCore.Channel) { + self.streamCount.withLockedValue { $0 += 1 } + self.streamsCreated.withLockedValue { $0 += 1 } + } + + func streamClosed(_ id: NIOHTTP2.HTTP2StreamID, channel: any NIOCore.Channel) { + self.streamCount.withLockedValue { $0 -= 1 } + } + } + let requestCount = 100 + + let streamDelegate = TestStreamDelegate() + + let clientMultiplexer = try await assertNoThrowWithValue( + try await self.clientChannel.configureAsyncHTTP2Pipeline(mode: .client) { + channel -> EventLoopFuture in + channel.eventLoop.makeSucceededFuture(channel) + }.get() + ) + + let negotiationResultFuture = try await self.serverChannel.configureAsyncHTTPServerPipeline( + streamDelegate: streamDelegate + ) { channel in + channel.eventLoop.makeSucceededVoidFuture() + } http2ConnectionInitializer: { channel in + channel.eventLoop.makeSucceededVoidFuture() + } http2StreamInitializer: { channel -> EventLoopFuture in + channel.pipeline.addHandlers([OKResponder()]).map { _ in channel } + }.get() + + // Let's pretend the TLS handler did protocol negotiation for us + self.serverChannel.pipeline.fireUserInboundEventTriggered( + TLSUserEvent.handshakeCompleted(negotiatedProtocol: "h2") + ) + + try await assertNoThrow( + try await self.assertDoHandshake(client: self.clientChannel, server: self.serverChannel) + ) + + try await withThrowingTaskGroup(of: Int.self, returning: Void.self) { group in + // server + group.addTask { + let negotiationResult = try await negotiationResultFuture.get() + let serverMultiplexer: NIOHTTP2Handler.AsyncStreamMultiplexer + switch negotiationResult { + case .http1_1: + preconditionFailure("Negotiation result must be HTTP/2") + case .http2(let (_, multiplexer)): + serverMultiplexer = multiplexer + } + + var serverInboundChannelCount = 0 + for try await _ in serverMultiplexer.inbound { + serverInboundChannelCount += 1 + } + return serverInboundChannelCount + } + + // client + for _ in 0.. EventLoopFuture in + channel.pipeline.addHandlers([SimpleRequest(), InboundFramePayloadRecorder()]).map { + channel + } + } + + let clientRecorder = try await streamChannel.pipeline.handler(type: InboundFramePayloadRecorder.self) + .get() + + try await Self.deliverAllBytes(from: self.clientChannel, to: self.serverChannel) + try await Self.deliverAllBytes(from: self.serverChannel, to: self.clientChannel) + + clientRecorder.receivedFrames.assertFramePayloadsMatch([ + ConfiguringPipelineAsyncMultiplexerTests.responseFramePayload + ]) + try await streamChannel.closeFuture.get() + } + + try await assertNoThrow(try await self.clientChannel.finish()) + try await assertNoThrow(try await self.serverChannel.finish()) + + let serverInboundChannelCount = try await assertNoThrowWithValue(try await group.next()!) + XCTAssertEqual( + serverInboundChannelCount, + requestCount, + "We should have created one server-side channel as a result of the each HTTP/2 stream used." + ) + } + + XCTAssertEqual(streamDelegate.streamCount.withLockedValue { $0 }, 0) + XCTAssertEqual(streamDelegate.streamsCreated.withLockedValue { $0 }, requestCount) + } + // Simple handler which maps client request parts to remove references to `IOData` which isn't Sendable internal final class HTTP1ClientSendability: ChannelOutboundHandler { public typealias RequestPart = HTTPPart