From 37f5be7504cecef1dccba23d63737eb75c013563 Mon Sep 17 00:00:00 2001 From: Simon Whitty Date: Sat, 27 Aug 2022 14:16:26 +1000 Subject: [PATCH] CancellingContinuation --- .../Sources/Continuation+Extensions.swift | 91 -------------- FlyingFox/Sources/HTTPServer+Listening.swift | 48 +------- FlyingFox/Sources/URLSession+Async.swift | 25 ++-- FlyingFox/Tests/HTTPServerTests.swift | 2 + .../Sources/CancellingContinuation.swift | 112 ++++++++++++++++++ .../Sources/Continuation+Extensions.swift | 91 -------------- FlyingSocks/Sources/PollingSocketPool.swift | 108 ++++------------- 7 files changed, 163 insertions(+), 314 deletions(-) delete mode 100644 FlyingFox/Sources/Continuation+Extensions.swift create mode 100644 FlyingSocks/Sources/CancellingContinuation.swift delete mode 100644 FlyingSocks/Sources/Continuation+Extensions.swift diff --git a/FlyingFox/Sources/Continuation+Extensions.swift b/FlyingFox/Sources/Continuation+Extensions.swift deleted file mode 100644 index 9c5f2c19..00000000 --- a/FlyingFox/Sources/Continuation+Extensions.swift +++ /dev/null @@ -1,91 +0,0 @@ -// -// Continuation+Extensions.swift -// FlyingFox -// -// Created by Simon Whitty on 17/02/2022. -// Copyright © 2022 Simon Whitty. All rights reserved. -// -// Distributed under the permissive MIT license -// Get the latest version from here: -// -// https://github.com/swhitty/FlyingFox -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in all -// copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -// SOFTWARE. -// - -import Foundation - -func withCancellingContinuation(function: String = #function, - returning returnType: T.Type, - body: (CheckedContinuation, inout CancellationHandler) -> Void) async throws -> T { - let inner = CancellationHandler.Inner() - return try await withTaskCancellationHandler( - operation: { - try await withCheckedThrowingContinuation(function: function) { (continuation: CheckedContinuation) in - var handler = CancellationHandler(inner: inner) - body(continuation, &handler) - } - }, - onCancel: inner.cancel) -} - -struct CancellationHandler { - - fileprivate let inner: Inner - - @Sendable - mutating func onCancel(_ handler: @escaping () -> Void) { - inner.onCancel(handler) - } -} - -extension CancellationHandler { - - final class Inner { - - private let lock = NSLock() - private var isCancelled: Bool = false - private var handler: (() -> Void)? - - @Sendable - func onCancel(_ handler: @escaping () -> Void) { - lock.lock() - self.handler = handler - let isCancelled = self.isCancelled - lock.unlock() - - if isCancelled { - handler() - } - } - - @Sendable - fileprivate func cancel() { - lock.lock() - isCancelled = true - let handler = self.handler - lock.unlock() - - handler?() - } - } -} - - - diff --git a/FlyingFox/Sources/HTTPServer+Listening.swift b/FlyingFox/Sources/HTTPServer+Listening.swift index 486b1c0f..3ce92857 100644 --- a/FlyingFox/Sources/HTTPServer+Listening.swift +++ b/FlyingFox/Sources/HTTPServer+Listening.swift @@ -30,6 +30,7 @@ // import Foundation +import FlyingSocks extension HTTPServer { @@ -41,24 +42,10 @@ extension HTTPServer { private func doWaitUntilListening() async throws { guard !isListening else { return } - try await withCancellingContinuation(returning: Void.self) { c, handler in - let continuation = Continuation(c) - waiting.insert(continuation) - handler.onCancel { - self.cancelContinuation(continuation) - } - } - } - - // Careful not to escape non-isolated method - // https://bugs.swift.org/browse/SR-15745 - nonisolated private func cancelContinuation(_ continuation: Continuation) { - Task { await _cancelContinuation(continuation) } - } - - private func _cancelContinuation(_ continuation: Continuation) { - guard let removed = waiting.remove(continuation) else { return } - removed.cancel() + let continuation = Continuation() + waiting.insert(continuation) + defer { waiting.remove(continuation) } + return try await continuation.value } func isListeningDidUpdate(from previous: Bool) { @@ -71,28 +58,5 @@ extension HTTPServer { } } - final class Continuation: Hashable { - - private let continuation: CheckedContinuation - - init(_ continuation: CheckedContinuation) { - self.continuation = continuation - } - - func resume() { - continuation.resume() - } - - func cancel() { - continuation.resume(throwing: CancellationError()) - } - - func hash(into hasher: inout Hasher) { - ObjectIdentifier(self).hash(into: &hasher) - } - - static func == (lhs: Continuation, rhs: Continuation) -> Bool { - lhs === rhs - } - } + typealias Continuation = CancellingContinuation } diff --git a/FlyingFox/Sources/URLSession+Async.swift b/FlyingFox/Sources/URLSession+Async.swift index cb2ba0e1..aabd7ae6 100644 --- a/FlyingFox/Sources/URLSession+Async.swift +++ b/FlyingFox/Sources/URLSession+Async.swift @@ -33,6 +33,7 @@ import Foundation #if canImport(FoundationNetworking) import FoundationNetworking #endif +import FlyingSocks @available(iOS, deprecated: 15.0, message: "use data(for request: URLRequest) directly") @available(tvOS, deprecated: 15.0, message: "use data(for request: URLRequest) directly") @@ -51,15 +52,23 @@ extension URLSession { } func makeData(for request: URLRequest) async throws -> (Data, URLResponse) { - try await withCancellingContinuation(returning: (Data, URLResponse).self) { continuation, handler in - let task = dataTask(with: request) { data, response, error in - guard let data = data, let response = response else { - return continuation.resume(throwing: error!) - } - continuation.resume(returning: (data, response)) + let continuation = CancellingContinuation<(Data, URLResponse), Error>() + let task = dataTask(with: request) { data, response, error in + guard let data = data, let response = response else { + continuation.resume(throwing: error!) + return } - task.resume() - handler.onCancel(task.cancel) + continuation.resume(returning: (data, response)) + } + defer { task.cancel() } + task.resume() + + do { + return try await continuation.value + } catch is CancellationError { + throw URLError(.cancelled) + } catch { + throw error } } } diff --git a/FlyingFox/Tests/HTTPServerTests.swift b/FlyingFox/Tests/HTTPServerTests.swift index cad81f80..3c4eec66 100644 --- a/FlyingFox/Tests/HTTPServerTests.swift +++ b/FlyingFox/Tests/HTTPServerTests.swift @@ -195,6 +195,8 @@ final class HTTPServerTests: XCTestCase { return .make(statusCode: .ok) } let task = Task { try await server.start() } + defer { task.cancel() } + try await server.waitUntilListening() let request = URLRequest(url: URL(string: "http://localhost:8008")!) let (_, response) = try await URLSession.shared.makeData(for: request) diff --git a/FlyingSocks/Sources/CancellingContinuation.swift b/FlyingSocks/Sources/CancellingContinuation.swift new file mode 100644 index 00000000..58b23331 --- /dev/null +++ b/FlyingSocks/Sources/CancellingContinuation.swift @@ -0,0 +1,112 @@ +// +// CancellingContinuation.swift +// FlyingFox +// +// Created by Simon Whitty on 27/08/2022. +// Copyright © 2022 Simon Whitty. All rights reserved. +// +// Distributed under the permissive MIT license +// Get the latest version from here: +// +// https://github.com/swhitty/FlyingFox +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. +// + +/// Wrapper around `CheckedContinuation` throwing CancellationError when the +/// task is cancelled. +public struct CancellingContinuation { + + private let inner: Inner + + public init(function: String = #function) { + self.inner = Inner(function: function) + } + + public var value: Success { + get async throws { + try await withTaskCancellationHandler { + cancel() + } operation: { + try await inner.getValue() + } + } + } + + public func resume(returning value: Success) { + Task { await inner.resume(with: .success(value)) } + } + + public func resume() where Success == Void { + resume(returning: ()) + } + + public func resume(throwing error: Failure) { + Task { await inner.resume(with: .failure(error)) } + } + + public func cancel() { + Task { await inner.resume(with: .failure(CancellationError())) } + } +} + +private extension CancellingContinuation { + + actor Inner { + private let function: String + private var continuation: CheckedContinuation? + private var result: Result? + private var hasStarted: Bool = false + + init(function: String) { + self.function = function + } + + func getValue() async throws -> Success { + precondition(hasStarted == false, "Can only wait a single time.") + hasStarted = true + if let result = result { + return try result.get() + } else { + return try await withCheckedThrowingContinuation(function: function) { + continuation = $0 + } + } + } + + func resume(with result: Result) { + if let continuation = continuation { + self.continuation = nil + continuation.resume(with: result) + } else if self.result == nil { + self.result = result + } + } + } +} + +extension CancellingContinuation: Hashable { + public static func == (lhs: CancellingContinuation, rhs: CancellingContinuation) -> Bool { + lhs.inner === rhs.inner + } + + public func hash(into hasher: inout Hasher) { + ObjectIdentifier(inner).hash(into: &hasher) + } +} diff --git a/FlyingSocks/Sources/Continuation+Extensions.swift b/FlyingSocks/Sources/Continuation+Extensions.swift deleted file mode 100644 index 9c5f2c19..00000000 --- a/FlyingSocks/Sources/Continuation+Extensions.swift +++ /dev/null @@ -1,91 +0,0 @@ -// -// Continuation+Extensions.swift -// FlyingFox -// -// Created by Simon Whitty on 17/02/2022. -// Copyright © 2022 Simon Whitty. All rights reserved. -// -// Distributed under the permissive MIT license -// Get the latest version from here: -// -// https://github.com/swhitty/FlyingFox -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in all -// copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -// SOFTWARE. -// - -import Foundation - -func withCancellingContinuation(function: String = #function, - returning returnType: T.Type, - body: (CheckedContinuation, inout CancellationHandler) -> Void) async throws -> T { - let inner = CancellationHandler.Inner() - return try await withTaskCancellationHandler( - operation: { - try await withCheckedThrowingContinuation(function: function) { (continuation: CheckedContinuation) in - var handler = CancellationHandler(inner: inner) - body(continuation, &handler) - } - }, - onCancel: inner.cancel) -} - -struct CancellationHandler { - - fileprivate let inner: Inner - - @Sendable - mutating func onCancel(_ handler: @escaping () -> Void) { - inner.onCancel(handler) - } -} - -extension CancellationHandler { - - final class Inner { - - private let lock = NSLock() - private var isCancelled: Bool = false - private var handler: (() -> Void)? - - @Sendable - func onCancel(_ handler: @escaping () -> Void) { - lock.lock() - self.handler = handler - let isCancelled = self.isCancelled - lock.unlock() - - if isCancelled { - handler() - } - } - - @Sendable - fileprivate func cancel() { - lock.lock() - isCancelled = true - let handler = self.handler - lock.unlock() - - handler?() - } - } -} - - - diff --git a/FlyingSocks/Sources/PollingSocketPool.swift b/FlyingSocks/Sources/PollingSocketPool.swift index 6e2c8e32..c6f0c1b1 100644 --- a/FlyingSocks/Sources/PollingSocketPool.swift +++ b/FlyingSocks/Sources/PollingSocketPool.swift @@ -48,36 +48,31 @@ public final actor PollingSocketPool: AsyncSocketPool { public func suspendSocket(_ socket: Socket, untilReadyFor events: Socket.Events) async throws { let socket = SuspendedSocket(file: socket.file, events: events) - return try await withCancellingContinuation(returning: Void.self) { continuation, handler in - let continuation = Continuation(continuation) - appendContinuation(continuation, for: socket) - handler.onCancel { - self.removeContinuation(continuation, for: socket) - } - } + let continuation = Continuation() + defer { removeContinuation(continuation, for: socket) } + appendContinuation(continuation, for: socket) + return try await continuation.value } private let pollInterval: Interval private let loopInterval: Interval + + typealias Continuation = CancellingContinuation private var waiting: [SuspendedSocket: Set] = [:] { didSet { if !waiting.isEmpty, let continuation = loop { - loop = nil continuation.resume() } } } - private var loop: Continuation? + private var loop: CancellingContinuation? private func suspendLoopUntilSocketsExist() async throws { - try await withCancellingContinuation(returning: Void.self) { continuation, handler in - let continuation = Continuation(continuation) - loop = continuation - handler.onCancel { - continuation.cancel() - } - } + let continuation = CancellingContinuation() + loop = continuation + defer { loop = nil } + return try await continuation.value } private func appendContinuation(_ continuation: Continuation, for socket: SuspendedSocket) { @@ -90,16 +85,9 @@ public final actor PollingSocketPool: AsyncSocketPool { waiting[socket] = existing } - private func _removeContinuation(_ continuation: Continuation, for socket: SuspendedSocket) { + private func removeContinuation(_ continuation: Continuation, for socket: SuspendedSocket) { guard waiting[socket]?.contains(continuation) == true else { return } waiting[socket]?.remove(continuation) - continuation.cancel() - } - - // Careful not to escape non-isolated method - // https://bugs.swift.org/browse/SR-15745 - nonisolated private func removeContinuation(_ continuation: Continuation, for socket: SuspendedSocket) { - Task { await _removeContinuation(continuation, for: socket) } } private var state: State = .ready @@ -114,16 +102,18 @@ public final actor PollingSocketPool: AsyncSocketPool { guard state != .running else { throw Error("Not Ready") } state = .running - defer { - for continuation in waiting.values.flatMap({ $0 }) { - continuation.cancel() - } + do { + try await poll() + } catch { + let pending = waiting waiting = [:] state = .complete loop = nil + for continuation in pending.values.flatMap({ $0 }) { + continuation.cancel() + } + throw error } - - try await poll() } private func poll() async throws { @@ -154,16 +144,16 @@ public final actor PollingSocketPool: AsyncSocketPool { private func processPoll(socket: SuspendedSocket, revents: POLLEvents) { if revents.intersects(with: socket.events.pollEvents) { - let continuations = waiting[socket] + let continuations = waiting[socket] ?? [] waiting[socket] = nil - continuations?.forEach { - $0.resume() + for c in continuations { + c.resume() } } else if revents.intersects(with: .errors) { - let continuations = waiting[socket] + let continuations = waiting[socket] ?? [] waiting[socket] = nil - continuations?.forEach { - $0.disconnected() + for c in continuations { + c.resume(throwing: .disconnected) } } } @@ -172,52 +162,6 @@ public final actor PollingSocketPool: AsyncSocketPool { var file: Socket.FileDescriptor var events: Socket.Events } - - final class Continuation: Hashable, @unchecked Sendable { - - private let continuation: CheckedContinuation - @Locked private(set) var isComplete: Bool - - init(_ continuation: CheckedContinuation) { - self.continuation = continuation - self.isComplete = false - } - - func resume() { - _isComplete.unlock { - if $0 == false { - continuation.resume() - $0 = true - } - } - } - - func disconnected() { - _isComplete.unlock { - if $0 == false { - continuation.resume(throwing: SocketError.disconnected) - $0 = true - } - } - } - - func cancel() { - _isComplete.unlock { - if $0 == false { - continuation.resume(throwing: CancellationError()) - $0 = true - } - } - } - - func hash(into hasher: inout Hasher) { - ObjectIdentifier(self).hash(into: &hasher) - } - - static func == (lhs: Continuation, rhs: Continuation) -> Bool { - lhs === rhs - } - } } extension PollingSocketPool {