Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Replay] implement with a bounded buffer strategy (v1.1) #245

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions Sources/AsyncAlgorithms/AsyncReplaySequence.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Async Algorithms open source project
//
// Copyright (c) 2022 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
//
//===----------------------------------------------------------------------===//

import DequeModule

public extension AsyncSequence {
func replay(count: Int) -> AsyncReplaySequence<Self> {
AsyncReplaySequence(base: self, count: count)
}
}

public struct AsyncReplaySequence<Base: AsyncSequence>: AsyncSequence {
public typealias Element = Base.Element
public typealias AsyncIterator = Iterator

private let base: Base
private let count: Int
private let history: ManagedCriticalState<Deque<Result<Base.Element?, Error>>>

public init(base: Base, count: Int) {
self.base = base
self.count = count
self.history = ManagedCriticalState([])
}

private func push(element: Result<Element?, Error>) {
self.history.withCriticalRegion { history in
if history.count >= count {
_ = history.popFirst()
}
history.append(element)
}
}

private func dumpHistory(into localHistory: inout Deque<Result<Base.Element?, Error>>?) {
self.history.withCriticalRegion { localHistory = $0 }
}

public func makeAsyncIterator() -> AsyncIterator {
return Iterator(
asyncReplaySequence: self,
base: self.base.makeAsyncIterator()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we can do it like this. The problem here is that if the upstream AsyncSequence is unicast then this will can potentially crash or not work.

In general, I think we need to approach the implementation a bit differently since the replay algorithm is basically a multicast algorithm. So it shares some of the same implementation characteristics as broadcast does. Specifically, I think we need to only ever create one upstream iterator. That should be done on the first call to next to any of the replay iterators. Furthermore, we need to make sure that we only issue a single next call to the upstream even if we have multiple downstreams.

Before going ahead and implementing this further, I would recommend to wait how the broadcast algorithm evolves and once we have that we should revisit replay. Let me know what you think!

)
}

public struct Iterator: AsyncIteratorProtocol {
let asyncReplaySequence: AsyncReplaySequence<Base>
var base: Base.AsyncIterator
var history: Deque<Result<Base.Element?, Error>>?

public mutating func next() async rethrows -> Element? {
if self.history == nil {
// first call to next, we make sure we have the latest available history
self.asyncReplaySequence.dumpHistory(into: &self.history)
}

if self.history!.isEmpty {
// nothing to replay, we request the next element from the base and push it in the history
let element: Result<Base.Element?, Error>
do {
element = .success(try await self.base.next())
} catch {
element = .failure(error)
}

self.asyncReplaySequence.push(element: element)
return try element._rethrowGet()
} else {
guard !Task.isCancelled else { return nil }

// we replay the oldest element from the history
let element = self.history!.popFirst()!
return try element._rethrowGet()
}
}
}
}

extension AsyncReplaySequence: Sendable where Base: Sendable, Base.Element: Sendable { }
97 changes: 97 additions & 0 deletions Tests/AsyncAlgorithmsTests/TestReplay.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Async Algorithms open source project
//
// Copyright (c) 2022 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
//
//===----------------------------------------------------------------------===//

import XCTest
import AsyncAlgorithms

final class TestReplay: XCTestCase {
func test_given_a_replayed_sequence_when_next_sequence_is_iterated_then_elements_are_replayed_in_the_limit_of_count() async {
let channel = AsyncChannel<Int>()

// Given
let replayed = channel.replay(count: 2)

Task {
await channel.send(1)
await channel.send(2)
await channel.send(3)
}

var iterator1 = replayed.makeAsyncIterator()
_ = await iterator1.next() // 1
_ = await iterator1.next() // 2
_ = await iterator1.next() // 3

Task {
await channel.send(4)
await channel.send(5)
await channel.send(6)
}

// When
var received = [Int]()
var iterator2 = replayed.makeAsyncIterator()
received.append(await iterator2.next()!) // 2
received.append(await iterator2.next()!) // 3
received.append(await iterator2.next()!) // 4
received.append(await iterator2.next()!) // 5
received.append(await iterator2.next()!) // 6

// Then
XCTAssertEqual(received, [2, 3, 4, 5, 6])
}

func test_given_a_replayed_sequence_when_base_is_finished_then_pastEnd_is_nil() async {
// Given
let replayed = [1, 2, 3].async.replay(count: 0)

var iterator = replayed.makeAsyncIterator()

// When
while let _ = await iterator.next() {}

// Then
let pastEnd = await iterator.next()
XCTAssertNil(pastEnd)
}

func test_given_a_failed_replayed_sequence_when_next_sequence_is_iterated_then_elements_are_replayed_with_failure() async throws {
let channel = AsyncThrowingChannel<Int, Error>()

// Given
let replayed = channel.replay(count: 2)

Task {
await channel.send(1)
await channel.send(2)
channel.fail(Failure())
}

var iterator1 = replayed.makeAsyncIterator()
_ = try await iterator1.next() // 1
_ = try await iterator1.next() // 2
_ = try? await iterator1.next() // failure

// When
var received = [Int]()
do {
for try await element in replayed {
received.append(element)
}
XCTFail("Replayed should fail at element number 2")
} catch {
XCTAssertTrue(error is Failure)
}

// Then
XCTAssertEqual(received, [2])
}
}