Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions Sources/Auth/AuthClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,9 @@ public final class AuthClient: Sendable {
/// - Parameters:
/// - email: User's email address.
/// - password: Password for the user.
/// - data: User's metadata.
/// - data: Custom data object to store additional user metadata.
/// - redirectTo: The redirect URL embedded in the email link, defaults to ``Configuration/redirectToURL`` if not provided.
/// - captchaToken: Optional captcha token for securing this endpoint.
@discardableResult
public func signUp(
email: String,
Expand Down Expand Up @@ -145,7 +147,8 @@ public final class AuthClient: Sendable {
/// - Parameters:
/// - phone: User's phone number with international prefix.
/// - password: Password for the user.
/// - data: User's metadata.
/// - data: Custom data object to store additional user metadata.
/// - captchaToken: Optional captcha token for securing this endpoint.
@discardableResult
public func signUp(
phone: String,
Expand Down Expand Up @@ -184,6 +187,10 @@ public final class AuthClient: Sendable {
}

/// Log in an existing user with an email and password.
/// - Parameters:
/// - email: User's email address.
/// - password: User's password.
/// - captchaToken: Optional captcha token for securing this endpoint.
@discardableResult
public func signIn(
email: String,
Expand All @@ -207,6 +214,10 @@ public final class AuthClient: Sendable {
}

/// Log in an existing user with a phone and password.
/// - Parameters:
/// - email: User's phone number.
/// - password: User's password.
/// - captchaToken: Optional captcha token for securing this endpoint.
@discardableResult
public func signIn(
phone: String,
Expand Down Expand Up @@ -333,8 +344,7 @@ public final class AuthClient: Sendable {
/// - data: User's metadata.
/// - captchaToken: Captcha verification token.
///
/// - Note: You need to configure a WhatsApp sender on Twillo if you are using phone sign in with
/// the `whatsapp` channel.
/// - Note: You need to configure a WhatsApp sender on Twillo if you are using phone sign in with the `whatsapp` channel.
public func signInWithOTP(
phone: String,
channel: MessagingChannel = .sms,
Expand Down Expand Up @@ -362,8 +372,7 @@ public final class AuthClient: Sendable {
/// Attempts a single-sign on using an enterprise Identity Provider.
/// - Parameters:
/// - domain: The email domain to use for signing in.
/// - redirectTo: The URL to redirect the user to after they sign in with the third-party
/// provider.
/// - redirectTo: The URL to redirect the user to after they sign in with the third-party provider.
/// - captchaToken: The captcha token to be used for captcha verification.
/// - Returns: A URL that you can use to initiate the provider's authentication flow.
public func signInWithSSO(
Expand Down
4 changes: 2 additions & 2 deletions Sources/Realtime/V2/PushV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ actor PushV2 {
}

func send() async -> PushStatus {
await channel?.socket?.push(message)
await channel?.socket.push(message)

if channel?.config.broadcast.acknowledgeBroadcasts == true {
do {
return try await withTimeout(interval: channel?.socket?.options.timeoutInterval ?? 10) {
return try await withTimeout(interval: channel?.socket.options().timeoutInterval ?? 10) {
await withCheckedContinuation {
self.receivedContinuation = $0
}
Expand Down
102 changes: 67 additions & 35 deletions Sources/Realtime/V2/RealtimeChannelV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,34 @@ public struct RealtimeChannelConfig: Sendable {
public var presence: PresenceJoinConfig
}

public actor RealtimeChannelV2 {
struct Socket: Sendable {
var status: @Sendable () -> RealtimeClientV2.Status
var options: @Sendable () -> RealtimeClientOptions
var accessToken: @Sendable () -> String?
var makeRef: @Sendable () -> Int

var connect: @Sendable () async -> Void
var addChannel: @Sendable (_ channel: RealtimeChannelV2) -> Void
var removeChannel: @Sendable (_ channel: RealtimeChannelV2) async -> Void
var push: @Sendable (_ message: RealtimeMessageV2) async -> Void
}

extension Socket {
init(client: RealtimeClientV2) {
self.init(
status: { [weak client] in client?.status ?? .disconnected },
options: { [weak client] in client?.options ?? .init() },
accessToken: { [weak client] in client?.mutableState.accessToken },
makeRef: { [weak client] in client?.makeRef() ?? 0 },
connect: { [weak client] in await client?.connect() },
addChannel: { [weak client] in client?.addChannel($0) },
removeChannel: { [weak client] in await client?.removeChannel($0) },
push: { [weak client] in await client?.push($0) }
)
}
}

public final class RealtimeChannelV2: Sendable {
public typealias Subscription = ObservationToken

public enum Status: Sendable {
Expand All @@ -24,24 +51,22 @@ public actor RealtimeChannelV2 {
case unsubscribing
}

weak var socket: RealtimeClientV2? {
didSet {
assert(oldValue == nil, "socket should not be modified once set")
}
struct MutableState {
var clientChanges: [PostgresJoinConfig] = []
var joinRef: String?
var pushes: [String: PushV2] = [:]
}

private let mutableState = LockIsolated(MutableState())

let topic: String
let config: RealtimeChannelConfig
let logger: (any SupabaseLogger)?
let socket: Socket

private let callbackManager = CallbackManager()

private let statusEventEmitter = EventEmitter<Status>(initialEvent: .unsubscribed)

private var clientChanges: [PostgresJoinConfig] = []
private var joinRef: String?
private var pushes: [String: PushV2] = [:]

public private(set) var status: Status {
get { statusEventEmitter.lastEvent }
set { statusEventEmitter.emit(newValue) }
Expand All @@ -54,13 +79,13 @@ public actor RealtimeChannelV2 {
init(
topic: String,
config: RealtimeChannelConfig,
socket: RealtimeClientV2,
socket: Socket,
logger: (any SupabaseLogger)?
) {
self.socket = socket
self.topic = topic
self.config = config
self.logger = logger
self.socket = socket
}

deinit {
Expand All @@ -69,32 +94,33 @@ public actor RealtimeChannelV2 {

/// Subscribes to the channel
public func subscribe() async {
if await socket?.status != .connected {
if socket?.options.connectOnSubscribe != true {
if socket.status() != .connected {
if socket.options().connectOnSubscribe != true {
fatalError(
"You can't subscribe to a channel while the realtime client is not connected. Did you forget to call `realtime.connect()`?"
)
}
await socket?.connect()
await socket.connect()
}

await socket?.addChannel(self)
socket.addChannel(self)

status = .subscribing
logger?.debug("subscribing to channel \(topic)")

let joinConfig = RealtimeJoinConfig(
broadcast: config.broadcast,
presence: config.presence,
postgresChanges: clientChanges
postgresChanges: mutableState.clientChanges
)

let payload = await RealtimeJoinPayload(
let payload = RealtimeJoinPayload(
config: joinConfig,
accessToken: socket?.accessToken
accessToken: socket.accessToken()
)

joinRef = await socket?.makeRef().description
let joinRef = socket.makeRef().description
mutableState.withValue { $0.joinRef = joinRef }

logger?.debug("subscribing to channel with body: \(joinConfig)")

Expand All @@ -109,7 +135,7 @@ public actor RealtimeChannelV2 {
)

do {
try await withTimeout(interval: socket?.options.timeoutInterval ?? 10) { [self] in
try await withTimeout(interval: socket.options().timeoutInterval) { [self] in
_ = await statusChange.first { @Sendable in $0 == .subscribed }
}
} catch {
Expand All @@ -128,8 +154,8 @@ public actor RealtimeChannelV2 {

await push(
RealtimeMessageV2(
joinRef: joinRef,
ref: socket?.makeRef().description,
joinRef: mutableState.joinRef,
ref: socket.makeRef().description,
topic: topic,
event: ChannelEvent.leave,
payload: [:]
Expand All @@ -141,8 +167,8 @@ public actor RealtimeChannelV2 {
logger?.debug("Updating auth token for channel \(topic)")
await push(
RealtimeMessageV2(
joinRef: joinRef,
ref: socket?.makeRef().description,
joinRef: mutableState.joinRef,
ref: socket.makeRef().description,
topic: topic,
event: ChannelEvent.accessToken,
payload: ["access_token": .string(jwt)]
Expand All @@ -162,8 +188,8 @@ public actor RealtimeChannelV2 {

await push(
RealtimeMessageV2(
joinRef: joinRef,
ref: socket?.makeRef().description,
joinRef: mutableState.joinRef,
ref: socket.makeRef().description,
topic: topic,
event: ChannelEvent.broadcast,
payload: [
Expand All @@ -187,8 +213,8 @@ public actor RealtimeChannelV2 {

await push(
RealtimeMessageV2(
joinRef: joinRef,
ref: socket?.makeRef().description,
joinRef: mutableState.joinRef,
ref: socket.makeRef().description,
topic: topic,
event: ChannelEvent.presence,
payload: [
Expand All @@ -203,8 +229,8 @@ public actor RealtimeChannelV2 {
public func untrack() async {
await push(
RealtimeMessageV2(
joinRef: joinRef,
ref: socket?.makeRef().description,
joinRef: mutableState.joinRef,
ref: socket.makeRef().description,
topic: topic,
event: ChannelEvent.presence,
payload: [
Expand Down Expand Up @@ -329,7 +355,7 @@ public actor RealtimeChannelV2 {
Task { [weak self] in
guard let self else { return }

await socket?.removeChannel(self)
await socket.removeChannel(self)
logger?.debug("Unsubscribed from channel \(message.topic)")
}

Expand Down Expand Up @@ -439,7 +465,9 @@ public actor RealtimeChannelV2 {
filter: filter
)

clientChanges.append(config)
mutableState.withValue {
$0.clientChanges.append(config)
}

let id = callbackManager.addPostgresCallback(filter: config, callback: callback)
return Subscription { [weak callbackManager, logger] in
Expand All @@ -464,14 +492,18 @@ public actor RealtimeChannelV2 {
private func push(_ message: RealtimeMessageV2) async -> PushStatus {
let push = PushV2(channel: self, message: message)
if let ref = message.ref {
pushes[ref] = push
mutableState.withValue {
$0.pushes[ref] = push
}
}
return await push.send()
}

private func didReceiveReply(ref: String, status: String) {
Task {
let push = pushes.removeValue(forKey: ref)
let push = mutableState.withValue {
$0.pushes.removeValue(forKey: ref)
}
await push?.didReceive(status: PushStatus(rawValue: status) ?? .ok)
}
}
Expand Down
Loading