Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ECO-4948] Runs room checks prior to performing presence operations #135

Merged
merged 1 commit into from
Nov 21, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Runs room checks prior to performing presence operations via `feature…
…Channel.waitToBeAbleToPerformPresenceOperations` which was implemented as part of [1].

It also became necessary to clean up the example app as this uncovered race conditions between concurrently running Tasks.
  • Loading branch information
umair-ably committed Nov 21, 2024
commit 370d4446025388d2c35df11bdb1ca772785c16c3
104 changes: 65 additions & 39 deletions Example/AblyChatExample/ContentView.swift
Original file line number Diff line number Diff line change
@@ -134,12 +134,14 @@ struct ContentView: View {
}
}
}
.tryTask { try await setDefaultTitle() }
.tryTask { try await attachRoom() }
.tryTask { try await showMessages() }
.tryTask { try await showReactions() }
.tryTask { try await showPresence() }
.tryTask { try await showOccupancy() }
.tryTask {
try await setDefaultTitle()
try await attachRoom()
try await showMessages()
try await showReactions()
try await showPresence()
try await showOccupancy()
}
.tryTask {
// NOTE: As we implement more features, move them out of the `if Environment.current == .mock` block and into the main block just above.
if Environment.current == .mock {
@@ -179,70 +181,94 @@ struct ContentView: View {
}
}

for await message in messagesSubscription {
withAnimation {
messages.insert(BasicListItem(id: message.timeserial, title: message.clientID, text: message.text), at: 0)
// Continue listening for messages on a background task so this function can return
Task {
for await message in messagesSubscription {
withAnimation {
messages.insert(BasicListItem(id: message.timeserial, title: message.clientID, text: message.text), at: 0)
}
}
}
}

func showReactions() async throws {
let reactionSubscription = try await room().reactions.subscribe(bufferingPolicy: .unbounded)
for await reaction in reactionSubscription {
withAnimation {
showReaction(reaction.displayedText)

// Continue listening for reactions on a background task so this function can return
Task {
for await reaction in reactionSubscription {
withAnimation {
showReaction(reaction.displayedText)
}
}
}
}

func showPresence() async throws {
try await room().presence.enter(data: .init(userCustomData: ["status": .string("📱 Online")]))

for await event in try await room().presence.subscribe(events: [.enter, .leave, .update]) {
withAnimation {
let status = event.data?.userCustomData?["status"]?.value as? String
let clientPresenceChangeMessage = "\(event.clientID) \(event.action.displayedText)"
let presenceMessage = status != nil ? "\(clientPresenceChangeMessage) with status: \(status!)" : clientPresenceChangeMessage
// Continue listening for new presence events on a background task so this function can return
Task {
for await event in try await room().presence.subscribe(events: [.enter, .leave, .update]) {
withAnimation {
let status = event.data?.userCustomData?["status"]?.value as? String
let clientPresenceChangeMessage = "\(event.clientID) \(event.action.displayedText)"
let presenceMessage = status != nil ? "\(clientPresenceChangeMessage) with status: \(status!)" : clientPresenceChangeMessage

messages.insert(BasicListItem(id: UUID().uuidString, title: "System", text: presenceMessage), at: 0)
messages.insert(BasicListItem(id: UUID().uuidString, title: "System", text: presenceMessage), at: 0)
}
}
}
}

func showTypings() async throws {
for await typing in try await room().typing.subscribe(bufferingPolicy: .unbounded) {
withAnimation {
typingInfo = "Typing: \(typing.currentlyTyping.joined(separator: ", "))..."
Task {
try? await Task.sleep(nanoseconds: 1 * 1_000_000_000)
withAnimation {
typingInfo = ""
// Continue listening for typing events on a background task so this function can return
Task {
for await typing in try await room().typing.subscribe(bufferingPolicy: .unbounded) {
withAnimation {
typingInfo = "Typing: \(typing.currentlyTyping.joined(separator: ", "))..."
Task {
try? await Task.sleep(nanoseconds: 1 * 1_000_000_000)
withAnimation {
typingInfo = ""
}
}
}
}
}
}

func showOccupancy() async throws {
for await event in try await room().occupancy.subscribe(bufferingPolicy: .unbounded) {
withAnimation {
occupancyInfo = "Connections: \(event.presenceMembers) (\(event.connections))"
// Continue listening for occupancy events on a background task so this function can return
let currentOccupancy = try await room().occupancy.get()
withAnimation {
occupancyInfo = "Connections: \(currentOccupancy.presenceMembers) (\(currentOccupancy.connections))"
}

Task {
for await event in try await room().occupancy.subscribe(bufferingPolicy: .unbounded) {
withAnimation {
occupancyInfo = "Connections: \(event.presenceMembers) (\(event.connections))"
}
}
}
}

func showRoomStatus() async throws {
for await status in try await room().onStatusChange(bufferingPolicy: .unbounded) {
withAnimation {
if status.current.isAttaching {
statusInfo = "\(status.current)...".capitalized
} else {
statusInfo = "\(status.current)".capitalized
if status.current == .attached {
Task {
try? await Task.sleep(nanoseconds: 1 * 1_000_000_000)
withAnimation {
statusInfo = ""
// Continue listening for status change events on a background task so this function can return
Task {
for await status in try await room().onStatusChange(bufferingPolicy: .unbounded) {
withAnimation {
if status.current.isAttaching {
statusInfo = "\(status.current)...".capitalized
} else {
statusInfo = "\(status.current)".capitalized
if status.current == .attached {
Task {
try? await Task.sleep(nanoseconds: 1 * 1_000_000_000)
withAnimation {
statusInfo = ""
}
}
}
}
52 changes: 52 additions & 0 deletions Sources/AblyChat/DefaultPresence.swift
Original file line number Diff line number Diff line change
@@ -21,6 +21,15 @@ internal final class DefaultPresence: Presence, EmitsDiscontinuities {
// (CHA-PR6) It must be possible to retrieve all the @Members of the presence set. The behaviour depends on the current room status, as presence operations in a Realtime Client cause implicit attaches.
internal func get() async throws -> [PresenceMember] {
logger.log(message: "Getting presence", level: .debug)

// CHA-PR6b to CHA-PR6f
do {
try await featureChannel.waitToBeAbleToPerformPresenceOperations(requestedByFeature: RoomFeature.presence)
} catch {
logger.log(message: "Error waiting to be able to perform presence get operation: \(error)", level: .error)
throw error
}

return try await withCheckedThrowingContinuation { continuation in
channel.presence.get { [processPresenceGet] members, error in
do {
@@ -36,6 +45,15 @@ internal final class DefaultPresence: Presence, EmitsDiscontinuities {

internal func get(params: PresenceQuery) async throws -> [PresenceMember] {
logger.log(message: "Getting presence with params: \(params)", level: .debug)

// CHA-PR6b to CHA-PR6f
do {
try await featureChannel.waitToBeAbleToPerformPresenceOperations(requestedByFeature: RoomFeature.presence)
} catch {
logger.log(message: "Error waiting to be able to perform presence get operation: \(error)", level: .error)
throw error
}

return try await withCheckedThrowingContinuation { continuation in
channel.presence.get(params.asARTRealtimePresenceQuery()) { [processPresenceGet] members, error in
do {
@@ -52,6 +70,15 @@ internal final class DefaultPresence: Presence, EmitsDiscontinuities {
// (CHA-PR5) It must be possible to query if a given clientId is in the presence set.
internal func isUserPresent(clientID: String) async throws -> Bool {
logger.log(message: "Checking if user is present with clientID: \(clientID)", level: .debug)

// CHA-PR6b to CHA-PR6f
do {
try await featureChannel.waitToBeAbleToPerformPresenceOperations(requestedByFeature: RoomFeature.presence)
} catch {
logger.log(message: "Error waiting to be able to perform presence get operation: \(error)", level: .error)
throw error
}

return try await withCheckedThrowingContinuation { continuation in
channel.presence.get(ARTRealtimePresenceQuery(clientId: clientID, connectionId: nil)) { [logger] members, error in
guard let members else {
@@ -68,6 +95,14 @@ internal final class DefaultPresence: Presence, EmitsDiscontinuities {
// (CHA-PR3a) Users may choose to enter presence, optionally providing custom data to enter with. The overall presence data must retain the format specified in CHA-PR2.
internal func enter(data: PresenceData? = nil) async throws {
logger.log(message: "Entering presence", level: .debug)

// CHA-PR3c to CHA-PR3g
do {
try await featureChannel.waitToBeAbleToPerformPresenceOperations(requestedByFeature: RoomFeature.presence)
} catch {
logger.log(message: "Error waiting to be able to perform presence enter operation: \(error)", level: .error)
throw error
}
return try await withCheckedThrowingContinuation { continuation in
channel.presence.enterClient(clientID, data: data?.asQueryItems()) { [logger] error in
if let error {
@@ -83,6 +118,15 @@ internal final class DefaultPresence: Presence, EmitsDiscontinuities {
// (CHA-PR10a) Users may choose to update their presence data, optionally providing custom data to update with. The overall presence data must retain the format specified in CHA-PR2.
internal func update(data: PresenceData? = nil) async throws {
logger.log(message: "Updating presence", level: .debug)

// CHA-PR10c to CHA-PR10g
do {
try await featureChannel.waitToBeAbleToPerformPresenceOperations(requestedByFeature: RoomFeature.presence)
} catch {
logger.log(message: "Error waiting to be able to perform presence update operation: \(error)", level: .error)
throw error
}

return try await withCheckedThrowingContinuation { continuation in
channel.presence.update(data?.asQueryItems()) { [logger] error in
if let error {
@@ -98,6 +142,14 @@ internal final class DefaultPresence: Presence, EmitsDiscontinuities {
// (CHA-PR4a) Users may choose to leave presence, which results in them being removed from the Realtime presence set.
internal func leave(data: PresenceData? = nil) async throws {
logger.log(message: "Leaving presence", level: .debug)

// CHA-PR6b to CHA-PR6f
do {
try await featureChannel.waitToBeAbleToPerformPresenceOperations(requestedByFeature: RoomFeature.presence)
} catch {
logger.log(message: "Error waiting to be able to perform presence leave operation: \(error)", level: .error)
throw error
}
return try await withCheckedThrowingContinuation { continuation in
channel.presence.leave(data?.asQueryItems()) { [logger] error in
if let error {
33 changes: 18 additions & 15 deletions Tests/AblyChatTests/IntegrationTests.swift
Original file line number Diff line number Diff line change
@@ -109,17 +109,20 @@ struct IntegrationTests {
// (13) Subscribe to occupancy
let rxOccupancySubscription = await rxRoom.occupancy.subscribe(bufferingPolicy: .unbounded)

// (14) Enter presence on the other client and check that we receive the updated occupancy on the subscription
// (14) Attach the room so we can perform presence operations
try await txRoom.attach()

// (15) Enter presence on the other client and check that we receive the updated occupancy on the subscription
try await txRoom.presence.enter(data: nil)

// It can take a moment for the occupancy to update from the clients entering presence above, so we’ll wait 2 seconds here.
try await Task.sleep(nanoseconds: 2_000_000_000)

// (15) Check that we received an updated presence count when getting the occupancy
// (16) Check that we received an updated presence count when getting the occupancy
let updatedCurrentOccupancy = try await rxRoom.occupancy.get()
#expect(updatedCurrentOccupancy.presenceMembers == 1) // 1 for txClient entering presence

// (16) Check that we received an updated presence count on the subscription
// (17) Check that we received an updated presence count on the subscription
let rxOccupancyEventFromSubscription = try #require(await rxOccupancySubscription.first { _ in true })

#expect(rxOccupancyEventFromSubscription.presenceMembers == 1) // 1 for txClient entering presence
@@ -131,64 +134,64 @@ struct IntegrationTests {

// MARK: - Presence

// (17) Subscribe to presence
// (18) Subscribe to presence
let rxPresenceSubscription = await rxRoom.presence.subscribe(events: [.enter, .leave, .update])

// (18) Send `.enter` presence event with custom data on the other client and check that we receive it on the subscription
// (19) Send `.enter` presence event with custom data on the other client and check that we receive it on the subscription
try await txRoom.presence.enter(data: .init(userCustomData: ["randomData": .string("randomValue")]))
let rxPresenceEnterTxEvent = try #require(await rxPresenceSubscription.first { _ in true })
#expect(rxPresenceEnterTxEvent.action == .enter)
#expect(rxPresenceEnterTxEvent.data?.userCustomData?["randomData"]?.value as? String == "randomValue")

// (19) Send `.update` presence event with custom data on the other client and check that we receive it on the subscription
// (20) Send `.update` presence event with custom data on the other client and check that we receive it on the subscription
try await txRoom.presence.update(data: .init(userCustomData: ["randomData": .string("randomValue")]))
let rxPresenceUpdateTxEvent = try #require(await rxPresenceSubscription.first { _ in true })
#expect(rxPresenceUpdateTxEvent.action == .update)
#expect(rxPresenceUpdateTxEvent.data?.userCustomData?["randomData"]?.value as? String == "randomValue")

// (20) Send `.leave` presence event with custom data on the other client and check that we receive it on the subscription
// (21) Send `.leave` presence event with custom data on the other client and check that we receive it on the subscription
try await txRoom.presence.leave(data: .init(userCustomData: ["randomData": .string("randomValue")]))
let rxPresenceLeaveTxEvent = try #require(await rxPresenceSubscription.first { _ in true })
#expect(rxPresenceLeaveTxEvent.action == .leave)
#expect(rxPresenceLeaveTxEvent.data?.userCustomData?["randomData"]?.value as? String == "randomValue")

// (21) Send `.enter` presence event with custom data on our client and check that we receive it on the subscription
// (22) Send `.enter` presence event with custom data on our client and check that we receive it on the subscription
try await txRoom.presence.enter(data: .init(userCustomData: ["randomData": .string("randomValue")]))
let rxPresenceEnterRxEvent = try #require(await rxPresenceSubscription.first { _ in true })
#expect(rxPresenceEnterRxEvent.action == .enter)
#expect(rxPresenceEnterRxEvent.data?.userCustomData?["randomData"]?.value as? String == "randomValue")

// (22) Send `.update` presence event with custom data on our client and check that we receive it on the subscription
// (23) Send `.update` presence event with custom data on our client and check that we receive it on the subscription
try await txRoom.presence.update(data: .init(userCustomData: ["randomData": .string("randomValue")]))
let rxPresenceUpdateRxEvent = try #require(await rxPresenceSubscription.first { _ in true })
#expect(rxPresenceUpdateRxEvent.action == .update)
#expect(rxPresenceUpdateRxEvent.data?.userCustomData?["randomData"]?.value as? String == "randomValue")

// (23) Send `.leave` presence event with custom data on our client and check that we receive it on the subscription
// (24) Send `.leave` presence event with custom data on our client and check that we receive it on the subscription
try await txRoom.presence.leave(data: .init(userCustomData: ["randomData": .string("randomValue")]))
let rxPresenceLeaveRxEvent = try #require(await rxPresenceSubscription.first { _ in true })
#expect(rxPresenceLeaveRxEvent.action == .leave)
#expect(rxPresenceLeaveRxEvent.data?.userCustomData?["randomData"]?.value as? String == "randomValue")

// MARK: - Detach

// (24) Detach the room
// (25) Detach the room
try await rxRoom.detach()

// (25) Check that we received a DETACHED status change as a result of detaching the room
// (26) Check that we received a DETACHED status change as a result of detaching the room
_ = try #require(await rxRoomStatusSubscription.first { $0.current == .detached })
#expect(await rxRoom.status == .detached)

// MARK: - Release

// (26) Release the room
// (27) Release the room
try await rxClient.rooms.release(roomID: roomID)

// (27) Check that we received a RELEASED status change as a result of releasing the room
// (28) Check that we received a RELEASED status change as a result of releasing the room
_ = try #require(await rxRoomStatusSubscription.first { $0.current == .released })
#expect(await rxRoom.status == .released)

// (28) Fetch the room we just released and check it’s a new object
// (29) Fetch the room we just released and check it’s a new object
let postReleaseRxRoom = try await rxClient.rooms.get(roomID: roomID, options: .init())
#expect(postReleaseRxRoom !== rxRoom)
}