From bc2acbe8c3386c40a26d9e75d82600596ad4013e Mon Sep 17 00:00:00 2001 From: Rick Newton-Rogers Date: Wed, 4 Oct 2023 11:53:38 +0100 Subject: [PATCH] Remove dependency on SPI Motivation: Remove dependency on SPI so that we can cut a release without introducing dependency on code subject to change. Modifications: Remove all code which introduces a dependency on the NIOAsyncChannel SPI Result: Code inside SPI is removed --- ...annelHandler+InlineStreamMultiplexer.swift | 35 +- Sources/NIOHTTP2/HTTP2ChannelHandler.swift | 12 - .../HTTP2CommonInboundStreamMultiplexer.swift | 121 ----- Sources/NIOHTTP2/HTTP2PipelineHelpers.swift | 207 +------- ...iguringPipelineAsyncMultiplexerTests.swift | 458 ------------------ 5 files changed, 3 insertions(+), 830 deletions(-) delete mode 100644 Tests/NIOHTTP2Tests/ConfiguringPipelineAsyncMultiplexerTests.swift diff --git a/Sources/NIOHTTP2/HTTP2ChannelHandler+InlineStreamMultiplexer.swift b/Sources/NIOHTTP2/HTTP2ChannelHandler+InlineStreamMultiplexer.swift index 5b047fcb..fcd7e566 100644 --- a/Sources/NIOHTTP2/HTTP2ChannelHandler+InlineStreamMultiplexer.swift +++ b/Sources/NIOHTTP2/HTTP2ChannelHandler+InlineStreamMultiplexer.swift @@ -12,7 +12,7 @@ // //===----------------------------------------------------------------------===// -@_spi(AsyncChannel) import NIOCore +import NIOCore internal struct InlineStreamMultiplexer { private let context: ChannelHandlerContext @@ -211,36 +211,3 @@ extension InlineStreamMultiplexer { self.commonStreamMultiplexer.setChannelContinuation(streamChannels) } } - -extension NIOHTTP2Handler { - /// A variant of `NIOHTTP2Handler.StreamMultiplexer` which creates a child channel for each HTTP/2 stream and - /// provides access to inbound HTTP/2 streams. - /// - /// In general in NIO applications it is helpful to consider each HTTP/2 stream as an - /// independent stream of HTTP/2 frames. This multiplexer achieves this by creating a - /// number of in-memory `HTTP2StreamChannel` objects, one for each stream. These operate - /// on ``HTTP2Frame/FramePayload`` objects as their base communication - /// atom, as opposed to the regular NIO `SelectableChannel` objects which use `ByteBuffer` - /// and `IOData`. - /// - /// Outbound stream channel objects are initialized upon creation using the supplied `streamStateInitializer` which returns a type - /// `Output`. This type may be `HTTP2Frame` or changed to any other type. - @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) - @_spi(AsyncChannel) - public struct AsyncStreamMultiplexer { - private let inlineStreamMultiplexer: InlineStreamMultiplexer - public let inbound: NIOHTTP2InboundStreamChannels - - // Cannot be created by users. - internal init(_ inlineStreamMultiplexer: InlineStreamMultiplexer, continuation: any AnyContinuation, inboundStreamChannels: NIOHTTP2InboundStreamChannels) { - self.inlineStreamMultiplexer = inlineStreamMultiplexer - self.inlineStreamMultiplexer.setChannelContinuation(continuation) - self.inbound = inboundStreamChannels - } - - /// Create a stream channel initialized with the provided closure - public func createStreamChannel(_ initializer: @escaping NIOChannelInitializerWithOutput) async throws -> Output { - return try await self.inlineStreamMultiplexer.createStreamChannel(initializer).get() - } - } -} diff --git a/Sources/NIOHTTP2/HTTP2ChannelHandler.swift b/Sources/NIOHTTP2/HTTP2ChannelHandler.swift index 7ddf9df6..bf34f8cd 100644 --- a/Sources/NIOHTTP2/HTTP2ChannelHandler.swift +++ b/Sources/NIOHTTP2/HTTP2ChannelHandler.swift @@ -1140,16 +1140,4 @@ extension NIOHTTP2Handler { throw NIOHTTP2Errors.missingMultiplexer() } } - - @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) - internal func syncAsyncStreamMultiplexer(continuation: any AnyContinuation, inboundStreamChannels: NIOHTTP2InboundStreamChannels) throws -> AsyncStreamMultiplexer { - self.eventLoop!.preconditionInEventLoop() - - switch self.inboundStreamMultiplexer { - case let .some(.inline(multiplexer)): - return AsyncStreamMultiplexer(multiplexer, continuation: continuation, inboundStreamChannels: inboundStreamChannels) - case .some(.legacy), .none: - throw NIOHTTP2Errors.missingMultiplexer() - } - } } diff --git a/Sources/NIOHTTP2/HTTP2CommonInboundStreamMultiplexer.swift b/Sources/NIOHTTP2/HTTP2CommonInboundStreamMultiplexer.swift index 47e11a05..54f60b07 100644 --- a/Sources/NIOHTTP2/HTTP2CommonInboundStreamMultiplexer.swift +++ b/Sources/NIOHTTP2/HTTP2CommonInboundStreamMultiplexer.swift @@ -451,124 +451,3 @@ internal protocol AnyContinuation { func finish() func finish(throwing error: Error) } - - -/// `NIOHTTP2InboundStreamChannels` provides access to inbound stream channels as a generic `AsyncSequence`. -/// They make use of generics to allow for wrapping the stream `Channel`s, for example as `NIOAsyncChannel`s or protocol negotiation objects. -@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) -@_spi(AsyncChannel) -public struct NIOHTTP2InboundStreamChannels: AsyncSequence { - public struct AsyncIterator: AsyncIteratorProtocol { - public typealias Element = Output - - private var iterator: AsyncThrowingStream.AsyncIterator - - init(_ iterator: AsyncThrowingStream.AsyncIterator) { - self.iterator = iterator - } - - public mutating func next() async throws -> Output? { - try await self.iterator.next() - } - } - - public typealias Element = Output - - private let asyncThrowingStream: AsyncThrowingStream - - private init(_ asyncThrowingStream: AsyncThrowingStream) { - self.asyncThrowingStream = asyncThrowingStream - } - - public func makeAsyncIterator() -> AsyncIterator { - AsyncIterator(self.asyncThrowingStream.makeAsyncIterator()) - } -} - -@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) -extension NIOHTTP2InboundStreamChannels { - /// `Continuation` is a wrapper for a generic `AsyncThrowingStream` to which inbound HTTP2 stream channels are yielded.. - @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) - struct Continuation: AnyContinuation { - private var continuation: AsyncThrowingStream.Continuation - - internal init( - continuation: AsyncThrowingStream.Continuation - ) { - self.continuation = continuation - } - - /// `yield` takes a channel as outputted by the stream initializer and yields the wrapped `AsyncThrowingStream`. - /// - /// It takes channels as as `Any` type to allow wrapping by the stream initializer. - func yield(any: Any) { - let yieldResult = self.continuation.yield(any as! Output) - switch yieldResult { - case .enqueued: - break // success, nothing to do - case .dropped: - preconditionFailure("Attempted to yield when AsyncThrowingStream is over capacity. This shouldn't be possible for an unbounded stream.") - case .terminated: - preconditionFailure("Attempted to yield to AsyncThrowingStream in terminated state.") - default: - preconditionFailure("Attempt to yield to AsyncThrowingStream failed for unhandled reason.") - } - } - - /// `finish` marks the continuation as finished. - func finish() { - self.continuation.finish() - } - - /// `finish` marks the continuation as finished with the supplied error. - func finish(throwing error: Error) { - self.continuation.finish(throwing: error) - } - } - - - /// `initialize` creates a new `Continuation` object and returns it along with its backing `AsyncThrowingStream`. - /// The `StreamChannelContinuation` provides access to the inbound HTTP2 stream channels. - /// - /// - Parameters: - /// - inboundStreamInititializer: A closure which initializes the newly-created inbound stream channel and returns a generic. - /// The returned type corresponds to the output of the channel once the operations in the initializer have been performed. - /// For example an `inboundStreamInititializer` which inserts handlers before wrapping the channel in a `NIOAsyncChannel` would - /// have a `Output` corresponding to that `NIOAsyncChannel` type. Another example is in cases where there is - /// per-stream protocol negotiation where `Output` would be some form of `NIOProtocolNegotiationResult`. - static func initialize(inboundStreamInitializerOutput: Output.Type = Output.self) -> (NIOHTTP2InboundStreamChannels, Continuation) { - let (stream, continuation) = AsyncThrowingStream.makeStream(of: Output.self) - return (.init(stream), Continuation(continuation: continuation)) - } -} - -#if swift(>=5.7) -// This doesn't compile on 5.6 but the omission of Sendable is sufficient in any case -@available(*, unavailable) -extension NIOHTTP2InboundStreamChannels.AsyncIterator: Sendable {} - -@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) -extension NIOHTTP2InboundStreamChannels: Sendable where Output: Sendable {} -#else -// This wasn't marked as sendable in 5.6 however it should be fine -// https://forums.swift.org/t/so-is-asyncstream-sendable-or-not/53148/2 -@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) -extension NIOHTTP2InboundStreamChannels: @unchecked Sendable where Output: Sendable {} -#endif - - -#if swift(<5.9) -// this should be available in the std lib from 5.9 onwards -@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) -extension AsyncThrowingStream { - public static func makeStream( - of elementType: Element.Type = Element.self, - throwing failureType: Failure.Type = Failure.self, - bufferingPolicy limit: Continuation.BufferingPolicy = .unbounded - ) -> (stream: AsyncThrowingStream, continuation: AsyncThrowingStream.Continuation) where Failure == Error { - var continuation: AsyncThrowingStream.Continuation! - let stream = AsyncThrowingStream(bufferingPolicy: limit) { continuation = $0 } - return (stream: stream, continuation: continuation!) - } -} -#endif diff --git a/Sources/NIOHTTP2/HTTP2PipelineHelpers.swift b/Sources/NIOHTTP2/HTTP2PipelineHelpers.swift index e36f7f01..eec92c21 100644 --- a/Sources/NIOHTTP2/HTTP2PipelineHelpers.swift +++ b/Sources/NIOHTTP2/HTTP2PipelineHelpers.swift @@ -12,8 +12,8 @@ // //===----------------------------------------------------------------------===// -@_spi(AsyncChannel) import NIOCore -@_spi(AsyncChannel) import NIOTLS +import NIOCore +import NIOTLS /// The supported ALPN protocol tokens for NIO's HTTP/2 abstraction layer. /// @@ -423,206 +423,3 @@ extension ChannelPipeline.SynchronousOperations { return try handler.syncMultiplexer() } } - -// MARK: Async configurations - -extension Channel { - /// Configures a `ChannelPipeline` to speak HTTP/2 and sets up mapping functions so that it may be interacted with from concurrent code. - /// - /// In general this is not entirely useful by itself, as HTTP/2 is a negotiated protocol. This helper does not handle negotiation. - /// Instead, this simply adds the handler required to speak HTTP/2 after negotiation has completed, or when agreed by prior knowledge. - /// Use this function to setup a HTTP/2 pipeline if you wish to use async sequence abstractions over inbound and outbound streams. - /// Using this rather than implementing a similar function yourself allows that pipeline to evolve without breaking your code. - /// - /// - Parameters: - /// - mode: The mode this pipeline will operate in, server or client. - /// - configuration: The settings that will be used when establishing the connection and new streams. - /// - position: The position in the pipeline into which to insert this handler. - /// - inboundStreamInitializer: A closure that will be called whenever the remote peer initiates a new stream. - /// The output of this closure is the element type of the returned multiplexer - /// - Returns: An `EventLoopFuture` containing the `AsyncStreamMultiplexer` inserted into this pipeline, which can - /// be used to initiate new streams and iterate over inbound HTTP/2 stream channels. - @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) - @_spi(AsyncChannel) - public func configureAsyncHTTP2Pipeline( - mode: NIOHTTP2Handler.ParserMode, - configuration: NIOHTTP2Handler.Configuration = .init(), - position: ChannelPipeline.Position = .last, - inboundStreamInitializer: @escaping NIOChannelInitializerWithOutput - ) -> EventLoopFuture> { - if self.eventLoop.inEventLoop { - return self.eventLoop.makeCompletedFuture { - return try self.pipeline.syncOperations.configureAsyncHTTP2Pipeline( - mode: mode, - configuration: configuration, - position: position, - inboundStreamInitializer: inboundStreamInitializer - ) - } - } else { - return self.eventLoop.submit { - return try self.pipeline.syncOperations.configureAsyncHTTP2Pipeline( - mode: mode, - configuration: configuration, - position: position, - inboundStreamInitializer: inboundStreamInitializer - ) - } - } - } - - /// Configures a channel to perform an HTTP/2 secure upgrade with typed negotiation results. - /// - /// HTTP/2 secure upgrade uses the Application Layer Protocol Negotiation TLS extension to - /// negotiate the inner protocol as part of the TLS handshake. For this reason, until the TLS - /// handshake is complete, the ultimate configuration of the channel pipeline cannot be known. - /// - /// This function configures the channel with a pair of callbacks that will handle the result - /// of the negotiation. It explicitly **does not** configure a TLS handler to actually attempt - /// to negotiate ALPN. The supported ALPN protocols are provided in - /// `NIOHTTP2SupportedALPNProtocols`: please ensure that the TLS handler you are using for your - /// pipeline is appropriately configured to perform this protocol negotiation. - /// - /// If negotiation results in an unexpected protocol, the pipeline will close the connection - /// and no callback will fire. - /// - /// This configuration is acceptable for use on both client and server channel pipelines. - /// - /// - Parameters: - /// - http1ConnectionInitializer: A callback that will be invoked if HTTP/1.1 has been explicitly - /// negotiated, or if no protocol was negotiated. Must return a future that completes when the - /// channel has been fully mutated. - /// - http2ConnectionInitializer: A callback that will be invoked if HTTP/2 has been negotiated, and that - /// should configure the channel for HTTP/2 use. Must return a future that completes when the - /// channel has been fully mutated. - /// - Returns: An `EventLoopFuture` of an `EventLoopFuture` containing the `NIOProtocolNegotiationResult` that completes when the channel - /// is ready to negotiate. - internal func configureHTTP2AsyncSecureUpgrade( - http1ConnectionInitializer: @escaping NIOChannelInitializerWithOutput, - http2ConnectionInitializer: @escaping NIOChannelInitializerWithOutput - ) -> EventLoopFuture>>> { - let alpnHandler = NIOTypedApplicationProtocolNegotiationHandler>() { result in - switch result { - case .negotiated("h2"): - // Successful upgrade to HTTP/2. Let the user configure the pipeline. - return http2ConnectionInitializer(self).map { http2Output in .init(result: .http2(http2Output)) } - case .negotiated("http/1.1"), .fallback: - // Explicit or implicit HTTP/1.1 choice. - return http1ConnectionInitializer(self).map { http1Output in .init(result: .http1_1(http1Output)) } - case .negotiated: - // We negotiated something that isn't HTTP/1.1. This is a bad scene, and is a good indication - // of a user configuration error. We're going to close the connection directly. - return self.close().flatMap { self.eventLoop.makeFailedFuture(NIOHTTP2Errors.invalidALPNToken()) } - } - } - - return self.pipeline - .addHandler(alpnHandler) - .map { _ in - alpnHandler.protocolNegotiationResult - } - } - - /// Configures a `ChannelPipeline` to speak either HTTP/1.1 or HTTP/2 according to what can be negotiated with the client. - /// - /// This helper takes care of configuring the server pipeline such that it negotiates whether to - /// use HTTP/1.1 or HTTP/2. - /// - /// This function doesn't configure the TLS handler. Callers of this function need to add a TLS - /// handler appropriately configured to perform protocol negotiation. - /// - /// - Parameters: - /// - http2Configuration: The settings that will be used when establishing the HTTP/2 connections and new HTTP/2 streams. - /// - http1ConnectionInitializer: An optional callback that will be invoked only when the negotiated protocol - /// is HTTP/1.1 to configure the connection channel. - /// - http2ConnectionInitializer: An optional callback that will be invoked only when the negotiated protocol - /// is HTTP/2 to configure the connection channel. - /// - http2InboundStreamInitializer: A closure that will be called whenever the remote peer initiates a new stream. - /// The output of this closure is the element type of the returned multiplexer - /// - Returns: An `EventLoopFuture` containing a ``NIOTypedApplicationProtocolNegotiationHandler`` that completes when the channel - /// is ready to negotiate. This can then be used to access the ``NIOProtocolNegotiationResult`` which may itself - /// be waited on to retrieve the result of the negotiation. - @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) - @_spi(AsyncChannel) - public func configureAsyncHTTPServerPipeline( - http2Configuration: NIOHTTP2Handler.Configuration = .init(), - http1ConnectionInitializer: @escaping NIOChannelInitializerWithOutput, - http2ConnectionInitializer: @escaping NIOChannelInitializerWithOutput, - http2InboundStreamInitializer: @escaping NIOChannelInitializerWithOutput - ) -> EventLoopFuture) - >>>> { - let http2ConnectionInitializer: NIOChannelInitializerWithOutput<(HTTP2ConnectionOutput, NIOHTTP2Handler.AsyncStreamMultiplexer)> = { channel in - channel.configureAsyncHTTP2Pipeline( - mode: .server, - configuration: http2Configuration, - inboundStreamInitializer: http2InboundStreamInitializer - ).flatMap { multiplexer in - return http2ConnectionInitializer(channel).map { connectionChannel in - (connectionChannel, multiplexer) - } - } - } - let http1ConnectionInitializer: NIOChannelInitializerWithOutput = { channel in - channel.pipeline.configureHTTPServerPipeline().flatMap { _ in - http1ConnectionInitializer(channel) - } - } - return self.configureHTTP2AsyncSecureUpgrade( - http1ConnectionInitializer: http1ConnectionInitializer, - http2ConnectionInitializer: http2ConnectionInitializer - ) - } -} - -extension ChannelPipeline.SynchronousOperations { - /// Configures a `ChannelPipeline` to speak HTTP/2 and sets up mapping functions so that it may be interacted with from concurrent code. - /// - /// This operation **must** be called on the event loop. - /// - /// In general this is not entirely useful by itself, as HTTP/2 is a negotiated protocol. This helper does not handle negotiation. - /// Instead, this simply adds the handler required to speak HTTP/2 after negotiation has completed, or when agreed by prior knowledge. - /// Use this function to setup a HTTP/2 pipeline if you wish to use async sequence abstractions over inbound and outbound streams, - /// as it allows that pipeline to evolve without breaking your code. - /// - /// - Parameters: - /// - mode: The mode this pipeline will operate in, server or client. - /// - configuration: The settings that will be used when establishing the connection and new streams. - /// - position: The position in the pipeline into which to insert this handler. - /// - inboundStreamInitializer: A closure that will be called whenever the remote peer initiates a new stream. - /// The output of this closure is the element type of the returned multiplexer - /// - Returns: An `EventLoopFuture` containing the `AsyncStreamMultiplexer` inserted into this pipeline, which can - /// be used to initiate new streams and iterate over inbound HTTP/2 stream channels. - @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) - @_spi(AsyncChannel) - public func configureAsyncHTTP2Pipeline( - mode: NIOHTTP2Handler.ParserMode, - configuration: NIOHTTP2Handler.Configuration = .init(), - position: ChannelPipeline.Position = .last, - inboundStreamInitializer: @escaping NIOChannelInitializerWithOutput - ) throws -> NIOHTTP2Handler.AsyncStreamMultiplexer { - let handler = NIOHTTP2Handler( - mode: mode, - eventLoop: self.eventLoop, - connectionConfiguration: configuration.connection, - streamConfiguration: configuration.stream, - inboundStreamInitializerWithAnyOutput: { channel in - inboundStreamInitializer(channel).map { return $0 } - } - ) - - try self.addHandler(handler, position: position) - - let (inboundStreamChannels, continuation) = NIOHTTP2InboundStreamChannels.initialize(inboundStreamInitializerOutput: Output.self) - - return try handler.syncAsyncStreamMultiplexer(continuation: continuation, inboundStreamChannels: inboundStreamChannels) - } -} - -/// `NIONegotiatedHTTPVersion` is a generic negotiation result holder for HTTP/1.1 and HTTP/2 -@_spi(AsyncChannel) -public enum NIONegotiatedHTTPVersion { - case http1_1(HTTP1Output) - case http2(HTTP2Output) -} diff --git a/Tests/NIOHTTP2Tests/ConfiguringPipelineAsyncMultiplexerTests.swift b/Tests/NIOHTTP2Tests/ConfiguringPipelineAsyncMultiplexerTests.swift deleted file mode 100644 index a3e4f39d..00000000 --- a/Tests/NIOHTTP2Tests/ConfiguringPipelineAsyncMultiplexerTests.swift +++ /dev/null @@ -1,458 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the SwiftNIO open source project -// -// Copyright (c) 2019-2023 Apple Inc. and the SwiftNIO project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of SwiftNIO project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import XCTest - -import NIOConcurrencyHelpers -@_spi(AsyncChannel) import NIOCore -import NIOEmbedded -import NIOHPACK -import NIOHTTP1 -@_spi(AsyncChannel) import NIOHTTP2 -import NIOTLS - -final class ConfiguringPipelineAsyncMultiplexerTests: XCTestCase { - var clientChannel: NIOAsyncTestingChannel! - var serverChannel: NIOAsyncTestingChannel! - - override func setUp() { - self.clientChannel = NIOAsyncTestingChannel() - self.serverChannel = NIOAsyncTestingChannel() - } - - override func tearDown() { - self.clientChannel = nil - self.serverChannel = nil - } - - static let requestFramePayload = HTTP2Frame.FramePayload.headers(.init(headers: HPACKHeaders([(":method", "GET"), (":authority", "localhost"), (":scheme", "https"), (":path", "/")]), endStream: true)) - static let responseFramePayload = HTTP2Frame.FramePayload.headers(.init(headers: HPACKHeaders([(":status", "200")]), endStream: true)) - - static let requestHead = HTTPRequestHead(version: .init(major: 1, minor: 1), method: .GET, uri: "/testHTTP1") - static let responseHead = HTTPResponseHead(version: .init(major: 1, minor: 1), status: .ok, headers: HTTPHeaders([("transfer-encoding", "chunked")])) - - final class OKResponder: ChannelInboundHandler { - typealias InboundIn = HTTP2Frame.FramePayload - typealias OutboundOut = HTTP2Frame.FramePayload - - func channelRead(context: ChannelHandlerContext, data: NIOAny) { - let frame = self.unwrapInboundIn(data) - switch frame { - case .headers: - break - default: - fatalError("unexpected frame type: \(frame)") - } - - context.writeAndFlush(self.wrapOutboundOut(responseFramePayload), promise: nil) - context.fireChannelRead(data) - } - } - - final class HTTP1OKResponder: ChannelInboundHandler { - typealias InboundIn = HTTPServerRequestPart - typealias OutboundOut = HTTPServerResponsePart - - func channelRead(context: ChannelHandlerContext, data: NIOAny) { - switch self.unwrapInboundIn(data) { - case .head: - context.write(self.wrapOutboundOut(.head(responseHead)), promise: nil) - context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil) - case .body, .end: - break - } - - context.fireChannelRead(data) - } - } - - final class SimpleRequest: ChannelInboundHandler { - typealias InboundIn = HTTP2Frame.FramePayload - typealias OutboundOut = HTTP2Frame.FramePayload - - func writeRequest(context: ChannelHandlerContext) { - context.writeAndFlush(self.wrapOutboundOut(requestFramePayload), promise: nil) - } - - func channelActive(context: ChannelHandlerContext) { - self.writeRequest(context: context) - context.fireChannelActive() - } - } - - // `testBasicPipelineCommunicates` ensures that a client-server system set up to use async stream abstractions - // can communicate successfully. - func testBasicPipelineCommunicates() async throws { - let requestCount = 100 - - let serverRecorder = InboundFramePayloadRecorder() - - let clientMultiplexer = try await assertNoThrowWithValue( - try await self.clientChannel.configureAsyncHTTP2Pipeline(mode: .client) { channel -> EventLoopFuture in - channel.eventLoop.makeSucceededFuture(channel) - }.get() - ) - - let serverMultiplexer = try await assertNoThrowWithValue( - try await self.serverChannel.configureAsyncHTTP2Pipeline(mode: .server) { channel -> EventLoopFuture in - channel.pipeline.addHandlers([OKResponder(), serverRecorder]).map { _ in channel } - }.get() - ) - - try await assertNoThrow(try await self.assertDoHandshake(client: self.clientChannel, server: self.serverChannel)) - - try await withThrowingTaskGroup(of: Int.self, returning: Void.self) { group in - // server - group.addTask { - var serverInboundChannelCount = 0 - for try await _ in serverMultiplexer.inbound { - serverInboundChannelCount += 1 - } - return serverInboundChannelCount - } - - // client - for _ in 0 ..< requestCount { - // Let's try sending some requests - let streamChannel = try await clientMultiplexer.createStreamChannel { channel -> EventLoopFuture in - return channel.pipeline.addHandlers([SimpleRequest(), InboundFramePayloadRecorder()]).map { - return channel - } - } - - let clientRecorder = try await streamChannel.pipeline.handler(type: InboundFramePayloadRecorder.self).get() - try await self.deliverAllBytes(from: self.clientChannel, to: self.serverChannel) - try await self.deliverAllBytes(from: self.serverChannel, to: self.clientChannel) - clientRecorder.receivedFrames.assertFramePayloadsMatch([ConfiguringPipelineAsyncMultiplexerTests.responseFramePayload]) - try await streamChannel.closeFuture.get() - } - - try await assertNoThrow(try await self.clientChannel.finish()) - try await assertNoThrow(try await self.serverChannel.finish()) - - let serverInboundChannelCount = try await assertNoThrowWithValue(try await group.next()!) - XCTAssertEqual(serverInboundChannelCount, requestCount, "We should have created one server-side channel as a result of the each HTTP/2 stream used.") - } - - serverRecorder.receivedFrames.assertFramePayloadsMatch(Array(repeating: ConfiguringPipelineAsyncMultiplexerTests.requestFramePayload, count: requestCount)) - } - - // `testNIOAsyncConnectionStreamChannelPipelineCommunicates` ensures that a client-server system set up to use `NIOAsyncChannel` - // wrappers around connection and stream channels can communicate successfully. - func testNIOAsyncConnectionStreamChannelPipelineCommunicates() async throws { - let requestCount = 100 - - let clientMultiplexer = try await assertNoThrowWithValue( - try await self.clientChannel.configureAsyncHTTP2Pipeline( - mode: .client, - inboundStreamInitializer: { channel in - channel.eventLoop.makeCompletedFuture { - try NIOAsyncChannel( - synchronouslyWrapping: channel, - configuration: .init(inboundType: HTTP2Frame.FramePayload.self, outboundType: HTTP2Frame.FramePayload.self) - ) - } - } - ).get() - ) - - let serverMultiplexer = try await assertNoThrowWithValue( - try await self.serverChannel.configureAsyncHTTP2Pipeline( - mode: .server, - inboundStreamInitializer: { channel in - channel.eventLoop.makeCompletedFuture { - try NIOAsyncChannel( - synchronouslyWrapping: channel, - configuration: .init(inboundType: HTTP2Frame.FramePayload.self, outboundType: HTTP2Frame.FramePayload.self) - ) - } - } - ).get() - ) - - try await assertNoThrow(try await self.assertDoHandshake(client: self.clientChannel, server: self.serverChannel)) - - try await withThrowingTaskGroup(of: Int.self, returning: Void.self) { group in - // server - group.addTask { - var serverInboundChannelCount = 0 - for try await streamChannel in serverMultiplexer.inbound { - for try await receivedFrame in streamChannel.inboundStream { - receivedFrame.assertFramePayloadMatches(this: ConfiguringPipelineAsyncMultiplexerTests.requestFramePayload) - - try await streamChannel.outboundWriter.write(ConfiguringPipelineAsyncMultiplexerTests.responseFramePayload) - streamChannel.outboundWriter.finish() - - try await self.deliverAllBytes(from: self.serverChannel, to: self.clientChannel) - } - serverInboundChannelCount += 1 - } - return serverInboundChannelCount - } - - // client - for _ in 0 ..< requestCount { - let streamChannel = try await clientMultiplexer.createStreamChannel() { channel in - channel.eventLoop.makeCompletedFuture { - try NIOAsyncChannel( - synchronouslyWrapping: channel, - configuration: .init( - inboundType: HTTP2Frame.FramePayload.self, - outboundType: HTTP2Frame.FramePayload.self - ) - ) - } - } - // Let's try sending some requests - try await streamChannel.outboundWriter.write(ConfiguringPipelineAsyncMultiplexerTests.requestFramePayload) - streamChannel.outboundWriter.finish() - - try await self.deliverAllBytes(from: self.clientChannel, to: self.serverChannel) - - for try await receivedFrame in streamChannel.inboundStream { - receivedFrame.assertFramePayloadMatches(this: ConfiguringPipelineAsyncMultiplexerTests.responseFramePayload) - } - } - - try await assertNoThrow(try await self.clientChannel.finish()) - try await assertNoThrow(try await self.serverChannel.finish()) - - let serverInboundChannelCount = try await assertNoThrowWithValue(try await group.next()!) - XCTAssertEqual(serverInboundChannelCount, requestCount, "We should have created one server-side channel as a result of the one HTTP/2 stream used.") - } - } - - // `testNegotiatedHTTP2BasicPipelineCommunicates` ensures that a client-server system set up to use async stream abstractions - // can communicate successfully when HTTP/2 is negotiated. - func testNegotiatedHTTP2BasicPipelineCommunicates() async throws { - let requestCount = 100 - - let serverRecorder = InboundFramePayloadRecorder() - - let clientMultiplexer = try await assertNoThrowWithValue( - try await self.clientChannel.configureAsyncHTTP2Pipeline(mode: .client) { channel -> EventLoopFuture in - channel.eventLoop.makeSucceededFuture(channel) - }.get() - ) - - let nioProtocolNegotiationResult = try await self.serverChannel.configureAsyncHTTPServerPipeline() { channel in - channel.eventLoop.makeSucceededVoidFuture() - } http2ConnectionInitializer: { channel in - channel.eventLoop.makeSucceededVoidFuture() - } http2InboundStreamInitializer: { channel -> EventLoopFuture in - channel.pipeline.addHandlers([OKResponder(), serverRecorder]).map { _ in channel } - }.get() - - // Let's pretend the TLS handler did protocol negotiation for us - self.serverChannel.pipeline.fireUserInboundEventTriggered(TLSUserEvent.handshakeCompleted(negotiatedProtocol: "h2")) - - let negotiationResult = try await nioProtocolNegotiationResult.getResult() - - try await assertNoThrow(try await self.assertDoHandshake(client: self.clientChannel, server: self.serverChannel)) - - let serverMultiplexer: NIOHTTP2Handler.AsyncStreamMultiplexer - switch negotiationResult { - case .http1_1: - preconditionFailure("Negotiation result must be HTTP/2") - case .http2(let (_, multiplexer)): - serverMultiplexer = multiplexer - } - - try await withThrowingTaskGroup(of: Int.self, returning: Void.self) { group in - // server - group.addTask { - var serverInboundChannelCount = 0 - for try await _ in serverMultiplexer.inbound { - serverInboundChannelCount += 1 - } - return serverInboundChannelCount - } - - // client - for _ in 0 ..< requestCount { - // Let's try sending some requests - let streamChannel = try await clientMultiplexer.createStreamChannel { channel -> EventLoopFuture in - return channel.pipeline.addHandlers([SimpleRequest(), InboundFramePayloadRecorder()]).map { - return channel - } - } - - let clientRecorder = try await streamChannel.pipeline.handler(type: InboundFramePayloadRecorder.self).get() - - try await self.deliverAllBytes(from: self.clientChannel, to: self.serverChannel) - try await self.deliverAllBytes(from: self.serverChannel, to: self.clientChannel) - - clientRecorder.receivedFrames.assertFramePayloadsMatch([ConfiguringPipelineAsyncMultiplexerTests.responseFramePayload]) - try await streamChannel.closeFuture.get() - } - - try await assertNoThrow(try await self.clientChannel.finish()) - try await assertNoThrow(try await self.serverChannel.finish()) - - let serverInboundChannelCount = try await assertNoThrowWithValue(try await group.next()!) - XCTAssertEqual(serverInboundChannelCount, requestCount, "We should have created one server-side channel as a result of the each HTTP/2 stream used.") - } - - serverRecorder.receivedFrames.assertFramePayloadsMatch(Array(repeating: ConfiguringPipelineAsyncMultiplexerTests.requestFramePayload, count: requestCount)) - } - - // `testNegotiatedHTTP1BasicPipelineCommunicates` ensures that a client-server system set up to use async stream abstractions - // can communicate successfully when HTTP/1.1 is negotiated. - func testNegotiatedHTTP1BasicPipelineCommunicates() async throws { - let requestCount = 100 - - let _ = try await self.clientChannel.pipeline.addHTTPClientHandlers().map { _ in - self.clientChannel.pipeline.addHandlers([InboundRecorderHandler(), HTTP1ClientSendability()]) - }.get() - - let nioProtocolNegotiationResult = try await self.serverChannel.configureAsyncHTTPServerPipeline() { channel in - channel.pipeline.addHandlers([HTTP1OKResponder(), InboundRecorderHandler()]) - } http2ConnectionInitializer: { channel in - channel.eventLoop.makeSucceededVoidFuture() - } http2InboundStreamInitializer: { channel -> EventLoopFuture in - channel.eventLoop.makeSucceededFuture(channel) - }.get() - - // Let's pretend the TLS handler did protocol negotiation for us - self.serverChannel.pipeline.fireUserInboundEventTriggered(TLSUserEvent.handshakeCompleted(negotiatedProtocol: "http/1.1")) - - let negotiationResult = try await nioProtocolNegotiationResult.getResult() - - try await self.deliverAllBytes(from: self.clientChannel, to: self.serverChannel) - try await self.deliverAllBytes(from: self.serverChannel, to: self.clientChannel) - - switch negotiationResult { - case .http1_1: - break - case .http2: - preconditionFailure("Negotiation result must be http/1.1") - } - - // client - for _ in 0 ..< requestCount { - // Let's try sending some http/1.1 requests. - // we need to put these through a mapping to remove references to `IOData` which isn't Sendable - try await self.clientChannel.writeOutbound(HTTP1ClientSendability.RequestPart.head(ConfiguringPipelineAsyncMultiplexerTests.requestHead)) - try await self.clientChannel.writeOutbound(HTTP1ClientSendability.RequestPart.end(nil)) - try await self.deliverAllBytes(from: self.clientChannel, to: self.serverChannel) - try await self.deliverAllBytes(from: self.serverChannel, to: self.clientChannel) - } - - // check expectations - let clientRecorder = try await self.clientChannel.pipeline.handler(type: InboundRecorderHandler.self).get() - let serverRecorder = try await self.serverChannel.pipeline.handler(type: InboundRecorderHandler.self).get() - - XCTAssertEqual(serverRecorder.receivedParts.count, requestCount*2) - XCTAssertEqual(clientRecorder.receivedParts.count, requestCount*2) - - for i in 0 ..< requestCount { - XCTAssertEqual(serverRecorder.receivedParts[i*2], HTTPServerRequestPart.head(ConfiguringPipelineAsyncMultiplexerTests.requestHead), "Unexpected request part in iteration \(i)") - XCTAssertEqual(serverRecorder.receivedParts[i*2+1], HTTPServerRequestPart.end(nil), "Unexpected request part in iteration \(i)") - - XCTAssertEqual(clientRecorder.receivedParts[i*2], HTTPClientResponsePart.head(ConfiguringPipelineAsyncMultiplexerTests.responseHead), "Unexpected response part in iteration \(i)") - XCTAssertEqual(clientRecorder.receivedParts[i*2+1], HTTPClientResponsePart.end(nil), "Unexpected response part in iteration \(i)") - } - - try await assertNoThrow(try await self.clientChannel.finish()) - try await assertNoThrow(try await self.serverChannel.finish()) - } - - // Simple handler which maps client request parts to remove references to `IOData` which isn't Sendable - internal final class HTTP1ClientSendability: ChannelOutboundHandler { - public typealias RequestPart = HTTPPart - - typealias OutboundIn = RequestPart - typealias OutboundOut = HTTPClientRequestPart - - func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise?) { - let requestPart = self.unwrapOutboundIn(data) - - let httpClientRequestPart: HTTPClientRequestPart - switch requestPart { - case .head(let head): - httpClientRequestPart = .head(head) - case .body(let byteBuffer): - httpClientRequestPart = .body(.byteBuffer(byteBuffer)) - case .end(let headers): - httpClientRequestPart = .end(headers) - } - - context.write(self.wrapOutboundOut(httpClientRequestPart), promise: promise) - } - } - - // Simple handler which maps server response parts to remove references to `IOData` which isn't Sendable - internal final class HTTP1ServerSendability: ChannelOutboundHandler { - public typealias ResponsePart = HTTPPart - - typealias OutboundIn = ResponsePart - typealias OutboundOut = HTTPServerResponsePart - - func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise?) { - let responsePart = self.unwrapOutboundIn(data) - - let httpServerResponsePart: HTTPServerResponsePart - switch responsePart { - case .head(let head): - httpServerResponsePart = .head(head) - case .body(let byteBuffer): - httpServerResponsePart = .body(.byteBuffer(byteBuffer)) - case .end(let headers): - httpServerResponsePart = .end(headers) - } - - context.write(self.wrapOutboundOut(httpServerResponsePart), promise: promise) - } - } - - /// A simple channel handler that records inbound messages. - internal final class InboundRecorderHandler: ChannelInboundHandler, @unchecked Sendable { - typealias InboundIn = message - - private let partsLock = NIOLock() - private var _receivedParts: [message] = [] - - var receivedParts: [message] { - get { - self.partsLock.withLock { - self._receivedParts - } - } - set { - self.partsLock.withLock { - self._receivedParts = newValue - } - } - } - - func channelRead(context: ChannelHandlerContext, data: NIOAny) { - self.receivedParts.append(self.unwrapInboundIn(data)) - context.fireChannelRead(data) - } - } -} - -#if swift(<5.9) -// this should be available in the std lib from 5.9 onwards -extension AsyncStream { - fileprivate static func makeStream( - of elementType: Element.Type = Element.self, - bufferingPolicy limit: Continuation.BufferingPolicy = .unbounded - ) -> (stream: AsyncStream, continuation: AsyncStream.Continuation) { - var continuation: AsyncStream.Continuation! - let stream = AsyncStream(bufferingPolicy: limit) { continuation = $0 } - return (stream: stream, continuation: continuation!) - } -} -#endif