Skip to content

Commit

Permalink
Better fix re eventloop.
Browse files Browse the repository at this point in the history
Signed-off-by: Suneet Nangia <suneetnangia@gmail.com>
  • Loading branch information
suneetnangia committed Mar 14, 2024
1 parent 5c70710 commit e849605
Showing 1 changed file with 24 additions and 17 deletions.
41 changes: 24 additions & 17 deletions crates/outbound-mqtt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub use host_component::OutboundMqttComponent;

pub struct OutboundMqtt {
allowed_hosts: spin_outbound_networking::AllowedHostsConfig,
connections: table::Table<(AsyncClient, rumqttc::EventLoop)>,
connections: table::Table<AsyncClient>,
}

impl Default for OutboundMqtt {
Expand Down Expand Up @@ -44,10 +44,28 @@ impl OutboundMqtt {
})?;
conn_opts.set_credentials(username, password);
conn_opts.set_keep_alive(keep_alive_interval);
let (client, event_loop) = AsyncClient::new(conn_opts, MQTT_CHANNEL_CAP);
let (client, mut event_loop) = AsyncClient::new(conn_opts, MQTT_CHANNEL_CAP);

// Start the task to poll the event loop which sends the messages to the MQTT broker.
// Publisher client code wait for this task to complete before returning, ensuring all events are processed.
tokio::spawn(
#[allow(unreachable_code)]
async move {
loop {
event_loop
.poll()
.await
.map_err(|err| v2::Error::ConnectionFailed(err.to_string()))?;
}

// This is unreachable but it's a hint for the compiler for return type from async block i.e. for ? to work.
// https://rust-lang.github.io/async-book/07_workarounds/02_err_in_async_blocks.html
Ok::<(), v2::Error>(())
},
);

self.connections
.push((client, event_loop))
.push(client)
.map(Resource::new_own)
.map_err(|_| Error::TooManyConnections)
}
Expand Down Expand Up @@ -88,7 +106,7 @@ impl v2::HostConnection for OutboundMqtt {
qos: Qos,
) -> Result<Result<(), Error>> {
Ok(async {
let (client, eventloop) = self.get_conn(connection).await.map_err(other_error)?;
let client = self.get_conn(connection).await.map_err(other_error)?;
let qos = convert_to_mqtt_qos_value(qos);

// Message published to EventLoop (not MQTT Broker)
Expand All @@ -97,18 +115,7 @@ impl v2::HostConnection for OutboundMqtt {
.await
.map_err(other_error)?;

// Poll event loop until outgoing publish event is iterated over to send the message to MQTT broker or capture/throw error.
// We may revisit this later to manage long running connections, high throughput use cases and their issues in the connection pool.
loop {
let event = eventloop
.poll()
.await
.map_err(|err| v2::Error::ConnectionFailed(err.to_string()))?;

if let rumqttc::Event::Outgoing(rumqttc::Outgoing::Publish(_)) = event {
return Ok(());
}
}
Ok(())
}
.await)
}
Expand All @@ -135,7 +142,7 @@ impl OutboundMqtt {
async fn get_conn(
&mut self,
connection: Resource<MqttConnection>,
) -> Result<&mut (AsyncClient, rumqttc::EventLoop), Error> {
) -> Result<&mut AsyncClient, Error> {
self.connections
.get_mut(connection.rep())
.ok_or(Error::Other(
Expand Down

0 comments on commit e849605

Please sign in to comment.