Skip to content

Commit

Permalink
tedge/# topic to te/# converter
Browse files Browse the repository at this point in the history
Signed-off-by: Pradeep Kumar K J <pkj@softwareag.com>
  • Loading branch information
PradeepKiruvale committed Aug 9, 2023
1 parent ab4e078 commit ffc81cd
Show file tree
Hide file tree
Showing 8 changed files with 515 additions and 0 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/core/tedge_agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ clap = { version = "3.2", features = ["cargo", "derive"] }
flockfile = { path = "../../common/flockfile" }
hyper = { version = "0.14", features = ["full"] }
lazy_static = "1.4"
log = "0.4"
path-clean = "0.1"
plugin_sm = { path = "../plugin_sm" }
routerify = "3.0"
Expand Down
36 changes: 36 additions & 0 deletions crates/core/tedge_agent/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,21 @@ use crate::restart_manager::config::RestartManagerConfig;
use crate::software_manager::builder::SoftwareManagerBuilder;
use crate::software_manager::config::SoftwareManagerConfig;
use crate::tedge_operation_converter::builder::TedgeOperationConverterBuilder;
use crate::tedge_to_te_converter::converter::TedgetoTeConverter;
use camino::Utf8PathBuf;
use flockfile::check_another_instance_is_not_running;
use flockfile::Flockfile;
use flockfile::FlockfileError;
use std::fmt::Debug;
use tedge_actors::ConvertingActor;
use tedge_actors::ConvertingActorBuilder;
use tedge_actors::MessageSink;
use tedge_actors::MessageSource;
use tedge_actors::Runtime;
use tedge_health_ext::HealthMonitorBuilder;
use tedge_mqtt_ext::MqttActorBuilder;
use tedge_mqtt_ext::MqttConfig;
use tedge_mqtt_ext::TopicFilter;
use tedge_signal_ext::SignalActor;
use tedge_utils::file::create_directory_with_defaults;
use tracing::info;
Expand Down Expand Up @@ -163,6 +169,9 @@ impl Agent {
// Health actor
let health_actor = HealthMonitorBuilder::new(TEDGE_AGENT, &mut mqtt_actor_builder);

// Tedge to Te topic converter
let tedge_to_te_converter = create_tedge_to_te_converter(&mut mqtt_actor_builder)?;

// Spawn all
runtime.spawn(signal_actor_builder).await?;
runtime.spawn(file_transfer_server_builder).await?;
Expand All @@ -171,9 +180,36 @@ impl Agent {
runtime.spawn(software_update_builder).await?;
runtime.spawn(converter_actor_builder).await?;
runtime.spawn(health_actor).await?;
runtime.spawn(tedge_to_te_converter).await?;

runtime.run_to_completion().await?;

Ok(())
}
}

pub fn create_tedge_to_te_converter(
mqtt_actor_builder: &mut MqttActorBuilder,
) -> Result<ConvertingActorBuilder<TedgetoTeConverter, TopicFilter>, anyhow::Error> {
let tedge_to_te_converter = TedgetoTeConverter::new();
let subscriptions: TopicFilter = vec![
"tedge/measurements",
"tedge/measurements/+",
"tedge/events/+",
"tedge/events/+/+",
"tedge/alarms/+/+",
"tedge/alarms/+/+/+",
"tedge/health/+",
"tedge/health/+/+",
]
.try_into()?;

// Tedge to Te converter
let mut tedge_converter_actor =
ConvertingActor::builder("TedgetoTeConverter", tedge_to_te_converter, subscriptions);

tedge_converter_actor.add_input(mqtt_actor_builder);
tedge_converter_actor.add_sink(mqtt_actor_builder);

Ok(tedge_converter_actor)
}
1 change: 1 addition & 0 deletions crates/core/tedge_agent/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ mod restart_manager;
mod software_manager;
mod state_repository;
mod tedge_operation_converter;
mod tedge_to_te_converter;

#[derive(Debug, clap::Parser)]
#[clap(
Expand Down
128 changes: 128 additions & 0 deletions crates/core/tedge_agent/src/tedge_to_te_converter/converter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
use log::error;
use serde_json::Value;
use std::collections::HashMap;
use std::convert::Infallible;
use tedge_actors::Converter;
use tedge_mqtt_ext::MqttMessage;
use tedge_mqtt_ext::Topic;

pub struct TedgetoTeConverter {}

impl Converter for TedgetoTeConverter {
type Input = MqttMessage;
type Output = MqttMessage;
type Error = Infallible;

fn convert(&mut self, input: &Self::Input) -> Result<Vec<Self::Output>, Self::Error> {
let messages_or_err = self.try_convert(input.clone());
Ok(self.wrap_errors(messages_or_err))
}
}

