Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions Sources/Realtime/V2/PushV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,18 @@ actor PushV2 {
await channel?.socket?.push(message)

if channel?.config.broadcast.acknowledgeBroadcasts == true {
return await withCheckedContinuation {
receivedContinuation = $0
do {
return try await withTimeout(interval: channel?.socket?.options.timeoutInterval ?? 10) {
await withCheckedContinuation {
self.receivedContinuation = $0
}
}
} catch is TimeoutError {
channel?.logger?.debug("Push timed out.")
return .timeout
} catch {
channel?.logger?.error("Error sending push: \(error)")
return .error
}
}

Expand Down
13 changes: 12 additions & 1 deletion Sources/Realtime/V2/RealtimeChannelV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,18 @@ public actor RealtimeChannelV2 {
)
)

_ = await statusChange.first { @Sendable in $0 == .subscribed }
do {
try await withTimeout(interval: socket?.options.timeoutInterval ?? 10) { [self] in
_ = await statusChange.first { @Sendable in $0 == .subscribed }
}
} catch {
if error is TimeoutError {
logger?.debug("subscribe timed out.")
await subscribe()
} else {
logger?.error("subscribe failed: \(error)")
}
}
}

public func unsubscribe() async {
Expand Down
39 changes: 39 additions & 0 deletions Sources/_Helpers/Task+withTimeout.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
//
// Task+withTimeout.swift
//
//
// Created by Guilherme Souza on 19/04/24.
//

import Foundation

@discardableResult
package func withTimeout<R: Sendable>(
interval: TimeInterval,
@_inheritActorContext operation: @escaping @Sendable () async throws -> R
) async throws -> R {
try await withThrowingTaskGroup(of: R.self) { group in
defer {
group.cancelAll()
}

let deadline = Date(timeIntervalSinceNow: interval)

group.addTask {
try await operation()
}

group.addTask {
let interval = deadline.timeIntervalSinceNow
if interval > 0 {
try await Task.sleep(nanoseconds: NSEC_PER_SEC * UInt64(interval))
}
try Task.checkCancellation()
throw TimeoutError()
}

return try await group.next()!
}
}

package struct TimeoutError: Error, Hashable {}
Loading