From 462ded73adab44a64697cdd5aa8d9d11e224ef0a Mon Sep 17 00:00:00 2001 From: Rick Newton-Rogers Date: Wed, 25 Oct 2023 15:49:16 +0100 Subject: [PATCH] HTTP/2 Async API (#424) We previously developed an async API for NIO HTTP/2 which was guarded under SPI. Now that the swift-nio async API is released we can reintroduce this code promoted to SPI. This change introduces: * `AsyncStreamMultiplexer` - an async variant of the HTTP/2 stream multiplexer which can be used to create outbound streams and provide access to an async sequence (`NIOHTTP2AsyncSequence`) of inbound streams * New pipeline configuration functions (e.g. `configureAsyncHTTP2Pipeline`) to support the new async mode --- Package.swift | 2 +- ...annelHandler+InlineStreamMultiplexer.swift | 39 ++ Sources/NIOHTTP2/HTTP2ChannelHandler.swift | 12 + .../HTTP2CommonInboundStreamMultiplexer.swift | 114 ++++- Sources/NIOHTTP2/HTTP2PipelineHelpers.swift | 195 ++++++++ ...iguringPipelineAsyncMultiplexerTests.swift | 458 ++++++++++++++++++ 6 files changed, 817 insertions(+), 3 deletions(-) create mode 100644 Tests/NIOHTTP2Tests/ConfiguringPipelineAsyncMultiplexerTests.swift diff --git a/Package.swift b/Package.swift index c9723fd5..47db7f0d 100644 --- a/Package.swift +++ b/Package.swift @@ -21,7 +21,7 @@ let package = Package( .library(name: "NIOHTTP2", targets: ["NIOHTTP2"]), ], dependencies: [ - .package(url: "https://github.com/apple/swift-nio.git", from: "2.58.0"), + .package(url: "https://github.com/apple/swift-nio.git", from: "2.60.0"), .package(url: "https://github.com/apple/swift-atomics.git", from: "1.0.2"), .package(url: "https://github.com/apple/swift-docc-plugin", from: "1.0.0"), ], diff --git a/Sources/NIOHTTP2/HTTP2ChannelHandler+InlineStreamMultiplexer.swift b/Sources/NIOHTTP2/HTTP2ChannelHandler+InlineStreamMultiplexer.swift index fcd7e566..e5215949 100644 --- a/Sources/NIOHTTP2/HTTP2ChannelHandler+InlineStreamMultiplexer.swift +++ b/Sources/NIOHTTP2/HTTP2ChannelHandler+InlineStreamMultiplexer.swift @@ -211,3 +211,42 @@ extension InlineStreamMultiplexer { self.commonStreamMultiplexer.setChannelContinuation(streamChannels) } } + +extension NIOHTTP2Handler { + /// A variant of `NIOHTTP2Handler.StreamMultiplexer` which creates a child channel for each HTTP/2 stream and + /// provides access to inbound HTTP/2 streams. + /// + /// In general in NIO applications it is helpful to consider each HTTP/2 stream as an + /// independent stream of HTTP/2 frames. This multiplexer achieves this by creating a + /// number of in-memory `HTTP2StreamChannel` objects, one for each stream. These operate + /// on ``HTTP2Frame/FramePayload`` objects as their base communication + /// atom, as opposed to the regular NIO `SelectableChannel` objects which use `ByteBuffer` + /// and `IOData`. + /// + /// Inbound (remotely-initiated) streams are accessible via the ``inbound`` property, having been initialized and + /// returned as the `InboundStreamOutput` type. + /// + /// You can open a stream by calling ``openStream(_:)``. Locally-initiated stream channel objects are initialized upon creation using the supplied `initializer` which returns a type + /// `Output`. This type may be `HTTP2Frame` or changed to any other type. + @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) + public struct AsyncStreamMultiplexer { + private let inlineStreamMultiplexer: InlineStreamMultiplexer + public let inbound: NIOHTTP2AsyncSequence + + // Cannot be created by users. + internal init(_ inlineStreamMultiplexer: InlineStreamMultiplexer, continuation: any AnyContinuation, inboundStreamChannels: NIOHTTP2AsyncSequence) { + self.inlineStreamMultiplexer = inlineStreamMultiplexer + self.inlineStreamMultiplexer.setChannelContinuation(continuation) + self.inbound = inboundStreamChannels + } + + + /// Create a stream channel initialized with the provided closure + /// - Parameter initializer: A closure that will be called upon the created stream which is responsible for + /// initializing the stream's `Channel`. + /// - Returns: The result of the `initializer`. + public func openStream(_ initializer: @escaping NIOChannelInitializerWithOutput) async throws -> Output { + return try await self.inlineStreamMultiplexer.createStreamChannel(initializer).get() + } + } +} diff --git a/Sources/NIOHTTP2/HTTP2ChannelHandler.swift b/Sources/NIOHTTP2/HTTP2ChannelHandler.swift index 1dc563ef..6e9ac229 100644 --- a/Sources/NIOHTTP2/HTTP2ChannelHandler.swift +++ b/Sources/NIOHTTP2/HTTP2ChannelHandler.swift @@ -1180,4 +1180,16 @@ extension NIOHTTP2Handler { throw NIOHTTP2Errors.missingMultiplexer() } } + + @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) + internal func syncAsyncStreamMultiplexer(continuation: any AnyContinuation, inboundStreamChannels: NIOHTTP2AsyncSequence) throws -> AsyncStreamMultiplexer { + self.eventLoop!.preconditionInEventLoop() + + switch self.inboundStreamMultiplexer { + case let .some(.inline(multiplexer)): + return AsyncStreamMultiplexer(multiplexer, continuation: continuation, inboundStreamChannels: inboundStreamChannels) + case .some(.legacy), .none: + throw NIOHTTP2Errors.missingMultiplexer() + } + } } diff --git a/Sources/NIOHTTP2/HTTP2CommonInboundStreamMultiplexer.swift b/Sources/NIOHTTP2/HTTP2CommonInboundStreamMultiplexer.swift index 54f60b07..ede25351 100644 --- a/Sources/NIOHTTP2/HTTP2CommonInboundStreamMultiplexer.swift +++ b/Sources/NIOHTTP2/HTTP2CommonInboundStreamMultiplexer.swift @@ -444,10 +444,120 @@ extension HTTP2CommonInboundStreamMultiplexer { } } -/// `ChannelContinuation` is used to generic async-sequence-like objects to deal with `Channel`s. This is so that they may be held -/// by the `HTTP2ChannelHandler` without causing it to become generic itself. +/// `AnyContinuation` is used to generic async-sequence-like objects to deal with the generic element types without +/// the holding type becoming generic itself. +/// +/// This is useful in in the case of the `HTTP2ChannelHandler` which must deal with types which hold stream initializers +/// which have a generic return type. internal protocol AnyContinuation { func yield(any: Any) func finish() func finish(throwing error: Error) } + + +/// `NIOHTTP2AsyncSequence` is an implementation of the `AsyncSequence` protocol which allows iteration over a generic +/// element type `Output`. +@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) +public struct NIOHTTP2AsyncSequence: AsyncSequence { + public struct AsyncIterator: AsyncIteratorProtocol { + public typealias Element = Output + + private var iterator: AsyncThrowingStream.AsyncIterator + + init(wrapping iterator: AsyncThrowingStream.AsyncIterator) { + self.iterator = iterator + } + + public mutating func next() async throws -> Output? { + try await self.iterator.next() + } + } + + public typealias Element = Output + + private let asyncThrowingStream: AsyncThrowingStream + + private init(_ asyncThrowingStream: AsyncThrowingStream) { + self.asyncThrowingStream = asyncThrowingStream + } + + public func makeAsyncIterator() -> AsyncIterator { + AsyncIterator(wrapping: self.asyncThrowingStream.makeAsyncIterator()) + } +} + +@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) +extension NIOHTTP2AsyncSequence { + /// `Continuation` is a wrapper for a generic `AsyncThrowingStream` to which the products of the initializers of + /// inbound (remotely-initiated) HTTP/2 stream channels are yielded. + @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) + struct Continuation: AnyContinuation { + private var continuation: AsyncThrowingStream.Continuation + + internal init(wrapping continuation: AsyncThrowingStream.Continuation) { + self.continuation = continuation + } + + /// `yield` takes a channel as outputted by the stream initializer and yields the wrapped `AsyncThrowingStream`. + /// + /// It takes channels as as `Any` type to allow wrapping by the stream initializer. + func yield(any: Any) { + let yieldResult = self.continuation.yield(any as! Output) + switch yieldResult { + case .enqueued: + break // success, nothing to do + case .dropped: + preconditionFailure("Attempted to yield when AsyncThrowingStream is over capacity. This shouldn't be possible for an unbounded stream.") + case .terminated: + preconditionFailure("Attempted to yield to AsyncThrowingStream in terminated state.") + default: + preconditionFailure("Attempt to yield to AsyncThrowingStream failed for unhandled reason.") + } + } + + /// `finish` marks the continuation as finished. + func finish() { + self.continuation.finish() + } + + /// `finish` marks the continuation as finished with the supplied error. + func finish(throwing error: Error) { + self.continuation.finish(throwing: error) + } + } + + + /// `initialize` creates a new `Continuation` object and returns it along with its backing ``NIOHTTP2AsyncSequence``. + /// The `Continuation` provides the ability to yield to the backing .``NIOHTTP2AsyncSequence``. + /// + /// - Parameters: + /// - inboundStreamInitializerOutput: The type which is returned by the initializer operating on the inbound + /// (remotely-initiated) HTTP/2 streams. + static func initialize(inboundStreamInitializerOutput: Output.Type = Output.self) -> (NIOHTTP2AsyncSequence, Continuation) { + let (stream, continuation) = AsyncThrowingStream.makeStream(of: Output.self) + return (.init(stream), Continuation(wrapping: continuation)) + } +} + +@available(*, unavailable) +extension NIOHTTP2AsyncSequence.AsyncIterator: Sendable {} + +@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) +extension NIOHTTP2AsyncSequence: Sendable where Output: Sendable {} + +#if swift(<5.9) +// this should be available in the std lib from 5.9 onwards +@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) +extension AsyncThrowingStream { + static func makeStream( + of elementType: Element.Type = Element.self, + throwing failureType: Failure.Type = Failure.self, + bufferingPolicy limit: Continuation.BufferingPolicy = .unbounded + ) -> (stream: AsyncThrowingStream, continuation: AsyncThrowingStream.Continuation) where Failure == Error { + var continuation: AsyncThrowingStream.Continuation! + let stream = AsyncThrowingStream(bufferingPolicy: limit) { continuation = $0 } + return (stream: stream, continuation: continuation!) + } +} +#endif diff --git a/Sources/NIOHTTP2/HTTP2PipelineHelpers.swift b/Sources/NIOHTTP2/HTTP2PipelineHelpers.swift index 46e0db37..cbfea273 100644 --- a/Sources/NIOHTTP2/HTTP2PipelineHelpers.swift +++ b/Sources/NIOHTTP2/HTTP2PipelineHelpers.swift @@ -416,3 +416,198 @@ extension ChannelPipeline.SynchronousOperations { return try handler.syncMultiplexer() } } + +// MARK: Async configurations + +extension Channel { + /// Configures a `ChannelPipeline` to speak HTTP/2 and sets up mapping functions so that it may be interacted with from concurrent code. + /// + /// In general this is not entirely useful by itself, as HTTP/2 is a negotiated protocol. This helper does not handle negotiation. + /// Instead, this simply adds the handler required to speak HTTP/2 after negotiation has completed, or when agreed by prior knowledge. + /// Use this function to setup a HTTP/2 pipeline if you wish to use async sequence abstractions over inbound and outbound streams. + /// Using this rather than implementing a similar function yourself allows that pipeline to evolve without breaking your code. + /// + /// - Parameters: + /// - mode: The mode this pipeline will operate in, server or client. + /// - configuration: The settings that will be used when establishing the connection and new streams. + /// - streamInitializer: A closure that will be called whenever the remote peer initiates a new stream. + /// The output of this closure is the element type of the returned multiplexer + /// - Returns: An `EventLoopFuture` containing the `AsyncStreamMultiplexer` inserted into this pipeline, which can + /// be used to initiate new streams and iterate over inbound HTTP/2 stream channels. + @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) + public func configureAsyncHTTP2Pipeline( + mode: NIOHTTP2Handler.ParserMode, + configuration: NIOHTTP2Handler.Configuration = .init(), + streamInitializer: @escaping NIOChannelInitializerWithOutput + ) -> EventLoopFuture> { + if self.eventLoop.inEventLoop { + return self.eventLoop.makeCompletedFuture { + return try self.pipeline.syncOperations.configureAsyncHTTP2Pipeline( + mode: mode, + configuration: configuration, + streamInitializer: streamInitializer + ) + } + } else { + return self.eventLoop.submit { + return try self.pipeline.syncOperations.configureAsyncHTTP2Pipeline( + mode: mode, + configuration: configuration, + streamInitializer: streamInitializer + ) + } + } + } + + /// Configures a channel to perform an HTTP/2 secure upgrade with typed negotiation results. + /// + /// HTTP/2 secure upgrade uses the Application Layer Protocol Negotiation TLS extension to + /// negotiate the inner protocol as part of the TLS handshake. For this reason, until the TLS + /// handshake is complete, the ultimate configuration of the channel pipeline cannot be known. + /// + /// This function configures the channel with a pair of callbacks that will handle the result + /// of the negotiation. It explicitly **does not** configure a TLS handler to actually attempt + /// to negotiate ALPN. The supported ALPN protocols are provided in + /// `NIOHTTP2SupportedALPNProtocols`: please ensure that the TLS handler you are using for your + /// pipeline is appropriately configured to perform this protocol negotiation. + /// + /// If negotiation results in an unexpected protocol, the pipeline will close the connection + /// and no callback will fire. + /// + /// This configuration is acceptable for use on both client and server channel pipelines. + /// + /// - Parameters: + /// - http1ConnectionInitializer: A callback that will be invoked if HTTP/1.1 has been explicitly + /// negotiated, or if no protocol was negotiated. Must return a future that completes when the + /// channel has been fully mutated. + /// - http2ConnectionInitializer: A callback that will be invoked if HTTP/2 has been negotiated, and that + /// should configure the channel for HTTP/2 use. Must return a future that completes when the + /// channel has been fully mutated. + /// - Returns: An `EventLoopFuture` of an `EventLoopFuture` containing the `NIOProtocolNegotiationResult` that completes when the channel + /// is ready to negotiate. + internal func configureHTTP2AsyncSecureUpgrade( + http1ConnectionInitializer: @escaping NIOChannelInitializerWithOutput, + http2ConnectionInitializer: @escaping NIOChannelInitializerWithOutput + ) -> EventLoopFuture>> { + let alpnHandler = NIOTypedApplicationProtocolNegotiationHandler>() { result in + switch result { + case .negotiated("h2"): + // Successful upgrade to HTTP/2. Let the user configure the pipeline. + return http2ConnectionInitializer(self).map { http2Output in .http2(http2Output) } + case .negotiated("http/1.1"), .fallback: + // Explicit or implicit HTTP/1.1 choice. + return http1ConnectionInitializer(self).map { http1Output in .http1_1(http1Output) } + case .negotiated: + // We negotiated something that isn't HTTP/1.1. This is a bad scene, and is a good indication + // of a user configuration error. We're going to close the connection directly. + return self.close().flatMap { self.eventLoop.makeFailedFuture(NIOHTTP2Errors.invalidALPNToken()) } + } + } + + return self.pipeline + .addHandler(alpnHandler) + .map { _ in + alpnHandler.protocolNegotiationResult + } + } + + /// Configures a `ChannelPipeline` to speak either HTTP/1.1 or HTTP/2 according to what can be negotiated with the client. + /// + /// This helper takes care of configuring the server pipeline such that it negotiates whether to + /// use HTTP/1.1 or HTTP/2. + /// + /// This function doesn't configure the TLS handler. Callers of this function need to add a TLS + /// handler appropriately configured to perform protocol negotiation. + /// + /// - Parameters: + /// - http2Configuration: The settings that will be used when establishing the HTTP/2 connections and new HTTP/2 streams. + /// - http1ConnectionInitializer: An optional callback that will be invoked only when the negotiated protocol + /// is HTTP/1.1 to configure the connection channel. + /// - http2ConnectionInitializer: An optional callback that will be invoked only when the negotiated protocol + /// is HTTP/2 to configure the connection channel. + /// - http2StreamInitializer: A closure that will be called whenever the remote peer initiates a new stream. + /// The output of this closure is the element type of the returned multiplexer + /// - Returns: An `EventLoopFuture` containing a ``NIOTypedApplicationProtocolNegotiationHandler`` that completes when the channel + /// is ready to negotiate. This can then be used to access the ``NIOProtocolNegotiationResult`` which may itself + /// be waited on to retrieve the result of the negotiation. + @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) + public func configureAsyncHTTPServerPipeline( + http2Configuration: NIOHTTP2Handler.Configuration = .init(), + http1ConnectionInitializer: @escaping NIOChannelInitializerWithOutput, + http2ConnectionInitializer: @escaping NIOChannelInitializerWithOutput, + http2StreamInitializer: @escaping NIOChannelInitializerWithOutput + ) -> EventLoopFuture) + >>> { + let http2ConnectionInitializer: NIOChannelInitializerWithOutput<(HTTP2ConnectionOutput, NIOHTTP2Handler.AsyncStreamMultiplexer)> = { channel in + channel.configureAsyncHTTP2Pipeline( + mode: .server, + configuration: http2Configuration, + streamInitializer: http2StreamInitializer + ).flatMap { multiplexer in + return http2ConnectionInitializer(channel).map { connectionChannel in + (connectionChannel, multiplexer) + } + } + } + let http1ConnectionInitializer: NIOChannelInitializerWithOutput = { channel in + channel.pipeline.configureHTTPServerPipeline().flatMap { _ in + http1ConnectionInitializer(channel) + } + } + return self.configureHTTP2AsyncSecureUpgrade( + http1ConnectionInitializer: http1ConnectionInitializer, + http2ConnectionInitializer: http2ConnectionInitializer + ) + } +} + +extension ChannelPipeline.SynchronousOperations { + /// Configures a `ChannelPipeline` to speak HTTP/2 and sets up mapping functions so that it may be interacted with from concurrent code. + /// + /// This operation **must** be called on the event loop. + /// + /// In general this is not entirely useful by itself, as HTTP/2 is a negotiated protocol. This helper does not handle negotiation. + /// Instead, this simply adds the handler required to speak HTTP/2 after negotiation has completed, or when agreed by prior knowledge. + /// Use this function to setup a HTTP/2 pipeline if you wish to use async sequence abstractions over inbound and outbound streams, + /// as it allows that pipeline to evolve without breaking your code. + /// + /// - Parameters: + /// - mode: The mode this pipeline will operate in, server or client. + /// - configuration: The settings that will be used when establishing the connection and new streams. + /// - streamInitializer: A closure that will be called whenever the remote peer initiates a new stream. + /// The output of this closure is the element type of the returned multiplexer + /// - Returns: An `EventLoopFuture` containing the `AsyncStreamMultiplexer` inserted into this pipeline, which can + /// be used to initiate new streams and iterate over inbound HTTP/2 stream channels. + @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) + public func configureAsyncHTTP2Pipeline( + mode: NIOHTTP2Handler.ParserMode, + configuration: NIOHTTP2Handler.Configuration = .init(), + streamInitializer: @escaping NIOChannelInitializerWithOutput + ) throws -> NIOHTTP2Handler.AsyncStreamMultiplexer { + let handler = NIOHTTP2Handler( + mode: mode, + eventLoop: self.eventLoop, + connectionConfiguration: configuration.connection, + streamConfiguration: configuration.stream, + inboundStreamInitializerWithAnyOutput: { channel in + streamInitializer(channel).map { return $0 } + } + ) + + try self.addHandler(handler) + + let (inboundStreamChannels, continuation) = NIOHTTP2AsyncSequence.initialize(inboundStreamInitializerOutput: Output.self) + + return try handler.syncAsyncStreamMultiplexer(continuation: continuation, inboundStreamChannels: inboundStreamChannels) + } +} + +/// `NIONegotiatedHTTPVersion` is a generic negotiation result holder for HTTP/1.1 and HTTP/2 +public enum NIONegotiatedHTTPVersion { + /// Protocol negotiation resulted in the connection using HTTP/1.1. + case http1_1(HTTP1Output) + /// Protocol negotiation resulted in the connection using HTTP/2. + case http2(HTTP2Output) +} diff --git a/Tests/NIOHTTP2Tests/ConfiguringPipelineAsyncMultiplexerTests.swift b/Tests/NIOHTTP2Tests/ConfiguringPipelineAsyncMultiplexerTests.swift new file mode 100644 index 00000000..6af97eb8 --- /dev/null +++ b/Tests/NIOHTTP2Tests/ConfiguringPipelineAsyncMultiplexerTests.swift @@ -0,0 +1,458 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2019-2023 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import XCTest + +import NIOConcurrencyHelpers +import NIOCore +import NIOEmbedded +import NIOHPACK +import NIOHTTP1 +import NIOHTTP2 +import NIOTLS + +final class ConfiguringPipelineAsyncMultiplexerTests: XCTestCase { + var clientChannel: NIOAsyncTestingChannel! + var serverChannel: NIOAsyncTestingChannel! + + override func setUp() { + self.clientChannel = NIOAsyncTestingChannel() + self.serverChannel = NIOAsyncTestingChannel() + } + + override func tearDown() { + self.clientChannel = nil + self.serverChannel = nil + } + + static let requestFramePayload = HTTP2Frame.FramePayload.headers(.init(headers: HPACKHeaders([(":method", "GET"), (":authority", "localhost"), (":scheme", "https"), (":path", "/")]), endStream: true)) + static let responseFramePayload = HTTP2Frame.FramePayload.headers(.init(headers: HPACKHeaders([(":status", "200")]), endStream: true)) + + static let requestHead = HTTPRequestHead(version: .init(major: 1, minor: 1), method: .GET, uri: "/testHTTP1") + static let responseHead = HTTPResponseHead(version: .init(major: 1, minor: 1), status: .ok, headers: HTTPHeaders([("transfer-encoding", "chunked")])) + + final class OKResponder: ChannelInboundHandler { + typealias InboundIn = HTTP2Frame.FramePayload + typealias OutboundOut = HTTP2Frame.FramePayload + + func channelRead(context: ChannelHandlerContext, data: NIOAny) { + let frame = self.unwrapInboundIn(data) + switch frame { + case .headers: + break + default: + fatalError("unexpected frame type: \(frame)") + } + + context.writeAndFlush(self.wrapOutboundOut(responseFramePayload), promise: nil) + context.fireChannelRead(data) + } + } + + final class HTTP1OKResponder: ChannelInboundHandler { + typealias InboundIn = HTTPServerRequestPart + typealias OutboundOut = HTTPServerResponsePart + + func channelRead(context: ChannelHandlerContext, data: NIOAny) { + switch self.unwrapInboundIn(data) { + case .head: + context.write(self.wrapOutboundOut(.head(responseHead)), promise: nil) + context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil) + case .body, .end: + break + } + + context.fireChannelRead(data) + } + } + + final class SimpleRequest: ChannelInboundHandler { + typealias InboundIn = HTTP2Frame.FramePayload + typealias OutboundOut = HTTP2Frame.FramePayload + + func writeRequest(context: ChannelHandlerContext) { + context.writeAndFlush(self.wrapOutboundOut(requestFramePayload), promise: nil) + } + + func channelActive(context: ChannelHandlerContext) { + self.writeRequest(context: context) + context.fireChannelActive() + } + } + + // `testBasicPipelineCommunicates` ensures that a client-server system set up to use async stream abstractions + // can communicate successfully. + func testBasicPipelineCommunicates() async throws { + let requestCount = 100 + + let serverRecorder = InboundFramePayloadRecorder() + + let clientMultiplexer = try await assertNoThrowWithValue( + try await self.clientChannel.configureAsyncHTTP2Pipeline(mode: .client) { channel -> EventLoopFuture in + channel.eventLoop.makeSucceededFuture(channel) + }.get() + ) + + let serverMultiplexer = try await assertNoThrowWithValue( + try await self.serverChannel.configureAsyncHTTP2Pipeline(mode: .server) { channel -> EventLoopFuture in + channel.pipeline.addHandlers([OKResponder(), serverRecorder]).map { _ in channel } + }.get() + ) + + try await assertNoThrow(try await self.assertDoHandshake(client: self.clientChannel, server: self.serverChannel)) + + try await withThrowingTaskGroup(of: Int.self, returning: Void.self) { group in + // server + group.addTask { + var serverInboundChannelCount = 0 + for try await _ in serverMultiplexer.inbound { + serverInboundChannelCount += 1 + } + return serverInboundChannelCount + } + + // client + for _ in 0 ..< requestCount { + // Let's try sending some requests + let streamChannel = try await clientMultiplexer.openStream { channel -> EventLoopFuture in + return channel.pipeline.addHandlers([SimpleRequest(), InboundFramePayloadRecorder()]).map { + return channel + } + } + + let clientRecorder = try await streamChannel.pipeline.handler(type: InboundFramePayloadRecorder.self).get() + try await self.deliverAllBytes(from: self.clientChannel, to: self.serverChannel) + try await self.deliverAllBytes(from: self.serverChannel, to: self.clientChannel) + clientRecorder.receivedFrames.assertFramePayloadsMatch([ConfiguringPipelineAsyncMultiplexerTests.responseFramePayload]) + try await streamChannel.closeFuture.get() + } + + try await assertNoThrow(try await self.clientChannel.finish()) + try await assertNoThrow(try await self.serverChannel.finish()) + + let serverInboundChannelCount = try await assertNoThrowWithValue(try await group.next()!) + XCTAssertEqual(serverInboundChannelCount, requestCount, "We should have created one server-side channel as a result of the each HTTP/2 stream used.") + } + + serverRecorder.receivedFrames.assertFramePayloadsMatch(Array(repeating: ConfiguringPipelineAsyncMultiplexerTests.requestFramePayload, count: requestCount)) + } + + // `testNIOAsyncConnectionStreamChannelPipelineCommunicates` ensures that a client-server system set up to use `NIOAsyncChannel` + // wrappers around connection and stream channels can communicate successfully. + func testNIOAsyncConnectionStreamChannelPipelineCommunicates() async throws { + let requestCount = 100 + + let clientMultiplexer = try await assertNoThrowWithValue( + try await self.clientChannel.configureAsyncHTTP2Pipeline( + mode: .client, + streamInitializer: { channel in + channel.eventLoop.makeCompletedFuture { + try NIOAsyncChannel( + synchronouslyWrapping: channel, + configuration: .init(inboundType: HTTP2Frame.FramePayload.self, outboundType: HTTP2Frame.FramePayload.self) + ) + } + } + ).get() + ) + + let serverMultiplexer = try await assertNoThrowWithValue( + try await self.serverChannel.configureAsyncHTTP2Pipeline( + mode: .server, + streamInitializer: { channel in + channel.eventLoop.makeCompletedFuture { + try NIOAsyncChannel( + synchronouslyWrapping: channel, + configuration: .init(inboundType: HTTP2Frame.FramePayload.self, outboundType: HTTP2Frame.FramePayload.self) + ) + } + } + ).get() + ) + + try await assertNoThrow(try await self.assertDoHandshake(client: self.clientChannel, server: self.serverChannel)) + + try await withThrowingTaskGroup(of: Int.self, returning: Void.self) { group in + // server + group.addTask { + var serverInboundChannelCount = 0 + for try await streamChannel in serverMultiplexer.inbound { + for try await receivedFrame in streamChannel.inbound { + receivedFrame.assertFramePayloadMatches(this: ConfiguringPipelineAsyncMultiplexerTests.requestFramePayload) + + try await streamChannel.outbound.write(ConfiguringPipelineAsyncMultiplexerTests.responseFramePayload) + streamChannel.outbound.finish() + + try await self.deliverAllBytes(from: self.serverChannel, to: self.clientChannel) + } + serverInboundChannelCount += 1 + } + return serverInboundChannelCount + } + + // client + for _ in 0 ..< requestCount { + let streamChannel = try await clientMultiplexer.openStream() { channel in + channel.eventLoop.makeCompletedFuture { + try NIOAsyncChannel( + synchronouslyWrapping: channel, + configuration: .init( + inboundType: HTTP2Frame.FramePayload.self, + outboundType: HTTP2Frame.FramePayload.self + ) + ) + } + } + // Let's try sending some requests + try await streamChannel.outbound.write(ConfiguringPipelineAsyncMultiplexerTests.requestFramePayload) + streamChannel.outbound.finish() + + try await self.deliverAllBytes(from: self.clientChannel, to: self.serverChannel) + + for try await receivedFrame in streamChannel.inbound { + receivedFrame.assertFramePayloadMatches(this: ConfiguringPipelineAsyncMultiplexerTests.responseFramePayload) + } + } + + try await assertNoThrow(try await self.clientChannel.finish()) + try await assertNoThrow(try await self.serverChannel.finish()) + + let serverInboundChannelCount = try await assertNoThrowWithValue(try await group.next()!) + XCTAssertEqual(serverInboundChannelCount, requestCount, "We should have created one server-side channel as a result of the one HTTP/2 stream used.") + } + } + + // `testNegotiatedHTTP2BasicPipelineCommunicates` ensures that a client-server system set up to use async stream abstractions + // can communicate successfully when HTTP/2 is negotiated. + func testNegotiatedHTTP2BasicPipelineCommunicates() async throws { + let requestCount = 100 + + let serverRecorder = InboundFramePayloadRecorder() + + let clientMultiplexer = try await assertNoThrowWithValue( + try await self.clientChannel.configureAsyncHTTP2Pipeline(mode: .client) { channel -> EventLoopFuture in + channel.eventLoop.makeSucceededFuture(channel) + }.get() + ) + + let negotiationResultFuture = try await self.serverChannel.configureAsyncHTTPServerPipeline() { channel in + channel.eventLoop.makeSucceededVoidFuture() + } http2ConnectionInitializer: { channel in + channel.eventLoop.makeSucceededVoidFuture() + } http2StreamInitializer: { channel -> EventLoopFuture in + channel.pipeline.addHandlers([OKResponder(), serverRecorder]).map { _ in channel } + }.get() + + // Let's pretend the TLS handler did protocol negotiation for us + self.serverChannel.pipeline.fireUserInboundEventTriggered(TLSUserEvent.handshakeCompleted(negotiatedProtocol: "h2")) + + let negotiationResult = try await negotiationResultFuture.get() + + try await assertNoThrow(try await self.assertDoHandshake(client: self.clientChannel, server: self.serverChannel)) + + let serverMultiplexer: NIOHTTP2Handler.AsyncStreamMultiplexer + switch negotiationResult { + case .http1_1: + preconditionFailure("Negotiation result must be HTTP/2") + case .http2(let (_, multiplexer)): + serverMultiplexer = multiplexer + } + + try await withThrowingTaskGroup(of: Int.self, returning: Void.self) { group in + // server + group.addTask { + var serverInboundChannelCount = 0 + for try await _ in serverMultiplexer.inbound { + serverInboundChannelCount += 1 + } + return serverInboundChannelCount + } + + // client + for _ in 0 ..< requestCount { + // Let's try sending some requests + let streamChannel = try await clientMultiplexer.openStream { channel -> EventLoopFuture in + return channel.pipeline.addHandlers([SimpleRequest(), InboundFramePayloadRecorder()]).map { + return channel + } + } + + let clientRecorder = try await streamChannel.pipeline.handler(type: InboundFramePayloadRecorder.self).get() + + try await self.deliverAllBytes(from: self.clientChannel, to: self.serverChannel) + try await self.deliverAllBytes(from: self.serverChannel, to: self.clientChannel) + + clientRecorder.receivedFrames.assertFramePayloadsMatch([ConfiguringPipelineAsyncMultiplexerTests.responseFramePayload]) + try await streamChannel.closeFuture.get() + } + + try await assertNoThrow(try await self.clientChannel.finish()) + try await assertNoThrow(try await self.serverChannel.finish()) + + let serverInboundChannelCount = try await assertNoThrowWithValue(try await group.next()!) + XCTAssertEqual(serverInboundChannelCount, requestCount, "We should have created one server-side channel as a result of the each HTTP/2 stream used.") + } + + serverRecorder.receivedFrames.assertFramePayloadsMatch(Array(repeating: ConfiguringPipelineAsyncMultiplexerTests.requestFramePayload, count: requestCount)) + } + + // `testNegotiatedHTTP1BasicPipelineCommunicates` ensures that a client-server system set up to use async stream abstractions + // can communicate successfully when HTTP/1.1 is negotiated. + func testNegotiatedHTTP1BasicPipelineCommunicates() async throws { + let requestCount = 100 + + let _ = try await self.clientChannel.pipeline.addHTTPClientHandlers().map { _ in + self.clientChannel.pipeline.addHandlers([InboundRecorderHandler(), HTTP1ClientSendability()]) + }.get() + + let negotiationResultFuture = try await self.serverChannel.configureAsyncHTTPServerPipeline() { channel in + channel.pipeline.addHandlers([HTTP1OKResponder(), InboundRecorderHandler()]) + } http2ConnectionInitializer: { channel in + channel.eventLoop.makeSucceededVoidFuture() + } http2StreamInitializer: { channel -> EventLoopFuture in + channel.eventLoop.makeSucceededFuture(channel) + }.get() + + // Let's pretend the TLS handler did protocol negotiation for us + self.serverChannel.pipeline.fireUserInboundEventTriggered(TLSUserEvent.handshakeCompleted(negotiatedProtocol: "http/1.1")) + + let negotiationResult = try await negotiationResultFuture.get() + + try await self.deliverAllBytes(from: self.clientChannel, to: self.serverChannel) + try await self.deliverAllBytes(from: self.serverChannel, to: self.clientChannel) + + switch negotiationResult { + case .http1_1: + break + case .http2: + preconditionFailure("Negotiation result must be http/1.1") + } + + // client + for _ in 0 ..< requestCount { + // Let's try sending some http/1.1 requests. + // we need to put these through a mapping to remove references to `IOData` which isn't Sendable + try await self.clientChannel.writeOutbound(HTTP1ClientSendability.RequestPart.head(ConfiguringPipelineAsyncMultiplexerTests.requestHead)) + try await self.clientChannel.writeOutbound(HTTP1ClientSendability.RequestPart.end(nil)) + try await self.deliverAllBytes(from: self.clientChannel, to: self.serverChannel) + try await self.deliverAllBytes(from: self.serverChannel, to: self.clientChannel) + } + + // check expectations + let clientRecorder = try await self.clientChannel.pipeline.handler(type: InboundRecorderHandler.self).get() + let serverRecorder = try await self.serverChannel.pipeline.handler(type: InboundRecorderHandler.self).get() + + XCTAssertEqual(serverRecorder.receivedParts.count, requestCount*2) + XCTAssertEqual(clientRecorder.receivedParts.count, requestCount*2) + + for i in 0 ..< requestCount { + XCTAssertEqual(serverRecorder.receivedParts[i*2], HTTPServerRequestPart.head(ConfiguringPipelineAsyncMultiplexerTests.requestHead), "Unexpected request part in iteration \(i)") + XCTAssertEqual(serverRecorder.receivedParts[i*2+1], HTTPServerRequestPart.end(nil), "Unexpected request part in iteration \(i)") + + XCTAssertEqual(clientRecorder.receivedParts[i*2], HTTPClientResponsePart.head(ConfiguringPipelineAsyncMultiplexerTests.responseHead), "Unexpected response part in iteration \(i)") + XCTAssertEqual(clientRecorder.receivedParts[i*2+1], HTTPClientResponsePart.end(nil), "Unexpected response part in iteration \(i)") + } + + try await assertNoThrow(try await self.clientChannel.finish()) + try await assertNoThrow(try await self.serverChannel.finish()) + } + + // Simple handler which maps client request parts to remove references to `IOData` which isn't Sendable + internal final class HTTP1ClientSendability: ChannelOutboundHandler { + public typealias RequestPart = HTTPPart + + typealias OutboundIn = RequestPart + typealias OutboundOut = HTTPClientRequestPart + + func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise?) { + let requestPart = self.unwrapOutboundIn(data) + + let httpClientRequestPart: HTTPClientRequestPart + switch requestPart { + case .head(let head): + httpClientRequestPart = .head(head) + case .body(let byteBuffer): + httpClientRequestPart = .body(.byteBuffer(byteBuffer)) + case .end(let headers): + httpClientRequestPart = .end(headers) + } + + context.write(self.wrapOutboundOut(httpClientRequestPart), promise: promise) + } + } + + // Simple handler which maps server response parts to remove references to `IOData` which isn't Sendable + internal final class HTTP1ServerSendability: ChannelOutboundHandler { + public typealias ResponsePart = HTTPPart + + typealias OutboundIn = ResponsePart + typealias OutboundOut = HTTPServerResponsePart + + func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise?) { + let responsePart = self.unwrapOutboundIn(data) + + let httpServerResponsePart: HTTPServerResponsePart + switch responsePart { + case .head(let head): + httpServerResponsePart = .head(head) + case .body(let byteBuffer): + httpServerResponsePart = .body(.byteBuffer(byteBuffer)) + case .end(let headers): + httpServerResponsePart = .end(headers) + } + + context.write(self.wrapOutboundOut(httpServerResponsePart), promise: promise) + } + } + + /// A simple channel handler that records inbound messages. + internal final class InboundRecorderHandler: ChannelInboundHandler, @unchecked Sendable { + typealias InboundIn = message + + private let partsLock = NIOLock() + private var _receivedParts: [message] = [] + + var receivedParts: [message] { + get { + self.partsLock.withLock { + self._receivedParts + } + } + set { + self.partsLock.withLock { + self._receivedParts = newValue + } + } + } + + func channelRead(context: ChannelHandlerContext, data: NIOAny) { + self.receivedParts.append(self.unwrapInboundIn(data)) + context.fireChannelRead(data) + } + } +} + +#if swift(<5.9) +// this should be available in the std lib from 5.9 onwards +extension AsyncStream { + fileprivate static func makeStream( + of elementType: Element.Type = Element.self, + bufferingPolicy limit: Continuation.BufferingPolicy = .unbounded + ) -> (stream: AsyncStream, continuation: AsyncStream.Continuation) { + var continuation: AsyncStream.Continuation! + let stream = AsyncStream(bufferingPolicy: limit) { continuation = $0 } + return (stream: stream, continuation: continuation!) + } +} +#endif