From 076a1dcc087992637ce99ebbcc587b82433b5843 Mon Sep 17 00:00:00 2001 From: Suneet Nangia Date: Wed, 13 Mar 2024 22:41:12 +0000 Subject: [PATCH 1/3] Improved rumqttc event loop check. Signed-off-by: Suneet Nangia (cherry picked from commit 3c2803bf145731658d9d7d26236ab4988bc4b28e) --- crates/outbound-mqtt/src/lib.rs | 33 ++++++++++++++++++++++++--------- 1 file changed, 24 insertions(+), 9 deletions(-) diff --git a/crates/outbound-mqtt/src/lib.rs b/crates/outbound-mqtt/src/lib.rs index ec3118ba89..e836ca609f 100644 --- a/crates/outbound-mqtt/src/lib.rs +++ b/crates/outbound-mqtt/src/lib.rs @@ -97,16 +97,31 @@ impl v2::HostConnection for OutboundMqtt { .await .map_err(other_error)?; - // Poll EventLoop once to send the message to MQTT broker or capture/throw error + // Poll event loop until outgoing publish event is iterated over to send the message to MQTT broker or capture/throw error. // We may revisit this later to manage long running connections and their issues in the connection pool. - eventloop - .poll() - .await - .map_err(|err: rumqttc::ConnectionError| { - v2::Error::ConnectionFailed(err.to_string()) - })?; - - Ok(()) + loop { + let event = eventloop + .poll() + .await + .map_err(|err| v2::Error::ConnectionFailed(err.to_string()))?; + + match event { + rumqttc::Event::Outgoing(outgoing_event) => { + match outgoing_event { + rumqttc::Outgoing::Publish(_) => { + return Ok(()); + } + _ => { + // We don't care about other outgoing event types in this loop check. + continue; + } + } + } + rumqttc::Event::Incoming(_) => { + // We don't care about incoming event types in this loop check. + } + } + } } .await) } From 2c593bd084d98f493cd54b074059866a9e479f7a Mon Sep 17 00:00:00 2001 From: Suneet Nangia Date: Thu, 14 Mar 2024 12:54:10 +0000 Subject: [PATCH 2/3] Minified the code to check publish in event loop Signed-off-by: Suneet Nangia (cherry picked from commit 5c707104394da30c65f10bb59b4bc76e33d1f2c2) --- crates/outbound-mqtt/src/lib.rs | 19 +++---------------- 1 file changed, 3 insertions(+), 16 deletions(-) diff --git a/crates/outbound-mqtt/src/lib.rs b/crates/outbound-mqtt/src/lib.rs index e836ca609f..806ce7b309 100644 --- a/crates/outbound-mqtt/src/lib.rs +++ b/crates/outbound-mqtt/src/lib.rs @@ -98,28 +98,15 @@ impl v2::HostConnection for OutboundMqtt { .map_err(other_error)?; // Poll event loop until outgoing publish event is iterated over to send the message to MQTT broker or capture/throw error. - // We may revisit this later to manage long running connections and their issues in the connection pool. + // We may revisit this later to manage long running connections, high throughput use cases and their issues in the connection pool. loop { let event = eventloop .poll() .await .map_err(|err| v2::Error::ConnectionFailed(err.to_string()))?; - match event { - rumqttc::Event::Outgoing(outgoing_event) => { - match outgoing_event { - rumqttc::Outgoing::Publish(_) => { - return Ok(()); - } - _ => { - // We don't care about other outgoing event types in this loop check. - continue; - } - } - } - rumqttc::Event::Incoming(_) => { - // We don't care about incoming event types in this loop check. - } + if let rumqttc::Event::Outgoing(rumqttc::Outgoing::Publish(_)) = event { + return Ok(()); } } } From aaa673c384cb3d16a11151cd843c3f3626bc12e1 Mon Sep 17 00:00:00 2001 From: Suneet Nangia Date: Mon, 1 Apr 2024 11:45:50 +0100 Subject: [PATCH 3/3] Added support for all QoS levels in event polling. Signed-off-by: Suneet Nangia (cherry picked from commit 7577b3c4396189964ecc8e8f6e8a10fb156b6911) --- crates/outbound-mqtt/src/lib.rs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/crates/outbound-mqtt/src/lib.rs b/crates/outbound-mqtt/src/lib.rs index 806ce7b309..8f085afbed 100644 --- a/crates/outbound-mqtt/src/lib.rs +++ b/crates/outbound-mqtt/src/lib.rs @@ -3,7 +3,7 @@ mod host_component; use std::time::Duration; use anyhow::Result; -use rumqttc::AsyncClient; +use rumqttc::{AsyncClient, Event, Incoming, Outgoing, QoS}; use spin_core::{async_trait, wasmtime::component::Resource}; use spin_world::v2::mqtt::{self as v2, Connection as MqttConnection, Error, Qos}; @@ -105,10 +105,15 @@ impl v2::HostConnection for OutboundMqtt { .await .map_err(|err| v2::Error::ConnectionFailed(err.to_string()))?; - if let rumqttc::Event::Outgoing(rumqttc::Outgoing::Publish(_)) = event { - return Ok(()); + match (qos, event) { + (QoS::AtMostOnce, Event::Outgoing(Outgoing::Publish(_))) + | (QoS::AtLeastOnce, Event::Incoming(Incoming::PubAck(_))) + | (QoS::ExactlyOnce, Event::Outgoing(Outgoing::PubComp(_))) => break, + + (_, _) => continue, } } + Ok(()) } .await) }