From 93405f7e7f48b75883a41fc2f1da26da411c5d86 Mon Sep 17 00:00:00 2001 From: Franz Busch Date: Fri, 27 Oct 2023 12:22:21 +0100 Subject: [PATCH] Back out new typed HTTP protocol upgrader # Motivation We got reports in https://github.com/apple/swift-nio/issues/2574 that our new typed HTTP upgrader are hitting a Swift compiler bug which manifests in a runtime crash on older iOS/macOS/etc. # Modification This PR backs out the new typed HTTP protocol upgrader APIs so that we can unblock our users until the Swift compiler bug is fixed. # Result No more crashes for our users. --- Sources/NIOHTTP1/HTTPTypedPipelineSetup.swift | 248 --------- .../NIOTypedHTTPClientUpgradeHandler.swift | 283 ---------- ...OTypedHTTPClientUpgraderStateMachine.swift | 334 ----------- .../NIOTypedHTTPServerUpgradeHandler.swift | 369 ------------- ...OTypedHTTPServerUpgraderStateMachine.swift | 385 ------------- Sources/NIOTCPEchoClient/Client.swift | 2 +- Sources/NIOTCPEchoServer/Server.swift | 2 +- .../NIOWebSocketClientUpgrader.swift | 56 -- .../NIOWebSocketServerUpgrader.swift | 84 --- Sources/NIOWebSocketClient/Client.swift | 241 ++++---- Sources/NIOWebSocketServer/Server.swift | 461 ++++++++-------- .../HTTPClientUpgradeTests.swift | 235 +------- .../HTTPServerUpgradeTests.swift | 518 +----------------- .../WebSocketClientEndToEndTests.swift | 211 ------- .../WebSocketServerEndToEndTests.swift | 27 - 15 files changed, 367 insertions(+), 3089 deletions(-) delete mode 100644 Sources/NIOHTTP1/HTTPTypedPipelineSetup.swift delete mode 100644 Sources/NIOHTTP1/NIOTypedHTTPClientUpgradeHandler.swift delete mode 100644 Sources/NIOHTTP1/NIOTypedHTTPClientUpgraderStateMachine.swift delete mode 100644 Sources/NIOHTTP1/NIOTypedHTTPServerUpgradeHandler.swift delete mode 100644 Sources/NIOHTTP1/NIOTypedHTTPServerUpgraderStateMachine.swift diff --git a/Sources/NIOHTTP1/HTTPTypedPipelineSetup.swift b/Sources/NIOHTTP1/HTTPTypedPipelineSetup.swift deleted file mode 100644 index 57fe5fd780..0000000000 --- a/Sources/NIOHTTP1/HTTPTypedPipelineSetup.swift +++ /dev/null @@ -1,248 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the SwiftNIO open source project -// -// Copyright (c) 2023 Apple Inc. and the SwiftNIO project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of SwiftNIO project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// -import NIOCore - -// MARK: - Server pipeline configuration - -/// Configuration for an upgradable HTTP pipeline. -@available(macOS 13, iOS 16, tvOS 16, watchOS 9, *) -public struct NIOUpgradableHTTPServerPipelineConfiguration { - /// Whether to provide assistance handling HTTP clients that pipeline - /// their requests. Defaults to `true`. If `false`, users will need to handle clients that pipeline themselves. - public var enablePipelining = true - - /// Whether to provide assistance handling protocol errors (e.g. failure to parse the HTTP - /// request) by sending 400 errors. Defaults to `true`. - public var enableErrorHandling = true - - /// Whether to validate outbound response headers to confirm that they are - /// spec compliant. Defaults to `true`. - public var enableResponseHeaderValidation = true - - /// The configuration for the ``HTTPResponseEncoder``. - public var encoderConfiguration = HTTPResponseEncoder.Configuration() - - /// The configuration for the ``NIOTypedHTTPServerUpgradeHandler``. - public var upgradeConfiguration: NIOTypedHTTPServerUpgradeConfiguration - - /// Initializes a new ``NIOUpgradableHTTPServerPipelineConfiguration`` with default values. - /// - /// The current defaults provide the following features: - /// 1. Assistance handling clients that pipeline HTTP requests. - /// 2. Assistance handling protocol errors. - /// 3. Outbound header fields validation to protect against response splitting attacks. - public init( - upgradeConfiguration: NIOTypedHTTPServerUpgradeConfiguration - ) { - self.upgradeConfiguration = upgradeConfiguration - } -} - -extension ChannelPipeline { - /// Configure a `ChannelPipeline` for use as an HTTP server. - /// - /// - Parameters: - /// - configuration: The HTTP pipeline's configuration. - /// - Returns: An `EventLoopFuture` that will fire when the pipeline is configured. The future contains an `EventLoopFuture` - /// that is fired once the pipeline has been upgraded or not and contains the `UpgradeResult`. - @available(macOS 13, iOS 16, tvOS 16, watchOS 9, *) - public func configureUpgradableHTTPServerPipeline( - configuration: NIOUpgradableHTTPServerPipelineConfiguration - ) -> EventLoopFuture> { - self._configureUpgradableHTTPServerPipeline( - configuration: configuration - ) - } - - @available(macOS 13, iOS 16, tvOS 16, watchOS 9, *) - private func _configureUpgradableHTTPServerPipeline( - configuration: NIOUpgradableHTTPServerPipelineConfiguration - ) -> EventLoopFuture> { - let future: EventLoopFuture> - - if self.eventLoop.inEventLoop { - let result = Result, Error> { - try self.syncOperations.configureUpgradableHTTPServerPipeline( - configuration: configuration - ) - } - future = self.eventLoop.makeCompletedFuture(result) - } else { - future = self.eventLoop.submit { - try self.syncOperations.configureUpgradableHTTPServerPipeline( - configuration: configuration - ) - } - } - - return future - } -} - -extension ChannelPipeline.SynchronousOperations { - /// Configure a `ChannelPipeline` for use as an HTTP server. - /// - /// - Parameters: - /// - configuration: The HTTP pipeline's configuration. - /// - Returns: An `EventLoopFuture` that is fired once the pipeline has been upgraded or not and contains the `UpgradeResult`. - @available(macOS 13, iOS 16, tvOS 16, watchOS 9, *) - public func configureUpgradableHTTPServerPipeline( - configuration: NIOUpgradableHTTPServerPipelineConfiguration - ) throws -> EventLoopFuture { - self.eventLoop.assertInEventLoop() - - let responseEncoder = HTTPResponseEncoder(configuration: configuration.encoderConfiguration) - let requestDecoder = ByteToMessageHandler(HTTPRequestDecoder(leftOverBytesStrategy: .forwardBytes)) - - var extraHTTPHandlers = [RemovableChannelHandler]() - extraHTTPHandlers.reserveCapacity(4) - extraHTTPHandlers.append(requestDecoder) - - try self.addHandler(responseEncoder) - try self.addHandler(requestDecoder) - - if configuration.enablePipelining { - let pipeliningHandler = HTTPServerPipelineHandler() - try self.addHandler(pipeliningHandler) - extraHTTPHandlers.append(pipeliningHandler) - } - - if configuration.enableResponseHeaderValidation { - let headerValidationHandler = NIOHTTPResponseHeadersValidator() - try self.addHandler(headerValidationHandler) - extraHTTPHandlers.append(headerValidationHandler) - } - - if configuration.enableErrorHandling { - let errorHandler = HTTPServerProtocolErrorHandler() - try self.addHandler(errorHandler) - extraHTTPHandlers.append(errorHandler) - } - - let upgrader = NIOTypedHTTPServerUpgradeHandler( - httpEncoder: responseEncoder, - extraHTTPHandlers: extraHTTPHandlers, - upgradeConfiguration: configuration.upgradeConfiguration - ) - try self.addHandler(upgrader) - - return upgrader.upgradeResultFuture - } -} - -// MARK: - Client pipeline configuration - -/// Configuration for an upgradable HTTP pipeline. -@available(macOS 13, iOS 16, tvOS 16, watchOS 9, *) -public struct NIOUpgradableHTTPClientPipelineConfiguration { - /// The strategy to use when dealing with leftover bytes after removing the ``HTTPDecoder`` from the pipeline. - public var leftOverBytesStrategy = RemoveAfterUpgradeStrategy.dropBytes - - /// Whether to validate outbound response headers to confirm that they are - /// spec compliant. Defaults to `true`. - public var enableOutboundHeaderValidation = true - - /// The configuration for the ``HTTPRequestEncoder``. - public var encoderConfiguration = HTTPRequestEncoder.Configuration() - - /// The configuration for the ``NIOTypedHTTPClientUpgradeHandler``. - public var upgradeConfiguration: NIOTypedHTTPClientUpgradeConfiguration - - /// Initializes a new ``NIOUpgradableHTTPClientPipelineConfiguration`` with default values. - /// - /// The current defaults provide the following features: - /// 1. Outbound header fields validation to protect against response splitting attacks. - public init( - upgradeConfiguration: NIOTypedHTTPClientUpgradeConfiguration - ) { - self.upgradeConfiguration = upgradeConfiguration - } -} - -extension ChannelPipeline { - /// Configure a `ChannelPipeline` for use as an HTTP client. - /// - /// - Parameters: - /// - configuration: The HTTP pipeline's configuration. - /// - Returns: An `EventLoopFuture` that will fire when the pipeline is configured. The future contains an `EventLoopFuture` - /// that is fired once the pipeline has been upgraded or not and contains the `UpgradeResult`. - @available(macOS 13, iOS 16, tvOS 16, watchOS 9, *) - public func configureUpgradableHTTPClientPipeline( - configuration: NIOUpgradableHTTPClientPipelineConfiguration - ) -> EventLoopFuture> { - self._configureUpgradableHTTPClientPipeline(configuration: configuration) - } - - @available(macOS 13, iOS 16, tvOS 16, watchOS 9, *) - private func _configureUpgradableHTTPClientPipeline( - configuration: NIOUpgradableHTTPClientPipelineConfiguration - ) -> EventLoopFuture> { - let future: EventLoopFuture> - - if self.eventLoop.inEventLoop { - let result = Result, Error> { - try self.syncOperations.configureUpgradableHTTPClientPipeline( - configuration: configuration - ) - } - future = self.eventLoop.makeCompletedFuture(result) - } else { - future = self.eventLoop.submit { - try self.syncOperations.configureUpgradableHTTPClientPipeline( - configuration: configuration - ) - } - } - - return future - } -} - -extension ChannelPipeline.SynchronousOperations { - /// Configure a `ChannelPipeline` for use as an HTTP client. - /// - /// - Parameters: - /// - configuration: The HTTP pipeline's configuration. - /// - Returns: An `EventLoopFuture` that is fired once the pipeline has been upgraded or not and contains the `UpgradeResult`. - @available(macOS 13, iOS 16, tvOS 16, watchOS 9, *) - public func configureUpgradableHTTPClientPipeline( - configuration: NIOUpgradableHTTPClientPipelineConfiguration - ) throws -> EventLoopFuture { - self.eventLoop.assertInEventLoop() - - let requestEncoder = HTTPRequestEncoder(configuration: configuration.encoderConfiguration) - let responseDecoder = ByteToMessageHandler(HTTPResponseDecoder(leftOverBytesStrategy: configuration.leftOverBytesStrategy)) - var httpHandlers = [RemovableChannelHandler]() - httpHandlers.reserveCapacity(3) - httpHandlers.append(requestEncoder) - httpHandlers.append(responseDecoder) - - try self.addHandler(requestEncoder) - try self.addHandler(responseDecoder) - - if configuration.enableOutboundHeaderValidation { - let headerValidationHandler = NIOHTTPRequestHeadersValidator() - try self.addHandler(headerValidationHandler) - httpHandlers.append(headerValidationHandler) - } - - let upgrader = NIOTypedHTTPClientUpgradeHandler( - httpHandlers: httpHandlers, - upgradeConfiguration: configuration.upgradeConfiguration - ) - try self.addHandler(upgrader) - - return upgrader.upgradeResultFuture - } -} diff --git a/Sources/NIOHTTP1/NIOTypedHTTPClientUpgradeHandler.swift b/Sources/NIOHTTP1/NIOTypedHTTPClientUpgradeHandler.swift deleted file mode 100644 index f5a2f505ec..0000000000 --- a/Sources/NIOHTTP1/NIOTypedHTTPClientUpgradeHandler.swift +++ /dev/null @@ -1,283 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the SwiftNIO open source project -// -// Copyright (c) 2013 Apple Inc. and the SwiftNIO project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of SwiftNIO project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// -import NIOCore - -/// An object that implements `NIOTypedHTTPClientProtocolUpgrader` knows how to handle HTTP upgrade to -/// a protocol on a client-side channel. -/// It has the option of denying this upgrade based upon the server response. -public protocol NIOTypedHTTPClientProtocolUpgrader { - associatedtype UpgradeResult: Sendable - - /// The protocol this upgrader knows how to support. - var supportedProtocol: String { get } - - /// All the header fields the protocol requires in the request to successfully upgrade. - /// These header fields will be added to the outbound request's "Connection" header field. - /// It is the responsibility of the custom headers call to actually add these required headers. - var requiredUpgradeHeaders: [String] { get } - - /// Additional headers to be added to the request, beyond the "Upgrade" and "Connection" headers. - func addCustom(upgradeRequestHeaders: inout HTTPHeaders) - - /// Gives the receiving upgrader the chance to deny the upgrade based on the upgrade HTTP response. - func shouldAllowUpgrade(upgradeResponse: HTTPResponseHead) -> Bool - - /// Called when the upgrade response has been flushed. At this time it is safe to mutate the channel - /// pipeline to add whatever channel handlers are required. - /// Until the returned `EventLoopFuture` succeeds, all received data will be buffered. - func upgrade(channel: Channel, upgradeResponse: HTTPResponseHead) -> EventLoopFuture -} - -/// The upgrade configuration for the ``NIOTypedHTTPClientUpgradeHandler``. -@available(macOS 13, iOS 16, tvOS 16, watchOS 9, *) -public struct NIOTypedHTTPClientUpgradeConfiguration { - /// The initial request head that is sent out once the channel becomes active. - public var upgradeRequestHead: HTTPRequestHead - - /// The array of potential upgraders. - public var upgraders: [any NIOTypedHTTPClientProtocolUpgrader] - - /// A closure that is run once it is determined that no protocol upgrade is happening. This can be used - /// to configure handlers that expect HTTP. - public var notUpgradingCompletionHandler: @Sendable (Channel) -> EventLoopFuture - - public init( - upgradeRequestHead: HTTPRequestHead, - upgraders: [any NIOTypedHTTPClientProtocolUpgrader], - notUpgradingCompletionHandler: @Sendable @escaping (Channel) -> EventLoopFuture - ) { - precondition(upgraders.count > 0, "A minimum of one protocol upgrader must be specified.") - self.upgradeRequestHead = upgradeRequestHead - self.upgraders = upgraders - self.notUpgradingCompletionHandler = notUpgradingCompletionHandler - } -} - -/// A client-side channel handler that sends a HTTP upgrade handshake request to perform a HTTP-upgrade. -/// This handler will add all appropriate headers to perform an upgrade to -/// the a protocol. It may add headers for a set of protocols in preference order. -/// If the upgrade fails (i.e. response is not 101 Switching Protocols), this handler simply -/// removes itself from the pipeline. If the upgrade is successful, it upgrades the pipeline to the new protocol. -/// -/// The request sends an order of preference to request which protocol it would like to use for the upgrade. -/// It will only upgrade to the protocol that is returned first in the list and does not currently -/// have the capability to upgrade to multiple simultaneous layered protocols. -@available(macOS 13, iOS 16, tvOS 16, watchOS 9, *) -public final class NIOTypedHTTPClientUpgradeHandler: ChannelDuplexHandler, RemovableChannelHandler { - public typealias OutboundIn = HTTPClientRequestPart - public typealias OutboundOut = HTTPClientRequestPart - public typealias InboundIn = HTTPClientResponsePart - public typealias InboundOut = HTTPClientResponsePart - - /// The upgrade future which will be completed once protocol upgrading has been done. - public var upgradeResultFuture: EventLoopFuture { - self.upgradeResultPromise.futureResult - } - - private let upgradeRequestHead: HTTPRequestHead - private let httpHandlers: [RemovableChannelHandler] - private let notUpgradingCompletionHandler: @Sendable (Channel) -> EventLoopFuture - private var stateMachine: NIOTypedHTTPClientUpgraderStateMachine - private var _upgradeResultPromise: EventLoopPromise? - private var upgradeResultPromise: EventLoopPromise { - precondition( - self._upgradeResultPromise != nil, - "Tried to access the upgrade result before the handler was added to a pipeline" - ) - return self._upgradeResultPromise! - } - - /// Create a ``NIOTypedHTTPClientUpgradeHandler``. - /// - /// - Parameters: - /// - httpHandlers: All `RemovableChannelHandler` objects which will be removed from the pipeline - /// once the upgrade response is sent. This is used to ensure that the pipeline will be in a clean state - /// after the upgrade. It should include any handlers that are directly related to handling HTTP. - /// At the very least this should include the `HTTPEncoder` and `HTTPDecoder`, but should also include - /// any other handler that cannot tolerate receiving non-HTTP data. - /// - upgradeConfiguration: The upgrade configuration. - public init( - httpHandlers: [RemovableChannelHandler], - upgradeConfiguration: NIOTypedHTTPClientUpgradeConfiguration - ) { - self.httpHandlers = httpHandlers - var upgradeRequestHead = upgradeConfiguration.upgradeRequestHead - Self.addHeaders( - to: &upgradeRequestHead, - upgraders: upgradeConfiguration.upgraders - ) - self.upgradeRequestHead = upgradeRequestHead - self.stateMachine = .init(upgraders: upgradeConfiguration.upgraders) - self.notUpgradingCompletionHandler = upgradeConfiguration.notUpgradingCompletionHandler - } - - public func handlerAdded(context: ChannelHandlerContext) { - self._upgradeResultPromise = context.eventLoop.makePromise(of: UpgradeResult.self) - } - - public func handlerRemoved(context: ChannelHandlerContext) { - switch self.stateMachine.handlerRemoved() { - case .failUpgradePromise: - self.upgradeResultPromise.fail(ChannelError.inappropriateOperationForState) - case .none: - break - } - } - - public func channelActive(context: ChannelHandlerContext) { - switch self.stateMachine.channelActive() { - case .writeUpgradeRequest: - context.write(self.wrapOutboundOut(.head(self.upgradeRequestHead)), promise: nil) - context.write(self.wrapOutboundOut(.body(.byteBuffer(.init()))), promise: nil) - context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil) - - case .none: - break - } - } - - private static func addHeaders( - to requestHead: inout HTTPRequestHead, - upgraders: [any NIOTypedHTTPClientProtocolUpgrader] - ) { - let requiredHeaders = ["upgrade"] + upgraders.flatMap { $0.requiredUpgradeHeaders } - requestHead.headers.add(name: "Connection", value: requiredHeaders.joined(separator: ",")) - - let allProtocols = upgraders.map { $0.supportedProtocol.lowercased() } - requestHead.headers.add(name: "Upgrade", value: allProtocols.joined(separator: ",")) - - // Allow each upgrader the chance to add custom headers. - for upgrader in upgraders { - upgrader.addCustom(upgradeRequestHeaders: &requestHead.headers) - } - } - - public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise?) { - switch self.stateMachine.write() { - case .failWrite(let error): - promise?.fail(error) - - case .forwardWrite: - context.write(data, promise: promise) - } - } - - public func channelRead(context: ChannelHandlerContext, data: NIOAny) { - switch self.stateMachine.channelReadData(data) { - case .unwrapData: - let responsePart = self.unwrapInboundIn(data) - self.channelRead(context: context, responsePart: responsePart) - - case .fireChannelRead: - context.fireChannelRead(data) - - case .none: - break - } - } - - private func channelRead(context: ChannelHandlerContext, responsePart: HTTPClientResponsePart) { - switch self.stateMachine.channelReadResponsePart(responsePart) { - case .fireErrorCaughtAndRemoveHandler(let error): - self.upgradeResultPromise.fail(error) - context.fireErrorCaught(error) - context.pipeline.removeHandler(self, promise: nil) - - case .runNotUpgradingInitializer: - self.notUpgradingCompletionHandler(context.channel) - .hop(to: context.eventLoop) - .whenComplete { result in - self.upgradingHandlerCompleted(context: context, result) - } - - case .startUpgrading(let upgrader, let responseHead): - self.startUpgrading( - context: context, - upgrader: upgrader, - responseHead: responseHead - ) - - case .none: - break - } - } - - private func startUpgrading( - context: ChannelHandlerContext, - upgrader: any NIOTypedHTTPClientProtocolUpgrader, - responseHead: HTTPResponseHead - ) { - // Before we start the upgrade we have to remove the HTTPEncoder and HTTPDecoder handlers from the - // pipeline, to prevent them parsing any more data. We'll buffer the incoming data until that completes. - self.removeHTTPHandlers(context: context) - .flatMap { - upgrader.upgrade(channel: context.channel, upgradeResponse: responseHead) - }.hop(to: context.eventLoop) - .whenComplete { result in - self.upgradingHandlerCompleted(context: context, result) - } - } - - private func upgradingHandlerCompleted( - context: ChannelHandlerContext, - _ result: Result - ) { - switch self.stateMachine.upgradingHandlerCompleted(result) { - case .fireErrorCaughtAndRemoveHandler(let error): - self.upgradeResultPromise.fail(error) - context.fireErrorCaught(error) - context.pipeline.removeHandler(self, promise: nil) - - case .fireErrorCaughtAndStartUnbuffering(let error): - self.upgradeResultPromise.fail(error) - context.fireErrorCaught(error) - self.unbuffer(context: context) - - case .startUnbuffering(let value): - self.upgradeResultPromise.succeed(value) - self.unbuffer(context: context) - - case .removeHandler(let value): - self.upgradeResultPromise.succeed(value) - context.pipeline.removeHandler(self, promise: nil) - - case .none: - break - } - } - - private func unbuffer(context: ChannelHandlerContext) { - while true { - switch self.stateMachine.unbuffer() { - case .fireChannelRead(let data): - context.fireChannelRead(data) - - case .fireChannelReadCompleteAndRemoveHandler: - context.fireChannelReadComplete() - context.pipeline.removeHandler(self, promise: nil) - return - } - } - } - - /// Removes any extra HTTP-related handlers from the channel pipeline. - private func removeHTTPHandlers(context: ChannelHandlerContext) -> EventLoopFuture { - guard self.httpHandlers.count > 0 else { - return context.eventLoop.makeSucceededFuture(()) - } - - let removeFutures = self.httpHandlers.map { context.pipeline.removeHandler($0) } - return .andAllSucceed(removeFutures, on: context.eventLoop) - } -} diff --git a/Sources/NIOHTTP1/NIOTypedHTTPClientUpgraderStateMachine.swift b/Sources/NIOHTTP1/NIOTypedHTTPClientUpgraderStateMachine.swift deleted file mode 100644 index fa04481ea9..0000000000 --- a/Sources/NIOHTTP1/NIOTypedHTTPClientUpgraderStateMachine.swift +++ /dev/null @@ -1,334 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the SwiftNIO open source project -// -// Copyright (c) 2023 Apple Inc. and the SwiftNIO project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of SwiftNIO project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import DequeModule -import NIOCore - -@available(macOS 13, iOS 16, tvOS 16, watchOS 9, *) -struct NIOTypedHTTPClientUpgraderStateMachine { - @usableFromInline - enum State { - /// The state before we received a TLSUserEvent. We are just forwarding any read at this point. - case initial(upgraders: [any NIOTypedHTTPClientProtocolUpgrader]) - - /// The request has been sent. We are waiting for the upgrade response. - case awaitingUpgradeResponseHead(upgraders: [any NIOTypedHTTPClientProtocolUpgrader]) - - @usableFromInline - struct AwaitingUpgradeResponseEnd { - var upgrader: any NIOTypedHTTPClientProtocolUpgrader - var responseHead: HTTPResponseHead - } - /// We received the response head and are just waiting for the response end. - case awaitingUpgradeResponseEnd(AwaitingUpgradeResponseEnd) - - @usableFromInline - struct Upgrading { - var buffer: Deque - } - /// We are either running the upgrading handler. - case upgrading(Upgrading) - - @usableFromInline - struct Unbuffering { - var buffer: Deque - } - case unbuffering(Unbuffering) - - case finished - - case modifying - } - - private var state: State - - init(upgraders: [any NIOTypedHTTPClientProtocolUpgrader]) { - self.state = .initial(upgraders: upgraders) - } - - @usableFromInline - enum HandlerRemovedAction { - case failUpgradePromise - } - - @inlinable - mutating func handlerRemoved() -> HandlerRemovedAction? { - switch self.state { - case .initial, .awaitingUpgradeResponseHead, .awaitingUpgradeResponseEnd, .upgrading, .unbuffering: - self.state = .finished - return .failUpgradePromise - - case .finished: - return .none - - case .modifying: - fatalError("Internal inconsistency in HTTPClientUpgradeStateMachine") - } - } - - @usableFromInline - enum ChannelActiveAction { - case writeUpgradeRequest - } - - @inlinable - mutating func channelActive() -> ChannelActiveAction? { - switch self.state { - case .initial(let upgraders): - self.state = .awaitingUpgradeResponseHead(upgraders: upgraders) - return .writeUpgradeRequest - - case .finished: - return nil - - case .awaitingUpgradeResponseHead, .awaitingUpgradeResponseEnd, .unbuffering, .upgrading: - fatalError("Internal inconsistency in HTTPClientUpgradeStateMachine") - - case .modifying: - fatalError("Internal inconsistency in HTTPClientUpgradeStateMachine") - } - } - - @usableFromInline - enum WriteAction { - case failWrite(Error) - case forwardWrite - } - - @usableFromInline - func write() -> WriteAction { - switch self.state { - case .initial, .awaitingUpgradeResponseHead, .awaitingUpgradeResponseEnd, .upgrading: - return .failWrite(NIOHTTPClientUpgradeError.writingToHandlerDuringUpgrade) - - case .unbuffering, .finished: - return .forwardWrite - - case .modifying: - fatalError("Internal inconsistency in HTTPClientUpgradeStateMachine") - } - } - - @usableFromInline - enum ChannelReadDataAction { - case unwrapData - case fireChannelRead - } - - @inlinable - mutating func channelReadData(_ data: NIOAny) -> ChannelReadDataAction? { - switch self.state { - case .initial: - return .unwrapData - - case .awaitingUpgradeResponseHead, .awaitingUpgradeResponseEnd: - return .unwrapData - - case .upgrading(var upgrading): - // We got a read while running upgrading. - // We have to buffer the read to unbuffer it afterwards - self.state = .modifying - upgrading.buffer.append(data) - self.state = .upgrading(upgrading) - return nil - - case .unbuffering(var unbuffering): - self.state = .modifying - unbuffering.buffer.append(data) - self.state = .unbuffering(unbuffering) - return nil - - case .finished: - return .fireChannelRead - - case .modifying: - fatalError("Internal inconsistency in HTTPServerUpgradeStateMachine") - } - } - - - @usableFromInline - enum ChannelReadResponsePartAction { - case fireErrorCaughtAndRemoveHandler(Error) - case runNotUpgradingInitializer - case startUpgrading( - upgrader: any NIOTypedHTTPClientProtocolUpgrader, - responseHeaders: HTTPResponseHead - ) - } - - @inlinable - mutating func channelReadResponsePart(_ responsePart: HTTPClientResponsePart) -> ChannelReadResponsePartAction? { - switch self.state { - case .initial: - fatalError("Internal inconsistency in HTTPClientUpgradeStateMachine") - - case .awaitingUpgradeResponseHead(let upgraders): - // We should decide if we can upgrade based on the first response header: if we aren't upgrading, - // by the time the body comes in we should be out of the pipeline. That means that if we don't think we're - // upgrading, the only thing we should see is a response head. Anything else in an error. - guard case .head(let response) = responsePart else { - self.state = .finished - return .fireErrorCaughtAndRemoveHandler(NIOHTTPClientUpgradeError.invalidHTTPOrdering) - } - - // Assess whether the server has accepted our upgrade request. - guard case .switchingProtocols = response.status else { - var buffer = Deque() - buffer.append(.init(responsePart)) - self.state = .upgrading(.init(buffer: buffer)) - return .runNotUpgradingInitializer - } - - // Ok, we have a HTTP response. Check if it's an upgrade confirmation. - // If it's not, we want to pass it on and remove ourselves from the channel pipeline. - let acceptedProtocols = response.headers[canonicalForm: "upgrade"] - - // At the moment we only upgrade to the first protocol returned from the server. - guard let protocolName = acceptedProtocols.first?.lowercased() else { - // There are no upgrade protocols returned. - self.state = .finished - return .fireErrorCaughtAndRemoveHandler(NIOHTTPClientUpgradeError.responseProtocolNotFound) - } - - let matchingUpgrader = upgraders - .first(where: { $0.supportedProtocol.lowercased() == protocolName }) - - guard let upgrader = matchingUpgrader else { - // There is no upgrader for this protocol. - self.state = .finished - return .fireErrorCaughtAndRemoveHandler(NIOHTTPClientUpgradeError.responseProtocolNotFound) - } - - guard upgrader.shouldAllowUpgrade(upgradeResponse: response) else { - // The upgrader says no. - self.state = .finished - return .fireErrorCaughtAndRemoveHandler(NIOHTTPClientUpgradeError.upgraderDeniedUpgrade) - } - - // We received the response head and decided that we can upgrade. - // We now need to wait for the response end and then we can perform the upgrade - self.state = .awaitingUpgradeResponseEnd(.init( - upgrader: upgrader, - responseHead: response - )) - return .none - - case .awaitingUpgradeResponseEnd(let awaitingUpgradeResponseEnd): - switch responsePart { - case .head: - // We got two HTTP response heads. - self.state = .finished - return .fireErrorCaughtAndRemoveHandler(NIOHTTPClientUpgradeError.invalidHTTPOrdering) - - case .body: - // We tolerate body parts to be send but just ignore them - return .none - - case .end: - // We got the response end and can now run the upgrader. - self.state = .upgrading(.init(buffer: .init())) - return .startUpgrading( - upgrader: awaitingUpgradeResponseEnd.upgrader, - responseHeaders: awaitingUpgradeResponseEnd.responseHead - ) - } - - case .upgrading, .unbuffering, .finished: - fatalError("Internal inconsistency in HTTPClientUpgradeStateMachine") - - - case .modifying: - fatalError("Internal inconsistency in HTTPClientUpgradeStateMachine") - } - } - - @usableFromInline - enum UpgradingHandlerCompletedAction { - case fireErrorCaughtAndStartUnbuffering(Error) - case removeHandler(UpgradeResult) - case fireErrorCaughtAndRemoveHandler(Error) - case startUnbuffering(UpgradeResult) - } - - @inlinable - mutating func upgradingHandlerCompleted(_ result: Result) -> UpgradingHandlerCompletedAction? { - switch self.state { - case .initial, .awaitingUpgradeResponseHead, .awaitingUpgradeResponseEnd, .unbuffering: - fatalError("Internal inconsistency in HTTPClientUpgradeStateMachine") - - case .upgrading(let upgrading): - switch result { - case .success(let value): - if !upgrading.buffer.isEmpty { - self.state = .unbuffering(.init(buffer: upgrading.buffer)) - return .startUnbuffering(value) - } else { - self.state = .finished - return .removeHandler(value) - } - - case .failure(let error): - if !upgrading.buffer.isEmpty { - // So we failed to upgrade. There is nothing really that we can do here. - // We are unbuffering the reads but there shouldn't be any handler in the pipeline - // that expects a specific type of reads anyhow. - self.state = .unbuffering(.init(buffer: upgrading.buffer)) - return .fireErrorCaughtAndStartUnbuffering(error) - } else { - self.state = .finished - return .fireErrorCaughtAndRemoveHandler(error) - } - } - - case .finished: - // We have to tolerate this - return nil - - case .modifying: - fatalError("Internal inconsistency in HTTPClientUpgradeStateMachine") - } - } - - @usableFromInline - enum UnbufferAction { - case fireChannelRead(NIOAny) - case fireChannelReadCompleteAndRemoveHandler - } - - @inlinable - mutating func unbuffer() -> UnbufferAction { - switch self.state { - case .initial, .awaitingUpgradeResponseHead, .awaitingUpgradeResponseEnd, .upgrading, .finished: - preconditionFailure("Invalid state \(self.state)") - - case .unbuffering(var unbuffering): - self.state = .modifying - - if let element = unbuffering.buffer.popFirst() { - self.state = .unbuffering(unbuffering) - - return .fireChannelRead(element) - } else { - self.state = .finished - - return .fireChannelReadCompleteAndRemoveHandler - } - - case .modifying: - fatalError("Internal inconsistency in HTTPClientUpgradeStateMachine") - - } - } -} diff --git a/Sources/NIOHTTP1/NIOTypedHTTPServerUpgradeHandler.swift b/Sources/NIOHTTP1/NIOTypedHTTPServerUpgradeHandler.swift deleted file mode 100644 index 55b21e5982..0000000000 --- a/Sources/NIOHTTP1/NIOTypedHTTPServerUpgradeHandler.swift +++ /dev/null @@ -1,369 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the SwiftNIO open source project -// -// Copyright (c) 2023 Apple Inc. and the SwiftNIO project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of SwiftNIO project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// -import NIOCore - -/// An object that implements `NIOTypedHTTPServerProtocolUpgrader` knows how to handle HTTP upgrade to -/// a protocol on a server-side channel. -public protocol NIOTypedHTTPServerProtocolUpgrader { - associatedtype UpgradeResult: Sendable - - /// The protocol this upgrader knows how to support. - var supportedProtocol: String { get } - - /// All the header fields the protocol needs in the request to successfully upgrade. These header fields - /// will be provided to the handler when it is asked to handle the upgrade. They will also be validated - /// against the inbound request's `Connection` header field. - var requiredUpgradeHeaders: [String] { get } - - /// Builds the upgrade response headers. Should return any headers that need to be supplied to the client - /// in the 101 Switching Protocols response. If upgrade cannot proceed for any reason, this function should - /// return a failed future. - func buildUpgradeResponse( - channel: Channel, - upgradeRequest: HTTPRequestHead, - initialResponseHeaders: HTTPHeaders - ) -> EventLoopFuture - - /// Called when the upgrade response has been flushed. At this time it is safe to mutate the channel pipeline - /// to add whatever channel handlers are required. Until the returned `EventLoopFuture` succeeds, all received - /// data will be buffered. - func upgrade( - channel: Channel, - upgradeRequest: HTTPRequestHead - ) -> EventLoopFuture -} - -/// The upgrade configuration for the ``NIOTypedHTTPServerUpgradeHandler``. -@available(macOS 13, iOS 16, tvOS 16, watchOS 9, *) -public struct NIOTypedHTTPServerUpgradeConfiguration { - /// The array of potential upgraders. - public var upgraders: [any NIOTypedHTTPServerProtocolUpgrader] - - /// A closure that is run once it is determined that no protocol upgrade is happening. This can be used - /// to configure handlers that expect HTTP. - public var notUpgradingCompletionHandler: @Sendable (Channel) -> EventLoopFuture - - public init( - upgraders: [any NIOTypedHTTPServerProtocolUpgrader], - notUpgradingCompletionHandler: @Sendable @escaping (Channel) -> EventLoopFuture - ) { - self.upgraders = upgraders - self.notUpgradingCompletionHandler = notUpgradingCompletionHandler - } -} - -/// A server-side channel handler that receives HTTP requests and optionally performs an HTTP-upgrade. -/// -/// Removes itself from the channel pipeline after the first inbound request on the connection, regardless of -/// whether the upgrade succeeded or not. -/// -/// This handler behaves a bit differently from its Netty counterpart because it does not allow upgrade -/// on any request but the first on a connection. This is primarily to handle clients that pipeline: it's -/// sufficiently difficult to ensure that the upgrade happens at a safe time while dealing with pipelined -/// requests that we choose to punt on it entirely and not allow it. As it happens this is mostly fine: -/// the odds of someone needing to upgrade midway through the lifetime of a connection are very low. -@available(macOS 13, iOS 16, tvOS 16, watchOS 9, *) -public final class NIOTypedHTTPServerUpgradeHandler: ChannelInboundHandler, RemovableChannelHandler { - public typealias InboundIn = HTTPServerRequestPart - public typealias InboundOut = HTTPServerRequestPart - public typealias OutboundOut = HTTPServerResponsePart - - private let upgraders: [String: any NIOTypedHTTPServerProtocolUpgrader] - private let notUpgradingCompletionHandler: @Sendable (Channel) -> EventLoopFuture - private let httpEncoder: HTTPResponseEncoder - private let extraHTTPHandlers: [RemovableChannelHandler] - private var stateMachine = NIOTypedHTTPServerUpgraderStateMachine() - - private var _upgradeResultPromise: EventLoopPromise? - private var upgradeResultPromise: EventLoopPromise { - precondition( - self._upgradeResultPromise != nil, - "Tried to access the upgrade result before the handler was added to a pipeline" - ) - return self._upgradeResultPromise! - } - - /// The upgrade future which will be completed once protocol upgrading has been done. - public var upgradeResultFuture: EventLoopFuture { - self.upgradeResultPromise.futureResult - } - - /// Create a ``NIOTypedHTTPServerUpgradeHandler``. - /// - /// - Parameters: - /// - httpEncoder: The ``HTTPResponseEncoder`` encoding responses from this handler and which will - /// be removed from the pipeline once the upgrade response is sent. This is used to ensure - /// that the pipeline will be in a clean state after upgrade. - /// - extraHTTPHandlers: Any other handlers that are directly related to handling HTTP. At the very least - /// this should include the `HTTPDecoder`, but should also include any other handler that cannot tolerate - /// receiving non-HTTP data. - /// - upgradeConfiguration: The upgrade configuration. - public init( - httpEncoder: HTTPResponseEncoder, - extraHTTPHandlers: [RemovableChannelHandler], - upgradeConfiguration: NIOTypedHTTPServerUpgradeConfiguration - ) { - var upgraderMap = [String: any NIOTypedHTTPServerProtocolUpgrader]() - for upgrader in upgradeConfiguration.upgraders { - upgraderMap[upgrader.supportedProtocol.lowercased()] = upgrader - } - self.upgraders = upgraderMap - self.notUpgradingCompletionHandler = upgradeConfiguration.notUpgradingCompletionHandler - self.httpEncoder = httpEncoder - self.extraHTTPHandlers = extraHTTPHandlers - } - - public func handlerAdded(context: ChannelHandlerContext) { - self._upgradeResultPromise = context.eventLoop.makePromise(of: UpgradeResult.self) - } - - public func handlerRemoved(context: ChannelHandlerContext) { - switch self.stateMachine.handlerRemoved() { - case .failUpgradePromise: - self.upgradeResultPromise.fail(ChannelError.inappropriateOperationForState) - case .none: - break - } - } - - public func channelRead(context: ChannelHandlerContext, data: NIOAny) { - switch self.stateMachine.channelReadData(data) { - case .unwrapData: - let requestPart = self.unwrapInboundIn(data) - self.channelRead(context: context, requestPart: requestPart) - - case .fireChannelRead: - context.fireChannelRead(data) - - case .none: - break - } - } - - private func channelRead(context: ChannelHandlerContext, requestPart: HTTPServerRequestPart) { - switch self.stateMachine.channelReadRequestPart(requestPart) { - case .failUpgradePromise(let error): - self.upgradeResultPromise.fail(error) - - case .runNotUpgradingInitializer: - self.notUpgradingCompletionHandler(context.channel) - .hop(to: context.eventLoop) - .whenComplete { result in - self.upgradingHandlerCompleted(context: context, result, requestHeadAndProtocol: nil) - } - - case .findUpgrader(let head, let requestedProtocols, let allHeaderNames, let connectionHeader): - let protocolIterator = requestedProtocols.makeIterator() - self.handleUpgradeForProtocol( - context: context, - protocolIterator: protocolIterator, - request: head, - allHeaderNames: allHeaderNames, - connectionHeader: connectionHeader - ).whenComplete { result in - context.eventLoop.assertInEventLoop() - self.findingUpgradeCompleted(context: context, requestHead: head, result) - } - - case .startUpgrading(let upgrader, let requestHead, let responseHeaders, let proto): - self.startUpgrading( - context: context, - upgrader: upgrader, - requestHead: requestHead, - responseHeaders: responseHeaders, - proto: proto - ) - - case .none: - break - } - } - - private func upgradingHandlerCompleted( - context: ChannelHandlerContext, - _ result: Result, - requestHeadAndProtocol: (HTTPRequestHead, String)? - ) { - switch self.stateMachine.upgradingHandlerCompleted(result) { - case .fireErrorCaughtAndRemoveHandler(let error): - self.upgradeResultPromise.fail(error) - context.fireErrorCaught(error) - context.pipeline.removeHandler(self, promise: nil) - - case .fireErrorCaughtAndStartUnbuffering(let error): - self.upgradeResultPromise.fail(error) - context.fireErrorCaught(error) - self.unbuffer(context: context) - - case .startUnbuffering(let value): - if let requestHeadAndProtocol = requestHeadAndProtocol { - context.fireUserInboundEventTriggered(HTTPServerUpgradeEvents.upgradeComplete(toProtocol: requestHeadAndProtocol.1, upgradeRequest: requestHeadAndProtocol.0)) - } - self.upgradeResultPromise.succeed(value) - self.unbuffer(context: context) - - case .removeHandler(let value): - if let requestHeadAndProtocol = requestHeadAndProtocol { - context.fireUserInboundEventTriggered(HTTPServerUpgradeEvents.upgradeComplete(toProtocol: requestHeadAndProtocol.1, upgradeRequest: requestHeadAndProtocol.0)) - } - self.upgradeResultPromise.succeed(value) - context.pipeline.removeHandler(self, promise: nil) - - case .none: - break - } - } - - /// Attempt to upgrade a single protocol. - /// - /// Will recurse through `protocolIterator` if upgrade fails. - private func handleUpgradeForProtocol( - context: ChannelHandlerContext, - protocolIterator: Array.Iterator, - request: HTTPRequestHead, - allHeaderNames: Set, - connectionHeader: Set - ) -> EventLoopFuture<(upgrader: any NIOTypedHTTPServerProtocolUpgrader, responseHeaders: HTTPHeaders, proto: String)?> { - // We want a local copy of the protocol iterator. We'll pass it to the next invocation of the function. - var protocolIterator = protocolIterator - guard let proto = protocolIterator.next() else { - // We're done! No suitable protocol for upgrade. - return context.eventLoop.makeSucceededFuture(nil) - } - - guard let upgrader = self.upgraders[proto.lowercased()] else { - return self.handleUpgradeForProtocol(context: context, protocolIterator: protocolIterator, request: request, allHeaderNames: allHeaderNames, connectionHeader: connectionHeader) - } - - let requiredHeaders = Set(upgrader.requiredUpgradeHeaders.map { $0.lowercased() }) - guard requiredHeaders.isSubset(of: allHeaderNames) && requiredHeaders.isSubset(of: connectionHeader) else { - return self.handleUpgradeForProtocol(context: context, protocolIterator: protocolIterator, request: request, allHeaderNames: allHeaderNames, connectionHeader: connectionHeader) - } - - let responseHeaders = self.buildUpgradeHeaders(protocol: proto) - return upgrader.buildUpgradeResponse( - channel: context.channel, - upgradeRequest: request, - initialResponseHeaders: responseHeaders - ) - .hop(to: context.eventLoop) - .map { (upgrader, $0, proto) } - .flatMapError { error in - // No upgrade here. We want to fire the error down the pipeline, and then try another loop iteration. - context.fireErrorCaught(error) - return self.handleUpgradeForProtocol(context: context, protocolIterator: protocolIterator, request: request, allHeaderNames: allHeaderNames, connectionHeader: connectionHeader) - } - } - - private func findingUpgradeCompleted( - context: ChannelHandlerContext, - requestHead: HTTPRequestHead, - _ result: Result<(upgrader: any NIOTypedHTTPServerProtocolUpgrader, responseHeaders: HTTPHeaders, proto: String)?, Error> - ) { - switch self.stateMachine.findingUpgraderCompleted(requestHead: requestHead, result) { - case .startUpgrading(let upgrader, let responseHeaders, let proto): - self.startUpgrading( - context: context, - upgrader: upgrader, - requestHead: requestHead, - responseHeaders: responseHeaders, - proto: proto - ) - - case .runNotUpgradingInitializer: - self.notUpgradingCompletionHandler(context.channel) - .hop(to: context.eventLoop) - .whenComplete { result in - self.upgradingHandlerCompleted(context: context, result, requestHeadAndProtocol: nil) - } - - case .fireErrorCaughtAndStartUnbuffering(let error): - self.upgradeResultPromise.fail(error) - context.fireErrorCaught(error) - self.unbuffer(context: context) - - case .fireErrorCaughtAndRemoveHandler(let error): - self.upgradeResultPromise.fail(error) - context.fireErrorCaught(error) - context.pipeline.removeHandler(self, promise: nil) - - case .none: - break - } - } - - private func startUpgrading( - context: ChannelHandlerContext, - upgrader: any NIOTypedHTTPServerProtocolUpgrader, - requestHead: HTTPRequestHead, - responseHeaders: HTTPHeaders, - proto: String - ) { - // Before we finish the upgrade we have to remove the HTTPDecoder and any other non-Encoder HTTP - // handlers from the pipeline, to prevent them parsing any more data. We'll buffer the data until - // that completes. - // While there are a lot of Futures involved here it's quite possible that all of this code will - // actually complete synchronously: we just want to program for the possibility that it won't. - // Once that's done, we send the upgrade response, then remove the HTTP encoder, then call the - // internal handler, then call the user code, and then finally when the user code is done we do - // our final cleanup steps, namely we replay the received data we buffered in the meantime and - // then remove ourselves from the pipeline. - self.removeExtraHandlers(context: context).flatMap { - self.sendUpgradeResponse(context: context, responseHeaders: responseHeaders) - }.flatMap { - context.pipeline.removeHandler(self.httpEncoder) - }.flatMap { () -> EventLoopFuture in - return upgrader.upgrade(channel: context.channel, upgradeRequest: requestHead) - }.hop(to: context.eventLoop) - .whenComplete { result in - self.upgradingHandlerCompleted(context: context, result, requestHeadAndProtocol: (requestHead, proto)) - } - } - - /// Sends the 101 Switching Protocols response for the pipeline. - private func sendUpgradeResponse(context: ChannelHandlerContext, responseHeaders: HTTPHeaders) -> EventLoopFuture { - var response = HTTPResponseHead(version: .http1_1, status: .switchingProtocols) - response.headers = responseHeaders - return context.writeAndFlush(wrapOutboundOut(HTTPServerResponsePart.head(response))) - } - - /// Builds the initial mandatory HTTP headers for HTTP upgrade responses. - private func buildUpgradeHeaders(`protocol`: String) -> HTTPHeaders { - return HTTPHeaders([("connection", "upgrade"), ("upgrade", `protocol`)]) - } - - /// Removes any extra HTTP-related handlers from the channel pipeline. - private func removeExtraHandlers(context: ChannelHandlerContext) -> EventLoopFuture { - guard self.extraHTTPHandlers.count > 0 else { - return context.eventLoop.makeSucceededFuture(()) - } - - return .andAllSucceed(self.extraHTTPHandlers.map { context.pipeline.removeHandler($0) }, - on: context.eventLoop) - } - - private func unbuffer(context: ChannelHandlerContext) { - while true { - switch self.stateMachine.unbuffer() { - case .fireChannelRead(let data): - context.fireChannelRead(data) - - case .fireChannelReadCompleteAndRemoveHandler: - context.fireChannelReadComplete() - context.pipeline.removeHandler(self, promise: nil) - return - } - } - } -} diff --git a/Sources/NIOHTTP1/NIOTypedHTTPServerUpgraderStateMachine.swift b/Sources/NIOHTTP1/NIOTypedHTTPServerUpgraderStateMachine.swift deleted file mode 100644 index d0fcf287de..0000000000 --- a/Sources/NIOHTTP1/NIOTypedHTTPServerUpgraderStateMachine.swift +++ /dev/null @@ -1,385 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the SwiftNIO open source project -// -// Copyright (c) 2023 Apple Inc. and the SwiftNIO project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of SwiftNIO project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import DequeModule -import NIOCore - -@available(macOS 13, iOS 16, tvOS 16, watchOS 9, *) -struct NIOTypedHTTPServerUpgraderStateMachine { - @usableFromInline - enum State { - /// The state before we received a TLSUserEvent. We are just forwarding any read at this point. - case initial - - @usableFromInline - struct AwaitingUpgrader { - var seenFirstRequest: Bool - var buffer: Deque - } - - /// The request head has been received. We're currently running the future chain awaiting an upgrader. - case awaitingUpgrader(AwaitingUpgrader) - - @usableFromInline - struct UpgraderReady { - var upgrader: any NIOTypedHTTPServerProtocolUpgrader - var requestHead: HTTPRequestHead - var responseHeaders: HTTPHeaders - var proto: String - var buffer: Deque - } - - /// We have an upgrader, which means we can begin upgrade we are just waiting for the request end. - case upgraderReady(UpgraderReady) - - @usableFromInline - struct Upgrading { - var buffer: Deque - } - /// We are either running the upgrading handler. - case upgrading(Upgrading) - - @usableFromInline - struct Unbuffering { - var buffer: Deque - } - case unbuffering(Unbuffering) - - case finished - - case modifying - } - - private var state = State.initial - - @usableFromInline - enum HandlerRemovedAction { - case failUpgradePromise - } - - @inlinable - mutating func handlerRemoved() -> HandlerRemovedAction? { - switch self.state { - case .initial, .awaitingUpgrader, .upgraderReady, .upgrading, .unbuffering: - self.state = .finished - return .failUpgradePromise - - case .finished: - return .none - - case .modifying: - fatalError("Internal inconsistency in HTTPServerUpgradeStateMachine") - } - } - - @usableFromInline - enum ChannelReadDataAction { - case unwrapData - case fireChannelRead - } - - @inlinable - mutating func channelReadData(_ data: NIOAny) -> ChannelReadDataAction? { - switch self.state { - case .initial: - return .unwrapData - - case .awaitingUpgrader(var awaitingUpgrader): - if awaitingUpgrader.seenFirstRequest { - // We should buffer the data since we have seen the full request. - self.state = .modifying - awaitingUpgrader.buffer.append(data) - self.state = .awaitingUpgrader(awaitingUpgrader) - return nil - } else { - // We shouldn't buffer. This means we are still expecting HTTP parts. - return .unwrapData - } - - case .upgraderReady: - // We have not seen the end of the HTTP request so this - // data is probably an HTTP request part. - return .unwrapData - - case .unbuffering(var unbuffering): - self.state = .modifying - unbuffering.buffer.append(data) - self.state = .unbuffering(unbuffering) - return nil - - case .finished: - return .fireChannelRead - - case .upgrading(var upgrading): - // We got a read while running ugprading. - // We have to buffer the read to unbuffer it afterwards - self.state = .modifying - upgrading.buffer.append(data) - self.state = .upgrading(upgrading) - return nil - - case .modifying: - fatalError("Internal inconsistency in HTTPServerUpgradeStateMachine") - } - } - - @usableFromInline - enum ChannelReadRequestPartAction { - case failUpgradePromise(Error) - case runNotUpgradingInitializer - case startUpgrading( - upgrader: any NIOTypedHTTPServerProtocolUpgrader, - requestHead: HTTPRequestHead, - responseHeaders: HTTPHeaders, - proto: String - ) - case findUpgrader( - head: HTTPRequestHead, - requestedProtocols: [String], - allHeaderNames: Set, - connectionHeader: Set - ) - } - - @inlinable - mutating func channelReadRequestPart(_ requestPart: HTTPServerRequestPart) -> ChannelReadRequestPartAction? { - switch self.state { - case .initial: - guard case .head(let head) = requestPart else { - // The first data that we saw was not a head. This is a protocol error and we are just going to - // fail upgrading - return .failUpgradePromise(HTTPServerUpgradeErrors.invalidHTTPOrdering) - } - - // Ok, we have a HTTP head. Check if it's an upgrade. - let requestedProtocols = head.headers[canonicalForm: "upgrade"].map(String.init) - guard requestedProtocols.count > 0 else { - // We have to buffer now since we got the request head but are not upgrading. - // The user is configuring the HTTP pipeline now. - var buffer = Deque() - buffer.append(NIOAny(requestPart)) - self.state = .upgrading(.init(buffer: buffer)) - return .runNotUpgradingInitializer - } - - // We can now transition to awaiting the upgrader. This means that we are trying to - // find an upgrade that can handle requested protocols. We are not buffering because - // we are waiting for the request end. - self.state = .awaitingUpgrader(.init(seenFirstRequest: false, buffer: .init())) - - let connectionHeader = Set(head.headers[canonicalForm: "connection"].map { $0.lowercased() }) - let allHeaderNames = Set(head.headers.map { $0.name.lowercased() }) - - return .findUpgrader( - head: head, - requestedProtocols: requestedProtocols, - allHeaderNames: allHeaderNames, - connectionHeader: connectionHeader - ) - - case .awaitingUpgrader(let awaitingUpgrader): - switch (awaitingUpgrader.seenFirstRequest, requestPart) { - case (true, _): - // This is weird we are seeing more requests parts after we have seen an end - // Let's fail upgrading - return .failUpgradePromise(HTTPServerUpgradeErrors.invalidHTTPOrdering) - - case (false, .head): - // This is weird we are seeing another head but haven't seen the end for the request before - return .failUpgradePromise(HTTPServerUpgradeErrors.invalidHTTPOrdering) - - case (false, .body): - // This is weird we are seeing body parts for a request that indicated that it wanted - // to upgrade. - return .failUpgradePromise(HTTPServerUpgradeErrors.invalidHTTPOrdering) - - case (false, .end): - // Okay we got the end as expected. Just gotta store this in our state. - self.state = .awaitingUpgrader(.init(seenFirstRequest: true, buffer: awaitingUpgrader.buffer)) - return nil - } - - case .upgraderReady(let upgraderReady): - switch requestPart { - case .head: - // This is weird we are seeing another head but haven't seen the end for the request before - return .failUpgradePromise(HTTPServerUpgradeErrors.invalidHTTPOrdering) - - case .body: - // This is weird we are seeing body parts for a request that indicated that it wanted - // to upgrade. - return .failUpgradePromise(HTTPServerUpgradeErrors.invalidHTTPOrdering) - - case .end: - // Okay we got the end as expected and our upgrader is ready so let's start upgrading - self.state = .upgrading(.init(buffer: upgraderReady.buffer)) - return .startUpgrading( - upgrader: upgraderReady.upgrader, - requestHead: upgraderReady.requestHead, - responseHeaders: upgraderReady.responseHeaders, - proto: upgraderReady.proto - ) - } - - case .upgrading, .unbuffering, .finished: - fatalError("Internal inconsistency in HTTPServerUpgradeStateMachine") - - - case .modifying: - fatalError("Internal inconsistency in HTTPServerUpgradeStateMachine") - } - } - - @usableFromInline - enum UpgradingHandlerCompletedAction { - case fireErrorCaughtAndStartUnbuffering(Error) - case removeHandler(UpgradeResult) - case fireErrorCaughtAndRemoveHandler(Error) - case startUnbuffering(UpgradeResult) - } - - @inlinable - mutating func upgradingHandlerCompleted(_ result: Result) -> UpgradingHandlerCompletedAction? { - switch self.state { - case .initial: - fatalError("Internal inconsistency in HTTPServerUpgradeStateMachine") - - case .upgrading(let upgrading): - switch result { - case .success(let value): - if !upgrading.buffer.isEmpty { - self.state = .unbuffering(.init(buffer: upgrading.buffer)) - return .startUnbuffering(value) - } else { - self.state = .finished - return .removeHandler(value) - } - - case .failure(let error): - if !upgrading.buffer.isEmpty { - // So we failed to upgrade. There is nothing really that we can do here. - // We are unbuffering the reads but there shouldn't be any handler in the pipeline - // that expects a specific type of reads anyhow. - self.state = .unbuffering(.init(buffer: upgrading.buffer)) - return .fireErrorCaughtAndStartUnbuffering(error) - } else { - self.state = .finished - return .fireErrorCaughtAndRemoveHandler(error) - } - } - - case .finished: - // We have to tolerate this - return nil - - case .awaitingUpgrader, .upgraderReady, .unbuffering: - fatalError("Internal inconsistency in HTTPServerUpgradeStateMachine") - - case .modifying: - fatalError("Internal inconsistency in HTTPServerUpgradeStateMachine") - } - } - - @usableFromInline - enum FindingUpgraderCompletedAction { - case startUpgrading(upgrader: any NIOTypedHTTPServerProtocolUpgrader, responseHeaders: HTTPHeaders, proto: String) - case runNotUpgradingInitializer - case fireErrorCaughtAndStartUnbuffering(Error) - case fireErrorCaughtAndRemoveHandler(Error) - } - - @inlinable - mutating func findingUpgraderCompleted( - requestHead: HTTPRequestHead, - _ result: Result<(upgrader: any NIOTypedHTTPServerProtocolUpgrader, responseHeaders: HTTPHeaders, proto: String)?, Error> - ) -> FindingUpgraderCompletedAction? { - switch self.state { - case .initial, .upgraderReady: - fatalError("Internal inconsistency in HTTPServerUpgradeStateMachine") - - case .awaitingUpgrader(let awaitingUpgrader): - switch result { - case .success(.some((let upgrader, let responseHeaders, let proto))): - if awaitingUpgrader.seenFirstRequest { - // We have seen the end of the request. So we can upgrade now. - self.state = .upgrading(.init(buffer: awaitingUpgrader.buffer)) - return .startUpgrading(upgrader: upgrader, responseHeaders: responseHeaders, proto: proto) - } else { - // We have not yet seen the end so we have to wait until that happens - self.state = .upgraderReady(.init( - upgrader: upgrader, - requestHead: requestHead, - responseHeaders: responseHeaders, - proto: proto, - buffer: awaitingUpgrader.buffer - )) - return nil - } - - case .success(.none): - // There was no upgrader to handle the request. We just run the not upgrading - // initializer now. - self.state = .upgrading(.init(buffer: awaitingUpgrader.buffer)) - return .runNotUpgradingInitializer - - case .failure(let error): - if !awaitingUpgrader.buffer.isEmpty { - self.state = .unbuffering(.init(buffer: awaitingUpgrader.buffer)) - return .fireErrorCaughtAndStartUnbuffering(error) - } else { - self.state = .finished - return .fireErrorCaughtAndRemoveHandler(error) - } - } - - case .upgrading, .unbuffering, .finished: - fatalError("Internal inconsistency in HTTPServerUpgradeStateMachine") - - case .modifying: - fatalError("Internal inconsistency in HTTPServerUpgradeStateMachine") - } - } - - @usableFromInline - enum UnbufferAction { - case fireChannelRead(NIOAny) - case fireChannelReadCompleteAndRemoveHandler - } - - @inlinable - mutating func unbuffer() -> UnbufferAction { - switch self.state { - case .initial, .awaitingUpgrader, .upgraderReady, .upgrading, .finished: - preconditionFailure("Invalid state \(self.state)") - - case .unbuffering(var unbuffering): - self.state = .modifying - - if let element = unbuffering.buffer.popFirst() { - self.state = .unbuffering(unbuffering) - - return .fireChannelRead(element) - } else { - self.state = .finished - - return .fireChannelReadCompleteAndRemoveHandler - } - - case .modifying: - fatalError("Internal inconsistency in HTTPServerUpgradeStateMachine") - - } - } - -} diff --git a/Sources/NIOTCPEchoClient/Client.swift b/Sources/NIOTCPEchoClient/Client.swift index 0d8bd4404f..9bc0e0c9aa 100644 --- a/Sources/NIOTCPEchoClient/Client.swift +++ b/Sources/NIOTCPEchoClient/Client.swift @@ -15,7 +15,7 @@ import NIOCore import NIOPosix -@available(macOS 14, *) +@available(macOS 14, iOS 17, tvOS 17, watchOS 10, *) @main struct Client { /// The host to connect to. diff --git a/Sources/NIOTCPEchoServer/Server.swift b/Sources/NIOTCPEchoServer/Server.swift index 390fff795b..edc52f2e1b 100644 --- a/Sources/NIOTCPEchoServer/Server.swift +++ b/Sources/NIOTCPEchoServer/Server.swift @@ -15,7 +15,7 @@ import NIOCore import NIOPosix -@available(macOS 14, *) +@available(macOS 14, iOS 17, tvOS 17, watchOS 10, *) @main struct Server { /// The server's host. diff --git a/Sources/NIOWebSocket/NIOWebSocketClientUpgrader.swift b/Sources/NIOWebSocket/NIOWebSocketClientUpgrader.swift index ac6e92ee32..6483954bde 100644 --- a/Sources/NIOWebSocket/NIOWebSocketClientUpgrader.swift +++ b/Sources/NIOWebSocket/NIOWebSocketClientUpgrader.swift @@ -74,62 +74,6 @@ public final class NIOWebSocketClientUpgrader: NIOHTTPClientProtocolUpgrader { } } -/// A `NIOTypedHTTPClientProtocolUpgrader` that knows how to do the WebSocket upgrade dance. -/// -/// This upgrader assumes that the `HTTPClientUpgradeHandler` will create and send the upgrade request. -/// This upgrader also assumes that the `HTTPClientUpgradeHandler` will appropriately mutate the -/// pipeline to remove the HTTP `ChannelHandler`s. -@available(macOS 13, iOS 16, tvOS 16, watchOS 9, *) -public final class NIOTypedWebSocketClientUpgrader: NIOTypedHTTPClientProtocolUpgrader { - /// RFC 6455 specs this as the required entry in the Upgrade header. - public let supportedProtocol: String = "websocket" - /// None of the websocket headers are actually defined as 'required'. - public let requiredUpgradeHeaders: [String] = [] - - private let requestKey: String - private let maxFrameSize: Int - private let enableAutomaticErrorHandling: Bool - private let upgradePipelineHandler: @Sendable (Channel, HTTPResponseHead) -> EventLoopFuture - - /// - Parameters: - /// - requestKey: Sent to the server in the `Sec-WebSocket-Key` HTTP header. Default is random request key. - /// - maxFrameSize: Largest incoming `WebSocketFrame` size in bytes. Default is 16,384 bytes. - /// - enableAutomaticErrorHandling: If true, adds `WebSocketProtocolErrorHandler` to the channel pipeline to catch and respond to WebSocket protocol errors. Default is true. - /// - upgradePipelineHandler: Called once the upgrade was successful. - public init( - requestKey: String = NIOWebSocketClientUpgrader.randomRequestKey(), - maxFrameSize: Int = 1 << 14, - enableAutomaticErrorHandling: Bool = true, - upgradePipelineHandler: @escaping @Sendable (Channel, HTTPResponseHead) -> EventLoopFuture - ) { - precondition(requestKey != "", "The request key must contain a valid Sec-WebSocket-Key") - precondition(maxFrameSize <= UInt32.max, "invalid overlarge max frame size") - self.requestKey = requestKey - self.upgradePipelineHandler = upgradePipelineHandler - self.maxFrameSize = maxFrameSize - self.enableAutomaticErrorHandling = enableAutomaticErrorHandling - } - - public func addCustom(upgradeRequestHeaders: inout NIOHTTP1.HTTPHeaders) { - _addCustom(upgradeRequestHeaders: &upgradeRequestHeaders, requestKey: self.requestKey) - } - - public func shouldAllowUpgrade(upgradeResponse: HTTPResponseHead) -> Bool { - _shouldAllowUpgrade(upgradeResponse: upgradeResponse, requestKey: self.requestKey) - } - - public func upgrade(channel: Channel, upgradeResponse: HTTPResponseHead) -> EventLoopFuture { - _upgrade( - channel: channel, - upgradeResponse: upgradeResponse, - maxFrameSize: self.maxFrameSize, - enableAutomaticErrorHandling: self.enableAutomaticErrorHandling, - upgradePipelineHandler: self.upgradePipelineHandler - ) - } -} - - @available(*, unavailable) extension NIOWebSocketClientUpgrader: Sendable {} diff --git a/Sources/NIOWebSocket/NIOWebSocketServerUpgrader.swift b/Sources/NIOWebSocket/NIOWebSocketServerUpgrader.swift index 4580d0ec07..44b9f56731 100644 --- a/Sources/NIOWebSocket/NIOWebSocketServerUpgrader.swift +++ b/Sources/NIOWebSocket/NIOWebSocketServerUpgrader.swift @@ -175,90 +175,6 @@ public final class NIOWebSocketServerUpgrader: HTTPServerProtocolUpgrader, @unch } } -/// A `NIOTypedHTTPServerProtocolUpgrader` that knows how to do the WebSocket upgrade dance. -/// -/// Users may frequently want to offer multiple websocket endpoints on the same port. For this -/// reason, this `WebServerSocketUpgrader` only knows how to do the required parts of the upgrade and to -/// complete the handshake. Users are expected to provide a callback that examines the HTTP headers -/// (including the path) and determines whether this is a websocket upgrade request that is acceptable -/// to them. -/// -/// This upgrader assumes that the `HTTPServerUpgradeHandler` will appropriately mutate the pipeline to -/// remove the HTTP `ChannelHandler`s. -public final class NIOTypedWebSocketServerUpgrader: NIOTypedHTTPServerProtocolUpgrader, Sendable { - private typealias ShouldUpgrade = @Sendable (Channel, HTTPRequestHead) -> EventLoopFuture - private typealias UpgradePipelineHandler = @Sendable (Channel, HTTPRequestHead) -> EventLoopFuture - - /// RFC 6455 specs this as the required entry in the Upgrade header. - public let supportedProtocol: String = "websocket" - - /// We deliberately do not actually set any required headers here, because the websocket - /// spec annoyingly does not actually force the client to send these in the Upgrade header, - /// which NIO requires. We check for these manually. - public let requiredUpgradeHeaders: [String] = [] - - private let shouldUpgrade: ShouldUpgrade - private let upgradePipelineHandler: UpgradePipelineHandler - private let maxFrameSize: Int - private let enableAutomaticErrorHandling: Bool - - /// Create a new ``NIOTypedWebSocketServerUpgrader``. - /// - /// - Parameters: - /// - maxFrameSize: The maximum frame size the decoder is willing to tolerate from the - /// remote peer. WebSockets in principle allows frame sizes up to `2**64` bytes, but - /// this is an objectively unreasonable maximum value (on AMD64 systems it is not - /// possible to even. Users may set this to any value up to `UInt32.max`. - /// - automaticErrorHandling: Whether the pipeline should automatically handle protocol - /// errors by sending error responses and closing the connection. Defaults to `true`, - /// may be set to `false` if the user wishes to handle their own errors. - /// - shouldUpgrade: A callback that determines whether the websocket request should be - /// upgraded. This callback is responsible for creating a `HTTPHeaders` object with - /// any headers that it needs on the response *except for* the `Upgrade`, `Connection`, - /// and `Sec-WebSocket-Accept` headers, which this upgrader will handle. Should return - /// an `EventLoopFuture` containing `nil` if the upgrade should be refused. - /// - enableAutomaticErrorHandling: A function that will be called once the upgrade response is - /// flushed, and that is expected to mutate the `Channel` appropriately to handle the - /// websocket protocol. This only needs to add the user handlers: the - /// `WebSocketFrameEncoder` and `WebSocketFrameDecoder` will have been added to the - /// pipeline automatically. - public init( - maxFrameSize: Int = 1 << 14, - enableAutomaticErrorHandling: Bool = true, - shouldUpgrade: @escaping @Sendable (Channel, HTTPRequestHead) -> EventLoopFuture, - upgradePipelineHandler: @escaping @Sendable (Channel, HTTPRequestHead) -> EventLoopFuture - ) { - precondition(maxFrameSize <= UInt32.max, "invalid overlarge max frame size") - self.shouldUpgrade = shouldUpgrade - self.upgradePipelineHandler = upgradePipelineHandler - self.maxFrameSize = maxFrameSize - self.enableAutomaticErrorHandling = enableAutomaticErrorHandling - } - - public func buildUpgradeResponse( - channel: Channel, - upgradeRequest: HTTPRequestHead, - initialResponseHeaders: HTTPHeaders - ) -> EventLoopFuture { - _buildUpgradeResponse( - channel: channel, - upgradeRequest: upgradeRequest, - initialResponseHeaders: initialResponseHeaders, - shouldUpgrade: self.shouldUpgrade - ) - } - - public func upgrade(channel: Channel, upgradeRequest: HTTPRequestHead) -> EventLoopFuture { - _upgrade( - channel: channel, - upgradeRequest: upgradeRequest, - maxFrameSize: self.maxFrameSize, - automaticErrorHandling: self.enableAutomaticErrorHandling, - upgradePipelineHandler: self.upgradePipelineHandler - ) - } -} - private func _buildUpgradeResponse( channel: Channel, upgradeRequest: HTTPRequestHead, diff --git a/Sources/NIOWebSocketClient/Client.swift b/Sources/NIOWebSocketClient/Client.swift index 6477416684..a2698536fe 100644 --- a/Sources/NIOWebSocketClient/Client.swift +++ b/Sources/NIOWebSocketClient/Client.swift @@ -12,127 +12,136 @@ // //===----------------------------------------------------------------------===// #if swift(>=5.9) -import NIOCore -import NIOPosix -import NIOHTTP1 -import NIOWebSocket - -@available(macOS 14, *) @main struct Client { - /// The host to connect to. - private let host: String - /// The port to connect to. - private let port: Int - /// The client's event loop group. - private let eventLoopGroup: MultiThreadedEventLoopGroup - - enum UpgradeResult { - case websocket(NIOAsyncChannel) - case notUpgraded - } - - static func main() async throws { - let client = Client( - host: "localhost", - port: 8888, - eventLoopGroup: .singleton - ) - try await client.run() - } - - /// This method starts the client and tries to setup a WebSocket connection. - func run() async throws { - let upgradeResult: EventLoopFuture = try await ClientBootstrap(group: self.eventLoopGroup) - .connect( - host: self.host, - port: self.port - ) { channel in - channel.eventLoop.makeCompletedFuture { - let upgrader = NIOTypedWebSocketClientUpgrader( - upgradePipelineHandler: { (channel, _) in - channel.eventLoop.makeCompletedFuture { - let asyncChannel = try NIOAsyncChannel(synchronouslyWrapping: channel) - return UpgradeResult.websocket(asyncChannel) - } - } - ) - - var headers = HTTPHeaders() - headers.add(name: "Content-Type", value: "text/plain; charset=utf-8") - headers.add(name: "Content-Length", value: "0") - - let requestHead = HTTPRequestHead( - version: .http1_1, - method: .GET, - uri: "/", - headers: headers - ) - - let clientUpgradeConfiguration = NIOTypedHTTPClientUpgradeConfiguration( - upgradeRequestHead: requestHead, - upgraders: [upgrader], - notUpgradingCompletionHandler: { channel in - channel.eventLoop.makeCompletedFuture { - return UpgradeResult.notUpgraded - } - } - ) - - let negotiationResultFuture = try channel.pipeline.syncOperations.configureUpgradableHTTPClientPipeline( - configuration: .init(upgradeConfiguration: clientUpgradeConfiguration) - ) - - return negotiationResultFuture - } - } - - // We are awaiting and handling the upgrade result now. - try await self.handleUpgradeResult(upgradeResult) - } - - /// This method handles the upgrade result. - private func handleUpgradeResult(_ upgradeResult: EventLoopFuture) async throws { - switch try await upgradeResult.get() { - case .websocket(let websocketChannel): - print("Handling websocket connection") - try await self.handleWebsocketChannel(websocketChannel) - print("Done handling websocket connection") - case .notUpgraded: - // The upgrade to websocket did not succeed. We are just exiting in this case. - print("Upgrade declined") - } + static func main() { + fatalError("Disabled due to https://github.com/apple/swift-nio/issues/2574") } +} - private func handleWebsocketChannel(_ channel: NIOAsyncChannel) async throws { - // We are sending a ping frame and then - // start to handle all inbound frames. - - let pingFrame = WebSocketFrame(fin: true, opcode: .ping, data: ByteBuffer(string: "Hello!")) - try await channel.outbound.write(pingFrame) - - for try await frame in channel.inbound { - switch frame.opcode { - case .pong: - print("Received pong: \(String(buffer: frame.data))") - - case .text: - print("Received: \(String(buffer: frame.data))") +// Commented out due https://github.com/apple/swift-nio/issues/2574 - case .connectionClose: - // Handle a received close frame. We're just going to close by returning from this method. - print("Received Close instruction from server") - return - case .binary, .continuation, .ping: - // We ignore these frames. - break - default: - // Unknown frames are errors. - return - } - } - } -} +//import NIOCore +//import NIOPosix +//import NIOHTTP1 +//import NIOWebSocket +// +//@available(macOS 14, iOS 17, tvOS 17, watchOS 10, *) +//@main +//struct Client { +// /// The host to connect to. +// private let host: String +// /// The port to connect to. +// private let port: Int +// /// The client's event loop group. +// private let eventLoopGroup: MultiThreadedEventLoopGroup +// +// enum UpgradeResult { +// case websocket(NIOAsyncChannel) +// case notUpgraded +// } +// +// static func main() async throws { +// let client = Client( +// host: "localhost", +// port: 8888, +// eventLoopGroup: .singleton +// ) +// try await client.run() +// } +// +// /// This method starts the client and tries to setup a WebSocket connection. +// func run() async throws { +// let upgradeResult: EventLoopFuture = try await ClientBootstrap(group: self.eventLoopGroup) +// .connect( +// host: self.host, +// port: self.port +// ) { channel in +// channel.eventLoop.makeCompletedFuture { +// let upgrader = NIOTypedWebSocketClientUpgrader( +// upgradePipelineHandler: { (channel, _) in +// channel.eventLoop.makeCompletedFuture { +// let asyncChannel = try NIOAsyncChannel(synchronouslyWrapping: channel) +// return UpgradeResult.websocket(asyncChannel) +// } +// } +// ) +// +// var headers = HTTPHeaders() +// headers.add(name: "Content-Type", value: "text/plain; charset=utf-8") +// headers.add(name: "Content-Length", value: "0") +// +// let requestHead = HTTPRequestHead( +// version: .http1_1, +// method: .GET, +// uri: "/", +// headers: headers +// ) +// +// let clientUpgradeConfiguration = NIOTypedHTTPClientUpgradeConfiguration( +// upgradeRequestHead: requestHead, +// upgraders: [upgrader], +// notUpgradingCompletionHandler: { channel in +// channel.eventLoop.makeCompletedFuture { +// return UpgradeResult.notUpgraded +// } +// } +// ) +// +// let negotiationResultFuture = try channel.pipeline.syncOperations.configureUpgradableHTTPClientPipeline( +// configuration: .init(upgradeConfiguration: clientUpgradeConfiguration) +// ) +// +// return negotiationResultFuture +// } +// } +// +// // We are awaiting and handling the upgrade result now. +// try await self.handleUpgradeResult(upgradeResult) +// } +// +// /// This method handles the upgrade result. +// private func handleUpgradeResult(_ upgradeResult: EventLoopFuture) async throws { +// switch try await upgradeResult.get() { +// case .websocket(let websocketChannel): +// print("Handling websocket connection") +// try await self.handleWebsocketChannel(websocketChannel) +// print("Done handling websocket connection") +// case .notUpgraded: +// // The upgrade to websocket did not succeed. We are just exiting in this case. +// print("Upgrade declined") +// } +// } +// +// private func handleWebsocketChannel(_ channel: NIOAsyncChannel) async throws { +// // We are sending a ping frame and then +// // start to handle all inbound frames. +// +// let pingFrame = WebSocketFrame(fin: true, opcode: .ping, data: ByteBuffer(string: "Hello!")) +// try await channel.outbound.write(pingFrame) +// +// for try await frame in channel.inbound { +// switch frame.opcode { +// case .pong: +// print("Received pong: \(String(buffer: frame.data))") +// +// case .text: +// print("Received: \(String(buffer: frame.data))") +// +// case .connectionClose: +// // Handle a received close frame. We're just going to close by returning from this method. +// print("Received Close instruction from server") +// return +// case .binary, .continuation, .ping: +// // We ignore these frames. +// break +// default: +// // Unknown frames are errors. +// return +// } +// } +// } +//} #else @main diff --git a/Sources/NIOWebSocketServer/Server.swift b/Sources/NIOWebSocketServer/Server.swift index 525c64b00d..dad8fbd12c 100644 --- a/Sources/NIOWebSocketServer/Server.swift +++ b/Sources/NIOWebSocketServer/Server.swift @@ -41,238 +41,247 @@ let websocketResponse = """ """ -@available(macOS 14, *) @main struct Server { - /// The server's host. - private let host: String - /// The server's port. - private let port: Int - /// The server's event loop group. - private let eventLoopGroup: MultiThreadedEventLoopGroup - - private static let responseBody = ByteBuffer(string: websocketResponse) - - enum UpgradeResult { - case websocket(NIOAsyncChannel) - case notUpgraded(NIOAsyncChannel>) - } - - static func main() async throws { - let server = Server( - host: "localhost", - port: 8888, - eventLoopGroup: .singleton - ) - try await server.run() - } - - /// This method starts the server and handles incoming connections. - func run() async throws { - let channel: NIOAsyncChannel, Never> = try await ServerBootstrap(group: self.eventLoopGroup) - .serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1) - .bind( - host: self.host, - port: self.port - ) { channel in - channel.eventLoop.makeCompletedFuture { - let upgrader = NIOTypedWebSocketServerUpgrader( - shouldUpgrade: { (channel, head) in - channel.eventLoop.makeSucceededFuture(HTTPHeaders()) - }, - upgradePipelineHandler: { (channel, _) in - channel.eventLoop.makeCompletedFuture { - let asyncChannel = try NIOAsyncChannel(synchronouslyWrapping: channel) - return UpgradeResult.websocket(asyncChannel) - } - } - ) - - let serverUpgradeConfiguration = NIOTypedHTTPServerUpgradeConfiguration( - upgraders: [upgrader], - notUpgradingCompletionHandler: { channel in - channel.eventLoop.makeCompletedFuture { - try channel.pipeline.syncOperations.addHandler(HTTPByteBufferResponsePartHandler()) - let asyncChannel = try NIOAsyncChannel>(synchronouslyWrapping: channel) - return UpgradeResult.notUpgraded(asyncChannel) - } - } - ) - - let negotiationResultFuture = try channel.pipeline.syncOperations.configureUpgradableHTTPServerPipeline( - configuration: .init(upgradeConfiguration: serverUpgradeConfiguration) - ) - - return negotiationResultFuture - } - } - - // We are handling each incoming connection in a separate child task. It is important - // to use a discarding task group here which automatically discards finished child tasks. - // A normal task group retains all child tasks and their outputs in memory until they are - // consumed by iterating the group or by exiting the group. Since, we are never consuming - // the results of the group we need the group to automatically discard them; otherwise, this - // would result in a memory leak over time. - try await withThrowingDiscardingTaskGroup { group in - for try await upgradeResult in channel.inbound { - group.addTask { - await self.handleUpgradeResult(upgradeResult) - } - } - } - } - - /// This method handles a single connection by echoing back all inbound data. - private func handleUpgradeResult(_ upgradeResult: EventLoopFuture) async { - // Note that this method is non-throwing and we are catching any error. - // We do this since we don't want to tear down the whole server when a single connection - // encounters an error. - do { - switch try await upgradeResult.get() { - case .websocket(let websocketChannel): - print("Handling websocket connection") - try await self.handleWebsocketChannel(websocketChannel) - print("Done handling websocket connection") - case .notUpgraded(let httpChannel): - print("Handling HTTP connection") - try await self.handleHTTPChannel(httpChannel) - print("Done handling HTTP connection") - } - } catch { - print("Hit error: \(error)") - } - } - - private func handleWebsocketChannel(_ channel: NIOAsyncChannel) async throws { - try await withThrowingTaskGroup(of: Void.self) { group in - group.addTask { - for try await frame in channel.inbound { - switch frame.opcode { - case .ping: - print("Received ping") - var frameData = frame.data - let maskingKey = frame.maskKey - - if let maskingKey = maskingKey { - frameData.webSocketUnmask(maskingKey) - } - - let responseFrame = WebSocketFrame(fin: true, opcode: .pong, data: frameData) - try await channel.outbound.write(responseFrame) - - case .connectionClose: - // This is an unsolicited close. We're going to send a response frame and - // then, when we've sent it, close up shop. We should send back the close code the remote - // peer sent us, unless they didn't send one at all. - print("Received close") - var data = frame.unmaskedData - let closeDataCode = data.readSlice(length: 2) ?? ByteBuffer() - let closeFrame = WebSocketFrame(fin: true, opcode: .connectionClose, data: closeDataCode) - try await channel.outbound.write(closeFrame) - return - case .binary, .continuation, .pong: - // We ignore these frames. - break - default: - // Unknown frames are errors. - return - } - } - } - - group.addTask { - // This is our main business logic where we are just sending the current time - // every second. - while true { - // We can't really check for error here, but it's also not the purpose of the - // example so let's not worry about it. - let theTime = ContinuousClock().now - var buffer = channel.channel.allocator.buffer(capacity: 12) - buffer.writeString("\(theTime)") - - let frame = WebSocketFrame(fin: true, opcode: .text, data: buffer) - - print("Sending time") - try await channel.outbound.write(frame) - try await Task.sleep(for: .seconds(1)) - } - } - - try await group.next() - group.cancelAll() - } - } - - - private func handleHTTPChannel(_ channel: NIOAsyncChannel>) async throws { - for try await requestPart in channel.inbound { - // We're not interested in request bodies here: we're just serving up GET responses - // to get the client to initiate a websocket request. - guard case .head(let head) = requestPart else { - return - } - - // GETs only. - guard case .GET = head.method else { - try await self.respond405(writer: channel.outbound) - return - } - - var headers = HTTPHeaders() - headers.add(name: "Content-Type", value: "text/html") - headers.add(name: "Content-Length", value: String(Self.responseBody.readableBytes)) - headers.add(name: "Connection", value: "close") - let responseHead = HTTPResponseHead( - version: .init(major: 1, minor: 1), - status: .ok, - headers: headers - ) - - try await channel.outbound.write( - contentsOf: [ - .head(responseHead), - .body(Self.responseBody), - .end(nil) - ] - ) - } - } - - private func respond405(writer: NIOAsyncChannelOutboundWriter>) async throws { - var headers = HTTPHeaders() - headers.add(name: "Connection", value: "close") - headers.add(name: "Content-Length", value: "0") - let head = HTTPResponseHead( - version: .http1_1, - status: .methodNotAllowed, - headers: headers - ) - - try await writer.write( - contentsOf: [ - .head(head), - .end(nil) - ] - ) + static func main() { + fatalError("Disabled due to https://github.com/apple/swift-nio/issues/2574") } } -final class HTTPByteBufferResponsePartHandler: ChannelOutboundHandler { - typealias OutboundIn = HTTPPart - typealias OutboundOut = HTTPServerResponsePart +// Commented out due https://github.com/apple/swift-nio/issues/2574 - func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise?) { - let part = self.unwrapOutboundIn(data) - switch part { - case .head(let head): - context.write(self.wrapOutboundOut(.head(head)), promise: promise) - case .body(let buffer): - context.write(self.wrapOutboundOut(.body(.byteBuffer(buffer))), promise: promise) - case .end(let trailers): - context.write(self.wrapOutboundOut(.end(trailers)), promise: promise) - } - } -} +//@available(macOS 14, iOS 17, tvOS 17, watchOS 10, *) +//@main +//struct Server { +// /// The server's host. +// private let host: String +// /// The server's port. +// private let port: Int +// /// The server's event loop group. +// private let eventLoopGroup: MultiThreadedEventLoopGroup +// +// private static let responseBody = ByteBuffer(string: websocketResponse) +// +// enum UpgradeResult { +// case websocket(NIOAsyncChannel) +// case notUpgraded(NIOAsyncChannel>) +// } +// +// static func main() async throws { +// let server = Server( +// host: "localhost", +// port: 8888, +// eventLoopGroup: .singleton +// ) +// try await server.run() +// } +// +// /// This method starts the server and handles incoming connections. +// func run() async throws { +// let channel: NIOAsyncChannel, Never> = try await ServerBootstrap(group: self.eventLoopGroup) +// .serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1) +// .bind( +// host: self.host, +// port: self.port +// ) { channel in +// channel.eventLoop.makeCompletedFuture { +// let upgrader = NIOTypedWebSocketServerUpgrader( +// shouldUpgrade: { (channel, head) in +// channel.eventLoop.makeSucceededFuture(HTTPHeaders()) +// }, +// upgradePipelineHandler: { (channel, _) in +// channel.eventLoop.makeCompletedFuture { +// let asyncChannel = try NIOAsyncChannel(synchronouslyWrapping: channel) +// return UpgradeResult.websocket(asyncChannel) +// } +// } +// ) +// +// let serverUpgradeConfiguration = NIOTypedHTTPServerUpgradeConfiguration( +// upgraders: [upgrader], +// notUpgradingCompletionHandler: { channel in +// channel.eventLoop.makeCompletedFuture { +// try channel.pipeline.syncOperations.addHandler(HTTPByteBufferResponsePartHandler()) +// let asyncChannel = try NIOAsyncChannel>(synchronouslyWrapping: channel) +// return UpgradeResult.notUpgraded(asyncChannel) +// } +// } +// ) +// +// let negotiationResultFuture = try channel.pipeline.syncOperations.configureUpgradableHTTPServerPipeline( +// configuration: .init(upgradeConfiguration: serverUpgradeConfiguration) +// ) +// +// return negotiationResultFuture +// } +// } +// +// // We are handling each incoming connection in a separate child task. It is important +// // to use a discarding task group here which automatically discards finished child tasks. +// // A normal task group retains all child tasks and their outputs in memory until they are +// // consumed by iterating the group or by exiting the group. Since, we are never consuming +// // the results of the group we need the group to automatically discard them; otherwise, this +// // would result in a memory leak over time. +// try await withThrowingDiscardingTaskGroup { group in +// for try await upgradeResult in channel.inbound { +// group.addTask { +// await self.handleUpgradeResult(upgradeResult) +// } +// } +// } +// } +// +// /// This method handles a single connection by echoing back all inbound data. +// private func handleUpgradeResult(_ upgradeResult: EventLoopFuture) async { +// // Note that this method is non-throwing and we are catching any error. +// // We do this since we don't want to tear down the whole server when a single connection +// // encounters an error. +// do { +// switch try await upgradeResult.get() { +// case .websocket(let websocketChannel): +// print("Handling websocket connection") +// try await self.handleWebsocketChannel(websocketChannel) +// print("Done handling websocket connection") +// case .notUpgraded(let httpChannel): +// print("Handling HTTP connection") +// try await self.handleHTTPChannel(httpChannel) +// print("Done handling HTTP connection") +// } +// } catch { +// print("Hit error: \(error)") +// } +// } +// +// private func handleWebsocketChannel(_ channel: NIOAsyncChannel) async throws { +// try await withThrowingTaskGroup(of: Void.self) { group in +// group.addTask { +// for try await frame in channel.inbound { +// switch frame.opcode { +// case .ping: +// print("Received ping") +// var frameData = frame.data +// let maskingKey = frame.maskKey +// +// if let maskingKey = maskingKey { +// frameData.webSocketUnmask(maskingKey) +// } +// +// let responseFrame = WebSocketFrame(fin: true, opcode: .pong, data: frameData) +// try await channel.outbound.write(responseFrame) +// +// case .connectionClose: +// // This is an unsolicited close. We're going to send a response frame and +// // then, when we've sent it, close up shop. We should send back the close code the remote +// // peer sent us, unless they didn't send one at all. +// print("Received close") +// var data = frame.unmaskedData +// let closeDataCode = data.readSlice(length: 2) ?? ByteBuffer() +// let closeFrame = WebSocketFrame(fin: true, opcode: .connectionClose, data: closeDataCode) +// try await channel.outbound.write(closeFrame) +// return +// case .binary, .continuation, .pong: +// // We ignore these frames. +// break +// default: +// // Unknown frames are errors. +// return +// } +// } +// } +// +// group.addTask { +// // This is our main business logic where we are just sending the current time +// // every second. +// while true { +// // We can't really check for error here, but it's also not the purpose of the +// // example so let's not worry about it. +// let theTime = ContinuousClock().now +// var buffer = channel.channel.allocator.buffer(capacity: 12) +// buffer.writeString("\(theTime)") +// +// let frame = WebSocketFrame(fin: true, opcode: .text, data: buffer) +// +// print("Sending time") +// try await channel.outbound.write(frame) +// try await Task.sleep(for: .seconds(1)) +// } +// } +// +// try await group.next() +// group.cancelAll() +// } +// } +// +// +// private func handleHTTPChannel(_ channel: NIOAsyncChannel>) async throws { +// for try await requestPart in channel.inbound { +// // We're not interested in request bodies here: we're just serving up GET responses +// // to get the client to initiate a websocket request. +// guard case .head(let head) = requestPart else { +// return +// } +// +// // GETs only. +// guard case .GET = head.method else { +// try await self.respond405(writer: channel.outbound) +// return +// } +// +// var headers = HTTPHeaders() +// headers.add(name: "Content-Type", value: "text/html") +// headers.add(name: "Content-Length", value: String(Self.responseBody.readableBytes)) +// headers.add(name: "Connection", value: "close") +// let responseHead = HTTPResponseHead( +// version: .init(major: 1, minor: 1), +// status: .ok, +// headers: headers +// ) +// +// try await channel.outbound.write( +// contentsOf: [ +// .head(responseHead), +// .body(Self.responseBody), +// .end(nil) +// ] +// ) +// } +// } +// +// private func respond405(writer: NIOAsyncChannelOutboundWriter>) async throws { +// var headers = HTTPHeaders() +// headers.add(name: "Connection", value: "close") +// headers.add(name: "Content-Length", value: "0") +// let head = HTTPResponseHead( +// version: .http1_1, +// status: .methodNotAllowed, +// headers: headers +// ) +// +// try await writer.write( +// contentsOf: [ +// .head(head), +// .end(nil) +// ] +// ) +// } +//} +// +//final class HTTPByteBufferResponsePartHandler: ChannelOutboundHandler { +// typealias OutboundIn = HTTPPart +// typealias OutboundOut = HTTPServerResponsePart +// +// func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise?) { +// let part = self.unwrapOutboundIn(data) +// switch part { +// case .head(let head): +// context.write(self.wrapOutboundOut(.head(head)), promise: promise) +// case .body(let buffer): +// context.write(self.wrapOutboundOut(.body(.byteBuffer(buffer))), promise: promise) +// case .end(let trailers): +// context.write(self.wrapOutboundOut(.end(trailers)), promise: promise) +// } +// } +//} #else @main diff --git a/Tests/NIOHTTP1Tests/HTTPClientUpgradeTests.swift b/Tests/NIOHTTP1Tests/HTTPClientUpgradeTests.swift index 89f2b64c40..7bdd4c3622 100644 --- a/Tests/NIOHTTP1Tests/HTTPClientUpgradeTests.swift +++ b/Tests/NIOHTTP1Tests/HTTPClientUpgradeTests.swift @@ -33,7 +33,7 @@ extension EmbeddedChannel { } @available(macOS 13, iOS 16, tvOS 16, watchOS 9, *) -protocol TypedAndUntypedHTTPClientProtocolUpgrader: NIOHTTPClientProtocolUpgrader, NIOTypedHTTPClientProtocolUpgrader where UpgradeResult == Bool {} +protocol TypedAndUntypedHTTPClientProtocolUpgrader: NIOHTTPClientProtocolUpgrader {} private final class SuccessfulClientUpgrader: TypedAndUntypedHTTPClientProtocolUpgrader { fileprivate let supportedProtocol: String @@ -282,9 +282,8 @@ private final class RecordingHTTPHandler: ChannelInboundHandler, RemovableChanne @available(macOS 13, iOS 16, tvOS 16, watchOS 9, *) private func assertPipelineContainsUpgradeHandler(channel: Channel) { let handler = try? channel.pipeline.syncOperations.handler(type: NIOHTTPClientUpgradeHandler.self) - let typedHandler = try? channel.pipeline.syncOperations.handler(type: NIOTypedHTTPClientUpgradeHandler.self) - XCTAssertTrue(handler != nil || typedHandler != nil) + XCTAssertTrue(handler != nil) } @available(macOS 13, iOS 16, tvOS 16, watchOS 9, *) @@ -947,233 +946,3 @@ class HTTPClientUpgradeTestCase: XCTestCase { } } } - -@available(macOS 13, iOS 16, tvOS 16, watchOS 9, *) -final class TypedHTTPClientUpgradeTestCase: HTTPClientUpgradeTestCase { - override func setUpClientChannel( - clientHTTPHandler: RemovableChannelHandler, - clientUpgraders: [any TypedAndUntypedHTTPClientProtocolUpgrader], - _ upgradeCompletionHandler: @escaping (ChannelHandlerContext) -> Void - ) throws -> EmbeddedChannel { - - let channel = EmbeddedChannel() - - var headers = HTTPHeaders() - headers.add(name: "Content-Type", value: "text/plain; charset=utf-8") - headers.add(name: "Content-Length", value: "\(0)") - - let requestHead = HTTPRequestHead( - version: .http1_1, - method: .GET, - uri: "/", - headers: headers - ) - - let upgraders: [any NIOTypedHTTPClientProtocolUpgrader] = Array(clientUpgraders.map { $0 as! any NIOTypedHTTPClientProtocolUpgrader }) - - let config = NIOTypedHTTPClientUpgradeConfiguration( - upgradeRequestHead: requestHead, - upgraders: upgraders - ) { channel in - channel.eventLoop.makeCompletedFuture { - try channel.pipeline.syncOperations.addHandler(clientHTTPHandler) - }.map { _ in - false - } - } - var configuration = NIOUpgradableHTTPClientPipelineConfiguration(upgradeConfiguration: config) - configuration.leftOverBytesStrategy = .forwardBytes - let upgradeResult = try channel.pipeline.syncOperations.configureUpgradableHTTPClientPipeline(configuration: configuration) - let context = try channel.pipeline.syncOperations.context(handlerType: NIOTypedHTTPClientUpgradeHandler.self) - - try channel.connect(to: SocketAddress(ipAddress: "127.0.0.1", port: 0)) - .wait() - upgradeResult.whenSuccess { result in - if result { - upgradeCompletionHandler(context) - } - } - - return channel - } - - // - MARK: The following tests are all overridden from the base class since they slightly differ in behaviour - - override func testUpgradeOnlyHandlesKnownProtocols() throws { - var upgradeHandlerCallbackFired = false - - let clientUpgrader = ExplodingClientUpgrader(forProtocol: "myProto") - let clientHandler = RecordingHTTPHandler() - - // The process should kick-off independently by sending the upgrade request to the server. - let clientChannel = try setUpClientChannel(clientHTTPHandler: clientHandler, - clientUpgraders: [clientUpgrader]) { _ in - - // This is called before the upgrader gets called. - upgradeHandlerCallbackFired = true - } - defer { - XCTAssertNoThrow(try clientChannel.finish()) - } - - let response = "HTTP/1.1 101 Switching Protocols\r\nConnection: upgrade\r\nUpgrade: unknownProtocol\r\n\r\n" - XCTAssertThrowsError(try clientChannel.writeInbound(clientChannel.allocator.buffer(string: response))) { error in - XCTAssertEqual(error as? NIOHTTPClientUpgradeError, .responseProtocolNotFound) - } - - clientChannel.embeddedEventLoop.run() - - // Should fail with error (response is malformed) and remove upgrader from pipeline. - - // Check that the http elements are not removed from the pipeline. - clientChannel.pipeline.assertContains(handlerType: HTTPRequestEncoder.self) - clientChannel.pipeline.assertContains(handlerType: ByteToMessageHandler.self) - - // Check that the HTTP handler received its response. - XCTAssertLessThanOrEqual(0, clientHandler.channelReadChannelHandlerContextDataCallCount) - // Check an error is reported - XCTAssertEqual(0, clientHandler.errorCaughtChannelHandlerContextCallCount) - - XCTAssertFalse(upgradeHandlerCallbackFired) - - XCTAssertNoThrow(try clientChannel.pipeline - .assertDoesNotContain(handlerType: NIOHTTPClientUpgradeHandler.self)) - } - - override func testUpgradeResponseCanBeRejectedByClientUpgrader() throws { - let upgradeProtocol = "myProto" - - var upgradeHandlerCallbackFired = false - - let clientUpgrader = DenyingClientUpgrader(forProtocol: upgradeProtocol) - let clientHandler = RecordingHTTPHandler() - - // The process should kick-off independently by sending the upgrade request to the server. - let clientChannel = try setUpClientChannel(clientHTTPHandler: clientHandler, - clientUpgraders: [clientUpgrader]) { _ in - - // This is called before the upgrader gets called. - upgradeHandlerCallbackFired = true - } - defer { - XCTAssertNoThrow(try clientChannel.finish()) - } - - let response = "HTTP/1.1 101 Switching Protocols\r\nConnection: upgrade\r\nUpgrade: \(upgradeProtocol)\r\n\r\n" - XCTAssertThrowsError(try clientChannel.writeInbound(clientChannel.allocator.buffer(string: response))) { error in - XCTAssertEqual(error as? NIOHTTPClientUpgradeError, .upgraderDeniedUpgrade) - } - - clientChannel.embeddedEventLoop.run() - - // Should fail with error (response is denied) and remove upgrader from pipeline. - - // Check that the http elements are not removed from the pipeline. - clientChannel.pipeline.assertContains(handlerType: HTTPRequestEncoder.self) - clientChannel.pipeline.assertContains(handlerType: ByteToMessageHandler.self) - - XCTAssertEqual(1, clientUpgrader.addCustomUpgradeRequestHeadersCallCount) - - // Check that the HTTP handler received its response. - XCTAssertLessThanOrEqual(0, clientHandler.channelReadChannelHandlerContextDataCallCount) - - // Check an error is reported - XCTAssertEqual(0, clientHandler.errorCaughtChannelHandlerContextCallCount) - - XCTAssertFalse(upgradeHandlerCallbackFired) - - XCTAssertNoThrow(try clientChannel.pipeline - .assertDoesNotContain(handlerType: NIOHTTPClientUpgradeHandler.self)) - } - - override func testFiresOutboundErrorDuringAddingHandlers() throws { - let upgradeProtocol = "myProto" - var errorOnAdditionalChannelWrite: Error? - var upgradeHandlerCallbackFired = false - - let clientUpgrader = UpgradeDelayClientUpgrader(forProtocol: upgradeProtocol) - let clientHandler = RecordingHTTPHandler() - - let clientChannel = try setUpClientChannel(clientHTTPHandler: clientHandler, - clientUpgraders: [clientUpgrader]) { (context) in - - // This is called before the upgrader gets called. - upgradeHandlerCallbackFired = true - } - defer { - XCTAssertNoThrow(try clientChannel.finish()) - } - - // Push the successful server response. - let response = "HTTP/1.1 101 Switching Protocols\r\nConnection: upgrade\r\nUpgrade: \(upgradeProtocol)\r\n\r\n" - XCTAssertNoThrow(try clientChannel.writeInbound(clientChannel.allocator.buffer(string: response))) - - let promise = clientChannel.eventLoop.makePromise(of: Void.self) - - promise.futureResult.whenFailure() { error in - errorOnAdditionalChannelWrite = error - } - - // Send another outbound request during the upgrade. - let requestHead = HTTPRequestHead(version: .http1_1, method: .GET, uri: "/") - let secondRequest: HTTPClientRequestPart = .head(requestHead) - clientChannel.writeAndFlush(secondRequest, promise: promise) - - clientChannel.embeddedEventLoop.run() - - let promiseError = errorOnAdditionalChannelWrite as! NIOHTTPClientUpgradeError - XCTAssertEqual(NIOHTTPClientUpgradeError.writingToHandlerDuringUpgrade, promiseError) - - // Soundness check that the upgrade was delayed. - XCTAssertEqual(0, clientUpgrader.upgradedHandler.handlerAddedContextCallCount) - - // Upgrade now. - clientUpgrader.unblockUpgrade() - clientChannel.embeddedEventLoop.run() - - // Check that the upgrade was still successful, despite the interruption. - XCTAssert(upgradeHandlerCallbackFired) - XCTAssertEqual(1, clientUpgrader.upgradedHandler.handlerAddedContextCallCount) - } - - override func testUpgradeResponseMissingAllProtocols() throws { - var upgradeHandlerCallbackFired = false - - let clientUpgrader = ExplodingClientUpgrader(forProtocol: "myProto") - let clientHandler = RecordingHTTPHandler() - - // The process should kick-off independently by sending the upgrade request to the server. - let clientChannel = try setUpClientChannel(clientHTTPHandler: clientHandler, - clientUpgraders: [clientUpgrader]) { _ in - - // This is called before the upgrader gets called. - upgradeHandlerCallbackFired = true - } - defer { - XCTAssertNoThrow(try clientChannel.finish()) - } - - let response = "HTTP/1.1 101 Switching Protocols\r\nConnection: upgrade\r\n\r\n" - XCTAssertThrowsError(try clientChannel.writeInbound(clientChannel.allocator.buffer(string: response))) { error in - XCTAssertEqual(error as? NIOHTTPClientUpgradeError, .responseProtocolNotFound) - } - - clientChannel.embeddedEventLoop.run() - - // Should fail with error (response is malformed) and remove upgrader from pipeline. - - // Check that the http elements are not removed from the pipeline. - clientChannel.pipeline.assertContains(handlerType: HTTPRequestEncoder.self) - clientChannel.pipeline.assertContains(handlerType: ByteToMessageHandler.self) - - // Check that the HTTP handler received its response. - XCTAssertLessThanOrEqual(0, clientHandler.channelReadChannelHandlerContextDataCallCount) - // Check an error is reported - XCTAssertEqual(0, clientHandler.errorCaughtChannelHandlerContextCallCount) - - XCTAssertFalse(upgradeHandlerCallbackFired) - - XCTAssertNoThrow(try clientChannel.pipeline - .assertDoesNotContain(handlerType: NIOHTTPClientUpgradeHandler.self)) - } -} diff --git a/Tests/NIOHTTP1Tests/HTTPServerUpgradeTests.swift b/Tests/NIOHTTP1Tests/HTTPServerUpgradeTests.swift index 378d64d0a8..4393adcfc6 100644 --- a/Tests/NIOHTTP1Tests/HTTPServerUpgradeTests.swift +++ b/Tests/NIOHTTP1Tests/HTTPServerUpgradeTests.swift @@ -36,11 +36,7 @@ extension ChannelPipeline { @available(macOS 13, iOS 16, tvOS 16, watchOS 9, *) fileprivate func assertContainsUpgrader() { - do { - _ = try self.context(handlerType: NIOTypedHTTPServerUpgradeHandler.self).wait() - } catch { - self.assertContains(handlerType: HTTPServerUpgradeHandler.self) - } + self.assertContains(handlerType: HTTPServerUpgradeHandler.self) } func assertContains(handlerType: Handler.Type) { @@ -63,15 +59,7 @@ extension ChannelPipeline { // handler present, keep waiting usleep(50) } catch ChannelPipelineError.notFound { - // Checking if the typed variant is present - do { - _ = try self.context(handlerType: NIOTypedHTTPServerUpgradeHandler.self).wait() - // handler present, keep waiting - usleep(50) - } catch ChannelPipelineError.notFound { - // No upgrader, we're good. - return - } + return } } @@ -175,7 +163,7 @@ internal func assertResponseIs(response: String, expectedResponseLine: String, e } @available(macOS 13, iOS 16, tvOS 16, watchOS 9, *) -protocol TypedAndUntypedHTTPServerProtocolUpgrader: HTTPServerProtocolUpgrader, NIOTypedHTTPServerProtocolUpgrader where UpgradeResult == Bool {} +protocol TypedAndUntypedHTTPServerProtocolUpgrader: HTTPServerProtocolUpgrader {} private class ExplodingUpgrader: TypedAndUntypedHTTPServerProtocolUpgrader { let supportedProtocol: String @@ -1551,503 +1539,3 @@ class HTTPServerUpgradeTestCase: XCTestCase { channel.pipeline.assertContainsUpgrader() } } - -@available(macOS 13, iOS 16, tvOS 16, watchOS 9, *) -final class TypedHTTPServerUpgradeTestCase: HTTPServerUpgradeTestCase { - fileprivate override func setUpTestWithAutoremoval( - pipelining: Bool = false, - upgraders: [any TypedAndUntypedHTTPServerProtocolUpgrader], - extraHandlers: [ChannelHandler], - notUpgradingHandler: (@Sendable (Channel) -> EventLoopFuture)? = nil, - _ upgradeCompletionHandler: @escaping UpgradeCompletionHandler - ) throws -> (Channel, Channel, Channel) { - let connectionChannelPromise = Self.eventLoop.makePromise(of: Channel.self) - let serverChannelFuture = ServerBootstrap(group: Self.eventLoop) - .serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1) - .childChannelInitializer { channel in - channel.eventLoop.makeCompletedFuture { - connectionChannelPromise.succeed(channel) - var configuration = NIOUpgradableHTTPServerPipelineConfiguration( - upgradeConfiguration: .init( - upgraders: upgraders.map { $0 as! any NIOTypedHTTPServerProtocolUpgrader }, - notUpgradingCompletionHandler: { notUpgradingHandler?($0) ?? $0.eventLoop.makeSucceededFuture(false) } - ) - ) - configuration.enablePipelining = pipelining - return try channel.pipeline.syncOperations.configureUpgradableHTTPServerPipeline(configuration: configuration) - .flatMap { result in - if result { - return channel.pipeline.context(handlerType: NIOTypedHTTPServerUpgradeHandler.self) - .map { - upgradeCompletionHandler($0) - } - } else { - return channel.eventLoop.makeSucceededVoidFuture() - } - } - } - .flatMap { _ in - let futureResults = extraHandlers.map { channel.pipeline.addHandler($0) } - return EventLoopFuture.andAllSucceed(futureResults, on: channel.eventLoop) - } - }.bind(host: "127.0.0.1", port: 0) - let clientChannel = try connectedClientChannel(group: Self.eventLoop, serverAddress: serverChannelFuture.wait().localAddress!) - return (try serverChannelFuture.wait(), clientChannel, try connectionChannelPromise.futureResult.wait()) - } - - func testNotUpgrading() throws { - let notUpgraderCbFired = UnsafeMutableTransferBox(false) - - let upgrader = SuccessfulUpgrader(forProtocol: "myproto", requiringHeaders: ["kafkaesque"]) { _ in } - - let (_, client, connectedServer) = try setUpTestWithAutoremoval( - upgraders: [upgrader], - extraHandlers: [], - notUpgradingHandler: { channel in - notUpgraderCbFired.wrappedValue = true - // We're closing the connection now. - channel.close(promise: nil) - return channel.eventLoop.makeSucceededFuture(true) - } - ) { _ in } - - - let completePromise = Self.eventLoop.makePromise(of: Void.self) - let clientHandler = ArrayAccumulationHandler { buffers in - let resultString = buffers.map { $0.getString(at: $0.readerIndex, length: $0.readableBytes)! }.joined(separator: "") - XCTAssertEqual(resultString, "") - completePromise.succeed(()) - } - XCTAssertNoThrow(try client.pipeline.addHandler(clientHandler).wait()) - - // This request is safe to upgrade. - let request = "OPTIONS * HTTP/1.1\r\nHost: localhost\r\nUpgrade: notmyproto\r\nKafkaesque: yup\r\nConnection: upgrade\r\nConnection: kafkaesque\r\n\r\n" - XCTAssertNoThrow(try client.writeAndFlush(NIOAny(client.allocator.buffer(string: request))).wait()) - - // Let the machinery do its thing. - XCTAssertNoThrow(try completePromise.futureResult.wait()) - - // At this time we want to assert that the not upgrader got called. - XCTAssert(notUpgraderCbFired.wrappedValue) - - // We also want to confirm that the upgrade handler is no longer in the pipeline. - try connectedServer.pipeline.assertDoesNotContainUpgrader() - } - - // - MARK: The following tests are all overridden from the base class since they slightly differ in behaviour - - override func testSimpleUpgradeSucceeds() throws { - // This test is different since we call the completionHandler after the upgrader - // modified the pipeline in the typed version. - let upgradeRequest = UnsafeMutableTransferBox(nil) - let upgradeHandlerCbFired = UnsafeMutableTransferBox(false) - let upgraderCbFired = UnsafeMutableTransferBox(false) - - let upgrader = SuccessfulUpgrader(forProtocol: "myproto", requiringHeaders: ["kafkaesque"]) { req in - // This is called before completion block. - upgradeRequest.wrappedValue = req - upgradeHandlerCbFired.wrappedValue = true - - XCTAssert(upgradeHandlerCbFired.wrappedValue) - upgraderCbFired.wrappedValue = true - } - - let (_, client, connectedServer) = try setUpTestWithAutoremoval( - upgraders: [upgrader], - extraHandlers: [] - ) { (context) in - // This is called before the upgrader gets called. - XCTAssertNotNil(upgradeRequest.wrappedValue) - upgradeHandlerCbFired.wrappedValue = true - - // We're closing the connection now. - context.close(promise: nil) - } - - - let completePromise = Self.eventLoop.makePromise(of: Void.self) - let clientHandler = ArrayAccumulationHandler { buffers in - let resultString = buffers.map { $0.getString(at: $0.readerIndex, length: $0.readableBytes)! }.joined(separator: "") - assertResponseIs(response: resultString, - expectedResponseLine: "HTTP/1.1 101 Switching Protocols", - expectedResponseHeaders: ["X-Upgrade-Complete: true", "upgrade: myproto", "connection: upgrade"]) - completePromise.succeed(()) - } - XCTAssertNoThrow(try client.pipeline.addHandler(clientHandler).wait()) - - // This request is safe to upgrade. - let request = "OPTIONS * HTTP/1.1\r\nHost: localhost\r\nUpgrade: myproto\r\nKafkaesque: yup\r\nConnection: upgrade\r\nConnection: kafkaesque\r\n\r\n" - XCTAssertNoThrow(try client.writeAndFlush(NIOAny(client.allocator.buffer(string: request))).wait()) - - // Let the machinery do its thing. - XCTAssertNoThrow(try completePromise.futureResult.wait()) - - // At this time we want to assert that everything got called. Their own callbacks assert - // that the ordering was correct. - XCTAssert(upgradeHandlerCbFired.wrappedValue) - XCTAssert(upgraderCbFired.wrappedValue) - - // We also want to confirm that the upgrade handler is no longer in the pipeline. - try connectedServer.pipeline.assertDoesNotContainUpgrader() - } - - override func testUpgradeRespectsClientPreference() throws { - // This test is different since we call the completionHandler after the upgrader - // modified the pipeline in the typed version. - let upgradeRequest = UnsafeMutableTransferBox(nil) - let upgradeHandlerCbFired = UnsafeMutableTransferBox(false) - let upgraderCbFired = UnsafeMutableTransferBox(false) - - let explodingUpgrader = ExplodingUpgrader(forProtocol: "exploder") - let successfulUpgrader = SuccessfulUpgrader(forProtocol: "myproto", requiringHeaders: ["kafkaesque"]) { req in - upgradeRequest.wrappedValue = req - XCTAssertFalse(upgradeHandlerCbFired.wrappedValue) - upgraderCbFired.wrappedValue = true - } - - let (_, client, connectedServer) = try setUpTestWithAutoremoval(upgraders: [explodingUpgrader, successfulUpgrader], - extraHandlers: []) { context in - // This is called before the upgrader gets called. - XCTAssertNotNil(upgradeRequest.wrappedValue) - upgradeHandlerCbFired.wrappedValue = true - - // We're closing the connection now. - context.close(promise: nil) - } - - - let completePromise = Self.eventLoop.makePromise(of: Void.self) - let clientHandler = ArrayAccumulationHandler { buffers in - let resultString = buffers.map { $0.getString(at: $0.readerIndex, length: $0.readableBytes)! }.joined(separator: "") - assertResponseIs(response: resultString, - expectedResponseLine: "HTTP/1.1 101 Switching Protocols", - expectedResponseHeaders: ["X-Upgrade-Complete: true", "upgrade: myproto", "connection: upgrade"]) - completePromise.succeed(()) - } - XCTAssertNoThrow(try client.pipeline.addHandler(clientHandler).wait()) - - // This request is safe to upgrade. - let request = "OPTIONS * HTTP/1.1\r\nHost: localhost\r\nUpgrade: myproto, exploder\r\nKafkaesque: yup\r\nConnection: upgrade, kafkaesque\r\n\r\n" - XCTAssertNoThrow(try client.writeAndFlush(NIOAny(client.allocator.buffer(string: request))).wait()) - - // Let the machinery do its thing. - XCTAssertNoThrow(try completePromise.futureResult.wait()) - - // At this time we want to assert that everything got called. Their own callbacks assert - // that the ordering was correct. - XCTAssert(upgradeHandlerCbFired.wrappedValue) - XCTAssert(upgraderCbFired.wrappedValue) - - // We also want to confirm that the upgrade handler is no longer in the pipeline. - try connectedServer.pipeline.waitForUpgraderToBeRemoved() - } - - override func testUpgraderCanRejectUpgradeForPersonalReasons() throws { - // This test is different since we call the completionHandler after the upgrader - // modified the pipeline in the typed version. - let upgradeRequest = UnsafeMutableTransferBox(nil) - let upgradeHandlerCbFired = UnsafeMutableTransferBox(false) - let upgraderCbFired = UnsafeMutableTransferBox(false) - - let explodingUpgrader = UpgraderSaysNo(forProtocol: "noproto") - let successfulUpgrader = SuccessfulUpgrader(forProtocol: "myproto", requiringHeaders: ["kafkaesque"]) { req in - upgradeRequest.wrappedValue = req - XCTAssertFalse(upgradeHandlerCbFired.wrappedValue) - upgraderCbFired.wrappedValue = true - } - let errorCatcher = ErrorSaver() - - let (_, client, connectedServer) = try setUpTestWithAutoremoval(upgraders: [explodingUpgrader, successfulUpgrader], - extraHandlers: [errorCatcher]) { context in - // This is called before the upgrader gets called. - XCTAssertNotNil(upgradeRequest.wrappedValue) - upgradeHandlerCbFired.wrappedValue = true - - // We're closing the connection now. - context.close(promise: nil) - } - - - let completePromise = Self.eventLoop.makePromise(of: Void.self) - let clientHandler = ArrayAccumulationHandler { buffers in - let resultString = buffers.map { $0.getString(at: $0.readerIndex, length: $0.readableBytes)! }.joined(separator: "") - assertResponseIs(response: resultString, - expectedResponseLine: "HTTP/1.1 101 Switching Protocols", - expectedResponseHeaders: ["X-Upgrade-Complete: true", "upgrade: myproto", "connection: upgrade"]) - completePromise.succeed(()) - } - XCTAssertNoThrow(try client.pipeline.addHandler(clientHandler).wait()) - - // This request is safe to upgrade. - let request = "OPTIONS * HTTP/1.1\r\nHost: localhost\r\nUpgrade: noproto,myproto\r\nKafkaesque: yup\r\nConnection: upgrade, kafkaesque\r\n\r\n" - XCTAssertNoThrow(try client.writeAndFlush(NIOAny(client.allocator.buffer(string: request))).wait()) - - // Let the machinery do its thing. - XCTAssertNoThrow(try completePromise.futureResult.wait()) - - // At this time we want to assert that everything got called. Their own callbacks assert - // that the ordering was correct. - XCTAssert(upgradeHandlerCbFired.wrappedValue) - XCTAssert(upgraderCbFired.wrappedValue) - - // We also want to confirm that the upgrade handler is no longer in the pipeline. - try connectedServer.pipeline.waitForUpgraderToBeRemoved() - - // And we want to confirm we saved the error. - XCTAssertEqual(errorCatcher.errors.count, 1) - - switch(errorCatcher.errors[0]) { - case UpgraderSaysNo.No.no: - break - default: - XCTFail("Unexpected error: \(errorCatcher.errors[0])") - } - } - - override func testUpgradeWithUpgradePayloadInlineWithRequestWorks() throws { - // This test is different since we call the completionHandler after the upgrader - // modified the pipeline in the typed version. - enum ReceivedTheWrongThingError: Error { case error } - let upgradeRequest = UnsafeMutableTransferBox(nil) - let upgradeHandlerCbFired = UnsafeMutableTransferBox(false) - let upgraderCbFired = UnsafeMutableTransferBox(false) - - class CheckWeReadInlineAndExtraData: ChannelDuplexHandler { - typealias InboundIn = ByteBuffer - typealias OutboundIn = Never - typealias OutboundOut = Never - - enum State { - case fresh - case added - case inlineDataRead - case extraDataRead - case closed - } - - private let firstByteDonePromise: EventLoopPromise - private let secondByteDonePromise: EventLoopPromise - private let allDonePromise: EventLoopPromise - private var state = State.fresh - - init(firstByteDonePromise: EventLoopPromise, - secondByteDonePromise: EventLoopPromise, - allDonePromise: EventLoopPromise) { - self.firstByteDonePromise = firstByteDonePromise - self.secondByteDonePromise = secondByteDonePromise - self.allDonePromise = allDonePromise - } - - func handlerAdded(context: ChannelHandlerContext) { - XCTAssertEqual(.fresh, self.state) - self.state = .added - } - - func channelRead(context: ChannelHandlerContext, data: NIOAny) { - var buf = self.unwrapInboundIn(data) - XCTAssertEqual(1, buf.readableBytes) - let stringRead = buf.readString(length: buf.readableBytes) - switch self.state { - case .added: - XCTAssertEqual("A", stringRead) - self.state = .inlineDataRead - if stringRead == .some("A") { - self.firstByteDonePromise.succeed(()) - } else { - self.firstByteDonePromise.fail(ReceivedTheWrongThingError.error) - } - case .inlineDataRead: - XCTAssertEqual("B", stringRead) - self.state = .extraDataRead - context.channel.close(promise: nil) - if stringRead == .some("B") { - self.secondByteDonePromise.succeed(()) - } else { - self.secondByteDonePromise.fail(ReceivedTheWrongThingError.error) - } - default: - XCTFail("channel read in wrong state \(self.state)") - } - } - - func close(context: ChannelHandlerContext, mode: CloseMode, promise: EventLoopPromise?) { - XCTAssertEqual(.extraDataRead, self.state) - self.state = .closed - context.close(mode: mode, promise: promise) - - self.allDonePromise.succeed(()) - } - } - - let upgrader = SuccessfulUpgrader(forProtocol: "myproto", requiringHeaders: ["kafkaesque"]) { req in - upgradeRequest.wrappedValue = req - XCTAssertFalse(upgradeHandlerCbFired.wrappedValue) - upgraderCbFired.wrappedValue = true - } - - let promiseGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) - defer { - XCTAssertNoThrow(try promiseGroup.syncShutdownGracefully()) - } - let firstByteDonePromise = promiseGroup.next().makePromise(of: Void.self) - let secondByteDonePromise = promiseGroup.next().makePromise(of: Void.self) - let allDonePromise = promiseGroup.next().makePromise(of: Void.self) - let (_, client, connectedServer) = try setUpTestWithAutoremoval(upgraders: [upgrader], - extraHandlers: []) { (context) in - // This is called before the upgrader gets called. - XCTAssertNotNil(upgradeRequest.wrappedValue) - upgradeHandlerCbFired.wrappedValue = true - - _ = context.channel.pipeline.addHandler(CheckWeReadInlineAndExtraData(firstByteDonePromise: firstByteDonePromise, - secondByteDonePromise: secondByteDonePromise, - allDonePromise: allDonePromise)) - } - - - let completePromise = Self.eventLoop.makePromise(of: Void.self) - let clientHandler = ArrayAccumulationHandler { buffers in - let resultString = buffers.map { $0.getString(at: $0.readerIndex, length: $0.readableBytes)! }.joined(separator: "") - assertResponseIs(response: resultString, - expectedResponseLine: "HTTP/1.1 101 Switching Protocols", - expectedResponseHeaders: ["X-Upgrade-Complete: true", "upgrade: myproto", "connection: upgrade"]) - completePromise.succeed(()) - } - XCTAssertNoThrow(try client.pipeline.addHandler(clientHandler).wait()) - - // This request is safe to upgrade. - var request = "OPTIONS * HTTP/1.1\r\nHost: localhost\r\nUpgrade: myproto\r\nKafkaesque: yup\r\nConnection: upgrade\r\nConnection: kafkaesque\r\n\r\n" - request += "A" - XCTAssertNoThrow(try client.writeAndFlush(NIOAny(client.allocator.buffer(string: request))).wait()) - - XCTAssertNoThrow(try firstByteDonePromise.futureResult.wait() as Void) - - XCTAssertNoThrow(try client.writeAndFlush(NIOAny(client.allocator.buffer(string: "B"))).wait()) - - XCTAssertNoThrow(try secondByteDonePromise.futureResult.wait() as Void) - - XCTAssertNoThrow(try allDonePromise.futureResult.wait() as Void) - - // Let the machinery do its thing. - XCTAssertNoThrow(try completePromise.futureResult.wait()) - - // At this time we want to assert that everything got called. Their own callbacks assert - // that the ordering was correct. - XCTAssert(upgradeHandlerCbFired.wrappedValue) - XCTAssert(upgraderCbFired.wrappedValue) - - // We also want to confirm that the upgrade handler is no longer in the pipeline. - try connectedServer.pipeline.assertDoesNotContainUpgrader() - - XCTAssertNoThrow(try allDonePromise.futureResult.wait()) - } - - override func testWeTolerateUpgradeFuturesFromWrongEventLoops() throws { - // This test is different since we call the completionHandler after the upgrader - // modified the pipeline in the typed version. - let upgradeRequest = UnsafeMutableTransferBox(nil) - let upgradeHandlerCbFired = UnsafeMutableTransferBox(false) - let upgraderCbFired = UnsafeMutableTransferBox(false) - let otherELG = MultiThreadedEventLoopGroup(numberOfThreads: 1) - defer { - XCTAssertNoThrow(try otherELG.syncShutdownGracefully()) - } - - let upgrader = SuccessfulUpgrader(forProtocol: "myproto", - requiringHeaders: ["kafkaesque"], - buildUpgradeResponseFuture: { - // this is the wrong EL - otherELG.next().makeSucceededFuture($1) - }) { req in - upgradeRequest.wrappedValue = req - XCTAssertFalse(upgradeHandlerCbFired.wrappedValue) - upgraderCbFired.wrappedValue = true - } - - let (_, client, connectedServer) = try setUpTestWithAutoremoval(upgraders: [upgrader], - extraHandlers: []) { (context) in - // This is called before the upgrader gets called. - XCTAssertNotNil(upgradeRequest.wrappedValue) - upgradeHandlerCbFired.wrappedValue = true - - // We're closing the connection now. - context.close(promise: nil) - } - - - let completePromise = Self.eventLoop.makePromise(of: Void.self) - let clientHandler = ArrayAccumulationHandler { buffers in - let resultString = buffers.map { $0.getString(at: $0.readerIndex, length: $0.readableBytes)! }.joined(separator: "") - assertResponseIs(response: resultString, - expectedResponseLine: "HTTP/1.1 101 Switching Protocols", - expectedResponseHeaders: ["X-Upgrade-Complete: true", "upgrade: myproto", "connection: upgrade"]) - completePromise.succeed(()) - } - XCTAssertNoThrow(try client.pipeline.addHandler(clientHandler).wait()) - - // This request is safe to upgrade. - let request = "OPTIONS * HTTP/1.1\r\nHost: localhost\r\nUpgrade: myproto\r\nKafkaesque: yup\r\nConnection: upgrade\r\nConnection: kafkaesque\r\n\r\n" - XCTAssertNoThrow(try client.writeAndFlush(NIOAny(client.allocator.buffer(string: request))).wait()) - - // Let the machinery do its thing. - XCTAssertNoThrow(try completePromise.futureResult.wait()) - - // At this time we want to assert that everything got called. Their own callbacks assert - // that the ordering was correct. - XCTAssert(upgradeHandlerCbFired.wrappedValue) - XCTAssert(upgraderCbFired.wrappedValue) - - // We also want to confirm that the upgrade handler is no longer in the pipeline. - try connectedServer.pipeline.assertDoesNotContainUpgrader() - } - - override func testUpgradeFiresUserEvent() throws { - // This test is different since we call the completionHandler after the upgrader - // modified the pipeline in the typed version. - let eventSaver = UnsafeTransfer(UserEventSaver()) - - let upgrader = SuccessfulUpgrader(forProtocol: "myproto", requiringHeaders: []) { req in - XCTAssertEqual(eventSaver.wrappedValue.events.count, 0) - } - - let (_, client, connectedServer) = try setUpTestWithAutoremoval(upgraders: [upgrader], - extraHandlers: [eventSaver.wrappedValue]) { context in - XCTAssertEqual(eventSaver.wrappedValue.events.count, 1) - context.close(promise: nil) - } - - - let completePromise = Self.eventLoop.makePromise(of: Void.self) - let clientHandler = ArrayAccumulationHandler { buffers in - let resultString = buffers.map { $0.getString(at: $0.readerIndex, length: $0.readableBytes)! }.joined(separator: "") - assertResponseIs(response: resultString, - expectedResponseLine: "HTTP/1.1 101 Switching Protocols", - expectedResponseHeaders: ["X-Upgrade-Complete: true", "upgrade: myproto", "connection: upgrade"]) - completePromise.succeed(()) - } - XCTAssertNoThrow(try client.pipeline.addHandler(clientHandler).wait()) - - // This request is safe to upgrade. - let request = "OPTIONS * HTTP/1.1\r\nHost: localhost\r\nUpgrade: myproto\r\nKafkaesque: yup\r\nConnection: upgrade,kafkaesque\r\n\r\n" - XCTAssertNoThrow(try client.writeAndFlush(NIOAny(client.allocator.buffer(string: request))).wait()) - - // Let the machinery do its thing. - XCTAssertNoThrow(try completePromise.futureResult.wait()) - - // At this time we should have received one user event. We schedule this onto the - // event loop to guarantee thread safety. - XCTAssertNoThrow(try connectedServer.eventLoop.scheduleTask(deadline: .now()) { - XCTAssertEqual(eventSaver.wrappedValue.events.count, 1) - if case .upgradeComplete(let proto, let req) = eventSaver.wrappedValue.events[0] { - XCTAssertEqual(proto, "myproto") - XCTAssertEqual(req.method, .OPTIONS) - XCTAssertEqual(req.uri, "*") - XCTAssertEqual(req.version, .http1_1) - } else { - XCTFail("Unexpected event: \(eventSaver.wrappedValue.events[0])") - } - }.futureResult.wait()) - - // We also want to confirm that the upgrade handler is no longer in the pipeline. - try connectedServer.pipeline.waitForUpgraderToBeRemoved() - } -} diff --git a/Tests/NIOWebSocketTests/WebSocketClientEndToEndTests.swift b/Tests/NIOWebSocketTests/WebSocketClientEndToEndTests.swift index bd9cef6936..137e897988 100644 --- a/Tests/NIOWebSocketTests/WebSocketClientEndToEndTests.swift +++ b/Tests/NIOWebSocketTests/WebSocketClientEndToEndTests.swift @@ -404,214 +404,3 @@ class WebSocketClientEndToEndTests: XCTestCase { XCTAssertNoThrow(try clientChannel.close().wait()) } } - -@available(macOS 13, iOS 16, tvOS 16, watchOS 9, *) -final class TypedWebSocketClientEndToEndTests: WebSocketClientEndToEndTests { - func setUpClientChannel( - clientUpgraders: [any NIOTypedHTTPClientProtocolUpgrader], - notUpgradingCompletionHandler: @Sendable @escaping (Channel) -> EventLoopFuture - ) throws -> (EmbeddedChannel, EventLoopFuture) { - - let channel = EmbeddedChannel() - - var headers = HTTPHeaders() - headers.add(name: "Content-Type", value: "text/plain; charset=utf-8") - headers.add(name: "Content-Length", value: "\(0)") - - let requestHead = HTTPRequestHead( - version: .http1_1, - method: .GET, - uri: "/", - headers: headers - ) - - let config = NIOTypedHTTPClientUpgradeConfiguration( - upgradeRequestHead: requestHead, - upgraders: clientUpgraders, - notUpgradingCompletionHandler: notUpgradingCompletionHandler - ) - - let upgradeResult = try channel.pipeline.syncOperations.configureUpgradableHTTPClientPipeline(configuration: .init(upgradeConfiguration: config)) - - try channel.connect(to: SocketAddress(ipAddress: "127.0.0.1", port: 0)) - .wait() - - return (channel, upgradeResult) - } - - override func testSimpleUpgradeSucceeds() throws { - let requestKey = "OfS0wDaT5NoxF2gqm7Zj2YtetzM=" - let responseKey = "yKEqitDFPE81FyIhKTm+ojBqigk=" - - let basicUpgrader = NIOTypedWebSocketClientUpgrader( - requestKey: requestKey, - upgradePipelineHandler: { (channel: Channel, _: HTTPResponseHead) in - channel.pipeline.addHandler(WebSocketRecorderHandler()) - }) - - // The process should kick-off independently by sending the upgrade request to the server. - let (clientChannel, upgradeResult) = try setUpClientChannel( - clientUpgraders: [basicUpgrader], - notUpgradingCompletionHandler: { $0.eventLoop.makeSucceededVoidFuture() } - ) - - // Read the server request. - if let requestString = try clientChannel.readByteBufferOutputAsString() { - XCTAssertEqual(requestString, basicRequest() + "\r\nConnection: upgrade\r\nUpgrade: websocket\r\nSec-WebSocket-Key: \(requestKey)\r\nSec-WebSocket-Version: 13\r\n\r\n") - } else { - XCTFail() - } - - // Push the successful server response. - let response = "HTTP/1.1 101 Switching Protocols\r\nConnection: upgrade\r\nUpgrade: websocket\r\nSec-WebSocket-Accept:\(responseKey)\r\n\r\n" - - XCTAssertNoThrow(try clientChannel.writeInbound(clientChannel.allocator.buffer(string: response))) - - clientChannel.embeddedEventLoop.run() - - // Once upgraded, validate the http pipeline has been removed. - XCTAssertNoThrow(try clientChannel.pipeline - .assertDoesNotContain(handlerType: HTTPRequestEncoder.self)) - XCTAssertNoThrow(try clientChannel.pipeline - .assertDoesNotContain(handlerType: ByteToMessageHandler.self)) - XCTAssertNoThrow(try clientChannel.pipeline - .assertDoesNotContain(handlerType: NIOHTTPClientUpgradeHandler.self)) - - // Check that the pipeline now has the correct websocket handlers added. - XCTAssertNoThrow(try clientChannel.pipeline - .assertContains(handlerType: WebSocketFrameEncoder.self)) - XCTAssertNoThrow(try clientChannel.pipeline - .assertContains(handlerType: ByteToMessageHandler.self)) - XCTAssertNoThrow(try clientChannel.pipeline - .assertContains(handlerType: WebSocketRecorderHandler.self)) - - try upgradeResult.wait() - - // Close the pipeline. - XCTAssertNoThrow(try clientChannel.close().wait()) - } - - override func testRejectUpgradeIfMissingAcceptKey() throws { - let requestKey = "OfS0wDaT5NoxF2gqm7Zj2YtetzM=" - - let basicUpgrader = NIOTypedWebSocketClientUpgrader( - requestKey: requestKey, - upgradePipelineHandler: { (channel: Channel, _: HTTPResponseHead) in - channel.pipeline.addHandler(WebSocketRecorderHandler()) - }) - - // The process should kick-off independently by sending the upgrade request to the server. - let (clientChannel, upgradeResult) = try setUpClientChannel( - clientUpgraders: [basicUpgrader], - notUpgradingCompletionHandler: { $0.eventLoop.makeSucceededVoidFuture() } - ) - - // Push the successful server response but with a missing accept key. - let response = "HTTP/1.1 101 Switching Protocols\r\nConnection: upgrade\r\nUpgrade: websocket\r\n\r\n" - - XCTAssertThrowsError(try clientChannel.writeInbound(clientChannel.allocator.buffer(string: response))) { error in - XCTAssertEqual(error as? NIOHTTPClientUpgradeError, NIOHTTPClientUpgradeError.upgraderDeniedUpgrade) - } - - // Close the pipeline. - XCTAssertNoThrow(try clientChannel.close().wait()) - - XCTAssertThrowsError(try upgradeResult.wait()) { error in - XCTAssertEqual(error as? NIOHTTPClientUpgradeError, NIOHTTPClientUpgradeError.upgraderDeniedUpgrade) - } - } - - override func testRejectUpgradeIfIncorrectAcceptKey() throws { - let requestKey = "OfS0wDaT5NoxF2gqm7Zj2YtetzM=" - let responseKey = "notACorrectKeyL1am=F1y=nn=" - - let basicUpgrader = NIOTypedWebSocketClientUpgrader( - requestKey: requestKey, - upgradePipelineHandler: { (channel: Channel, _: HTTPResponseHead) in - channel.pipeline.addHandler(WebSocketRecorderHandler()) - }) - - // The process should kick-off independently by sending the upgrade request to the server. - let (clientChannel, upgradeResult) = try setUpClientChannel( - clientUpgraders: [basicUpgrader], - notUpgradingCompletionHandler: { $0.eventLoop.makeSucceededVoidFuture() } - ) - - // Push the successful server response but with an incorrect response key. - let response = "HTTP/1.1 101 Switching Protocols\r\nConnection: upgrade\r\nUpgrade: websocket\r\nSec-WebSocket-Accept:\(responseKey)\r\n\r\n" - - XCTAssertThrowsError(try clientChannel.writeInbound(clientChannel.allocator.buffer(string: response))) { error in - XCTAssertEqual(error as? NIOHTTPClientUpgradeError, NIOHTTPClientUpgradeError.upgraderDeniedUpgrade) - } - - // Close the pipeline. - XCTAssertNoThrow(try clientChannel.close().wait()) - - XCTAssertThrowsError(try upgradeResult.wait()) { error in - XCTAssertEqual(error as? NIOHTTPClientUpgradeError, NIOHTTPClientUpgradeError.upgraderDeniedUpgrade) - } - } - - override func testRejectUpgradeIfNotWebsocket() throws { - let requestKey = "OfS0wDaT5NoxF2gqm7Zj2YtetzM=" - let responseKey = "yKEqitDFPE81FyIhKTm+ojBqigk=" - - let basicUpgrader = NIOTypedWebSocketClientUpgrader( - requestKey: requestKey, - upgradePipelineHandler: { (channel: Channel, _: HTTPResponseHead) in - channel.pipeline.addHandler(WebSocketRecorderHandler()) - }) - - // The process should kick-off independently by sending the upgrade request to the server. - let (clientChannel, upgradeResult) = try setUpClientChannel( - clientUpgraders: [basicUpgrader], - notUpgradingCompletionHandler: { $0.eventLoop.makeSucceededVoidFuture() } - ) - - // Push the successful server response with an incorrect protocol. - let response = "HTTP/1.1 101 Switching Protocols\r\nConnection: upgrade\r\nUpgrade: myProtocol\r\nSec-WebSocket-Accept:\(responseKey)\r\n\r\n" - - XCTAssertThrowsError(try clientChannel.writeInbound(clientChannel.allocator.buffer(string: response))) { error in - XCTAssertEqual(error as? NIOHTTPClientUpgradeError, NIOHTTPClientUpgradeError.responseProtocolNotFound) - } - - // Close the pipeline. - XCTAssertNoThrow(try clientChannel.close().wait()) - - XCTAssertThrowsError(try upgradeResult.wait()) { error in - XCTAssertEqual(error as? NIOHTTPClientUpgradeError, NIOHTTPClientUpgradeError.responseProtocolNotFound) - } - } - - override fileprivate func runSuccessfulUpgrade() throws -> (EmbeddedChannel, WebSocketRecorderHandler) { - let handler = WebSocketRecorderHandler() - - let basicUpgrader = NIOTypedWebSocketClientUpgrader( - requestKey: "OfS0wDaT5NoxF2gqm7Zj2YtetzM=", - upgradePipelineHandler: { (channel: Channel, _: HTTPResponseHead) in - channel.pipeline.addHandler(handler) - }) - - // The process should kick-off independently by sending the upgrade request to the server. - let (clientChannel, upgradeResult) = try setUpClientChannel( - clientUpgraders: [basicUpgrader], - notUpgradingCompletionHandler: { $0.eventLoop.makeSucceededVoidFuture() } - ) - - // Push the successful server response. - let response = "HTTP/1.1 101 Switching Protocols\r\nConnection: upgrade\r\nUpgrade: websocket\r\nSec-WebSocket-Accept:yKEqitDFPE81FyIhKTm+ojBqigk=\r\n\r\n" - - XCTAssertNoThrow(try clientChannel.writeInbound(clientChannel.allocator.buffer(string: response))) - - clientChannel.embeddedEventLoop.run() - - // We now have a successful upgrade, clear the output channels read to test the frames. - XCTAssertNoThrow(try clientChannel.readOutbound(as: ByteBuffer.self)) - - clientChannel.embeddedEventLoop.run() - - try upgradeResult.wait() - - return (clientChannel, handler) - } -} diff --git a/Tests/NIOWebSocketTests/WebSocketServerEndToEndTests.swift b/Tests/NIOWebSocketTests/WebSocketServerEndToEndTests.swift index 44246e3ab0..2a1a3c6980 100644 --- a/Tests/NIOWebSocketTests/WebSocketServerEndToEndTests.swift +++ b/Tests/NIOWebSocketTests/WebSocketServerEndToEndTests.swift @@ -526,30 +526,3 @@ class WebSocketServerEndToEndTests: XCTestCase { XCTAssertNoThrow(XCTAssertEqual([], try server.readAllOutboundBytes())) } } - -@available(macOS 13, iOS 16, tvOS 16, watchOS 9, *) -final class TypedWebSocketServerEndToEndTests: WebSocketServerEndToEndTests { - override func createTestFixtures( - upgraders: [WebSocketServerUpgraderConfiguration] - ) -> (loop: EmbeddedEventLoop, serverChannel: EmbeddedChannel, clientChannel: EmbeddedChannel) { - let loop = EmbeddedEventLoop() - let serverChannel = EmbeddedChannel(loop: loop) - let upgraders = upgraders.map { NIOTypedWebSocketServerUpgrader( - maxFrameSize: $0.maxFrameSize, - enableAutomaticErrorHandling: $0.automaticErrorHandling, - shouldUpgrade: $0.shouldUpgrade, - upgradePipelineHandler: $0.upgradePipelineHandler - )} - - XCTAssertNoThrow(try serverChannel.pipeline.syncOperations.configureUpgradableHTTPServerPipeline( - configuration: .init( - upgradeConfiguration: NIOTypedHTTPServerUpgradeConfiguration( - upgraders: upgraders, - notUpgradingCompletionHandler: { $0.eventLoop.makeSucceededVoidFuture() } - ) - ) - )) - let clientChannel = EmbeddedChannel(loop: loop) - return (loop: loop, serverChannel: serverChannel, clientChannel: clientChannel) - } -}