Skip to content

Commit

Permalink
publish packet
Browse files Browse the repository at this point in the history
  • Loading branch information
sbSteveK committed May 3, 2024
1 parent 48cd103 commit 5a95dd5
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 53 deletions.
15 changes: 8 additions & 7 deletions Source/AwsCommonRuntimeKit/mqtt/Mqtt5Client.swift
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ public class Mqtt5Client {
// MARK: - Internal/Private

/// Handles lifecycle events from native Mqtt Client
internal func MqttClientLifeycyleEvents(_ lifecycleEvent: UnsafePointer<aws_mqtt5_client_lifecycle_event>?) {
internal func MqttClientHandleLifecycleEvent(_ lifecycleEvent: UnsafePointer<aws_mqtt5_client_lifecycle_event>?) {

guard let lifecycleEvent: UnsafePointer<aws_mqtt5_client_lifecycle_event> = lifecycleEvent else {
fatalError("MqttClientLifecycleEvents was called from native without an aws_mqtt5_client_lifecycle_event.")
Expand Down Expand Up @@ -348,20 +348,21 @@ internal func MqttClientLifeycyleEvents(_ lifecycleEvent: UnsafePointer<aws_mqtt
}
}

internal func MqttClientPublishRecievedEvents(
internal func MqttClientHandlePublishRecieved(
_ publishPacketView: UnsafePointer<aws_mqtt5_packet_publish_view>?,
_ userData: UnsafeMutableRawPointer?) {
let callbackCore = Unmanaged<MqttCallbackCore>.fromOpaque(userData!).takeUnretainedValue()

// validate the callback flag, if flag is false, return
callbackCore.rwlock.read {
if callbackCore.callbackFlag == false { return }

guard let publish_packet = PublishPacket.convertFromNative(publishPacketView) else {
fatalError("NegotiatedSettings missing in a Connection Success lifecycle event.")
if let publishPacketView {
let publishPacket = PublishPacket(publishPacketView)
let publishReceivedData = PublishReceivedData(publishPacket: publishPacket)
callbackCore.onPublishReceivedCallback(publishReceivedData)
} else {
fatalError("MqttClientHandlePublishRecieved called with null publishPacketView")
}
let puback = PublishReceivedData(publishPacket: publish_packet)
callbackCore.onPublishReceivedCallback(puback)
}
}

Expand Down
4 changes: 2 additions & 2 deletions Source/AwsCommonRuntimeKit/mqtt/Mqtt5Options.swift
Original file line number Diff line number Diff line change
Expand Up @@ -457,9 +457,9 @@ public class MqttClientOptions: CStructWithUserData {
raw_options.websocket_handshake_transform_user_data = _userData
}

raw_options.lifecycle_event_handler = MqttClientLifeycyleEvents
raw_options.lifecycle_event_handler = MqttClientHandleLifecycleEvent
raw_options.lifecycle_event_handler_user_data = _userData
raw_options.publish_received_handler = MqttClientPublishRecievedEvents
raw_options.publish_received_handler = MqttClientHandlePublishRecieved
raw_options.publish_received_handler_user_data = _userData
raw_options.client_termination_handler = MqttClientTerminationCallback
raw_options.client_termination_handler_user_data = _userData
Expand Down
77 changes: 33 additions & 44 deletions Source/AwsCommonRuntimeKit/mqtt/Mqtt5Packets.swift
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,39 @@ public class PublishPacket: CStruct {
self.userProperties = userProperties
}

internal convenience init(_ from: UnsafePointer<aws_mqtt5_packet_publish_view>) {
let publishView = from.pointee
let qos = QoS(publishView.qos)
let payload = Data(bytes: publishView.payload.ptr, count: publishView.payload.len)
let payloadFormatIndicator: PayloadFormatIndicator? = publishView.payload_format != nil ?
PayloadFormatIndicator(publishView.payload_format.pointee) : nil
let messageExpiryInterval = convertOptionalUInt32(publishView.message_expiry_interval_seconds)
let messageExpiryIntervalTimeInterval: TimeInterval? = messageExpiryInterval.map { TimeInterval($0) }
let correlationDataPointer: Data? = publishView.correlation_data != nil ?
Data(bytes: publishView.correlation_data!.pointee.ptr, count: publishView.correlation_data!.pointee.len) : nil
var identifier: [UInt32]? = []
for i in 0..<publishView.subscription_identifier_count {
let subscription_identifier: UInt32 = UInt32(publishView.subscription_identifiers.advanced(by: Int(i)).pointee)
identifier?.append(subscription_identifier)
}
let userProperties = convertOptionalUserProperties(
count: publishView.user_property_count,
userPropertiesPointer: publishView.user_properties)

self.init(qos: qos,
topic: publishView.topic.toString(),
payload: payload,
retain: publishView.retain,
payloadFormatIndicator: payloadFormatIndicator,
messageExpiryInterval: messageExpiryIntervalTimeInterval,
topicAlias: convertOptionalUInt16(publishView.topic_alias),
responseTopic: convertAwsByteCursorToOptionalString(publishView.response_topic),
correlationData: correlationDataPointer,
subscriptionIdentifiers: identifier,
contentType: convertAwsByteCursorToOptionalString(publishView.content_type),
userProperties: userProperties)
}

/// Get payload converted to a utf8 String
public func payloadAsString() -> String? {
if let data = payload {
Expand Down Expand Up @@ -239,50 +272,6 @@ public class PublishPacket: CStruct {
}
}
}

static func convertFromNative(_ from: UnsafePointer<aws_mqtt5_packet_publish_view>?) -> PublishPacket? {
guard let from else {
return nil
}
let publishView = from.pointee

let qos = QoS(publishView.qos)
let payload = Data(bytes: publishView.payload.ptr, count: publishView.payload.len)

let payloadFormatIndicator: PayloadFormatIndicator? = publishView.payload_format != nil ?
PayloadFormatIndicator(publishView.payload_format.pointee) : nil

let messageExpiryInterval = convertOptionalUInt32(publishView.message_expiry_interval_seconds)
let messageExpiryIntervalTimeInterval: TimeInterval? = messageExpiryInterval.map { TimeInterval($0) }

let correlationDataPointer: Data? = publishView.correlation_data != nil ?
Data(bytes: publishView.correlation_data!.pointee.ptr, count: publishView.correlation_data!.pointee.len) : nil

var identifier: [UInt32]? = []
for i in 0..<publishView.subscription_identifier_count {
let subscription_identifier: UInt32 = UInt32(publishView.subscription_identifiers.advanced(by: Int(i)).pointee)
identifier?.append(subscription_identifier)
}

let userProperties = convertOptionalUserProperties(
count: publishView.user_property_count,
userPropertiesPointer: publishView.user_properties)

let publishPacket = PublishPacket(qos: qos,
topic: publishView.topic.toString(),
payload: payload,
retain: publishView.retain,
payloadFormatIndicator: payloadFormatIndicator,
messageExpiryInterval: messageExpiryIntervalTimeInterval,
topicAlias: convertOptionalUInt16(publishView.topic_alias),
responseTopic: convertAwsByteCursorToOptionalString(publishView.response_topic),
correlationData: correlationDataPointer,
subscriptionIdentifiers: identifier,
contentType: convertAwsByteCursorToOptionalString(publishView.content_type),
userProperties: userProperties)

return publishPacket
}
}

/// Publish result returned by Publish operation.
Expand Down

0 comments on commit 5a95dd5

Please sign in to comment.