Skip to content

Commit

Permalink
Improve feedback and error handling of MQTT input handler
Browse files Browse the repository at this point in the history
  • Loading branch information
twright committed Jan 6, 2025
1 parent 8051bc2 commit d41f6e0
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 10 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ tokio-util = "0.7.12"
clap = { version = "4.5.20", features = ["derive"] }
async-stream = "0.3.6"
r2r = { version = "0.9.3", optional=true }
thiserror = "2.0.3"
thiserror = "2.0.9"
justerror = "1.1.0"
serde = {version = "1.0.215", features = ["derive"]}
serde_json = "1.0.133"
Expand Down
19 changes: 10 additions & 9 deletions src/mqtt_input_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl MQTTInputProvider {
receivers.insert(v.clone(), rx);
}

let topics = var_topics.values().map(|t| t.clone()).collect::<Vec<_>>();
let topics = var_topics.values().cloned().collect::<Vec<_>>();

// Spawn a background task to receive messages from the MQTT broker and
// send them to the appropriate channel based on which topic they were
Expand All @@ -73,7 +73,7 @@ impl MQTTInputProvider {
// Create and connect to the MQTT client
let client = provide_mqtt_client(host.clone()).await.unwrap();
let mut stream = client.clone().get_stream(10);
// println!("Connected to MQTT broker");
println!("Connected to MQTT broker with topics {:?}", topics);
let qos = topics.iter().map(|_| QOS).collect::<Vec<_>>();
loop {
match client.subscribe_many(&topics, &qos).await {
Expand Down Expand Up @@ -103,13 +103,14 @@ impl MQTTInputProvider {
)
.as_str(),
);
let sender = senders
.get(&VarName(msg.topic().to_string()))
.expect("Channel not found for topic");
sender
.send(value)
.await
.expect("Failed to send value to channel");
if let Some(sender) = senders.get(&VarName(msg.topic().to_string())) {
sender
.send(value)
.await
.expect("Failed to send value to channel");
} else {
println!("Channel not found for topic {:?}", msg.topic());
}
}
None => {
// Connection lost, try to reconnect
Expand Down

0 comments on commit d41f6e0

Please sign in to comment.