Skip to content

Commit

Permalink
Add concurrency extension, refactor Connection Tester to use AsyncSeq…
Browse files Browse the repository at this point in the history
  • Loading branch information
mallexxx authored and afterxleep committed Aug 2, 2023
1 parent 7c5a0bc commit a5d33aa
Show file tree
Hide file tree
Showing 10 changed files with 440 additions and 14 deletions.
143 changes: 143 additions & 0 deletions Sources/Common/Concurrency/AsyncStream.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
// SPDX-License-Identifier: MIT
// Copyright (c) 2023 Point-Free

extension AsyncStream {
/// Produces an `AsyncStream` from an `AsyncSequence` by consuming the sequence till it
/// terminates, ignoring any failure.
///
/// Useful as a kind of type eraser for live `AsyncSequence`-based dependencies.
///
/// For example, your feature may want to subscribe to screenshot notifications. You can model
/// this as a dependency client that returns an `AsyncStream`:
///
/// ```swift
/// struct ScreenshotsClient {
/// var screenshots: () -> AsyncStream<Void>
/// func callAsFunction() -> AsyncStream<Void> { self.screenshots() }
/// }
/// ```
///
/// The "live" implementation of the dependency can supply a stream by erasing the appropriate
/// `NotificationCenter.Notifications` async sequence:
///
/// ```swift
/// extension ScreenshotsClient {
/// static let live = Self(
/// screenshots: {
/// AsyncStream(
/// NotificationCenter.default
/// .notifications(named: UIApplication.userDidTakeScreenshotNotification)
/// .map { _ in }
/// )
/// }
/// )
/// }
/// ```
///
/// While your tests can use `AsyncStream.makeStream` to spin up a controllable stream for tests:
///
/// ```swift
/// func testScreenshots() {
/// let screenshots = AsyncStream.makeStream(of: Void.self)
///
/// let model = withDependencies {
/// $0.screenshots = { screenshots.stream }
/// } operation: {
/// FeatureModel()
/// }
///
/// XCTAssertEqual(model.screenshotCount, 0)
/// screenshots.continuation.yield() // Simulate a screenshot being taken.
/// XCTAssertEqual(model.screenshotCount, 1)
/// }
/// ```
///
/// - Parameter sequence: An async sequence.
public init<S: AsyncSequence>(_ sequence: S) where S.Element == Element {
var iterator: S.AsyncIterator?
self.init {
if iterator == nil {
iterator = sequence.makeAsyncIterator()
}
return try? await iterator?.next()
}
}

#if swift(<5.9)
/// Constructs and returns a stream along with its backing continuation.
///
/// A back-port of [SE-0388: Convenience Async[Throwing]Stream.makeStream methods][se-0388].
///
/// This is handy for immediately escaping the continuation from an async stream, which
/// typically requires multiple steps:
///
/// ```swift
/// var _continuation: AsyncStream<Int>.Continuation!
/// let stream = AsyncStream<Int> { continuation = $0 }
/// let continuation = _continuation!
///
/// // vs.
///
/// let (stream, continuation) = AsyncStream.makeStream(of: Int.self)
/// ```
///
/// This tool is usually used for tests where we need to supply an async sequence to a
/// dependency endpoint and get access to its continuation so that we can emulate the dependency
/// emitting data. For example, suppose you have a dependency exposing an async sequence for
/// listening to notifications. To test this you can use `makeStream`:
///
/// ```swift
/// func testScreenshots() {
/// let screenshots = AsyncStream.makeStream(of: Void.self)
///
/// let model = withDependencies {
/// $0.screenshots = { screenshots.stream }
/// } operation: {
/// FeatureModel()
/// }
///
/// XCTAssertEqual(model.screenshotCount, 0)
/// screenshots.continuation.yield() // Simulate a screenshot being taken.
/// XCTAssertEqual(model.screenshotCount, 1)
/// }
/// ```
///
/// > Warning: ⚠️ `AsyncStream` does not support multiple subscribers, therefore you can only
/// > use this helper to test features that do not subscribe multiple times to the dependency
/// > endpoint.
///
/// [se-0388]: https://github.com/apple/swift-evolution/blob/main/proposals/0388-async-stream-factory.md
///
/// - Parameters:
/// - elementType: The type of element the `AsyncStream` produces.
/// - limit: A Continuation.BufferingPolicy value to set the stream’s buffering behavior. By
/// default, the stream buffers an unlimited number of elements. You can also set the policy
/// to buffer a specified number of oldest or newest elements.
/// - Returns: An `AsyncStream`.
public static func makeStream(
of elementType: Element.Type = Element.self,
bufferingPolicy limit: Continuation.BufferingPolicy = .unbounded
) -> (stream: Self, continuation: Continuation) {
var continuation: Continuation!
return (Self(elementType, bufferingPolicy: limit) { continuation = $0 }, continuation)
}
#endif

/// An `AsyncStream` that never emits and never completes unless cancelled.
public static var never: Self {
Self { _ in }
}

/// An `AsyncStream` that never emits and completes immediately.
public static var finished: Self {
Self { $0.finish() }
}
}

