Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mqtt5 Init From Pointer #262

Merged
merged 17 commits into from
May 7, 2024
114 changes: 61 additions & 53 deletions Source/AwsCommonRuntimeKit/mqtt/Mqtt5Client.swift
Original file line number Diff line number Diff line change
Expand Up @@ -278,14 +278,16 @@ public class Mqtt5Client {
// MARK: - Internal/Private

/// Handles lifecycle events from native Mqtt Client
internal func MqttClientLifeycyleEvents(_ lifecycleEvent: UnsafePointer<aws_mqtt5_client_lifecycle_event>?) {
internal func MqttClientHandleLifecycleEvent(_ lifecycleEvent: UnsafePointer<aws_mqtt5_client_lifecycle_event>?) {

// todo simplify?
guard let lifecycleEvent: UnsafePointer<aws_mqtt5_client_lifecycle_event> = lifecycleEvent else {
fatalError("MqttClientLifecycleEvents was called from native without an aws_mqtt5_client_lifecycle_event.")
}

let crtError = CRTError(code: lifecycleEvent.pointee.error_code)

// todo simplify?
if let userData = lifecycleEvent.pointee.user_data {
let callbackCore: MqttCallbackCore = Unmanaged<MqttCallbackCore>.fromOpaque(userData).takeUnretainedValue()

Expand All @@ -301,22 +303,26 @@ internal func MqttClientLifeycyleEvents(_ lifecycleEvent: UnsafePointer<aws_mqtt

case AWS_MQTT5_CLET_CONNECTION_SUCCESS:

guard let connackPacket = ConnackPacket.convertFromNative(lifecycleEvent.pointee.connack_data) else {
guard let connackView = lifecycleEvent.pointee.connack_data else {
fatalError("ConnackPacket missing in a Connection Success lifecycle event.")
}
let connackPacket = ConnackPacket(connackView)

guard let negotiatedSettings = NegotiatedSettings.convertFromNative(lifecycleEvent.pointee.settings) else {
guard let negotiatedSettings = lifecycleEvent.pointee.settings else {
fatalError("NegotiatedSettings missing in a Connection Success lifecycle event.")
}

let lifecycleConnectionSuccessData = LifecycleConnectionSuccessData(
connackPacket: connackPacket,
negotiatedSettings: negotiatedSettings)
negotiatedSettings: NegotiatedSettings(negotiatedSettings))
callbackCore.onLifecycleEventConnectionSuccess(lifecycleConnectionSuccessData)

case AWS_MQTT5_CLET_CONNECTION_FAILURE:

let connackPacket = ConnackPacket.convertFromNative(lifecycleEvent.pointee.connack_data)
var connackPacket: ConnackPacket?
if let connackView = lifecycleEvent.pointee.connack_data {
connackPacket = ConnackPacket(connackView)
}

let lifecycleConnectionFailureData = LifecycleConnectionFailureData(
crtError: crtError,
Expand All @@ -325,10 +331,10 @@ internal func MqttClientLifeycyleEvents(_ lifecycleEvent: UnsafePointer<aws_mqtt

case AWS_MQTT5_CLET_DISCONNECTION:

guard let disconnectPacket = DisconnectPacket.convertFromNative(lifecycleEvent.pointee.disconnect_data) else {
let lifecycleDisconnectData = LifecycleDisconnectData(crtError: crtError)
callbackCore.onLifecycleEventDisconnection(lifecycleDisconnectData)
return
var disconnectPacket: DisconnectPacket?

if let disconnectView: UnsafePointer<aws_mqtt5_packet_disconnect_view> = lifecycleEvent.pointee.disconnect_data {
disconnectPacket = DisconnectPacket(disconnectView)
}

let lifecycleDisconnectData = LifecycleDisconnectData(
Expand All @@ -344,45 +350,45 @@ internal func MqttClientLifeycyleEvents(_ lifecycleEvent: UnsafePointer<aws_mqtt
fatalError("A lifecycle event with an invalid event type was encountered.")
}
}

}
}

internal func MqttClientPublishRecievedEvents(
_ publishPacketView: UnsafePointer<aws_mqtt5_packet_publish_view>?,
_ userData: UnsafeMutableRawPointer?) {
let callbackCore = Unmanaged<MqttCallbackCore>.fromOpaque(userData!).takeUnretainedValue()
internal func MqttClientHandlePublishRecieved(
_ publish: UnsafePointer<aws_mqtt5_packet_publish_view>?,
_ user_data: UnsafeMutableRawPointer?) {
let callbackCore = Unmanaged<MqttCallbackCore>.fromOpaque(user_data!).takeUnretainedValue()

// validate the callback flag, if flag is false, return
callbackCore.rwlock.read {
if callbackCore.callbackFlag == false { return }

guard let publish_packet = PublishPacket.convertFromNative(publishPacketView) else {
fatalError("NegotiatedSettings missing in a Connection Success lifecycle event.")
if let publish {
let publishPacket = PublishPacket(publish)
let publishReceivedData = PublishReceivedData(publishPacket: publishPacket)
callbackCore.onPublishReceivedCallback(publishReceivedData)
} else {
fatalError("MqttClientHandlePublishRecieved called with null publish")
}
let puback = PublishReceivedData(publishPacket: publish_packet)
callbackCore.onPublishReceivedCallback(puback)
}
}

internal func MqttClientWebsocketTransform(
_ rawHttpMessage: OpaquePointer?,
_ userData: UnsafeMutableRawPointer?,
_ completeFn: (@convention(c) (OpaquePointer?, Int32, UnsafeMutableRawPointer?) -> Void)?,
_ completeCtx: UnsafeMutableRawPointer?) {
_ request: OpaquePointer?,
_ user_data: UnsafeMutableRawPointer?,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think Camelcase is expected variable naming standard for swift

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed all calls that are coming from native to have arguments that reflect the ones expected in the native function signatures for clarity when working with callbacks. If we change it to camelcase here we should probably do it across all callback functions. Open to going either way as long as we're consistent.

_ complete_fn: (@convention(c) (OpaquePointer?, Int32, UnsafeMutableRawPointer?) -> Void)?,
_ complete_ctx: UnsafeMutableRawPointer?) {

let callbackCore = Unmanaged<MqttCallbackCore>.fromOpaque(userData!).takeUnretainedValue()
let callbackCore = Unmanaged<MqttCallbackCore>.fromOpaque(user_data!).takeUnretainedValue()

// validate the callback flag, if flag is false, return
callbackCore.rwlock.read {
if callbackCore.callbackFlag == false { return }

guard let rawHttpMessage else {
guard let request else {
fatalError("Null HttpRequeset in websocket transform function.")
}
let httpRequest = HTTPRequest(nativeHttpMessage: rawHttpMessage)
let httpRequest = HTTPRequest(nativeHttpMessage: request)
@Sendable func signerTransform(request: HTTPRequestBase, errorCode: Int32) {
completeFn?(request.rawValue, errorCode, completeCtx)
complete_fn?(request.rawValue, errorCode, complete_ctx)
}

if callbackCore.onWebsocketInterceptor != nil {
Expand All @@ -399,65 +405,67 @@ internal func MqttClientTerminationCallback(_ userData: UnsafeMutableRawPointer?
}

/// The completion callback to invoke when subscribe operation completes in native
private func subscribeCompletionCallback(subackPacket: UnsafePointer<aws_mqtt5_packet_suback_view>?,
errorCode: Int32,
userData: UnsafeMutableRawPointer?) {
let continuationCore = Unmanaged<ContinuationCore<SubackPacket>>.fromOpaque(userData!).takeRetainedValue()
private func subscribeCompletionCallback(suback: UnsafePointer<aws_mqtt5_packet_suback_view>?,
error_code: Int32,
complete_ctx: UnsafeMutableRawPointer?) {
let continuationCore = Unmanaged<ContinuationCore<SubackPacket>>.fromOpaque(complete_ctx!).takeRetainedValue()

guard errorCode == AWS_OP_SUCCESS else {
return continuationCore.continuation.resume(throwing: CommonRunTimeError.crtError(CRTError(code: errorCode)))
guard error_code == AWS_OP_SUCCESS else {
return continuationCore.continuation.resume(throwing: CommonRunTimeError.crtError(CRTError(code: error_code)))
}

guard let suback = SubackPacket.convertFromNative(subackPacket) else {
if let suback {
continuationCore.continuation.resume(returning: SubackPacket(suback))
} else {
fatalError("Suback missing in the subscription completion callback.")
}

continuationCore.continuation.resume(returning: suback)
}

/// The completion callback to invoke when publish operation completes in native
private func publishCompletionCallback(packet_type: aws_mqtt5_packet_type,
navtivePublishResult: UnsafeRawPointer?,
errorCode: Int32,
userData: UnsafeMutableRawPointer?) {
let continuationCore = Unmanaged<ContinuationCore<PublishResult>>.fromOpaque(userData!).takeRetainedValue()
packet: UnsafeRawPointer?,
error_code: Int32,
complete_ctx: UnsafeMutableRawPointer?) {
let continuationCore = Unmanaged<ContinuationCore<PublishResult>>.fromOpaque(complete_ctx!).takeRetainedValue()

if errorCode != AWS_OP_SUCCESS {
return continuationCore.continuation.resume(throwing: CommonRunTimeError.crtError(CRTError(code: errorCode)))
if error_code != AWS_OP_SUCCESS {
return continuationCore.continuation.resume(throwing: CommonRunTimeError.crtError(CRTError(code: error_code)))
}

switch packet_type {
case AWS_MQTT5_PT_NONE: // QoS0
return continuationCore.continuation.resume(returning: PublishResult())

case AWS_MQTT5_PT_PUBACK: // QoS1
guard let puback = navtivePublishResult?.assumingMemoryBound(
guard let puback = packet?.assumingMemoryBound(
to: aws_mqtt5_packet_puback_view.self) else {
return continuationCore.continuation.resume(
throwing: CommonRunTimeError.crtError(CRTError.makeFromLastError()))
}
let publishResult = PublishResult(puback: PubackPacket.convertFromNative(puback))
let publishResult = PublishResult(puback: PubackPacket(puback))
return continuationCore.continuation.resume(returning: publishResult)

default:
return continuationCore.continuation.resume(
throwing: CommonRunTimeError.crtError(CRTError(code: AWS_ERROR_UNKNOWN.rawValue)))
}
}

/// The completion callback to invoke when unsubscribe operation completes in native
private func unsubscribeCompletionCallback(unsubackPacket: UnsafePointer<aws_mqtt5_packet_unsuback_view>?,
errorCode: Int32,
userData: UnsafeMutableRawPointer?) {
let continuationCore = Unmanaged<ContinuationCore<UnsubackPacket>>.fromOpaque(userData!).takeRetainedValue()
private func unsubscribeCompletionCallback(unsuback: UnsafePointer<aws_mqtt5_packet_unsuback_view>?,
error_code: Int32,
complete_ctx: UnsafeMutableRawPointer?) {
let continuationCore = Unmanaged<ContinuationCore<UnsubackPacket>>.fromOpaque(complete_ctx!).takeRetainedValue()

guard errorCode == AWS_OP_SUCCESS else {
return continuationCore.continuation.resume(throwing: CommonRunTimeError.crtError(CRTError(code: errorCode)))
guard error_code == AWS_OP_SUCCESS else {
return continuationCore.continuation.resume(throwing: CommonRunTimeError.crtError(CRTError(code: error_code)))
}

guard let unsuback = UnsubackPacket.convertFromNative(unsubackPacket) else {
if let unsuback {
continuationCore.continuation.resume(returning: UnsubackPacket(unsuback))
} else {
fatalError("Unsuback missing in the Unsubscribe completion callback.")
}

continuationCore.continuation.resume(returning: unsuback)
}

/// When the native client calls swift callbacks they are processed through the MqttCallbackCore
Expand Down
70 changes: 31 additions & 39 deletions Source/AwsCommonRuntimeKit/mqtt/Mqtt5Options.swift
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,8 @@
_willDelayIntervalSec,
self.receiveMaximum,
self.maximumPacketSize) { (sessionExpiryIntervalSecPointer, requestResponseInformationPointer,
requestProblemInformationPointer, willDelayIntervalSecPointer,

Check warning on line 151 in Source/AwsCommonRuntimeKit/mqtt/Mqtt5Options.swift

View workflow job for this annotation

GitHub Actions / lint

Closure Parameter Position Violation: Closure parameters should be on the same line as opening brace (closure_parameter_position)

Check warning on line 151 in Source/AwsCommonRuntimeKit/mqtt/Mqtt5Options.swift

View workflow job for this annotation

GitHub Actions / lint

Closure Parameter Position Violation: Closure parameters should be on the same line as opening brace (closure_parameter_position)
receiveMaximumPointer, maximumPacketSizePointer) in

Check warning on line 152 in Source/AwsCommonRuntimeKit/mqtt/Mqtt5Options.swift

View workflow job for this annotation

GitHub Actions / lint

Closure Parameter Position Violation: Closure parameters should be on the same line as opening brace (closure_parameter_position)

Check warning on line 152 in Source/AwsCommonRuntimeKit/mqtt/Mqtt5Options.swift

View workflow job for this annotation

GitHub Actions / lint

Closure Parameter Position Violation: Closure parameters should be on the same line as opening brace (closure_parameter_position)

raw_connect_options.session_expiry_interval_seconds = sessionExpiryIntervalSecPointer
raw_connect_options.request_response_information = requestResponseInformationPointer
Expand Down Expand Up @@ -435,8 +435,8 @@
self.httpProxyOptions,
self.topicAliasingOptions,
connnectOptions) { (socketOptionsCPointer, tlsOptionsCPointer,
httpProxyOptionsCPointer, topicAliasingOptionsCPointer,

Check warning on line 438 in Source/AwsCommonRuntimeKit/mqtt/Mqtt5Options.swift

View workflow job for this annotation

GitHub Actions / lint

Closure Parameter Position Violation: Closure parameters should be on the same line as opening brace (closure_parameter_position)

Check warning on line 438 in Source/AwsCommonRuntimeKit/mqtt/Mqtt5Options.swift

View workflow job for this annotation

GitHub Actions / lint

Closure Parameter Position Violation: Closure parameters should be on the same line as opening brace (closure_parameter_position)
connectOptionsCPointer) in

Check warning on line 439 in Source/AwsCommonRuntimeKit/mqtt/Mqtt5Options.swift

View workflow job for this annotation

GitHub Actions / lint

Closure Parameter Position Violation: Closure parameters should be on the same line as opening brace (closure_parameter_position)

raw_options.socket_options = socketOptionsCPointer
raw_options.tls_options = tlsOptionsCPointer
Expand All @@ -457,9 +457,9 @@
raw_options.websocket_handshake_transform_user_data = _userData
}

raw_options.lifecycle_event_handler = MqttClientLifeycyleEvents
raw_options.lifecycle_event_handler = MqttClientHandleLifecycleEvent
raw_options.lifecycle_event_handler_user_data = _userData
raw_options.publish_received_handler = MqttClientPublishRecievedEvents
raw_options.publish_received_handler = MqttClientHandlePublishRecieved
raw_options.publish_received_handler_user_data = _userData
raw_options.client_termination_handler = MqttClientTerminationCallback
raw_options.client_termination_handler_user_data = _userData
Expand Down Expand Up @@ -547,42 +547,34 @@
self.clientId = clientId
}

static func convertFromNative(_ from: UnsafePointer<aws_mqtt5_negotiated_settings>?) -> NegotiatedSettings? {

guard let from else {
return nil
}

let _negotiatedSettings = from.pointee
let negotiatedMaximumQos = QoS(_negotiatedSettings.maximum_qos)
let negotiatedSessionExpiryInterval: TimeInterval = TimeInterval(_negotiatedSettings.session_expiry_interval)
let negotiatedReceiveMaximumFromServer = _negotiatedSettings.receive_maximum_from_server
let negotiatedMaximumPacketSizeToServer = _negotiatedSettings.maximum_packet_size_to_server
let negotiatedTopicAliasMaximumToServer = _negotiatedSettings.topic_alias_maximum_to_server
let negotiatedTopicAliasMaximumToClient = _negotiatedSettings.topic_alias_maximum_to_client
let negotiatedServerKeepAlive: TimeInterval = TimeInterval(_negotiatedSettings.server_keep_alive)
let negotiatedRetainAvailable = _negotiatedSettings.retain_available
let negotiatedWildcardSubscriptionsAvailable = _negotiatedSettings.wildcard_subscriptions_available
let negotiatedSubscriptionIdentifiersAvailable = _negotiatedSettings.subscription_identifiers_available
let negotiatedSharedSubscriptionsAvailable = _negotiatedSettings.shared_subscriptions_available
let negotiatedRejoinedSession = _negotiatedSettings.rejoined_session
let negotiatedClientId = _negotiatedSettings.client_id_storage.toString()

let negotiatedSettings = NegotiatedSettings(
maximumQos: negotiatedMaximumQos,
sessionExpiryInterval: negotiatedSessionExpiryInterval,
receiveMaximumFromServer: negotiatedReceiveMaximumFromServer,
maximumPacketSizeToServer: negotiatedMaximumPacketSizeToServer,
topicAliasMaximumToServer: negotiatedTopicAliasMaximumToServer,
topicAliasMaximumToClient: negotiatedTopicAliasMaximumToClient,
serverKeepAlive: negotiatedServerKeepAlive,
retainAvailable: negotiatedRetainAvailable,
wildcardSubscriptionsAvailable: negotiatedWildcardSubscriptionsAvailable,
subscriptionIdentifiersAvailable: negotiatedSubscriptionIdentifiersAvailable,
sharedSubscriptionsAvailable: negotiatedSharedSubscriptionsAvailable,
rejoinedSession: negotiatedRejoinedSession,
clientId: negotiatedClientId)

return negotiatedSettings
internal convenience init(_ from: UnsafePointer<aws_mqtt5_negotiated_settings>){

Check warning on line 550 in Source/AwsCommonRuntimeKit/mqtt/Mqtt5Options.swift

View workflow job for this annotation

GitHub Actions / lint

Opening Brace Spacing Violation: Opening braces should be preceded by a single space and on the same line as the declaration (opening_brace)
sbSteveK marked this conversation as resolved.
Show resolved Hide resolved
sbSteveK marked this conversation as resolved.
Show resolved Hide resolved
let negotiatedSettings = from.pointee
let negotiatedMaximumQos = QoS(negotiatedSettings.maximum_qos)
let negotiatedSessionExpiryInterval: TimeInterval = TimeInterval(negotiatedSettings.session_expiry_interval)
let negotiatedReceiveMaximumFromServer = negotiatedSettings.receive_maximum_from_server
let negotiatedMaximumPacketSizeToServer = negotiatedSettings.maximum_packet_size_to_server
let negotiatedTopicAliasMaximumToServer = negotiatedSettings.topic_alias_maximum_to_server
let negotiatedTopicAliasMaximumToClient = negotiatedSettings.topic_alias_maximum_to_client
let negotiatedServerKeepAlive: TimeInterval = TimeInterval(negotiatedSettings.server_keep_alive)
let negotiatedRetainAvailable = negotiatedSettings.retain_available
let negotiatedWildcardSubscriptionsAvailable = negotiatedSettings.wildcard_subscriptions_available
let negotiatedSubscriptionIdentifiersAvailable = negotiatedSettings.subscription_identifiers_available
let negotiatedSharedSubscriptionsAvailable = negotiatedSettings.shared_subscriptions_available
let negotiatedRejoinedSession = negotiatedSettings.rejoined_session
let negotiatedClientId = negotiatedSettings.client_id_storage.toString()

self.init(maximumQos: negotiatedMaximumQos,
sessionExpiryInterval: negotiatedSessionExpiryInterval,
receiveMaximumFromServer: negotiatedReceiveMaximumFromServer,
maximumPacketSizeToServer: negotiatedMaximumPacketSizeToServer,
topicAliasMaximumToServer: negotiatedTopicAliasMaximumToServer,
topicAliasMaximumToClient: negotiatedTopicAliasMaximumToClient,
serverKeepAlive: negotiatedServerKeepAlive,
retainAvailable: negotiatedRetainAvailable,
wildcardSubscriptionsAvailable: negotiatedWildcardSubscriptionsAvailable,
subscriptionIdentifiersAvailable: negotiatedSubscriptionIdentifiersAvailable,
sharedSubscriptionsAvailable: negotiatedSharedSubscriptionsAvailable,
rejoinedSession: negotiatedRejoinedSession,
clientId: negotiatedClientId)
}
}
Loading
Loading