From 4a7d04ccbf1558923331c9b7ac5fdb07488b9d65 Mon Sep 17 00:00:00 2001 From: Thibault Wittemberg Date: Sat, 31 Dec 2022 16:39:45 +0100 Subject: [PATCH] broadcast: implement algorithm with buffering strategy --- .../Broacast/AsyncBroadcastSequence.swift | 46 +++ .../Broacast/BroadcastStateMachine.swift | 241 ++++++++++++++++ .../Broacast/BroadcastStorage.swift | 160 +++++++++++ .../Broacast/UnicastStateMachine.swift | 132 +++++++++ .../TestBroadcastSequence.swift | 262 ++++++++++++++++++ 5 files changed, 841 insertions(+) create mode 100644 Sources/AsyncAlgorithms/Broacast/AsyncBroadcastSequence.swift create mode 100644 Sources/AsyncAlgorithms/Broacast/BroadcastStateMachine.swift create mode 100644 Sources/AsyncAlgorithms/Broacast/BroadcastStorage.swift create mode 100644 Sources/AsyncAlgorithms/Broacast/UnicastStateMachine.swift create mode 100644 Tests/AsyncAlgorithmsTests/TestBroadcastSequence.swift diff --git a/Sources/AsyncAlgorithms/Broacast/AsyncBroadcastSequence.swift b/Sources/AsyncAlgorithms/Broacast/AsyncBroadcastSequence.swift new file mode 100644 index 00000000..47b090b5 --- /dev/null +++ b/Sources/AsyncAlgorithms/Broacast/AsyncBroadcastSequence.swift @@ -0,0 +1,46 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2022 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +public extension AsyncSequence { + func broadcast() -> AsyncBroadcastSequence { + AsyncBroadcastSequence(base: self) + } +} + +public struct AsyncBroadcastSequence: AsyncSequence where Base: Sendable, Base.Element: Sendable { + public typealias Element = Base.Element + public typealias AsyncIterator = Iterator + + private let storage: BroadcastStorage + + public init(base: Base) { + self.storage = BroadcastStorage(base: base) + } + + public func makeAsyncIterator() -> Iterator { + Iterator(storage: self.storage) + } + + public struct Iterator: AsyncIteratorProtocol { + private var id: Int + private let storage: BroadcastStorage + + init(storage: BroadcastStorage) { + self.storage = storage + self.id = storage.generateId() + } + + public mutating func next() async rethrows -> Element? { + let element = await self.storage.next(id: self.id) + return try element?._rethrowGet() + } + } +} diff --git a/Sources/AsyncAlgorithms/Broacast/BroadcastStateMachine.swift b/Sources/AsyncAlgorithms/Broacast/BroadcastStateMachine.swift new file mode 100644 index 00000000..8ba0c497 --- /dev/null +++ b/Sources/AsyncAlgorithms/Broacast/BroadcastStateMachine.swift @@ -0,0 +1,241 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2022 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +struct BroadcastStateMachine: Sendable +where Base: Sendable, Base.Element: Sendable { + typealias Channel = UnicastStateMachine + + enum State { + case initial(base: Base, channels: [Int: Channel]) + case broadcasting( + task: Task, + suspendedProducer: UnsafeContinuation?, + channels: [Int: Channel], + isBusy: Bool, + demands: Set + ) + case finished(channels: [Int: Channel]) + } + + private var state: State + + init(base: Base) { + self.state = .initial(base: base, channels: [:]) + } + + func task() -> Task? { + switch self.state { + case .broadcasting(let task, _, _, _, _): + return task + default: + return nil + } + } + + mutating func taskIsStarted( + id: Int, + task: Task, + continuation: UnsafeContinuation, Never> + ) -> Channel.NextIsSuspendedAction { + switch self.state { + case .initial(_, var channels): + precondition(channels[id] != nil, "Invalid state.") + var channel = channels[id]! + let action = channel.nextIsSuspended(continuation: continuation) + channels[id] = channel + + self.state = .broadcasting(task: task, suspendedProducer: nil, channels: channels, isBusy: true, demands: [id]) + return action + case .broadcasting: + preconditionFailure("Invalid state.") + case .finished: + preconditionFailure("Invalid state.") + } + } + + enum ProducerIsSuspendedAction { + case resume + case suspend + } + + mutating func producerIsSuspended( + continuation: UnsafeContinuation + ) -> ProducerIsSuspendedAction { + switch self.state { + case .initial: + preconditionFailure("Invalid state.") + case .broadcasting(let task, let suspendedProducer, let channels, _, var demands): + precondition(suspendedProducer == nil, "Invalid state.") + + if !demands.isEmpty { + demands.removeAll() + self.state = .broadcasting(task: task, suspendedProducer: continuation, channels: channels, isBusy: true, demands: demands) + return .resume + } + + self.state = .broadcasting(task: task, suspendedProducer: continuation, channels: channels, isBusy: false, demands: demands) + return .suspend + case .finished: + preconditionFailure("Invalid state.") + } + } + + mutating func element(element: Base.Element) -> [Channel.SendAction] { + switch state { + case .initial: + preconditionFailure("Invalid state.") + case .broadcasting(let task, _, var channels, _, let demands): + var actions = [Channel.SendAction]() + for entry in channels { + let id = entry.key + var channel = entry.value + actions.append(channel.send(element)) + channels[id] = channel + } + + self.state = .broadcasting(task: task, suspendedProducer: nil, channels: channels, isBusy: false, demands: demands) + return actions + case .finished: + preconditionFailure("Invalid state.") + } + } + + mutating func finish(error: Error? = nil) -> [Channel.FinishAction] { + switch state { + case .initial: + preconditionFailure("Invalid state.") + case .broadcasting(_, _, var channels, _, _): + var actions = [Channel.FinishAction]() + for entry in channels { + let id = entry.key + var channel = entry.value + actions.append(channel.finish(error: error)) + channels[id] = channel + } + + self.state = .finished(channels: channels) + return actions + case .finished: + preconditionFailure("Invalid state.") + } + } + + mutating func next(id: Int) -> Channel.NextAction { + switch self.state { + case .initial(let base, var channels): + var channel = Channel() + let action = channel.next() + channels[id] = channel + self.state = .initial(base: base, channels: channels) + return action + case .broadcasting(let task, let suspendedProducer, var channels, let isBusy, let demands): + if var channel = channels[id] { + let action = channel.next() + channels[id] = channel + + self.state = .broadcasting(task: task, suspendedProducer: suspendedProducer, channels: channels, isBusy: isBusy, demands: demands) + return action + } + var channel = Channel() + let action = channel.next() + channels[id] = channel + + self.state = .broadcasting(task: task, suspendedProducer: suspendedProducer, channels: channels, isBusy: isBusy, demands: demands) + return action + case .finished(var channels): + if var channel = channels[id] { + let action = channel.next() + channels[id] = channel + self.state = .finished(channels: channels) + return action + } + var channel = Channel() + let action = channel.next() + channels[id] = channel + + self.state = .finished(channels: channels) + return action + } + } + + enum NextIsSuspendedAction { + case nextIsSuspendedAction(action: Channel.NextIsSuspendedAction) + case resumeProducerAndNextIsSuspendedAction(continuation: UnsafeContinuation?, action: Channel.NextIsSuspendedAction) + case startTask(base: Base) + } + + mutating func nextIsSuspended( + id: Int, + continuation: UnsafeContinuation, Never> + ) -> NextIsSuspendedAction { + switch self.state { + case .initial(let base, _): + return .startTask(base: base) + case .broadcasting(let task, let suspendedProducer, var channels, let isBusy, var demands): + guard channels[id] != nil else { return .nextIsSuspendedAction(action: .resume(element: .success(nil))) } + + if isBusy { + demands.update(with: id) + var channel = channels[id]! + let action = channel.nextIsSuspended(continuation: continuation) + channels[id] = channel + + self.state = .broadcasting(task: task, suspendedProducer: suspendedProducer, channels: channels, isBusy: isBusy, demands: demands) + return .nextIsSuspendedAction(action: action) + } + demands.removeAll() + var channel = channels[id]! + let action = channel.nextIsSuspended(continuation: continuation) + channels[id] = channel + + self.state = .broadcasting(task: task, suspendedProducer: suspendedProducer, channels: channels, isBusy: true, demands: demands) + return .resumeProducerAndNextIsSuspendedAction(continuation: suspendedProducer, action: action) + case .finished(var channels): + precondition(channels[id] != nil, "Invalid state.") + var channel = channels[id]! + let action = channel.nextIsSuspended(continuation: continuation) + channels[id] = channel + + self.state = .finished(channels: channels) + return .nextIsSuspendedAction(action: action) + } + } + + enum NextIsCancelledAction { + case nextIsCancelledAction(continuation: UnsafeContinuation, Never>?) + } + + mutating func nextIsCancelled( + id: Int + ) -> NextIsCancelledAction { + switch self.state { + case .initial: + preconditionFailure("Invalid state.") + case .broadcasting(let task, let suspendedProducer, var channels, let isBusy, var demands): + precondition(channels[id] != nil, "Invalid state.") + var channel = channels[id]! + demands.remove(id) + let continuation = channel.nextIsCancelled() + channels[id] = nil + + self.state = .broadcasting(task: task, suspendedProducer: suspendedProducer, channels: channels, isBusy: isBusy, demands: demands) + return .nextIsCancelledAction(continuation: continuation) + + case .finished(var channels): + var channel = channels[id]! + let continuation = channel.nextIsCancelled() + channels[id] = nil + + self.state = .finished(channels: channels) + return .nextIsCancelledAction(continuation: continuation) + } + } +} diff --git a/Sources/AsyncAlgorithms/Broacast/BroadcastStorage.swift b/Sources/AsyncAlgorithms/Broacast/BroadcastStorage.swift new file mode 100644 index 00000000..f3b8195e --- /dev/null +++ b/Sources/AsyncAlgorithms/Broacast/BroadcastStorage.swift @@ -0,0 +1,160 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2022 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +final class BroadcastStorage: Sendable where Base: Sendable, Base.Element: Sendable { + private let stateMachine: ManagedCriticalState> + private let ids: ManagedCriticalState + + init(base: Base) { + self.stateMachine = ManagedCriticalState(BroadcastStateMachine(base: base)) + self.ids = ManagedCriticalState(0) + } + + func generateId() -> Int { + self.ids.withCriticalRegion { ids in + ids += 1 + return ids + } + } + + func next(id: Int) async -> Result? { + let (shouldExit, element) = self.stateMachine.withCriticalRegion { stateMachine -> (Bool, Result?) in + let action = stateMachine.next(id: id) + switch action { + case .suspend: + return (false, nil) + case .exit(let element): + return (true, element) + } + } + + if shouldExit { + return element + } + + return await withTaskCancellationHandler { + await withUnsafeContinuation { (continuation: UnsafeContinuation, Never>) in + self.stateMachine.withCriticalRegion { stateMachine in + let action = stateMachine.nextIsSuspended( + id: id, + continuation: continuation + ) + switch action { + case .startTask(let base): + self.startTask(stateMachine: &stateMachine, base: base, id: id, downstreamContinuation: continuation) + case .nextIsSuspendedAction(.resume(let element)): + continuation.resume(returning: element) + case .nextIsSuspendedAction(.suspend): + break + case .resumeProducerAndNextIsSuspendedAction(let upstreamContinuation, .resume(let element)): + upstreamContinuation?.resume() + continuation.resume(returning: element) + case .resumeProducerAndNextIsSuspendedAction(let upstreamContinuation, .suspend): + upstreamContinuation?.resume() + break + } + } + } + } onCancel: { + self.stateMachine.withCriticalRegion { stateMachine in + let action = stateMachine.nextIsCancelled(id: id) + switch action { + case .nextIsCancelledAction(let continuation): + continuation?.resume(returning: .success(nil)) + } + } + } + } + + private func startTask( + stateMachine: inout BroadcastStateMachine, + base: Base, + id: Int, + downstreamContinuation: UnsafeContinuation, Never> + ) { + let task = Task { + do { + var iterator = base.makeAsyncIterator() + loop: while true { + await withUnsafeContinuation { (continuation: UnsafeContinuation) in + self.stateMachine.withCriticalRegion { stateMachine in + let action = stateMachine.producerIsSuspended( + continuation: continuation + ) + + switch action { + case .resume: + continuation.resume() + case .suspend: + break + } + } + } + + guard let element = try await iterator.next() else { + break loop + } + + self.stateMachine.withCriticalRegion { stateMachine in + let actions = stateMachine.element(element: element) + for action in actions { + switch action { + case .none: + break + case .resumeConsumer(let continuation): + continuation?.resume(returning: .success(element)) + } + } + } + } + + self.stateMachine.withCriticalRegion { stateMachine in + let actions = stateMachine.finish() + for action in actions { + switch action { + case .none: + break + case .resumeConsumer(let continuation, _): + continuation?.resume(returning: .success(nil)) + } + } + } + } catch { + self.stateMachine.withCriticalRegion { stateMachine in + let actions = stateMachine.finish(error: error) + for action in actions { + switch action { + case .none: + break + case .resumeConsumer(let continuation, _): + continuation?.resume(returning: .failure(error)) + } + } + + } + } + } + + let action = stateMachine.taskIsStarted(id: id, task: task, continuation: downstreamContinuation) + + switch action { + case .suspend: + break + case .resume(let element): + downstreamContinuation.resume(returning: element) + } + } + + deinit { + let task = self.stateMachine.withCriticalRegion { $0.task() } + task?.cancel() + } +} diff --git a/Sources/AsyncAlgorithms/Broacast/UnicastStateMachine.swift b/Sources/AsyncAlgorithms/Broacast/UnicastStateMachine.swift new file mode 100644 index 00000000..0edae9c4 --- /dev/null +++ b/Sources/AsyncAlgorithms/Broacast/UnicastStateMachine.swift @@ -0,0 +1,132 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2022 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +@_implementationOnly import DequeModule + +struct UnicastStateMachine: Sendable { + enum State { + case buffering( + elements: Deque>, + suspendedConsumer: UnsafeContinuation, Never>? + ) + case finished(elements: Deque>) + } + + private var state: State = .buffering(elements: [], suspendedConsumer: nil) + + enum SendAction { + case none + case resumeConsumer(continuation: UnsafeContinuation, Never>?) + } + + mutating func send(_ element: Element) -> SendAction { + switch self.state { + case .buffering(let elements, let suspendedConsumer) where suspendedConsumer != nil: + // we are waiting for a producer, we can resume the awaiting consumer + self.state = .buffering(elements: elements, suspendedConsumer: nil) + return .resumeConsumer(continuation: suspendedConsumer) + case .buffering(var elements, _): + elements.append(.success(element)) + self.state = .buffering(elements: elements, suspendedConsumer: nil) + return .none + case .finished: + return .none + } + } + + enum FinishAction { + case none + case resumeConsumer(continuation: UnsafeContinuation, Never>?, error: Error?) + } + + mutating func finish(error: Error?) -> FinishAction { + switch self.state { + case .buffering(_, let suspendedConsumer) where suspendedConsumer != nil: + // we are waiting for a producer, we can resume the awaiting consumer with nil + self.state = .finished(elements: []) + return .resumeConsumer(continuation: suspendedConsumer, error: error) + case .buffering(var elements, _): + if let error { + elements.append(.failure(error)) + } + self.state = .finished(elements: elements) + return .none + case .finished: + return .none + } + } + + enum NextAction { + case suspend + case exit(element: Result) + } + + mutating func next() -> NextAction { + switch self.state { + case .buffering(var elements, _) where !elements.isEmpty: + // we have stacked values, we deliver the first to the iteration + let element = elements.popFirst()! + self.state = .buffering(elements: elements, suspendedConsumer: nil) + return .exit(element: element) + case .buffering(_, let suspendedConsumer) where suspendedConsumer != nil: + // a consumer is already suspended, this is an error + preconditionFailure("Invalid state. A consumer is already suspended") + case .buffering(_, _): + return .suspend + case .finished(var elements) where !elements.isEmpty: + let element = elements.popFirst()! + self.state = .finished(elements: elements) + return .exit(element: element) + case .finished: + return .exit(element: .success(nil)) + } + } + + enum NextIsSuspendedAction { + case resume(element: Result) + case suspend + } + + mutating func nextIsSuspended( + continuation: UnsafeContinuation, Never> + ) -> NextIsSuspendedAction { + switch self.state { + case .buffering(var elements, _) where !elements.isEmpty: + // we have stacked values, we resume the continuation with the first element + let element = elements.popFirst()! + self.state = .buffering(elements: elements, suspendedConsumer: nil) + return .resume(element: element) + case .buffering(_, let suspendedConsumer) where suspendedConsumer != nil: + // a consumer is already suspended, this is an error + preconditionFailure("Invalid state. A consumer is already suspended") + case .buffering(let elements, _): + // we suspend the consumer + self.state = .buffering(elements: elements, suspendedConsumer: continuation) + return .suspend + case .finished(var elements) where !elements.isEmpty: + let element = elements.popFirst()! + self.state = .finished(elements: elements) + return .resume(element: element) + case .finished: + return .resume(element: .success(nil)) + } + } + + mutating func nextIsCancelled() -> UnsafeContinuation, Never>? { + switch self.state { + case .buffering(_, let suspendedConsumer): + self.state = .finished(elements: []) + return suspendedConsumer + case .finished: + return nil + } + } +} diff --git a/Tests/AsyncAlgorithmsTests/TestBroadcastSequence.swift b/Tests/AsyncAlgorithmsTests/TestBroadcastSequence.swift new file mode 100644 index 00000000..6bfeefe3 --- /dev/null +++ b/Tests/AsyncAlgorithmsTests/TestBroadcastSequence.swift @@ -0,0 +1,262 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2022 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +@preconcurrency import XCTest +import AsyncAlgorithms + +final class TestBroadcast: XCTestCase { + func test_given_a_base_sequence_when_broadcasting_to_two_tasks_then_the_base_sequence_is_iterated_once() async { + // Given + let elements = (0..<10).map { $0 } + let base = ReportingAsyncSequence(elements) + + let expectedNexts = elements.map { _ in ReportingAsyncSequence.Event.next } + + // When + let broadcasted = base.broadcast() + await withTaskGroup(of: Void.self) { group in + group.addTask { + for await _ in broadcasted {} + } + group.addTask { + for await _ in broadcasted {} + } + await group.waitForAll() + } + + // Then + XCTAssertEqual( + base.events, + [ReportingAsyncSequence.Event.makeAsyncIterator] + expectedNexts + [ReportingAsyncSequence.Event.next] + ) + } + + func test_given_a_base_sequence_when_broadcasting_to_two_tasks_then_they_receive_the_base_elements() async { + // Given + let base = (0..<10).map { $0 } + let expected = (0...4).map { $0 } + + // When + let broadcasted = base.async.map { try throwOn(5, $0) }.broadcast() + let results = await withTaskGroup(of: [Int].self) { group in + group.addTask { + var received = [Int]() + do { + for try await element in broadcasted { + received.append(element) + } + XCTFail("The broadcast should fail before finish") + } catch { + XCTAssertTrue(error is Failure) + } + + return received + } + group.addTask { + var received = [Int]() + do { + for try await element in broadcasted { + received.append(element) + } + XCTFail("The broadcast should fail before finish") + } catch { + XCTAssertTrue(error is Failure) + } + + return received + } + + return await Array(group) + } + + // Then + XCTAssertEqual(results[0], expected) + XCTAssertEqual(results[0], results[1]) + } + + func test_given_a_throwing_base_sequence_when_broadcasting_to_two_tasks_then_they_receive_the_base_elements_and_failure() async { + // Given + let base = (0..<10).map { $0 } + + // When + let broadcasted = base.async.broadcast() + let results = await withTaskGroup(of: [Int].self) { group in + group.addTask { + await Array(broadcasted) + } + group.addTask { + await Array(broadcasted) + } + return await Array(group) + } + + // Then + XCTAssertEqual(results[0], base) + XCTAssertEqual(results[0], results[1]) + } + + func test_given_a_base_sequence_when_broadcasting_to_two_tasks_then_they_receive_finish_and_pastEnd_is_nil() async { + // Given + let base = (0..<10).map { $0 } + + // When + let broadcasted = base.async.broadcast() + await withTaskGroup(of: Void.self) { group in + group.addTask { + var iterator = broadcasted.makeAsyncIterator() + while let _ = await iterator.next() {} + let pastEnd = await iterator.next() + + // Then + XCTAssertNil(pastEnd) + } + group.addTask { + var iterator = broadcasted.makeAsyncIterator() + while let _ = await iterator.next() {} + let pastEnd = await iterator.next() + + // Then + XCTAssertNil(pastEnd) + } + + await group.waitForAll() + } + } + + func test_given_a_base_sequence_when_broadcasting_to_two_tasks_then_the_buffer_is_used() async { + let task1IsIsFinished = expectation(description: "") + + // Given + let base = (0..<10).map { $0 } + + // When + let broadcasted = base.async.broadcast() + let results = await withTaskGroup(of: [Int].self) { group in + group.addTask { + let result = await Array(broadcasted) + task1IsIsFinished.fulfill() + return result + } + group.addTask { + var result = [Int]() + var iterator = broadcasted.makeAsyncIterator() + let firstElement = await iterator.next() + result.append(firstElement!) + self.wait(for: [task1IsIsFinished], timeout: 1.0) + + while let element = await iterator.next() { + result.append(element) + } + + return result + } + return await Array(group) + } + + // Then + XCTAssertEqual(results[0], base) + XCTAssertEqual(results[0], results[1]) + } + + func test_given_a_channel_when_broadcasting_to_two_tasks_then_they_received_the_channel_elements() async { + // Given + let elements = (0..<10).map { $0 } + let base = AsyncChannel() + + // When + let broadcasted = base.broadcast() + let results = await withTaskGroup(of: [Int].self) { group in + group.addTask { + var sent = [Int]() + for element in elements { + sent.append(element) + await base.send(element) + } + base.finish() + return sent + } + group.addTask { + await Array(broadcasted) + } + group.addTask { + await Array(broadcasted) + } + return await Array(group) + } + + // Then + XCTAssertEqual(results[0], elements) + XCTAssertEqual(results[0], results[1]) + } + + func test_given_a_broadcasted_sequence_when_cancelling_task_iteration_finishes() async { + let task1CanCancel = expectation(description: "") + let task1IsCancelled = expectation(description: "") + + let task2CanCancel = expectation(description: "") + let task2IsCancelled = expectation(description: "") + + // Given + let base = (0..<10).map { $0 } + let broadcasted = base.async.broadcast() + + let task1 = Task { + var received = [Int?]() + + var iterator = broadcasted.makeAsyncIterator() + let element = await iterator.next() + received.append(element) + + task1CanCancel.fulfill() + + wait(for: [task1IsCancelled], timeout: 1.0) + + // Then + let pastCancelled = await iterator.next() + XCTAssertNil(pastCancelled) + + return received + } + + let task2 = Task { + var received = [Int?]() + + var iterator = broadcasted.makeAsyncIterator() + let element = await iterator.next() + received.append(element) + + task2CanCancel.fulfill() + + wait(for: [task2IsCancelled], timeout: 1.0) + + // Then + let pastCancelled = await iterator.next() + XCTAssertNil(pastCancelled) + + return received + } + + wait(for: [task1CanCancel, task2CanCancel], timeout: 1.0) + + // When + task1.cancel() + task2.cancel() + + task1IsCancelled.fulfill() + task2IsCancelled.fulfill() + + let elements1 = await task1.value + let elements2 = await task2.value + + // Then + XCTAssertEqual(elements1, elements2) + } +}