diff --git a/Cargo.lock b/Cargo.lock index ca9e42e0c5f..0fb77029f5d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -735,6 +735,7 @@ dependencies = [ "clock", "json-writer", "logged_command", + "nanoid", "plugin_sm", "proptest", "rand", @@ -3823,9 +3824,9 @@ dependencies = [ "http", "log", "log_manager", - "serde", "serde_json", "tedge_actors", + "tedge_api", "tedge_config", "tedge_file_system_ext", "tedge_http_ext", @@ -3833,7 +3834,6 @@ dependencies = [ "tedge_test_utils", "tedge_utils", "thiserror", - "time", "tokio", ] diff --git a/crates/common/mqtt_channel/src/topics.rs b/crates/common/mqtt_channel/src/topics.rs index 11ae1cd030d..511afee352a 100644 --- a/crates/common/mqtt_channel/src/topics.rs +++ b/crates/common/mqtt_channel/src/topics.rs @@ -157,6 +157,16 @@ impl TryInto for &str { } } +impl FromIterator for TopicFilter { + fn from_iter>(filters: T) -> Self { + let mut combined_filters = TopicFilter::empty(); + for filter in filters.into_iter() { + combined_filters.add_all(filter) + } + combined_filters + } +} + impl TryInto for Vec<&str> { type Error = MqttError; diff --git a/crates/common/tedge_config/src/tedge_config_cli/tedge_config.rs b/crates/common/tedge_config/src/tedge_config_cli/tedge_config.rs index 10a48fdff7b..223c3a44c75 100644 --- a/crates/common/tedge_config/src/tedge_config_cli/tedge_config.rs +++ b/crates/common/tedge_config/src/tedge_config_cli/tedge_config.rs @@ -385,6 +385,13 @@ define_tedge_config! { #[tedge_config(default(value = "te/+/+/+/+/m/+,tedge/alarms/+/+,tedge/alarms/+/+/+,tedge/events/+,tedge/events/+/+,tedge/health/+,tedge/health/+/+"))] topics: TemplatesSet, + enable: { + /// Enable log management + // TODO turn the default to true, when c8y-log-plugin will be deprecated + #[tedge_config(example = "true", default(value = false))] + log_management: bool, + } + }, #[tedge_config(deprecated_name = "azure")] // for 0.1.0 compatibility diff --git a/crates/core/c8y_api/src/smartrest/topic.rs b/crates/core/c8y_api/src/smartrest/topic.rs index 063cc906f4c..77c947510ae 100644 --- a/crates/core/c8y_api/src/smartrest/topic.rs +++ b/crates/core/c8y_api/src/smartrest/topic.rs @@ -1,6 +1,8 @@ use mqtt_channel::MqttError; use mqtt_channel::Topic; use mqtt_channel::TopicFilter; +use tedge_api::entity_store::EntityMetadata; +use tedge_api::entity_store::EntityType; use tedge_api::topic::ResponseTopic; use tedge_api::TopicError; @@ -88,6 +90,17 @@ impl From for TopicFilter { } } +// FIXME this From conversion is error prone as this can only be used for responses. +impl From<&EntityMetadata> for C8yTopic { + fn from(value: &EntityMetadata) -> Self { + match value.r#type { + EntityType::MainDevice => Self::SmartRestResponse, + EntityType::ChildDevice => Self::ChildSmartRestResponse(value.entity_id.clone()), + EntityType::Service => Self::SmartRestResponse, // TODO how services are handled by c8y? + } + } +} + #[derive(Debug, Clone, Eq, PartialEq)] pub enum MapperSubscribeTopic { C8yTopic(C8yTopic), diff --git a/crates/core/tedge_api/src/entity_store.rs b/crates/core/tedge_api/src/entity_store.rs index 5f1020f91f9..a05c2480f82 100644 --- a/crates/core/tedge_api/src/entity_store.rs +++ b/crates/core/tedge_api/src/entity_store.rs @@ -11,6 +11,7 @@ use std::collections::HashMap; use crate::entity_store; use crate::mqtt_topics::EntityTopicId; +use crate::mqtt_topics::TopicIdError; use mqtt_channel::Message; use mqtt_channel::Topic; @@ -70,6 +71,7 @@ impl EntityStore { let entity_id = main_device.entity_id?; let metadata = EntityMetadata { + topic_id: main_device.topic_id.clone(), entity_id: entity_id.clone(), r#type: main_device.r#type, parent: None, @@ -170,6 +172,7 @@ impl EntityStore { .entity_id .unwrap_or_else(|| self.derive_entity_id(&message.topic_id)); let entity_metadata = EntityMetadata { + topic_id: message.topic_id.clone(), r#type: message.r#type, entity_id: entity_id.clone(), parent, @@ -179,7 +182,7 @@ impl EntityStore { // device is affected if it was previously registered and was updated let previous = self .entities - .insert(message.topic_id.clone(), entity_metadata); + .insert(entity_metadata.topic_id.clone(), entity_metadata); if previous.is_some() { affected_entities.push(message.topic_id); @@ -283,6 +286,7 @@ impl EntityStore { #[derive(Debug, Clone, PartialEq, Eq)] pub struct EntityMetadata { + pub topic_id: EntityTopicId, pub parent: Option, pub r#type: EntityType, pub entity_id: String, @@ -297,9 +301,10 @@ pub enum EntityType { } impl EntityMetadata { - /// Creates a entity metadata for a child device. + /// Creates a entity metadata for the main device. pub fn main_device(device_id: String) -> Self { Self { + topic_id: EntityTopicId::default_main_device(), entity_id: device_id, r#type: EntityType::MainDevice, parent: None, @@ -308,13 +313,14 @@ impl EntityMetadata { } /// Creates a entity metadata for a child device. - pub fn child_device(child_device_id: String) -> Self { - Self { + pub fn child_device(child_device_id: String) -> Result { + Ok(Self { + topic_id: EntityTopicId::default_child_device(&child_device_id)?, entity_id: child_device_id, r#type: EntityType::ChildDevice, parent: Some(EntityTopicId::default_main_device()), other: serde_json::json!({}), - } + }) } } diff --git a/crates/core/tedge_api/src/lib.rs b/crates/core/tedge_api/src/lib.rs index 15409eefbc2..95aabf4692e 100644 --- a/crates/core/tedge_api/src/lib.rs +++ b/crates/core/tedge_api/src/lib.rs @@ -1,19 +1,18 @@ -pub mod error; -pub mod messages; -mod software; -pub mod topic; - pub mod alarm; pub mod builder; pub mod data; pub mod entity_store; +pub mod error; pub mod event; pub mod group; pub mod health; pub mod measurement; +pub mod messages; pub mod mqtt_topics; pub mod parser; pub mod serialize; +mod software; +pub mod topic; pub mod utils; pub use download::*; diff --git a/crates/core/tedge_api/src/messages.rs b/crates/core/tedge_api/src/messages.rs index a4855c81c01..4f36925620d 100644 --- a/crates/core/tedge_api/src/messages.rs +++ b/crates/core/tedge_api/src/messages.rs @@ -5,6 +5,7 @@ use mqtt_channel::Topic; use nanoid::nanoid; use serde::Deserialize; use serde::Serialize; +use time::OffsetDateTime; const SOFTWARE_LIST_REQUEST_TOPIC: &str = "tedge/commands/req/software/list"; const SOFTWARE_LIST_RESPONSE_TOPIC: &str = "tedge/commands/res/software/list"; @@ -534,6 +535,60 @@ impl RestartOperationResponse { } } +#[derive(Debug, Deserialize, Serialize, PartialEq, Copy, Eq, Clone)] +#[serde(rename_all = "camelCase")] +pub enum CommandStatus { + Init, + Executing, + Successful, + Failed, +} + +#[derive(Debug, Deserialize, Serialize, Eq, PartialEq)] +#[serde(rename_all = "camelCase")] +pub struct LogMetadata { + pub types: Vec, +} + +impl<'a> Jsonify<'a> for LogMetadata {} + +#[derive(Debug, Deserialize, Serialize, Eq, PartialEq, Clone)] +#[serde(rename_all = "camelCase")] +pub struct LogUploadCmdPayload { + pub status: CommandStatus, //Define a different enum if this op needs more states, + pub tedge_url: String, + #[serde(rename = "type")] + pub log_type: String, + #[serde(with = "time::serde::rfc3339")] + pub date_from: OffsetDateTime, + #[serde(with = "time::serde::rfc3339")] + pub date_to: OffsetDateTime, + #[serde(skip_serializing_if = "Option::is_none")] + pub search_text: Option, + pub lines: usize, + #[serde(skip_serializing_if = "Option::is_none")] + pub reason: Option, +} + +impl<'a> Jsonify<'a> for LogUploadCmdPayload {} + +impl LogUploadCmdPayload { + pub fn executing(&mut self) { + self.status = CommandStatus::Executing; + self.reason = None; + } + + pub fn successful(&mut self) { + self.status = CommandStatus::Successful; + self.reason = None; + } + + pub fn failed(&mut self, reason: impl Into) { + self.status = CommandStatus::Failed; + self.reason = Some(reason.into()); + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/core/tedge_api/src/mqtt_topics.rs b/crates/core/tedge_api/src/mqtt_topics.rs index abb108b3a61..81261a3feda 100644 --- a/crates/core/tedge_api/src/mqtt_topics.rs +++ b/crates/core/tedge_api/src/mqtt_topics.rs @@ -2,6 +2,8 @@ //! //! See https://thin-edge.github.io/thin-edge.io/next/references/mqtt-api/ +use mqtt_channel::TopicFilter; +use std::convert::Infallible; use std::fmt::Display; use std::fmt::Formatter; use std::str::FromStr; @@ -53,28 +55,107 @@ impl Default for MqttSchema { impl MqttSchema { /// Build a new schema using the default root prefix, i.e. `te` + /// + /// ``` + /// let te = tedge_api::mqtt_topics::MqttSchema::default(); + /// assert_eq!(&te.root, "te"); + /// ``` pub fn new() -> Self { MqttSchema::with_root("te".to_string()) } /// Build a new schema using the given root prefix for all topics. + /// ``` + /// let te = tedge_api::mqtt_topics::MqttSchema::with_root("thin-edge".to_string()); + /// assert_eq!(&te.root, "thin-edge"); + /// ``` pub fn with_root(root: String) -> Self { MqttSchema { root } } /// Get the topic addressing a given entity channel + /// ``` + /// # use tedge_api::mqtt_topics::{MqttSchema, Channel, EntityTopicId}; + /// # use mqtt_channel::Topic; + /// + /// let te = MqttSchema::default(); + /// let child_device: EntityTopicId = "device/child001//".parse().unwrap(); + /// let channel = Channel::AlarmMetadata { + /// alarm_type: "sensors".to_string(), + /// }; + /// + /// let topic = te.topic_for(&child_device, &channel); + /// assert_eq!( + /// topic.name, + /// "te/device/child001///a/sensors/meta" + /// ); + /// ``` pub fn topic_for(&self, entity: &EntityTopicId, channel: &Channel) -> mqtt_channel::Topic { let topic = format!("{}/{}/{}", self.root, entity, channel); mqtt_channel::Topic::new(&topic).unwrap() } /// Get the entity channel addressed by some topic + /// + /// ``` + /// # use tedge_api::mqtt_topics::{MqttSchema, Channel, EntityTopicId}; + /// # use mqtt_channel::Topic; + /// + /// let te = MqttSchema::default(); + /// let topic = Topic::new_unchecked("te/device/child001/service/service001/m/measurement_type"); + /// + /// let (entity_identifier, channel) = te.entity_channel_of(&topic).unwrap(); + /// assert_eq!(entity_identifier , "device/child001/service/service001"); + /// assert_eq!(channel, Channel::Measurement { + /// measurement_type: "measurement_type".to_string(), + /// }) + /// ``` pub fn entity_channel_of( &self, topic: &mqtt_channel::Topic, ) -> Result<(EntityTopicId, Channel), EntityTopicError> { self.parse(&topic.name) } + + /// Get the topic filter to subscribe to messages from specific entities and channels + /// + /// ``` + /// use mqtt_channel::Topic; + /// use tedge_api::mqtt_topics::{ChannelFilter, EntityFilter, MqttSchema}; + /// + /// let te = MqttSchema::default(); + /// let topics = te.topics(EntityFilter::AnyEntity, ChannelFilter::Measurement); + /// + /// assert!(topics.accept_topic(&Topic::new_unchecked("te/device/main///m/"))); + /// assert!(topics.accept_topic(&Topic::new_unchecked("te/device/child///m/m_type"))); + /// assert!(topics.accept_topic(&Topic::new_unchecked("te/device/child/service/collected/m/collectd"))); + /// + /// assert!(! topics.accept_topic(&Topic::new_unchecked("not-te/device/main///m/"))); + /// assert!(! topics.accept_topic(&Topic::new_unchecked("te/device/main///not-m/"))); + /// assert!(! topics.accept_topic(&Topic::new_unchecked("te/device/main///m/t/not-meta"))); + /// assert!(! topics.accept_topic(&Topic::new_unchecked("te/device/main///m/t/meta/too-long"))); + /// assert!(! topics.accept_topic(&Topic::new_unchecked("te/device/main/too/short"))); + /// assert!(! topics.accept_topic(&Topic::new_unchecked("te/device/main/missing/sep/m"))); + /// ``` + pub fn topics(&self, entity: EntityFilter, channel: ChannelFilter) -> TopicFilter { + let entity = match entity { + EntityFilter::AnyEntity => "+/+/+/+".to_string(), + EntityFilter::Entity(entity) => entity.to_string(), + }; + let channel = match channel { + ChannelFilter::EntityMetadata => "".to_string(), + ChannelFilter::Measurement => "/m/+".to_string(), + ChannelFilter::MeasurementMetadata => "/m/+/meta".to_string(), + ChannelFilter::Event => "/e/+".to_string(), + ChannelFilter::EventMetadata => "/e/+/meta".to_string(), + ChannelFilter::Alarm => "/a/+".to_string(), + ChannelFilter::AlarmMetadata => "/a/+/meta".to_string(), + ChannelFilter::Command(operation) => format!("/cmd/{operation}/+"), + ChannelFilter::CommandMetadata(operation) => format!("/cmd/{operation}"), + }; + + TopicFilter::new_unchecked(&format!("{}/{entity}{channel}", self.root)) + } } impl MqttSchema { @@ -241,14 +322,31 @@ pub enum TopicIdError { #[derive(Debug, Clone, PartialEq, Eq)] pub enum Channel { EntityMetadata, - Measurement { measurement_type: String }, - Event { event_type: String }, - Alarm { alarm_type: String }, - Command { operation: String, cmd_id: String }, - MeasurementMetadata { measurement_type: String }, - EventMetadata { event_type: String }, - AlarmMetadata { alarm_type: String }, - CommandMetadata { operation: String }, + Measurement { + measurement_type: String, + }, + Event { + event_type: String, + }, + Alarm { + alarm_type: String, + }, + Command { + operation: OperationType, + cmd_id: String, + }, + MeasurementMetadata { + measurement_type: String, + }, + EventMetadata { + event_type: String, + }, + AlarmMetadata { + alarm_type: String, + }, + CommandMetadata { + operation: OperationType, + }, } impl FromStr for Channel { @@ -280,10 +378,10 @@ impl FromStr for Channel { }), ["cmd", operation] => Ok(Channel::CommandMetadata { - operation: operation.to_string(), + operation: operation.parse().unwrap(), // Infallible }), ["cmd", operation, cmd_id] => Ok(Channel::Command { - operation: operation.to_string(), + operation: operation.parse().unwrap(), // Infallible cmd_id: cmd_id.to_string(), }), @@ -320,6 +418,41 @@ impl Channel { } } +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum OperationType { + Restart, + SoftwareList, + SoftwareUpdate, + LogUpload, + Custom(String), +} + +impl FromStr for OperationType { + type Err = Infallible; + + fn from_str(s: &str) -> Result { + match s { + "restart" => Ok(OperationType::Restart), + "software_list" => Ok(OperationType::SoftwareList), + "software_update" => Ok(OperationType::SoftwareUpdate), + "log_upload" => Ok(OperationType::LogUpload), + operation => Ok(OperationType::Custom(operation.to_string())), + } + } +} + +impl Display for OperationType { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + OperationType::Restart => write!(f, "restart"), + OperationType::SoftwareList => write!(f, "software_list"), + OperationType::SoftwareUpdate => write!(f, "software_update"), + OperationType::LogUpload => write!(f, "log_upload"), + OperationType::Custom(operation) => write!(f, "{operation}"), + } + } +} + #[derive(Debug, thiserror::Error, PartialEq, Eq, Clone)] pub enum ChannelError { #[error("Channel needs to have at least 2 segments")] @@ -329,6 +462,23 @@ pub enum ChannelError { InvalidCategory(String), } +pub enum EntityFilter<'a> { + AnyEntity, + Entity(&'a EntityTopicId), +} + +pub enum ChannelFilter { + EntityMetadata, + Measurement, + Event, + Alarm, + Command(OperationType), + MeasurementMetadata, + EventMetadata, + AlarmMetadata, + CommandMetadata(OperationType), +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/core/tedge_api/src/topic.rs b/crates/core/tedge_api/src/topic.rs index 6c0e62eaefe..f0cf3a02a86 100644 --- a/crates/core/tedge_api/src/topic.rs +++ b/crates/core/tedge_api/src/topic.rs @@ -1,5 +1,6 @@ use crate::error::TopicError; use std::convert::TryFrom; + #[derive(Debug, Clone, Eq, PartialEq)] pub enum ResponseTopic { SoftwareListResponse, diff --git a/crates/extensions/c8y_mapper_ext/Cargo.toml b/crates/extensions/c8y_mapper_ext/Cargo.toml index 56489b473c2..ba2e6195a6e 100644 --- a/crates/extensions/c8y_mapper_ext/Cargo.toml +++ b/crates/extensions/c8y_mapper_ext/Cargo.toml @@ -17,6 +17,7 @@ camino = { workspace = true } clock = { workspace = true } json-writer = { workspace = true } logged_command = { workspace = true } +nanoid = { workspace = true } plugin_sm = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } diff --git a/crates/extensions/c8y_mapper_ext/src/config.rs b/crates/extensions/c8y_mapper_ext/src/config.rs index f38e29fa131..78c85583db4 100644 --- a/crates/extensions/c8y_mapper_ext/src/config.rs +++ b/crates/extensions/c8y_mapper_ext/src/config.rs @@ -1,10 +1,13 @@ +use crate::Capabilities; use c8y_api::smartrest::error::OperationsError; use c8y_api::smartrest::operations::Operations; use c8y_api::smartrest::topic::C8yTopic; use camino::Utf8PathBuf; use std::path::Path; use std::path::PathBuf; +use tedge_api::mqtt_topics::MqttSchema; use tedge_api::topic::ResponseTopic; +use tedge_api::DEFAULT_FILE_TRANSFER_DIR_NAME; use tedge_config::ConfigNotSet; use tedge_config::ReadError; use tedge_config::TEdgeConfig; @@ -16,35 +19,48 @@ pub const MQTT_MESSAGE_SIZE_THRESHOLD: usize = 16184; pub struct C8yMapperConfig { pub config_dir: PathBuf, pub logs_path: Utf8PathBuf, + pub data_dir: Utf8PathBuf, pub device_id: String, pub device_type: String, pub service_type: String, pub ops_dir: PathBuf, + pub file_transfer_dir: Utf8PathBuf, pub c8y_host: String, + pub tedge_http_host: String, pub topics: TopicFilter, + pub capabilities: Capabilities, } impl C8yMapperConfig { + #[allow(clippy::too_many_arguments)] pub fn new( config_dir: PathBuf, logs_path: Utf8PathBuf, + data_dir: Utf8PathBuf, device_id: String, device_type: String, service_type: String, c8y_host: String, + tedge_http_host: String, topics: TopicFilter, + capabilities: Capabilities, ) -> Self { let ops_dir = config_dir.join("operations").join("c8y"); + let file_transfer_dir = data_dir.join(DEFAULT_FILE_TRANSFER_DIR_NAME); Self { config_dir, logs_path, + data_dir, device_id, device_type, service_type, ops_dir, + file_transfer_dir, c8y_host, + tedge_http_host, topics, + capabilities, } } @@ -55,13 +71,29 @@ impl C8yMapperConfig { let config_dir: PathBuf = config_dir.as_ref().into(); let logs_path = tedge_config.logs.path.clone(); + let data_dir = tedge_config.data.path.clone(); let device_id = tedge_config.device.id.try_read(tedge_config)?.to_string(); let device_type = tedge_config.device.ty.clone(); let service_type = tedge_config.service.ty.clone(); let c8y_host = tedge_config.c8y_url().or_config_not_set()?.to_string(); + let tedge_http_address = tedge_config.http.bind.address; + let tedge_http_port = tedge_config.http.bind.port; + let mqtt_schema = MqttSchema::default(); // later get the value from tedge config - // The topics to subscribe = default internal topics + user configurable external topics - let mut topics = Self::internal_topic_filter(&config_dir)?; + let tedge_http_host = format!("{}:{}", tedge_http_address, tedge_http_port); + + let capabilities = Capabilities { + log_management: tedge_config.c8y.enable.log_management, + }; + + let mut topics = Self::default_internal_topic_filter(&config_dir)?; + + // Add feature topic filters + if capabilities.log_management { + topics.add_all(crate::log_upload::log_upload_topic_filter(&mqtt_schema)); + } + + // Add user configurable external topic filters for topic in tedge_config.c8y.topics.0.clone() { if topics.add(&topic).is_err() { warn!("The configured topic '{topic}' is invalid and ignored."); @@ -71,15 +103,20 @@ impl C8yMapperConfig { Ok(C8yMapperConfig::new( config_dir, logs_path, + data_dir, device_id, device_type, service_type, c8y_host, + tedge_http_host, topics, + capabilities, )) } - pub fn internal_topic_filter(config_dir: &Path) -> Result { + pub fn default_internal_topic_filter( + config_dir: &Path, + ) -> Result { let mut topic_filter: TopicFilter = vec![ "c8y-internal/alarms/+/+", "c8y-internal/alarms/+/+/+", diff --git a/crates/extensions/c8y_mapper_ext/src/converter.rs b/crates/extensions/c8y_mapper_ext/src/converter.rs index 875423ddeed..574cec18544 100644 --- a/crates/extensions/c8y_mapper_ext/src/converter.rs +++ b/crates/extensions/c8y_mapper_ext/src/converter.rs @@ -60,6 +60,7 @@ use tedge_api::event::ThinEdgeEvent; use tedge_api::mqtt_topics::Channel; use tedge_api::mqtt_topics::EntityTopicId; use tedge_api::mqtt_topics::MqttSchema; +use tedge_api::mqtt_topics::OperationType; use tedge_api::topic::RequestTopic; use tedge_api::topic::ResponseTopic; use tedge_api::Auth; @@ -154,14 +155,15 @@ impl CumulocityConverter { pub struct CumulocityConverter { pub(crate) size_threshold: SizeThreshold, + pub config: C8yMapperConfig, pub(crate) mapper_config: MapperConfig, - device_name: String, + pub device_name: String, device_type: String, alarm_converter: AlarmConverter, pub operations: Operations, operation_logs: OperationLogs, mqtt_publisher: LoggingSender, - http_proxy: C8YHttpProxy, + pub http_proxy: C8YHttpProxy, pub cfg_dir: PathBuf, pub ops_dir: PathBuf, pub children: HashMap, @@ -181,10 +183,11 @@ impl CumulocityConverter { let device_type = config.device_type.clone(); let service_type = config.service_type.clone(); let c8y_host = config.c8y_host.clone(); + let cfg_dir = config.config_dir.clone(); let size_threshold = SizeThreshold(MQTT_MESSAGE_SIZE_THRESHOLD); - let ops_dir = config.ops_dir; + let ops_dir = config.ops_dir.clone(); let operations = Operations::try_new(ops_dir.clone())?; let children = get_child_ops(ops_dir.clone())?; @@ -205,6 +208,7 @@ impl CumulocityConverter { Ok(CumulocityConverter { size_threshold, + config, mapper_config, device_name: device_id, device_type, @@ -212,7 +216,7 @@ impl CumulocityConverter { operations, operation_logs, http_proxy, - cfg_dir: config.config_dir, + cfg_dir, ops_dir, children, mqtt_publisher, @@ -413,14 +417,29 @@ impl CumulocityConverter { payload: &str, ) -> Result, CumulocityMapperError> { match get_smartrest_device_id(payload) { - Some(device_id) if device_id == self.device_name => { + Some(device_id) => { match get_smartrest_template_id(payload).as_str() { - "528" => self.forward_software_request(payload).await, - "510" => Self::forward_restart_request(payload), - template => self.forward_operation_request(payload, template).await, + "522" => self.convert_log_upload_request(payload), + "528" if device_id == self.device_name => { + self.forward_software_request(payload).await + } + "510" if device_id == self.device_name => { + Self::forward_restart_request(payload) + } + template if device_id == self.device_name => { + self.forward_operation_request(payload, template).await + } + "106" if device_id != self.device_name => { + self.register_child_device_supported_operations(payload) + } + _ => { + // Ignore any other child device incoming request as not yet supported + debug!("Ignored. Message not yet supported: {payload}"); + Ok(vec![]) + } } } - _ => { + None => { match get_smartrest_template_id(payload).as_str() { "106" => self.register_child_device_supported_operations(payload), // Ignore any other child device incoming request as not yet supported @@ -708,6 +727,24 @@ impl CumulocityConverter { let mut messages = match &channel { Channel::Measurement { .. } => self.try_convert_measurement(&source, message)?, + + Channel::Command { .. } if message.payload_bytes().is_empty() => { + // The command has been fully processed + vec![] + } + + Channel::CommandMetadata { + operation: OperationType::LogUpload, + } => self.convert_log_metadata(&source, message)?, + + Channel::Command { + operation: OperationType::LogUpload, + cmd_id, + } => { + self.handle_log_upload_state_change(&source, cmd_id, message) + .await? + } + _ => vec![], }; @@ -755,9 +792,10 @@ impl CumulocityConverter { Ok(publish_restart_operation_status(message.payload_str()?).await?) } Ok(MapperSubscribeTopic::C8yTopic(_)) => self.parse_c8y_topics(message).await, - _ => Err(ConversionError::UnsupportedTopic( - message.topic.name.clone(), - )), + _ => { + error!("Unsupported topic: {}", message.topic.name); + Ok(vec![]) + } }, }?; @@ -1109,6 +1147,7 @@ pub fn check_tedge_agent_status(message: &Message) -> Result = diff --git a/crates/extensions/c8y_mapper_ext/src/error.rs b/crates/extensions/c8y_mapper_ext/src/error.rs index aa97d2b49ce..27d28f25c8d 100644 --- a/crates/extensions/c8y_mapper_ext/src/error.rs +++ b/crates/extensions/c8y_mapper_ext/src/error.rs @@ -72,6 +72,9 @@ pub enum ConversionError { #[error(transparent)] FromSmartRestSerializerError(#[from] c8y_api::smartrest::error::SmartRestSerializerError), + #[error(transparent)] + FromSmartRestDeserializerError(#[from] c8y_api::smartrest::error::SmartRestDeserializerError), + #[error("Unsupported topic: {0}")] UnsupportedTopic(String), @@ -117,6 +120,12 @@ pub enum ConversionError { #[derive(thiserror::Error, Debug)] #[allow(clippy::enum_variant_names)] pub enum CumulocityMapperError { + #[error("Unknown device id: '{device_id}'")] + UnknownDevice { device_id: String }, + + #[error("Unregistered device topic: '{topic_id}'")] + UnregisteredDevice { topic_id: String }, + #[error(transparent)] InvalidTopicError(#[from] tedge_api::TopicError), diff --git a/crates/extensions/c8y_mapper_ext/src/json.rs b/crates/extensions/c8y_mapper_ext/src/json.rs index 6b8376b3846..68bb48042b0 100644 --- a/crates/extensions/c8y_mapper_ext/src/json.rs +++ b/crates/extensions/c8y_mapper_ext/src/json.rs @@ -315,7 +315,7 @@ mod tests { expected_output: Value, ) { let timestamp = datetime!(2021-04-08 0:00:0 +05:00); - let entity = EntityMetadata::child_device(child_id.to_string()); + let entity = EntityMetadata::child_device(child_id.to_string()).unwrap(); let output = from_thin_edge_json_with_timestamp(thin_edge_json, timestamp, &entity); assert_json_eq!( serde_json::from_str::(output.unwrap().as_str()).unwrap(), diff --git a/crates/extensions/c8y_mapper_ext/src/lib.rs b/crates/extensions/c8y_mapper_ext/src/lib.rs index a3ff31d4f87..8d53bf74cc4 100644 --- a/crates/extensions/c8y_mapper_ext/src/lib.rs +++ b/crates/extensions/c8y_mapper_ext/src/lib.rs @@ -6,8 +6,21 @@ pub mod dynamic_discovery; pub mod error; mod fragments; pub mod json; +mod log_upload; mod serializer; pub mod service_monitor; - #[cfg(test)] mod tests; + +#[derive(Debug, serde::Deserialize)] +pub struct Capabilities { + log_management: bool, +} + +impl Default for Capabilities { + fn default() -> Self { + Capabilities { + log_management: true, + } + } +} diff --git a/crates/extensions/c8y_mapper_ext/src/log_upload.rs b/crates/extensions/c8y_mapper_ext/src/log_upload.rs new file mode 100644 index 00000000000..3fe713d6b18 --- /dev/null +++ b/crates/extensions/c8y_mapper_ext/src/log_upload.rs @@ -0,0 +1,223 @@ +use crate::converter::CumulocityConverter; +use crate::error::ConversionError; +use crate::error::CumulocityMapperError; +use crate::error::CumulocityMapperError::UnknownDevice; +use c8y_api::smartrest::smartrest_deserializer::SmartRestLogRequest; +use c8y_api::smartrest::smartrest_deserializer::SmartRestRequestGeneric; +use c8y_api::smartrest::smartrest_serializer::CumulocitySupportedOperations; +use c8y_api::smartrest::smartrest_serializer::SmartRestSerializer; +use c8y_api::smartrest::smartrest_serializer::SmartRestSetOperationToExecuting; +use c8y_api::smartrest::smartrest_serializer::SmartRestSetOperationToFailed; +use c8y_api::smartrest::smartrest_serializer::SmartRestSetOperationToSuccessful; +use c8y_api::smartrest::topic::C8yTopic; +use nanoid::nanoid; +use tedge_api::entity_store::EntityType; +use tedge_api::messages::CommandStatus; +use tedge_api::messages::LogMetadata; +use tedge_api::messages::LogUploadCmdPayload; +use tedge_api::mqtt_topics::Channel; +use tedge_api::mqtt_topics::ChannelFilter::Command; +use tedge_api::mqtt_topics::ChannelFilter::CommandMetadata; +use tedge_api::mqtt_topics::EntityFilter::AnyEntity; +use tedge_api::mqtt_topics::EntityTopicId; +use tedge_api::mqtt_topics::MqttSchema; +use tedge_api::mqtt_topics::OperationType; +use tedge_api::Jsonify; +use tedge_mqtt_ext::Message; +use tedge_mqtt_ext::MqttMessage; +use tedge_mqtt_ext::QoS; +use tedge_mqtt_ext::TopicFilter; +use tedge_utils::file::create_directory_with_defaults; +use tedge_utils::file::create_file_with_defaults; + +pub fn log_upload_topic_filter(mqtt_schema: &MqttSchema) -> TopicFilter { + [ + mqtt_schema.topics(AnyEntity, Command(OperationType::LogUpload)), + mqtt_schema.topics(AnyEntity, CommandMetadata(OperationType::LogUpload)), + ] + .into_iter() + .collect() +} + +impl CumulocityConverter { + /// Convert a SmartREST logfile request to a Thin Edge log_upload command + pub fn convert_log_upload_request( + &self, + smartrest: &str, + ) -> Result, CumulocityMapperError> { + if !self.config.capabilities.log_management { + // Log_management is disabled + return Ok(vec![]); + } + + let log_request = SmartRestLogRequest::from_smartrest(smartrest)?; + let target = self + .entity_store + .get_by_id(&log_request.device) + .ok_or_else(|| UnknownDevice { + device_id: log_request.device.to_string(), + })?; + + let cmd_id = nanoid!(); + let channel = Channel::Command { + operation: OperationType::LogUpload, + cmd_id: cmd_id.clone(), + }; + let topic = self.mqtt_schema.topic_for(&target.topic_id, &channel); + + let tedge_url = format!( + "http://{}/tedge/file-transfer/{}/log_upload/{}-{}", + &self.config.tedge_http_host, target.entity_id, log_request.log_type, cmd_id + ); + + let request = LogUploadCmdPayload { + status: CommandStatus::Init, + tedge_url, + log_type: log_request.log_type, + date_from: log_request.date_from, + date_to: log_request.date_to, + search_text: log_request.search_text, + lines: log_request.lines, + reason: None, + }; + + // Command messages must be retained + Ok(vec![Message::new(&topic, request.to_json()?).with_retain()]) + } + + /// Address a received log_upload command. If its status is + /// - "executing", it converts the message to SmartREST "Executing". + /// - "successful", it uploads a log file to c8y and converts the message to SmartREST "Successful". + /// - "failed", it converts the message to SmartREST "Failed". + pub async fn handle_log_upload_state_change( + &mut self, + topic_id: &EntityTopicId, + cmd_id: &str, + message: &Message, + ) -> Result, ConversionError> { + if !self.config.capabilities.log_management { + // Log_management is disabled + return Ok(vec![]); + } + + // get the device metadata from its id + let device = self.entity_store.get(topic_id).ok_or_else(|| { + CumulocityMapperError::UnregisteredDevice { + topic_id: topic_id.to_string(), + } + })?; + let external_id = device.entity_id.to_string(); + + let c8y_topic: C8yTopic = device.into(); + let smartrest_topic = c8y_topic.to_topic()?; + + let payload = message.payload_str()?; + let response = &LogUploadCmdPayload::from_json(payload)?; + + let messages = match response.status { + CommandStatus::Executing => { + let smartrest_operation_status = SmartRestSetOperationToExecuting::new( + CumulocitySupportedOperations::C8yLogFileRequest, + ) + .to_smartrest()?; + vec![Message::new(&smartrest_topic, smartrest_operation_status)] + } + CommandStatus::Successful => { + let uploaded_file_path = self + .config + .file_transfer_dir + .join(&device.entity_id) + .join("log_upload") + .join(format!("{}-{}", response.log_type, cmd_id)); + let result = self + .http_proxy + .upload_file( + uploaded_file_path.as_std_path(), + &response.log_type, + external_id, + ) + .await; // We need to get rid of this await, otherwise it blocks + + let smartrest_operation_status = match result { + Ok(url) => SmartRestSetOperationToSuccessful::new( + CumulocitySupportedOperations::C8yLogFileRequest, + ) + .with_response_parameter(&url) + .to_smartrest()?, + Err(err) => SmartRestSetOperationToFailed::new( + CumulocitySupportedOperations::C8yLogFileRequest, + format!("Upload failed with {}", err), + ) + .to_smartrest()?, + }; + + let c8y_notification = Message::new(&smartrest_topic, smartrest_operation_status); + let clean_operation = Message::new(&message.topic, "") + .with_retain() + .with_qos(QoS::AtLeastOnce); + vec![c8y_notification, clean_operation] + } + CommandStatus::Failed => { + let smartrest_operation_status = SmartRestSetOperationToFailed::new( + CumulocitySupportedOperations::C8yLogFileRequest, + response.reason.clone().unwrap_or_default(), + ) + .to_smartrest()?; + let c8y_notification = Message::new(&smartrest_topic, smartrest_operation_status); + let clean_operation = Message::new(&message.topic, "") + .with_retain() + .with_qos(QoS::AtLeastOnce); + vec![c8y_notification, clean_operation] + } + _ => { + vec![] // Do nothing as other components might handle those states + } + }; + + Ok(messages) + } + + /// Converts a log_upload metadata message to + /// - supported operation "c8y_LogfileRequest" + /// - supported log types + pub fn convert_log_metadata( + &mut self, + topic_id: &EntityTopicId, + message: &Message, + ) -> Result, ConversionError> { + if !self.config.capabilities.log_management { + // Log_management is disabled + return Ok(vec![]); + } + + let metadata = LogMetadata::from_json(message.payload_str()?)?; + + // get the device metadata from its id + let device = self.entity_store.get(topic_id).ok_or_else(|| { + CumulocityMapperError::UnregisteredDevice { + topic_id: topic_id.to_string(), + } + })?; + + // Create a c8y_LogfileRequest operation file + let dir_path = match device.r#type { + EntityType::MainDevice => self.ops_dir.clone(), + EntityType::ChildDevice => self.ops_dir.join(&device.entity_id), + EntityType::Service => { + // No support for service log management + return Ok(vec![]); + } + }; + create_directory_with_defaults(&dir_path)?; + create_file_with_defaults(dir_path.join("c8y_LogfileRequest"), None)?; + + // To SmartREST supported log types + let mut types = metadata.types; + types.sort(); + let supported_log_types = types.join(","); + let payload = format!("118,{supported_log_types}"); + + let c8y_topic: C8yTopic = device.into(); + Ok(vec![MqttMessage::new(&c8y_topic.to_topic()?, payload)]) + } +} diff --git a/crates/extensions/c8y_mapper_ext/src/serializer.rs b/crates/extensions/c8y_mapper_ext/src/serializer.rs index d3065075636..782fde65f89 100644 --- a/crates/extensions/c8y_mapper_ext/src/serializer.rs +++ b/crates/extensions/c8y_mapper_ext/src/serializer.rs @@ -478,7 +478,7 @@ mod tests { fn serialize_timestamp_child_message() -> anyhow::Result<()> { let timestamp = datetime!(2021-06-22 17:03:14.123456789 +05:00); - let entity = EntityMetadata::child_device("child1".to_string()); + let entity = EntityMetadata::child_device("child1".to_string())?; let mut serializer = C8yJsonSerializer::new(timestamp, &entity); serializer.visit_timestamp(timestamp)?; serializer.visit_measurement("temperature", 25.5)?; diff --git a/crates/extensions/c8y_mapper_ext/src/tests.rs b/crates/extensions/c8y_mapper_ext/src/tests.rs index 35bd6e508ba..1f598e4381e 100644 --- a/crates/extensions/c8y_mapper_ext/src/tests.rs +++ b/crates/extensions/c8y_mapper_ext/src/tests.rs @@ -2,10 +2,12 @@ use super::actor::C8yMapperBuilder; use super::actor::SyncComplete; use super::actor::SyncStart; use super::config::C8yMapperConfig; +use crate::Capabilities; use assert_json_diff::assert_json_include; use c8y_api::smartrest::topic::C8yTopic; use c8y_http_proxy::messages::C8YRestRequest; use c8y_http_proxy::messages::C8YRestResult; +use c8y_http_proxy::messages::Url; use serde_json::json; use std::fs; use std::fs::File; @@ -22,6 +24,9 @@ use tedge_actors::Sender; use tedge_actors::SimpleMessageBox; use tedge_actors::SimpleMessageBoxBuilder; use tedge_actors::WrappedInput; +use tedge_api::mqtt_topics::Channel; +use tedge_api::mqtt_topics::MqttSchema; +use tedge_api::mqtt_topics::OperationType; use tedge_api::SoftwareUpdateResponse; use tedge_file_system_ext::FsWatchEvent; use tedge_mqtt_ext::test_helpers::assert_received_contains_str; @@ -1247,6 +1252,403 @@ async fn inventory_registers_unknown_entity_once() { ); } +#[tokio::test] +async fn mapper_converts_smartrest_logfile_req_to_log_upload_cmd_for_main_device() { + let (mqtt, _http, _fs, _timer) = spawn_c8y_mapper_actor(&TempTedgeDir::new(), true).await; + let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); + + mqtt.skip(6).await; //Skip all init messages + + // Simulate c8y_LogfileRequest SmartREST request + mqtt.send(MqttMessage::new( + &C8yTopic::downstream_topic(), + "522,test-device,logfileA,2013-06-22T17:03:14.123+02:00,2013-06-23T18:03:14.123+02:00,ERROR,1000", + )) + .await + .expect("Send failed"); + + let (topic, received_json) = mqtt + .recv() + .await + .map(|msg| { + ( + msg.topic, + serde_json::from_str::(msg.payload.as_str().expect("UTF8")) + .expect("JSON"), + ) + }) + .unwrap(); + + let mqtt_schema = MqttSchema::default(); + let (entity, channel) = mqtt_schema.entity_channel_of(&topic).unwrap(); + assert_eq!(entity, "device/main//"); + + if let Channel::Command { + operation: OperationType::LogUpload, + cmd_id, + } = channel + { + // Validate the topic name + assert_eq!( + topic.name, + format!("te/device/main///cmd/log_upload/{cmd_id}") + ); + + // Validate the payload JSON + let expected_json = json!({ + "status": "init", + "tedgeUrl": format!("http://localhost:8888/tedge/file-transfer/test-device/log_upload/logfileA-{cmd_id}"), + "type": "logfileA", + "dateFrom": "2013-06-22T17:03:14.123+02:00", + "dateTo": "2013-06-23T18:03:14.123+02:00", + "searchText": "ERROR", + "lines": 1000 + }); + + assert_json_diff::assert_json_include!(actual: received_json, expected: expected_json); + } else { + panic!("Unexpected response on channel: {:?}", topic) + } +} + +#[tokio::test] +async fn mapper_converts_smartrest_logfile_req_to_log_upload_cmd_for_child_device() { + let (mqtt, _http, _fs, _timer) = spawn_c8y_mapper_actor(&TempTedgeDir::new(), true).await; + let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); + + mqtt.skip(6).await; //Skip all init messages + + // The child device must be registered first + mqtt.send(MqttMessage::new( + &Topic::new_unchecked("te/device/DeviceSerial//"), + r#"{ "@type":"child-device", "@id":"DeviceSerial" }"#, + )) + .await + .expect("fail to register the child-device"); + + // Simulate c8y_LogfileRequest SmartREST request + mqtt.send(MqttMessage::new( + &C8yTopic::downstream_topic(), + "522,DeviceSerial,logfileA,2013-06-22T17:03:14.123+02:00,2013-06-23T18:03:14.123+02:00,ERROR,1000", + )) + .await + .expect("Send failed"); + + let (topic, received_json) = mqtt + .recv() + .await + .map(|msg| { + ( + msg.topic, + serde_json::from_str::(msg.payload.as_str().expect("UTF8")) + .expect("JSON"), + ) + }) + .unwrap(); + + let mqtt_schema = MqttSchema::default(); + let (entity, channel) = mqtt_schema.entity_channel_of(&topic).unwrap(); + assert_eq!(entity, "device/DeviceSerial//"); + + if let Channel::Command { + operation: OperationType::LogUpload, + cmd_id, + } = channel + { + // Validate the topic name + assert_eq!( + topic.name, + format!("te/device/DeviceSerial///cmd/log_upload/{cmd_id}") + ); + + // Validate the payload JSON + let expected_json = json!({ + "status": "init", + "tedgeUrl": format!("http://localhost:8888/tedge/file-transfer/DeviceSerial/log_upload/logfileA-{cmd_id}"), + "type": "logfileA", + "dateFrom": "2013-06-22T17:03:14.123+02:00", + "dateTo": "2013-06-23T18:03:14.123+02:00", + "searchText": "ERROR", + "lines": 1000 + }); + + assert_json_diff::assert_json_include!(actual: received_json, expected: expected_json); + } else { + panic!("Unexpected response on channel: {:?}", topic) + } +} + +#[tokio::test] +async fn mapper_converts_log_upload_cmd_to_supported_op_and_types_for_main_device() { + let ttd = TempTedgeDir::new(); + let (mqtt, _http, _fs, _timer) = spawn_c8y_mapper_actor(&ttd, true).await; + let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); + + mqtt.skip(6).await; //Skip all init messages + + // Simulate log_upload cmd metadata message + mqtt.send(MqttMessage::new( + &Topic::new_unchecked("te/device/main///cmd/log_upload"), + r#"{"types" : [ "typeA", "typeB", "typeC" ]}"#, + )) + .await + .expect("Send failed"); + + assert_received_contains_str(&mut mqtt, [("c8y/s/us", "118,typeA,typeB,typeC")]).await; + + // Validate if the supported operation file is created + assert!(ttd + .path() + .join("operations/c8y/c8y_LogfileRequest") + .exists()); +} + +#[tokio::test] +async fn mapper_converts_log_upload_cmd_to_supported_op_and_types_for_child_device() { + let ttd = TempTedgeDir::new(); + let (mqtt, _http, _fs, _timer) = spawn_c8y_mapper_actor(&ttd, true).await; + let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); + + mqtt.skip(6).await; //Skip all init messages + + // Simulate log_upload cmd metadata message + mqtt.send(MqttMessage::new( + &Topic::new_unchecked("te/device/child1///cmd/log_upload"), + r#"{"types" : [ "typeA", "typeB", "typeC" ]}"#, + )) + .await + .expect("Send failed"); + + assert_received_contains_str( + &mut mqtt, + [ + ( + "te/device/child1//", + r#"{ "@type":"child-device", "@id":"child1"}"#, + ), + ("c8y/s/us", "101,child1,child1,thin-edge.io-child"), + ], + ) + .await; + assert_received_contains_str(&mut mqtt, [("c8y/s/us/child1", "118,typeA,typeB,typeC")]).await; + + // Validate if the supported operation file is created + assert!(ttd + .path() + .join("operations/c8y/child1/c8y_LogfileRequest") + .exists()); +} + +#[tokio::test] +async fn handle_log_upload_executing_and_failed_cmd_for_main_device() { + let (mqtt, _http, _fs, _timer) = spawn_c8y_mapper_actor(&TempTedgeDir::new(), true).await; + + let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); + mqtt.skip(6).await; //Skip all init messages + + // Simulate log_upload command with "executing" state + mqtt.send(MqttMessage::new( + &Topic::new_unchecked("te/device/main///cmd/log_upload/1234"), + json!({ + "status": "executing", + "tedgeUrl": format!("http://localhost:8888/tedge/file-transfer/main/log_upload/typeA-1234"), + "type": "typeA", + "dateFrom": "2013-06-22T17:03:14.123+02:00", + "dateTo": "2013-06-23T18:03:14.123+02:00", + "searchText": "ERROR", + "lines": 1000 + }) + .to_string(), + )) + .await + .expect("Send failed"); + + // Expect `501` smartrest message on `c8y/s/us`. + assert_received_contains_str(&mut mqtt, [("c8y/s/us", "501,c8y_LogfileRequest")]).await; + + // Simulate log_upload command with "failed" state + mqtt.send(MqttMessage::new( + &Topic::new_unchecked("te/device/main///cmd/log_upload/1234"), + json!({ + "status": "failed", + "tedgeUrl": format!("http://localhost:8888/tedge/file-transfer/main/log_upload/typeA-1234"), + "type": "typeA", + "dateFrom": "2013-06-22T17:03:14.123+02:00", + "dateTo": "2013-06-23T18:03:14.123+02:00", + "searchText": "ERROR", + "lines": 1000, + "reason": "Something went wrong" + }) + .to_string(), + )) + .await + .expect("Send failed"); + + // Expect `502` smartrest message on `c8y/s/us`. + assert_received_contains_str( + &mut mqtt, + [( + "c8y/s/us", + "502,c8y_LogfileRequest,\"Something went wrong\"", + )], + ) + .await; +} + +#[tokio::test] +async fn handle_log_upload_executing_and_failed_cmd_for_child_device() { + let (mqtt, _http, _fs, _timer) = spawn_c8y_mapper_actor(&TempTedgeDir::new(), true).await; + + let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); + mqtt.skip(6).await; //Skip all init messages + + // Simulate log_upload command with "executing" state + mqtt.send(MqttMessage::new( + &Topic::new_unchecked("te/device/child1///cmd/log_upload/1234"), + json!({ + "status": "executing", + "tedgeUrl": format!("http://localhost:8888/tedge/file-transfer/child1/log_upload/typeA-1234"), + "type": "typeA", + "dateFrom": "2013-06-22T17:03:14.123+02:00", + "dateTo": "2013-06-23T18:03:14.123+02:00", + "searchText": "ERROR", + "lines": 1000 + }) + .to_string(), + )) + .await + .expect("Send failed"); + + assert_received_contains_str( + &mut mqtt, + [ + ( + "te/device/child1//", + r#"{ "@type":"child-device", "@id":"child1"}"#, + ), + ("c8y/s/us", "101,child1,child1,thin-edge.io-child"), + ], + ) + .await; + + // Expect `501` smartrest message on `c8y/s/us/child1`. + assert_received_contains_str(&mut mqtt, [("c8y/s/us/child1", "501,c8y_LogfileRequest")]).await; + + // Simulate log_upload command with "failed" state + mqtt.send(MqttMessage::new( + &Topic::new_unchecked("te/device/child1///cmd/log_upload/1234"), + json!({ + "status": "failed", + "tedgeUrl": format!("http://localhost:8888/tedge/file-transfer/child1/log_upload/typeA-1234"), + "type": "typeA", + "dateFrom": "2013-06-22T17:03:14.123+02:00", + "dateTo": "2013-06-23T18:03:14.123+02:00", + "searchText": "ERROR", + "lines": 1000, + "reason": "Something went wrong" + }) + .to_string(), + )) + .await + .expect("Send failed"); + + // Expect `502` smartrest message on `c8y/s/us/child1`. + assert_received_contains_str( + &mut mqtt, + [( + "c8y/s/us/child1", + "502,c8y_LogfileRequest,\"Something went wrong\"", + )], + ) + .await; +} + +#[tokio::test] +async fn handle_log_upload_successful_cmd_for_main_device() { + let ttd = TempTedgeDir::new(); + let (mqtt, http, _fs, _timer) = spawn_c8y_mapper_actor(&ttd, true).await; + spawn_dummy_c8y_http_proxy(http); + + let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); + mqtt.skip(6).await; //Skip all init messages + + // Simulate a log file is uploaded to the file transfer repository + ttd.dir("tedge") + .dir("file-transfer") + .dir("main") + .dir("log_upload") + .file("typeA-1234"); + + // Simulate log_upload command with "executing" state + mqtt.send(MqttMessage::new( + &Topic::new_unchecked("te/device/main///cmd/log_upload/1234"), + json!({ + "status": "successful", + "tedgeUrl": format!("http://localhost:8888/tedge/file-transfer/main/log_upload/typeA-1234"), + "type": "typeA", + "dateFrom": "2013-06-22T17:03:14.123+02:00", + "dateTo": "2013-06-23T18:03:14.123+02:00", + "searchText": "ERROR", + "lines": 1000 + }) + .to_string(), + )) + .await + .expect("Send failed"); + + // Expect `503` smartrest message on `c8y/s/us`. + assert_received_contains_str( + &mut mqtt, + [("c8y/s/us", "503,c8y_LogfileRequest,http://c8y-binary.url")], + ) + .await; +} + +#[tokio::test] +#[ignore = "FIXME"] +async fn handle_log_upload_successful_cmd_for_child_device() { + let ttd = TempTedgeDir::new(); + let (mqtt, http, _fs, _timer) = spawn_c8y_mapper_actor(&ttd, true).await; + spawn_dummy_c8y_http_proxy(http); + + let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); + mqtt.skip(6).await; //Skip all init messages + + // Simulate a log file is uploaded to the file transfer repository + ttd.dir("tedge") + .dir("file-transfer") + .dir("child1") + .dir("log_upload") + .file("typeA-1234"); + + // Simulate log_upload command with "executing" state + mqtt.send(MqttMessage::new( + &Topic::new_unchecked("te/device/child1///cmd/log_upload/1234"), + json!({ + "status": "successful", + "tedgeUrl": format!("http://localhost:8888/tedge/file-transfer/child1/log_upload/typeA-1234"), + "type": "typeA", + "dateFrom": "2013-06-22T17:03:14.123+02:00", + "dateTo": "2013-06-23T18:03:14.123+02:00", + "searchText": "ERROR", + "lines": 1000 + }) + .to_string(), + )) + .await + .expect("Send failed"); + + // Expect `503` smartrest message on `c8y/s/us`. + assert_received_contains_str( + &mut mqtt, + [( + "c8y/s/us/child1", + "503,c8y_LogfileRequest,http://c8y-binary.url", + )], + ) + .await; +} + fn assert_command_exec_log_content(cfg_dir: TempTedgeDir, expected_contents: &str) { let paths = fs::read_dir(cfg_dir.to_path_buf().join("tedge/agent")).unwrap(); for path in paths { @@ -1335,17 +1737,23 @@ async fn spawn_c8y_mapper_actor( let device_type = "test-device-type".into(); let service_type = "service".into(); let c8y_host = "test.c8y.io".into(); - let mut topics = C8yMapperConfig::internal_topic_filter(config_dir.path()).unwrap(); + let tedge_http_host = "localhost:8888".into(); + let mqtt_schema = MqttSchema::default(); + let mut topics = C8yMapperConfig::default_internal_topic_filter(config_dir.path()).unwrap(); + topics.add_all(crate::log_upload::log_upload_topic_filter(&mqtt_schema)); topics.add_all(C8yMapperConfig::default_external_topic_filter()); let config = C8yMapperConfig::new( config_dir.to_path_buf(), config_dir.utf8_path_buf(), + config_dir.utf8_path_buf(), device_name, device_type, service_type, c8y_host, + tedge_http_host, topics, + Capabilities::default(), ); let mut mqtt_builder: SimpleMessageBoxBuilder = @@ -1401,6 +1809,13 @@ fn spawn_dummy_c8y_http_proxy(mut http: SimpleMessageBox { + let _ = http + .send(Ok(c8y_http_proxy::messages::C8YRestResponse::Url(Url( + "http://c8y-binary.url".into(), + )))) + .await; + } _ => {} } } diff --git a/crates/extensions/tedge_log_manager/Cargo.toml b/crates/extensions/tedge_log_manager/Cargo.toml index e48c49be5ec..e4845b6e41a 100644 --- a/crates/extensions/tedge_log_manager/Cargo.toml +++ b/crates/extensions/tedge_log_manager/Cargo.toml @@ -14,16 +14,15 @@ async-trait = { workspace = true } http = { workspace = true } log = { workspace = true } log_manager = { workspace = true } -serde = { workspace = true } serde_json = { workspace = true } tedge_actors = { workspace = true } +tedge_api = { workspace = true } tedge_config = { workspace = true } tedge_file_system_ext = { workspace = true } tedge_http_ext = { workspace = true } tedge_mqtt_ext = { workspace = true } tedge_utils = { workspace = true } thiserror = { workspace = true } -time = { workspace = true, features = ["formatting", "serde"] } tokio = { workspace = true, features = ["macros"] } [dev-dependencies] @@ -35,4 +34,3 @@ tedge_http_ext = { workspace = true, features = [ "test_helpers", ] } tedge_test_utils = { workspace = true } -time = { workspace = true, features = ["macros"] } diff --git a/crates/extensions/tedge_log_manager/src/actor.rs b/crates/extensions/tedge_log_manager/src/actor.rs index d5219222bc5..d33123bfe1e 100644 --- a/crates/extensions/tedge_log_manager/src/actor.rs +++ b/crates/extensions/tedge_log_manager/src/actor.rs @@ -1,5 +1,6 @@ use async_trait::async_trait; use http::status::StatusCode; +use log::debug; use log::error; use log::info; use log_manager::LogPluginConfig; @@ -14,6 +15,7 @@ use tedge_actors::NoMessage; use tedge_actors::RuntimeError; use tedge_actors::Sender; use tedge_actors::SimpleMessageBox; +use tedge_api::Jsonify; use tedge_file_system_ext::FsWatchEvent; use tedge_http_ext::HttpError; use tedge_http_ext::HttpRequest; @@ -24,11 +26,10 @@ use tedge_mqtt_ext::MqttMessage; use tedge_mqtt_ext::Topic; use super::error::LogManagementError; -use super::json::CommandStatus; -use super::json::LogRequestPayload; -use super::json::LogResponsePayload; use super::LogManagerConfig; use super::DEFAULT_PLUGIN_CONFIG_FILE_NAME; +use tedge_api::messages::CommandStatus; +use tedge_api::messages::LogUploadCmdPayload; fan_in_message_type!(LogInput[MqttMessage, FsWatchEvent] : Debug); fan_in_message_type!(LogOutput[MqttMessage]: Debug); @@ -83,20 +84,26 @@ impl LogManagerActor { pub async fn process_mqtt_message(&mut self, message: MqttMessage) -> Result<(), ChannelError> { if self.config.logfile_request_topic.accept(&message) { - match LogRequestPayload::try_from(message.clone()) { - Ok(request) => { - if (request.status.eq(&CommandStatus::Executing) - || request.status.eq(&CommandStatus::Init)) - && !self.config.current_operations.contains(&message.topic.name) - { + match request_from_message(&message) { + Ok(Some(request)) => match request.status { + CommandStatus::Init => { info!("Log request received: {request:?}"); self.config .current_operations .insert(message.topic.name.clone()); - self.handle_logfile_request_operation(&message.topic, &request) + self.start_executing_logfile_request(&message.topic, request) .await?; } - } + CommandStatus::Executing => { + debug!("Executing log request: {request:?}"); + self.handle_logfile_request_operation(&message.topic, request) + .await?; + } + CommandStatus::Successful | CommandStatus::Failed => { + self.config.current_operations.remove(&message.topic.name); + } + }, + Ok(None) => {} Err(err) => { error!("Incorrect log request payload: {}", err); } @@ -111,51 +118,31 @@ impl LogManagerActor { Ok(()) } + pub async fn start_executing_logfile_request( + &mut self, + topic: &Topic, + mut request: LogUploadCmdPayload, + ) -> Result<(), ChannelError> { + request.executing(); + self.publish_command_status(topic, &request).await + } + pub async fn handle_logfile_request_operation( &mut self, topic: &Topic, - request: &LogRequestPayload, + mut request: LogUploadCmdPayload, ) -> Result<(), ChannelError> { - if !request.status.eq(&CommandStatus::Executing) { - self.mqtt_publisher - .send(create_operation_message( - topic, - request, - CommandStatus::Executing, - )) - .await?; - } - match self.execute_logfile_request_operation(request).await { + match self.execute_logfile_request_operation(&request).await { Ok(()) => { - self.mqtt_publisher - .send(create_operation_message( - topic, - request, - CommandStatus::Successful, - )) - .await?; - - self.config.current_operations.remove(&topic.name); - - info!( - "Log request processed for log type: {}.", - request.log.log_type - ); + request.successful(); + self.publish_command_status(topic, &request).await?; + info!("Log request processed for log type: {}.", request.log_type); Ok(()) } Err(error) => { let error_message = format!("Handling of operation failed with {}", error); - self.mqtt_publisher - .send(create_operation_message_with_reason( - topic, - request, - CommandStatus::Failed, - &error_message, - )) - .await?; - - self.config.current_operations.remove(&topic.name); - + request.failed(&error_message); + self.publish_command_status(topic, &request).await?; error!("{}", error_message); Ok(()) } @@ -169,14 +156,14 @@ impl LogManagerActor { /// - sends request successful (mqtt) async fn execute_logfile_request_operation( &mut self, - request: &LogRequestPayload, + request: &LogUploadCmdPayload, ) -> Result<(), LogManagementError> { let log_content = log_manager::new_read_logs( &self.plugin_config.files, - &request.log.log_type, - request.log.date_from, - request.log.lines, - &request.log.search_text, + &request.log_type, + request.date_from, + request.lines.to_owned(), + &request.search_text, )?; self.send_log_file_http(log_content, request.tedge_url.clone()) @@ -251,23 +238,35 @@ impl LogManagerActor { let msg = MqttMessage::new(&self.config.logtype_reload_topic, payload).with_retain(); self.mqtt_publisher.send(msg).await } + + async fn publish_command_status( + &mut self, + topic: &Topic, + request: &LogUploadCmdPayload, + ) -> Result<(), ChannelError> { + match request_into_message(topic, request) { + Ok(message) => self.mqtt_publisher.send(message).await?, + Err(err) => error!("Fail to build a message for {:?}: {err}", request), + } + Ok(()) + } } -fn create_operation_message_with_reason( - topic: &Topic, - request: &LogRequestPayload, - status: CommandStatus, - reason: &str, -) -> MqttMessage { - let payload = LogResponsePayload::from_log_request(request, status).with_reason(reason); - MqttMessage::new(topic, payload.to_string()).with_retain() +fn request_from_message( + message: &MqttMessage, +) -> Result, LogManagementError> { + if message.payload_bytes().is_empty() { + Ok(None) + } else { + Ok(Some(LogUploadCmdPayload::from_json( + message.payload_str()?, + )?)) + } } -fn create_operation_message( +fn request_into_message( topic: &Topic, - request: &LogRequestPayload, - status: CommandStatus, -) -> MqttMessage { - let payload = LogResponsePayload::from_log_request(request, status); - MqttMessage::new(topic, payload.to_string()).with_retain() + request: &LogUploadCmdPayload, +) -> Result { + Ok(MqttMessage::new(topic, request.to_json()?).with_retain()) } diff --git a/crates/extensions/tedge_log_manager/src/json.rs b/crates/extensions/tedge_log_manager/src/json.rs deleted file mode 100644 index f9514cef475..00000000000 --- a/crates/extensions/tedge_log_manager/src/json.rs +++ /dev/null @@ -1,253 +0,0 @@ -use crate::error::LogManagementError; -use serde::Deserialize; -use serde::Serialize; -use std::convert::TryFrom; -use tedge_mqtt_ext::MqttMessage; -use time::OffsetDateTime; - -// This Enum will be reverted once tedge mapper crate will be merged -#[derive(Debug, Deserialize, Serialize, PartialEq, Copy, Eq, Clone)] -#[serde(rename_all = "camelCase")] -pub enum CommandStatus { - Init, - Executing, - Successful, - Failed, -} - -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] -#[serde(rename_all = "camelCase")] -pub struct LogInfo { - #[serde(rename = "type")] - pub log_type: String, - #[serde(with = "time::serde::rfc3339")] - pub date_from: OffsetDateTime, - #[serde(with = "time::serde::rfc3339")] - pub date_to: OffsetDateTime, - pub lines: usize, - #[serde(skip_serializing_if = "Option::is_none")] - pub search_text: Option, -} - -impl LogInfo { - pub fn new( - log_type: &str, - date_from: OffsetDateTime, - date_to: OffsetDateTime, - lines: usize, - ) -> Self { - Self { - log_type: log_type.to_string(), - date_from, - date_to, - search_text: None, - lines, - } - } - - pub fn with_search_text(self, needle: &str) -> Self { - Self { - search_text: Some(needle.into()), - ..self - } - } -} - -impl ToString for LogInfo { - fn to_string(&self) -> String { - serde_json::to_string(&self).expect("infallible") - } -} - -#[derive(Deserialize, Debug, PartialEq)] -#[serde(rename_all = "camelCase")] -pub struct LogRequestPayload { - pub status: CommandStatus, - pub tedge_url: String, - #[serde(flatten)] - pub log: LogInfo, -} - -impl TryFrom for LogRequestPayload { - type Error = LogManagementError; - - fn try_from(value: MqttMessage) -> Result { - let payload = value.payload.as_str()?; - let request: LogRequestPayload = serde_json::from_str(payload)?; - Ok(request) - } -} - -#[derive(Serialize, Debug)] -#[serde(rename_all = "camelCase")] -pub struct LogResponsePayload { - pub status: CommandStatus, - pub tedge_url: String, - #[serde(flatten)] - pub log: LogInfo, - #[serde(skip_serializing_if = "Option::is_none")] - pub reason: Option, -} - -impl LogResponsePayload { - pub fn from_log_request(request: &LogRequestPayload, status: CommandStatus) -> Self { - Self { - status, - tedge_url: request.tedge_url.clone(), - log: request.log.clone(), - reason: None, - } - } - - pub fn with_reason(self, reason: &str) -> Self { - Self { - reason: Some(reason.into()), - ..self - } - } -} - -impl ToString for LogResponsePayload { - fn to_string(&self) -> String { - serde_json::to_string(&self).expect("infallible") - } -} - -#[cfg(test)] -mod tests { - use super::*; - use assert_json_diff::*; - use serde_json::json; - use time::macros::datetime; - - #[test] - fn serialize_response_payload() { - let log = LogInfo::new( - "type_one", - datetime!(1970-01-01 00:00:00 +00:00), - datetime!(1970-01-01 00:00:03 +00:00), - 7, - ); - let request_payload = LogRequestPayload { - status: CommandStatus::Init, - tedge_url: "http://127.0.0.1:3000/tedge/file-transfer/main/logfile/type_one-opid" - .to_string(), - log, - }; - - let response_payload = - LogResponsePayload::from_log_request(&request_payload, CommandStatus::Executing); - - let json = serde_json::to_string(&response_payload).unwrap(); - - let expected_json = json!({ - "status": "executing", - "tedgeUrl": "http://127.0.0.1:3000/tedge/file-transfer/main/logfile/type_one-opid", - "type": "type_one", - "dateFrom": "1970-01-01T00:00:00Z", - "dateTo": "1970-01-01T00:00:03Z", - "lines": 7 - }); - - assert_json_eq!( - serde_json::from_str::(&json).unwrap(), - expected_json - ); - } - - #[test] - fn serialize_response_payload_with_reason() { - let log = LogInfo::new( - "type_one", - datetime!(1970-01-01 00:00:00 +00:00), - datetime!(1970-01-01 00:00:03 +00:00), - 7, - ); - let request_payload = LogRequestPayload { - status: CommandStatus::Init, - tedge_url: "http://127.0.0.1:3000/tedge/file-transfer/main/logfile/type_one-opid" - .to_string(), - log, - }; - - let response_payload = - LogResponsePayload::from_log_request(&request_payload, CommandStatus::Executing) - .with_reason("something"); - - let json = serde_json::to_string(&response_payload).unwrap(); - - let expected_json = json!({ - "status": "executing", - "tedgeUrl": "http://127.0.0.1:3000/tedge/file-transfer/main/logfile/type_one-opid", - "type": "type_one", - "dateFrom": "1970-01-01T00:00:00Z", - "dateTo": "1970-01-01T00:00:03Z", - "lines": 7, - "reason": "something" - }); - - assert_json_eq!( - serde_json::from_str::(&json).unwrap(), - expected_json - ); - } - - #[test] - fn deserialize_log_request() { - let data = r#" - { - "status": "init", - "tedgeUrl": "http://127.0.0.1:3000/tedge/file-transfer/main/log_upload/type_one-1234", - "type": "type_one", - "dateFrom": "1970-01-01T00:00:00+00:00", - "dateTo": "1970-01-01T00:00:03+00:00", - "lines": 7 - }"#; - let value: LogRequestPayload = serde_json::from_str(data).unwrap(); - - let expected_value = LogRequestPayload { - status: CommandStatus::Init, - tedge_url: "http://127.0.0.1:3000/tedge/file-transfer/main/log_upload/type_one-1234" - .to_string(), - log: LogInfo { - log_type: "type_one".to_string(), - date_from: datetime!(1970-01-01 00:00:00 +00:00), - date_to: datetime!(1970-01-01 00:00:03 +00:00), - lines: 7, - search_text: None, - }, - }; - - assert_eq!(value, expected_value); - } - - #[test] - fn deserialize_log_request_with_search_text() { - let data = r#" - { - "status": "init", - "tedgeUrl": "http://127.0.0.1:3000/tedge/file-transfer/main/log_upload/type_one-1234", - "type": "type_one", - "dateFrom": "1970-01-01T00:00:00+00:00", - "dateTo": "1970-01-01T00:00:03+00:00", - "lines": 7, - "searchText": "needle" - }"#; - let value: LogRequestPayload = serde_json::from_str(data).unwrap(); - - let expected_value = LogRequestPayload { - status: CommandStatus::Init, - tedge_url: "http://127.0.0.1:3000/tedge/file-transfer/main/log_upload/type_one-1234" - .to_string(), - log: LogInfo { - log_type: "type_one".to_string(), - date_from: datetime!(1970-01-01 00:00:00 +00:00), - date_to: datetime!(1970-01-01 00:00:03 +00:00), - lines: 7, - search_text: Some("needle".to_string()), - }, - }; - - assert_eq!(value, expected_value); - } -} diff --git a/crates/extensions/tedge_log_manager/src/lib.rs b/crates/extensions/tedge_log_manager/src/lib.rs index 270c9bd87c0..044d96ceecb 100644 --- a/crates/extensions/tedge_log_manager/src/lib.rs +++ b/crates/extensions/tedge_log_manager/src/lib.rs @@ -1,7 +1,6 @@ mod actor; mod config; mod error; -mod json; #[cfg(test)] mod tests; diff --git a/crates/extensions/tedge_log_manager/src/tests.rs b/crates/extensions/tedge_log_manager/src/tests.rs index f7b3ff5e91a..3e6e77fb7ec 100644 --- a/crates/extensions/tedge_log_manager/src/tests.rs +++ b/crates/extensions/tedge_log_manager/src/tests.rs @@ -182,13 +182,16 @@ async fn log_manager_upload_log_files_on_request() -> Result<(), anyhow::Error> .await?; // The log manager notifies that the request has been received and is processed + let executing_message = mqtt.recv().await; assert_eq!( - mqtt.recv().await, - Some(MqttMessage::new( + executing_message, + Some(MqttMessage::new( &logfile_topic, r#"{"status":"executing","tedgeUrl":"http://127.0.0.1:3000/tedge/file-transfer/main/log_upload/type_two-1234","type":"type_two","dateFrom":"1970-01-01T00:00:00Z","dateTo":"1970-01-01T00:00:30Z","lines":1000}"# ).with_retain()) ); + // This message being published over MQTT is also received by the log-manager itself + mqtt.send(executing_message.unwrap()).await?; // Then uploads the requested content over HTTP let actual_request = http.recv().await; @@ -243,13 +246,16 @@ async fn request_logtype_that_does_not_exist() -> Result<(), anyhow::Error> { .await?; // The log manager notifies that the request has been received and is processed + let executing_message = mqtt.recv().await; assert_eq!( - mqtt.recv().await, + executing_message, Some(MqttMessage::new( &logfile_topic, r#"{"status":"executing","tedgeUrl":"http://127.0.0.1:3000/tedge/file-transfer/main/log_upload/type_four-1234","type":"type_four","dateFrom":"1970-01-01T00:00:00Z","dateTo":"1970-01-01T00:00:30Z","lines":1000}"# ).with_retain()) ); + // This message being published over MQTT is also received by the log-manager itself + mqtt.send(executing_message.unwrap()).await?; // Finally, the log manager notifies that given log type does not exists assert_eq!( @@ -287,13 +293,16 @@ async fn put_logfiles_without_permissions() -> Result<(), anyhow::Error> { .await?; // The log manager notifies that the request has been received and is processed + let executing_message = mqtt.recv().await; assert_eq!( - mqtt.recv().await, + executing_message, Some(MqttMessage::new( &logfile_topic, r#"{"status":"executing","tedgeUrl":"http://127.0.0.1:3000/tedge/file-transfer/main/log_upload/type_two-1234","type":"type_two","dateFrom":"1970-01-01T00:00:00Z","dateTo":"1970-01-01T00:00:30Z","lines":1000}"# ).with_retain()) ); + // This message being published over MQTT is also received by the log-manager itself + mqtt.send(executing_message.unwrap()).await?; // Then uploads the requested content over HTTP assert!(http.recv().await.is_some()); @@ -397,13 +406,16 @@ async fn read_log_from_file_that_does_not_exist() -> Result<(), anyhow::Error> { .await?; // The log manager notifies that the request has been received and is processed + let executing_message = mqtt.recv().await; assert_eq!( - mqtt.recv().await, - Some(MqttMessage::new( + executing_message, + Some(MqttMessage::new( &logfile_topic, r#"{"status":"executing","tedgeUrl":"http://127.0.0.1:3000/tedge/file-transfer/main/log_upload/type_three-1234","type":"type_three","dateFrom":"1970-01-01T00:00:00Z","dateTo":"1970-01-01T00:00:30Z","lines":1000}"# ).with_retain()) ); + // This message being published over MQTT is also received by the log-manager itself + mqtt.send(executing_message.unwrap()).await?; // Finally, the log manager notifies that given log type does not exists assert_eq!(