Skip to content

Commit 2d4855f

Browse files
committed
feat(realtime): add heartbeat callback
1 parent 5c06db6 commit 2d4855f

File tree

1 file changed

+57
-29
lines changed

1 file changed

+57
-29
lines changed

Sources/Realtime/RealtimeClientV2.swift

Lines changed: 57 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public final class RealtimeClientV2: Sendable {
3232
var messageTask: Task<Void, Never>?
3333

3434
var connectionTask: Task<Void, Never>?
35-
var channels: [RealtimeChannelV2] = []
35+
var channels: [String: RealtimeChannelV2] = [:]
3636
var sendBuffer: [@Sendable () -> Void] = []
3737

3838
var conn: (any WebSocket)?
@@ -51,13 +51,11 @@ public final class RealtimeClientV2: Sendable {
5151

5252
/// All managed channels indexed by their topics.
5353
public var channels: [String: RealtimeChannelV2] {
54-
mutableState.channels.reduce(
55-
into: [:],
56-
{ $0[$1.topic] = $1 }
57-
)
54+
mutableState.channels
5855
}
5956

6057
private let statusSubject = AsyncValueSubject<RealtimeClientStatus>(.disconnected)
58+
private let heartbeatSubject = AsyncValueSubject<RealtimeMessageV2?>(nil)
6159

6260
/// Listen for connection status changes.
6361
///
@@ -72,6 +70,13 @@ public final class RealtimeClientV2: Sendable {
7270
set { statusSubject.yield(newValue) }
7371
}
7472

73+
/// Listen for heartbeat checks.
74+
///
75+
/// You can also use ``onHeartbeat(_:)`` for a closure based method.
76+
public var heartbeat: AsyncStream<RealtimeMessageV2> {
77+
heartbeatSubject.values.compactMap { $0 }.eraseToStream()
78+
}
79+
7580
/// Listen for connection status changes.
7681
/// - Parameter listener: Closure that will be called when connection status changes.
7782
/// - Returns: An observation handle that can be used to stop listening.
@@ -84,6 +89,21 @@ public final class RealtimeClientV2: Sendable {
8489
return RealtimeSubscription { task.cancel() }
8590
}
8691

92+
/// Listen for heatbeat checks.
93+
/// - Parameter listener: Closure that will be called when a heartbeat is received.
94+
/// - Returns: An observation handle that can be used to stop listening.
95+
///
96+
/// - Nite: Use ``heartbeat`` if you prefer to use Async/Await.
97+
public func onHeartbeat(
98+
_ listener: @escaping @Sendable (RealtimeMessageV2) -> Void
99+
) -> RealtimeSubscription {
100+
let task = heartbeatSubject.onChange { message in
101+
guard let message else { return }
102+
listener(message)
103+
}
104+
return RealtimeSubscription { task.cancel() }
105+
}
106+
87107
public convenience init(url: URL, options: RealtimeClientOptions) {
88108
var interceptors: [any HTTPClientInterceptor] = []
89109

@@ -139,7 +159,7 @@ public final class RealtimeClientV2: Sendable {
139159
mutableState.withValue {
140160
$0.heartbeatTask?.cancel()
141161
$0.messageTask?.cancel()
142-
$0.channels = []
162+
$0.channels = [:]
143163
}
144164
}
145165

@@ -246,25 +266,31 @@ public final class RealtimeClientV2: Sendable {
246266
_ topic: String,
247267
options: @Sendable (inout RealtimeChannelConfig) -> Void = { _ in }
248268
) -> RealtimeChannelV2 {
249-
var config = RealtimeChannelConfig(
250-
broadcast: BroadcastJoinConfig(acknowledgeBroadcasts: false, receiveOwnBroadcasts: false),
251-
presence: PresenceJoinConfig(key: ""),
252-
isPrivate: false
253-
)
254-
options(&config)
269+
mutableState.withValue {
270+
let realtimeTopic = "realtime:\(topic)"
255271

256-
let channel = RealtimeChannelV2(
257-
topic: "realtime:\(topic)",
258-
config: config,
259-
socket: self,
260-
logger: self.options.logger
261-
)
272+
if let channel = $0.channels[realtimeTopic] {
273+
return channel
274+
}
262275

263-
mutableState.withValue {
264-
$0.channels.append(channel)
265-
}
276+
var config = RealtimeChannelConfig(
277+
broadcast: BroadcastJoinConfig(acknowledgeBroadcasts: false, receiveOwnBroadcasts: false),
278+
presence: PresenceJoinConfig(key: ""),
279+
isPrivate: false
280+
)
281+
options(&config)
282+
283+
let channel = RealtimeChannelV2(
284+
topic: realtimeTopic,
285+
config: config,
286+
socket: self,
287+
logger: self.options.logger
288+
)
289+
290+
$0.channels[realtimeTopic] = channel
266291

267-
return channel
292+
return channel
293+
}
268294
}
269295

270296
@available(
@@ -274,7 +300,7 @@ public final class RealtimeClientV2: Sendable {
274300
)
275301
public func addChannel(_ channel: RealtimeChannelV2) {
276302
mutableState.withValue {
277-
$0.channels.append(channel)
303+
$0.channels[channel.topic] = channel
278304
}
279305
}
280306

@@ -294,9 +320,7 @@ public final class RealtimeClientV2: Sendable {
294320

295321
func _remove(_ channel: RealtimeChannelV2) {
296322
mutableState.withValue {
297-
$0.channels.removeAll {
298-
$0.joinRef == channel.joinRef
299-
}
323+
$0.channels[channel.topic] = nil
300324
}
301325
}
302326

@@ -453,7 +477,11 @@ public final class RealtimeClientV2: Sendable {
453477
}
454478

455479
private func onMessage(_ message: RealtimeMessageV2) async {
456-
let channels = mutableState.withValue {
480+
if message.topic == "phoenix", message.event == "phx_reply" {
481+
heartbeatSubject.yield(message)
482+
}
483+
484+
let channel = mutableState.withValue {
457485
if let ref = message.ref, ref == $0.pendingHeartbeatRef {
458486
$0.pendingHeartbeatRef = nil
459487
options.logger?.debug("heartbeat received")
@@ -462,10 +490,10 @@ public final class RealtimeClientV2: Sendable {
462490
.debug("Received event \(message.event) for channel \(message.topic)")
463491
}
464492

465-
return $0.channels.filter { $0.topic == message.topic }
493+
return $0.channels[message.topic]
466494
}
467495

468-
for channel in channels {
496+
if let channel {
469497
await channel.onMessage(message)
470498
}
471499
}

0 commit comments

Comments
 (0)