From d16550461635e74118edac3e192209170eb05dee Mon Sep 17 00:00:00 2001 From: Umair Date: Thu, 21 Nov 2024 16:19:34 +0000 Subject: [PATCH] Spec complete for typing indicators with reference to [1] [1] - https://sdk.ably.com/builds/ably/specification/pull/232/chat-features/ --- Example/AblyChatExample/ContentView.swift | 48 +++-- Sources/AblyChat/DefaultConnection.swift | 25 --- Sources/AblyChat/DefaultTyping.swift | 223 +++++++++++++++++++++ Sources/AblyChat/Room.swift | 15 +- Sources/AblyChat/RoomFeature.swift | 4 +- Sources/AblyChat/RoomOptions.swift | 5 +- Sources/AblyChat/TimerManager.swift | 26 +++ Tests/AblyChatTests/DefaultRoomTests.swift | 10 +- Tests/AblyChatTests/IntegrationTests.swift | 85 +++++--- 9 files changed, 363 insertions(+), 78 deletions(-) create mode 100644 Sources/AblyChat/DefaultTyping.swift create mode 100644 Sources/AblyChat/TimerManager.swift diff --git a/Example/AblyChatExample/ContentView.swift b/Example/AblyChatExample/ContentView.swift index 04654383..bb6f7998 100644 --- a/Example/AblyChatExample/ContentView.swift +++ b/Example/AblyChatExample/ContentView.swift @@ -58,6 +58,7 @@ struct ContentView: View { roomID: roomID, options: .init( presence: .init(), + typing: .init(), reactions: .init(), occupancy: .init() ) @@ -90,6 +91,11 @@ struct ContentView: View { .listStyle(PlainListStyle()) HStack { TextField("Type a message...", text: $newMessage) + .onChange(of: newMessage) { + Task { + try await startTyping() + } + } #if !os(tvOS) .textFieldStyle(RoundedBorderTextFieldStyle()) #endif @@ -141,28 +147,17 @@ struct ContentView: View { try await showReactions() try await showPresence() try await showOccupancy() + try await showTypings() await printConnectionStatusChange() } .tryTask { // NOTE: As we implement more features, move them out of the `if Environment.current == .mock` block and into the main block just above. if Environment.current == .mock { - try await showTypings() try await showRoomStatus() } } } - func printConnectionStatusChange() async { - let connectionSubsciption = chatClient.connection.onStatusChange(bufferingPolicy: .unbounded) - - // Continue listening for connection status change on a background task so this function can return - Task { - for await status in connectionSubsciption { - print("Connection status changed to: \(status.current)") - } - } - } - func sendButtonAction() { if newMessage.isEmpty { Task { @@ -234,17 +229,15 @@ struct ContentView: View { } func showTypings() async throws { + let typingSubscription = try await room().typing.subscribe(bufferingPolicy: .unbounded) // Continue listening for typing events on a background task so this function can return Task { - for await typing in try await room().typing.subscribe(bufferingPolicy: .unbounded) { + for await typing in typingSubscription { withAnimation { - typingInfo = "Typing: \(typing.currentlyTyping.joined(separator: ", "))..." - Task { - try? await Task.sleep(nanoseconds: 1 * 1_000_000_000) - withAnimation { - typingInfo = "" - } - } + // Set the typing info to the list of users currently typing + typingInfo = typing.currentlyTyping.isEmpty ? + "" : + "Typing: \(typing.currentlyTyping.joined(separator: ", "))..." } } } @@ -266,6 +259,17 @@ struct ContentView: View { } } + func printConnectionStatusChange() async { + let connectionSubsciption = chatClient.connection.onStatusChange(bufferingPolicy: .unbounded) + + // Continue listening for connection status change on a background task so this function can return + Task { + for await status in connectionSubsciption { + print("Connection status changed to: \(status.current)") + } + } + } + func showRoomStatus() async throws { // Continue listening for status change events on a background task so this function can return Task { @@ -300,6 +304,10 @@ struct ContentView: View { func sendReaction(type: String) async throws { try await room().reactions.send(params: .init(type: type)) } + + func startTyping() async throws { + try await room().typing.start() + } } extension ContentView { diff --git a/Sources/AblyChat/DefaultConnection.swift b/Sources/AblyChat/DefaultConnection.swift index 1692069c..014af8b9 100644 --- a/Sources/AblyChat/DefaultConnection.swift +++ b/Sources/AblyChat/DefaultConnection.swift @@ -99,31 +99,6 @@ internal final class DefaultConnection: Connection { } } -private final actor TimerManager { - private var currentTask: Task? - - internal func setTimer(interval: TimeInterval, handler: @escaping @Sendable () -> Void) { - cancelTimer() - - currentTask = Task { - try? await Task.sleep(nanoseconds: UInt64(interval * 1_000_000_000)) - guard !Task.isCancelled else { - return - } - handler() - } - } - - internal func cancelTimer() { - currentTask?.cancel() - currentTask = nil - } - - internal func hasRunningTask() -> Bool { - currentTask != nil - } -} - private final actor ConnectionStatusManager { private(set) var status: ConnectionStatus = .disconnected private(set) var error: ARTErrorInfo? diff --git a/Sources/AblyChat/DefaultTyping.swift b/Sources/AblyChat/DefaultTyping.swift new file mode 100644 index 00000000..79b1fef8 --- /dev/null +++ b/Sources/AblyChat/DefaultTyping.swift @@ -0,0 +1,223 @@ +import Ably + +internal final class DefaultTyping: Typing { + private let featureChannel: FeatureChannel + private let roomID: String + private let clientID: String + private let logger: InternalLogger + private let timeout: TimeInterval + private let timerManager = TimerManager() + + internal init(featureChannel: FeatureChannel, roomID: String, clientID: String, logger: InternalLogger, timeout: TimeInterval) { + self.roomID = roomID + self.featureChannel = featureChannel + self.clientID = clientID + self.logger = logger + self.timeout = timeout + } + + internal nonisolated var channel: any RealtimeChannelProtocol { + featureChannel.channel + } + + // (CHA-T6) Users may subscribe to typing events – updates to a set of clientIDs that are typing. This operation, like all subscription operations, has no side-effects in relation to room lifecycle. + internal func subscribe(bufferingPolicy: BufferingPolicy) async -> Subscription { + let subscription = Subscription(bufferingPolicy: .unbounded) + let eventTracker = EventTracker() + + channel.presence.subscribe { [weak self] message in + guard let self else { + return + } + logger.log(message: "Received presence message: \(message)", level: .debug) + Task { + let currentEventID = await eventTracker.updateEventID() + let maxRetryDuration: TimeInterval = 30.0 // Max duration as specified in CHA-T6c1 + let baseDelay: TimeInterval = 1.0 // Initial retry delay + let maxDelay: TimeInterval = 5.0 // Maximum delay between retries + + var totalElapsedTime: TimeInterval = 0 + var delay: TimeInterval = baseDelay + + while totalElapsedTime < maxRetryDuration { + do { + // (CHA-T6c) When a presence event is received from the realtime client, the Chat client will perform a presence.get() operation to get the current presence set. This guarantees that we get a fully synced presence set. This is then used to emit the typing clients to the subscriber. + let latestTypingMembers = try await get() + + // (CHA-T6c2) If multiple presence events are received resulting in concurrent presence.get() calls, then we guarantee that only the “latest” event is emitted. That is to say, if presence event A and B occur in that order, then only the typing event generated by B’s call to presence.get() will be emitted to typing subscribers. + let isLatestEvent = await eventTracker.isLatestEvent(currentEventID) + guard isLatestEvent else { + logger.log(message: "Discarding outdated presence.get() result.", level: .debug) + return + } + + let typingEvent = TypingEvent(currentlyTyping: latestTypingMembers) + subscription.emit(typingEvent) + logger.log(message: "Successfully emitted typing event: \(typingEvent)", level: .debug) + return + } catch { + // (CHA-T6c1) [Testable] If the presence.get() operation fails, then it shall be retried using a backoff with jitter, up to a timeout of 30 seconds. + logger.log(message: "Failed to fetch presence set: \(error). Retrying...", level: .error) + // Apply jitter to the delay + let jitter = Double.random(in: 0 ... (delay / 2)) + let backoffDelay = min(delay + jitter, maxDelay) + + try? await Task.sleep(nanoseconds: UInt64(backoffDelay * 1_000_000_000)) + totalElapsedTime += backoffDelay + + // Exponential backoff (double the delay) + delay = min(delay * 2, maxDelay) + } + } + logger.log(message: "Failed to fetch presence set after \(maxRetryDuration) seconds. Giving up.", level: .error) + } + } + return subscription + } + + // (CHA-T2) Users may retrieve a list of the currently typing client IDs. The behaviour depends on the current room status, as presence operations in a Realtime Client cause implicit attaches. + internal func get() async throws -> Set { + logger.log(message: "Getting presence", level: .debug) + + // CHA-T2c to CHA-T2f + do { + try await featureChannel.waitToBeAbleToPerformPresenceOperations(requestedByFeature: RoomFeature.presence) + } catch { + logger.log(message: "Error waiting to be able to perform presence get operation: \(error)", level: .error) + throw error + } + + return try await withCheckedThrowingContinuation { continuation in + channel.presence.get { [processPresenceGet] members, error in + do { + let presenceMembers = try processPresenceGet(members, error) + continuation.resume(returning: presenceMembers) + } catch { + continuation.resume(throwing: error) + // processPresenceGet will log any errors + } + } + } + } + + // (CHA-T4) Users may indicate that they have started typing. + internal func start() async throws { + logger.log(message: "Starting typing indicator for client: \(clientID)", level: .debug) + + do { + try await featureChannel.waitToBeAbleToPerformPresenceOperations(requestedByFeature: RoomFeature.presence) + } catch { + logger.log(message: "Error waiting to be able to perform presence enter operation: \(error)", level: .error) + throw error + } + + return try await withCheckedThrowingContinuation { continuation in + Task { + let isUserTyping = await timerManager.hasRunningTask() + + // (CHA-T4b) If typing is already in progress, the CHA-T3 timeout is extended to be timeoutMs from now. + if isUserTyping { + logger.log(message: "User is already typing. Extending timeout.", level: .debug) + await timerManager.setTimer(interval: timeout) { [stop] in + Task { + try await stop() + } + } + continuation.resume() + } else { + // (CHA-T4a) If typing is not already in progress, per explicit cancellation or the timeout interval in (CHA-T3), then a new typing session is started. + logger.log(message: "User is not typing. Starting typing.", level: .debug) + do { + try startTyping() + continuation.resume() + } catch { + continuation.resume(throwing: error) + } + } + } + } + } + + // (CHA-T5) Users may indicate that they have stopped typing. + internal func stop() async throws { + do { + try await featureChannel.waitToBeAbleToPerformPresenceOperations(requestedByFeature: RoomFeature.presence) + } catch { + logger.log(message: "Error waiting to be able to perform presence leave operation: \(error)", level: .error) + throw error + } + + let isUserTyping = await timerManager.hasRunningTask() + if isUserTyping { + logger.log(message: "Stopping typing indicator for client: \(clientID)", level: .debug) + // (CHA-T5b) If typing is in progress, he CHA-T3 timeout is cancelled. The client then leaves presence. + await timerManager.cancelTimer() + channel.presence.leaveClient(clientID, data: nil) + } else { + // (CHA-T5a) If typing is not in progress, this operation is no-op. + logger.log(message: "User is not typing. No need to leave presence.", level: .debug) + } + } + + // (CHA-T7) Users may subscribe to discontinuity events to know when there’s been a break in typing indicators. Their listener will be called when a discontinuity event is triggered from the room lifecycle. For typing, there shouldn’t need to be user action as the underlying core SDK will heal the presence set. + internal func subscribeToDiscontinuities() async -> Subscription { + await featureChannel.subscribeToDiscontinuities() + } + + private func processPresenceGet(members: [ARTPresenceMessage]?, error: ARTErrorInfo?) throws -> Set { + guard let members else { + let error = error ?? ARTErrorInfo.create(withCode: 50000, status: 500, message: "Received incoming message without data") + logger.log(message: error.message, level: .error) + throw error + } + + let clientIDs = try Set(members.map { member in + guard let clientID = member.clientId else { + let error = ARTErrorInfo.create(withCode: 50000, status: 500, message: "Received incoming message without clientId") + logger.log(message: error.message, level: .error) + throw error + } + + return clientID + }) + + return clientIDs + } + + private func startTyping() throws { + // (CHA-T4a1) When a typing session is started, the client is entered into presence on the typing channel. + channel.presence.enterClient(clientID, data: nil) { [weak self] error in + guard let self else { + return + } + Task { + if let error { + logger.log(message: "Error entering presence: \(error)", level: .error) + throw error + } else { + logger.log(message: "Entered presence - starting timer", level: .debug) + // (CHA-T4a2) When a typing session is started, a timeout is set according to the CHA-T3 timeout interval. When this timeout expires, the typing session is automatically ended by leaving presence. + await timerManager.setTimer(interval: timeout) { [stop] in + Task { + try await stop() + } + } + } + } + } + } +} + +private final actor EventTracker { + private var latestEventID: UUID = .init() + + func updateEventID() -> UUID { + let newID = UUID() + latestEventID = newID + return newID + } + + func isLatestEvent(_ eventID: UUID) -> Bool { + latestEventID == eventID + } +} diff --git a/Sources/AblyChat/Room.swift b/Sources/AblyChat/Room.swift index c7ba88db..12e7ad1e 100644 --- a/Sources/AblyChat/Room.swift +++ b/Sources/AblyChat/Room.swift @@ -64,6 +64,7 @@ internal actor DefaultRoom private let _reactions: (any RoomReactions)? private let _presence: (any Presence)? private let _occupancy: (any Occupancy)? + private let _typing: (any Typing)? // Exposed for testing. private nonisolated let realtime: RealtimeClient @@ -125,6 +126,14 @@ internal actor DefaultRoom roomID: roomID, logger: logger ) : nil + + _typing = options.typing != nil ? DefaultTyping( + featureChannel: featureChannels[.typing]!, + roomID: roomID, + clientID: clientId, + logger: logger, + timeout: options.typing?.timeout ?? 5 + ) : nil } private struct FeatureChannelPartialDependencies { @@ -178,6 +187,7 @@ internal actor DefaultRoom .reactions, .presence, .occupancy, + .typing, ] let channelsByFeature = createChannelsForFeatures(features, roomID: roomID, roomOptions: roomOptions, realtime: realtime) @@ -212,7 +222,10 @@ internal actor DefaultRoom } public nonisolated var typing: any Typing { - fatalError("Not yet implemented") + guard let _typing else { + fatalError("Typing is not enabled for this room") + } + return _typing } public nonisolated var occupancy: any Occupancy { diff --git a/Sources/AblyChat/RoomFeature.swift b/Sources/AblyChat/RoomFeature.swift index 6b887f0d..48b57344 100644 --- a/Sources/AblyChat/RoomFeature.swift +++ b/Sources/AblyChat/RoomFeature.swift @@ -23,8 +23,8 @@ internal enum RoomFeature { // (CHA-ER1) Reactions for a Room are sent on a corresponding realtime channel ::$chat::$reactions. For example, if your room id is my-room then the reactions channel will be my-room::$chat::$reactions. "reactions" case .typing: - // We’ll add these, with reference to the relevant spec points, as we implement these features - fatalError("Don’t know channel name suffix for room feature \(self)") + // (CHA-T1) Typing Indicators for a Room is exposed on a dedicated Realtime channel. These channels use the format ::$chat::$typingIndicators. For example, if your room id is my-room then the typing channel will be my-room::$chat::$typingIndicators. + "typingIndicators" } } } diff --git a/Sources/AblyChat/RoomOptions.swift b/Sources/AblyChat/RoomOptions.swift index 75ece998..4e1ace14 100644 --- a/Sources/AblyChat/RoomOptions.swift +++ b/Sources/AblyChat/RoomOptions.swift @@ -27,10 +27,11 @@ public struct PresenceOptions: Sendable, Equatable { } } +// (CHA-T3) Users may configure a timeout interval for when they are typing. This configuration is provided as part of the RoomOptions typing.timeoutMs property, or idiomatic equivalent. The default is 5000ms. public struct TypingOptions: Sendable, Equatable { - public var timeout: TimeInterval = 10 + public var timeout: TimeInterval = 5 - public init(timeout: TimeInterval = 10) { + public init(timeout: TimeInterval = 5) { self.timeout = timeout } } diff --git a/Sources/AblyChat/TimerManager.swift b/Sources/AblyChat/TimerManager.swift new file mode 100644 index 00000000..0738cd40 --- /dev/null +++ b/Sources/AblyChat/TimerManager.swift @@ -0,0 +1,26 @@ +import Foundation + +internal final actor TimerManager { + private var currentTask: Task? + + internal func setTimer(interval: TimeInterval, handler: @escaping @Sendable () -> Void) { + cancelTimer() + + currentTask = Task { + try? await Task.sleep(nanoseconds: UInt64(interval * 1_000_000_000)) + guard !Task.isCancelled else { + return + } + handler() + } + } + + internal func cancelTimer() { + currentTask?.cancel() + currentTask = nil + } + + internal func hasRunningTask() -> Bool { + currentTask != nil + } +} diff --git a/Tests/AblyChatTests/DefaultRoomTests.swift b/Tests/AblyChatTests/DefaultRoomTests.swift index ca89167e..699a6886 100644 --- a/Tests/AblyChatTests/DefaultRoomTests.swift +++ b/Tests/AblyChatTests/DefaultRoomTests.swift @@ -12,6 +12,7 @@ struct DefaultRoomTests { let channelsList = [ MockRealtimeChannel(name: "basketball::$chat::$chatMessages", attachResult: .success), MockRealtimeChannel(name: "basketball::$chat::$reactions", attachResult: .success), + MockRealtimeChannel(name: "basketball::$chat::$typingIndicators", attachResult: .success), // required as DefaultRoom attaches typingIndicators implicitly for now ] let channels = MockChannels(channels: channelsList) let realtime = MockRealtime.create(channels: channels) @@ -30,6 +31,7 @@ struct DefaultRoomTests { let channelsList = [ MockRealtimeChannel(name: "basketball::$chat::$chatMessages", attachResult: .success), MockRealtimeChannel(name: "basketball::$chat::$reactions", attachResult: .success), + MockRealtimeChannel(name: "basketball::$chat::$typingIndicators", attachResult: .success), // required as DefaultRoom attaches typingIndicators implicitly for now ] let channels = MockChannels(channels: channelsList) let realtime = MockRealtime.create(channels: channels) @@ -38,7 +40,7 @@ struct DefaultRoomTests { // Then: It fetches the …$chatMessages channel (which is used by messages, presence, and occupancy) only once, and the options with which it does so are the result of merging the options used by the presence feature and those used by the occupancy feature let channelsGetArguments = channels.getArguments - #expect(channelsGetArguments.map(\.name).sorted() == ["basketball::$chat::$chatMessages", "basketball::$chat::$reactions"]) + #expect(channelsGetArguments.map(\.name).sorted() == ["basketball::$chat::$chatMessages", "basketball::$chat::$reactions", "basketball::$chat::$typingIndicators"]) let chatMessagesChannelGetOptions = try #require(channelsGetArguments.first { $0.name == "basketball::$chat::$chatMessages" }?.options) #expect(chatMessagesChannelGetOptions.params?["occupancy"] == "metrics") @@ -55,6 +57,7 @@ struct DefaultRoomTests { let channelsList = [ MockRealtimeChannel(name: "basketball::$chat::$chatMessages", attachResult: .success), MockRealtimeChannel(name: "basketball::$chat::$reactions", attachResult: .success), // required as DefaultRoom attaches reactions implicitly for now + MockRealtimeChannel(name: "basketball::$chat::$typingIndicators", attachResult: .success), // required as DefaultRoom attaches typingIndicators implicitly for now ] let channels = MockChannels(channels: channelsList) let realtime = MockRealtime.create(channels: channels) @@ -77,6 +80,7 @@ struct DefaultRoomTests { let channelsList = [ MockRealtimeChannel(name: "basketball::$chat::$chatMessages", attachResult: .success), MockRealtimeChannel(name: "basketball::$chat::$reactions", attachResult: .success), // required as DefaultRoom attaches reactions implicitly for now + MockRealtimeChannel(name: "basketball::$chat::$typingIndicators", attachResult: .success), // required as DefaultRoom attaches typingIndicators implicitly for now ] let channels = MockChannels(channels: channelsList) let realtime = MockRealtime.create(channels: channels) @@ -114,6 +118,7 @@ struct DefaultRoomTests { let channelsList = [ MockRealtimeChannel(name: "basketball::$chat::$chatMessages", detachResult: .success), MockRealtimeChannel(name: "basketball::$chat::$reactions", attachResult: .success), // required as DefaultRoom attaches reactions implicitly for now + MockRealtimeChannel(name: "basketball::$chat::$typingIndicators", attachResult: .success), // required as DefaultRoom attaches typingIndicators implicitly for now ] let channels = MockChannels(channels: channelsList) let realtime = MockRealtime.create(channels: channels) @@ -147,6 +152,7 @@ struct DefaultRoomTests { let channelsList = [ MockRealtimeChannel(name: "basketball::$chat::$chatMessages"), MockRealtimeChannel(name: "basketball::$chat::$reactions", attachResult: .success), // required as DefaultRoom attaches reactions implicitly for now + MockRealtimeChannel(name: "basketball::$chat::$typingIndicators", attachResult: .success), // required as DefaultRoom attaches typingIndicators implicitly for now ] let channels = MockChannels(channels: channelsList) let realtime = MockRealtime.create(channels: channels) @@ -174,6 +180,7 @@ struct DefaultRoomTests { let channelsList = [ MockRealtimeChannel(name: "basketball::$chat::$chatMessages", detachResult: .success), MockRealtimeChannel(name: "basketball::$chat::$reactions", attachResult: .success), // required as DefaultRoom attaches reactions implicitly for now + MockRealtimeChannel(name: "basketball::$chat::$typingIndicators", attachResult: .success), // required as DefaultRoom attaches typingIndicators implicitly for now ] let channels = MockChannels(channels: channelsList) let realtime = MockRealtime.create(channels: channels) @@ -195,6 +202,7 @@ struct DefaultRoomTests { let channelsList = [ MockRealtimeChannel(name: "basketball::$chat::$chatMessages", detachResult: .success), MockRealtimeChannel(name: "basketball::$chat::$reactions", attachResult: .success), // required as DefaultRoom attaches reactions implicitly for now + MockRealtimeChannel(name: "basketball::$chat::$typingIndicators", attachResult: .success), // required as DefaultRoom attaches typingIndicators implicitly for now ] let channels = MockChannels(channels: channelsList) let realtime = MockRealtime.create(channels: channels) diff --git a/Tests/AblyChatTests/IntegrationTests.swift b/Tests/AblyChatTests/IntegrationTests.swift index 3c83799f..5167cd79 100644 --- a/Tests/AblyChatTests/IntegrationTests.swift +++ b/Tests/AblyChatTests/IntegrationTests.swift @@ -36,6 +36,7 @@ struct IntegrationTests { roomID: roomID, options: .init( presence: .init(), + typing: .init(timeout: 2), reactions: .init(), occupancy: .init() ) @@ -44,6 +45,7 @@ struct IntegrationTests { roomID: roomID, options: .init( presence: .init(), + typing: .init(timeout: 2), reactions: .init(), occupancy: .init() ) @@ -61,37 +63,37 @@ struct IntegrationTests { // MARK: - Send and receive messages - // (6) Send a message before subscribing to messages, so that later on we can check history works. + // (1) Send a message before subscribing to messages, so that later on we can check history works. - // Create a throwaway subscription and wait for it to receive a message. This is to make sure that rxRoom has seen the message that we send here, so that the first message we receive on the subscription created in (7) is that which we’ll send in (8), and not that which we send here. + // (2) Create a throwaway subscription and wait for it to receive a message. This is to make sure that rxRoom has seen the message that we send here, so that the first message we receive on the subscription created in (5) is that which we’ll send in (6), and not that which we send here. let throwawayRxMessageSubscription = try await rxRoom.messages.subscribe(bufferingPolicy: .unbounded) - // Send the message + // (3) Send the message let txMessageBeforeRxSubscribe = try await txRoom.messages.send(params: .init(text: "Hello from txRoom, before rxRoom subscribe")) - // Wait for rxRoom to see the message we just sent + // (4) Wait for rxRoom to see the message we just sent let throwawayRxMessage = try #require(await throwawayRxMessageSubscription.first { _ in true }) #expect(throwawayRxMessage == txMessageBeforeRxSubscribe) - // (7) Subscribe to messages + // (5) Subscribe to messages let rxMessageSubscription = try await rxRoom.messages.subscribe(bufferingPolicy: .unbounded) - // (8) Now that we’re subscribed to messages, send a message on the other client and check that we receive it on the subscription + // (6) Now that we’re subscribed to messages, send a message on the other client and check that we receive it on the subscription let txMessageAfterRxSubscribe = try await txRoom.messages.send(params: .init(text: "Hello from txRoom, after rxRoom subscribe")) let rxMessageFromSubscription = try #require(await rxMessageSubscription.first { _ in true }) #expect(rxMessageFromSubscription == txMessageAfterRxSubscribe) - // (9) Fetch historical messages from before subscribing, and check we get txMessageBeforeRxSubscribe + // (7) Fetch historical messages from before subscribing, and check we get txMessageBeforeRxSubscribe let rxMessagesBeforeSubscribing = try await rxMessageSubscription.getPreviousMessages(params: .init()) try #require(rxMessagesBeforeSubscribing.items.count == 1) #expect(rxMessagesBeforeSubscribing.items[0] == txMessageBeforeRxSubscribe) // MARK: - Reactions - // (10) Subscribe to reactions + // (1) Subscribe to reactions let rxReactionSubscription = await rxRoom.reactions.subscribe(bufferingPolicy: .unbounded) - // (11) Now that we’re subscribed to reactions, send a reaction on the other client and check that we receive it on the subscription + // (2) Now that we’re subscribed to reactions, send a reaction on the other client and check that we receive it on the subscription try await txRoom.reactions.send(params: .init(type: "heart")) let rxReactionFromSubscription = try #require(await rxReactionSubscription.first { _ in true }) #expect(rxReactionFromSubscription.type == "heart") @@ -101,28 +103,28 @@ struct IntegrationTests { // It can take a moment for the occupancy to update from the clients connecting above, so we’ll wait a 2 seconds here. try await Task.sleep(nanoseconds: 2_000_000_000) - // (12) Get current occupancy + // (1) Get current occupancy let currentOccupancy = try await rxRoom.occupancy.get() #expect(currentOccupancy.connections != 0) // this flucuates dependant on the number of clients connected e.g. simulators running the test, hence why checking for non-zero #expect(currentOccupancy.presenceMembers == 0) // not yet entered presence - // (13) Subscribe to occupancy + // (2) Subscribe to occupancy let rxOccupancySubscription = await rxRoom.occupancy.subscribe(bufferingPolicy: .unbounded) - // (14) Attach the room so we can perform presence operations + // (3) Attach the room so we can perform presence operations try await txRoom.attach() - // (15) Enter presence on the other client and check that we receive the updated occupancy on the subscription + // (4) Enter presence on the other client and check that we receive the updated occupancy on the subscription try await txRoom.presence.enter(data: nil) // It can take a moment for the occupancy to update from the clients entering presence above, so we’ll wait 2 seconds here. try await Task.sleep(nanoseconds: 2_000_000_000) - // (16) Check that we received an updated presence count when getting the occupancy + // (5) Check that we received an updated presence count when getting the occupancy let updatedCurrentOccupancy = try await rxRoom.occupancy.get() #expect(updatedCurrentOccupancy.presenceMembers == 1) // 1 for txClient entering presence - // (17) Check that we received an updated presence count on the subscription + // (6) Check that we received an updated presence count on the subscription let rxOccupancyEventFromSubscription = try #require(await rxOccupancySubscription.first { _ in true }) #expect(rxOccupancyEventFromSubscription.presenceMembers == 1) // 1 for txClient entering presence @@ -134,64 +136,93 @@ struct IntegrationTests { // MARK: - Presence - // (18) Subscribe to presence + // (1) Subscribe to presence let rxPresenceSubscription = await rxRoom.presence.subscribe(events: [.enter, .leave, .update]) - // (19) Send `.enter` presence event with custom data on the other client and check that we receive it on the subscription + // (2) Send `.enter` presence event with custom data on the other client and check that we receive it on the subscription try await txRoom.presence.enter(data: .init(userCustomData: ["randomData": .string("randomValue")])) let rxPresenceEnterTxEvent = try #require(await rxPresenceSubscription.first { _ in true }) #expect(rxPresenceEnterTxEvent.action == .enter) #expect(rxPresenceEnterTxEvent.data?.userCustomData?["randomData"]?.value as? String == "randomValue") - // (20) Send `.update` presence event with custom data on the other client and check that we receive it on the subscription + // (3) Send `.update` presence event with custom data on the other client and check that we receive it on the subscription try await txRoom.presence.update(data: .init(userCustomData: ["randomData": .string("randomValue")])) let rxPresenceUpdateTxEvent = try #require(await rxPresenceSubscription.first { _ in true }) #expect(rxPresenceUpdateTxEvent.action == .update) #expect(rxPresenceUpdateTxEvent.data?.userCustomData?["randomData"]?.value as? String == "randomValue") - // (21) Send `.leave` presence event with custom data on the other client and check that we receive it on the subscription + // (4) Send `.leave` presence event with custom data on the other client and check that we receive it on the subscription try await txRoom.presence.leave(data: .init(userCustomData: ["randomData": .string("randomValue")])) let rxPresenceLeaveTxEvent = try #require(await rxPresenceSubscription.first { _ in true }) #expect(rxPresenceLeaveTxEvent.action == .leave) #expect(rxPresenceLeaveTxEvent.data?.userCustomData?["randomData"]?.value as? String == "randomValue") - // (22) Send `.enter` presence event with custom data on our client and check that we receive it on the subscription + // (5) Send `.enter` presence event with custom data on our client and check that we receive it on the subscription try await txRoom.presence.enter(data: .init(userCustomData: ["randomData": .string("randomValue")])) let rxPresenceEnterRxEvent = try #require(await rxPresenceSubscription.first { _ in true }) #expect(rxPresenceEnterRxEvent.action == .enter) #expect(rxPresenceEnterRxEvent.data?.userCustomData?["randomData"]?.value as? String == "randomValue") - // (23) Send `.update` presence event with custom data on our client and check that we receive it on the subscription + // (6) Send `.update` presence event with custom data on our client and check that we receive it on the subscription try await txRoom.presence.update(data: .init(userCustomData: ["randomData": .string("randomValue")])) let rxPresenceUpdateRxEvent = try #require(await rxPresenceSubscription.first { _ in true }) #expect(rxPresenceUpdateRxEvent.action == .update) #expect(rxPresenceUpdateRxEvent.data?.userCustomData?["randomData"]?.value as? String == "randomValue") - // (24) Send `.leave` presence event with custom data on our client and check that we receive it on the subscription + // (7) Send `.leave` presence event with custom data on our client and check that we receive it on the subscription try await txRoom.presence.leave(data: .init(userCustomData: ["randomData": .string("randomValue")])) let rxPresenceLeaveRxEvent = try #require(await rxPresenceSubscription.first { _ in true }) #expect(rxPresenceLeaveRxEvent.action == .leave) #expect(rxPresenceLeaveRxEvent.data?.userCustomData?["randomData"]?.value as? String == "randomValue") + // MARK: - Typing Indicators + + // (1) Subscribe to typing indicators + let rxTypingSubscription = await rxRoom.typing.subscribe(bufferingPolicy: .unbounded) + + // (2) Start typing on txRoom and check that we receive the typing event on the subscription + try await txRoom.typing.start() + + // (3) Wait for the typing event to be received + var typingEvents: [TypingEvent] = [] + for await typingEvent in rxTypingSubscription { + typingEvents.append(typingEvent) + if typingEvents.count == 1 { break } + } + + // (4) Check that we received the typing event showing that txRoom is typing + #expect(typingEvents.count == 1) + #expect(typingEvents[0].currentlyTyping.count == 1) + + // (5) Wait for the typing event to be received (auto sent from timeout) + for await typingEvent in rxTypingSubscription { + typingEvents.append(typingEvent) + if typingEvents.count == 2 { break } + } + + // (6) Check that we received the typing event showing that txRoom is no longer typing + #expect(typingEvents.count == 2) + #expect(typingEvents[1].currentlyTyping.isEmpty) + // MARK: - Detach - // (25) Detach the room + // (1) Detach the room try await rxRoom.detach() - // (26) Check that we received a DETACHED status change as a result of detaching the room + // (2) Check that we received a DETACHED status change as a result of detaching the room _ = try #require(await rxRoomStatusSubscription.first { $0.current == .detached }) #expect(await rxRoom.status == .detached) // MARK: - Release - // (27) Release the room + // (1) Release the room try await rxClient.rooms.release(roomID: roomID) - // (28) Check that we received a RELEASED status change as a result of releasing the room + // (2) Check that we received a RELEASED status change as a result of releasing the room _ = try #require(await rxRoomStatusSubscription.first { $0.current == .released }) #expect(await rxRoom.status == .released) - // (29) Fetch the room we just released and check it’s a new object + // (3) Fetch the room we just released and check it’s a new object let postReleaseRxRoom = try await rxClient.rooms.get(roomID: roomID, options: .init()) #expect(postReleaseRxRoom !== rxRoom) }