Skip to content

Commit

Permalink
feat(tui): show mqtt connection error instead of panic
Browse files Browse the repository at this point in the history
When the mqtt server is restarted or gone for a short time the tui will
not panic anymore.
(panics from the tui end up with a unusable terminal)

Starting when the mqtt server isnt available still panics before the tui
is started.
EdJoPaTo committed Apr 27, 2021
1 parent ac9012e commit 0e60660
Showing 3 changed files with 107 additions and 43 deletions.
25 changes: 17 additions & 8 deletions src/interactive/ui.rs
Original file line number Diff line number Diff line change
@@ -2,14 +2,14 @@ use std::cmp::min;
use std::error::Error;

use json::JsonValue;
use tui::{
backend::Backend,
layout::{Constraint, Direction, Layout, Rect},
style::{Color, Style},
text::Spans,
widgets::{Block, Borders, List, ListItem, Paragraph, Wrap},
Frame,
};
use tui::backend::Backend;
use tui::layout::{Constraint, Direction, Layout, Rect};
use tui::style::Modifier;
use tui::style::{Color, Style};
use tui::text::Span;
use tui::text::Spans;
use tui::widgets::{Block, Borders, List, ListItem, Paragraph, Wrap};
use tui::Frame;
use tui_tree_widget::{Tree, TreeState};

use crate::format;
@@ -45,6 +45,15 @@ where
let subscribed = format!("Subscribed Topic: {}", app.subscribe_topic);
let mut text = vec![Spans::from(host), Spans::from(subscribed)];

if let Some(err) = app.history.has_connection_err().unwrap() {
text.push(Spans::from(Span::styled(
format!("MQTT Connection Error: {}", err),
Style::default()
.fg(Color::Red)
.add_modifier(Modifier::BOLD | Modifier::SLOW_BLINK),
)))
}

if let Some(topic) = &app.selected_topic {
text.push(Spans::from(format!("Selected Topic: {}", topic)));
}
5 changes: 2 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -49,9 +49,8 @@ fn main() -> Result<(), Box<dyn Error>> {
.value_of("Topic")
.expect("Topic could not be read from command line");

client.subscribe(topic, QoS::ExactlyOnce)?;

let history = mqtt_history::MqttHistory::new(connection)?;
let history =
mqtt_history::MqttHistory::new(client.to_owned(), connection, topic.to_owned())?;

interactive::show(host, port, topic, &history)?;
client.disconnect()?;
120 changes: 88 additions & 32 deletions src/mqtt_history.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;
use std::thread::JoinHandle;
use std::sync::{Arc, Mutex, RwLock};
use std::thread::{self, sleep, JoinHandle};
use std::time::Duration;

use chrono::{DateTime, Local};
use rumqttc::{Connection, Publish};
use rumqttc::{Client, Connection, ConnectionError, Publish, QoS};

pub struct TopicMessagesLastPayload {
pub topic: String,
@@ -18,40 +18,74 @@ pub struct HistoryEntry {
pub time: DateTime<Local>,
}

type ConnectionErrorArc = Arc<RwLock<Option<ConnectionError>>>;
type HistoryArc = Arc<Mutex<HashMap<String, Vec<HistoryEntry>>>>;

pub struct MqttHistory {
connection_err: ConnectionErrorArc,
handle: JoinHandle<()>,
history: HistoryArc,
}

impl MqttHistory {
pub fn new(mut connection: Connection) -> anyhow::Result<Self> {
pub fn new(
mut client: Client,
mut connection: Connection,
subscribe_topic: String,
) -> anyhow::Result<Self> {
// Iterate until there is a ConnAck. When this fails it still fails in the main thread which is less messy. Happens for example when the host is wrong.
for notification in connection.iter() {
if let rumqttc::Event::Incoming(rumqttc::Packet::ConnAck(_)) =
notification.expect("connection error")
{
client.subscribe(&subscribe_topic, QoS::ExactlyOnce)?;
break;
}
}

let connection_err = Arc::new(RwLock::new(None));
let history = Arc::new(Mutex::new(HashMap::new()));

let handle = {
let connection_err = Arc::clone(&connection_err);
let history = Arc::clone(&history);
thread::Builder::new()
.name("mqtt connection".into())
.spawn(move || thread_logic(connection, &history))?
.spawn(move || {
thread_logic(
client,
connection,
&subscribe_topic,
&connection_err,
&history,
)
})?
};

Ok(Self { history, handle })
Ok(Self {
connection_err,
handle,
history,
})
}

pub fn join(self) -> std::thread::Result<()> {
self.handle.join()
}

pub fn has_connection_err(&self) -> anyhow::Result<Option<String>> {
match self.connection_err.read() {
Ok(bla) => {
if let Some(err) = &*bla {
Ok(Some(format!("{}", err)))
} else {
Ok(None)
}
}
Err(err) => Err(anyhow::anyhow!("mqtt history thread paniced {}", err)),
}
}

pub fn get(&self, topic: &str) -> anyhow::Result<Option<Vec<HistoryEntry>>> {
let history = self
.history
@@ -90,37 +124,59 @@ impl MqttHistory {
}
}

fn thread_logic(mut connection: Connection, history: &HistoryArc) {
fn thread_logic(
mut client: Client,
mut connection: Connection,
subscribe_topic: &str,
connection_err: &ConnectionErrorArc,
history: &HistoryArc,
) {
for notification in connection.iter() {
// While only writing to history on Incoming Publish locking the mutex here is still useful
// When something panics here, it will poison the mutex and end the main process
let mut history = history.lock().unwrap();

match notification.expect("connection error") {
rumqttc::Event::Incoming(packet) => {
if let rumqttc::Packet::Publish(publish) = packet {
if publish.dup {
continue;
}
let mut connection_err = connection_err.write().unwrap();

let time = Local::now();
let topic = &publish.topic;
match notification {
Ok(e) => {
*connection_err = None;

if !history.contains_key(topic) {
history.insert(topic.to_owned(), Vec::new());
match e {
rumqttc::Event::Incoming(rumqttc::Packet::ConnAck(ack)) => {
if !ack.session_present {
client.subscribe(subscribe_topic, QoS::ExactlyOnce).unwrap();
}
}
rumqttc::Event::Incoming(packet) => {
if let rumqttc::Packet::Publish(publish) = packet {
if publish.dup {
continue;
}

let time = Local::now();
let topic = &publish.topic;

let mut history = history.lock().unwrap();

if !history.contains_key(topic) {
history.insert(topic.to_owned(), Vec::new());
}

let vec = history.get_mut(topic).unwrap();
vec.push(HistoryEntry {
packet: publish,
time,
});
}
}
rumqttc::Event::Outgoing(packet) => {
if let rumqttc::Outgoing::Disconnect = packet {
break;
}
}

let vec = history.get_mut(topic).unwrap();
vec.push(HistoryEntry {
packet: publish,
time,
});
}
}
rumqttc::Event::Outgoing(packet) => {
if let rumqttc::Outgoing::Disconnect = packet {
break;
}
Err(err) => {
*connection_err = Some(err);
drop(connection_err);
sleep(Duration::from_millis(5000));
}
};
}

0 comments on commit 0e60660

Please sign in to comment.