diff --git a/Package.swift b/Package.swift index c54f673c43..1ed800aa0e 100644 --- a/Package.swift +++ b/Package.swift @@ -23,6 +23,8 @@ var targets: [PackageDescription.Target] = [ "CNIODarwin", "CNIOWindows", "NIOConcurrencyHelpers"]), + .target(name: "_NIOConcurrency", + dependencies: ["NIO"]), .target(name: "NIOFoundationCompat", dependencies: ["NIO"]), .target(name: "CNIOAtomics", dependencies: []), .target(name: "CNIOSHA1", dependencies: []), @@ -65,6 +67,8 @@ var targets: [PackageDescription.Target] = [ dependencies: ["NIO", "NIOHTTP1"]), .target(name: "NIOCrashTester", dependencies: ["NIO", "NIOHTTP1", "NIOWebSocket", "NIOFoundationCompat"]), + .target(name: "NIOAsyncAwaitDemo", + dependencies: ["NIO", "NIOHTTP1", "_NIOConcurrency"]), .testTarget(name: "NIOTests", dependencies: ["NIO", "NIOFoundationCompat", "NIOTestUtils", "NIOConcurrencyHelpers"]), .testTarget(name: "NIOConcurrencyHelpersTests", @@ -86,6 +90,7 @@ let package = Package( products: [ .library(name: "NIO", targets: ["NIO"]), .library(name: "_NIO1APIShims", targets: ["_NIO1APIShims"]), + .library(name: "_NIOConcurrency", targets: ["_NIOConcurrency"]), .library(name: "NIOTLS", targets: ["NIOTLS"]), .library(name: "NIOHTTP1", targets: ["NIOHTTP1"]), .library(name: "NIOConcurrencyHelpers", targets: ["NIOConcurrencyHelpers"]), diff --git a/Sources/NIOAsyncAwaitDemo/AsyncChannelIO.swift b/Sources/NIOAsyncAwaitDemo/AsyncChannelIO.swift new file mode 100644 index 0000000000..9c85377c75 --- /dev/null +++ b/Sources/NIOAsyncAwaitDemo/AsyncChannelIO.swift @@ -0,0 +1,43 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2021 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import NIO +import NIOHTTP1 + +#if compiler(>=5.4) // we cannot write this on one line with `&&` because Swift 5.0 doesn't like it... +#if compiler(>=5.4) && $AsyncAwait +struct AsyncChannelIO { + let channel: Channel + + init(_ channel: Channel) { + self.channel = channel + } + + func start() async throws -> AsyncChannelIO { + try await channel.pipeline.addHandler(RequestResponseHandler()).get() + return self + } + + func sendRequest(_ request: Request) async throws -> Response { + let responsePromise: EventLoopPromise = channel.eventLoop.makePromise() + try await self.channel.writeAndFlush((request, responsePromise)).get() + return try await responsePromise.futureResult.get() + } + + func close() async throws { + try await self.channel.close() + } +} +#endif +#endif diff --git a/Sources/NIOAsyncAwaitDemo/FullRequestResponse.swift b/Sources/NIOAsyncAwaitDemo/FullRequestResponse.swift new file mode 100644 index 0000000000..7b1622631a --- /dev/null +++ b/Sources/NIOAsyncAwaitDemo/FullRequestResponse.swift @@ -0,0 +1,132 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2017-2021 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +// THIS FILE IS MOSTLY COPIED FROM swift-nio-extras + +import NIO +import NIOHTTP1 + +public final class MakeFullRequestHandler: ChannelOutboundHandler { + public typealias OutboundOut = HTTPClientRequestPart + public typealias OutboundIn = HTTPRequestHead + + public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise?) { + let req = self.unwrapOutboundIn(data) + + context.write(self.wrapOutboundOut(.head(req)), promise: nil) + context.write(self.wrapOutboundOut(.end(nil)), promise: promise) + } +} + +/// `RequestResponseHandler` receives a `Request` alongside an `EventLoopPromise` from the `Channel`'s +/// outbound side. It will fulfill the promise with the `Response` once it's received from the `Channel`'s inbound +/// side. +/// +/// `RequestResponseHandler` does support pipelining `Request`s and it will send them pipelined further down the +/// `Channel`. Should `RequestResponseHandler` receive an error from the `Channel`, it will fail all promises meant for +/// the outstanding `Reponse`s and close the `Channel`. All requests enqueued after an error occured will be immediately +/// failed with the first error the channel received. +/// +/// `RequestResponseHandler` requires that the `Response`s arrive on `Channel` in the same order as the `Request`s +/// were submitted. +public final class RequestResponseHandler: ChannelDuplexHandler { + public typealias InboundIn = Response + public typealias InboundOut = Never + public typealias OutboundIn = (Request, EventLoopPromise) + public typealias OutboundOut = Request + + private enum State { + case operational + case error(Error) + + var isOperational: Bool { + switch self { + case .operational: + return true + case .error: + return false + } + } + } + + private var state: State = .operational + private var promiseBuffer: CircularBuffer> + + + /// Create a new `RequestResponseHandler`. + /// + /// - parameters: + /// - initialBufferCapacity: `RequestResponseHandler` saves the promises for all outstanding responses in a + /// buffer. `initialBufferCapacity` is the initial capacity for this buffer. You usually do not need to set + /// this parameter unless you intend to pipeline very deeply and don't want the buffer to resize. + public init(initialBufferCapacity: Int = 4) { + self.promiseBuffer = CircularBuffer(initialCapacity: initialBufferCapacity) + } + + public func channelInactive(context: ChannelHandlerContext) { + switch self.state { + case .error: + // We failed any outstanding promises when we entered the error state and will fail any + // new promises in write. + assert(self.promiseBuffer.count == 0) + case .operational: + let promiseBuffer = self.promiseBuffer + self.promiseBuffer.removeAll() + promiseBuffer.forEach { promise in + promise.fail(ChannelError.eof) + } + } + context.fireChannelInactive() + } + + public func channelRead(context: ChannelHandlerContext, data: NIOAny) { + guard self.state.isOperational else { + // we're in an error state, ignore further responses + assert(self.promiseBuffer.count == 0) + return + } + + let response = self.unwrapInboundIn(data) + let promise = self.promiseBuffer.removeFirst() + + promise.succeed(response) + } + + public func errorCaught(context: ChannelHandlerContext, error: Error) { + guard self.state.isOperational else { + assert(self.promiseBuffer.count == 0) + return + } + self.state = .error(error) + let promiseBuffer = self.promiseBuffer + self.promiseBuffer.removeAll() + context.close(promise: nil) + promiseBuffer.forEach { + $0.fail(error) + } + } + + public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise?) { + let (request, responsePromise) = self.unwrapOutboundIn(data) + switch self.state { + case .error(let error): + assert(self.promiseBuffer.count == 0) + responsePromise.fail(error) + promise?.fail(error) + case .operational: + self.promiseBuffer.append(responsePromise) + context.write(self.wrapOutboundOut(request), promise: promise) + } + } +} diff --git a/Sources/NIOAsyncAwaitDemo/main.swift b/Sources/NIOAsyncAwaitDemo/main.swift new file mode 100644 index 0000000000..804fdea8df --- /dev/null +++ b/Sources/NIOAsyncAwaitDemo/main.swift @@ -0,0 +1,77 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2021 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import NIO +import _NIOConcurrency +import NIOHTTP1 +import Dispatch + +#if compiler(>=5.4) // we cannot write this on one line with `&&` because Swift 5.0 doesn't like it... +#if compiler(>=5.4) && $AsyncAwait + +import _Concurrency + +let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) +defer { + try! group.syncShutdownGracefully() +} + +func makeHTTPChannel(host: String, port: Int) async throws -> AsyncChannelIO { + let channel = try await ClientBootstrap(group: group).connect(host: host, port: port).get() + try await channel.pipeline.addHTTPClientHandlers().get() + try await channel.pipeline.addHandler(NIOHTTPClientResponseAggregator(maxContentLength: 1_000_000)) + try await channel.pipeline.addHandler(MakeFullRequestHandler()) + return try await AsyncChannelIO(channel).start() +} + +func main() async { + do { + let channel = try await makeHTTPChannel(host: "httpbin.org", port: 80) + print("OK, connected to \(channel)") + + print("Sending request 1", terminator: "") + let response1 = try await channel.sendRequest(HTTPRequestHead(version: .http1_1, + method: .GET, + uri: "/base64/SGVsbG8gV29ybGQsIGZyb20gSFRUUEJpbiEgCg==", + headers: ["host": "httpbin.org"])) + print(", response:", String(buffer: response1.body ?? ByteBuffer())) + + print("Sending request 2", terminator: "") + let response2 = try await channel.sendRequest(HTTPRequestHead(version: .http1_1, + method: .GET, + uri: "/get", + headers: ["host": "httpbin.org"])) + print(", response:", String(buffer: response2.body ?? ByteBuffer())) + + try await channel.close() + print("all, done") + } catch { + print("ERROR: \(error)") + } +} + +let dg = DispatchGroup() +dg.enter() +let task = Task.runDetached { + await main() + dg.leave() +} +dg.wait() +#else +print("ERROR: This demo only works with async/await enabled (NIO.System.hasAsyncAwaitSupport = \(NIO.System.hasAsyncAwaitSupport))") +print("Try: swift run -Xswiftc -Xfrontend -Xswiftc -enable-experimental-concurrency NIOAsyncAwaitDemo") +#endif +#else +print("ERROR: Concurrency only supported on Swift > 5.4.") +#endif diff --git a/Sources/_NIOConcurrency/AsyncAwaitSupport.swift b/Sources/_NIOConcurrency/AsyncAwaitSupport.swift new file mode 100644 index 0000000000..b1f1dac0f7 --- /dev/null +++ b/Sources/_NIOConcurrency/AsyncAwaitSupport.swift @@ -0,0 +1,175 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2021 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import NIO + +#if compiler(>=5.4) // we cannot write this on one line with `&&` because Swift 5.0 doesn't like it... +#if compiler(>=5.4) && $AsyncAwait +import _Concurrency + +extension EventLoopFuture { + /// Get the value/error from an `EventLoopFuture` in an `async` context. + /// + /// This function can be used to bridge an `EventLoopFuture` into the `async` world. Ie. if you're in an `async` + /// function and want to get the result of this future. + public func get() async throws -> Value { + return try await withUnsafeThrowingContinuation { cont in + self.whenComplete { result in + switch result { + case .success(let value): + cont.resume(returning: value) + case .failure(let error): + cont.resume(throwing: error) + } + } + } + } +} + +extension EventLoopPromise { + /// Complete a future with the result (or error) of the `async` function `body`. + /// + /// This function can be used to bridge the `async` world into an `EventLoopPromise`. + /// + /// - parameters: + /// - body: The `async` function to run. + public func completeWithAsync(_ body: @escaping () async throws -> Value) { + Task.runDetached { + do { + let value = try await body() + self.succeed(value) + } catch { + self.fail(error) + } + } + } +} + +extension Channel { + /// Shortcut for calling `write` and `flush`. + /// + /// - parameters: + /// - data: the data to write + /// - promise: the `EventLoopPromise` that will be notified once the `write` operation completes, + /// or `nil` if not interested in the outcome of the operation. + public func writeAndFlush(_ any: T) async throws { + try await self.writeAndFlush(any).get() + } + + /// Set `option` to `value` on this `Channel`. + public func setOption(_ option: Option, value: Option.Value) async throws { + try await self.setOption(option, value: value).get() + } + + /// Get the value of `option` for this `Channel`. + public func getOption(_ option: Option) async throws -> Option.Value { + return try await self.getOption(option).get() + } +} + +extension ChannelOutboundInvoker { + /// Register on an `EventLoop` and so have all its IO handled. + /// + /// - returns: the future which will be notified once the operation completes. + public func register(file: StaticString = #file, line: UInt = #line) async throws { + try await self.register(file: file, line: line).get() + } + + /// Bind to a `SocketAddress`. + /// - parameters: + /// - to: the `SocketAddress` to which we should bind the `Channel`. + /// - returns: the future which will be notified once the operation completes. + public func bind(to address: SocketAddress, file: StaticString = #file, line: UInt = #line) async throws { + try await self.bind(to: address, file: file, line: line).get() + } + + /// Connect to a `SocketAddress`. + /// - parameters: + /// - to: the `SocketAddress` to which we should connect the `Channel`. + /// - returns: the future which will be notified once the operation completes. + public func connect(to address: SocketAddress, file: StaticString = #file, line: UInt = #line) async throws { + try await self.connect(to: address, file: file, line: line).get() + } + + /// Shortcut for calling `write` and `flush`. + /// + /// - parameters: + /// - data: the data to write + /// - returns: the future which will be notified once the `write` operation completes. + public func writeAndFlush(_ data: NIOAny, file: StaticString = #file, line: UInt = #line) async throws { + try await self.writeAndFlush(data, file: file, line: line).get() + } + + /// Close the `Channel` and so the connection if one exists. + /// + /// - parameters: + /// - mode: the `CloseMode` that is used + /// - returns: the future which will be notified once the operation completes. + public func close(mode: CloseMode = .all, file: StaticString = #file, line: UInt = #line) async throws { + try await self.close(mode: mode, file: file, line: line).get() + } + + /// Trigger a custom user outbound event which will flow through the `ChannelPipeline`. + /// + /// - parameters: + /// - event: the event itself. + /// - returns: the future which will be notified once the operation completes. + public func triggerUserOutboundEvent(_ event: Any, file: StaticString = #file, line: UInt = #line) async throws { + try await self.triggerUserOutboundEvent(event, file: file, line: line).get() + } +} + +extension ChannelPipeline { + public func addHandler(_ handler: ChannelHandler, + name: String? = nil, + position: ChannelPipeline.Position = .last) async throws { + try await self.addHandler(handler, name: name, position: position).get() + } + + public func removeHandler(_ handler: RemovableChannelHandler) async throws { + try await self.removeHandler(handler).get() + } + + public func removeHandler(name: String) async throws { + try await self.removeHandler(name: name).get() + } + + public func removeHandler(context: ChannelHandlerContext) async throws { + try await self.removeHandler(context: context).get() + } + + public func context(handler: ChannelHandler) async throws -> ChannelHandlerContext { + return try await self.context(handler: handler).get() + } + + public func context(name: String) async throws -> ChannelHandlerContext { + return try await self.context(name: name).get() + } + + public func context(handlerType: Handler.Type) async throws -> ChannelHandlerContext { + return try await self.context(handlerType: handlerType).get() + } + + public func addHandlers(_ handlers: [ChannelHandler], + position: ChannelPipeline.Position = .last) async throws { + try await self.addHandlers(handlers, position: position).get() + } + + public func addHandlers(_ handlers: ChannelHandler..., + position: ChannelPipeline.Position = .last) async throws { + try await self.addHandlers(handlers, position: position) + } +} +#endif +#endif diff --git a/Sources/_NIOConcurrency/Helpers.swift b/Sources/_NIOConcurrency/Helpers.swift new file mode 100644 index 0000000000..c6aa5d182a --- /dev/null +++ b/Sources/_NIOConcurrency/Helpers.swift @@ -0,0 +1,33 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2021 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import NIO + +// note: We have to define the variable `hasAsyncAwait` here because if we copy this code into the property below, +// it doesn't compile on Swift 5.0. +#if compiler(>=5.4) +#if compiler(>=5.4) // we cannot write this on one line with `&&` because Swift 5.0 doesn't like it... +fileprivate let hasAsyncAwait = true +#else +fileprivate let hasAsyncAwait = false +#endif +#else +fileprivate let hasAsyncAwait = false +#endif + +extension NIO.System { + public static var hasAsyncAwaitSupport: Bool { + return hasAsyncAwait + } +}