Skip to content

Commit

Permalink
perf(mqtt): move instead of clone incoming data
Browse files Browse the repository at this point in the history
  • Loading branch information
EdJoPaTo committed Dec 6, 2023
1 parent 4701d5b commit db71cb7
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 33 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Display broker text easier to read
- Display version & broker in the lower right corner
- Display MQTT connection error in its own area
- Performance: Less clones on interactive draw
- Build: always build with TLS support
- Performance: Less clones on interactive draw
- Performance: Dont keep Timezone information of each message
- Performance: Dont clone each incoming MQTT payload

### Fixed

Expand Down
2 changes: 1 addition & 1 deletion src/clean_retained.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub fn clean_retained(mut client: Client, mut connection: Connection, dry_run: b
{
let qos = format::qos(publish.qos);
let size = publish.payload.len();
let payload = format::payload(&Payload::new(&publish.payload), size);
let payload = format::payload(&Payload::new(publish.payload), size);
println!("QoS:{qos:11} {topic:50} {payload}");
}
amount += 1;
Expand Down
24 changes: 12 additions & 12 deletions src/interactive/mqtt_history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ impl MqttHistory {
}
}

fn entry(&mut self, topic: &str) -> NodeId {
if let Some(id) = self.ids.get(topic) {
fn entry(&mut self, topic: String) -> NodeId {
if let Some(id) = self.ids.get(&topic) {
*id
} else {
let mut parent = self.tree.root().id();
Expand All @@ -66,12 +66,12 @@ impl MqttHistory {
parent = nodemut.append(Topic::new(part.into())).id();
}
}
self.ids.insert(topic.to_owned(), parent);
self.ids.insert(topic, parent);
parent
}
}

pub fn add(&mut self, topic: &str, history_entry: HistoryEntry) {
pub fn add(&mut self, topic: String, history_entry: HistoryEntry) {
let id = self.entry(topic);
self.tree
.get_mut(id)
Expand Down Expand Up @@ -193,20 +193,20 @@ impl MqttHistory {
pub fn example() -> Self {
let mut history = Self::new();
history.add(
"test",
HistoryEntry::new_now(false, rumqttc::QoS::AtLeastOnce, &"A".into()),
"test".to_owned(),
HistoryEntry::new_now(false, rumqttc::QoS::AtLeastOnce, "A".into()),
);
history.add(
"foo/test",
HistoryEntry::new_now(false, rumqttc::QoS::AtLeastOnce, &"B".into()),
"foo/test".to_owned(),
HistoryEntry::new_now(false, rumqttc::QoS::AtLeastOnce, "B".into()),
);
history.add(
"test",
HistoryEntry::new_now(false, rumqttc::QoS::AtLeastOnce, &"C".into()),
"test".to_owned(),
HistoryEntry::new_now(false, rumqttc::QoS::AtLeastOnce, "C".into()),
);
history.add(
"foo/bar",
HistoryEntry::new_now(false, rumqttc::QoS::AtLeastOnce, &"D".into()),
"foo/bar".to_owned(),
HistoryEntry::new_now(false, rumqttc::QoS::AtLeastOnce, "D".into()),
);
history
}
Expand Down
4 changes: 2 additions & 2 deletions src/interactive/mqtt_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ fn thread_logic(
continue;
}
history.write().unwrap().add(
&publish.topic,
HistoryEntry::new_now(publish.retain, publish.qos, &publish.payload),
publish.topic,
HistoryEntry::new_now(publish.retain, publish.qos, publish.payload),
);
}
rumqttc::Event::Outgoing(rumqttc::Outgoing::Disconnect) => {
Expand Down
15 changes: 7 additions & 8 deletions src/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,13 @@ pub fn show(mut connection: Connection, verbose: bool) {
Time::Retained
} else {
Time::Local(Local::now().naive_local())
};
println!(
"{:12} QoS:{:11} {:50} {}",
time.to_string(),
format::qos(publish.qos),
publish.topic,
format::payload(&Payload::new(&publish.payload), publish.payload.len())
);
}
.to_string();
let qos = format::qos(publish.qos);
let topic = publish.topic;
let size = publish.payload.len();
let payload = format::payload(&Payload::new(publish.payload), size);
println!("{time:12} QoS:{qos:11} {topic:50} {payload}");
}
Ok(rumqttc::Event::Incoming(packet)) => {
if verbose {
Expand Down
8 changes: 4 additions & 4 deletions src/mqtt/history_entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ pub enum Payload {
}

impl Payload {
pub fn new(payload: &bytes::Bytes) -> Self {
match String::from_utf8(payload.to_vec()) {
pub fn new(payload: bytes::Bytes) -> Self {
match String::from_utf8(payload.into()) {
Ok(str) => {
serde_json::from_str(&str).map_or_else(|_| Self::String(str.into()), Self::Json)
}
Expand All @@ -63,7 +63,7 @@ pub struct HistoryEntry {
}

impl HistoryEntry {
pub fn new_now(retain: bool, qos: QoS, payload: &bytes::Bytes) -> Self {
pub fn new_now(retain: bool, qos: QoS, payload: bytes::Bytes) -> Self {
let time = if retain {
Time::Retained
} else {
Expand Down Expand Up @@ -102,7 +102,7 @@ fn time_retained_to_string() {

#[cfg(test)]
fn json_macro(json_str: &'static str) -> Option<String> {
let payload = Payload::new(&json_str.into());
let payload = Payload::new(json_str.into());
payload
.as_optional_json()
.map(std::string::ToString::to_string)
Expand Down
8 changes: 3 additions & 5 deletions src/read_one.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,10 @@ pub fn show(mut client: Client, mut connection: Connection, ignore_retained: boo
continue;
}
eprintln!("{}", publish.topic);
done = match Payload::new(&publish.payload) {
let size = publish.payload.len();
done = match Payload::new(publish.payload) {
Payload::NotUtf8(err) => {
eprintln!(
"Payload ({}) is not valid UTF-8: {err}",
publish.payload.len()
);
eprintln!("Payload ({size}) is not valid UTF-8: {err}");
Finished::NonUtf8
}
Payload::String(str) => {
Expand Down

0 comments on commit db71cb7

Please sign in to comment.