Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…qtt5_deadlock
  • Loading branch information
xiazhvera committed May 13, 2024
2 parents 67807ab + 605cc26 commit be77cdd
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 301 deletions.
86 changes: 41 additions & 45 deletions Source/AwsCommonRuntimeKit/mqtt/Mqtt5Client.swift
Original file line number Diff line number Diff line change
Expand Up @@ -341,17 +341,15 @@ public class Mqtt5Client {
// MARK: - Internal/Private

/// Handles lifecycle events from native Mqtt Client
internal func MqttClientLifeycyleEvents(_ lifecycleEvent: UnsafePointer<aws_mqtt5_client_lifecycle_event>?) {
internal func MqttClientHandleLifecycleEvent(_ lifecycleEvent: UnsafePointer<aws_mqtt5_client_lifecycle_event>?) {

guard let lifecycleEvent: UnsafePointer<aws_mqtt5_client_lifecycle_event> = lifecycleEvent else {
fatalError("MqttClientLifecycleEvents was called from native without an aws_mqtt5_client_lifecycle_event.")
}

let client: Mqtt5Client = Unmanaged<Mqtt5Client>.fromOpaque(userData).takeUnretainedValue()
let crtError = CRTError(code: lifecycleEvent.pointee.error_code)

if let userData = lifecycleEvent.pointee.user_data {
let client: Mqtt5Client = Unmanaged<Mqtt5Client>.fromOpaque(userData).takeUnretainedValue()

// validate the callback flag, if flag is false, return
client.rwlock.read {
if client.callbackFlag == false { return }
Expand Down Expand Up @@ -420,18 +418,14 @@ internal func MqttClientLifeycyleEvents(_ lifecycleEvent: UnsafePointer<aws_mqtt
}
}

internal func MqttClientPublishRecievedEvents(
_ publishPacketView: UnsafePointer<aws_mqtt5_packet_publish_view>?,
_ userData: UnsafeMutableRawPointer?) {
let client = Unmanaged<Mqtt5Client>.fromOpaque(userData!).takeUnretainedValue()
internal func MqttClientHandlePublishRecieved(
_ publish: UnsafePointer<aws_mqtt5_packet_publish_view>?,
_ user_data: UnsafeMutableRawPointer?) {
let client = Unmanaged<MqttCallbackCore>.fromOpaque(user_data!).takeUnretainedValue()

// validate the callback flag, if flag is false, return
client.rwlock.read {
if client.callbackFlag == false { return }

guard let publish_packet = PublishPacket.convertFromNative(publishPacketView) else {
fatalError("NegotiatedSettings missing in a Connection Success lifecycle event.")
}
let puback = PublishReceivedData(publishPacket: publish_packet)
Task {
await client.onPublishReceivedCallback(puback)
Expand All @@ -440,23 +434,23 @@ internal func MqttClientPublishRecievedEvents(
}

internal func MqttClientWebsocketTransform(
_ rawHttpMessage: OpaquePointer?,
_ userData: UnsafeMutableRawPointer?,
_ completeFn: (@convention(c) (OpaquePointer?, Int32, UnsafeMutableRawPointer?) -> Void)?,
_ completeCtx: UnsafeMutableRawPointer?) {
_ request: OpaquePointer?,
_ user_data: UnsafeMutableRawPointer?,
_ complete_fn: (@convention(c) (OpaquePointer?, Int32, UnsafeMutableRawPointer?) -> Void)?,
_ complete_ctx: UnsafeMutableRawPointer?) {

let client = Unmanaged<Mqtt5Client>.fromOpaque(userData!).takeUnretainedValue()

// validate the callback flag, if flag is false, return
client.rwlock.read {
if client.callbackFlag == false { return }

guard let rawHttpMessage else {
guard let request else {
fatalError("Null HttpRequeset in websocket transform function.")
}
let httpRequest = HTTPRequest(nativeHttpMessage: rawHttpMessage)
let httpRequest = HTTPRequest(nativeHttpMessage: request)
@Sendable func signerTransform(request: HTTPRequestBase, errorCode: Int32) {
completeFn?(request.rawValue, errorCode, completeCtx)
complete_fn?(request.rawValue, errorCode, complete_ctx)
}

if client.onWebsocketInterceptor != nil {
Expand All @@ -475,63 +469,65 @@ internal func MqttClientTerminationCallback(_ userData: UnsafeMutableRawPointer?
}

/// The completion callback to invoke when subscribe operation completes in native
private func subscribeCompletionCallback(subackPacket: UnsafePointer<aws_mqtt5_packet_suback_view>?,
errorCode: Int32,
userData: UnsafeMutableRawPointer?) {
let continuationCore = Unmanaged<ContinuationCore<SubackPacket>>.fromOpaque(userData!).takeRetainedValue()
private func subscribeCompletionCallback(suback: UnsafePointer<aws_mqtt5_packet_suback_view>?,
error_code: Int32,
complete_ctx: UnsafeMutableRawPointer?) {
let continuationCore = Unmanaged<ContinuationCore<SubackPacket>>.fromOpaque(complete_ctx!).takeRetainedValue()

guard errorCode == AWS_OP_SUCCESS else {
return continuationCore.continuation.resume(throwing: CommonRunTimeError.crtError(CRTError(code: errorCode)))
guard error_code == AWS_OP_SUCCESS else {
return continuationCore.continuation.resume(throwing: CommonRunTimeError.crtError(CRTError(code: error_code)))
}

guard let suback = SubackPacket.convertFromNative(subackPacket) else {
if let suback {
continuationCore.continuation.resume(returning: SubackPacket(suback))
} else {
fatalError("Suback missing in the subscription completion callback.")
}

continuationCore.continuation.resume(returning: suback)
}

/// The completion callback to invoke when publish operation completes in native
private func publishCompletionCallback(packet_type: aws_mqtt5_packet_type,
navtivePublishResult: UnsafeRawPointer?,
errorCode: Int32,
userData: UnsafeMutableRawPointer?) {
let continuationCore = Unmanaged<ContinuationCore<PublishResult>>.fromOpaque(userData!).takeRetainedValue()
packet: UnsafeRawPointer?,
error_code: Int32,
complete_ctx: UnsafeMutableRawPointer?) {
let continuationCore = Unmanaged<ContinuationCore<PublishResult>>.fromOpaque(complete_ctx!).takeRetainedValue()

if errorCode != AWS_OP_SUCCESS {
return continuationCore.continuation.resume(throwing: CommonRunTimeError.crtError(CRTError(code: errorCode)))
if error_code != AWS_OP_SUCCESS {
return continuationCore.continuation.resume(throwing: CommonRunTimeError.crtError(CRTError(code: error_code)))
}

switch packet_type {
case AWS_MQTT5_PT_NONE: // QoS0
return continuationCore.continuation.resume(returning: PublishResult())

case AWS_MQTT5_PT_PUBACK: // QoS1
guard let puback = navtivePublishResult?.assumingMemoryBound(
guard let puback = packet?.assumingMemoryBound(
to: aws_mqtt5_packet_puback_view.self) else {
return continuationCore.continuation.resume(
throwing: CommonRunTimeError.crtError(CRTError.makeFromLastError()))
}
let publishResult = PublishResult(puback: PubackPacket.convertFromNative(puback))
let publishResult = PublishResult(puback: PubackPacket(puback))
return continuationCore.continuation.resume(returning: publishResult)

default:
return continuationCore.continuation.resume(
throwing: CommonRunTimeError.crtError(CRTError(code: AWS_ERROR_UNKNOWN.rawValue)))
}
}

/// The completion callback to invoke when unsubscribe operation completes in native
private func unsubscribeCompletionCallback(unsubackPacket: UnsafePointer<aws_mqtt5_packet_unsuback_view>?,
errorCode: Int32,
userData: UnsafeMutableRawPointer?) {
let continuationCore = Unmanaged<ContinuationCore<UnsubackPacket>>.fromOpaque(userData!).takeRetainedValue()
private func unsubscribeCompletionCallback(unsuback: UnsafePointer<aws_mqtt5_packet_unsuback_view>?,
error_code: Int32,
complete_ctx: UnsafeMutableRawPointer?) {
let continuationCore = Unmanaged<ContinuationCore<UnsubackPacket>>.fromOpaque(complete_ctx!).takeRetainedValue()

guard errorCode == AWS_OP_SUCCESS else {
return continuationCore.continuation.resume(throwing: CommonRunTimeError.crtError(CRTError(code: errorCode)))
guard error_code == AWS_OP_SUCCESS else {
return continuationCore.continuation.resume(throwing: CommonRunTimeError.crtError(CRTError(code: error_code)))
}

guard let unsuback = UnsubackPacket.convertFromNative(unsubackPacket) else {
if let unsuback {
continuationCore.continuation.resume(returning: UnsubackPacket(unsuback))
} else {
fatalError("Unsuback missing in the Unsubscribe completion callback.")
}

continuationCore.continuation.resume(returning: unsuback)
}
97 changes: 26 additions & 71 deletions Source/AwsCommonRuntimeKit/mqtt/Mqtt5Options.swift
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,12 @@ public class MqttConnectOptions: CStruct {
_requestProblemInformation,
_willDelayIntervalSec,
self.receiveMaximum,
self.maximumPacketSize) { (sessionExpiryIntervalSecPointer, requestResponseInformationPointer,
requestProblemInformationPointer, willDelayIntervalSecPointer,
receiveMaximumPointer, maximumPacketSizePointer) in
self.maximumPacketSize) { sessionExpiryIntervalSecPointer,
requestResponseInformationPointer,

Check warning on line 151 in Source/AwsCommonRuntimeKit/mqtt/Mqtt5Options.swift

View workflow job for this annotation

GitHub Actions / lint

Closure Parameter Position Violation: Closure parameters should be on the same line as opening brace (closure_parameter_position)
requestProblemInformationPointer,

Check warning on line 152 in Source/AwsCommonRuntimeKit/mqtt/Mqtt5Options.swift

View workflow job for this annotation

GitHub Actions / lint

Closure Parameter Position Violation: Closure parameters should be on the same line as opening brace (closure_parameter_position)
willDelayIntervalSecPointer,

Check warning on line 153 in Source/AwsCommonRuntimeKit/mqtt/Mqtt5Options.swift

View workflow job for this annotation

GitHub Actions / lint

Closure Parameter Position Violation: Closure parameters should be on the same line as opening brace (closure_parameter_position)
receiveMaximumPointer,

Check warning on line 154 in Source/AwsCommonRuntimeKit/mqtt/Mqtt5Options.swift

View workflow job for this annotation

GitHub Actions / lint

Closure Parameter Position Violation: Closure parameters should be on the same line as opening brace (closure_parameter_position)
maximumPacketSizePointer in

Check warning on line 155 in Source/AwsCommonRuntimeKit/mqtt/Mqtt5Options.swift

View workflow job for this annotation

GitHub Actions / lint

Closure Parameter Position Violation: Closure parameters should be on the same line as opening brace (closure_parameter_position)

raw_connect_options.session_expiry_interval_seconds = sessionExpiryIntervalSecPointer
raw_connect_options.request_response_information = requestResponseInformationPointer
Expand Down Expand Up @@ -434,9 +437,11 @@ public class MqttClientOptions: CStructWithUserData {
tls_options,
self.httpProxyOptions,
self.topicAliasingOptions,
connnectOptions) { (socketOptionsCPointer, tlsOptionsCPointer,
httpProxyOptionsCPointer, topicAliasingOptionsCPointer,
connectOptionsCPointer) in
connnectOptions) { socketOptionsCPointer,
tlsOptionsCPointer,

Check warning on line 441 in Source/AwsCommonRuntimeKit/mqtt/Mqtt5Options.swift

View workflow job for this annotation

GitHub Actions / lint

Closure Parameter Position Violation: Closure parameters should be on the same line as opening brace (closure_parameter_position)
httpProxyOptionsCPointer,

Check warning on line 442 in Source/AwsCommonRuntimeKit/mqtt/Mqtt5Options.swift

View workflow job for this annotation

GitHub Actions / lint

Closure Parameter Position Violation: Closure parameters should be on the same line as opening brace (closure_parameter_position)
topicAliasingOptionsCPointer,

Check warning on line 443 in Source/AwsCommonRuntimeKit/mqtt/Mqtt5Options.swift

View workflow job for this annotation

GitHub Actions / lint

Closure Parameter Position Violation: Closure parameters should be on the same line as opening brace (closure_parameter_position)
connectOptionsCPointer in

raw_options.socket_options = socketOptionsCPointer
raw_options.tls_options = tlsOptionsCPointer
Expand Down Expand Up @@ -519,70 +524,20 @@ public class NegotiatedSettings {
/// The final client id in use by the newly-established connection. This will be the configured client id if one was given in the configuration, otherwise, if no client id was specified, this will be the client id assigned by the server. Reconnection attempts will always use the auto-assigned client id, allowing for auto-assigned session resumption.
public let clientId: String

public init (maximumQos: QoS,
sessionExpiryInterval: TimeInterval,
receiveMaximumFromServer: UInt16,
maximumPacketSizeToServer: UInt32,
topicAliasMaximumToServer: UInt16,
topicAliasMaximumToClient: UInt16,
serverKeepAlive: TimeInterval,
retainAvailable: Bool,
wildcardSubscriptionsAvailable: Bool,
subscriptionIdentifiersAvailable: Bool,
sharedSubscriptionsAvailable: Bool,
rejoinedSession: Bool,
clientId: String) {
self.maximumQos = maximumQos
self.sessionExpiryInterval = sessionExpiryInterval
self.receiveMaximumFromServer = receiveMaximumFromServer
self.maximumPacketSizeToServer = maximumPacketSizeToServer
self.topicAliasMaximumToServer = topicAliasMaximumToServer
self.topicAliasMaximumToClient = topicAliasMaximumToClient
self.serverKeepAlive = serverKeepAlive
self.retainAvailable = retainAvailable
self.wildcardSubscriptionsAvailable = wildcardSubscriptionsAvailable
self.subscriptionIdentifiersAvailable = subscriptionIdentifiersAvailable
self.sharedSubscriptionsAvailable = sharedSubscriptionsAvailable
self.rejoinedSession = rejoinedSession
self.clientId = clientId
}

static func convertFromNative(_ from: UnsafePointer<aws_mqtt5_negotiated_settings>?) -> NegotiatedSettings? {

guard let from else {
return nil
}

let _negotiatedSettings = from.pointee
let negotiatedMaximumQos = QoS(_negotiatedSettings.maximum_qos)
let negotiatedSessionExpiryInterval: TimeInterval = TimeInterval(_negotiatedSettings.session_expiry_interval)
let negotiatedReceiveMaximumFromServer = _negotiatedSettings.receive_maximum_from_server
let negotiatedMaximumPacketSizeToServer = _negotiatedSettings.maximum_packet_size_to_server
let negotiatedTopicAliasMaximumToServer = _negotiatedSettings.topic_alias_maximum_to_server
let negotiatedTopicAliasMaximumToClient = _negotiatedSettings.topic_alias_maximum_to_client
let negotiatedServerKeepAlive: TimeInterval = TimeInterval(_negotiatedSettings.server_keep_alive)
let negotiatedRetainAvailable = _negotiatedSettings.retain_available
let negotiatedWildcardSubscriptionsAvailable = _negotiatedSettings.wildcard_subscriptions_available
let negotiatedSubscriptionIdentifiersAvailable = _negotiatedSettings.subscription_identifiers_available
let negotiatedSharedSubscriptionsAvailable = _negotiatedSettings.shared_subscriptions_available
let negotiatedRejoinedSession = _negotiatedSettings.rejoined_session
let negotiatedClientId = _negotiatedSettings.client_id_storage.toString()

let negotiatedSettings = NegotiatedSettings(
maximumQos: negotiatedMaximumQos,
sessionExpiryInterval: negotiatedSessionExpiryInterval,
receiveMaximumFromServer: negotiatedReceiveMaximumFromServer,
maximumPacketSizeToServer: negotiatedMaximumPacketSizeToServer,
topicAliasMaximumToServer: negotiatedTopicAliasMaximumToServer,
topicAliasMaximumToClient: negotiatedTopicAliasMaximumToClient,
serverKeepAlive: negotiatedServerKeepAlive,
retainAvailable: negotiatedRetainAvailable,
wildcardSubscriptionsAvailable: negotiatedWildcardSubscriptionsAvailable,
subscriptionIdentifiersAvailable: negotiatedSubscriptionIdentifiersAvailable,
sharedSubscriptionsAvailable: negotiatedSharedSubscriptionsAvailable,
rejoinedSession: negotiatedRejoinedSession,
clientId: negotiatedClientId)

return negotiatedSettings
internal init(_ settings: UnsafePointer<aws_mqtt5_negotiated_settings>){
let negotiatedSettings = settings.pointee
self.maximumQos = QoS(negotiatedSettings.maximum_qos)
self.sessionExpiryInterval = TimeInterval(negotiatedSettings.session_expiry_interval)
self.receiveMaximumFromServer = negotiatedSettings.receive_maximum_from_server
self.maximumPacketSizeToServer = negotiatedSettings.maximum_packet_size_to_server
self.topicAliasMaximumToServer = negotiatedSettings.topic_alias_maximum_to_server
self.topicAliasMaximumToClient = negotiatedSettings.topic_alias_maximum_to_client
self.serverKeepAlive = TimeInterval(negotiatedSettings.server_keep_alive)
self.retainAvailable = negotiatedSettings.retain_available
self.wildcardSubscriptionsAvailable = negotiatedSettings.wildcard_subscriptions_available
self.subscriptionIdentifiersAvailable = negotiatedSettings.subscription_identifiers_available
self.sharedSubscriptionsAvailable = negotiatedSettings.shared_subscriptions_available
self.rejoinedSession = negotiatedSettings.rejoined_session
self.clientId = negotiatedSettings.client_id_storage.toString()
}
}
Loading

0 comments on commit be77cdd

Please sign in to comment.