From 8d4ecbfe31fafd687b128e9d5d67703f7f69f1c1 Mon Sep 17 00:00:00 2001 From: PradeepKiruvale Date: Thu, 2 Feb 2023 17:34:18 +0530 Subject: [PATCH] Alarms with custom fragments (#1699) Signed-off-by: Pradeep Kumar K J --- Cargo.lock | 2 + crates/core/c8y_api/Cargo.toml | 1 + crates/core/c8y_api/src/json_c8y.rs | 180 +++++++++++++++++- crates/core/c8y_api/src/smartrest/alarm.rs | 42 +++- crates/core/tedge_api/Cargo.toml | 1 + crates/core/tedge_api/src/alarm.rs | 45 ++++- .../tedge_mapper/src/c8y/alarm_converter.rs | 22 ++- crates/core/tedge_mapper/src/c8y/converter.rs | 1 - crates/core/tedge_mapper/src/c8y/tests.rs | 84 ++++++++ 9 files changed, 362 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 674061e377c..08bb602450f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -479,6 +479,7 @@ dependencies = [ "csv", "download", "futures", + "maplit", "mockall", "mockito", "mqtt_channel", @@ -3283,6 +3284,7 @@ dependencies = [ "csv", "download", "json-writer", + "maplit", "mockall", "mqtt_channel", "nanoid", diff --git a/crates/core/c8y_api/Cargo.toml b/crates/core/c8y_api/Cargo.toml index cd9e8c5f14a..0a621c3ed99 100644 --- a/crates/core/c8y_api/Cargo.toml +++ b/crates/core/c8y_api/Cargo.toml @@ -35,6 +35,7 @@ tracing = { version = "0.1", features = ["attributes", "log"] } anyhow = "1.0" assert-json-diff = "2.0" assert_matches = "1.5" +maplit = "1.0.2" mockito = "0.31" regex = "1.7" tempfile = "3.3" diff --git a/crates/core/c8y_api/src/json_c8y.rs b/crates/core/c8y_api/src/json_c8y.rs index d0f88daff80..855cef10f95 100644 --- a/crates/core/c8y_api/src/json_c8y.rs +++ b/crates/core/c8y_api/src/json_c8y.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; +use tedge_api::alarm::ThinEdgeAlarm; use tedge_api::Jsonify; use tedge_api::SoftwareListResponse; use tedge_api::SoftwareModule; @@ -209,10 +210,113 @@ fn update_the_external_source_event( Ok(()) } +fn make_c8y_source_fragment(source_name: &str) -> Option { + Some(SourceInfo::new(source_name.into(), "c8y_Serial".into())) +} + +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +pub struct SourceInfo { + #[serde(rename = "externalId")] + pub id: String, + #[serde(rename = "type")] + pub source_type: String, +} + +impl SourceInfo { + pub fn new(id: String, source_type: String) -> Self { + Self { id, source_type } + } +} +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +pub struct C8yCreateAlarm { + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(rename = "externalSource")] + pub source: Option, + + pub severity: String, + + #[serde(rename = "type")] + pub alarm_type: String, + + #[serde(with = "time::serde::rfc3339")] + pub time: OffsetDateTime, + + pub text: String, + + #[serde(flatten)] + pub fragments: HashMap, +} + +impl C8yCreateAlarm { + pub fn new( + source: Option, + severity: String, + alarm_type: String, + time: OffsetDateTime, + text: String, + fragments: HashMap, + ) -> Self { + Self { + source, + severity, + alarm_type, + time, + text, + fragments, + } + } +} + +impl TryFrom<&ThinEdgeAlarm> for C8yCreateAlarm { + type Error = SMCumulocityMapperError; + + fn try_from(alarm: &ThinEdgeAlarm) -> Result { + let severity = alarm.severity.to_string(); + let alarm_type = alarm.name.to_owned(); + let text; + let time; + let fragments; + + match &alarm.to_owned().data { + None => { + text = alarm_type.clone(); + time = OffsetDateTime::now_utc(); + fragments = HashMap::new(); + } + Some(data) => { + text = data.text.clone().unwrap_or_else(|| alarm_type.clone()); + time = data.time.unwrap_or_else(OffsetDateTime::now_utc); + fragments = data.alarm_data.clone(); + } + } + + let source = if let Some(external_source) = &alarm.source { + make_c8y_source_fragment(external_source) + } else { + None + }; + + Ok(Self { + source, + severity, + alarm_type, + time, + text, + fragments, + }) + } +} + #[cfg(test)] mod tests { use anyhow::Result; use assert_matches::assert_matches; + use serde_json::json; + use tedge_api::alarm::AlarmSeverity; + use tedge_api::alarm::ThinEdgeAlarm; + use tedge_api::alarm::ThinEdgeAlarmData; use tedge_api::event::ThinEdgeEventData; use test_case::test_case; use time::macros::datetime; @@ -245,7 +349,7 @@ mod tests { let input_json = r#"{ "id":"1", "status":"successful", - "currentSoftwareList":[ + "currentSoftwareList":[ {"type":"debian", "modules":[ {"name":"a"}, {"name":"b","version":"1.0"}, @@ -471,4 +575,78 @@ mod tests { Ok(()) } + + #[test_case( + ThinEdgeAlarm { + name: "temperature alarm".into(), + severity: AlarmSeverity::Critical, + data: Some(ThinEdgeAlarmData { + text: Some("Temperature went high".into()), + time: Some(datetime!(2021-04-23 19:00:00 +05:00)), + alarm_data: HashMap::new(), + }), + source: None, + }, + C8yCreateAlarm { + severity: "CRITICAL".to_string(), + source: None, + alarm_type: "temperature alarm".into(), + time: datetime!(2021-04-23 19:00:00 +05:00), + text: "Temperature went high".into(), + fragments: HashMap::new(), + } + ;"critical alarm translation" + )] + #[test_case( + ThinEdgeAlarm { + name: "temperature alarm".into(), + severity: AlarmSeverity::Critical, + data: Some(ThinEdgeAlarmData { + text: Some("Temperature went high".into()), + time: Some(datetime!(2021-04-23 19:00:00 +05:00)), + alarm_data: maplit::hashmap!{"SomeCustomFragment".to_string() => json!({"nested": {"value":"extra info"}})}, + }), + source: None, + }, + C8yCreateAlarm { + severity: "CRITICAL".to_string(), + source: None, + alarm_type: "temperature alarm".into(), + time: datetime!(2021-04-23 19:00:00 +05:00), + text: "Temperature went high".into(), + fragments: maplit::hashmap!{"SomeCustomFragment".to_string() => json!({"nested": {"value":"extra info"}})}, + } + ;"critical alarm translation with custom fragment" + )] + #[test_case( + ThinEdgeAlarm { + name: "temperature alarm".into(), + severity: AlarmSeverity::Critical, + data: Some(ThinEdgeAlarmData { + text: Some("Temperature went high".into()), + time: Some(datetime!(2021-04-23 19:00:00 +05:00)), + alarm_data: maplit::hashmap!{"SomeCustomFragment".to_string() => json!({"nested": {"value":"extra info"}})}, + }), + source: Some("external_source".into()), + }, + C8yCreateAlarm { + severity: "CRITICAL".to_string(), + source: Some(SourceInfo::new("external_source".to_string(),"c8y_Serial".to_string())), + alarm_type: "temperature alarm".into(), + time: datetime!(2021-04-23 19:00:00 +05:00), + text: "Temperature went high".into(), + fragments: maplit::hashmap!{"SomeCustomFragment".to_string() => json!({"nested": {"value":"extra info"}})}, + } + ;"critical alarm translation of child device with custom fragment" + )] + fn check_alarm_translation( + tedge_alarm: ThinEdgeAlarm, + expected_c8y_alarm: C8yCreateAlarm, + ) -> Result<()> { + let actual_c8y_alarm = C8yCreateAlarm::try_from(&tedge_alarm)?; + + assert_eq!(actual_c8y_alarm, expected_c8y_alarm); + + Ok(()) + } } diff --git a/crates/core/c8y_api/src/smartrest/alarm.rs b/crates/core/c8y_api/src/smartrest/alarm.rs index c369d8d179e..313db5d66ae 100644 --- a/crates/core/c8y_api/src/smartrest/alarm.rs +++ b/crates/core/c8y_api/src/smartrest/alarm.rs @@ -1,10 +1,9 @@ +use crate::smartrest::error::SmartRestSerializerError; use tedge_api::alarm::AlarmSeverity; use tedge_api::alarm::ThinEdgeAlarm; use time::format_description::well_known::Rfc3339; use time::OffsetDateTime; -use crate::smartrest::error::SmartRestSerializerError; - /// Converts from thin-edge alarm to C8Y alarm SmartREST message pub fn serialize_alarm(alarm: ThinEdgeAlarm) -> Result { match alarm.data { @@ -16,9 +15,7 @@ pub fn serialize_alarm(alarm: ThinEdgeAlarm) -> Result 303, AlarmSeverity::Warning => 304, }; - let current_timestamp = OffsetDateTime::now_utc(); - let smartrest_message = format!( "{},{},\"{}\",{}", smartrest_code, @@ -29,7 +26,6 @@ pub fn serialize_alarm(alarm: ThinEdgeAlarm) -> Result Result, + pub source: Option, } -#[derive(Debug, Deserialize, Eq, PartialEq)] +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)] pub enum AlarmSeverity { Critical, Major, @@ -21,13 +27,16 @@ pub enum AlarmSeverity { } /// In-memory representation of ThinEdge JSON alarm payload -#[derive(Debug, Deserialize, Eq, PartialEq)] +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)] pub struct ThinEdgeAlarmData { pub text: Option, #[serde(default)] #[serde(with = "time::serde::rfc3339::option")] pub time: Option, + + #[serde(flatten)] + pub alarm_data: HashMap, } #[derive(thiserror::Error, Debug)] @@ -71,6 +80,17 @@ impl TryFrom<&str> for AlarmSeverity { } } +impl fmt::Display for AlarmSeverity { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + AlarmSeverity::Critical => write!(f, "CRITICAL"), + AlarmSeverity::Major => write!(f, "MAJOR"), + AlarmSeverity::Minor => write!(f, "MINOR"), + AlarmSeverity::Warning => write!(f, "WARNING"), + } + } +} + impl ThinEdgeAlarm { pub fn try_from( mqtt_topic: &str, @@ -106,10 +126,18 @@ impl ThinEdgeAlarm { Some(serde_json::from_str(mqtt_payload)?) }; + // The 4th part of the topic name is the alarm source - if any + let external_source = if topic_split.len() == 5 { + Some(topic_split[4].to_string()) + } else { + None + }; + Ok(Self { name: alarm_name.into(), severity: alarm_severity.try_into()?, data: alarm_data, + source: external_source, }) } else { Err(ThinEdgeJsonDeserializerError::UnsupportedTopic( @@ -123,6 +151,7 @@ impl ThinEdgeAlarm { mod tests { use super::*; use assert_matches::assert_matches; + use maplit::hashmap; use serde_json::json; use serde_json::Value; use test_case::test_case; @@ -140,7 +169,9 @@ mod tests { data: Some(ThinEdgeAlarmData { text: Some("I raised it".into()), time: Some(datetime!(2021-04-23 19:00:00 +05:00)), + alarm_data: hashmap!{}, }), + source: None, }; "critical alarm parsing" )] @@ -155,7 +186,9 @@ mod tests { data: Some(ThinEdgeAlarmData { text: Some("I raised it".into()), time: None, + alarm_data: hashmap!{}, }), + source: None, }; "major alarm parsing without timestamp" )] @@ -170,7 +203,9 @@ mod tests { data: Some(ThinEdgeAlarmData { text: None, time: Some(datetime!(2021-04-23 19:00:00 +05:00)), + alarm_data: hashmap!{}, }), + source: None, }; "minor alarm parsing without text" )] @@ -183,7 +218,9 @@ mod tests { data: Some(ThinEdgeAlarmData { text: None, time: None, + alarm_data: hashmap!{}, }), + source: None, }; "warning alarm parsing without text or timestamp" )] @@ -199,7 +236,9 @@ mod tests { data: Some(ThinEdgeAlarmData { text: Some("I raised it".into()), time: Some(datetime!(2021-04-23 19:00:00 +05:00)), + alarm_data: hashmap!{}, }), + source: Some("extern_sensor".to_string()), }; "critical alarm parsing with childId" )] diff --git a/crates/core/tedge_mapper/src/c8y/alarm_converter.rs b/crates/core/tedge_mapper/src/c8y/alarm_converter.rs index 3dfec69a133..888bc41c8ce 100644 --- a/crates/core/tedge_mapper/src/c8y/alarm_converter.rs +++ b/crates/core/tedge_mapper/src/c8y/alarm_converter.rs @@ -1,3 +1,4 @@ +use c8y_api::json_c8y::C8yCreateAlarm; use c8y_api::smartrest::topic::SMARTREST_PUBLISH_TOPIC; use std::collections::hash_map::Entry; use std::collections::HashMap; @@ -12,6 +13,7 @@ use crate::core::error::ConversionError; const TEDGE_ALARMS_TOPIC: &str = "tedge/alarms/"; const INTERNAL_ALARMS_TOPIC: &str = "c8y-internal/alarms/"; +const C8Y_JSON_MQTT_ALARMS_TOPIC: &str = "c8y/alarm/alarms/create"; #[derive(Debug, Clone, PartialEq, Eq)] pub(crate) enum AlarmConverter { @@ -66,12 +68,20 @@ impl AlarmConverter { payload: mqtt_payload.chars().take(50).collect(), } })?; - let smartrest_alarm = alarm::serialize_alarm(tedge_alarm)?; - let c8y_alarm_topic = Topic::new_unchecked( - self.get_c8y_alarm_topic(input_message.topic.name.as_str())? - .as_str(), - ); - output_messages.push(Message::new(&c8y_alarm_topic, smartrest_alarm)); + let c8y_alarm = C8yCreateAlarm::try_from(&tedge_alarm)?; + + // If the message doesn't contain any fields other than `text` and `time`, convert to SmartREST + if c8y_alarm.fragments.is_empty() { + let smartrest_alarm = alarm::serialize_alarm(tedge_alarm)?; + let c8y_alarm_topic = Topic::new_unchecked( + &self.get_c8y_alarm_topic(input_message.topic.name.as_str())?, + ); + output_messages.push(Message::new(&c8y_alarm_topic, smartrest_alarm)); + } else { + let cumulocity_alarm_json = serde_json::to_string(&c8y_alarm)?; + let c8y_alarm_topic = Topic::new_unchecked(C8Y_JSON_MQTT_ALARMS_TOPIC); + output_messages.push(Message::new(&c8y_alarm_topic, cumulocity_alarm_json)); + } // Persist a copy of the alarm to an internal topic for reconciliation on next restart let alarm_id = input_message diff --git a/crates/core/tedge_mapper/src/c8y/converter.rs b/crates/core/tedge_mapper/src/c8y/converter.rs index 39a7651022d..797aca3610c 100644 --- a/crates/core/tedge_mapper/src/c8y/converter.rs +++ b/crates/core/tedge_mapper/src/c8y/converter.rs @@ -76,7 +76,6 @@ const INTERNAL_ALARMS_TOPIC: &str = "c8y-internal/alarms/"; const TEDGE_EVENTS_TOPIC: &str = "tedge/events/"; const C8Y_JSON_MQTT_EVENTS_TOPIC: &str = "c8y/event/events/create"; const TEDGE_AGENT_LOG_DIR: &str = "tedge/agent"; - const CREATE_EVENT_SMARTREST_CODE: u16 = 400; #[derive(Debug)] diff --git a/crates/core/tedge_mapper/src/c8y/tests.rs b/crates/core/tedge_mapper/src/c8y/tests.rs index e97a3afdd5a..db1359058d2 100644 --- a/crates/core/tedge_mapper/src/c8y/tests.rs +++ b/crates/core/tedge_mapper/src/c8y/tests.rs @@ -439,6 +439,90 @@ async fn c8y_mapper_child_alarm_mapping_to_smartrest() { sm_mapper.abort(); } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +#[serial] +async fn c8y_mapper_alarm_with_custom_fragment_mapping_to_c8y_json() { + let broker = mqtt_tests::test_mqtt_broker(); + + let mut messages = broker + .messages_published_on("c8y/alarm/alarms/create") + .await; + let cfg_dir = TempTedgeDir::new(); + // Start the C8Y Mapper + let (_tmp_dir, sm_mapper) = start_c8y_mapper(broker.port, &cfg_dir).await.unwrap(); + + broker + .publish_with_opts( + "tedge/alarms/major/temperature_alarm", + r#"{ "text": "Temperature high","time":"2023-01-25T18:41:14.776170774Z","customFragment": {"nested":{"value": "extra info"}} }"#, + mqtt_channel::QoS::AtLeastOnce, + true, + ) + .await + .unwrap(); + + let expected_msg = json!({"severity":"MAJOR","type":"temperature_alarm","time":"2023-01-25T18:41:14.776170774Z","text":"Temperature high","customFragment":{"nested":{"value":"extra info"}}}); + + while let Ok(Some(msg)) = messages.next().with_timeout(TEST_TIMEOUT_MS).await { + assert_json_include!(actual:serde_json::from_str::(&msg).unwrap(), expected:expected_msg); + } + + //Clear the previously published alarm + broker + .publish_with_opts( + "tedge/alarms/major/temperature_alarm", + "", + mqtt_channel::QoS::AtLeastOnce, + true, + ) + .await + .unwrap(); + + sm_mapper.abort(); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +#[serial] +async fn c8y_mapper_child_alarm_with_custom_fragment_mapping_to_c8y_json() { + let broker = mqtt_tests::test_mqtt_broker(); + + let mut messages = broker + .messages_published_on("c8y/alarm/alarms/create") + .await; + let cfg_dir = TempTedgeDir::new(); + // Start the C8Y Mapper + let (_tmp_dir, sm_mapper) = start_c8y_mapper(broker.port, &cfg_dir).await.unwrap(); + + broker + .publish_with_opts( + "tedge/alarms/major/temperature_alarm/external_sensor", + r#"{ "text":"Temperature high","time":"2023-01-25T18:41:14.776170774Z","customFragment":{"nested":{"value":"extra info"}}}"#, + mqtt_channel::QoS::AtLeastOnce, + true, + ) + .await + .unwrap(); + + let expected_msg = json!({"severity":"MAJOR","type":"temperature_alarm","time":"2023-01-25T18:41:14.776170774Z","text":"Temperature high","externalSource":{"externalId":"external_sensor","type":"c8y_Serial"},"customFragment":{"nested":{"value":"extra info"}}}); + + while let Ok(Some(msg)) = messages.next().with_timeout(TEST_TIMEOUT_MS).await { + assert_json_include!(actual:serde_json::from_str::(&msg).unwrap(), expected:expected_msg); + } + + //Clear the previously published alarm + broker + .publish_with_opts( + "tedge/alarms/major/temperature_alarm", + "", + mqtt_channel::QoS::AtLeastOnce, + true, + ) + .await + .unwrap(); + + sm_mapper.abort(); +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[serial] async fn c8y_mapper_syncs_pending_alarms_on_startup() {