Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

C8y mapping of entity registration messages #2266

Merged
merged 13 commits into from
Sep 27, 2023
Merged
23 changes: 15 additions & 8 deletions crates/extensions/c8y_mapper_ext/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ use super::config::C8yMapperConfig;
use super::converter::CumulocityConverter;
use super::dynamic_discovery::process_inotify_events;
use async_trait::async_trait;
use c8y_api::smartrest::topic::SMARTREST_PUBLISH_TOPIC;
use c8y_auth_proxy::url::ProxyUrlGenerator;
use c8y_http_proxy::handle::C8YHttpProxy;
use c8y_http_proxy::messages::C8YRestRequest;
use c8y_http_proxy::messages::C8YRestResult;
use serde_json::json;
use std::path::PathBuf;
use std::time::Duration;
use tedge_actors::adapt;
Expand All @@ -26,10 +26,11 @@ use tedge_actors::Sender;
use tedge_actors::ServiceProvider;
use tedge_actors::SimpleMessageBox;
use tedge_actors::SimpleMessageBoxBuilder;
use tedge_api::entity_store::EntityRegistrationMessage;
use tedge_api::entity_store::EntityType;
use tedge_api::mqtt_topics::EntityTopicId;
use tedge_file_system_ext::FsWatchEvent;
use tedge_mqtt_ext::Message;
use tedge_mqtt_ext::MqttMessage;
use tedge_mqtt_ext::Topic;
use tedge_mqtt_ext::TopicFilter;
use tedge_timer_ext::SetTimeout;
use tedge_timer_ext::Timeout;
Expand Down Expand Up @@ -127,11 +128,17 @@ impl C8yMapperActor {
FsWatchEvent::DirectoryCreated(path) => {
if let Some(directory_name) = path.file_name() {
let child_id = directory_name.to_string_lossy().to_string();
let message = Message::new(
&Topic::new_unchecked(SMARTREST_PUBLISH_TOPIC),
format!("101,{child_id},{child_id},thin-edge.io-child"),
);
self.mqtt_publisher.send(message).await?;
let child_topic_id = EntityTopicId::default_child_device(&child_id).unwrap();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the drawback of using short child device names in path: one has to assume a default topic scheme.

Ideally, I would use the same hierarchy for paths as for MQTT. i.e. with a device directory and there a sub directory per child plus a main directory. Or an a/b/c/d tree if the user prefer its own schema.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the drawback of using short child device names in path: one has to assume a default topic scheme.

Since we're doing this for a legacy mechanism which assumed such things, isn't safe to make the assumptions here as well for the sake of consistency?

Ideally, I would use the same hierarchy for paths as for MQTT. i.e. with a device directory and there a sub directory per child plus a main directory. Or an a/b/c/d tree if the user prefer its own schema.

Yeah, we could do that if we are going to continue supporting this file system based entity creation mechanism for much longer. But, my understanding was that it's not gonna be around for much longer and hence there's no point in extending that further by supporting hierarchical nested child devices and all. So, if we really want to extend this API, we could do that in a follow-up PR.

Copy link
Contributor

@didier-wenzek didier-wenzek Sep 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we could do that in a follow-up PR

Sure. The scope of this PR is already unexpectedly growing. We need to focus on what is definitely wrong. We will later address this kind of inconsistencies.

let child_device_reg_msg = EntityRegistrationMessage {
topic_id: child_topic_id,
external_id: None,
r#type: EntityType::ChildDevice,
parent: Some(EntityTopicId::default_main_device()),
payload: json!({}),
};
self.converter
.try_convert_entity_registration(&child_device_reg_msg)
.unwrap();
}
}
FsWatchEvent::FileCreated(path)
Expand Down
80 changes: 45 additions & 35 deletions crates/extensions/c8y_mapper_ext/src/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ use tedge_actors::LoggingSender;
use tedge_actors::Sender;
use tedge_api::entity_store;
use tedge_api::entity_store::EntityExternalId;
use tedge_api::entity_store::EntityMetadata;
use tedge_api::entity_store::EntityRegistrationMessage;
use tedge_api::entity_store::EntityType;
use tedge_api::entity_store::Error;
Expand Down Expand Up @@ -236,15 +235,15 @@ impl CumulocityConverter {
})
}

