From 4ef61bb2e4c0a00d11577ee76a48eb47ecb3f8f6 Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Thu, 17 Oct 2024 12:00:12 -0300 Subject: [PATCH 1/3] feat(realtime): add callback for handling system events --- .../Realtime/RealtimeChannel+AsyncAwait.swift | 15 +++++++++++++++ Sources/Realtime/V2/CallbackManager.swift | 16 ++++++++++++++++ Sources/Realtime/V2/RealtimeChannelV2.swift | 8 ++++++++ 3 files changed, 39 insertions(+) diff --git a/Sources/Realtime/RealtimeChannel+AsyncAwait.swift b/Sources/Realtime/RealtimeChannel+AsyncAwait.swift index 02f3fc89a..d6a854cee 100644 --- a/Sources/Realtime/RealtimeChannel+AsyncAwait.swift +++ b/Sources/Realtime/RealtimeChannel+AsyncAwait.swift @@ -104,6 +104,21 @@ extension RealtimeChannelV2 { return stream } + + /// Listen for `system` event. + public func system() -> AsyncStream { + let (stream, continuation) = AsyncStream.makeStream() + + let subscription = onSystem { + continuation.yield($0) + } + + continuation.onTermination = { _ in + subscription.cancel() + } + + return stream + } /// Listen for broadcast messages sent by other clients within the same channel under a specific `event`. @available(*, deprecated, renamed: "broadcastStream(event:)") diff --git a/Sources/Realtime/V2/CallbackManager.swift b/Sources/Realtime/V2/CallbackManager.swift index ea567b65f..7e21b9b0f 100644 --- a/Sources/Realtime/V2/CallbackManager.swift +++ b/Sources/Realtime/V2/CallbackManager.swift @@ -75,6 +75,15 @@ final class CallbackManager: Sendable { } } + @discardableResult + func addSystemCallback(callback: @escaping @Sendable (RealtimeMessageV2) -> Void) -> Int { + mutableState.withValue { + $0.id += 1 + $0.callbacks.append(.system(SystemCallback(id: $0.id, callback: callback))) + return $0.id + } + } + func setServerChanges(changes: [PostgresJoinConfig]) { mutableState.withValue { $0.serverChanges = changes @@ -167,16 +176,23 @@ struct PresenceCallback { var callback: @Sendable (any PresenceAction) -> Void } +struct SystemCallback { + var id: Int + var callback: @Sendable (RealtimeMessageV2) -> Void +} + enum RealtimeCallback { case postgres(PostgresCallback) case broadcast(BroadcastCallback) case presence(PresenceCallback) + case system(SystemCallback) var id: Int { switch self { case let .postgres(callback): callback.id case let .broadcast(callback): callback.id case let .presence(callback): callback.id + case let .system(callback): callback.id } } } diff --git a/Sources/Realtime/V2/RealtimeChannelV2.swift b/Sources/Realtime/V2/RealtimeChannelV2.swift index f29b144f6..6a9123d7a 100644 --- a/Sources/Realtime/V2/RealtimeChannelV2.swift +++ b/Sources/Realtime/V2/RealtimeChannelV2.swift @@ -545,6 +545,14 @@ public final class RealtimeChannelV2: Sendable { } } + public func onSystem(callback: @escaping @Sendable (RealtimeMessageV2) -> Void) -> Subscription { + let id = callbackManager.addSystemCallback(callback: callback) + return Subscription { [weak callbackManager, logger] in + logger?.debug("Removing system callback with id: \(id)") + callbackManager?.removeCallback(id: id) + } + } + @discardableResult func push(_ event: String, ref: String? = nil, payload: JSONObject = [:]) async -> PushStatus { let push = mutableState.withValue { From 72fccff901f1a8d216f627484f36d01d5ec8200f Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Mon, 4 Nov 2024 14:56:26 -0300 Subject: [PATCH 2/3] trigger event --- Sources/Realtime/V2/CallbackManager.swift | 13 ++++++ Sources/Realtime/V2/RealtimeChannelV2.swift | 46 +++++++++++++------ Sources/Realtime/V2/RealtimeMessageV2.swift | 21 ++++++--- .../RealtimeTests/CallbackManagerTests.swift | 38 +++++++++++---- .../RealtimeTests/RealtimeChannelTests.swift | 13 +++++- 5 files changed, 101 insertions(+), 30 deletions(-) diff --git a/Sources/Realtime/V2/CallbackManager.swift b/Sources/Realtime/V2/CallbackManager.swift index 7e21b9b0f..1723b9f47 100644 --- a/Sources/Realtime/V2/CallbackManager.swift +++ b/Sources/Realtime/V2/CallbackManager.swift @@ -154,6 +154,19 @@ final class CallbackManager: Sendable { } } + func triggerSystem(message: RealtimeMessageV2) { + let systemCallbacks = mutableState.callbacks.compactMap { + if case .system(let callback) = $0 { + return callback + } + return nil + } + + for systemCallback in systemCallbacks { + systemCallback.callback(message) + } + } + func reset() { mutableState.setValue(MutableState()) } diff --git a/Sources/Realtime/V2/RealtimeChannelV2.swift b/Sources/Realtime/V2/RealtimeChannelV2.swift index 6a9123d7a..8facb88c4 100644 --- a/Sources/Realtime/V2/RealtimeChannelV2.swift +++ b/Sources/Realtime/V2/RealtimeChannelV2.swift @@ -7,8 +7,8 @@ import ConcurrencyExtras import Foundation -import Helpers import HTTPTypes +import Helpers #if canImport(FoundationNetworking) import FoundationNetworking @@ -59,7 +59,9 @@ extension Socket { addChannel: { [weak client] in client?.addChannel($0) }, removeChannel: { [weak client] in await client?.removeChannel($0) }, push: { [weak client] in await client?.push($0) }, - httpSend: { [weak client] in try await client?.http.send($0) ?? .init(data: Data(), response: HTTPURLResponse()) } + httpSend: { [weak client] in + try await client?.http.send($0) ?? .init(data: Data(), response: HTTPURLResponse()) + } ) } } @@ -185,7 +187,8 @@ public final class RealtimeChannelV2: Sendable { @available( *, deprecated, - message: "manually updating auth token per channel is not recommended, please use `setAuth` in RealtimeClient instead." + message: + "manually updating auth token per channel is not recommended, please use `setAuth` in RealtimeClient instead." ) public func updateAuth(jwt: String?) async { logger?.debug("Updating auth token for channel \(topic)") @@ -238,8 +241,8 @@ public final class RealtimeChannelV2: Sendable { event: event, payload: message, private: config.isPrivate - ), - ], + ) + ] ] ) ) @@ -295,20 +298,27 @@ public final class RealtimeChannelV2: Sendable { func onMessage(_ message: RealtimeMessageV2) async { do { - guard let eventType = message.eventType else { + guard let eventType = message._eventType else { logger?.debug("Received message without event type: \(message)") return } switch eventType { case .tokenExpired: - logger?.debug( - "Received token expired event. This should not happen, please report this warning." - ) + // deprecated type + break case .system: - logger?.debug("Subscribed to channel \(message.topic)") - status = .subscribed + if message.status == .ok { + logger?.debug("Subscribed to channel \(message.topic)") + status = .subscribed + } else { + logger?.debug( + "Failed to subscribe to channel \(message.topic): \(message.payload)" + ) + } + + callbackManager.triggerSystem(message: message) case .reply: guard @@ -545,14 +555,24 @@ public final class RealtimeChannelV2: Sendable { } } - public func onSystem(callback: @escaping @Sendable (RealtimeMessageV2) -> Void) -> Subscription { + /// Listen for `system` event. + public func onSystem( + callback: @escaping @Sendable (RealtimeMessageV2) -> Void + ) -> RealtimeSubscription { let id = callbackManager.addSystemCallback(callback: callback) - return Subscription { [weak callbackManager, logger] in + return RealtimeSubscription { [weak callbackManager, logger] in logger?.debug("Removing system callback with id: \(id)") callbackManager?.removeCallback(id: id) } } + /// Listen for `system` event. + public func onSystem( + callback: @escaping @Sendable () -> Void + ) -> RealtimeSubscription { + self.onSystem { _ in callback() } + } + @discardableResult func push(_ event: String, ref: String? = nil, payload: JSONObject = [:]) async -> PushStatus { let push = mutableState.withValue { diff --git a/Sources/Realtime/V2/RealtimeMessageV2.swift b/Sources/Realtime/V2/RealtimeMessageV2.swift index ff45913e0..d288aece0 100644 --- a/Sources/Realtime/V2/RealtimeMessageV2.swift +++ b/Sources/Realtime/V2/RealtimeMessageV2.swift @@ -23,15 +23,22 @@ public struct RealtimeMessageV2: Hashable, Codable, Sendable { self.payload = payload } - var status: PushStatus? { + /// Status for the received message if any. + public var status: PushStatus? { payload["status"] .flatMap(\.stringValue) .flatMap(PushStatus.init(rawValue:)) } - public var eventType: EventType? { + @available( + *, deprecated, + message: "Access to event type will be removed, please inspect raw event value instead." + ) + public var eventType: EventType? { _eventType } + + var _eventType: EventType? { switch event { - case ChannelEvent.system where status == .ok: .system + case ChannelEvent.system: .system case ChannelEvent.postgresChanges: .postgresChanges case ChannelEvent.broadcast: @@ -44,9 +51,6 @@ public struct RealtimeMessageV2: Hashable, Codable, Sendable { .presenceDiff case ChannelEvent.presenceState: .presenceState - case ChannelEvent.system - where payload["message"]?.stringValue?.contains("access token has expired") == true: - .tokenExpired case ChannelEvent.reply: .reply default: @@ -62,6 +66,11 @@ public struct RealtimeMessageV2: Hashable, Codable, Sendable { case error case presenceDiff case presenceState + @available( + *, deprecated, + message: + "tokenExpired gets returned as system, check payload for verifying if is a token expiration." + ) case tokenExpired case reply } diff --git a/Tests/RealtimeTests/CallbackManagerTests.swift b/Tests/RealtimeTests/CallbackManagerTests.swift index 145db28ed..779993b2f 100644 --- a/Tests/RealtimeTests/CallbackManagerTests.swift +++ b/Tests/RealtimeTests/CallbackManagerTests.swift @@ -8,9 +8,10 @@ import ConcurrencyExtras import CustomDump import Helpers -@testable import Realtime import XCTest +@testable import Realtime + final class CallbackManagerTests: XCTestCase { func testIntegration() { let callbackManager = CallbackManager() @@ -52,13 +53,15 @@ final class CallbackManagerTests: XCTestCase { let callbackManager = CallbackManager() XCTAssertNoLeak(callbackManager) - let changes = [PostgresJoinConfig( - event: .update, - schema: "public", - table: "users", - filter: nil, - id: 1 - )] + let changes = [ + PostgresJoinConfig( + event: .update, + schema: "public", + table: "users", + filter: nil, + id: 1 + ) + ] callbackManager.setServerChanges(changes: changes) @@ -118,7 +121,8 @@ final class CallbackManagerTests: XCTestCase { receivedActions.withValue { $0.append(action) } } - let deleteSpecificUserId = callbackManager + let deleteSpecificUserId = + callbackManager .addPostgresCallback(filter: deleteSpecificUserFilter) { action in receivedActions.withValue { $0.append(action) } } @@ -215,6 +219,22 @@ final class CallbackManagerTests: XCTestCase { expectNoDifference(receivedAction.value?.joins, joins) expectNoDifference(receivedAction.value?.leaves, leaves) } + + func testTriggerSystem() { + let callbackManager = CallbackManager() + + let receivedMessage = LockIsolated(RealtimeMessageV2?.none) + callbackManager.addSystemCallback { message in + receivedMessage.setValue(message) + } + + callbackManager.triggerSystem( + message: RealtimeMessageV2( + joinRef: nil, ref: nil, topic: "test", event: "system", payload: ["status": "ok"])) + + XCTAssertEqual(receivedMessage.value?._eventType, .system) + XCTAssertEqual(receivedMessage.value?.status, .ok) + } } extension XCTestCase { diff --git a/Tests/RealtimeTests/RealtimeChannelTests.swift b/Tests/RealtimeTests/RealtimeChannelTests.swift index b260f40db..a6403cd30 100644 --- a/Tests/RealtimeTests/RealtimeChannelTests.swift +++ b/Tests/RealtimeTests/RealtimeChannelTests.swift @@ -6,10 +6,11 @@ // import InlineSnapshotTesting -@testable import Realtime import XCTest import XCTestDynamicOverlay +@testable import Realtime + final class RealtimeChannelTests: XCTestCase { let sut = RealtimeChannelV2( topic: "topic", @@ -48,9 +49,13 @@ final class RealtimeChannelTests: XCTestCase { sut.onPresenceChange { _ in }.store(in: &subscriptions) + sut.onSystem { + } + .store(in: &subscriptions) + assertInlineSnapshot(of: sut.callbackManager.callbacks, as: .dump) { """ - ▿ 7 elements + ▿ 8 elements ▿ RealtimeCallback ▿ postgres: PostgresCallback - callback: (Function) @@ -112,6 +117,10 @@ final class RealtimeChannelTests: XCTestCase { ▿ presence: PresenceCallback - callback: (Function) - id: 7 + ▿ RealtimeCallback + ▿ system: SystemCallback + - callback: (Function) + - id: 8 """ } From cc147e6dffb6c82492054ba3a4626553fce23320 Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Mon, 4 Nov 2024 15:01:53 -0300 Subject: [PATCH 3/3] fix test --- .../RealtimeMessageV2Tests.swift | 71 ++++++++++++------- 1 file changed, 45 insertions(+), 26 deletions(-) diff --git a/Tests/RealtimeTests/RealtimeMessageV2Tests.swift b/Tests/RealtimeTests/RealtimeMessageV2Tests.swift index fb29b19bb..0df944a6e 100644 --- a/Tests/RealtimeTests/RealtimeMessageV2Tests.swift +++ b/Tests/RealtimeTests/RealtimeMessageV2Tests.swift @@ -5,56 +5,75 @@ // Created by Guilherme Souza on 26/06/24. // -@testable import Realtime import XCTest +@testable import Realtime + final class RealtimeMessageV2Tests: XCTestCase { func testStatus() { - var message = RealtimeMessageV2(joinRef: nil, ref: nil, topic: "heartbeat", event: "event", payload: ["status": "ok"]) + var message = RealtimeMessageV2( + joinRef: nil, ref: nil, topic: "heartbeat", event: "event", payload: ["status": "ok"]) XCTAssertEqual(message.status, .ok) - message = RealtimeMessageV2(joinRef: nil, ref: nil, topic: "heartbeat", event: "event", payload: ["status": "timeout"]) + message = RealtimeMessageV2( + joinRef: nil, ref: nil, topic: "heartbeat", event: "event", payload: ["status": "timeout"]) XCTAssertEqual(message.status, .timeout) - message = RealtimeMessageV2(joinRef: nil, ref: nil, topic: "heartbeat", event: "event", payload: ["status": "error"]) + message = RealtimeMessageV2( + joinRef: nil, ref: nil, topic: "heartbeat", event: "event", payload: ["status": "error"]) XCTAssertEqual(message.status, .error) - message = RealtimeMessageV2(joinRef: nil, ref: nil, topic: "heartbeat", event: "event", payload: ["status": "invalid"]) + message = RealtimeMessageV2( + joinRef: nil, ref: nil, topic: "heartbeat", event: "event", payload: ["status": "invalid"]) XCTAssertNil(message.status) } func testEventType() { - let payloadWithTokenExpiredMessage: JSONObject = ["message": "access token has expired"] let payloadWithStatusOK: JSONObject = ["status": "ok"] let payloadWithNoStatus: JSONObject = [:] - let systemEventMessage = RealtimeMessageV2(joinRef: nil, ref: nil, topic: "topic", event: ChannelEvent.system, payload: payloadWithStatusOK) - let postgresChangesEventMessage = RealtimeMessageV2(joinRef: nil, ref: nil, topic: "topic", event: ChannelEvent.postgresChanges, payload: payloadWithNoStatus) - let tokenExpiredEventMessage = RealtimeMessageV2(joinRef: nil, ref: nil, topic: "topic", event: ChannelEvent.system, payload: payloadWithTokenExpiredMessage) + let systemEventMessage = RealtimeMessageV2( + joinRef: nil, ref: nil, topic: "topic", event: ChannelEvent.system, + payload: payloadWithStatusOK) + let postgresChangesEventMessage = RealtimeMessageV2( + joinRef: nil, ref: nil, topic: "topic", event: ChannelEvent.postgresChanges, + payload: payloadWithNoStatus) - XCTAssertEqual(systemEventMessage.eventType, .system) - XCTAssertEqual(postgresChangesEventMessage.eventType, .postgresChanges) - XCTAssertEqual(tokenExpiredEventMessage.eventType, .tokenExpired) + XCTAssertEqual(systemEventMessage._eventType, .system) + XCTAssertEqual(postgresChangesEventMessage._eventType, .postgresChanges) - let broadcastEventMessage = RealtimeMessageV2(joinRef: nil, ref: nil, topic: "topic", event: ChannelEvent.broadcast, payload: payloadWithNoStatus) - XCTAssertEqual(broadcastEventMessage.eventType, .broadcast) + let broadcastEventMessage = RealtimeMessageV2( + joinRef: nil, ref: nil, topic: "topic", event: ChannelEvent.broadcast, + payload: payloadWithNoStatus) + XCTAssertEqual(broadcastEventMessage._eventType, .broadcast) - let closeEventMessage = RealtimeMessageV2(joinRef: nil, ref: nil, topic: "topic", event: ChannelEvent.close, payload: payloadWithNoStatus) - XCTAssertEqual(closeEventMessage.eventType, .close) + let closeEventMessage = RealtimeMessageV2( + joinRef: nil, ref: nil, topic: "topic", event: ChannelEvent.close, + payload: payloadWithNoStatus) + XCTAssertEqual(closeEventMessage._eventType, .close) - let errorEventMessage = RealtimeMessageV2(joinRef: nil, ref: nil, topic: "topic", event: ChannelEvent.error, payload: payloadWithNoStatus) - XCTAssertEqual(errorEventMessage.eventType, .error) + let errorEventMessage = RealtimeMessageV2( + joinRef: nil, ref: nil, topic: "topic", event: ChannelEvent.error, + payload: payloadWithNoStatus) + XCTAssertEqual(errorEventMessage._eventType, .error) - let presenceDiffEventMessage = RealtimeMessageV2(joinRef: nil, ref: nil, topic: "topic", event: ChannelEvent.presenceDiff, payload: payloadWithNoStatus) - XCTAssertEqual(presenceDiffEventMessage.eventType, .presenceDiff) + let presenceDiffEventMessage = RealtimeMessageV2( + joinRef: nil, ref: nil, topic: "topic", event: ChannelEvent.presenceDiff, + payload: payloadWithNoStatus) + XCTAssertEqual(presenceDiffEventMessage._eventType, .presenceDiff) - let presenceStateEventMessage = RealtimeMessageV2(joinRef: nil, ref: nil, topic: "topic", event: ChannelEvent.presenceState, payload: payloadWithNoStatus) - XCTAssertEqual(presenceStateEventMessage.eventType, .presenceState) + let presenceStateEventMessage = RealtimeMessageV2( + joinRef: nil, ref: nil, topic: "topic", event: ChannelEvent.presenceState, + payload: payloadWithNoStatus) + XCTAssertEqual(presenceStateEventMessage._eventType, .presenceState) - let replyEventMessage = RealtimeMessageV2(joinRef: nil, ref: nil, topic: "topic", event: ChannelEvent.reply, payload: payloadWithNoStatus) - XCTAssertEqual(replyEventMessage.eventType, .reply) + let replyEventMessage = RealtimeMessageV2( + joinRef: nil, ref: nil, topic: "topic", event: ChannelEvent.reply, + payload: payloadWithNoStatus) + XCTAssertEqual(replyEventMessage._eventType, .reply) - let unknownEventMessage = RealtimeMessageV2(joinRef: nil, ref: nil, topic: "topic", event: "unknown_event", payload: payloadWithNoStatus) - XCTAssertNil(unknownEventMessage.eventType) + let unknownEventMessage = RealtimeMessageV2( + joinRef: nil, ref: nil, topic: "topic", event: "unknown_event", payload: payloadWithNoStatus) + XCTAssertNil(unknownEventMessage._eventType) } }