diff --git a/Evolution/NNNN-broadcast.md b/Evolution/NNNN-broadcast.md new file mode 100644 index 00000000..1e96ac5c --- /dev/null +++ b/Evolution/NNNN-broadcast.md @@ -0,0 +1,305 @@ +# Broadcast + +* Proposal: [NNNN](NNNN-broadcast.md) +* Authors: [Tristan Celder](https://github.com/tcldr) +* Review Manager: TBD +* Status: **Awaiting implementation** + + + * Implementation: [[Source](https://github.com/tcldr/swift-async-algorithms/blob/pr/share/Sources/AsyncAlgorithms/AsyncBroadcastSequence.swift) | + [Tests](https://github.com/tcldr/swift-async-algorithms/blob/pr/share/Tests/AsyncAlgorithmsTests/TestBroadcast.swift)] + +## Introduction + +`AsyncBroadcastSequence` unlocks additional use cases for structured concurrency and asynchronous sequences by allowing almost any asynchronous sequence to be adapted for consumption by multiple concurrent consumers. + +## Motivation + +The need often arises to distribute the values of an asynchronous sequence to multiple consumers. Intuitively, it seems that a sequence _should_ be iterable by more than a single consumer, but many types of asynchronous sequence are restricted to supporting only one consumer at a time. + +One example of an asynchronous sequence that would naturally fit this 'one to many' shape is the output of a hardware sensor. A hypothetical hardware sensor might include the following API: + +```swift +public final class Accelerometer { + + public struct Event { /* ... */ } + + // exposed as a singleton to represent the single on-device sensor + public static let shared = Accelerometer() + + private init() {} + + public var updateHandler: ((Event) -> Void)? + + public func startAccelerometer() { /* ... */ } + public func stopAccelerometer() { /* ... */ } +} +``` + +To share the sensor data with a consumer through an asynchronous sequence you might choose an `AsyncStream`: + +```swift +final class OrientationMonitor { /* ... */ } +extension OrientationMonitor { + + static var orientation: AsyncStream { + AsyncStream { continuation in + Accelerometer.shared.updateHandler = { event in + continuation.yield(event) + } + continuation.onTermination = { @Sendable _ in + Accelerometer.shared.stopAccelerometer() + } + Accelerometer.shared.startAccelerometer() + } + } +} +``` + +With a single consumer, this pattern works as expected: + +```swift +let consumer1 = Task { + for await orientation in OrientationMonitor.orientation { + print("Consumer 1: Orientation: \(orientation)") + } +} +// Output: +// Consumer 1: Orientation: (0.0, 1.0, 0.0) +// Consumer 1: Orientation: (0.0, 0.8, 0.0) +// Consumer 1: Orientation: (0.0, 0.6, 0.0) +// Consumer 1: Orientation: (0.0, 0.4, 0.0) +// ... +``` + +However, as soon as a second consumer comes along, data for the first consumer stops. This is because the singleton `Accelerometer.shared.updateHandler` is updated within the closure for the creation of the second `AsyncStream`. This has the effect of redirecting all Accelerometer data to the second stream. + +One attempted workaround might be to vend a single `AsyncStream` to all consumers: + +```swift +extension OrientationMonitor { + + static let orientation: AsyncStream = { + AsyncStream { continuation in + Accelerometer.shared.updateHandler = { event in + continuation.yield(event) + } + continuation.onTermination = { @Sendable _ in + Accelerometer.shared.stopAccelerometer() + } + Accelerometer.shared.startAccelerometer() + } + }() +} +``` + +This comes with another issue though: when two consumers materialise, the output of the stream becomes split between them: + +```swift +let consumer1 = Task { + for await orientation in OrientationMonitor.orientation { + print("Consumer 1: Orientation: \(orientation)") + } +} +let consumer2 = Task { + for await orientation in OrientationMonitor.orientation { + print("Consumer 2: Orientation: \(orientation)") + } +} +// Output: +// Consumer 1: Orientation: (0.0, 1.0, 0.0) +// Consumer 2: Orientation: (0.0, 0.8, 0.0) +// Consumer 2: Orientation: (0.0, 0.6, 0.0) +// Consumer 1: Orientation: (0.0, 0.4, 0.0) +// ... +``` +Rather than consumers receiving all values emitted by the `AsyncStream`, they receive only a subset. In addition, if the task of a consumer is cancelled, via `consumer2.cancel()` for example, the `onTermination` trigger of the `AsyncSteam.Continuation` executes and stops Accelerometer data being generated for _both_ tasks. + +## Proposed solution + +`AsyncBroadcastSequence` provides a way to multicast a single upstream asynchronous sequence to any number of consumers. + +``` +extension OrientationMonitor { + + static let orientation: AsyncBroadcastSequence> = { + let stream = AsyncStream(bufferingPolicy: .bufferingNewest(1)) { continuation in + Accelerometer.shared.updateHandler = { event in + continuation.yield(event) + } + Accelerometer.shared.startAccelerometer() + } + return stream.broadcast(disposingBaseIterator: .whenTerminated) + }() +} +``` + +Now, each consumer receives every element output by the source stream: + +```swift +let consumer1 = Task { + for await orientation in OrientationMonitor.orientation { + print("Consumer 1: Orientation: \(orientation)") + } +} +let consumer2 = Task { + for await orientation in OrientationMonitor.orientation { + print("Consumer 2: Orientation: \(orientation)") + } +} +// Output: +// Consumer 1: Orientation: (0.0, 1.0, 0.0) +// Consumer 2: Orientation: (0.0, 1.0, 0.0) +// Consumer 1: Orientation: (0.0, 0.8, 0.0) +// Consumer 2: Orientation: (0.0, 0.8, 0.0) +// Consumer 1: Orientation: (0.0, 0.6, 0.0) +// Consumer 2: Orientation: (0.0, 0.6, 0.0) +// Consumer 1: Orientation: (0.0, 0.4, 0.0) +// Consumer 2: Orientation: (0.0, 0.4, 0.0) +// ... +``` + +This does leave our accelerometer running even when the last consumer has cancelled though. While this makes sense for some use-cases, it would be better if we could automate shutdown of the accelerometer when there's no longer any demand, and start it up again when demand returns. With the help of the `deferred` algorithm, we can: + +```swift +extension OrientationMonitor { + + static let orientation: AsyncBroadcastSequence>> = { + let stream = deferred { + AsyncStream { continuation in + Accelerometer.shared.updateHandler = { event in + continuation.yield(event) + } + continuation.onTermination = { @Sendable _ in + Accelerometer.shared.stopAccelerometer() + } + Accelerometer.shared.startAccelerometer() + } + } + // `.whenTerminatedOrVacant` is the default, so we could equally write `.broadcast()` + // but it's included here for clarity. + return stream.broadcast(disposingBaseIterator: .whenTerminatedOrVacant) + }() +} +``` + +With `.whenTerminatedOrVacant` set as the iterator disposal policy (the default), when the last downstream consumer cancels the upstream iterator is dropped. This triggers `AsyncStream`'s `onTermination` handler, shutting off the Accelerometer. + +Now, with `AsyncStream` composed with `AsyncDeferredSequence`, any new demand triggers the re-execution of `AsyncDeferredSequence`'s' closure, the restart of the Accelerometer, and a new sequence for `AsyncBroadcastSequence` to share. + +### Configuration Options + +`AsyncBroadcastSequence` provides two conveniences to adapt the sequence for the most common multicast use-cases: + 1. As described above, a configurable iterator disposal policy that determines whether the shared upstream iterator is disposed of when the consumer count count falls to zero. + 2. A history feature that allows late-coming consumers to receive the most recently emitted elements prior to their arrival. One use-case could be a UI that is updated by an infrequently emitting sequence. Rather than wait for the sequence to emit a new element to populate an interface, the last emitted value can be used until such time that fresh data is emitted. + +## Detailed design + +### Algorithm Summary: +The idea behind the `AsyncBroadcastSequence` algorithm is as follows: Vended iterators of `AsyncBroadcastSequence` are known as 'runners'. Runners compete in a race to grab the next element from a base iterator for each of its iteration cycles. The 'winner' of an iteration cycle returns the element to the shared context which then supplies the result to later finishers. Once every runner has finished, the current cycle completes and the next iteration can start. This means that runners move forward in lock-step, only proceeding when the the last runner in the current iteration has received a value or has cancelled. + +#### `AsyncBroadcastSequence` Iterator Lifecycle: + + 1. **Connection**: On connection, each 'runner' is issued with an ID (and any prefixed values from the history buffer) by the shared context. From this point on, the algorithm will wait on this iterator to consume its values before moving on. This means that until `next()` is called on this iterator, all the other iterators will be held until such time that it is, or the iterator's task is cancelled. + 2. **Run**: After its prefix values have been exhausted, each time `next()` is called on the iterator, the iterator attempts to start a 'run' by calling `startRun(_:)` on the shared context. The shared context marks the iterator as 'running' and issues a role to determine the iterator's action for the current iteration cycle. The roles are as follows: + - **FETCH**: The iterator is the 'winner' of this iteration cycle. It is issued with the shared base iterator, calls `next()` on it, and once it resumes returns the value to the shared context. + - **WAIT**: The iterator hasn't won this cycle, but was fast enough that the winner has yet to resume with the element from the base iterator. Therefore, it is told to suspend (WAIT) until such time that the winner resumes. + - **YIELD**: The iterator is late (and is holding up the other iterators). The shared context issues it with the value retrieved by the winning iterator and lets it continue immediately. + - **HOLD**: The iterator is early for the next iteration cycle. So it is put in the holding pen until the next cycle can start. This is because there are other iterators that still haven't finished their run for the current iteration cycle. This iterator will be resumed when all other iterators have completed their run. + + 3. **Completion**: The iterator calls cancel on the shared context which ensures the iterator does not take part in the next iteration cycle. However, if it is currently suspended it may not resume until the current iteration cycle concludes. This is especially important if it is filling the key FETCH role for the current iteration cycle. + +### AsyncBroadcastSequence + +#### Declaration + +```swift +public struct AsyncBroadcastSequence where Base: Sendable, Base.Element: Sendable +``` + +#### Overview + +An asynchronous sequence that can be iterated by multiple concurrent consumers. + +Use an asynchronous broadcast sequence when you have multiple downstream asynchronous sequences with which you wish to share the output of a single asynchronous sequence. This can be useful if you have expensive upstream operations, or if your asynchronous sequence represents the output of a physical device. + +Elements are emitted from an asynchronous broadcast sequence at a rate that does not exceed the consumption of its slowest consumer. If this kind of back-pressure isn't desirable for your use-case, `AsyncBroadcastSequence` can be composed with buffers – either upstream, downstream, or both – to acheive the desired behavior. + +If you have an asynchronous sequence that consumes expensive system resources, it is possible to configure `AsyncBroadcastSequence` to discard its upstream iterator when the connected downstream consumer count falls to zero. This allows any cancellation tasks configured on the upstream asynchronous sequence to be initiated and for expensive resources to be terminated. `AsyncBroadcastSequence` will re-create a fresh iterator if there is further demand. + +For use-cases where it is important for consumers to have a record of elements emitted prior to their connection, a `AsyncBroadcastSequence` can also be configured to prefix its output with the most recently emitted elements. If `AsyncBroadcastSequence` is configured to drop its iterator when the connected consumer count falls to zero, its history will be discarded at the same time. + +#### Creating a sequence + +``` +init( + _ base: Base, + history historyCount: Int = 0, + disposingBaseIterator iteratorDisposalPolicy: IteratorDisposalPolicy = .whenTerminatedOrVacant +) +``` + +Contructs an asynchronous broadcast sequence. + + - `history`: the number of elements previously emitted by the sequence to prefix to the iterator of a new consumer + - `iteratorDisposalPolicy`: the iterator disposal policy applied to the upstream iterator + +### AsyncBroadcastSequence.IteratorDisposalPolicy + +#### Declaration + +```swift +public enum IteratorDisposalPolicy: Sendable { + case whenTerminated + case whenTerminatedOrVacant +} +``` + +#### Overview +The iterator disposal policy applied by an asynchronous broadcast sequence to its upstream iterator + + - `whenTerminated`: retains the upstream iterator for use by future consumers until the base asynchronous sequence is terminated + - `whenTerminatedOrVacant`: discards the upstream iterator when the number of consumers falls to zero or the base asynchronous sequence is terminated + +### broadcast(history:disposingBaseIterator) + +#### Declaration + +```swift +extension AsyncSequence { + + public func broadcast( + history historyCount: Int = 0, + disposingBaseIterator iteratorDisposalPolicy: AsyncBroadcastSequence.IteratorDisposalPolicy = .whenTerminatedOrVacant + ) -> AsyncBroadcastSequence +} +``` + +#### Overview + +Creates an asynchronous sequence that can be shared by multiple consumers. + + - `history`: the number of elements previously emitted by the sequence to prefix to the iterator of a new consumer + - `iteratorDisposalPolicy`: the iterator disposal policy applied by an asynchronous broadcast sequence to its upstream iterator + + ## Comparison with other libraries + + - **ReactiveX** ReactiveX has the [Publish](https://reactivex.io/documentation/operators/publish.html) observable which when can be composed with the [Connect](https://reactivex.io/documentation/operators/connect.html), [RefCount](https://reactivex.io/documentation/operators/refcount.html) and [Replay](https://reactivex.io/documentation/operators/replay.html) operators to support various multi-casting use-cases. The `discardsBaseIterator` behavior is applied via `RefCount` (or the .`share().refCount()` chain of operators in RxSwift), while the history behavior is achieved through `Replay` (or the .`share(replay:)` convenience in RxSwift) + + - **Combine** Combine has the [ multicast(_:)](https://developer.apple.com/documentation/combine/publishers/multicast) operator, which along with the functionality of [ConnectablePublisher](https://developer.apple.com/documentation/combine/connectablepublisher) and associated conveniences supports many of the same use cases as the ReactiveX equivalent, but in some instances requires third-party ooperators to achieve the same level of functionality. + +Due to the way a Swift `AsyncSequence`, and therefore `AsyncBroadcastSequence`, naturally applies back-pressure, the characteristics of an `AsyncBroadcastSequence` are different enough that a one-to-one API mapping of other reactive programmming libraries isn't applicable. + +However, with the available configuration options – and through composition with other asynchronous sequences – `AsyncBroadcastSequence` can trivially be adapted to support many of the same use-cases, including that of [Connect](https://reactivex.io/documentation/operators/connect.html), [RefCount](https://reactivex.io/documentation/operators/refcount.html), and [Replay](https://reactivex.io/documentation/operators/replay.html). + + ## Effect on API resilience + +TBD + +## Alternatives considered + +Creating a one-to-one multicast analog that matches that of existing reactive programming libraries. However, it would mean fighting the back-pressure characteristics of `AsyncSequence`. Instead, this implementation embraces back-pressure to yield a more flexible result. + +## Acknowledgments + +Thanks to [Philippe Hausler](https://github.com/phausler) and [Franz Busch](https://github.com/FranzBusch), as well as all other contributors on the Swift forums, for their thoughts and feedback. diff --git a/Sources/AsyncAlgorithms/AsyncBroadcastSequence.swift b/Sources/AsyncAlgorithms/AsyncBroadcastSequence.swift new file mode 100644 index 00000000..4d4aa1ea --- /dev/null +++ b/Sources/AsyncAlgorithms/AsyncBroadcastSequence.swift @@ -0,0 +1,685 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2022 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +// ALGORITHM SUMMARY: +// +// The idea behind the `AsyncBroadcastSequence` algorithm is as follows: Vended +// iterators of `AsyncBroadcastSequence` are known as 'runners'. Runners compete +// in a race to grab the next element from a base iterator for each of its +// iteration cycles. The 'winner' of an iteration cycle returns the element to +// the shared context which then supplies the result to later finishers. Once +// every runner has finished, the current cycle completes and the next +// iteration can start. This means that runners move forward in lock-step, only +// proceeding when the the last runner in the current iteration has received a +// value or has cancelled. +// +// `AsyncBroadcastSequence` ITERATOR LIFECYCLE: +// +// 1. CONNECTION: On connection, each 'runner' is issued with an ID (and any +// prefixed values from the history buffer) by the shared context. From +// this point on, the algorithm will wait on this iterator to consume its +// values before moving on. This means that until `next()` is called on +// this iterator, all the other iterators will be held until such time that +// it is, or the iterator's task is cancelled. +// +// 2. RUN: After its prefix values have been exhausted, each time `next()` is +// called on the iterator, the iterator attempts to start a 'run' by +// calling `startRun(_:)` on the shared context. The shared context marks +// the iterator as 'running' and issues a role to determine the iterator's +// action for the current iteration cycle. The roles are as follows: +// +// - FETCH: The iterator is the 'winner' of this iteration cycle. It is +// issued with the shared base iterator, calls `next()` on it, and +// once it resumes returns the value to the shared context. +// – WAIT: The iterator hasn't won this cycle, but was fast enough that +// the winner has yet to resume with the element from the base +// iterator. Therefore, it is told to suspend (WAIT) until such time +// that the winner resumes. +// – YIELD: The iterator is late (and is holding up the other iterators). +// The shared context issues it with the value retrieved by the winning +// iterator and lets it continue immediately. +// – HOLD: The iterator is early for the next iteration cycle. So it is +// put in the holding pen until the next cycle can start. This is +// because there are other iterators that still haven't finished their +// run for the current iteration cycle. This iterator will be resumed +// when all other iterators have completed their run +// +// 3. COMPLETION: The iterator calls cancel on the shared context which +// ensures the iterator does not take part in the next iteration cycle. +// However, if it is currently suspended it may not resume until the +// current iteration cycle concludes. This is especially important if it is +// filling the key FETCH role for the current iteration cycle. + +// MARK: - Member Function + +import DequeModule + +extension AsyncSequence where Self: Sendable, Element: Sendable { + + /// Creates an asynchronous sequence that can be broadcast to multiple + /// consumers. + /// + /// - parameter history: the number of previously emitted elements to prefix + /// to the iterator of a new consumer + /// - parameter iteratorDisposalPolicy:the iterator disposal policy applied by + /// a asynchronous broadcast sequence to its upstream iterator + public func broadcast( + history historyCount: Int = 0, + disposingBaseIterator iteratorDisposalPolicy: AsyncBroadcastSequence.IteratorDisposalPolicy = .whenTerminatedOrVacant + ) -> AsyncBroadcastSequence { + AsyncBroadcastSequence( + self, history: historyCount, disposingBaseIterator: iteratorDisposalPolicy) + } +} + +// MARK: - Sequence + +/// An asynchronous sequence that can be iterated by multiple concurrent +/// consumers. +/// +/// Use an asynchronous broadcast sequence when you have multiple downstream +/// asynchronous sequences with which you wish to share the output of a single +/// asynchronous sequence. This can be useful if you have expensive upstream +/// operations, or if your asynchronous sequence represents the output of a +/// physical device. +/// +/// Elements are emitted from a asynchronous broadcast sequence at a rate that +/// does not exceed the consumption of its slowest consumer. If this kind of +/// back-pressure isn't desirable for your use-case, ``AsyncBroadcastSequence`` +/// can be composed with buffers – either upstream, downstream, or both – to +/// acheive the desired behavior. +/// +/// If you have an asynchronous sequence that consumes expensive system +/// resources, it is possible to configure ``AsyncBroadcastSequence`` to discard +/// its upstream iterator when the connected downstream consumer count falls to +/// zero. This allows any cancellation tasks configured on the upstream +/// asynchronous sequence to be initiated and for expensive resources to be +/// terminated. ``AsyncBroadcastSequence`` will re-create a fresh iterator if +/// there is further demand. +/// +/// For use-cases where it is important for consumers to have a record of +/// elements emitted prior to their connection, a ``AsyncBroadcastSequence`` can +/// also be configured to prefix its output with the most recently emitted +/// elements. If ``AsyncBroadcastSequence`` is configured to drop its iterator +/// when the connected consumer count falls to zero, its history will be +/// discarded at the same time. +public struct AsyncBroadcastSequence : Sendable + where Base: Sendable, Base.Element: Sendable { + + /// The iterator disposal policy applied by a asynchronous broadcast sequence to + /// its upstream iterator + /// + /// - note: the iterator is always disposed when the base asynchronous + /// sequence terminates + public enum IteratorDisposalPolicy: Sendable { + /// retains the upstream iterator for use by future consumers until the base + /// asynchronous sequence is terminated + case whenTerminated + /// discards the upstream iterator when the number of consumers falls to + /// zero or the base asynchronous sequence is terminated + case whenTerminatedOrVacant + } + + private let context: Context + private let deallocToken: DeallocToken + + /// Contructs a asynchronous broadcast sequence + /// + /// - parameter base: the asynchronous sequence to be broadcast + /// - parameter history: the number of previously emitted elements to prefix + /// to the iterator of a new consumer + /// - parameter iteratorDisposalPolicy: the iterator disposal policy applied + /// to the upstream iterator + public init( + _ base: Base, + history historyCount: Int = 0, + disposingBaseIterator iteratorDisposalPolicy: IteratorDisposalPolicy = .whenTerminatedOrVacant + ) { + let context = Context( + base, replayCount: historyCount, iteratorDisposalPolicy: iteratorDisposalPolicy) + self.context = context + self.deallocToken = .init { context.abort() } + } +} + +// MARK: - Iterator + +extension AsyncBroadcastSequence: AsyncSequence { + + public typealias Element = Base.Element + + public struct Iterator: AsyncIteratorProtocol { + + private let id: UInt + private let deallocToken: DeallocToken? + private var prefix: Deque + private var context: Context? + + fileprivate init(_ storage: Context) { + switch storage.establish() { + case .active(let id, let prefix): + self.id = id + self.prefix = prefix + self.deallocToken = .init { storage.cancel(id) } + self.context = storage + case .terminal: + self.id = UInt.min + self.prefix = .init() + self.deallocToken = nil + self.context = nil + } + } + + public mutating func next() async rethrows -> Element? { + do { + return try await withTaskCancellationHandler { + if prefix.isEmpty == false, let element = prefix.popFirst() { + return element + } + guard let context else { return nil } + let role = context.startRun(id) + switch role { + case .fetch(let iterator): + do { + let element = try await iterator.next() + context.fetch(id, resumedWithResult: .success(element)) + return try processOutput(.success(element)) + } + catch { + context.fetch(id, resumedWithResult: .failure(error)) + return try processOutput(.failure(error)) + } + case .wait: + let output = await context.wait(id) + return try processOutput(output) + case .yield(let output, let resume): + resume?() + return try processOutput(output) + case .hold: + await context.hold(id) + return try await next() + } + } onCancel: { [context, id] in + context?.cancel(id) + } + } + catch { + self.context = nil + throw error + } + } + + private mutating func processOutput( + _ output: Result + ) rethrows -> Element? { + switch output { + case .success(let value?): + return value + default: + self.context = nil + return try output._rethrowGet() + } + } + } + + public func makeAsyncIterator() -> Iterator { + Iterator(context) + } +} + +// MARK: - Context + +private extension AsyncBroadcastSequence { + + struct Context: Sendable { + + typealias WaitContinuation = UnsafeContinuation, Never> + typealias HoldContinuation = UnsafeContinuation + + enum RunRole { + case fetch(SharedIterator) + case wait + case yield(Result, (() -> Void)?) + case hold + } + + enum Connection { + case active(id: UInt, prefix: Deque) + case terminal + } + + private enum IterationPhase { + case pending + case fetching + case done(Result) + } + + private struct Runner { + var iterationIndex: Int + var active = false + var cancelled = false + } + + private struct State: Sendable { + + let base: Base + let replayCount: Int + let iteratorDisposalPolicy: IteratorDisposalPolicy + var iterator: SharedIterator? + var nextRunnerID = (UInt.min + 1) + var currentIterationIndex = 0 + var nextIterationIndex: Int { (currentIterationIndex + 1) % 2 } + var history = Deque() + var runners = [UInt: Runner]() + var iterationPhase = IterationPhase.pending + var terminal = false + var heldRunnerContinuations = [UnsafeContinuation]() + var waitingRunnerContinuations = [UInt: WaitContinuation]() + + init( + _ base: Base, + replayCount: Int, + iteratorDisposalPolicy: IteratorDisposalPolicy + ) { + precondition(replayCount >= 0, "history must be greater than or equal to zero") + self.base = base + self.replayCount = replayCount + self.iteratorDisposalPolicy = iteratorDisposalPolicy + } + + mutating func establish() -> (Connection, (() -> Void)?) { + guard terminal == false else { return (.terminal, nil) } + defer { nextRunnerID += 1} + let iterationIndex: Int + if case .done(_) = iterationPhase { + iterationIndex = nextIterationIndex + } else { + iterationIndex = currentIterationIndex + } + runners[nextRunnerID] = Runner(iterationIndex: iterationIndex) + let connection = Connection.active(id: nextRunnerID, prefix: history) + return (connection, finalizeIterationIfNeeded()) + } + + mutating func run( + _ runnerID: UInt + ) -> RunRole { + guard var runner = runners[runnerID], runner.cancelled == false else { + return .yield(.success(nil), nil) + } + if runner.iterationIndex == currentIterationIndex { + runner.active = true + runners[runnerID] = runner + switch iterationPhase { + case .pending: + iterationPhase = .fetching + return .fetch(sharedIterator()) + case .fetching: + return .wait + case .done(let result): + finish(runnerID) + return .yield(result, finalizeIterationIfNeeded()) + } + } + else { + return .hold + } + } + + mutating func fetch( + _ runnerID: UInt, resumedWithResult result: Result + ) -> (() -> Void)? { + self.terminal = self.terminal || ((try? result.get()) == nil) + self.iterationPhase = .done(result) + finish(runnerID) + updateHistory(withResult: result) + let waitContinuation = gatherWaitingRunnerContinuationsForResumption(withResult: result) + let heldContinuation = finalizeIterationIfNeeded() + return { + waitContinuation?() + heldContinuation?() + } + } + + mutating func wait( + _ runnerID: UInt, + suspendedWithContinuation continuation: WaitContinuation + ) -> (() -> Void)? { + switch iterationPhase { + case .fetching: + waitingRunnerContinuations[runnerID] = continuation + return nil + case .done(let result): + finish(runnerID) + let waitContinuation = { continuation.resume(returning: result) } + let heldContinuation = finalizeIterationIfNeeded() + return { + waitContinuation() + heldContinuation?() + } + default: + preconditionFailure("waiting runner suspended out of band") + } + } + + private mutating func gatherWaitingRunnerContinuationsForResumption( + withResult result: Result + ) -> (() -> Void)? { + let continuations = waitingRunnerContinuations + .map { waitingRunnerID, continuation in + finish(waitingRunnerID) + return { continuation.resume(returning: result) } + } + waitingRunnerContinuations.removeAll() + return { + for continuation in continuations { continuation() } + } + } + + mutating func hold( + _ runnerID: UInt, + suspendedWithContinuation continuation: HoldContinuation + ) -> (() -> Void)? { + guard let runner = runners[runnerID], runner.iterationIndex == nextIterationIndex else { + return continuation.resume + } + heldRunnerContinuations.append(continuation) + return nil + } + + private mutating func finish(_ runnerID: UInt) { + guard var runner = runners.removeValue(forKey: runnerID) else { + preconditionFailure("run finished out of band") + } + if terminal == false, runner.cancelled == false { + runner.active = false + runner.iterationIndex = nextIterationIndex + runners[runnerID] = runner + } + } + + mutating func cancel(_ runnerID: UInt) -> (() -> Void)? { + if let runner = runners.removeValue(forKey: runnerID), runner.active { + runners[runnerID] = .init( + iterationIndex: runner.iterationIndex, active: true, cancelled: true) + return nil + } + else { + return finalizeIterationIfNeeded() + } + } + + mutating func abort() -> (() -> Void)? { + terminal = true + runners = runners.filter { _, runner in runner.active } + return finalizeIterationIfNeeded() + } + + private mutating func finalizeIterationIfNeeded() -> (() -> Void)? { + let isCurrentIterationActive = runners.values.contains { runner in + runner.iterationIndex == currentIterationIndex + } + if isCurrentIterationActive { return nil } + if terminal { + self.iterationPhase = .done(.success(nil)) + self.iterator = nil + self.history.removeAll() + } + else { + self.currentIterationIndex = nextIterationIndex + self.iterationPhase = .pending + if runners.isEmpty && iteratorDisposalPolicy == .whenTerminatedOrVacant { + self.iterator = nil + self.history.removeAll() + } + } + let continuations = heldRunnerContinuations + heldRunnerContinuations.removeAll() + return { + for continuation in continuations { continuation.resume() } + } + } + + private mutating func updateHistory(withResult result: Result) { + guard replayCount > 0, case .success(let element?) = result else { + return + } + if history.count >= replayCount { + history.removeFirst() + } + history.append(element) + } + + private mutating func sharedIterator() -> SharedIterator { + guard let iterator else { + let iterator = SharedIterator(base) + self.iterator = iterator + return iterator + } + return iterator + } + } + + private let state: ManagedCriticalState + + init(_ base: Base, replayCount: Int, iteratorDisposalPolicy: IteratorDisposalPolicy) { + self.state = .init( + State(base, replayCount: replayCount, iteratorDisposalPolicy: iteratorDisposalPolicy) + ) + } + + func establish() -> Connection { + let (connection, resume) = state.withCriticalRegion { + state in state.establish() + } + resume?() + return connection + } + + func startRun(_ runnerID: UInt) -> RunRole { + return state.withCriticalRegion { + state in state.run(runnerID) + } + } + + func fetch(_ runnerID: UInt, resumedWithResult result: Result) { + let resume = state.withCriticalRegion { state in + state.fetch(runnerID, resumedWithResult: result) + } + resume?() + } + + func wait(_ runnerID: UInt) async -> Result { + await withUnsafeContinuation { continuation in + let resume = state.withCriticalRegion { state in + state.wait(runnerID, suspendedWithContinuation: continuation) + } + resume?() + } + } + + func hold(_ runnerID: UInt) async { + await withUnsafeContinuation { continuation in + let resume = state.withCriticalRegion { state in + state.hold(runnerID, suspendedWithContinuation: continuation) + } + resume?() + } + } + + func cancel(_ runnerID: UInt) { + let resume = state.withCriticalRegion { state in + state.cancel(runnerID) + } + resume?() + } + + func abort() { + let resume = state.withCriticalRegion { state in + state.abort() + } + resume?() + } + } +} + +// MARK: - Shared Iterator + +fileprivate final class SharedIterator + where Base: Sendable, Base.Element: Sendable { + + private struct Relay: Sendable { + + private enum State { + + case idle + case pendingRequest(UnsafeContinuation<(@Sendable (Element) -> Void)?, Never>) + case pendingResponse(UnsafeContinuation) + case terminal + + mutating func sendHandler( + continuation: UnsafeContinuation<(@Sendable (Element) -> Void)?, Never> + ) -> (() -> Void)? { + switch self { + case .idle: + self = .pendingRequest(continuation) + case .pendingResponse(let receiveContinuation): + self = .idle + return { + continuation.resume { element in + receiveContinuation.resume(returning: element) + } + } + case .pendingRequest(_): + fatalError("attempt to await requestHandler() on more than one task") + case .terminal: + return { continuation.resume(returning: nil) } + } + return nil + } + + mutating func next(continuation: UnsafeContinuation) -> (() -> Void)? { + switch self { + case .idle: + self = .pendingResponse(continuation) + case .pendingResponse(_): + fatalError("attempt to await next(_:) on more than one task") + case .pendingRequest(let sendContinuation): + self = .idle + return { + sendContinuation.resume { element in + continuation.resume(returning: element) + } + } + case .terminal: + return { continuation.resume(returning: nil) } + } + return nil + } + + mutating func cancel() -> (() -> Void)? { + switch self { + case .idle: + self = .terminal + case .pendingResponse(let receiveContinuation): + self = .terminal + return { receiveContinuation.resume(returning: nil) } + case .pendingRequest(let sendContinuation): + self = .terminal + return { sendContinuation.resume(returning: nil) } + case .terminal: break + } + return nil + } + } + + private let state = ManagedCriticalState(State.idle) + + init() {} + + func sendHandler() async -> (@Sendable (Element) -> Void)? { + await withUnsafeContinuation { continuation in + let resume = state.withCriticalRegion { state in + state.sendHandler(continuation: continuation) + } + resume?() + } + } + + func next() async -> Element? { + await withUnsafeContinuation { continuation in + let resume = state.withCriticalRegion { state in + state.next(continuation: continuation) + } + resume?() + } + } + + func cancel() { + let resume = state.withCriticalRegion { state in + state.cancel() + } + resume?() + } + } + + typealias Element = Base.Element + + private let relay: Relay> + private let task: Task + + init(_ base: Base) { + let relay = Relay>() + let task = Task.detached(priority: .high) { + var iterator = base.makeAsyncIterator() + while let send = await relay.sendHandler() { + let result: Result + do { + result = .success(try await iterator.next()) + } + catch { + result = .failure(error) + } + send(result) + let terminal = (try? result.get()) == nil + if terminal { + relay.cancel() + break + } + } + } + self.relay = relay + self.task = task + } + + deinit { + relay.cancel() + } + + public func next() async rethrows -> Element? { + guard Task.isCancelled == false else { return nil } + let result = await relay.next() ?? .success(nil) + return try result._rethrowGet() + } +} + +extension SharedIterator: AsyncIteratorProtocol, Sendable {} + +// MARK: - Utilities + +/// A utility to perform deallocation tasks on value types +fileprivate final class DeallocToken: Sendable { + let action: @Sendable () -> Void + init(_ dealloc: @escaping @Sendable () -> Void) { + self.action = dealloc + } + deinit { action() } +} diff --git a/Tests/AsyncAlgorithmsTests/Support/GatedSequence.swift b/Tests/AsyncAlgorithmsTests/Support/GatedSequence.swift index 50ede106..2a757245 100644 --- a/Tests/AsyncAlgorithmsTests/Support/GatedSequence.swift +++ b/Tests/AsyncAlgorithmsTests/Support/GatedSequence.swift @@ -9,13 +9,18 @@ // //===----------------------------------------------------------------------===// -public struct GatedSequence { +public struct GatedSequence { + let elements: [Element] let gates: [Gate] - var index = 0 - public mutating func advance() { - defer { index += 1 } + let index = ManagedCriticalState(0) + + public func advance() { + let index = self.index.withCriticalRegion { index in + defer { index += 1 } + return index + } guard index < gates.count else { return } @@ -52,4 +57,3 @@ extension GatedSequence: AsyncSequence { } extension GatedSequence: Sendable where Element: Sendable { } -extension GatedSequence.Iterator: Sendable where Element: Sendable { } diff --git a/Tests/AsyncAlgorithmsTests/Support/GatedStartSequence.swift b/Tests/AsyncAlgorithmsTests/Support/GatedStartSequence.swift new file mode 100644 index 00000000..8b3ea9c0 --- /dev/null +++ b/Tests/AsyncAlgorithmsTests/Support/GatedStartSequence.swift @@ -0,0 +1,110 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2022 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +/// An `AsyncSequence` that delays publishing elements until an entry threshold has been reached. +/// Once the entry threshold has been met the sequence proceeds as normal. +public struct GatedStartSequence: Sendable { + + private let elements: [Element] + private let semaphore: BasicSemaphore + + /// Decrements the entry counter and, upon reaching zero, resumes the iterator + public func enter() { + semaphore.signal() + } + + /// Creates new ``StartableSequence`` with an initial entry count + public init(_ elements: T, count: Int) where T.Element == Element { + self.elements = Array(elements) + self.semaphore = .init(count: 1 - count) + } +} + +extension GatedStartSequence: AsyncSequence { + + public struct Iterator: AsyncIteratorProtocol { + + private var elements: [Element] + private let semaphore: BasicSemaphore + + init(elements: [Element], semaphore: BasicSemaphore) { + self.elements = elements + self.semaphore = semaphore + } + + public mutating func next() async -> Element? { + await semaphore.wait() + semaphore.signal() + guard let element = elements.first else { return nil } + elements.removeFirst() + return element + } + } + + public func makeAsyncIterator() -> Iterator { + Iterator(elements: elements, semaphore: semaphore) + } +} + +struct BasicSemaphore { + + private struct State { + + var count: Int + var continuations = [UnsafeContinuation]() + + mutating func wait(continuation: UnsafeContinuation) -> (() -> Void)? { + count -= 1 + if count < 0 { + continuations.append(continuation) + return nil + } + else { + return { continuation.resume() } + } + } + + mutating func signal() -> (() -> Void)? { + count += 1 + if count >= 0 { + let continuations = self.continuations + self.continuations.removeAll() + return { + for continuation in continuations { continuation.resume() } + } + } + else { + return nil + } + } + } + + private let state: ManagedCriticalState + + /// Creates new counting semaphore with an initial value. + init(count: Int) { + self.state = ManagedCriticalState(State(count: count)) + } + + /// Waits for, or decrements, a semaphore. + func wait() async { + await withUnsafeContinuation { continuation in + let resume = state.withCriticalRegion { $0.wait(continuation: continuation) } + resume?() + } + } + + /// Signals (increments) a semaphore. + func signal() { + let resume = state.withCriticalRegion { $0.signal() } + resume?() + } +} diff --git a/Tests/AsyncAlgorithmsTests/TestBroadcast.swift b/Tests/AsyncAlgorithmsTests/TestBroadcast.swift new file mode 100644 index 00000000..af4bd594 --- /dev/null +++ b/Tests/AsyncAlgorithmsTests/TestBroadcast.swift @@ -0,0 +1,430 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2022 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +@preconcurrency import XCTest +import AsyncAlgorithms + +final class TestBroadcast: XCTestCase { + + func test_broadcast_basic() async { + let expected = [1, 2, 3, 4] + let base = GatedStartSequence(expected, count: 2) + let sequence = base.broadcast() + let results = await withTaskGroup(of: [Int].self) { group in + group.addTask { + var iterator = sequence.makeAsyncIterator() + base.enter() + return await iterator.collect() + } + group.addTask { + var iterator = sequence.makeAsyncIterator() + base.enter() + return await iterator.collect() + } + return await Array(group) + } + XCTAssertEqual(expected, results[0]) + XCTAssertEqual(expected, results[1]) + } + + func test_broadcast_iterator_iterates_past_end() async { + let base = GatedStartSequence([1, 2, 3, 4], count: 2) + let sequence = base.broadcast() + let results = await withTaskGroup(of: Int?.self) { group in + group.addTask { + var iterator = sequence.makeAsyncIterator() + base.enter() + let _ = await iterator.collect() + return await iterator.next() + } + group.addTask { + var iterator = sequence.makeAsyncIterator() + base.enter() + let _ = await iterator.collect() + return await iterator.next() + } + return await Array(group) + } + XCTAssertNil(results[0]) + XCTAssertNil(results[1]) + } + + func test_broadcast_throws() async { + let base = GatedStartSequence([1, 2, 3, 4], count: 2) + let expected = [1, 2] + let sequence = base.map { try throwOn(3, $0) }.broadcast() + let results = await withTaskGroup(of: (elements: [Int], error: Error?).self) { group in + group.addTask { + var iterator = sequence.makeAsyncIterator() + base.enter() + return await iterator.collectWithError() + } + group.addTask { + var iterator = sequence.makeAsyncIterator() + base.enter() + return await iterator.collectWithError() + } + return await Array(group) + } + XCTAssertEqual(expected, results[0].elements) + XCTAssertEqual(expected, results[1].elements) + XCTAssertNotNil(results[0].error as? Failure) + XCTAssertNotNil(results[1].error as? Failure) + } + + func test_broadcast_concurrent_consumer_wide() async throws { + let noOfConsumers = 100 + let noOfEmissions = 100 + let expected = (0.. 0) + } + + func test_broadcast_multiple_consumer_cancellation() async { + let source = Indefinite(value: 1) + let sequence = source.async.broadcast() + var tasks = [Task]() + var iterated = [XCTestExpectation]() + var finished = [XCTestExpectation]() + for _ in 0..<16 { + let iterate = expectation(description: "task iterated") + iterate.assertForOverFulfill = false + let finish = expectation(description: "task finished") + iterated.append(iterate) + finished.append(finish) + let task = Task { + var iterator = sequence.makeAsyncIterator() + while let _ = await iterator.next() { + iterate.fulfill() + } + finish.fulfill() + } + tasks.append(task) + } + wait(for: iterated, timeout: 1.0) + for task in tasks { task.cancel() } + wait(for: finished, timeout: 1.0) + } + + func test_broadcast_iterator_retained_when_vacant_if_policy() async { + let base = [0,1,2,3].async + let sequence = base.broadcast(disposingBaseIterator: .whenTerminated) + let expected0 = [0] + let expected1 = [1] + let expected2 = [2] + let result0 = await sequence.prefix(1).reduce(into:[Int]()) { $0.append($1) } + let result1 = await sequence.prefix(1).reduce(into:[Int]()) { $0.append($1) } + let result2 = await sequence.prefix(1).reduce(into:[Int]()) { $0.append($1) } + XCTAssertEqual(expected0, result0) + XCTAssertEqual(expected1, result1) + XCTAssertEqual(expected2, result2) + } + + func test_broadcast_iterator_discarded_when_vacant_if_policy() async { + let base = [0,1,2,3].async + let sequence = base.broadcast(disposingBaseIterator: .whenTerminatedOrVacant) + let expected0 = [0] + let expected1 = [0] + let expected2 = [0] + let result0 = await sequence.prefix(1).reduce(into:[Int]()) { $0.append($1) } + let result1 = await sequence.prefix(1).reduce(into:[Int]()) { $0.append($1) } + let result2 = await sequence.prefix(1).reduce(into:[Int]()) { $0.append($1) } + XCTAssertEqual(expected0, result0) + XCTAssertEqual(expected1, result1) + XCTAssertEqual(expected2, result2) + } + + func test_broadcast_iterator_discarded_when_terminal_regardless_of_policy() async { + let base = [0,1,2,3].async + let sequence = base.broadcast(disposingBaseIterator: .whenTerminated) + let expected0 = [0,1,2,3] + let expected1 = [Int]() + let expected2 = [Int]() + let result0 = await sequence.reduce(into:[Int]()) { $0.append($1) } + let result1 = await sequence.reduce(into:[Int]()) { $0.append($1) } + let result2 = await sequence.reduce(into:[Int]()) { $0.append($1) } + XCTAssertEqual(expected0, result0) + XCTAssertEqual(expected1, result1) + XCTAssertEqual(expected2, result2) + } + + func test_broadcast_iterator_discarded_when_throws_regardless_of_policy() async { + let base = [0,1,2,3].async.map { try throwOn(1, $0) } + let sequence = base.broadcast(disposingBaseIterator: .whenTerminatedOrVacant) + let expected0 = [0] + let expected1 = [Int]() + let expected2 = [Int]() + var iterator0 = sequence.makeAsyncIterator() + let result0 = await iterator0.collectWithError(count: 2) + var iterator1 = sequence.makeAsyncIterator() + let result1 = await iterator1.collectWithError(count: 2) + var iterator2 = sequence.makeAsyncIterator() + let result2 = await iterator2.collectWithError(count: 2) + XCTAssertEqual(expected0, result0.elements) + XCTAssertEqual(expected1, result1.elements) + XCTAssertEqual(expected2, result2.elements) + XCTAssertNotNil(result0.error as? Failure) + XCTAssertNil(result1.error) + XCTAssertNil(result2.error) + } + + func test_broadcast_history_count_0() async throws { + let p0 = ["a","b","c","d"] + let p1 = ["e","f","g","h"] + let p2 = ["i","j","k","l"] + let p3 = ["m","n","o","p"] + let base = GatedSequence(p0 + p1 + p2 + p3) + let sequence = base.broadcast(history: 0) + let expected = [["e", "f"], ["i", "j"], ["m", "n"]] + let gate = Gate() + for _ in 0..>> + let expected0: [Int] = [1] + let expected1: [Int] = [] + let base = Indefinite(value: 1).async + var sequence: Sequence! = base.broadcast() + var iterator = sequence.makeAsyncIterator() + let result0 = await iterator.collect(count: 1) + sequence = nil + let result1 = await iterator.collect() + XCTAssertEqual(expected0, result0) + XCTAssertEqual(expected1, result1) + } +} + +fileprivate extension AsyncIteratorProtocol { + + mutating func collect(count: Int = .max) async rethrows -> [Element] { + var result = [Element]() + var i = count + while let element = try await next() { + result.append(element) + i -= 1 + if i <= 0 { return result } + } + return result + } + + mutating func collectWithError(count: Int = .max) async -> (elements: [Element], error: Error?) { + var result = [Element]() + var i = count + do { + while let element = try await next() { + result.append(element) + i -= 1 + if i <= 0 { return (result, nil) } + } + return (result, nil) + } + catch { + return (result, error) + } + } +}