From 87afc1c9dd3913cb268b1909c3872e92197ee84e Mon Sep 17 00:00:00 2001 From: Lawrence Forooghian Date: Wed, 18 Dec 2024 17:23:09 -0300 Subject: [PATCH] WIP Create SubscriptionStorage type for helping with common activities (TODO document) Namely, managing a list of subscriptions and emitting values on all of these subscriptions. TODO what about example app? TODO need to do MessageSubscriptions too TODO handle nullability of returned listeners once https://github.com/ably/ably-cocoa/pull/2004 merged and released --- Sources/AblyChat/DefaultConnection.swift | 6 +- Sources/AblyChat/DefaultMessages.swift | 15 +++- Sources/AblyChat/DefaultOccupancy.swift | 12 ++- Sources/AblyChat/DefaultPresence.swift | 21 +++++- .../DefaultRoomLifecycleContributor.swift | 17 ++--- Sources/AblyChat/DefaultRoomReactions.swift | 6 +- Sources/AblyChat/DefaultTyping.swift | 10 ++- Sources/AblyChat/Messages.swift | 4 + Sources/AblyChat/RoomLifecycleManager.swift | 74 +++++-------------- Sources/AblyChat/Rooms.swift | 11 +-- Sources/AblyChat/Subscription.swift | 14 +++- Sources/AblyChat/SubscriptionStorage.swift | 36 +++++++++ .../Mocks/MockFeatureChannel.swift | 11 +-- .../Mocks/MockRealtimeChannel.swift | 4 +- .../MockRoomLifecycleContributorChannel.swift | 10 +-- .../Mocks/MockRoomLifecycleManager.swift | 11 +-- Tests/AblyChatTests/SubscriptionTests.swift | 21 ++++++ 17 files changed, 172 insertions(+), 111 deletions(-) create mode 100644 Sources/AblyChat/SubscriptionStorage.swift diff --git a/Sources/AblyChat/DefaultConnection.swift b/Sources/AblyChat/DefaultConnection.swift index 014af8b9..ca4ba40a 100644 --- a/Sources/AblyChat/DefaultConnection.swift +++ b/Sources/AblyChat/DefaultConnection.swift @@ -30,7 +30,7 @@ internal final class DefaultConnection: Connection { let subscription = Subscription(bufferingPolicy: bufferingPolicy) // (CHA-CS5) The chat client must monitor the underlying realtime connection for connection status changes. - realtime.connection.on { [weak self] stateChange in + let eventListener = realtime.connection.on { [weak self] stateChange in guard let self else { return } @@ -95,6 +95,10 @@ internal final class DefaultConnection: Connection { } } + subscription.setOnTermination { [weak self] in + self?.realtime.connection.off(eventListener) + } + return subscription } } diff --git a/Sources/AblyChat/DefaultMessages.swift b/Sources/AblyChat/DefaultMessages.swift index 8229d3ae..e4595621 100644 --- a/Sources/AblyChat/DefaultMessages.swift +++ b/Sources/AblyChat/DefaultMessages.swift @@ -15,7 +15,6 @@ internal final class DefaultMessages: Messages, EmitsDiscontinuities { private let clientID: String private let logger: InternalLogger - // TODO: https://github.com/ably-labs/ably-chat-swift/issues/36 - Handle unsubscribing in line with CHA-M4b // UUID acts as a unique identifier for each listener/subscription. MessageSubscriptionWrapper houses the subscription and the serial of when it was attached or resumed. private var subscriptionPoints: [UUID: MessageSubscriptionWrapper] = [:] @@ -53,7 +52,7 @@ internal final class DefaultMessages: Messages, EmitsDiscontinuities { // (CHA-M4c) When a realtime message with name set to message.created is received, it is translated into a message event, which contains a type field with the event type as well as a message field containing the Message Struct. This event is then broadcast to all subscribers. // (CHA-M4d) If a realtime message with an unknown name is received, the SDK shall silently discard the message, though it may log at DEBUG or TRACE level. // (CHA-M5k) Incoming realtime events that are malformed (unknown field should be ignored) shall not be emitted to subscribers. - channel.subscribe(RealtimeMessageName.chatMessage.rawValue) { message in + let eventListener = channel.subscribe(RealtimeMessageName.chatMessage.rawValue) { message in Task { // TODO: Revisit errors thrown as part of https://github.com/ably-labs/ably-chat-swift/issues/32 guard let ablyCocoaData = message.data, @@ -104,6 +103,18 @@ internal final class DefaultMessages: Messages, EmitsDiscontinuities { } } + messageSubscription.setOnTermination { + Task { + await MainActor.run { [weak self] () in + guard let self else { + return + } + channel.unsubscribe(eventListener) + subscriptionPoints.removeValue(forKey: uuid) + } + } + } + return messageSubscription } diff --git a/Sources/AblyChat/DefaultOccupancy.swift b/Sources/AblyChat/DefaultOccupancy.swift index 5fa7ef70..da2d2b3f 100644 --- a/Sources/AblyChat/DefaultOccupancy.swift +++ b/Sources/AblyChat/DefaultOccupancy.swift @@ -22,8 +22,10 @@ internal final class DefaultOccupancy: Occupancy, EmitsDiscontinuities { // (CHA-04d) If an invalid occupancy event is received on the channel, it shall be dropped. internal func subscribe(bufferingPolicy: BufferingPolicy) async -> Subscription { logger.log(message: "Subscribing to occupancy events", level: .debug) + let subscription = Subscription(bufferingPolicy: bufferingPolicy) - channel.subscribe(OccupancyEvents.meta.rawValue) { [logger] message in + + let eventListener = channel.subscribe(OccupancyEvents.meta.rawValue) { [logger] message in logger.log(message: "Received occupancy message: \(message)", level: .debug) guard let data = message.data as? [String: Any], let metrics = data["metrics"] as? [String: Any] @@ -40,6 +42,14 @@ internal final class DefaultOccupancy: Occupancy, EmitsDiscontinuities { logger.log(message: "Emitting occupancy event: \(occupancyEvent)", level: .debug) subscription.emit(occupancyEvent) } + + subscription.setOnTermination { [weak self] in + // TODO: why is eventListener nullable? + if let eventListener { + self?.channel.off(eventListener) + } + } + return subscription } diff --git a/Sources/AblyChat/DefaultPresence.swift b/Sources/AblyChat/DefaultPresence.swift index b6470c08..d34f08bd 100644 --- a/Sources/AblyChat/DefaultPresence.swift +++ b/Sources/AblyChat/DefaultPresence.swift @@ -199,7 +199,7 @@ internal final class DefaultPresence: Presence, EmitsDiscontinuities { internal func subscribe(event: PresenceEventType, bufferingPolicy: BufferingPolicy) async -> Subscription { logger.log(message: "Subscribing to presence events", level: .debug) let subscription = Subscription(bufferingPolicy: bufferingPolicy) - channel.presence.subscribe(event.toARTPresenceAction()) { [processPresenceSubscribe, logger] message in + let eventListener = channel.presence.subscribe(event.toARTPresenceAction()) { [processPresenceSubscribe, logger] message in logger.log(message: "Received presence message: \(message)", level: .debug) Task { // processPresenceSubscribe is logging so we don't need to log here @@ -207,13 +207,20 @@ internal final class DefaultPresence: Presence, EmitsDiscontinuities { subscription.emit(presenceEvent) } } + subscription.setOnTermination { [weak channel] in + // TODO: why is eventListener nullable? + if let eventListener { + channel?.presence.unsubscribe(eventListener) + } + } return subscription } internal func subscribe(events: [PresenceEventType], bufferingPolicy: BufferingPolicy) async -> Subscription { logger.log(message: "Subscribing to presence events", level: .debug) let subscription = Subscription(bufferingPolicy: bufferingPolicy) - for event in events { + + let eventListeners = events.map { event in channel.presence.subscribe(event.toARTPresenceAction()) { [processPresenceSubscribe, logger] message in logger.log(message: "Received presence message: \(message)", level: .debug) Task { @@ -222,6 +229,16 @@ internal final class DefaultPresence: Presence, EmitsDiscontinuities { } } } + + subscription.setOnTermination { [weak self] in + for eventListener in eventListeners { + // TODO: why is eventListener nil? + if let eventListener { + self?.channel.presence.unsubscribe(eventListener) + } + } + } + return subscription } diff --git a/Sources/AblyChat/DefaultRoomLifecycleContributor.swift b/Sources/AblyChat/DefaultRoomLifecycleContributor.swift index c7e38ade..d95dddcc 100644 --- a/Sources/AblyChat/DefaultRoomLifecycleContributor.swift +++ b/Sources/AblyChat/DefaultRoomLifecycleContributor.swift @@ -3,7 +3,7 @@ import Ably internal actor DefaultRoomLifecycleContributor: RoomLifecycleContributor, EmitsDiscontinuities, CustomDebugStringConvertible { internal nonisolated let channel: DefaultRoomLifecycleContributorChannel internal nonisolated let feature: RoomFeature - private var discontinuitySubscriptions: [Subscription] = [] + private var discontinuitySubscriptions = SubscriptionStorage() internal init(channel: DefaultRoomLifecycleContributorChannel, feature: RoomFeature) { self.channel = channel @@ -13,16 +13,11 @@ internal actor DefaultRoomLifecycleContributor: RoomLifecycleContributor, EmitsD // MARK: - Discontinuities internal func emitDiscontinuity(_ discontinuity: DiscontinuityEvent) { - for subscription in discontinuitySubscriptions { - subscription.emit(discontinuity) - } + discontinuitySubscriptions.emit(discontinuity) } internal func onDiscontinuity(bufferingPolicy: BufferingPolicy) -> Subscription { - let subscription = Subscription(bufferingPolicy: bufferingPolicy) - // TODO: clean up old subscriptions (https://github.com/ably-labs/ably-chat-swift/issues/36) - discontinuitySubscriptions.append(subscription) - return subscription + discontinuitySubscriptions.create(bufferingPolicy: bufferingPolicy) } // MARK: - CustomDebugStringConvertible @@ -56,9 +51,11 @@ internal final class DefaultRoomLifecycleContributorChannel: RoomLifecycleContri } internal func subscribeToState() async -> Subscription { - // TODO: clean up old subscriptions (https://github.com/ably-labs/ably-chat-swift/issues/36) let subscription = Subscription(bufferingPolicy: .unbounded) - underlyingChannel.on { subscription.emit($0) } + let eventListener = underlyingChannel.on { subscription.emit($0) } + subscription.setOnTermination { [weak underlyingChannel] in + underlyingChannel?.unsubscribe(eventListener) + } return subscription } diff --git a/Sources/AblyChat/DefaultRoomReactions.swift b/Sources/AblyChat/DefaultRoomReactions.swift index ea0ac894..b468cbcd 100644 --- a/Sources/AblyChat/DefaultRoomReactions.swift +++ b/Sources/AblyChat/DefaultRoomReactions.swift @@ -40,7 +40,7 @@ internal final class DefaultRoomReactions: RoomReactions, EmitsDiscontinuities { let subscription = Subscription(bufferingPolicy: bufferingPolicy) // (CHA-ER4c) Realtime events with an unknown name shall be silently discarded. - channel.subscribe(RoomReactionEvents.reaction.rawValue) { [clientID, logger] message in + let eventListener = channel.subscribe(RoomReactionEvents.reaction.rawValue) { [clientID, logger] message in logger.log(message: "Received roomReaction message: \(message)", level: .debug) Task { do { @@ -82,6 +82,10 @@ internal final class DefaultRoomReactions: RoomReactions, EmitsDiscontinuities { } } + subscription.setOnTermination { [weak self] in + self?.channel.unsubscribe(eventListener) + } + return subscription } diff --git a/Sources/AblyChat/DefaultTyping.swift b/Sources/AblyChat/DefaultTyping.swift index 89796ca2..af0ca508 100644 --- a/Sources/AblyChat/DefaultTyping.swift +++ b/Sources/AblyChat/DefaultTyping.swift @@ -25,7 +25,7 @@ internal final class DefaultTyping: Typing { let subscription = Subscription(bufferingPolicy: bufferingPolicy) let eventTracker = EventTracker() - channel.presence.subscribe { [weak self] message in + let eventListener = channel.presence.subscribe { [weak self] message in guard let self else { return } @@ -72,6 +72,14 @@ internal final class DefaultTyping: Typing { logger.log(message: "Failed to fetch presence set after \(maxRetryDuration) seconds. Giving up.", level: .error) } } + + subscription.setOnTermination { [weak self] in + // TODO: why is eventListener nullable? + if let eventListener { + self?.channel.presence.unsubscribe(eventListener) + } + } + return subscription } diff --git a/Sources/AblyChat/Messages.swift b/Sources/AblyChat/Messages.swift index e333568a..837ef0e2 100644 --- a/Sources/AblyChat/Messages.swift +++ b/Sources/AblyChat/Messages.swift @@ -219,6 +219,10 @@ public struct MessageSubscription: Sendable, AsyncSequence { subscription.emit(element) } + internal func setOnTermination(_ onTermination: (@Sendable () -> Void)?) { + subscription.setOnTermination(onTermination) + } + public func getPreviousMessages(params: QueryOptions) async throws -> any PaginatedResult { try await getPreviousMessages(params) } diff --git a/Sources/AblyChat/RoomLifecycleManager.swift b/Sources/AblyChat/RoomLifecycleManager.swift index 90f9ed74..22844473 100644 --- a/Sources/AblyChat/RoomLifecycleManager.swift +++ b/Sources/AblyChat/RoomLifecycleManager.swift @@ -87,8 +87,7 @@ internal actor DefaultRoomLifecycleManager! - // TODO: clean up old subscriptions (https://github.com/ably-labs/ably-chat-swift/issues/36) - private var roomStatusChangeSubscriptions: [Subscription] = [] + private var roomStatusChangeSubscriptions = SubscriptionStorage() private var operationResultContinuations = OperationResultContinuations() // MARK: - Initializers and `deinit` @@ -330,15 +329,12 @@ internal actor DefaultRoomLifecycleManager Subscription { - let subscription: Subscription = .init(bufferingPolicy: bufferingPolicy) - roomStatusChangeSubscriptions.append(subscription) - return subscription + roomStatusChangeSubscriptions.create(bufferingPolicy: bufferingPolicy) } #if DEBUG - // TODO: clean up old subscriptions (https://github.com/ably-labs/ably-chat-swift/issues/36) /// Supports the ``testsOnly_onRoomStatusChange()`` method. - private var statusChangeSubscriptions: [Subscription] = [] + private var statusChangeSubscriptions = SubscriptionStorage() internal struct StatusChange { internal var current: Status @@ -347,15 +343,7 @@ internal actor DefaultRoomLifecycleManager Subscription { - let subscription: Subscription = .init(bufferingPolicy: .unbounded) - statusChangeSubscriptions.append(subscription) - return subscription - } - - internal func emitStatusChange(_ change: StatusChange) { - for subscription in statusChangeSubscriptions { - subscription.emit(change) - } + statusChangeSubscriptions.create(bufferingPolicy: .unbounded) } #endif @@ -368,27 +356,20 @@ internal actor DefaultRoomLifecycleManager] = [] + private var stateChangeHandledSubscriptions = SubscriptionStorage() /// Returns a subscription which emits the contributor state changes that have been handled by the manager. /// @@ -401,9 +382,7 @@ internal actor DefaultRoomLifecycleManager Subscription { - let subscription = Subscription(bufferingPolicy: .unbounded) - stateChangeHandledSubscriptions.append(subscription) - return subscription + stateChangeHandledSubscriptions.create(bufferingPolicy: .unbounded) } internal func testsOnly_pendingDiscontinuityEvent(for contributor: Contributor) -> DiscontinuityEvent? { @@ -422,9 +401,8 @@ internal actor DefaultRoomLifecycleManager] = [] + private var transientDisconnectTimeoutHandledSubscriptions = SubscriptionStorage() /// Returns a subscription which emits the IDs of the transient disconnect timeouts that have been handled by the manager. /// @@ -432,9 +410,7 @@ internal actor DefaultRoomLifecycleManager Subscription { - let subscription = Subscription(bufferingPolicy: .unbounded) - transientDisconnectTimeoutHandledSubscriptions.append(subscription) - return subscription + transientDisconnectTimeoutHandledSubscriptions.create(bufferingPolicy: .unbounded) } #endif @@ -560,18 +536,14 @@ internal actor DefaultRoomLifecycleManager] = [] + private var operationWaitEventSubscriptions = SubscriptionStorage() /// Returns a subscription which emits an event each time one room lifecycle operation is going to wait for another to complete. internal func testsOnly_subscribeToOperationWaitEvents() -> Subscription { - let subscription = Subscription(bufferingPolicy: .unbounded) - operationWaitEventSubscriptions.append(subscription) - return subscription + operationWaitEventSubscriptions.create(bufferingPolicy: .unbounded) } #endif @@ -693,9 +662,7 @@ internal actor DefaultRoomLifecycleManager] = [] + private var statusChangeWaitEventSubscriptions = SubscriptionStorage() /// Returns a subscription which emits an event each time ``waitToBeAbleToPerformPresenceOperations(requestedByFeature:)`` is going to wait for a room status change. internal func testsOnly_subscribeToStatusChangeWaitEvents() -> Subscription { - let subscription = Subscription(bufferingPolicy: .unbounded) - statusChangeWaitEventSubscriptions.append(subscription) - return subscription + statusChangeWaitEventSubscriptions.create(bufferingPolicy: .unbounded) } #endif } diff --git a/Sources/AblyChat/Rooms.swift b/Sources/AblyChat/Rooms.swift index 3a061e21..d922a0a5 100644 --- a/Sources/AblyChat/Rooms.swift +++ b/Sources/AblyChat/Rooms.swift @@ -137,22 +137,17 @@ internal actor DefaultRooms: Rooms { internal var waitedOperationType: OperationType } - // TODO: clean up old subscriptions (https://github.com/ably-labs/ably-chat-swift/issues/36) /// Supports the ``testsOnly_subscribeToOperationWaitEvents()`` method. - private var operationWaitEventSubscriptions: [Subscription] = [] + private var operationWaitEventSubscriptions = SubscriptionStorage() /// Returns a subscription which emits an event each time one operation is going to wait for another to complete. internal func testsOnly_subscribeToOperationWaitEvents() -> Subscription { - let subscription = Subscription(bufferingPolicy: .unbounded) - operationWaitEventSubscriptions.append(subscription) - return subscription + operationWaitEventSubscriptions.create(bufferingPolicy: .unbounded) } private func emitOperationWaitEvent(waitingOperationType: OperationType, waitedOperationType: OperationType) { let operationWaitEvent = OperationWaitEvent(waitingOperationType: waitingOperationType, waitedOperationType: waitedOperationType) - for subscription in operationWaitEventSubscriptions { - subscription.emit(operationWaitEvent) - } + operationWaitEventSubscriptions.emit(operationWaitEvent) } #endif diff --git a/Sources/AblyChat/Subscription.swift b/Sources/AblyChat/Subscription.swift index b35ba838..e3fd2a88 100644 --- a/Sources/AblyChat/Subscription.swift +++ b/Sources/AblyChat/Subscription.swift @@ -71,13 +71,19 @@ public struct Subscription: Sendable, AsyncSequence { } } - // TODO: https://github.com/ably-labs/ably-chat-swift/issues/36 Revisit how we want to unsubscribe to fulfil CHA-M4b & CHA-ER4b. I think exposing this publicly for all Subscription types is suitable. - public func unsubscribe() { + // TODO: we should have the ability to have multiple of these so that we don't stand on each other's feet + internal func setOnTermination(_ onTermination: (@Sendable () -> Void)?) { switch mode { case let .default(_, continuation): - continuation.finish() + if let onTermination { + continuation.onTermination = { _ in + onTermination() + } + } else { + continuation.onTermination = nil + } case .mockAsyncSequence: - fatalError("`finish` cannot be called on a Subscription that was created using init(mockAsyncSequence:)") + fatalError("`setOnTermination(_:)` cannot be called on a Subscription that was created using init(mockAsyncSequence:)") } } diff --git a/Sources/AblyChat/SubscriptionStorage.swift b/Sources/AblyChat/SubscriptionStorage.swift new file mode 100644 index 00000000..e424d380 --- /dev/null +++ b/Sources/AblyChat/SubscriptionStorage.swift @@ -0,0 +1,36 @@ +import Foundation + +// TODO: don't like the fact that this is how i'm doing sendability, but having another actor seems equally wild. maybe we could pass in an actor that this thing will perform its mutations on? +// TODO: document +internal class SubscriptionStorage: @unchecked Sendable { + /// Access must be synchronised via ``lock``. + private var subscriptions: [UUID: Subscription] = [:] + private let lock = NSLock() + + internal func create(bufferingPolicy: BufferingPolicy) -> Subscription { + let subscription = Subscription(bufferingPolicy: bufferingPolicy) + let id = UUID() + + lock.lock() + subscriptions[id] = subscription + lock.unlock() + + subscription.setOnTermination { [weak self] in + self?.subscriptionDidTerminate(id: id) + } + + return subscription + } + + private func subscriptionDidTerminate(id: UUID) { + lock.lock() + subscriptions.removeValue(forKey: id) + lock.unlock() + } + + internal func emit(_ element: Element) { + for subscription in subscriptions.values { + subscription.emit(element) + } + } +} diff --git a/Tests/AblyChatTests/Mocks/MockFeatureChannel.swift b/Tests/AblyChatTests/Mocks/MockFeatureChannel.swift index 106084f2..2b665062 100644 --- a/Tests/AblyChatTests/Mocks/MockFeatureChannel.swift +++ b/Tests/AblyChatTests/Mocks/MockFeatureChannel.swift @@ -3,8 +3,7 @@ import Ably final actor MockFeatureChannel: FeatureChannel { let channel: RealtimeChannelProtocol - // TODO: clean up old subscriptions (https://github.com/ably-labs/ably-chat-swift/issues/36) - private var discontinuitySubscriptions: [Subscription] = [] + private var discontinuitySubscriptions = SubscriptionStorage() private let resultOfWaitToBeAbleToPerformPresenceOperations: Result? init( @@ -16,15 +15,11 @@ final actor MockFeatureChannel: FeatureChannel { } func onDiscontinuity(bufferingPolicy: BufferingPolicy) async -> Subscription { - let subscription = Subscription(bufferingPolicy: bufferingPolicy) - discontinuitySubscriptions.append(subscription) - return subscription + discontinuitySubscriptions.create(bufferingPolicy: bufferingPolicy) } func emitDiscontinuity(_ discontinuity: DiscontinuityEvent) { - for subscription in discontinuitySubscriptions { - subscription.emit(discontinuity) - } + discontinuitySubscriptions.emit(discontinuity) } func waitToBeAbleToPerformPresenceOperations(requestedByFeature _: RoomFeature) async throws(ARTErrorInfo) { diff --git a/Tests/AblyChatTests/Mocks/MockRealtimeChannel.swift b/Tests/AblyChatTests/Mocks/MockRealtimeChannel.swift index f4e7c579..071bc4af 100644 --- a/Tests/AblyChatTests/Mocks/MockRealtimeChannel.swift +++ b/Tests/AblyChatTests/Mocks/MockRealtimeChannel.swift @@ -163,7 +163,7 @@ final class MockRealtimeChannel: NSObject, RealtimeChannelProtocol { } func unsubscribe(_: ARTEventListener?) { - fatalError("Not implemented") + // no-op; revisit if we need to test something that depends on this method actually stopping `subscribe` from emitting more events } func unsubscribe(_: String, listener _: ARTEventListener?) { @@ -199,7 +199,7 @@ final class MockRealtimeChannel: NSObject, RealtimeChannelProtocol { } func off(_: ARTEventListener) { - fatalError("Not implemented") + // no-op; revisit if we need to test something that depends on this method actually stopping `on` from emitting more events } func off() { diff --git a/Tests/AblyChatTests/Mocks/MockRoomLifecycleContributorChannel.swift b/Tests/AblyChatTests/Mocks/MockRoomLifecycleContributorChannel.swift index a7b13a85..9b25c414 100644 --- a/Tests/AblyChatTests/Mocks/MockRoomLifecycleContributorChannel.swift +++ b/Tests/AblyChatTests/Mocks/MockRoomLifecycleContributorChannel.swift @@ -8,8 +8,7 @@ final actor MockRoomLifecycleContributorChannel: RoomLifecycleContributorChannel var state: ARTRealtimeChannelState var errorReason: ARTErrorInfo? - // TODO: clean up old subscriptions (https://github.com/ably-labs/ably-chat-swift/issues/36) - private var subscriptions: [Subscription] = [] + private var subscriptions = SubscriptionStorage() private(set) var attachCallCount = 0 private(set) var detachCallCount = 0 @@ -108,8 +107,7 @@ final actor MockRoomLifecycleContributorChannel: RoomLifecycleContributorChannel } func subscribeToState() -> Subscription { - let subscription = Subscription(bufferingPolicy: .unbounded) - subscriptions.append(subscription) + let subscription = subscriptions.create(bufferingPolicy: .unbounded) switch subscribeToStateBehavior { case .justAddSubscription: @@ -122,8 +120,6 @@ final actor MockRoomLifecycleContributorChannel: RoomLifecycleContributorChannel } func emitStateChange(_ stateChange: ARTChannelStateChange) { - for subscription in subscriptions { - subscription.emit(stateChange) - } + subscriptions.emit(stateChange) } } diff --git a/Tests/AblyChatTests/Mocks/MockRoomLifecycleManager.swift b/Tests/AblyChatTests/Mocks/MockRoomLifecycleManager.swift index 061a25f6..e1254bd8 100644 --- a/Tests/AblyChatTests/Mocks/MockRoomLifecycleManager.swift +++ b/Tests/AblyChatTests/Mocks/MockRoomLifecycleManager.swift @@ -8,8 +8,7 @@ actor MockRoomLifecycleManager: RoomLifecycleManager { private(set) var detachCallCount = 0 private(set) var releaseCallCount = 0 private let _roomStatus: RoomStatus? - // TODO: clean up old subscriptions (https://github.com/ably-labs/ably-chat-swift/issues/36) - private var subscriptions: [Subscription] = [] + private var subscriptions = SubscriptionStorage() init(attachResult: Result? = nil, detachResult: Result? = nil, roomStatus: RoomStatus? = nil) { self.attachResult = attachResult @@ -45,15 +44,11 @@ actor MockRoomLifecycleManager: RoomLifecycleManager { } func onRoomStatusChange(bufferingPolicy: BufferingPolicy) async -> Subscription { - let subscription = Subscription(bufferingPolicy: bufferingPolicy) - subscriptions.append(subscription) - return subscription + subscriptions.create(bufferingPolicy: bufferingPolicy) } func emitStatusChange(_ statusChange: RoomStatusChange) { - for subscription in subscriptions { - subscription.emit(statusChange) - } + subscriptions.emit(statusChange) } func waitToBeAbleToPerformPresenceOperations(requestedByFeature _: RoomFeature) async throws(ARTErrorInfo) { diff --git a/Tests/AblyChatTests/SubscriptionTests.swift b/Tests/AblyChatTests/SubscriptionTests.swift index a8f7d434..1d4300f7 100644 --- a/Tests/AblyChatTests/SubscriptionTests.swift +++ b/Tests/AblyChatTests/SubscriptionTests.swift @@ -21,4 +21,25 @@ struct SubscriptionTests { #expect(await emittedElements == ["First", "Second"]) } + + // TODO: + // task cancellation + // user no longer using the subscription + // so that: + // - can remove from an array + // - can teardown ably-cocoa stuff + + @Test + func disposing() async throws { + await Task { + let stream = AsyncStream.makeStream() + stream.continuation.onTermination = { termination in + print("onTermination called: \(termination)") + } + }.value + + // So, it seems like when the stream is deallocated, the `onTermination` gets called. + } + + func subscriptions_discarded() async throws {} }