Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Apple realtime with multiple subscriptions #830

Merged
merged 8 commits into from
May 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion templates/apple/Package.swift.twig
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ let package = Package(
),
],
dependencies: [
.package(url: "https://github.com/swift-server/async-http-client.git", from: "1.9.0"),
.package(url: "https://github.com/swift-server/async-http-client.git", from: "1.17.0"),
.package(url: "https://github.com/apple/swift-nio.git", from: "2.32.0"),
],
targets: [
Expand Down
10 changes: 7 additions & 3 deletions templates/swift/Sources/Models/RealtimeModels.swift.twig
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import Foundation

public class RealtimeSubscription {
public var close: () -> Void
private var close: () async throws -> Void

init(close: @escaping () -> Void) {
init(close: @escaping () async throws-> Void) {
self.close = close
}

public func close() async throws {
try await self.close()
}
}

public class RealtimeCallback {
Expand All @@ -14,7 +18,7 @@ public class RealtimeCallback {

init(
for channels: Set<String>,
and callback: @escaping (RealtimeResponseEvent) -> Void
with callback: @escaping (RealtimeResponseEvent) -> Void
) {
self.channels = channels
self.callback = callback
Expand Down
100 changes: 55 additions & 45 deletions templates/swift/Sources/Services/Realtime.swift.twig
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,23 @@ open class Realtime : Service {

private let TYPE_ERROR = "error"
private let TYPE_EVENT = "event"
private let DEBOUNCE_MILLIS = 1
private let DEBOUNCE_NANOS = 1_000_000

private var socketClient: WebSocketClient? = nil
private var activeChannels = Set<String>()
private var activeSubscriptions = [Int: RealtimeCallback]()

let connectSync = DispatchQueue(label: "ConnectSync")
let callbackSync = DispatchQueue(label: "CallbackSync")

private var subCallDepth = 0
private var reconnectAttempts = 0
private var subscriptionsCounter = 0
private var reconnect = true

private func createSocket() {
private func createSocket() async throws {
guard activeChannels.count > 0 else {
reconnect = false
closeSocket()
try await closeSocket()
return
}

Expand All @@ -38,17 +37,31 @@ open class Realtime : Service {

if (socketClient != nil) {
reconnect = false
closeSocket()
} else {
socketClient = WebSocketClient(url, tlsEnabled: !client.selfSigned, delegate: self)!
try await closeSocket()
}

try! socketClient?.connect()
socketClient = WebSocketClient(
url,
tlsEnabled: !client.selfSigned,
delegate: self
)

try await socketClient?.connect()
abnegate marked this conversation as resolved.
Show resolved Hide resolved
}

private func closeSocket() {
socketClient?.close()
//socket?.close(RealtimeCode.POLICY_VIOLATION.value, null)
private func closeSocket() async throws {
guard let client = socketClient,
let group = client.threadGroup else {
return
}

if (client.isConnected) {
let promise = group.any().makePromise(of: Void.self)
client.close(promise: promise)
try await promise.futureResult.get()
}

try await group.shutdownGracefully()
}

private func getTimeout() -> Int {
Expand All @@ -63,8 +76,8 @@ open class Realtime : Service {
public func subscribe(
channel: String,
callback: @escaping (RealtimeResponseEvent) -> Void
) -> RealtimeSubscription {
return subscribe(
) async throws -> RealtimeSubscription {
return try await subscribe(
channels: [channel],
payloadType: String.self,
callback: callback
Expand All @@ -74,8 +87,8 @@ open class Realtime : Service {
public func subscribe(
channels: Set<String>,
callback: @escaping (RealtimeResponseEvent) -> Void
) -> RealtimeSubscription {
return subscribe(
) async throws -> RealtimeSubscription {
return try await subscribe(
channels: channels,
payloadType: String.self,
callback: callback
Expand All @@ -86,8 +99,8 @@ open class Realtime : Service {
channel: String,
payloadType: T.Type,
callback: @escaping (RealtimeResponseEvent) -> Void
) -> RealtimeSubscription {
return subscribe(
) async throws -> RealtimeSubscription {
return try await subscribe(
channels: [channel],
payloadType: T.self,
callback: callback
Expand All @@ -98,36 +111,38 @@ open class Realtime : Service {
channels: Set<String>,
payloadType: T.Type,
callback: @escaping (RealtimeResponseEvent) -> Void
) -> RealtimeSubscription {
) async throws -> RealtimeSubscription {
subscriptionsCounter += 1
let counter = subscriptionsCounter

let count = subscriptionsCounter

channels.forEach {
activeChannels.insert($0)
}

activeSubscriptions[counter] = RealtimeCallback(
activeSubscriptions[count] = RealtimeCallback(
for: Set(channels),
and: callback
with: callback
)

connectSync.sync {
subCallDepth+=1
}

DispatchQueue.main.asyncAfter(deadline: .now() + .milliseconds(DEBOUNCE_MILLIS)) {
if (self.subCallDepth == 1) {
self.createSocket()
}
self.connectSync.sync {
self.subCallDepth-=1
}
try await Task.sleep(nanoseconds: UInt64(DEBOUNCE_NANOS))

if self.subCallDepth == 1 {
try await self.createSocket()
}

connectSync.sync {
self.subCallDepth -= 1
}

return RealtimeSubscription {
self.activeSubscriptions[counter] = nil
self.activeSubscriptions[count] = nil
abnegate marked this conversation as resolved.
Show resolved Hide resolved
self.cleanUp(channels: channels)
abnegate marked this conversation as resolved.
Show resolved Hide resolved
self.createSocket()
try await self.createSocket()
}
}

Expand Down Expand Up @@ -163,7 +178,7 @@ extension Realtime: WebSocketClientDelegate {
}
}

public func onClose(channel: Channel, data: Data) {
public func onClose(channel: Channel, data: Data) async throws {
if (!reconnect) {
reconnect = true
return
Expand All @@ -173,10 +188,11 @@ extension Realtime: WebSocketClientDelegate {

print("Realtime disconnected. Re-connecting in \(timeout / 1000) seconds.")

DispatchQueue.main.asyncAfter(deadline: .now() + .milliseconds(timeout)) {
self.reconnectAttempts += 1
self.createSocket()
}
try await Task.sleep(nanoseconds: UInt64(timeout * 1_000_000))

self.reconnectAttempts += 1

try await self.createSocket()
}

public func onError(error: Swift.Error?, status: HTTPResponseStatus?) {
Expand All @@ -188,16 +204,10 @@ extension Realtime: WebSocketClientDelegate {
}

func handleResponseEvent(from json: [String: Any]) {
guard let data = json["data"] as? [String: Any] else {
return
}
guard let channels = data["channels"] as? Array<String> else {
return
}
guard let events = data["events"] as? Array<String> else {
return
}
guard let payload = data["payload"] as? [String: Any] else {
guard let data = json["data"] as? [String: Any],
let channels = data["channels"] as? [String],
let events = data["events"] as? [String],
let payload = data["payload"] as? [String: Any] else {
return
}
guard channels.contains(where: { channel in
Expand Down
Loading