fn try_convert_entity_registration(
pub fn try_convert_entity_registration(
&mut self,
entity_topic_id: &EntityTopicId,
input: &EntityRegistrationMessage,
) -> Result<Message, ConversionError> {
// Parse the optional fields
let display_name = input.payload.get("name").and_then(|v| v.as_str());
let display_type = input.payload.get("type").and_then(|v| v.as_str());

let entity_topic_id = &input.topic_id;
let external_id = self
.entity_store
.get(entity_topic_id)
Expand Down Expand Up @@ -292,7 +291,7 @@ impl CumulocityConverter {
/// - `device/child001//` => `DEVICE_COMMON_NAME:device:child001`
/// - `device/child001/service/service001` => `DEVICE_COMMON_NAME:device:child001:service:service001`
/// - `factory01/hallA/packaging/belt001` => `DEVICE_COMMON_NAME:factory01:hallA:packaging:belt001`
fn map_to_c8y_external_id(
pub fn map_to_c8y_external_id(
entity_topic_id: &EntityTopicId,
main_device_xid: &EntityExternalId,
) -> EntityExternalId {
Expand Down Expand Up @@ -717,7 +716,10 @@ impl CumulocityConverter {
self.children.insert(child_id.clone(), ops.clone());

let ops_msg = ops.create_smartrest_ops_message()?;
let topic_str = format!("{SMARTREST_PUBLISH_TOPIC}/{}", child_id);
let child_topic_id = EntityTopicId::default_child_device(&child_id).unwrap();
let child_external_id =
Self::map_to_c8y_external_id(&child_topic_id, &self.device_name.as_str().into());
let topic_str = format!("{SMARTREST_PUBLISH_TOPIC}/{}", child_external_id.as_ref());
let topic = Topic::new_unchecked(&topic_str);
messages_vec.push(Message::new(&topic, ops_msg));
}
Expand Down Expand Up @@ -779,8 +781,7 @@ impl CumulocityConverter {
if let Err(e) = self.entity_store.update(register_message.clone()) {
error!("Could not update device registration: {e}");
}
let c8y_message =
self.try_convert_entity_registration(&source, &register_message)?;
let c8y_message = self.try_convert_entity_registration(&register_message)?;
registration_messages.push(c8y_message);
}
}
Expand All @@ -803,8 +804,8 @@ impl CumulocityConverter {
}

registration_messages.push(auto_registration_message.into());
let c8y_message = self
.try_convert_entity_registration(&source, auto_registration_message)?;
let c8y_message =
self.try_convert_entity_registration(auto_registration_message)?;
registration_messages.push(c8y_message);
}
}
Expand Down Expand Up @@ -907,9 +908,9 @@ impl CumulocityConverter {
&self.cfg_dir,
));

let supported_operations_message = self.wrap_error(create_supported_operations(
&self.cfg_dir.join("operations").join("c8y"),
));
let supported_operations_message = self.wrap_error(
self.create_supported_operations(&self.cfg_dir.join("operations").join("c8y")),
);

let cloud_child_devices_message = create_request_for_cloud_child_devices();

Expand All @@ -929,6 +930,28 @@ impl CumulocityConverter {
])
}

fn create_supported_operations(&self, path: &Path) -> Result<Message, ConversionError> {
if is_child_operation_path(path) {
// operations for child
let child_id = get_child_id(&path.to_path_buf())?;
let child_topic_id = EntityTopicId::default_child_device(&child_id).unwrap();
let child_external_id =
Self::map_to_c8y_external_id(&child_topic_id, &self.device_name.as_str().into());
let stopic = format!("{SMARTREST_PUBLISH_TOPIC}/{}", child_external_id.as_ref());

Ok(Message::new(
&Topic::new_unchecked(&stopic),
Operations::try_new(path)?.create_smartrest_ops_message()?,
))
} else {
// operations for parent
Ok(Message::new(
&Topic::new_unchecked(SMARTREST_PUBLISH_TOPIC),
Operations::try_new(path)?.create_smartrest_ops_message()?,
))
}
}

