Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
Create SubscriptionStorage type for helping with common activities (TODO
document)

Namely, managing a list of subscriptions and emitting values on all of
these subscriptions.

TODO what about example app?
TODO need to do MessageSubscriptions too

TODO handle nullability of returned listeners once
ably/ably-cocoa#2004 merged and released
  • Loading branch information
lawrence-forooghian committed Jan 7, 2025
1 parent 2cd7d84 commit d54309a
Show file tree
Hide file tree
Showing 16 changed files with 164 additions and 109 deletions.
6 changes: 5 additions & 1 deletion Sources/AblyChat/DefaultConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ internal final class DefaultConnection: Connection {
let subscription = Subscription<ConnectionStatusChange>(bufferingPolicy: bufferingPolicy)

// (CHA-CS5) The chat client must monitor the underlying realtime connection for connection status changes.
realtime.connection.on { [weak self] stateChange in
let eventListener = realtime.connection.on { [weak self] stateChange in
guard let self else {
return
}
Expand Down Expand Up @@ -95,6 +95,10 @@ internal final class DefaultConnection: Connection {
}
}

subscription.setOnTermination { [weak self] in
self?.realtime.connection.off(eventListener)
}

return subscription
}
}
Expand Down
15 changes: 13 additions & 2 deletions Sources/AblyChat/DefaultMessages.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ internal final class DefaultMessages: Messages, EmitsDiscontinuities {
private let clientID: String
private let logger: InternalLogger

// TODO: https://github.com/ably-labs/ably-chat-swift/issues/36 - Handle unsubscribing in line with CHA-M4b
// UUID acts as a unique identifier for each listener/subscription. MessageSubscriptionWrapper houses the subscription and the serial of when it was attached or resumed.
private var subscriptionPoints: [UUID: MessageSubscriptionWrapper] = [:]

Expand Down Expand Up @@ -53,7 +52,7 @@ internal final class DefaultMessages: Messages, EmitsDiscontinuities {
// (CHA-M4c) When a realtime message with name set to message.created is received, it is translated into a message event, which contains a type field with the event type as well as a message field containing the Message Struct. This event is then broadcast to all subscribers.
// (CHA-M4d) If a realtime message with an unknown name is received, the SDK shall silently discard the message, though it may log at DEBUG or TRACE level.
// (CHA-M5k) Incoming realtime events that are malformed (unknown field should be ignored) shall not be emitted to subscribers.
channel.subscribe(RealtimeMessageName.chatMessage.rawValue) { message in
let eventListener = channel.subscribe(RealtimeMessageName.chatMessage.rawValue) { message in
Task {
// TODO: Revisit errors thrown as part of https://github.com/ably-labs/ably-chat-swift/issues/32
guard let ablyCocoaData = message.data,
Expand Down Expand Up @@ -104,6 +103,18 @@ internal final class DefaultMessages: Messages, EmitsDiscontinuities {
}
}

messageSubscription.setOnTermination {
Task {
await MainActor.run { [weak self] () -> Void in
guard let self else {
return
}
self.channel.unsubscribe(eventListener)
self.subscriptionPoints.removeValue(forKey: uuid)
}
}
}

return messageSubscription
}

Expand Down
12 changes: 11 additions & 1 deletion Sources/AblyChat/DefaultOccupancy.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ internal final class DefaultOccupancy: Occupancy, EmitsDiscontinuities {
// (CHA-04d) If an invalid occupancy event is received on the channel, it shall be dropped.
internal func subscribe(bufferingPolicy: BufferingPolicy) async -> Subscription<OccupancyEvent> {
logger.log(message: "Subscribing to occupancy events", level: .debug)

let subscription = Subscription<OccupancyEvent>(bufferingPolicy: bufferingPolicy)
channel.subscribe(OccupancyEvents.meta.rawValue) { [logger] message in

let eventListener = channel.subscribe(OccupancyEvents.meta.rawValue) { [logger] message in
logger.log(message: "Received occupancy message: \(message)", level: .debug)
guard let data = message.data as? [String: Any],
let metrics = data["metrics"] as? [String: Any]
Expand All @@ -40,6 +42,14 @@ internal final class DefaultOccupancy: Occupancy, EmitsDiscontinuities {
logger.log(message: "Emitting occupancy event: \(occupancyEvent)", level: .debug)
subscription.emit(occupancyEvent)
}

subscription.setOnTermination { [weak self] in
// TODO why is eventListener nullable?
if let eventListener {
self?.channel.off(eventListener)
}
}

return subscription
}

Expand Down
21 changes: 19 additions & 2 deletions Sources/AblyChat/DefaultPresence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -199,21 +199,28 @@ internal final class DefaultPresence: Presence, EmitsDiscontinuities {
internal func subscribe(event: PresenceEventType, bufferingPolicy: BufferingPolicy) async -> Subscription<PresenceEvent> {
logger.log(message: "Subscribing to presence events", level: .debug)
let subscription = Subscription<PresenceEvent>(bufferingPolicy: bufferingPolicy)
channel.presence.subscribe(event.toARTPresenceAction()) { [processPresenceSubscribe, logger] message in
let eventListener = channel.presence.subscribe(event.toARTPresenceAction()) { [processPresenceSubscribe, logger] message in
logger.log(message: "Received presence message: \(message)", level: .debug)
Task {
// processPresenceSubscribe is logging so we don't need to log here
let presenceEvent = try processPresenceSubscribe(message, event)
subscription.emit(presenceEvent)
}
}
subscription.setOnTermination { [weak channel] in
// TODO why is eventListener nullable?
if let eventListener {
channel?.presence.unsubscribe(eventListener)
}
}
return subscription
}

internal func subscribe(events: [PresenceEventType], bufferingPolicy: BufferingPolicy) async -> Subscription<PresenceEvent> {
logger.log(message: "Subscribing to presence events", level: .debug)
let subscription = Subscription<PresenceEvent>(bufferingPolicy: bufferingPolicy)
for event in events {

let eventListeners = events.map { event in
channel.presence.subscribe(event.toARTPresenceAction()) { [processPresenceSubscribe, logger] message in
logger.log(message: "Received presence message: \(message)", level: .debug)
Task {
Expand All @@ -222,6 +229,16 @@ internal final class DefaultPresence: Presence, EmitsDiscontinuities {
}
}
}

subscription.setOnTermination { [weak self] in
for eventListener in eventListeners {
// TODO why is eventListener nil?
if let eventListener {
self?.channel.presence.unsubscribe(eventListener)
}
}
}

return subscription
}

Expand Down
17 changes: 7 additions & 10 deletions Sources/AblyChat/DefaultRoomLifecycleContributor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import Ably
internal actor DefaultRoomLifecycleContributor: RoomLifecycleContributor, EmitsDiscontinuities, CustomDebugStringConvertible {
internal nonisolated let channel: DefaultRoomLifecycleContributorChannel
internal nonisolated let feature: RoomFeature
private var discontinuitySubscriptions: [Subscription<DiscontinuityEvent>] = []
private var discontinuitySubscriptions = SubscriptionStorage<DiscontinuityEvent>()

internal init(channel: DefaultRoomLifecycleContributorChannel, feature: RoomFeature) {
self.channel = channel
Expand All @@ -13,16 +13,11 @@ internal actor DefaultRoomLifecycleContributor: RoomLifecycleContributor, EmitsD
// MARK: - Discontinuities

internal func emitDiscontinuity(_ discontinuity: DiscontinuityEvent) {
for subscription in discontinuitySubscriptions {
subscription.emit(discontinuity)
}
discontinuitySubscriptions.emit(discontinuity)
}

internal func onDiscontinuity(bufferingPolicy: BufferingPolicy) -> Subscription<DiscontinuityEvent> {
let subscription = Subscription<DiscontinuityEvent>(bufferingPolicy: bufferingPolicy)
// TODO: clean up old subscriptions (https://github.com/ably-labs/ably-chat-swift/issues/36)
discontinuitySubscriptions.append(subscription)
return subscription
return discontinuitySubscriptions.create(bufferingPolicy: bufferingPolicy)
}

// MARK: - CustomDebugStringConvertible
Expand Down Expand Up @@ -56,9 +51,11 @@ internal final class DefaultRoomLifecycleContributorChannel: RoomLifecycleContri
}

internal func subscribeToState() async -> Subscription<ARTChannelStateChange> {
// TODO: clean up old subscriptions (https://github.com/ably-labs/ably-chat-swift/issues/36)
let subscription = Subscription<ARTChannelStateChange>(bufferingPolicy: .unbounded)
underlyingChannel.on { subscription.emit($0) }
let eventListener = underlyingChannel.on { subscription.emit($0) }
subscription.setOnTermination { [weak underlyingChannel] in
underlyingChannel?.unsubscribe(eventListener)
}
return subscription
}

Expand Down
6 changes: 5 additions & 1 deletion Sources/AblyChat/DefaultRoomReactions.swift
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ internal final class DefaultRoomReactions: RoomReactions, EmitsDiscontinuities {
let subscription = Subscription<Reaction>(bufferingPolicy: bufferingPolicy)

// (CHA-ER4c) Realtime events with an unknown name shall be silently discarded.
channel.subscribe(RoomReactionEvents.reaction.rawValue) { [clientID, logger] message in
let eventListener = channel.subscribe(RoomReactionEvents.reaction.rawValue) { [clientID, logger] message in
logger.log(message: "Received roomReaction message: \(message)", level: .debug)
Task {
do {
Expand Down Expand Up @@ -82,6 +82,10 @@ internal final class DefaultRoomReactions: RoomReactions, EmitsDiscontinuities {
}
}

subscription.setOnTermination { [weak self] in
self?.channel.unsubscribe(eventListener)
}

return subscription
}

Expand Down
10 changes: 9 additions & 1 deletion Sources/AblyChat/DefaultTyping.swift
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ internal final class DefaultTyping: Typing {
let subscription = Subscription<TypingEvent>(bufferingPolicy: bufferingPolicy)
let eventTracker = EventTracker()

channel.presence.subscribe { [weak self] message in
let eventListener = channel.presence.subscribe { [weak self] message in
guard let self else {
return
}
Expand Down Expand Up @@ -72,6 +72,14 @@ internal final class DefaultTyping: Typing {
logger.log(message: "Failed to fetch presence set after \(maxRetryDuration) seconds. Giving up.", level: .error)
}
}

subscription.setOnTermination { [weak self] in
// TODO why is eventListener nullable?
if let eventListener {
self?.channel.presence.unsubscribe(eventListener)
}
}

return subscription
}

Expand Down
4 changes: 4 additions & 0 deletions Sources/AblyChat/Messages.swift
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,10 @@ public struct MessageSubscription: Sendable, AsyncSequence {
subscription.emit(element)
}

internal func setOnTermination(_ onTermination: (@Sendable () -> Void)?) {
subscription.setOnTermination(onTermination)
}

public func getPreviousMessages(params: QueryOptions) async throws -> any PaginatedResult<Message> {
try await getPreviousMessages(params)
}
Expand Down
Loading

0 comments on commit d54309a

Please sign in to comment.