diff --git a/crates/core/tedge_agent/src/agent.rs b/crates/core/tedge_agent/src/agent.rs index 194798f6d31..c5767feff9c 100644 --- a/crates/core/tedge_agent/src/agent.rs +++ b/crates/core/tedge_agent/src/agent.rs @@ -121,23 +121,17 @@ impl Default for SmAgentConfig { let response_topic_health = Topic::new_unchecked("tedge/health/tedge-agent"); - let request_topic_list = - Topic::new(SoftwareListRequest::topic_name()).expect("Invalid topic"); + let request_topic_list = SoftwareListRequest::topic(); - let request_topic_update = - Topic::new(SoftwareUpdateRequest::topic_name()).expect("Invalid topic"); + let request_topic_update = SoftwareUpdateRequest::topic(); - let response_topic_list = - Topic::new(SoftwareListResponse::topic_name()).expect("Invalid topic"); + let response_topic_list = SoftwareListResponse::topic(); - let response_topic_update = - Topic::new(SoftwareUpdateResponse::topic_name()).expect("Invalid topic"); + let response_topic_update = SoftwareUpdateResponse::topic(); - let request_topic_restart = - Topic::new(RestartOperationRequest::topic_name()).expect("Invalid topic"); + let request_topic_restart = RestartOperationRequest::topic(); - let response_topic_restart = - Topic::new(RestartOperationResponse::topic_name()).expect("Invalid topic"); + let response_topic_restart = RestartOperationResponse::topic(); let sm_home = Utf8PathBuf::from("/etc/tedge"); @@ -790,8 +784,7 @@ mod tests { // calling handle_restart_operation should create a file in /tmp/tedge_agent_restart let (_output, mut output_stream) = mqtt_tests::output_stream(); - let response_topic_restart = - Topic::new(RestartOperationResponse::topic_name()).expect("Invalid topic"); + let response_topic_restart = RestartOperationResponse::topic(); agent .handle_restart_operation(&mut output_stream, &response_topic_restart) @@ -866,8 +859,7 @@ mod tests { ) .unwrap(); - let response_topic_restart = - Topic::new(SoftwareListResponse::topic_name()).expect("Invalid topic"); + let response_topic_restart = SoftwareListResponse::topic(); let plugins = Arc::new(Mutex::new( ExternalPlugins::open( diff --git a/crates/core/tedge_api/src/lib.rs b/crates/core/tedge_api/src/lib.rs index 7cb2ce72454..7dab9b02292 100644 --- a/crates/core/tedge_api/src/lib.rs +++ b/crates/core/tedge_api/src/lib.rs @@ -32,6 +32,7 @@ pub use software::*; #[cfg(test)] mod tests { use super::*; + use mqtt_channel::Topic; use regex::Regex; #[test] @@ -39,20 +40,28 @@ mod tests { // There are two topics for each kind of requests, // one for the requests, the other for the responses assert_eq!( - SoftwareListRequest::topic_name(), - "tedge/commands/req/software/list" + SoftwareListRequest::topic(), + Topic::new_unchecked("tedge/commands/req/software/list") ); assert_eq!( - SoftwareListResponse::topic_name(), - "tedge/commands/res/software/list" + SoftwareListResponse::topic(), + Topic::new_unchecked("tedge/commands/res/software/list") ); assert_eq!( - SoftwareUpdateRequest::topic_name(), - "tedge/commands/req/software/update" + SoftwareUpdateRequest::topic(), + Topic::new_unchecked("tedge/commands/req/software/update") ); assert_eq!( - SoftwareUpdateResponse::topic_name(), - "tedge/commands/res/software/update" + SoftwareUpdateResponse::topic(), + Topic::new_unchecked("tedge/commands/res/software/update") + ); + assert_eq!( + RestartOperationRequest::topic(), + Topic::new_unchecked("tedge/commands/req/control/restart") + ); + assert_eq!( + RestartOperationResponse::topic(), + Topic::new_unchecked("tedge/commands/res/control/restart") ); } diff --git a/crates/core/tedge_api/src/messages.rs b/crates/core/tedge_api/src/messages.rs index 8b3c4240378..ef3b41f8c01 100644 --- a/crates/core/tedge_api/src/messages.rs +++ b/crates/core/tedge_api/src/messages.rs @@ -6,6 +6,13 @@ use nanoid::nanoid; use serde::Deserialize; use serde::Serialize; +const SOFTWARE_LIST_REQUEST_TOPIC: &str = "tedge/commands/req/software/list"; +const SOFTWARE_LIST_RESPONSE_TOPIC: &str = "tedge/commands/res/software/list"; +const SOFTWARE_UPDATE_REQUEST_TOPIC: &str = "tedge/commands/req/software/update"; +const SOFTWARE_UPDATE_RESPONSE_TOPIC: &str = "tedge/commands/res/software/update"; +const DEVICE_RESTART_REQUEST_TOPIC: &str = "tedge/commands/req/control/restart"; +const DEVICE_RESTART_RESPONSE_TOPIC: &str = "tedge/commands/res/control/restart"; + /// All the messages are serialized using json. pub trait Jsonify<'a> where @@ -58,12 +65,8 @@ impl SoftwareListRequest { SoftwareListRequest { id: id.to_string() } } - pub fn topic_name() -> &'static str { - "tedge/commands/req/software/list" - } - pub fn topic() -> Topic { - Topic::new_unchecked(Self::topic_name()) + Topic::new_unchecked(SOFTWARE_LIST_REQUEST_TOPIC) } } @@ -96,12 +99,8 @@ impl SoftwareUpdateRequest { } } - pub fn topic_name() -> &'static str { - "tedge/commands/req/software/update" - } - pub fn topic() -> Topic { - Topic::new_unchecked(Self::topic_name()) + Topic::new_unchecked(SOFTWARE_UPDATE_REQUEST_TOPIC) } pub fn add_update(&mut self, mut update: SoftwareModuleUpdate) { @@ -213,12 +212,8 @@ impl SoftwareListResponse { } } - pub fn topic_name() -> &'static str { - "tedge/commands/res/software/list" - } - pub fn topic() -> Topic { - Topic::new_unchecked(Self::topic_name()) + Topic::new_unchecked(SOFTWARE_LIST_RESPONSE_TOPIC) } pub fn add_modules(&mut self, plugin_type: &str, modules: Vec) { @@ -269,12 +264,8 @@ impl SoftwareUpdateResponse { } } - pub fn topic_name() -> &'static str { - "tedge/commands/res/software/update" - } - pub fn topic() -> Topic { - Topic::new_unchecked(Self::topic_name()) + Topic::new_unchecked(SOFTWARE_UPDATE_RESPONSE_TOPIC) } pub fn add_modules(&mut self, plugin_type: &str, modules: Vec) { @@ -509,8 +500,8 @@ impl RestartOperationRequest { RestartOperationRequest { id: id.to_string() } } - pub fn topic_name() -> &'static str { - "tedge/commands/req/control/restart" + pub fn topic() -> Topic { + Topic::new_unchecked(DEVICE_RESTART_REQUEST_TOPIC) } } @@ -534,8 +525,8 @@ impl RestartOperationResponse { Self { status, ..self } } - pub fn topic_name() -> &'static str { - "tedge/commands/res/control/restart" + pub fn topic() -> Topic { + Topic::new_unchecked(DEVICE_RESTART_RESPONSE_TOPIC) } pub fn status(&self) -> OperationStatus { diff --git a/crates/core/tedge_mapper/Cargo.toml b/crates/core/tedge_mapper/Cargo.toml index 4cdefeff0aa..7854cd15a57 100644 --- a/crates/core/tedge_mapper/Cargo.toml +++ b/crates/core/tedge_mapper/Cargo.toml @@ -62,9 +62,9 @@ async-trait = "0.1" aws_mapper_ext = { path = "../../extensions/aws_mapper_ext" } az_mapper_ext = { path = "../../extensions/az_mapper_ext" } batcher = { path = "../../common/batcher" } -camino = "1.1.4" c8y_api = { path = "../c8y_api" } c8y_http_proxy = { path = "../../extensions/c8y_http_proxy" } +camino = "1.1" clap = { version = "3.2", features = ["cargo", "derive"] } clock = { path = "../../common/clock" } collectd_ext = { path = "../../extensions/collectd_ext" } @@ -85,15 +85,14 @@ serde_json = "1.0" tedge_actors = { path = "../../core/tedge_actors" } tedge_api = { path = "../tedge_api" } tedge_config = { path = "../../common/tedge_config" } +tedge_file_system_ext = { path = "../../extensions/tedge_file_system_ext" } tedge_health_ext = { path = "../../extensions/tedge_health_ext" } tedge_http_ext = { path = "../../extensions/tedge_http_ext" } tedge_mqtt_ext = { path = "../../extensions/tedge_mqtt_ext" } -tedge_file_system_ext = { path = "../../extensions/tedge_file_system_ext" } tedge_signal_ext = { path = "../../extensions/tedge_signal_ext" } tedge_timer_ext = { path = "../../extensions/tedge_timer_ext" } tedge_utils = { path = "../../common/tedge_utils", features = [ "logging", - "fs-notify", ] } thiserror = "1.0" time = "0.3" diff --git a/crates/core/tedge_mapper/src/c8y/actor.rs b/crates/core/tedge_mapper/src/c8y/actor.rs index 9393787b729..78eb43dd0e4 100644 --- a/crates/core/tedge_mapper/src/c8y/actor.rs +++ b/crates/core/tedge_mapper/src/c8y/actor.rs @@ -3,12 +3,10 @@ use super::config::MQTT_MESSAGE_SIZE_THRESHOLD; use super::converter::CumulocityConverter; use super::converter::CumulocityDeviceInfo; use super::dynamic_discovery::process_inotify_events; -use super::mapper::CumulocityMapper; use crate::core::converter::Converter; use crate::core::converter::MapperConfig; use crate::core::size_threshold::SizeThreshold; use async_trait::async_trait; -use c8y_api::http_proxy; use c8y_api::smartrest::operations::Operations; use c8y_api::smartrest::topic::SMARTREST_PUBLISH_TOPIC; use c8y_http_proxy::handle::C8YHttpProxy; @@ -23,7 +21,6 @@ use tedge_actors::fan_in_message_type; use tedge_actors::Actor; use tedge_actors::Builder; use tedge_actors::DynSender; -use tedge_actors::LinkError; use tedge_actors::LoggingSender; use tedge_actors::MessageReceiver; use tedge_actors::MessageSink; @@ -33,7 +30,6 @@ use tedge_actors::RuntimeError; use tedge_actors::RuntimeRequest; use tedge_actors::RuntimeRequestSink; use tedge_actors::Sender; -use tedge_actors::ServiceConsumer; use tedge_actors::ServiceProvider; use tedge_actors::SimpleMessageBox; use tedge_actors::SimpleMessageBoxBuilder; @@ -43,7 +39,7 @@ use tedge_mqtt_ext::TopicFilter; use tedge_timer_ext::SetTimeout; use tedge_timer_ext::Timeout; -const SYNC_WINDOW: Duration = Duration::from_secs(300); +const SYNC_WINDOW: Duration = Duration::from_secs(3); pub type SyncStart = SetTimeout<()>; pub type SyncComplete = Timeout<()>; diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs index 79165482cbf..58b0b03cd52 100644 --- a/crates/core/tedge_mapper/src/c8y/mapper.rs +++ b/crates/core/tedge_mapper/src/c8y/mapper.rs @@ -4,14 +4,10 @@ use super::service_monitor::service_monitor_status_message; use crate::core::component::TEdgeComponent; use crate::core::mapper::start_basic_actors; use async_trait::async_trait; -use c8y_api::smartrest::operations::Operations; -use c8y_api::smartrest::topic::C8yTopic; use c8y_http_proxy::credentials::C8YJwtRetriever; use c8y_http_proxy::C8YHttpProxyBuilder; use mqtt_channel::Config; -use mqtt_channel::TopicFilter; use std::path::Path; -use tedge_api::topic::ResponseTopic; use tedge_config::ConfigSettingAccessor; use tedge_config::DeviceIdSetting; use tedge_config::ServiceTypeSetting; diff --git a/crates/core/tedge_mapper/src/c8y/tests.rs b/crates/core/tedge_mapper/src/c8y/tests.rs index 86ce544d8c5..0864834bf7f 100644 --- a/crates/core/tedge_mapper/src/c8y/tests.rs +++ b/crates/core/tedge_mapper/src/c8y/tests.rs @@ -28,10 +28,8 @@ use tedge_actors::Actor; use tedge_actors::Builder; use tedge_actors::LoggingSender; use tedge_actors::MessageReceiver; -use tedge_actors::MessageSource; use tedge_actors::NoMessage; use tedge_actors::Sender; -use tedge_actors::ServiceConsumer; use tedge_actors::SimpleMessageBox; use tedge_actors::SimpleMessageBoxBuilder; use tedge_api::SoftwareUpdateResponse; @@ -41,7 +39,6 @@ use tedge_mqtt_ext::test_helpers::assert_received_includes_json; use tedge_mqtt_ext::MqttMessage; use tedge_test_utils::fs::TempTedgeDir; use tedge_timer_ext::Timeout; -use tokio::task::JoinHandle; const TEST_TIMEOUT_MS: Duration = Duration::from_millis(5000); @@ -49,8 +46,8 @@ const TEST_TIMEOUT_MS: Duration = Duration::from_millis(5000); #[serial] async fn mapper_publishes_init_messages_on_startup() { // Start SM Mapper - let mut cfg_dir = TempTedgeDir::new(); - let (mqtt, _http, _fs, _timer) = spawn_c8y_mapper_actor(&mut cfg_dir, true).await; + let cfg_dir = TempTedgeDir::new(); + let (mqtt, _http, _fs, _timer) = spawn_c8y_mapper_actor(&cfg_dir, true).await; let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); @@ -88,8 +85,7 @@ async fn mapper_publishes_init_messages_on_startup() { async fn mapper_publishes_software_update_request() { // The test assures SM Mapper correctly receives software update request smartrest message on `c8y/s/ds` // and converts it to thin-edge json message published on `tedge/commands/req/software/update`. - let (mqtt, mut http, _fs, _timer) = - spawn_c8y_mapper_actor(&mut TempTedgeDir::new(), true).await; + let (mqtt, http, _fs, _timer) = spawn_c8y_mapper_actor(&TempTedgeDir::new(), true).await; spawn_dummy_c8y_http_proxy(http); let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); @@ -134,7 +130,7 @@ async fn mapper_publishes_software_update_status_onto_c8y_topic() { // and publishes status of the operation `501` on `c8y/s/us` // Start SM Mapper - let (mqtt, http, _fs, _timer) = spawn_c8y_mapper_actor(&mut TempTedgeDir::new(), true).await; + let (mqtt, http, _fs, _timer) = spawn_c8y_mapper_actor(&TempTedgeDir::new(), true).await; spawn_dummy_c8y_http_proxy(http); let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); @@ -186,7 +182,7 @@ async fn mapper_publishes_software_update_status_onto_c8y_topic() { #[serial] async fn mapper_publishes_software_update_failed_status_onto_c8y_topic() { // Start SM Mapper - let (mqtt, _http, _fs, _timer) = spawn_c8y_mapper_actor(&mut TempTedgeDir::new(), true).await; + let (mqtt, _http, _fs, _timer) = spawn_c8y_mapper_actor(&TempTedgeDir::new(), true).await; let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); mqtt.skip(6).await; @@ -336,7 +332,7 @@ async fn mapper_publishes_software_update_request_with_wrong_action() { // Then SM Mapper publishes an operation status message as failed `502,c8y_SoftwareUpdate,Action remove is not recognized. It must be install or delete.` on `c8/s/us`. // Then the subscriber that subscribed for messages on `c8/s/us` receives these messages and verifies them. - let (mqtt, _http, _fs, _timer) = spawn_c8y_mapper_actor(&mut TempTedgeDir::new(), true).await; + let (mqtt, _http, _fs, _timer) = spawn_c8y_mapper_actor(&TempTedgeDir::new(), true).await; let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); mqtt.skip(6).await; @@ -367,8 +363,7 @@ async fn mapper_publishes_software_update_request_with_wrong_action() { #[tokio::test] #[serial] async fn c8y_mapper_alarm_mapping_to_smartrest() { - let (mqtt, _http, _fs, mut timer) = - spawn_c8y_mapper_actor(&mut TempTedgeDir::new(), true).await; + let (mqtt, _http, _fs, mut timer) = spawn_c8y_mapper_actor(&TempTedgeDir::new(), true).await; timer.send(Timeout::new(())).await.unwrap(); //Complete sync phase so that alarm mapping starts let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); @@ -1404,7 +1399,7 @@ async fn test_convert_small_measurement_for_child_device() { #[serial] async fn mapper_handles_multiline_sm_requests() { // The test assures if Mapper can handle multiline smartrest messages arrived on `c8y/s/ds` - let (mqtt, http, _fs, _timer) = spawn_c8y_mapper_actor(&mut TempTedgeDir::new(), true).await; + let (mqtt, http, _fs, _timer) = spawn_c8y_mapper_actor(&TempTedgeDir::new(), true).await; spawn_dummy_c8y_http_proxy(http); let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); @@ -1464,10 +1459,10 @@ async fn mapper_publishes_supported_operations() { // The test assures tede-mapper reads/parses the operations from operations directory and // correctly publishes the supported operations message on `c8y/s/us` // and verifies the supported operations that are published by the tedge-mapper. - let mut cfg_dir = TempTedgeDir::new(); + let cfg_dir = TempTedgeDir::new(); create_thin_edge_operations(&cfg_dir, vec!["c8y_TestOp1", "c8y_TestOp2"]); - let (mqtt, _http, _fs, _timer) = spawn_c8y_mapper_actor(&mut cfg_dir, false).await; + let (mqtt, _http, _fs, _timer) = spawn_c8y_mapper_actor(&cfg_dir, false).await; let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); mqtt.skip(1).await; @@ -1482,10 +1477,10 @@ async fn mapper_publishes_child_device_create_message() { // The test assures tedge-mapper checks if there is a directory for operations for child devices, then it reads and // correctly publishes the child device create message on to `c8y/s/us` // and verifies the device create message. - let mut cfg_dir = TempTedgeDir::new(); + let cfg_dir = TempTedgeDir::new(); create_thin_edge_child_devices(&cfg_dir, vec!["child1", "child2"]); - let (mqtt, _http, _fs, _timer) = spawn_c8y_mapper_actor(&mut cfg_dir, false).await; + let (mqtt, _http, _fs, _timer) = spawn_c8y_mapper_actor(&cfg_dir, false).await; let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); mqtt.skip(6).await; @@ -1807,12 +1802,6 @@ fn create_thin_edge_child_operations(cfg_dir: &TempTedgeDir, child_id: &str, ops } } -fn remove_whitespace(s: &str) -> String { - let mut s = String::from(s); - s.retain(|c| !c.is_whitespace()); - s -} - async fn spawn_c8y_mapper_actor( config_dir: &TempTedgeDir, init: bool, diff --git a/crates/extensions/tedge_http_ext/src/messages.rs b/crates/extensions/tedge_http_ext/src/messages.rs index 47e3378dc96..1046db888d5 100644 --- a/crates/extensions/tedge_http_ext/src/messages.rs +++ b/crates/extensions/tedge_http_ext/src/messages.rs @@ -67,7 +67,7 @@ impl HttpRequestBuilder { } } - /// Start to build a POST request + /// Start to build a PUT request pub fn put(uri: T) -> Self where hyper::Uri: TryFrom,