extension AsyncSequence {
/// Erases this async sequence to an async stream that produces elements till this sequence
/// terminates (or fails).
public func eraseToStream() -> AsyncStream<Element> {
AsyncStream(self)
}
}
44 changes: 44 additions & 0 deletions Sources/Common/Concurrency/TaskTimeout.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
//
// TaskTimeout.swift
//
// Copyright © 2023 DuckDuckGo. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

import Foundation

public func withTimeout<T>(_ timeout: TimeInterval, throwing error: @autoclosure @escaping () -> Error, do operation: @escaping () async throws -> T) async throws -> T {
try await withThrowingTaskGroup(of: T.self) { group -> T in
group.addTask {
try await operation()
}

group.addTask {
try await Task.sleep(interval: timeout)
throw error()
}

// If the timeout finishes first, it will throw and cancel the long running task.
for try await result in group {
group.cancelAll()
return result
}

fatalError("unexpected flow")
}
}

public func withTimeout<T>(_ timeout: TimeInterval, file: StaticString = #file, line: UInt = #line, do operation: @escaping () async throws -> T) async throws -> T {
try await withTimeout(timeout, throwing: TimeoutError(interval: timeout, file: file, line: line), do: operation)
}
55 changes: 55 additions & 0 deletions Sources/Common/TimeoutError.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
//
// TimeoutError.swift
//
// Copyright © 2023 DuckDuckGo. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

import Foundation

public struct TimeoutError: Error, LocalizedError, CustomDebugStringConvertible {

#if DEBUG
public let interval: TimeInterval?
public let description: String?
public let date: Date
public let file: StaticString
public let line: UInt

public var errorDescription: String? {
"TimeoutError(started: \(date), \(interval != nil ? "timeout: \(interval!)s, " : "")\(description != nil ? " description: " + description! : "") at \(file):\(line))"
}

#else

public var errorDescription: String? {
"Timeout"
}
#endif

public init(interval: TimeInterval? = nil, description: String? = nil, date: Date = Date(), file: StaticString = #file, line: UInt = #line) {
#if DEBUG
self.interval = interval
self.description = description
self.date = date
self.file = file
self.line = line
#endif
}

public var debugDescription: String {
errorDescription!
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,8 @@ final class NetworkProtectionConnectionTester {

Task {
// This is a bit ugly, but it's a quick way to run the tests in parallel without a task group.
async let vpnConnected = Self.testConnection(name: "VPN", parameters: vpnParameters)
async let localConnected = Self.testConnection(name: "Local", parameters: localParameters)
async let vpnConnected = testConnection(name: "VPN", parameters: vpnParameters)
async let localConnected = testConnection(name: "Local", parameters: localParameters)
let vpnIsConnected = await vpnConnected
let localIsConnected = await localConnected

Expand All @@ -248,21 +248,29 @@ final class NetworkProtectionConnectionTester {
}
}

private static func testConnection(name: String, parameters: NWParameters) async -> Bool {
private func testConnection(name: String, parameters: NWParameters) async -> Bool {
let connection = NWConnection(to: Self.endpoint, using: parameters)
var didConnect = false

connection.stateUpdateHandler = { state in
if case .ready = state {
didConnect = true
}
}
let stateUpdateStream = connection.stateUpdateStream

connection.start(queue: Self.connectionTestQueue)
try? await Task.sleep(interval: connectionTimeout)
connection.cancel()
defer {
connection.cancel()
}

return didConnect
do {
// await for .ready connection state and consider working
return try await withTimeout(Self.connectionTimeout) {
for await state in stateUpdateStream {
if case .ready = state {
return true
}
}
return false
}
} catch {
// timeout
return false
}
}

// MARK: - Result handling
Expand Down
53 changes: 53 additions & 0 deletions Sources/NetworkProtection/Networking/NWConnectionExtension.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
//
// NWConnectionExtension.swift
//
// Copyright © 2023 DuckDuckGo. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

import Foundation
import Network

extension NWConnection {

var stateUpdateStream: AsyncStream<State> {
let (stream, continuation) = AsyncStream.makeStream(of: State.self)

class ConnectionLifeTimeTracker {
let continuation: AsyncStream<State>.Continuation
init(continuation: AsyncStream<State>.Continuation) {
self.continuation = continuation
}
deinit {
continuation.finish()
}
}
let connectionLifeTimeTracker = ConnectionLifeTimeTracker(continuation: continuation)

self.stateUpdateHandler = { state in
withExtendedLifetime(connectionLifeTimeTracker) {
_=continuation.yield(state)

switch state {
case .cancelled, .failed:
continuation.finish()
default: break
}
}
}

return stream
}

}
Loading

0 comments on commit a5d33aa

Please sign in to comment.