Skip to content

Commit

Permalink
Mqtt5 Init From Pointer (#262)
Browse files Browse the repository at this point in the history
  • Loading branch information
sbSteveK authored May 7, 2024
1 parent 48cd103 commit 605cc26
Show file tree
Hide file tree
Showing 3 changed files with 202 additions and 345 deletions.
176 changes: 89 additions & 87 deletions Source/AwsCommonRuntimeKit/mqtt/Mqtt5Client.swift
Original file line number Diff line number Diff line change
Expand Up @@ -278,111 +278,111 @@ public class Mqtt5Client {
// MARK: - Internal/Private

/// Handles lifecycle events from native Mqtt Client
internal func MqttClientLifeycyleEvents(_ lifecycleEvent: UnsafePointer<aws_mqtt5_client_lifecycle_event>?) {
internal func MqttClientHandleLifecycleEvent(_ lifecycleEvent: UnsafePointer<aws_mqtt5_client_lifecycle_event>?) {

guard let lifecycleEvent: UnsafePointer<aws_mqtt5_client_lifecycle_event> = lifecycleEvent else {
fatalError("MqttClientLifecycleEvents was called from native without an aws_mqtt5_client_lifecycle_event.")
}

let callbackCore = Unmanaged<MqttCallbackCore>.fromOpaque(lifecycleEvent.pointee.user_data).takeUnretainedValue()
let crtError = CRTError(code: lifecycleEvent.pointee.error_code)

if let userData = lifecycleEvent.pointee.user_data {
let callbackCore: MqttCallbackCore = Unmanaged<MqttCallbackCore>.fromOpaque(userData).takeUnretainedValue()
// validate the callback flag, if flag is false, return
callbackCore.rwlock.read {
if callbackCore.callbackFlag == false { return }

// validate the callback flag, if flag is false, return
callbackCore.rwlock.read {
if callbackCore.callbackFlag == false { return }
switch lifecycleEvent.pointee.event_type {
case AWS_MQTT5_CLET_ATTEMPTING_CONNECT:

switch lifecycleEvent.pointee.event_type {
case AWS_MQTT5_CLET_ATTEMPTING_CONNECT:
let lifecycleAttemptingConnectData = LifecycleAttemptingConnectData()
callbackCore.onLifecycleEventAttemptingConnect(lifecycleAttemptingConnectData)

let lifecycleAttemptingConnectData = LifecycleAttemptingConnectData()
callbackCore.onLifecycleEventAttemptingConnect(lifecycleAttemptingConnectData)
case AWS_MQTT5_CLET_CONNECTION_SUCCESS:

case AWS_MQTT5_CLET_CONNECTION_SUCCESS:
guard let connackView = lifecycleEvent.pointee.connack_data else {
fatalError("ConnackPacket missing in a Connection Success lifecycle event.")
}
let connackPacket = ConnackPacket(connackView)

guard let connackPacket = ConnackPacket.convertFromNative(lifecycleEvent.pointee.connack_data) else {
fatalError("ConnackPacket missing in a Connection Success lifecycle event.")
}
guard let negotiatedSettings = lifecycleEvent.pointee.settings else {
fatalError("NegotiatedSettings missing in a Connection Success lifecycle event.")
}

guard let negotiatedSettings = NegotiatedSettings.convertFromNative(lifecycleEvent.pointee.settings) else {
fatalError("NegotiatedSettings missing in a Connection Success lifecycle event.")
}
let lifecycleConnectionSuccessData = LifecycleConnectionSuccessData(
connackPacket: connackPacket,
negotiatedSettings: NegotiatedSettings(negotiatedSettings))
callbackCore.onLifecycleEventConnectionSuccess(lifecycleConnectionSuccessData)

let lifecycleConnectionSuccessData = LifecycleConnectionSuccessData(
connackPacket: connackPacket,
negotiatedSettings: negotiatedSettings)
callbackCore.onLifecycleEventConnectionSuccess(lifecycleConnectionSuccessData)
case AWS_MQTT5_CLET_CONNECTION_FAILURE:

case AWS_MQTT5_CLET_CONNECTION_FAILURE:
var connackPacket: ConnackPacket?
if let connackView = lifecycleEvent.pointee.connack_data {
connackPacket = ConnackPacket(connackView)
}

let connackPacket = ConnackPacket.convertFromNative(lifecycleEvent.pointee.connack_data)
let lifecycleConnectionFailureData = LifecycleConnectionFailureData(
crtError: crtError,
connackPacket: connackPacket)
callbackCore.onLifecycleEventConnectionFailure(lifecycleConnectionFailureData)

let lifecycleConnectionFailureData = LifecycleConnectionFailureData(
crtError: crtError,
connackPacket: connackPacket)
callbackCore.onLifecycleEventConnectionFailure(lifecycleConnectionFailureData)
case AWS_MQTT5_CLET_DISCONNECTION:

case AWS_MQTT5_CLET_DISCONNECTION:
var disconnectPacket: DisconnectPacket?

guard let disconnectPacket = DisconnectPacket.convertFromNative(lifecycleEvent.pointee.disconnect_data) else {
let lifecycleDisconnectData = LifecycleDisconnectData(crtError: crtError)
callbackCore.onLifecycleEventDisconnection(lifecycleDisconnectData)
return
}
if let disconnectView: UnsafePointer<aws_mqtt5_packet_disconnect_view> = lifecycleEvent.pointee.disconnect_data {
disconnectPacket = DisconnectPacket(disconnectView)
}

let lifecycleDisconnectData = LifecycleDisconnectData(
crtError: crtError,
disconnectPacket: disconnectPacket)
callbackCore.onLifecycleEventDisconnection(lifecycleDisconnectData)
let lifecycleDisconnectData = LifecycleDisconnectData(
crtError: crtError,
disconnectPacket: disconnectPacket)
callbackCore.onLifecycleEventDisconnection(lifecycleDisconnectData)

case AWS_MQTT5_CLET_STOPPED:
case AWS_MQTT5_CLET_STOPPED:

callbackCore.onLifecycleEventStoppedCallback(LifecycleStoppedData())
callbackCore.onLifecycleEventStoppedCallback(LifecycleStoppedData())

default:
fatalError("A lifecycle event with an invalid event type was encountered.")
}
default:
fatalError("A lifecycle event with an invalid event type was encountered.")
}

}
}

internal func MqttClientPublishRecievedEvents(
_ publishPacketView: UnsafePointer<aws_mqtt5_packet_publish_view>?,
_ userData: UnsafeMutableRawPointer?) {
let callbackCore = Unmanaged<MqttCallbackCore>.fromOpaque(userData!).takeUnretainedValue()
internal func MqttClientHandlePublishRecieved(
_ publish: UnsafePointer<aws_mqtt5_packet_publish_view>?,
_ user_data: UnsafeMutableRawPointer?) {
let callbackCore = Unmanaged<MqttCallbackCore>.fromOpaque(user_data!).takeUnretainedValue()

// validate the callback flag, if flag is false, return
callbackCore.rwlock.read {
if callbackCore.callbackFlag == false { return }

guard let publish_packet = PublishPacket.convertFromNative(publishPacketView) else {
fatalError("NegotiatedSettings missing in a Connection Success lifecycle event.")
if let publish {
let publishPacket = PublishPacket(publish)
let publishReceivedData = PublishReceivedData(publishPacket: publishPacket)
callbackCore.onPublishReceivedCallback(publishReceivedData)
} else {
fatalError("MqttClientHandlePublishRecieved called with null publish")
}
let puback = PublishReceivedData(publishPacket: publish_packet)
callbackCore.onPublishReceivedCallback(puback)
}
}

internal func MqttClientWebsocketTransform(
_ rawHttpMessage: OpaquePointer?,
_ userData: UnsafeMutableRawPointer?,
_ completeFn: (@convention(c) (OpaquePointer?, Int32, UnsafeMutableRawPointer?) -> Void)?,
_ completeCtx: UnsafeMutableRawPointer?) {
_ request: OpaquePointer?,
_ user_data: UnsafeMutableRawPointer?,
_ complete_fn: (@convention(c) (OpaquePointer?, Int32, UnsafeMutableRawPointer?) -> Void)?,
_ complete_ctx: UnsafeMutableRawPointer?) {

let callbackCore = Unmanaged<MqttCallbackCore>.fromOpaque(userData!).takeUnretainedValue()
let callbackCore = Unmanaged<MqttCallbackCore>.fromOpaque(user_data!).takeUnretainedValue()

// validate the callback flag, if flag is false, return
callbackCore.rwlock.read {
if callbackCore.callbackFlag == false { return }

guard let rawHttpMessage else {
guard let request else {
fatalError("Null HttpRequeset in websocket transform function.")
}
let httpRequest = HTTPRequest(nativeHttpMessage: rawHttpMessage)
let httpRequest = HTTPRequest(nativeHttpMessage: request)
@Sendable func signerTransform(request: HTTPRequestBase, errorCode: Int32) {
completeFn?(request.rawValue, errorCode, completeCtx)
complete_fn?(request.rawValue, errorCode, complete_ctx)
}

if callbackCore.onWebsocketInterceptor != nil {
Expand All @@ -399,65 +399,67 @@ internal func MqttClientTerminationCallback(_ userData: UnsafeMutableRawPointer?
}

/// The completion callback to invoke when subscribe operation completes in native
private func subscribeCompletionCallback(subackPacket: UnsafePointer<aws_mqtt5_packet_suback_view>?,
errorCode: Int32,
userData: UnsafeMutableRawPointer?) {
let continuationCore = Unmanaged<ContinuationCore<SubackPacket>>.fromOpaque(userData!).takeRetainedValue()
private func subscribeCompletionCallback(suback: UnsafePointer<aws_mqtt5_packet_suback_view>?,
error_code: Int32,
complete_ctx: UnsafeMutableRawPointer?) {
let continuationCore = Unmanaged<ContinuationCore<SubackPacket>>.fromOpaque(complete_ctx!).takeRetainedValue()

guard errorCode == AWS_OP_SUCCESS else {
return continuationCore.continuation.resume(throwing: CommonRunTimeError.crtError(CRTError(code: errorCode)))
guard error_code == AWS_OP_SUCCESS else {
return continuationCore.continuation.resume(throwing: CommonRunTimeError.crtError(CRTError(code: error_code)))
}

guard let suback = SubackPacket.convertFromNative(subackPacket) else {
if let suback {
continuationCore.continuation.resume(returning: SubackPacket(suback))
} else {
fatalError("Suback missing in the subscription completion callback.")
}

continuationCore.continuation.resume(returning: suback)
}

/// The completion callback to invoke when publish operation completes in native
private func publishCompletionCallback(packet_type: aws_mqtt5_packet_type,
navtivePublishResult: UnsafeRawPointer?,
errorCode: Int32,
userData: UnsafeMutableRawPointer?) {
let continuationCore = Unmanaged<ContinuationCore<PublishResult>>.fromOpaque(userData!).takeRetainedValue()
packet: UnsafeRawPointer?,
error_code: Int32,
complete_ctx: UnsafeMutableRawPointer?) {
let continuationCore = Unmanaged<ContinuationCore<PublishResult>>.fromOpaque(complete_ctx!).takeRetainedValue()

if errorCode != AWS_OP_SUCCESS {
return continuationCore.continuation.resume(throwing: CommonRunTimeError.crtError(CRTError(code: errorCode)))
if error_code != AWS_OP_SUCCESS {
return continuationCore.continuation.resume(throwing: CommonRunTimeError.crtError(CRTError(code: error_code)))
}

switch packet_type {
case AWS_MQTT5_PT_NONE: // QoS0
return continuationCore.continuation.resume(returning: PublishResult())

case AWS_MQTT5_PT_PUBACK: // QoS1
guard let puback = navtivePublishResult?.assumingMemoryBound(
guard let puback = packet?.assumingMemoryBound(
to: aws_mqtt5_packet_puback_view.self) else {
return continuationCore.continuation.resume(
throwing: CommonRunTimeError.crtError(CRTError.makeFromLastError()))
}
let publishResult = PublishResult(puback: PubackPacket.convertFromNative(puback))
let publishResult = PublishResult(puback: PubackPacket(puback))
return continuationCore.continuation.resume(returning: publishResult)

default:
return continuationCore.continuation.resume(
throwing: CommonRunTimeError.crtError(CRTError(code: AWS_ERROR_UNKNOWN.rawValue)))
}
}

/// The completion callback to invoke when unsubscribe operation completes in native
private func unsubscribeCompletionCallback(unsubackPacket: UnsafePointer<aws_mqtt5_packet_unsuback_view>?,
errorCode: Int32,
userData: UnsafeMutableRawPointer?) {
let continuationCore = Unmanaged<ContinuationCore<UnsubackPacket>>.fromOpaque(userData!).takeRetainedValue()
private func unsubscribeCompletionCallback(unsuback: UnsafePointer<aws_mqtt5_packet_unsuback_view>?,
error_code: Int32,
complete_ctx: UnsafeMutableRawPointer?) {
let continuationCore = Unmanaged<ContinuationCore<UnsubackPacket>>.fromOpaque(complete_ctx!).takeRetainedValue()

guard errorCode == AWS_OP_SUCCESS else {
return continuationCore.continuation.resume(throwing: CommonRunTimeError.crtError(CRTError(code: errorCode)))
guard error_code == AWS_OP_SUCCESS else {
return continuationCore.continuation.resume(throwing: CommonRunTimeError.crtError(CRTError(code: error_code)))
}

guard let unsuback = UnsubackPacket.convertFromNative(unsubackPacket) else {
if let unsuback {
continuationCore.continuation.resume(returning: UnsubackPacket(unsuback))
} else {
fatalError("Unsuback missing in the Unsubscribe completion callback.")
}

continuationCore.continuation.resume(returning: unsuback)
}

/// When the native client calls swift callbacks they are processed through the MqttCallbackCore
Expand Down
Loading

0 comments on commit 605cc26

Please sign in to comment.