From d04e599d7f7dd46802ae5a30878919049c6e79b7 Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Fri, 19 Apr 2024 10:08:32 -0300 Subject: [PATCH 1/5] fix(realtime): handle timeout when subscribing to channel --- Sources/Realtime/V2/PushV2.swift | 13 +- Sources/_Helpers/Task+withTimeout.swift | 39 +++++ Tests/RealtimeTests/RealtimeTests.swift | 191 +++++++++------------ Tests/_HelpersTests/WithTimeoutTests.swift | 34 ++++ 4 files changed, 167 insertions(+), 110 deletions(-) create mode 100644 Sources/_Helpers/Task+withTimeout.swift create mode 100644 Tests/_HelpersTests/WithTimeoutTests.swift diff --git a/Sources/Realtime/V2/PushV2.swift b/Sources/Realtime/V2/PushV2.swift index 54ec2b87b..4df526af7 100644 --- a/Sources/Realtime/V2/PushV2.swift +++ b/Sources/Realtime/V2/PushV2.swift @@ -23,8 +23,17 @@ actor PushV2 { await channel?.socket?.push(message) if channel?.config.broadcast.acknowledgeBroadcasts == true { - return await withCheckedContinuation { - receivedContinuation = $0 + do { + return try await withTimeout(interval: channel?.socket?.config.timeoutInterval ?? 10) { + await withCheckedContinuation { + self.receivedContinuation = $0 + } + } + } catch is TimeoutError { + return .timeout + } catch { + channel?.logger?.error("error sending Push: \(error)") + return .error } } diff --git a/Sources/_Helpers/Task+withTimeout.swift b/Sources/_Helpers/Task+withTimeout.swift new file mode 100644 index 000000000..290b759ee --- /dev/null +++ b/Sources/_Helpers/Task+withTimeout.swift @@ -0,0 +1,39 @@ +// +// Task+withTimeout.swift +// +// +// Created by Guilherme Souza on 19/04/24. +// + +import Foundation + +@discardableResult +package func withTimeout( + interval: TimeInterval, + @_inheritActorContext operation: @escaping @Sendable () async throws -> R +) async throws -> R { + try await withThrowingTaskGroup(of: R.self) { group in + defer { + group.cancelAll() + } + + let deadline = Date(timeIntervalSinceNow: interval) + + group.addTask { + try await operation() + } + + group.addTask { + let interval = deadline.timeIntervalSinceNow + if interval > 0 { + try await Task.sleep(nanoseconds: NSEC_PER_SEC * UInt64(interval)) + } + try Task.checkCancellation() + throw TimeoutError() + } + + return try await group.next()! + } +} + +package struct TimeoutError: Error, Hashable {} diff --git a/Tests/RealtimeTests/RealtimeTests.swift b/Tests/RealtimeTests/RealtimeTests.swift index 4bb393131..a6b8c3c13 100644 --- a/Tests/RealtimeTests/RealtimeTests.swift +++ b/Tests/RealtimeTests/RealtimeTests.swift @@ -33,139 +33,114 @@ final class RealtimeTests: XCTestCase { ) } - func testBehavior_Closure() async { - let channel = await sut.channel("public:messages") - _ = await channel.onPostgresChange(InsertAction.self, table: "messages") { _ in } - _ = await channel.onPostgresChange(UpdateAction.self, table: "messages") { _ in } - _ = await channel.onPostgresChange(DeleteAction.self, table: "messages") { _ in } + func testBehavior() async throws { + try await withTimeout(interval: 2) { [self] in + let channel = await sut.channel("public:messages") + _ = await channel.postgresChange(InsertAction.self, table: "messages") + _ = await channel.postgresChange(UpdateAction.self, table: "messages") + _ = await channel.postgresChange(DeleteAction.self, table: "messages") - let statusChange = await sut.statusChange + let statusChange = await sut.statusChange - await connectSocketAndWait() + await connectSocketAndWait() - let status = await statusChange.prefix(3).collect() - XCTAssertEqual(status, [.disconnected, .connecting, .connected]) + let status = await statusChange.prefix(3).collect() + XCTAssertEqual(status, [.disconnected, .connecting, .connected]) - let messageTask = await sut.messageTask - XCTAssertNotNil(messageTask) + let messageTask = await sut.messageTask + XCTAssertNotNil(messageTask) - let heartbeatTask = await sut.heartbeatTask - XCTAssertNotNil(heartbeatTask) + let heartbeatTask = await sut.heartbeatTask + XCTAssertNotNil(heartbeatTask) - let subscription = Task { - await channel.subscribe() - } - await Task.megaYield() - ws.mockReceive(.messagesSubscribed) - - // Wait until channel subscribed - await subscription.value - - XCTAssertNoDifference(ws.sentMessages.value, [.subscribeToMessages]) - } - - func testBehavior_AsyncAwait() async { - let channel = await sut.channel("public:messages") - _ = await channel.postgresChange(InsertAction.self, table: "messages") - _ = await channel.postgresChange(UpdateAction.self, table: "messages") - _ = await channel.postgresChange(DeleteAction.self, table: "messages") - - let statusChange = await sut.statusChange - - await connectSocketAndWait() - - let status = await statusChange.prefix(3).collect() - XCTAssertEqual(status, [.disconnected, .connecting, .connected]) - - let messageTask = await sut.messageTask - XCTAssertNotNil(messageTask) + let subscription = Task { + await channel.subscribe() + } + await Task.megaYield() + ws.mockReceive(.messagesSubscribed) - let heartbeatTask = await sut.heartbeatTask - XCTAssertNotNil(heartbeatTask) + // Wait until channel subscribed + await subscription.value - let subscription = Task { - await channel.subscribe() + XCTAssertNoDifference(ws.sentMessages.value, [.subscribeToMessages]) } - await Task.megaYield() - ws.mockReceive(.messagesSubscribed) - - // Wait until channel subscribed - await subscription.value - - XCTAssertNoDifference(ws.sentMessages.value, [.subscribeToMessages]) } func testHeartbeat() async throws { - let expectation = expectation(description: "heartbeat") - expectation.expectedFulfillmentCount = 2 - - ws.on { message in - if message.event == "heartbeat" { - expectation.fulfill() - return RealtimeMessageV2( - joinRef: message.joinRef, - ref: message.ref, - topic: "phoenix", - event: "phx_reply", - payload: [ - "response": [:], - "status": "ok", - ] - ) - } + try await withTimeout(interval: 4) { [self] in + let expectation = expectation(description: "heartbeat") + expectation.expectedFulfillmentCount = 2 + + ws.on { message in + if message.event == "heartbeat" { + expectation.fulfill() + return RealtimeMessageV2( + joinRef: message.joinRef, + ref: message.ref, + topic: "phoenix", + event: "phx_reply", + payload: [ + "response": [:], + "status": "ok", + ] + ) + } - return nil - } + return nil + } - await connectSocketAndWait() + await connectSocketAndWait() - await fulfillment(of: [expectation], timeout: 3) + await fulfillment(of: [expectation], timeout: 3) + } } func testHeartbeat_whenNoResponse_shouldReconnect() async throws { - let sentHeartbeatExpectation = expectation(description: "sentHeartbeat") + try await withTimeout(interval: 6) { [self] in + let sentHeartbeatExpectation = expectation(description: "sentHeartbeat") - ws.on { - if $0.event == "heartbeat" { - sentHeartbeatExpectation.fulfill() - } + ws.on { + if $0.event == "heartbeat" { + sentHeartbeatExpectation.fulfill() + } - return nil - } + return nil + } - let statuses = LockIsolated<[RealtimeClientV2.Status]>([]) + let statuses = LockIsolated<[RealtimeClientV2.Status]>([]) - Task { - for await status in await sut.statusChange { - statuses.withValue { - $0.append(status) + Task { + for await status in await sut.statusChange { + statuses.withValue { + $0.append(status) + } } } + await Task.megaYield() + await connectSocketAndWait() + + await fulfillment(of: [sentHeartbeatExpectation], timeout: 2) + + let pendingHeartbeatRef = await sut.pendingHeartbeatRef + XCTAssertNotNil(pendingHeartbeatRef) + + // Wait until next heartbeat + try await Task.sleep(nanoseconds: NSEC_PER_SEC * 2) + + // Wait for reconnect delay + try await Task.sleep(nanoseconds: NSEC_PER_SEC * 1) + + XCTAssertEqual( + statuses.value, + [ + .disconnected, + .connecting, + .connected, + .disconnected, + .connecting, + ] + ) } - await Task.megaYield() - await connectSocketAndWait() - - await fulfillment(of: [sentHeartbeatExpectation], timeout: 2) - - let pendingHeartbeatRef = await sut.pendingHeartbeatRef - XCTAssertNotNil(pendingHeartbeatRef) - - // Wait until next heartbeat - try await Task.sleep(nanoseconds: NSEC_PER_SEC * 2) - - // Wait for reconnect delay - try await Task.sleep(nanoseconds: NSEC_PER_SEC * 1) - - XCTAssertEqual( - statuses.value, - [ - .disconnected, - .connecting, - .connected, - .disconnected, - .connecting, - ] - ) } private func connectSocketAndWait() async { diff --git a/Tests/_HelpersTests/WithTimeoutTests.swift b/Tests/_HelpersTests/WithTimeoutTests.swift new file mode 100644 index 000000000..aefa6fb88 --- /dev/null +++ b/Tests/_HelpersTests/WithTimeoutTests.swift @@ -0,0 +1,34 @@ +// +// WithTimeoutTests.swift +// +// +// Created by Guilherme Souza on 19/04/24. +// + +import _Helpers +import Foundation +import XCTest + +final class WithTimeoutTests: XCTestCase { + func testWithTimeout() async { + do { + try await withTimeout(interval: 0.25) { + try await Task.sleep(nanoseconds: NSEC_PER_SEC) + } + XCTFail("Task should timeout.") + } catch { + XCTAssertTrue(error is TimeoutError) + } + + do { + let answer = try await withTimeout(interval: 1.25) { + try await Task.sleep(nanoseconds: NSEC_PER_SEC) + return 42 + } + + XCTAssertEqual(answer, 42) + } catch { + XCTFail("Should not throw error: \(error)") + } + } +} From a346c576d23cae9dcaf885306f6e13464a4ca57e Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Fri, 19 Apr 2024 11:49:48 -0300 Subject: [PATCH 2/5] test: add test for channel subscribe timeout --- Sources/Realtime/V2/PushV2.swift | 3 +- Sources/Realtime/V2/RealtimeChannelV2.swift | 13 +- Tests/RealtimeTests/RealtimeTests.swift | 129 ++++++++++++++++---- 3 files changed, 121 insertions(+), 24 deletions(-) diff --git a/Sources/Realtime/V2/PushV2.swift b/Sources/Realtime/V2/PushV2.swift index 4df526af7..1b5717ae1 100644 --- a/Sources/Realtime/V2/PushV2.swift +++ b/Sources/Realtime/V2/PushV2.swift @@ -30,9 +30,10 @@ actor PushV2 { } } } catch is TimeoutError { + channel?.logger?.debug("Push timed out.") return .timeout } catch { - channel?.logger?.error("error sending Push: \(error)") + channel?.logger?.error("Error sending push: \(error)") return .error } } diff --git a/Sources/Realtime/V2/RealtimeChannelV2.swift b/Sources/Realtime/V2/RealtimeChannelV2.swift index 41538c07d..e83ddcf9f 100644 --- a/Sources/Realtime/V2/RealtimeChannelV2.swift +++ b/Sources/Realtime/V2/RealtimeChannelV2.swift @@ -108,7 +108,18 @@ public actor RealtimeChannelV2 { ) ) - _ = await statusChange.first { @Sendable in $0 == .subscribed } + do { + try await withTimeout(interval: socket?.config.timeoutInterval ?? 10) { [self] in + _ = await statusChange.first { @Sendable in $0 == .subscribed } + } + } catch { + if error is TimeoutError { + logger?.debug("subscribe timed out.") + await subscribe() + } else { + logger?.error("subscribe failed: \(error)") + } + } } public func unsubscribe() async { diff --git a/Tests/RealtimeTests/RealtimeTests.swift b/Tests/RealtimeTests/RealtimeTests.swift index a6b8c3c13..50f7fa07f 100644 --- a/Tests/RealtimeTests/RealtimeTests.swift +++ b/Tests/RealtimeTests/RealtimeTests.swift @@ -27,7 +27,9 @@ final class RealtimeTests: XCTestCase { options: RealtimeClientOptions( headers: ["apikey": apiKey], heartbeatInterval: 1, - reconnectDelay: 1 + reconnectDelay: 1, + timeoutInterval: 2, + logger: TestLogger() ), ws: ws ) @@ -62,10 +64,85 @@ final class RealtimeTests: XCTestCase { // Wait until channel subscribed await subscription.value - XCTAssertNoDifference(ws.sentMessages.value, [.subscribeToMessages]) + XCTAssertNoDifference(ws.sentMessages.value, [.subscribeToMessages(ref: "1", joinRef: "1")]) } } + func testSubscribeTimeout() async throws { + let channel = await sut.channel("public:messages") + let joinEventCount = LockIsolated(0) + + ws.on { message in + if message.event == "heartbeat" { + return RealtimeMessageV2( + joinRef: message.joinRef, + ref: message.ref, + topic: "phoenix", + event: "phx_reply", + payload: [ + "response": [:], + "status": "ok", + ] + ) + } + + if message.event == "phx_join" { + joinEventCount.withValue { $0 += 1 } + + // Skip first join. + if joinEventCount.value == 2 { + return .messagesSubscribed + } + } + + return nil + } + + await connectSocketAndWait() + + Task { + await channel.subscribe() + } + + await Task.megaYield() + + try? await Task.sleep(nanoseconds: NSEC_PER_SEC * 2) + + let joinSentMessages = ws.sentMessages.value.filter { $0.event == "phx_join" } + + let expectedMessages = try [ + RealtimeMessageV2( + joinRef: "1", + ref: "1", + topic: "realtime:public:messages", + event: "phx_join", + payload: JSONObject( + RealtimeJoinPayload( + config: RealtimeJoinConfig(), + accessToken: apiKey + ) + ) + ), + RealtimeMessageV2( + joinRef: "3", + ref: "3", + topic: "realtime:public:messages", + event: "phx_join", + payload: JSONObject( + RealtimeJoinPayload( + config: RealtimeJoinConfig(), + accessToken: apiKey + ) + ) + ), + ] + + XCTAssertNoDifference( + joinSentMessages, + expectedMessages + ) + } + func testHeartbeat() async throws { try await withTimeout(interval: 4) { [self] in let expectation = expectation(description: "heartbeat") @@ -155,27 +232,29 @@ final class RealtimeTests: XCTestCase { } extension RealtimeMessageV2 { - static let subscribeToMessages = Self( - joinRef: "1", - ref: "1", - topic: "realtime:public:messages", - event: "phx_join", - payload: [ - "access_token": "anon.api.key", - "config": [ - "broadcast": [ - "self": false, - "ack": false, - ], - "postgres_changes": [ - ["table": "messages", "event": "INSERT", "schema": "public"], - ["table": "messages", "schema": "public", "event": "UPDATE"], - ["schema": "public", "table": "messages", "event": "DELETE"], + static func subscribeToMessages(ref: String?, joinRef: String?) -> RealtimeMessageV2 { + Self( + joinRef: joinRef, + ref: ref, + topic: "realtime:public:messages", + event: "phx_join", + payload: [ + "access_token": "anon.api.key", + "config": [ + "broadcast": [ + "self": false, + "ack": false, + ], + "postgres_changes": [ + ["table": "messages", "event": "INSERT", "schema": "public"], + ["table": "messages", "schema": "public", "event": "UPDATE"], + ["schema": "public", "table": "messages", "event": "DELETE"], + ], + "presence": ["key": ""], ], - "presence": ["key": ""], - ], - ] - ) + ] + ) + } static let messagesSubscribed = Self( joinRef: nil, @@ -205,3 +284,9 @@ extension RealtimeMessageV2 { ] ) } + +struct TestLogger: SupabaseLogger { + func log(message: SupabaseLogMessage) { + print(message.description) + } +} From 05a79afb9d4b2f0844154ce98ed01a26002c3c9b Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Thu, 16 May 2024 05:45:08 -0300 Subject: [PATCH 3/5] Fix typo --- Sources/Realtime/V2/PushV2.swift | 2 +- Sources/Realtime/V2/RealtimeChannelV2.swift | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Sources/Realtime/V2/PushV2.swift b/Sources/Realtime/V2/PushV2.swift index 1b5717ae1..fbbdfd9bc 100644 --- a/Sources/Realtime/V2/PushV2.swift +++ b/Sources/Realtime/V2/PushV2.swift @@ -24,7 +24,7 @@ actor PushV2 { if channel?.config.broadcast.acknowledgeBroadcasts == true { do { - return try await withTimeout(interval: channel?.socket?.config.timeoutInterval ?? 10) { + return try await withTimeout(interval: channel?.socket?.options.timeoutInterval ?? 10) { await withCheckedContinuation { self.receivedContinuation = $0 } diff --git a/Sources/Realtime/V2/RealtimeChannelV2.swift b/Sources/Realtime/V2/RealtimeChannelV2.swift index e83ddcf9f..f43622693 100644 --- a/Sources/Realtime/V2/RealtimeChannelV2.swift +++ b/Sources/Realtime/V2/RealtimeChannelV2.swift @@ -109,7 +109,7 @@ public actor RealtimeChannelV2 { ) do { - try await withTimeout(interval: socket?.config.timeoutInterval ?? 10) { [self] in + try await withTimeout(interval: socket?.options.timeoutInterval ?? 10) { [self] in _ = await statusChange.first { @Sendable in $0 == .subscribed } } } catch { From b6acced970dbb9540eaae0b737bbeab75fa5eb6f Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Tue, 28 May 2024 14:40:03 -0300 Subject: [PATCH 4/5] test: add test for subscribe with timeout --- Tests/RealtimeTests/RealtimeTests.swift | 51 +++++++++++++++++++ .../xcshareddata/swiftpm/Package.resolved | 9 ---- 2 files changed, 51 insertions(+), 9 deletions(-) diff --git a/Tests/RealtimeTests/RealtimeTests.swift b/Tests/RealtimeTests/RealtimeTests.swift index 50f7fa07f..829689143 100644 --- a/Tests/RealtimeTests/RealtimeTests.swift +++ b/Tests/RealtimeTests/RealtimeTests.swift @@ -35,6 +35,12 @@ final class RealtimeTests: XCTestCase { ) } + override func tearDown() async throws { + await sut.disconnect() + + try await super.tearDown() + } + func testBehavior() async throws { try await withTimeout(interval: 2) { [self] in let channel = await sut.channel("public:messages") @@ -220,6 +226,51 @@ final class RealtimeTests: XCTestCase { } } + func testChannelSubscribe_timeout() async throws { + let channel = await sut.channel("test") + await connectSocketAndWait() + + Task { + await channel.subscribe() + } + + await Task.megaYield() + + try? await Task.sleep(nanoseconds: NSEC_PER_SEC * 2) + + let joinMessages = ws.sentMessages.value.filter { $0.event == "phx_join" } + + XCTAssertNoDifference( + joinMessages, + [ + RealtimeMessageV2( + joinRef: "1", + ref: "1", + topic: "realtime:test", + event: "phx_join", + payload: try JSONObject( + RealtimeJoinPayload( + config: RealtimeJoinConfig(), + accessToken: apiKey + ) + ) + ), + RealtimeMessageV2( + joinRef: "3", + ref: "3", + topic: "realtime:test", + event: "phx_join", + payload: try JSONObject( + RealtimeJoinPayload( + config: RealtimeJoinConfig(), + accessToken: apiKey + ) + ) + ), + ] + ) + } + private func connectSocketAndWait() async { let connection = Task { await sut.connect() diff --git a/supabase-swift.xcworkspace/xcshareddata/swiftpm/Package.resolved b/supabase-swift.xcworkspace/xcshareddata/swiftpm/Package.resolved index 62754b644..5ac9b2e65 100644 --- a/supabase-swift.xcworkspace/xcshareddata/swiftpm/Package.resolved +++ b/supabase-swift.xcworkspace/xcshareddata/swiftpm/Package.resolved @@ -36,15 +36,6 @@ "version" : "4.1.1" } }, - { - "identity" : "keychainaccess", - "kind" : "remoteSourceControl", - "location" : "https://github.com/kishikawakatsumi/KeychainAccess", - "state" : { - "revision" : "84e546727d66f1adc5439debad16270d0fdd04e7", - "version" : "4.2.2" - } - }, { "identity" : "svgview", "kind" : "remoteSourceControl", From aba9ff19c4476eb98ae342322cb5967d6d31ef2e Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Tue, 28 May 2024 14:42:48 -0300 Subject: [PATCH 5/5] remove duplicated test --- Tests/RealtimeTests/RealtimeTests.swift | 45 ------------------------- 1 file changed, 45 deletions(-) diff --git a/Tests/RealtimeTests/RealtimeTests.swift b/Tests/RealtimeTests/RealtimeTests.swift index 829689143..f4a1e3c55 100644 --- a/Tests/RealtimeTests/RealtimeTests.swift +++ b/Tests/RealtimeTests/RealtimeTests.swift @@ -226,51 +226,6 @@ final class RealtimeTests: XCTestCase { } } - func testChannelSubscribe_timeout() async throws { - let channel = await sut.channel("test") - await connectSocketAndWait() - - Task { - await channel.subscribe() - } - - await Task.megaYield() - - try? await Task.sleep(nanoseconds: NSEC_PER_SEC * 2) - - let joinMessages = ws.sentMessages.value.filter { $0.event == "phx_join" } - - XCTAssertNoDifference( - joinMessages, - [ - RealtimeMessageV2( - joinRef: "1", - ref: "1", - topic: "realtime:test", - event: "phx_join", - payload: try JSONObject( - RealtimeJoinPayload( - config: RealtimeJoinConfig(), - accessToken: apiKey - ) - ) - ), - RealtimeMessageV2( - joinRef: "3", - ref: "3", - topic: "realtime:test", - event: "phx_join", - payload: try JSONObject( - RealtimeJoinPayload( - config: RealtimeJoinConfig(), - accessToken: apiKey - ) - ) - ), - ] - ) - } - private func connectSocketAndWait() async { let connection = Task { await sut.connect()