Broadcast algorithm (Previously Shared) #227


Closed
wants to merge 18 commits
Conversation


@tcldr tcldr commented Nov 3, 2022

TL;DR: I’ve implemented a multicasting algorithm that supports back-pressure and – either directly or indirectly – supports all the use cases mentioned in the multicasting thread.

After the previous discussion in the multicast(_:) thread around various use-cases for a multicasting algorithm, I decided to attempt one myself. It was a fun exercise! If anything, it crystallised for me the role that back-pressure plays within asynchronous sequence pipelines and the opportunities it gives rise to.

So with that in mind, I’ve put together a proposal for a shared asynchronous sequence that embraces the natural back-pressure characteristics of Swift’s asynchronous sequences, while still being adaptable to use-cases where back-pressure needs to be released through the use of follow-on sequences.

I hope it’s useful. It would be great to hear your feedback.

Broadcast

Introduction

AsyncBroadcastSequence unlocks additional use cases for structured concurrency and asynchronous sequences by allowing almost any asynchronous sequence to be adapted for consumption by multiple concurrent consumers.

Motivation

The need often arises to distribute the values of an asynchronous sequence to multiple consumers. Intuitively, it seems that a sequence should be iterable by more than a single consumer, but many types of asynchronous sequence are restricted to supporting only one consumer at a time.

One example of an asynchronous sequence that would naturally fit this 'one to many' shape is the output of a hardware sensor. A hypothetical hardware sensor might include the following API:

public final class Accelerometer {
  
  public struct Event { /* ... */ }
  
  // exposed as a singleton to represent the single on-device sensor
  public static let shared = Accelerometer()
  
  private init() {}
  
  public var updateHandler: ((Event) -> Void)?
  
  public func startAccelerometer() { /* ... */ }
  public func stopAccelerometer() { /* ... */ }
}

To share the sensor data with a consumer through an asynchronous sequence you might choose an AsyncStream:

final class OrientationMonitor { /* ... */ }
extension OrientationMonitor {
  
  static var orientation: AsyncStream<Accelerometer.Event> {
    AsyncStream { continuation in
      Accelerometer.shared.updateHandler = { event in
        continuation.yield(event)
      }
      continuation.onTermination = { @Sendable _ in
        Accelerometer.shared.stopAccelerometer()
      }
      Accelerometer.shared.startAccelerometer()
    }
  }
}

With a single consumer, this pattern works as expected:

let consumer1 = Task {
  for await orientation in OrientationMonitor.orientation {
      print("Consumer 1: Orientation: \(orientation)")
  }
}
// Output:
// Consumer 1: Orientation: (0.0, 1.0, 0.0)
// Consumer 1: Orientation: (0.0, 0.8, 0.0)
// Consumer 1: Orientation: (0.0, 0.6, 0.0)
// Consumer 1: Orientation: (0.0, 0.4, 0.0)
// ...

However, as soon as a second consumer comes along, data for the first consumer stops. This is because the singleton Accelerometer.shared.updateHandler is reassigned inside the closure that creates the second AsyncStream, redirecting all Accelerometer data to the second stream.

One attempted workaround might be to vend a single AsyncStream to all consumers:

extension OrientationMonitor {
  
  static let orientation: AsyncStream<Accelerometer.Event> = {
    AsyncStream { continuation in
      Accelerometer.shared.updateHandler = { event in
        continuation.yield(event)
      }
      continuation.onTermination = { @Sendable _ in
        Accelerometer.shared.stopAccelerometer()
      }
      Accelerometer.shared.startAccelerometer()
    }
  }()
}

This comes with another issue though: when two consumers materialise, the output of the stream becomes split between them:

let consumer1 = Task {
  for await orientation in OrientationMonitor.orientation {
      print("Consumer 1: Orientation: \(orientation)")
  }
}
let consumer2 = Task {
  for await orientation in OrientationMonitor.orientation {
      print("Consumer 2: Orientation: \(orientation)")
  }
}
// Output:
// Consumer 1: Orientation: (0.0, 1.0, 0.0)
// Consumer 2: Orientation: (0.0, 0.8, 0.0)
// Consumer 2: Orientation: (0.0, 0.6, 0.0)
// Consumer 1: Orientation: (0.0, 0.4, 0.0)
// ...

Rather than consumers receiving all values emitted by the AsyncStream, they receive only a subset. In addition, if the task of a consumer is cancelled, via consumer2.cancel() for example, the onTermination trigger of the AsyncStream.Continuation executes and stops Accelerometer data being generated for both tasks.

Proposed solution

AsyncBroadcastSequence provides a way to multicast a single upstream asynchronous sequence to any number of consumers.

extension OrientationMonitor {
  
  static let orientation: AsyncBroadcastSequence<AsyncStream<Accelerometer.Event>> = {
    let stream = AsyncStream(bufferingPolicy: .bufferingNewest(1)) { continuation in
      Accelerometer.shared.updateHandler = { event in
        continuation.yield(event)
      }
      Accelerometer.shared.startAccelerometer()
    }
    return stream.broadcast(disposingBaseIterator: .whenTerminated)
  }()
}

Now, each consumer receives every element output by the source stream:

let consumer1 = Task {
  for await orientation in OrientationMonitor.orientation {
      print("Consumer 1: Orientation: \(orientation)")
  }
}
let consumer2 = Task {
  for await orientation in OrientationMonitor.orientation {
      print("Consumer 2: Orientation: \(orientation)")
  }
}
// Output:
// Consumer 1: Orientation: (0.0, 1.0, 0.0)
// Consumer 2: Orientation: (0.0, 1.0, 0.0)
// Consumer 1: Orientation: (0.0, 0.8, 0.0)
// Consumer 2: Orientation: (0.0, 0.8, 0.0)
// Consumer 1: Orientation: (0.0, 0.6, 0.0)
// Consumer 2: Orientation: (0.0, 0.6, 0.0)
// Consumer 1: Orientation: (0.0, 0.4, 0.0)
// Consumer 2: Orientation: (0.0, 0.4, 0.0)
// ...

This does leave our accelerometer running even when the last consumer has cancelled though. While this makes sense for some use-cases, it would be better if we could automate shutdown of the accelerometer when there's no longer any demand, and start it up again when demand returns. With the help of the deferred algorithm, we can:

extension OrientationMonitor {
  
  static let orientation: AsyncBroadcastSequence<AsyncDeferredSequence<AsyncStream<Accelerometer.Event>>> = {
    let stream = deferred {
      AsyncStream { continuation in
        Accelerometer.shared.updateHandler = { event in
          continuation.yield(event)
        }
        continuation.onTermination = { @Sendable _ in
          Accelerometer.shared.stopAccelerometer()
        }
        Accelerometer.shared.startAccelerometer()
      }
    }
    // `.whenTerminatedOrVacant` is the default, so we could equally write `.broadcast()`
    // but it's included here for clarity.
    return stream.broadcast(disposingBaseIterator: .whenTerminatedOrVacant)
  }()
}

With .whenTerminatedOrVacant set as the iterator disposal policy (the default), when the last downstream consumer cancels, the upstream iterator is dropped. This triggers AsyncStream's onTermination handler, shutting off the Accelerometer.

Now, with AsyncStream composed with AsyncDeferredSequence, any new demand triggers the re-execution of AsyncDeferredSequence's closure, the restart of the Accelerometer, and a new sequence for AsyncBroadcastSequence to share.

Configuration Options

AsyncBroadcastSequence provides two conveniences to adapt the sequence for the most common multicast use-cases:

  1. As described above, a configurable iterator disposal policy that determines whether the shared upstream iterator is disposed of when the consumer count falls to zero.
  2. A history feature that allows late-coming consumers to receive the most recently emitted elements prior to their arrival. One use-case could be a UI that is updated by an infrequently emitting sequence. Rather than wait for the sequence to emit a new element to populate an interface, the last emitted value can be used until such time that fresh data is emitted.
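The history feature described in point 2 can be thought of as a small fixed-capacity buffer that always retains the most recently emitted elements and replays them to a late-arriving consumer. The sketch below is an illustrative model only – the type and its names are hypothetical and not part of the proposed API:

```swift
// Illustrative model of the history behavior: a fixed-capacity store that
// keeps only the most recent `capacity` elements. Hypothetical names; not
// part of the proposed AsyncBroadcastSequence API.
struct HistoryBuffer<Element> {
    private let capacity: Int
    private var storage: [Element] = []

    init(capacity: Int) {
        precondition(capacity >= 0)
        self.capacity = capacity
    }

    // Record a newly emitted element, evicting the oldest when full.
    mutating func append(_ element: Element) {
        guard capacity > 0 else { return }
        storage.append(element)
        if storage.count > capacity {
            storage.removeFirst()
        }
    }

    // The elements prefixed to the iterator of a newly connected consumer.
    var prefixElements: [Element] { storage }
}
```

Under this model, a sequence created with `history: 1` would hand a new consumer the single most recent element first, then continue with live values.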

Detailed design

Algorithm Summary:

The idea behind the AsyncBroadcastSequence algorithm is as follows: vended iterators of AsyncBroadcastSequence are known as 'runners'. Runners compete in a race to grab the next element from the base iterator on each iteration cycle. The 'winner' of an iteration cycle returns the element to the shared context, which then supplies the result to the later finishers. Once every runner has finished, the current cycle completes and the next iteration can start. This means that runners move forward in lock-step, only proceeding when the last runner in the current iteration has received a value or has cancelled.

AsyncBroadcastSequence Iterator Lifecycle:

  1. Connection: On connection, each 'runner' is issued an ID (and any prefixed values from the history buffer) by the shared context. From this point on, the algorithm will wait on this iterator to consume its values before moving on. This means that, until next() is called on this iterator, all the other iterators will be held – until it is called, or until the iterator's task is cancelled.

  2. Run: After its prefix values have been exhausted, each time next() is called on the iterator, the iterator attempts to start a 'run' by calling startRun(_:) on the shared context. The shared context marks the iterator as 'running' and issues a role to determine the iterator's action for the current iteration cycle. The roles are as follows:
    - FETCH: The iterator is the 'winner' of this iteration cycle. It is issued with the shared base iterator, calls next() on it, and once it resumes returns the value to the shared context.
    - WAIT: The iterator hasn't won this cycle, but was fast enough that the winner has yet to resume with the element from the base iterator. Therefore, it is told to suspend (WAIT) until such time that the winner resumes.
    - YIELD: The iterator is late (and is holding up the other iterators). The shared context issues it with the value retrieved by the winning iterator and lets it continue immediately.
    - HOLD: The iterator is early for the next iteration cycle. So it is put in the holding pen until the next cycle can start. This is because there are other iterators that still haven't finished their run for the current iteration cycle. This iterator will be resumed when all other iterators have completed their run.

  3. Completion: The iterator calls cancel on the shared context which ensures the iterator does not take part in the next iteration cycle. However, if it is currently suspended it may not resume until the current iteration cycle concludes. This is especially important if it is filling the key FETCH role for the current iteration cycle.
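The role assignment in step 2 can be sketched as a pure decision over the shared context's state. This is a simplified illustration with hypothetical names and a placeholder Int element type – not the actual implementation:

```swift
// Hypothetical sketch of role assignment when a runner starts a run.
// `Role` mirrors the FETCH / WAIT / YIELD / HOLD cases described above.
enum Role: Equatable {
    case fetch       // winner: drives the shared base iterator this cycle
    case wait        // suspend until the winner resumes with an element
    case yield(Int)  // late: handed the fetched value, continues immediately
    case hold        // early for the next cycle; parked until it can start
}

struct CycleState {
    var currentCycle: Int    // the iteration cycle now in progress
    var fetchClaimed: Bool   // whether a runner has already won this cycle
    var fetchedElement: Int? // element produced by the winner, once resumed

    // Decide the role for a runner beginning its run in `runnerCycle`.
    mutating func startRun(runnerCycle: Int) -> Role {
        if runnerCycle > currentCycle {
            return .hold            // arrived before the next cycle opened
        }
        if !fetchClaimed {
            fetchClaimed = true
            return .fetch           // first to start the run: the winner
        }
        if let element = fetchedElement {
            return .yield(element)  // winner already resumed; take the value
        }
        return .wait                // winner still fetching; suspend
    }
}
```

The real implementation must also track suspended continuations and resume WAIT-ing and HOLD-ing runners at the right moments; this sketch captures only the decision logic.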

AsyncBroadcastSequence

Declaration

public struct AsyncBroadcastSequence<Base: AsyncSequence> where Base: Sendable, Base.Element: Sendable

Overview

An asynchronous sequence that can be iterated by multiple concurrent consumers.

Use an asynchronous broadcast sequence when you have multiple downstream asynchronous sequences with which you wish to share the output of a single asynchronous sequence. This can be useful if you have expensive upstream operations, or if your asynchronous sequence represents the output of a physical device.

Elements are emitted from an asynchronous broadcast sequence at a rate that does not exceed the consumption of its slowest consumer. If this kind of back-pressure isn't desirable for your use-case, AsyncBroadcastSequence can be composed with buffers – either upstream, downstream, or both – to achieve the desired behavior.
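Because runners advance in lock-step, the effective element rate is governed by the slowest consumer. A toy model of that pacing (hypothetical, for illustration only):

```swift
// Toy model of lock-step pacing: an iteration cycle completes only when the
// slowest consumer has finished, so the time per element is the maximum of
// the per-element processing times. Hypothetical; for illustration only.
func cycleTime(consumerProcessingTimes: [Double]) -> Double {
    consumerProcessingTimes.max() ?? 0
}

// Without a downstream buffer, a consumer taking 0.5s per element holds a
// 0.1s consumer to the same 0.5s cadence; a buffer in front of the fast
// consumer decouples the two.
```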

If you have an asynchronous sequence that consumes expensive system resources, it is possible to configure AsyncBroadcastSequence to discard its upstream iterator when the connected downstream consumer count falls to zero. This allows any cancellation tasks configured on the upstream asynchronous sequence to be initiated and for expensive resources to be terminated. AsyncBroadcastSequence will re-create a fresh iterator if there is further demand.

For use-cases where it is important for consumers to have a record of elements emitted prior to their connection, an AsyncBroadcastSequence can also be configured to prefix its output with the most recently emitted elements. If AsyncBroadcastSequence is configured to drop its iterator when the connected consumer count falls to zero, its history will be discarded at the same time.

Creating a sequence

init(
  _ base: Base,
  history historyCount: Int = 0,
  disposingBaseIterator iteratorDisposalPolicy: IteratorDisposalPolicy = .whenTerminatedOrVacant
)

Constructs an asynchronous broadcast sequence.

  • history: the number of elements previously emitted by the sequence to prefix to the iterator of a new consumer
  • iteratorDisposalPolicy: the iterator disposal policy applied to the upstream iterator

AsyncBroadcastSequence.IteratorDisposalPolicy

Declaration

public enum IteratorDisposalPolicy: Sendable {
  case whenTerminated
  case whenTerminatedOrVacant
}

Overview

The iterator disposal policy applied by an asynchronous broadcast sequence to its upstream iterator.

  • whenTerminated: retains the upstream iterator for use by future consumers until the base asynchronous sequence is terminated
  • whenTerminatedOrVacant: discards the upstream iterator when the number of consumers falls to zero or the base asynchronous sequence is terminated
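The two policies reduce to a simple decision over the consumer count and the termination state of the base sequence. The decision function below is an illustrative model – the enum cases mirror the proposal, but the function itself is hypothetical:

```swift
// Illustrative model of IteratorDisposalPolicy. The enum mirrors the
// proposal; the decision function is hypothetical, for illustration only.
enum IteratorDisposalPolicy {
    case whenTerminated
    case whenTerminatedOrVacant
}

// Returns true when the shared upstream iterator should be discarded.
func shouldDisposeBaseIterator(
    policy: IteratorDisposalPolicy,
    consumerCount: Int,
    baseTerminated: Bool
) -> Bool {
    switch policy {
    case .whenTerminated:
        return baseTerminated
    case .whenTerminatedOrVacant:
        return baseTerminated || consumerCount == 0
    }
}
```

Note how only .whenTerminatedOrVacant disposes of the iterator when the consumer count reaches zero, which is what triggers the upstream onTermination handler in the accelerometer example above.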

broadcast(history:disposingBaseIterator:)

Declaration

extension AsyncSequence {

  public func broadcast(
    history historyCount: Int = 0,
    disposingBaseIterator iteratorDisposalPolicy: AsyncBroadcastSequence<Self>.IteratorDisposalPolicy = .whenTerminatedOrVacant
  ) -> AsyncBroadcastSequence<Self>
}

Overview

Creates an asynchronous sequence that can be shared by multiple consumers.

  • history: the number of elements previously emitted by the sequence to prefix to the iterator of a new consumer
  • iteratorDisposalPolicy: the iterator disposal policy applied by an asynchronous broadcast sequence to its upstream iterator

Comparison with other libraries

  • ReactiveX: ReactiveX has the Publish observable, which can be composed with the Connect, RefCount and Replay operators to support various multicasting use-cases. The disposingBaseIterator behavior is applied via RefCount (or the .share().refCount() chain of operators in RxSwift), while the history behavior is achieved through Replay (or the .share(replay:) convenience in RxSwift).

  • Combine: Combine has the multicast(_:) operator, which along with the functionality of ConnectablePublisher and associated conveniences supports many of the same use cases as the ReactiveX equivalent, but in some instances requires third-party operators to achieve the same level of functionality.

Due to the way a Swift AsyncSequence, and therefore AsyncBroadcastSequence, naturally applies back-pressure, the characteristics of an AsyncBroadcastSequence are different enough that a one-to-one API mapping of other reactive programming libraries isn't applicable.

However, with the available configuration options – and through composition with other asynchronous sequences – AsyncBroadcastSequence can trivially be adapted to support many of the same use-cases, including that of Connect, RefCount, and Replay.

Effect on API resilience

TBD

Alternatives considered

Creating a one-to-one multicast analog that matches that of existing reactive programming libraries. However, it would mean fighting the back-pressure characteristics of AsyncSequence. Instead, this implementation embraces back-pressure to yield a more flexible result.

Acknowledgments

Thanks to Philippe Hausler and Franz Busch, as well as all other contributors on the Swift forums, for their thoughts and feedback.

@FranzBusch (Member) left a comment

Thanks for opening this! I wanna read through it thoroughly at some point but currently the focus is on stabilising what we have so it might take me a bit

/// iterator when the connected consumer count falls to zero, its history will be discarded at the
/// same time.
public struct AsyncSharedSequence<Base: AsyncSequence>
where Base: Sendable, Base.Element: Sendable, Base.AsyncIterator: Sendable {
Member comment:
I still need to read through all of this but we cannot work with the requirement that the Base.AsyncIterator is Sendable. AsyncIterators should almost never be Sendable so this algo wouldn't be usable

@tcldr (Author) Nov 3, 2022

Got it. Well, I think it should be fine if we use @unchecked on a particular portion of the state as the iterator is being used in a particular way that prevents concurrent access. I'll need someone to confirm my understanding on that though!

I've removed the constraint, added @unchecked where appropriate, and run it through the compiler via swift build -Xswiftc -Xfrontend -Xswiftc -strict-concurrency=complete. I'm no longer seeing any warnings, so fingers crossed this way of using an iterator is viable.

@tcldr (Author)

The iterator is now protected by an actor, so we should be good to go. No requirement for a Sendable iterator. No @unchecked required, which is good, as, thinking about it, I'm not sure that would even work.

@tcldr tcldr changed the title [WIP] Shared Shared algorithm Nov 9, 2022
@tcldr tcldr changed the title Shared algorithm Broadcast algorithm (Previously Shared) Nov 20, 2022
@danthorpe

Hello 👋,

I've spent some time trying to read the latest on efforts towards multi-cast support. Is there any progress on this or any other multi-casting ability for this repository? Is there a roadmap or anticipated timeline for features in this repository? I appreciate that it's free & open-source software, just thought I'd ask 🙏

@tcldr

tcldr commented Jul 31, 2023

Hi, @danthorpe. Not as far as I'm aware. I'm sure @FranzBusch will know more. I think it's all pending a v1.0 release of the current algorithms in the package. Multicasting was a v1.1 thing last I heard. In my mind, it's a bit of an open question as to what multicasting should look like for async sequences: the more I delved into it, the clearer it became to me why Rx and derivatives chose the subject pattern, for example. To pull that off with async sequences would require more language features – so that we could define a protocol on the send end, for example. In the meantime though, this works.

@FranzBusch

Correct. We are still working on the current 1.0.0 release and broadcast is very high on the list of things that I want to nail after 1.0.0. I have a pretty clear idea in mind how we should implement it so that it is both efficient and fast.

@tcldr

tcldr commented Aug 2, 2023

Interesting, that will be great to see.

I wonder if it would be worthwhile starting a discussion on the future of async sequences - perhaps with a vision document outlining a rough roadmap?

The reason is that, as a consumer, I’m still unclear on the ultimate vision for async sequences. It’s hard for someone on the outside of internal discussions to know if something they expect doesn’t exist because a) the plans exist on some internal roadmap, or b) the feature is fundamentally incompatible with the vision for async sequences. For example, I’m still unclear whether or not primary associated types are planned, whether or not we’ll define an AsyncSource type to generalise sending, whether or not there’s an understanding of, or a plan to handle, same-actor communication, etc.

Some of the current characteristics of async sequences don’t match people’s intuition: ‘why don’t the stock source sequences return a multicast sequence?’, or ‘why does my UI behave badly when I try to use async sequences to move data from A to B on the MainActor?’.

As a consumer it would be really good to know if that’s because these things have a planned resolution, or because the type is being used in a way that it wasn’t designed for in the first place. There are a lot of sharp edges in the current implementation, and with some wider discussion it might be possible to iron them out.

@tcldr tcldr closed this Aug 2, 2023
@tcldr tcldr deleted the pr/share branch August 2, 2023 07:33
@tcldr tcldr restored the pr/share branch July 18, 2024 11:29

4 participants