Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mqtt5 Init From Pointer #262

Merged
merged 17 commits into from
May 7, 2024
176 changes: 89 additions & 87 deletions Source/AwsCommonRuntimeKit/mqtt/Mqtt5Client.swift
Original file line number Diff line number Diff line change
Expand Up @@ -278,111 +278,111 @@ public class Mqtt5Client {
// MARK: - Internal/Private

/// Handles lifecycle events from native Mqtt Client
internal func MqttClientLifeycyleEvents(_ lifecycleEvent: UnsafePointer<aws_mqtt5_client_lifecycle_event>?) {
internal func MqttClientHandleLifecycleEvent(_ lifecycleEvent: UnsafePointer<aws_mqtt5_client_lifecycle_event>?) {

guard let lifecycleEvent: UnsafePointer<aws_mqtt5_client_lifecycle_event> = lifecycleEvent else {
fatalError("MqttClientLifecycleEvents was called from native without an aws_mqtt5_client_lifecycle_event.")
}

let callbackCore = Unmanaged<MqttCallbackCore>.fromOpaque(lifecycleEvent.pointee.user_data).takeUnretainedValue()
let crtError = CRTError(code: lifecycleEvent.pointee.error_code)

if let userData = lifecycleEvent.pointee.user_data {
let callbackCore: MqttCallbackCore = Unmanaged<MqttCallbackCore>.fromOpaque(userData).takeUnretainedValue()
// validate the callback flag, if flag is false, return
callbackCore.rwlock.read {
if callbackCore.callbackFlag == false { return }

// validate the callback flag, if flag is false, return
callbackCore.rwlock.read {
if callbackCore.callbackFlag == false { return }
switch lifecycleEvent.pointee.event_type {
case AWS_MQTT5_CLET_ATTEMPTING_CONNECT:

switch lifecycleEvent.pointee.event_type {
case AWS_MQTT5_CLET_ATTEMPTING_CONNECT:
let lifecycleAttemptingConnectData = LifecycleAttemptingConnectData()
callbackCore.onLifecycleEventAttemptingConnect(lifecycleAttemptingConnectData)

let lifecycleAttemptingConnectData = LifecycleAttemptingConnectData()
callbackCore.onLifecycleEventAttemptingConnect(lifecycleAttemptingConnectData)
case AWS_MQTT5_CLET_CONNECTION_SUCCESS:

case AWS_MQTT5_CLET_CONNECTION_SUCCESS:
guard let connackView = lifecycleEvent.pointee.connack_data else {
fatalError("ConnackPacket missing in a Connection Success lifecycle event.")
}
let connackPacket = ConnackPacket(connackView)

guard let connackPacket = ConnackPacket.convertFromNative(lifecycleEvent.pointee.connack_data) else {
fatalError("ConnackPacket missing in a Connection Success lifecycle event.")
}
guard let negotiatedSettings = lifecycleEvent.pointee.settings else {
fatalError("NegotiatedSettings missing in a Connection Success lifecycle event.")
}

guard let negotiatedSettings = NegotiatedSettings.convertFromNative(lifecycleEvent.pointee.settings) else {
fatalError("NegotiatedSettings missing in a Connection Success lifecycle event.")
}
let lifecycleConnectionSuccessData = LifecycleConnectionSuccessData(
connackPacket: connackPacket,
negotiatedSettings: NegotiatedSettings(negotiatedSettings))
callbackCore.onLifecycleEventConnectionSuccess(lifecycleConnectionSuccessData)

let lifecycleConnectionSuccessData = LifecycleConnectionSuccessData(
connackPacket: connackPacket,
negotiatedSettings: negotiatedSettings)
callbackCore.onLifecycleEventConnectionSuccess(lifecycleConnectionSuccessData)
case AWS_MQTT5_CLET_CONNECTION_FAILURE:

case AWS_MQTT5_CLET_CONNECTION_FAILURE:
var connackPacket: ConnackPacket?
if let connackView = lifecycleEvent.pointee.connack_data {
connackPacket = ConnackPacket(connackView)
}

let connackPacket = ConnackPacket.convertFromNative(lifecycleEvent.pointee.connack_data)
let lifecycleConnectionFailureData = LifecycleConnectionFailureData(
crtError: crtError,
connackPacket: connackPacket)
callbackCore.onLifecycleEventConnectionFailure(lifecycleConnectionFailureData)

let lifecycleConnectionFailureData = LifecycleConnectionFailureData(
crtError: crtError,
connackPacket: connackPacket)
callbackCore.onLifecycleEventConnectionFailure(lifecycleConnectionFailureData)
case AWS_MQTT5_CLET_DISCONNECTION:

case AWS_MQTT5_CLET_DISCONNECTION:
var disconnectPacket: DisconnectPacket?

guard let disconnectPacket = DisconnectPacket.convertFromNative(lifecycleEvent.pointee.disconnect_data) else {
let lifecycleDisconnectData = LifecycleDisconnectData(crtError: crtError)
callbackCore.onLifecycleEventDisconnection(lifecycleDisconnectData)
return
}
if let disconnectView: UnsafePointer<aws_mqtt5_packet_disconnect_view> = lifecycleEvent.pointee.disconnect_data {
disconnectPacket = DisconnectPacket(disconnectView)
}

let lifecycleDisconnectData = LifecycleDisconnectData(
crtError: crtError,
disconnectPacket: disconnectPacket)
callbackCore.onLifecycleEventDisconnection(lifecycleDisconnectData)
let lifecycleDisconnectData = LifecycleDisconnectData(
crtError: crtError,
disconnectPacket: disconnectPacket)
callbackCore.onLifecycleEventDisconnection(lifecycleDisconnectData)

case AWS_MQTT5_CLET_STOPPED:
case AWS_MQTT5_CLET_STOPPED:

callbackCore.onLifecycleEventStoppedCallback(LifecycleStoppedData())
callbackCore.onLifecycleEventStoppedCallback(LifecycleStoppedData())

default:
fatalError("A lifecycle event with an invalid event type was encountered.")
}
default:
fatalError("A lifecycle event with an invalid event type was encountered.")
}

}
}

internal func MqttClientPublishRecievedEvents(
_ publishPacketView: UnsafePointer<aws_mqtt5_packet_publish_view>?,
_ userData: UnsafeMutableRawPointer?) {
let callbackCore = Unmanaged<MqttCallbackCore>.fromOpaque(userData!).takeUnretainedValue()
internal func MqttClientHandlePublishRecieved(
_ publish: UnsafePointer<aws_mqtt5_packet_publish_view>?,
_ user_data: UnsafeMutableRawPointer?) {
let callbackCore = Unmanaged<MqttCallbackCore>.fromOpaque(user_data!).takeUnretainedValue()

// validate the callback flag, if flag is false, return
callbackCore.rwlock.read {
if callbackCore.callbackFlag == false { return }

guard let publish_packet = PublishPacket.convertFromNative(publishPacketView) else {
fatalError("NegotiatedSettings missing in a Connection Success lifecycle event.")
if let publish {
let publishPacket = PublishPacket(publish)
let publishReceivedData = PublishReceivedData(publishPacket: publishPacket)
callbackCore.onPublishReceivedCallback(publishReceivedData)
} else {
fatalError("MqttClientHandlePublishRecieved called with null publish")
}
let puback = PublishReceivedData(publishPacket: publish_packet)
callbackCore.onPublishReceivedCallback(puback)
}
}

internal func MqttClientWebsocketTransform(
_ rawHttpMessage: OpaquePointer?,
_ userData: UnsafeMutableRawPointer?,
_ completeFn: (@convention(c) (OpaquePointer?, Int32, UnsafeMutableRawPointer?) -> Void)?,
_ completeCtx: UnsafeMutableRawPointer?) {
_ request: OpaquePointer?,
_ user_data: UnsafeMutableRawPointer?,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think Camelcase is expected variable naming standard for swift

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed all calls that are coming from native to have arguments that reflect the ones expected in the native function signatures for clarity when working with callbacks. If we change it to camelcase here we should probably do it across all callback functions. Open to going either way as long as we're consistent.

_ complete_fn: (@convention(c) (OpaquePointer?, Int32, UnsafeMutableRawPointer?) -> Void)?,
_ complete_ctx: UnsafeMutableRawPointer?) {

let callbackCore = Unmanaged<MqttCallbackCore>.fromOpaque(userData!).takeUnretainedValue()
let callbackCore = Unmanaged<MqttCallbackCore>.fromOpaque(user_data!).takeUnretainedValue()

// validate the callback flag, if flag is false, return
callbackCore.rwlock.read {
if callbackCore.callbackFlag == false { return }

guard let rawHttpMessage else {
guard let request else {
fatalError("Null HttpRequeset in websocket transform function.")
}
let httpRequest = HTTPRequest(nativeHttpMessage: rawHttpMessage)
let httpRequest = HTTPRequest(nativeHttpMessage: request)
@Sendable func signerTransform(request: HTTPRequestBase, errorCode: Int32) {
completeFn?(request.rawValue, errorCode, completeCtx)
complete_fn?(request.rawValue, errorCode, complete_ctx)
}

if callbackCore.onWebsocketInterceptor != nil {
Expand All @@ -399,65 +399,67 @@ internal func MqttClientTerminationCallback(_ userData: UnsafeMutableRawPointer?
}

/// The completion callback to invoke when subscribe operation completes in native
private func subscribeCompletionCallback(subackPacket: UnsafePointer<aws_mqtt5_packet_suback_view>?,
errorCode: Int32,
userData: UnsafeMutableRawPointer?) {
let continuationCore = Unmanaged<ContinuationCore<SubackPacket>>.fromOpaque(userData!).takeRetainedValue()
private func subscribeCompletionCallback(suback: UnsafePointer<aws_mqtt5_packet_suback_view>?,
error_code: Int32,
complete_ctx: UnsafeMutableRawPointer?) {
let continuationCore = Unmanaged<ContinuationCore<SubackPacket>>.fromOpaque(complete_ctx!).takeRetainedValue()

guard errorCode == AWS_OP_SUCCESS else {
return continuationCore.continuation.resume(throwing: CommonRunTimeError.crtError(CRTError(code: errorCode)))
guard error_code == AWS_OP_SUCCESS else {
return continuationCore.continuation.resume(throwing: CommonRunTimeError.crtError(CRTError(code: error_code)))
}

guard let suback = SubackPacket.convertFromNative(subackPacket) else {
if let suback {
continuationCore.continuation.resume(returning: SubackPacket(suback))
} else {
fatalError("Suback missing in the subscription completion callback.")
}

continuationCore.continuation.resume(returning: suback)
}

/// The completion callback to invoke when publish operation completes in native
private func publishCompletionCallback(packet_type: aws_mqtt5_packet_type,
navtivePublishResult: UnsafeRawPointer?,
errorCode: Int32,
userData: UnsafeMutableRawPointer?) {
let continuationCore = Unmanaged<ContinuationCore<PublishResult>>.fromOpaque(userData!).takeRetainedValue()
packet: UnsafeRawPointer?,
error_code: Int32,
complete_ctx: UnsafeMutableRawPointer?) {
let continuationCore = Unmanaged<ContinuationCore<PublishResult>>.fromOpaque(complete_ctx!).takeRetainedValue()

if errorCode != AWS_OP_SUCCESS {
return continuationCore.continuation.resume(throwing: CommonRunTimeError.crtError(CRTError(code: errorCode)))
if error_code != AWS_OP_SUCCESS {
return continuationCore.continuation.resume(throwing: CommonRunTimeError.crtError(CRTError(code: error_code)))
}

switch packet_type {
case AWS_MQTT5_PT_NONE: // QoS0
return continuationCore.continuation.resume(returning: PublishResult())

case AWS_MQTT5_PT_PUBACK: // QoS1
guard let puback = navtivePublishResult?.assumingMemoryBound(
guard let puback = packet?.assumingMemoryBound(
to: aws_mqtt5_packet_puback_view.self) else {
return continuationCore.continuation.resume(
throwing: CommonRunTimeError.crtError(CRTError.makeFromLastError()))
}
let publishResult = PublishResult(puback: PubackPacket.convertFromNative(puback))
let publishResult = PublishResult(puback: PubackPacket(puback))
return continuationCore.continuation.resume(returning: publishResult)

default:
return continuationCore.continuation.resume(
throwing: CommonRunTimeError.crtError(CRTError(code: AWS_ERROR_UNKNOWN.rawValue)))
}
}

/// The completion callback to invoke when unsubscribe operation completes in native
private func unsubscribeCompletionCallback(unsubackPacket: UnsafePointer<aws_mqtt5_packet_unsuback_view>?,
errorCode: Int32,
userData: UnsafeMutableRawPointer?) {
let continuationCore = Unmanaged<ContinuationCore<UnsubackPacket>>.fromOpaque(userData!).takeRetainedValue()
private func unsubscribeCompletionCallback(unsuback: UnsafePointer<aws_mqtt5_packet_unsuback_view>?,
error_code: Int32,
complete_ctx: UnsafeMutableRawPointer?) {
let continuationCore = Unmanaged<ContinuationCore<UnsubackPacket>>.fromOpaque(complete_ctx!).takeRetainedValue()

guard errorCode == AWS_OP_SUCCESS else {
return continuationCore.continuation.resume(throwing: CommonRunTimeError.crtError(CRTError(code: errorCode)))
guard error_code == AWS_OP_SUCCESS else {
return continuationCore.continuation.resume(throwing: CommonRunTimeError.crtError(CRTError(code: error_code)))
}

guard let unsuback = UnsubackPacket.convertFromNative(unsubackPacket) else {
if let unsuback {
continuationCore.continuation.resume(returning: UnsubackPacket(unsuback))
} else {
fatalError("Unsuback missing in the Unsubscribe completion callback.")
}

continuationCore.continuation.resume(returning: unsuback)
}

/// When the native client calls swift callbacks they are processed through the MqttCallbackCore
Expand Down
Loading
Loading