Skip to content

Commit

Permalink
Addressing review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
albinsuresh committed Apr 27, 2023
1 parent 5450cac commit 1c85072
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 84 deletions.
24 changes: 8 additions & 16 deletions crates/core/tedge_agent/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,23 +121,17 @@ impl Default for SmAgentConfig {

let response_topic_health = Topic::new_unchecked("tedge/health/tedge-agent");

let request_topic_list =
Topic::new(SoftwareListRequest::topic_name()).expect("Invalid topic");
let request_topic_list = SoftwareListRequest::topic();

let request_topic_update =
Topic::new(SoftwareUpdateRequest::topic_name()).expect("Invalid topic");
let request_topic_update = SoftwareUpdateRequest::topic();

let response_topic_list =
Topic::new(SoftwareListResponse::topic_name()).expect("Invalid topic");
let response_topic_list = SoftwareListResponse::topic();

let response_topic_update =
Topic::new(SoftwareUpdateResponse::topic_name()).expect("Invalid topic");
let response_topic_update = SoftwareUpdateResponse::topic();

let request_topic_restart =
Topic::new(RestartOperationRequest::topic_name()).expect("Invalid topic");
let request_topic_restart = RestartOperationRequest::topic();

let response_topic_restart =
Topic::new(RestartOperationResponse::topic_name()).expect("Invalid topic");
let response_topic_restart = RestartOperationResponse::topic();

let sm_home = Utf8PathBuf::from("/etc/tedge");

Expand Down Expand Up @@ -790,8 +784,7 @@ mod tests {

// calling handle_restart_operation should create a file in /tmp/tedge_agent_restart
let (_output, mut output_stream) = mqtt_tests::output_stream();
let response_topic_restart =
Topic::new(RestartOperationResponse::topic_name()).expect("Invalid topic");
let response_topic_restart = RestartOperationResponse::topic();

agent
.handle_restart_operation(&mut output_stream, &response_topic_restart)
Expand Down Expand Up @@ -866,8 +859,7 @@ mod tests {
)
.unwrap();

let response_topic_restart =
Topic::new(SoftwareListResponse::topic_name()).expect("Invalid topic");
let response_topic_restart = SoftwareListResponse::topic();

let plugins = Arc::new(Mutex::new(
ExternalPlugins::open(
Expand Down
25 changes: 17 additions & 8 deletions crates/core/tedge_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,27 +32,36 @@ pub use software::*;
#[cfg(test)]
mod tests {
use super::*;
use mqtt_channel::Topic;
use regex::Regex;

#[test]
fn topic_names() {
// There are two topics for each kind of requests,
// one for the requests, the other for the responses
assert_eq!(
SoftwareListRequest::topic_name(),
"tedge/commands/req/software/list"
SoftwareListRequest::topic(),
Topic::new_unchecked("tedge/commands/req/software/list")
);
assert_eq!(
SoftwareListResponse::topic_name(),
"tedge/commands/res/software/list"
SoftwareListResponse::topic(),
Topic::new_unchecked("tedge/commands/res/software/list")
);
assert_eq!(
SoftwareUpdateRequest::topic_name(),
"tedge/commands/req/software/update"
SoftwareUpdateRequest::topic(),
Topic::new_unchecked("tedge/commands/req/software/update")
);
assert_eq!(
SoftwareUpdateResponse::topic_name(),
"tedge/commands/res/software/update"
SoftwareUpdateResponse::topic(),
Topic::new_unchecked("tedge/commands/res/software/update")
);
assert_eq!(
RestartOperationRequest::topic(),
Topic::new_unchecked("tedge/commands/req/control/restart")
);
assert_eq!(
RestartOperationResponse::topic(),
Topic::new_unchecked("tedge/commands/res/control/restart")
);
}

Expand Down
39 changes: 15 additions & 24 deletions crates/core/tedge_api/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@ use nanoid::nanoid;
use serde::Deserialize;
use serde::Serialize;

const SOFTWARE_LIST_REQUEST_TOPIC: &str = "tedge/commands/req/software/list";
const SOFTWARE_LIST_RESPONSE_TOPIC: &str = "tedge/commands/res/software/list";
const SOFTWARE_UPDATE_REQUEST_TOPIC: &str = "tedge/commands/req/software/update";
const SOFTWARE_UPDATE_RESPONSE_TOPIC: &str = "tedge/commands/res/software/update";
const DEVICE_RESTART_REQUEST_TOPIC: &str = "tedge/commands/req/control/restart";
const DEVICE_RESTART_RESPONSE_TOPIC: &str = "tedge/commands/res/control/restart";

/// All the messages are serialized using json.
pub trait Jsonify<'a>
where
Expand Down Expand Up @@ -58,12 +65,8 @@ impl SoftwareListRequest {
SoftwareListRequest { id: id.to_string() }
}

pub fn topic_name() -> &'static str {
"tedge/commands/req/software/list"
}

pub fn topic() -> Topic {
Topic::new_unchecked(Self::topic_name())
Topic::new_unchecked(SOFTWARE_LIST_REQUEST_TOPIC)
}
}

Expand Down Expand Up @@ -96,12 +99,8 @@ impl SoftwareUpdateRequest {
}
}

pub fn topic_name() -> &'static str {
"tedge/commands/req/software/update"
}

pub fn topic() -> Topic {
Topic::new_unchecked(Self::topic_name())
Topic::new_unchecked(SOFTWARE_UPDATE_REQUEST_TOPIC)
}

pub fn add_update(&mut self, mut update: SoftwareModuleUpdate) {
Expand Down Expand Up @@ -213,12 +212,8 @@ impl SoftwareListResponse {
}
}

pub fn topic_name() -> &'static str {
"tedge/commands/res/software/list"
}

pub fn topic() -> Topic {
Topic::new_unchecked(Self::topic_name())
Topic::new_unchecked(SOFTWARE_LIST_RESPONSE_TOPIC)
}

pub fn add_modules(&mut self, plugin_type: &str, modules: Vec<SoftwareModule>) {
Expand Down Expand Up @@ -269,12 +264,8 @@ impl SoftwareUpdateResponse {
}
}

pub fn topic_name() -> &'static str {
"tedge/commands/res/software/update"
}

