Skip to content

Commit

Permalink
renaming to match native callback signatures. PubackPacket and Suback…
Browse files Browse the repository at this point in the history
…Packet inits()
  • Loading branch information
sbSteveK committed May 3, 2024
1 parent 5a95dd5 commit 03961d4
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 69 deletions.
78 changes: 40 additions & 38 deletions Source/AwsCommonRuntimeKit/mqtt/Mqtt5Client.swift
Original file line number Diff line number Diff line change
Expand Up @@ -349,41 +349,41 @@ internal func MqttClientHandleLifecycleEvent(_ lifecycleEvent: UnsafePointer<aws
}

internal func MqttClientHandlePublishRecieved(
_ publishPacketView: UnsafePointer<aws_mqtt5_packet_publish_view>?,
_ userData: UnsafeMutableRawPointer?) {
let callbackCore = Unmanaged<MqttCallbackCore>.fromOpaque(userData!).takeUnretainedValue()
_ publish: UnsafePointer<aws_mqtt5_packet_publish_view>?,
_ user_data: UnsafeMutableRawPointer?) {
let callbackCore = Unmanaged<MqttCallbackCore>.fromOpaque(user_data!).takeUnretainedValue()

// validate the callback flag, if flag is false, return
callbackCore.rwlock.read {
if callbackCore.callbackFlag == false { return }
if let publishPacketView {
let publishPacket = PublishPacket(publishPacketView)
if let publish {
let publishPacket = PublishPacket(publish)
let publishReceivedData = PublishReceivedData(publishPacket: publishPacket)
callbackCore.onPublishReceivedCallback(publishReceivedData)
} else {
fatalError("MqttClientHandlePublishRecieved called with null publishPacketView")
fatalError("MqttClientHandlePublishRecieved called with null publish")
}
}
}

internal func MqttClientWebsocketTransform(
_ rawHttpMessage: OpaquePointer?,
_ userData: UnsafeMutableRawPointer?,
_ completeFn: (@convention(c) (OpaquePointer?, Int32, UnsafeMutableRawPointer?) -> Void)?,
_ completeCtx: UnsafeMutableRawPointer?) {
_ request: OpaquePointer?,
_ user_data: UnsafeMutableRawPointer?,
_ complete_fn: (@convention(c) (OpaquePointer?, Int32, UnsafeMutableRawPointer?) -> Void)?,
_ complete_ctx: UnsafeMutableRawPointer?) {

let callbackCore = Unmanaged<MqttCallbackCore>.fromOpaque(userData!).takeUnretainedValue()
let callbackCore = Unmanaged<MqttCallbackCore>.fromOpaque(user_data!).takeUnretainedValue()

// validate the callback flag, if flag is false, return
callbackCore.rwlock.read {
if callbackCore.callbackFlag == false { return }

guard let rawHttpMessage else {
guard let request else {
fatalError("Null HttpRequeset in websocket transform function.")
}
let httpRequest = HTTPRequest(nativeHttpMessage: rawHttpMessage)
let httpRequest = HTTPRequest(nativeHttpMessage: request)
@Sendable func signerTransform(request: HTTPRequestBase, errorCode: Int32) {
completeFn?(request.rawValue, errorCode, completeCtx)
complete_fn?(request.rawValue, errorCode, complete_ctx)
}

if callbackCore.onWebsocketInterceptor != nil {
Expand All @@ -400,61 +400,63 @@ internal func MqttClientTerminationCallback(_ userData: UnsafeMutableRawPointer?
}

/// The completion callback to invoke when subscribe operation completes in native
private func subscribeCompletionCallback(subackPacket: UnsafePointer<aws_mqtt5_packet_suback_view>?,
errorCode: Int32,
userData: UnsafeMutableRawPointer?) {
let continuationCore = Unmanaged<ContinuationCore<SubackPacket>>.fromOpaque(userData!).takeRetainedValue()
private func subscribeCompletionCallback(suback: UnsafePointer<aws_mqtt5_packet_suback_view>?,
error_code: Int32,
complete_ctx: UnsafeMutableRawPointer?) {
let continuationCore = Unmanaged<ContinuationCore<SubackPacket>>.fromOpaque(complete_ctx!).takeRetainedValue()

guard errorCode == AWS_OP_SUCCESS else {
return continuationCore.continuation.resume(throwing: CommonRunTimeError.crtError(CRTError(code: errorCode)))
guard error_code == AWS_OP_SUCCESS else {
return continuationCore.continuation.resume(throwing: CommonRunTimeError.crtError(CRTError(code: error_code)))
}

guard let suback = SubackPacket.convertFromNative(subackPacket) else {
if let suback {
continuationCore.continuation.resume(returning: SubackPacket(suback))
} else {
fatalError("Suback missing in the subscription completion callback.")
}

continuationCore.continuation.resume(returning: suback)
}

/// The completion callback to invoke when publish operation completes in native
private func publishCompletionCallback(packet_type: aws_mqtt5_packet_type,
navtivePublishResult: UnsafeRawPointer?,
errorCode: Int32,
userData: UnsafeMutableRawPointer?) {
let continuationCore = Unmanaged<ContinuationCore<PublishResult>>.fromOpaque(userData!).takeRetainedValue()
packet: UnsafeRawPointer?,
error_code: Int32,
complete_ctx: UnsafeMutableRawPointer?) {
let continuationCore = Unmanaged<ContinuationCore<PublishResult>>.fromOpaque(complete_ctx!).takeRetainedValue()

if errorCode != AWS_OP_SUCCESS {
return continuationCore.continuation.resume(throwing: CommonRunTimeError.crtError(CRTError(code: errorCode)))
if error_code != AWS_OP_SUCCESS {
return continuationCore.continuation.resume(throwing: CommonRunTimeError.crtError(CRTError(code: error_code)))
}

switch packet_type {
case AWS_MQTT5_PT_NONE: // QoS0
return continuationCore.continuation.resume(returning: PublishResult())

case AWS_MQTT5_PT_PUBACK: // QoS1
guard let puback = navtivePublishResult?.assumingMemoryBound(
guard let puback = packet?.assumingMemoryBound(
to: aws_mqtt5_packet_puback_view.self) else {
return continuationCore.continuation.resume(
throwing: CommonRunTimeError.crtError(CRTError.makeFromLastError()))
}
let publishResult = PublishResult(puback: PubackPacket.convertFromNative(puback))
let publishResult = PublishResult(puback: PubackPacket(puback))
return continuationCore.continuation.resume(returning: publishResult)

default:
return continuationCore.continuation.resume(
throwing: CommonRunTimeError.crtError(CRTError(code: AWS_ERROR_UNKNOWN.rawValue)))
}
}

/// The completion callback to invoke when unsubscribe operation completes in native
private func unsubscribeCompletionCallback(unsubackPacket: UnsafePointer<aws_mqtt5_packet_unsuback_view>?,
errorCode: Int32,
userData: UnsafeMutableRawPointer?) {
let continuationCore = Unmanaged<ContinuationCore<UnsubackPacket>>.fromOpaque(userData!).takeRetainedValue()
private func unsubscribeCompletionCallback(unsuback: UnsafePointer<aws_mqtt5_packet_unsuback_view>?,
error_code: Int32,
complete_ctx: UnsafeMutableRawPointer?) {
let continuationCore = Unmanaged<ContinuationCore<UnsubackPacket>>.fromOpaque(complete_ctx!).takeRetainedValue()

guard errorCode == AWS_OP_SUCCESS else {
return continuationCore.continuation.resume(throwing: CommonRunTimeError.crtError(CRTError(code: errorCode)))
guard error_code == AWS_OP_SUCCESS else {
return continuationCore.continuation.resume(throwing: CommonRunTimeError.crtError(CRTError(code: error_code)))
}

guard let unsuback = UnsubackPacket.convertFromNative(unsubackPacket) else {
guard let unsuback = UnsubackPacket.convertFromNative(unsuback) else {
fatalError("Unsuback missing in the Unsubscribe completion callback.")
}

Expand Down
81 changes: 50 additions & 31 deletions Source/AwsCommonRuntimeKit/mqtt/Mqtt5Packets.swift
Original file line number Diff line number Diff line change
Expand Up @@ -305,22 +305,19 @@ public class PubackPacket {
self.userProperties = userProperties
}

static func convertFromNative(_ from: UnsafePointer<aws_mqtt5_packet_puback_view>?) -> PubackPacket? {
guard let from = from else {
return nil
internal convenience init(_ from: UnsafePointer<aws_mqtt5_packet_puback_view>) {
let pubackView = from.pointee
guard let reasonCode = PubackReasonCode(rawValue: Int(pubackView.reason_code.rawValue))
else {
fatalError("Puback from native has an invalid reason code.")
}
let pubackPointer = from.pointee

guard let reasonCode = PubackReasonCode(rawValue: Int(pubackPointer.reason_code.rawValue))
else {fatalError("SubackPacket from native has an invalid reason code.")}

let reasonString = pubackPointer.reason_string?.pointee.toString()

let reasonString = pubackView.reason_string?.pointee.toString()
let userProperties = convertOptionalUserProperties(
count: pubackPointer.user_property_count,
userPropertiesPointer: pubackPointer.user_properties)

return PubackPacket(reasonCode: reasonCode, reasonString: reasonString, userProperties: userProperties)
count: pubackView.user_property_count,
userPropertiesPointer: pubackView.user_properties)
self.init(reasonCode: reasonCode,
reasonString: reasonString,
userProperties: userProperties)
}
}

Expand Down Expand Up @@ -503,35 +500,57 @@ public class SubackPacket {
self.userProperties = userProperties
}

public static func convertFromNative(_ from: UnsafePointer<aws_mqtt5_packet_suback_view>?) -> SubackPacket? {

guard let from else {
return nil
}

let subackPointer = from.pointee

internal convenience init(_ from: UnsafePointer<aws_mqtt5_packet_suback_view>) {
let subackView = from.pointee
var subackReasonCodes: [SubackReasonCode] = []
for i in 0..<subackPointer.reason_code_count {
let reasonCodePointer = subackPointer.reason_codes.advanced(by: Int(i)).pointee
for i in 0..<subackView.reason_code_count {
let reasonCodePointer = subackView.reason_codes.advanced(by: Int(i)).pointee
guard let reasonCode = SubackReasonCode(rawValue: Int(reasonCodePointer.rawValue)) else {
fatalError("SubackPacket from native has an invalid reason code.")
}
subackReasonCodes.append(reasonCode)
}

let reasonString = subackPointer.reason_string?.pointee.toString()
let reasonString = subackView.reason_string?.pointee.toString()

let userProperties = convertOptionalUserProperties(
count: subackPointer.user_property_count,
userPropertiesPointer: subackPointer.user_properties)
count: subackView.user_property_count,
userPropertiesPointer: subackView.user_properties)

let suback = SubackPacket(reasonCodes: subackReasonCodes,
reasonString: reasonString,
userProperties: userProperties)
return suback
self.init(reasonCodes: subackReasonCodes,
reasonString: reasonString,
userProperties: userProperties)
}

// public static func convertFromNative(_ from: UnsafePointer<aws_mqtt5_packet_suback_view>?) -> SubackPacket? {

// guard let from else {
// return nil
// }

// let subackPointer = from.pointee

// var subackReasonCodes: [SubackReasonCode] = []
// for i in 0..<subackPointer.reason_code_count {
// let reasonCodePointer = subackPointer.reason_codes.advanced(by: Int(i)).pointee
// guard let reasonCode = SubackReasonCode(rawValue: Int(reasonCodePointer.rawValue)) else {
// fatalError("SubackPacket from native has an invalid reason code.")
// }
// subackReasonCodes.append(reasonCode)
// }

// let reasonString = subackPointer.reason_string?.pointee.toString()

// let userProperties = convertOptionalUserProperties(
// count: subackPointer.user_property_count,
// userPropertiesPointer: subackPointer.user_properties)

// let suback = SubackPacket(reasonCodes: subackReasonCodes,
// reasonString: reasonString,
// userProperties: userProperties)
// return suback
// }

}

/// Data model of an `MQTT5 UNSUBSCRIBE <https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc384800445>`_ packet.
Expand Down

0 comments on commit 03961d4

Please sign in to comment.