impl TedgetoTeConverter {
pub fn new() -> Self {
TedgetoTeConverter {}
}

fn try_convert(
&mut self,
message: MqttMessage,
) -> Result<Vec<tedge_mqtt_ext::Message>, serde_json::Error> {
match message.topic.clone() {
topic if topic.name.starts_with("tedge/measurements") => {
Ok(self.convert_measurement(message))
}
topic if topic.name.starts_with("tedge/events") => Ok(self.convert_event(message)),
topic if topic.name.starts_with("tedge/alarms") => self.convert_alarm(message),
topic if topic.name.starts_with("tedge/health") => {
Ok(self.convert_health_status_message(message))
}
_ => Ok(vec![]),
}
}

// tedge/measurements -> te/device/main///m/
// tedge/measurements/child -> te/device/child///m/
fn convert_measurement(&mut self, mut message: MqttMessage) -> Vec<tedge_mqtt_ext::Message> {
let te_topic = match message.topic.name.split('/').collect::<Vec<_>>()[..] {
["tedge", "measurements"] => Topic::new_unchecked("te/device/main///m/"),
["tedge", "measurements", cid] => {
Topic::new_unchecked(format!("te/device/{cid}///m/").as_str())
}
_ => return vec![],
};

message.topic = te_topic;
vec![(message)]
}

// tedge/alarms/severity/alarm_type -> te/device/main///a/alarm_type, put severity in payload
// tedge/alarms/severity/child/alarm_type -> te/device/child///a/alarm_type, put severity in payload
fn convert_alarm(
&mut self,
mut message: MqttMessage,
) -> Result<Vec<MqttMessage>, serde_json::Error> {
let (te_topic, severity) = match message.topic.name.split('/').collect::<Vec<_>>()[..] {
["tedge", "alarms", severity, alarm_type] => (
Topic::new_unchecked(format!("te/device/main///a/{alarm_type}").as_str()),
severity,
),
["tedge", "alarms", severity, cid, alarm_type] => (
Topic::new_unchecked(format!("te/device/{cid}///a/{alarm_type}").as_str()),
severity,
),
_ => return Ok(vec![]),
};

let mut alarm: HashMap<String, Value> = serde_json::from_slice(message.payload.as_bytes())?;
alarm.insert("severity".to_string(), severity.into());
message.topic = te_topic;
message.payload = serde_json::to_string(&alarm)?.into();
Ok(vec![message])
}

// tedge/events/event_type -> te/device/main///e/event_type
// tedge/events/child/event_type -> te/device/child///e/event_type
fn convert_event(&mut self, mut message: MqttMessage) -> Vec<tedge_mqtt_ext::Message> {
let topic = match message.topic.name.split('/').collect::<Vec<_>>()[..] {
["tedge", "events", event_type] => {
Topic::new_unchecked(format!("te/device/main///e/{event_type}").as_str())
}
["tedge", "events", cid, event_type] => {
Topic::new_unchecked(format!("te/device/{cid}///e/{event_type}").as_str())
}
_ => return vec![],
};

message.topic = topic;
vec![message]
}

// tedge/health/service-name -> te/device/main/service/<service-name>/status/health
// tedge/health/child/service-name -> te/device/child/service/<service-name>/status/health
fn convert_health_status_message(&mut self, mut message: MqttMessage) -> Vec<MqttMessage> {
let topic = match message.topic.name.split('/').collect::<Vec<_>>()[..] {
["tedge", "health", service_name] => Topic::new_unchecked(
format!("te/device/main/service/{service_name}/status/health").as_str(),
),
["tedge", "health", cid, service_name] => Topic::new_unchecked(
format!("te/device/{cid}/service/{service_name}/status/health").as_str(),
),
_ => return vec![],
};
message.topic = topic;
vec![message]
}

fn wrap_errors(
&self,
messages_or_err: Result<Vec<MqttMessage>, serde_json::Error>,
) -> Vec<MqttMessage> {
messages_or_err.unwrap_or_else(|error| vec![self.new_error_message(error)])
}

fn new_error_message(&self, error: serde_json::Error) -> MqttMessage {
error!("Mapping error: {}", error);
MqttMessage::new(&Topic::new_unchecked("tedge/errors"), error.to_string())
}
}
4 changes: 4 additions & 0 deletions crates/core/tedge_agent/src/tedge_to_te_converter/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
pub mod converter;

#[cfg(test)]
mod tests;
Loading

0 comments on commit ffc81cd

Please sign in to comment.