pub fn topic() -> Topic {
Topic::new_unchecked(Self::topic_name())
Topic::new_unchecked(SOFTWARE_UPDATE_RESPONSE_TOPIC)
}

pub fn add_modules(&mut self, plugin_type: &str, modules: Vec<SoftwareModule>) {
Expand Down Expand Up @@ -509,8 +500,8 @@ impl RestartOperationRequest {
RestartOperationRequest { id: id.to_string() }
}

pub fn topic_name() -> &'static str {
"tedge/commands/req/control/restart"
pub fn topic() -> Topic {
Topic::new_unchecked(DEVICE_RESTART_REQUEST_TOPIC)
}
}

Expand All @@ -534,8 +525,8 @@ impl RestartOperationResponse {
Self { status, ..self }
}

pub fn topic_name() -> &'static str {
"tedge/commands/res/control/restart"
pub fn topic() -> Topic {
Topic::new_unchecked(DEVICE_RESTART_RESPONSE_TOPIC)
}

pub fn status(&self) -> OperationStatus {
Expand Down
5 changes: 2 additions & 3 deletions crates/core/tedge_mapper/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ async-trait = "0.1"
aws_mapper_ext = { path = "../../extensions/aws_mapper_ext" }
az_mapper_ext = { path = "../../extensions/az_mapper_ext" }
batcher = { path = "../../common/batcher" }
camino = "1.1.4"
c8y_api = { path = "../c8y_api" }
c8y_http_proxy = { path = "../../extensions/c8y_http_proxy" }
camino = "1.1"
clap = { version = "3.2", features = ["cargo", "derive"] }
clock = { path = "../../common/clock" }
collectd_ext = { path = "../../extensions/collectd_ext" }
Expand All @@ -85,15 +85,14 @@ serde_json = "1.0"
tedge_actors = { path = "../../core/tedge_actors" }
tedge_api = { path = "../tedge_api" }
tedge_config = { path = "../../common/tedge_config" }
tedge_file_system_ext = { path = "../../extensions/tedge_file_system_ext" }
tedge_health_ext = { path = "../../extensions/tedge_health_ext" }
tedge_http_ext = { path = "../../extensions/tedge_http_ext" }
tedge_mqtt_ext = { path = "../../extensions/tedge_mqtt_ext" }
tedge_file_system_ext = { path = "../../extensions/tedge_file_system_ext" }
tedge_signal_ext = { path = "../../extensions/tedge_signal_ext" }
tedge_timer_ext = { path = "../../extensions/tedge_timer_ext" }
tedge_utils = { path = "../../common/tedge_utils", features = [
"logging",
"fs-notify",
] }
thiserror = "1.0"
time = "0.3"
Expand Down
6 changes: 1 addition & 5 deletions crates/core/tedge_mapper/src/c8y/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@ use super::config::MQTT_MESSAGE_SIZE_THRESHOLD;
use super::converter::CumulocityConverter;
use super::converter::CumulocityDeviceInfo;
use super::dynamic_discovery::process_inotify_events;
use super::mapper::CumulocityMapper;
use crate::core::converter::Converter;
use crate::core::converter::MapperConfig;
use crate::core::size_threshold::SizeThreshold;
use async_trait::async_trait;
use c8y_api::http_proxy;
use c8y_api::smartrest::operations::Operations;
use c8y_api::smartrest::topic::SMARTREST_PUBLISH_TOPIC;
use c8y_http_proxy::handle::C8YHttpProxy;
Expand All @@ -23,7 +21,6 @@ use tedge_actors::fan_in_message_type;
use tedge_actors::Actor;
use tedge_actors::Builder;
use tedge_actors::DynSender;
use tedge_actors::LinkError;
use tedge_actors::LoggingSender;
use tedge_actors::MessageReceiver;
use tedge_actors::MessageSink;
Expand All @@ -33,7 +30,6 @@ use tedge_actors::RuntimeError;
use tedge_actors::RuntimeRequest;
use tedge_actors::RuntimeRequestSink;
use tedge_actors::Sender;
use tedge_actors::ServiceConsumer;
use tedge_actors::ServiceProvider;
use tedge_actors::SimpleMessageBox;
use tedge_actors::SimpleMessageBoxBuilder;
Expand All @@ -43,7 +39,7 @@ use tedge_mqtt_ext::TopicFilter;
use tedge_timer_ext::SetTimeout;
use tedge_timer_ext::Timeout;

const SYNC_WINDOW: Duration = Duration::from_secs(300);
const SYNC_WINDOW: Duration = Duration::from_secs(3);

pub type SyncStart = SetTimeout<()>;
pub type SyncComplete = Timeout<()>;
Expand Down
4 changes: 0 additions & 4 deletions crates/core/tedge_mapper/src/c8y/mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,10 @@ use super::service_monitor::service_monitor_status_message;
use crate::core::component::TEdgeComponent;
use crate::core::mapper::start_basic_actors;
use async_trait::async_trait;
use c8y_api::smartrest::operations::Operations;
use c8y_api::smartrest::topic::C8yTopic;
use c8y_http_proxy::credentials::C8YJwtRetriever;
use c8y_http_proxy::C8YHttpProxyBuilder;
use mqtt_channel::Config;
use mqtt_channel::TopicFilter;
use std::path::Path;
use tedge_api::topic::ResponseTopic;
use tedge_config::ConfigSettingAccessor;
use tedge_config::DeviceIdSetting;
use tedge_config::ServiceTypeSetting;
Expand Down
35 changes: 12 additions & 23 deletions crates/core/tedge_mapper/src/c8y/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,8 @@ use tedge_actors::Actor;
use tedge_actors::Builder;
use tedge_actors::LoggingSender;
use tedge_actors::MessageReceiver;
use tedge_actors::MessageSource;
use tedge_actors::NoMessage;
use tedge_actors::Sender;
use tedge_actors::ServiceConsumer;
use tedge_actors::SimpleMessageBox;
use tedge_actors::SimpleMessageBoxBuilder;
use tedge_api::SoftwareUpdateResponse;
Expand All @@ -41,16 +39,15 @@ use tedge_mqtt_ext::test_helpers::assert_received_includes_json;
use tedge_mqtt_ext::MqttMessage;
use tedge_test_utils::fs::TempTedgeDir;
use tedge_timer_ext::Timeout;
use tokio::task::JoinHandle;

const TEST_TIMEOUT_MS: Duration = Duration::from_millis(5000);

#[tokio::test]
#[serial]
async fn mapper_publishes_init_messages_on_startup() {
// Start SM Mapper
let mut cfg_dir = TempTedgeDir::new();
let (mqtt, _http, _fs, _timer) = spawn_c8y_mapper_actor(&mut cfg_dir, true).await;
let cfg_dir = TempTedgeDir::new();
let (mqtt, _http, _fs, _timer) = spawn_c8y_mapper_actor(&cfg_dir, true).await;

let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS);

Expand Down Expand Up @@ -88,8 +85,7 @@ async fn mapper_publishes_init_messages_on_startup() {
async fn mapper_publishes_software_update_request() {
// The test assures SM Mapper correctly receives software update request smartrest message on `c8y/s/ds`
// and converts it to thin-edge json message published on `tedge/commands/req/software/update`.
let (mqtt, mut http, _fs, _timer) =
spawn_c8y_mapper_actor(&mut TempTedgeDir::new(), true).await;
let (mqtt, http, _fs, _timer) = spawn_c8y_mapper_actor(&TempTedgeDir::new(), true).await;
spawn_dummy_c8y_http_proxy(http);

let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS);
Expand Down Expand Up @@ -134,7 +130,7 @@ async fn mapper_publishes_software_update_status_onto_c8y_topic() {
// and publishes status of the operation `501` on `c8y/s/us`

// Start SM Mapper
let (mqtt, http, _fs, _timer) = spawn_c8y_mapper_actor(&mut TempTedgeDir::new(), true).await;
let (mqtt, http, _fs, _timer) = spawn_c8y_mapper_actor(&TempTedgeDir::new(), true).await;
spawn_dummy_c8y_http_proxy(http);

let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS);
Expand Down Expand Up @@ -186,7 +182,7 @@ async fn mapper_publishes_software_update_status_onto_c8y_topic() {
#[serial]
async fn mapper_publishes_software_update_failed_status_onto_c8y_topic() {
// Start SM Mapper
let (mqtt, _http, _fs, _timer) = spawn_c8y_mapper_actor(&mut TempTedgeDir::new(), true).await;
let (mqtt, _http, _fs, _timer) = spawn_c8y_mapper_actor(&TempTedgeDir::new(), true).await;

let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS);
mqtt.skip(6).await;
Expand Down Expand Up @@ -336,7 +332,7 @@ async fn mapper_publishes_software_update_request_with_wrong_action() {
// Then SM Mapper publishes an operation status message as failed `502,c8y_SoftwareUpdate,Action remove is not recognized. It must be install or delete.` on `c8/s/us`.
// Then the subscriber that subscribed for messages on `c8/s/us` receives these messages and verifies them.

let (mqtt, _http, _fs, _timer) = spawn_c8y_mapper_actor(&mut TempTedgeDir::new(), true).await;
let (mqtt, _http, _fs, _timer) = spawn_c8y_mapper_actor(&TempTedgeDir::new(), true).await;

let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS);
mqtt.skip(6).await;
Expand Down Expand Up @@ -367,8 +363,7 @@ async fn mapper_publishes_software_update_request_with_wrong_action() {
#[tokio::test]
#[serial]
async fn c8y_mapper_alarm_mapping_to_smartrest() {
let (mqtt, _http, _fs, mut timer) =
spawn_c8y_mapper_actor(&mut TempTedgeDir::new(), true).await;
let (mqtt, _http, _fs, mut timer) = spawn_c8y_mapper_actor(&TempTedgeDir::new(), true).await;
timer.send(Timeout::new(())).await.unwrap(); //Complete sync phase so that alarm mapping starts

let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS);
Expand Down Expand Up @@ -1404,7 +1399,7 @@ async fn test_convert_small_measurement_for_child_device() {
#[serial]
async fn mapper_handles_multiline_sm_requests() {
// The test assures if Mapper can handle multiline smartrest messages arrived on `c8y/s/ds`
let (mqtt, http, _fs, _timer) = spawn_c8y_mapper_actor(&mut TempTedgeDir::new(), true).await;
let (mqtt, http, _fs, _timer) = spawn_c8y_mapper_actor(&TempTedgeDir::new(), true).await;
spawn_dummy_c8y_http_proxy(http);

let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS);
Expand Down Expand Up @@ -1464,10 +1459,10 @@ async fn mapper_publishes_supported_operations() {
// The test assures tede-mapper reads/parses the operations from operations directory and
// correctly publishes the supported operations message on `c8y/s/us`
// and verifies the supported operations that are published by the tedge-mapper.
let mut cfg_dir = TempTedgeDir::new();
let cfg_dir = TempTedgeDir::new();
create_thin_edge_operations(&cfg_dir, vec!["c8y_TestOp1", "c8y_TestOp2"]);

let (mqtt, _http, _fs, _timer) = spawn_c8y_mapper_actor(&mut cfg_dir, false).await;
let (mqtt, _http, _fs, _timer) = spawn_c8y_mapper_actor(&cfg_dir, false).await;
let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS);

mqtt.skip(1).await;
Expand All @@ -1482,10 +1477,10 @@ async fn mapper_publishes_child_device_create_message() {
// The test assures tedge-mapper checks if there is a directory for operations for child devices, then it reads and
// correctly publishes the child device create message on to `c8y/s/us`
// and verifies the device create message.
let mut cfg_dir = TempTedgeDir::new();
let cfg_dir = TempTedgeDir::new();
create_thin_edge_child_devices(&cfg_dir, vec!["child1", "child2"]);

let (mqtt, _http, _fs, _timer) = spawn_c8y_mapper_actor(&mut cfg_dir, false).await;
let (mqtt, _http, _fs, _timer) = spawn_c8y_mapper_actor(&cfg_dir, false).await;
let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS);
mqtt.skip(6).await;

Expand Down Expand Up @@ -1807,12 +1802,6 @@ fn create_thin_edge_child_operations(cfg_dir: &TempTedgeDir, child_id: &str, ops
}
}

fn remove_whitespace(s: &str) -> String {
let mut s = String::from(s);
s.retain(|c| !c.is_whitespace());
s
}

async fn spawn_c8y_mapper_actor(
config_dir: &TempTedgeDir,
init: bool,
Expand Down
Loading

0 comments on commit 1c85072

Please sign in to comment.