pub fn sync_messages(&mut self) -> Vec<Message> {
let sync_messages: Vec<Message> = self.alarm_converter.sync();
self.alarm_converter = AlarmConverter::Synced;
Expand All @@ -946,7 +969,7 @@ impl CumulocityConverter {
{
// Re populate the operations irrespective add/remove/modify event
self.operations = get_operations(message.ops_dir.clone())?;
Ok(Some(create_supported_operations(&message.ops_dir)?))
Ok(Some(self.create_supported_operations(&message.ops_dir)?))

// operation for child
} else if message.ops_dir.eq(&self
Expand All @@ -960,7 +983,7 @@ impl CumulocityConverter {
get_operations(message.ops_dir.clone())?,
);

Ok(Some(create_supported_operations(&message.ops_dir)?))
Ok(Some(self.create_supported_operations(&message.ops_dir)?))
} else {
Ok(None)
}
Expand Down Expand Up @@ -1020,25 +1043,6 @@ fn is_child_operation_path(path: &Path) -> bool {
}
}

fn create_supported_operations(path: &Path) -> Result<Message, ConversionError> {
if is_child_operation_path(path) {
// operations for child
let child_id = get_child_id(&path.to_path_buf())?;
let stopic = format!("{SMARTREST_PUBLISH_TOPIC}/{}", child_id);

Ok(Message::new(
&Topic::new_unchecked(&stopic),
Operations::try_new(path)?.create_smartrest_ops_message()?,
))
} else {
// operations for parent
Ok(Message::new(
&Topic::new_unchecked(SMARTREST_PUBLISH_TOPIC),
Operations::try_new(path)?.create_smartrest_ops_message()?,
))
}
}

fn create_request_for_cloud_child_devices() -> Message {
Message::new(&Topic::new_unchecked("c8y/s/us"), "105")
}
Expand Down Expand Up @@ -1081,7 +1085,13 @@ impl CumulocityConverter {
let ops_dir = match device.r#type {
EntityType::MainDevice => self.ops_dir.clone(),
EntityType::ChildDevice => {
self.ops_dir.clone().join(device.external_id.as_ref())
let child_dir_name =
if let Some(child_local_id) = target.default_device_name() {
child_local_id
} else {
device.external_id.as_ref()
};
self.ops_dir.clone().join(child_dir_name)
}
EntityType::Service => {
error!("Unsupported `restart` operation for a service: {target}");
Expand All @@ -1091,7 +1101,7 @@ impl CumulocityConverter {
let ops_file = ops_dir.join("c8y_Restart");
create_directory_with_defaults(&ops_dir)?;
create_file_with_defaults(ops_file, None)?;
let device_operations = create_supported_operations(&ops_dir)?;
let device_operations = self.create_supported_operations(&ops_dir)?;
Ok(vec![device_operations])
}
}
Expand Down
9 changes: 6 additions & 3 deletions crates/extensions/c8y_mapper_ext/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1041,7 +1041,10 @@ async fn mapper_publishes_supported_operations_for_child_device() {
&mut mqtt,
[
("c8y/s/us", "101,child1,child1,thin-edge.io-child"),
("c8y/s/us/child1", "114,c8y_ChildTestOp1,c8y_ChildTestOp2\n"),
(
"c8y/s/us/test-device:device:child1",
"114,c8y_ChildTestOp1,c8y_ChildTestOp2\n",
),
],
)
.await;
Expand Down Expand Up @@ -1117,7 +1120,7 @@ async fn mapper_dynamically_updates_supported_operations_for_child_device() {
let cfg_dir = TempTedgeDir::new();
create_thin_edge_child_operations(
&cfg_dir,
"test-device:device:child1",
"child1",
vec!["c8y_ChildTestOp1", "c8y_ChildTestOp2"],
);

Expand All @@ -1131,7 +1134,7 @@ async fn mapper_dynamically_updates_supported_operations_for_child_device() {
cfg_dir
.dir("operations")
.dir("c8y")
.dir("test-device:device:child1")
.dir("child1")
.file("c8y_ChildTestOp3")
.to_path_buf(),
))
Expand Down