Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only send 114 when operations changed #2588

Merged
merged 1 commit into from
Jan 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/core/c8y_api/src/smartrest/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ impl Operation {
}
}

#[derive(Debug, Default, Clone)]
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct Operations {
operations: Vec<Operation>,
}
Expand Down
123 changes: 66 additions & 57 deletions crates/extensions/c8y_mapper_ext/src/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ use tedge_actors::LoggingSender;
use tedge_actors::Sender;
use tedge_api::entity_store;
use tedge_api::entity_store::EntityExternalId;
use tedge_api::entity_store::EntityMetadata;
use tedge_api::entity_store::EntityRegistrationMessage;
use tedge_api::entity_store::EntityType;
use tedge_api::entity_store::Error;
Expand Down Expand Up @@ -1264,23 +1263,19 @@ impl CumulocityConverter {
}

fn create_supported_operations(&self, path: &Path) -> Result<Message, ConversionError> {
if is_child_operation_path(path) {
// operations for child
let child_id = get_child_id(&path.to_path_buf())?;
let topic = if is_child_operation_path(path) {
let child_id = get_child_external_id(path)?;
let child_external_id = Self::validate_external_id(&child_id)?;

let topic = C8yTopic::ChildSmartRestResponse(child_external_id.into()).to_topic()?;
Ok(Message::new(
&topic,
Operations::try_new(path)?.create_smartrest_ops_message(),
))
C8yTopic::ChildSmartRestResponse(child_external_id.into()).to_topic()?
} else {
// operations for parent
Ok(Message::new(
&Topic::new_unchecked(SMARTREST_PUBLISH_TOPIC),
Operations::try_new(path)?.create_smartrest_ops_message(),
))
}
Topic::new_unchecked(SMARTREST_PUBLISH_TOPIC)
};

Ok(Message::new(
&topic,
Operations::try_new(path)?.create_smartrest_ops_message(),
))
}

pub fn sync_messages(&mut self) -> Vec<Message> {
Expand All @@ -1293,42 +1288,26 @@ impl CumulocityConverter {
&mut self,
message: &DiscoverOp,
) -> Result<Option<Message>, ConversionError> {
// operation for parent
if message
.ops_dir
.eq(&self.cfg_dir.join("operations").join("c8y"))
{
// Re populate the operations irrespective add/remove/modify event
self.operations = get_operations(message.ops_dir.clone())?;
Ok(Some(self.create_supported_operations(&message.ops_dir)?))

// operation for child
} else if message.ops_dir.eq(&self
.cfg_dir
.join("operations")
.join("c8y")
.join(get_child_id(&message.ops_dir)?))
{
self.children.insert(
get_child_id(&message.ops_dir)?,
get_operations(message.ops_dir.clone())?,
);
let needs_cloud_update = self.update_operations(&message.ops_dir)?;

if needs_cloud_update {
Ok(Some(self.create_supported_operations(&message.ops_dir)?))
} else {
Ok(None)
}
}
}

fn get_child_id(dir_path: &PathBuf) -> Result<String, ConversionError> {
let dir_ele: Vec<&std::ffi::OsStr> = dir_path.as_path().iter().collect();

match dir_ele.last() {
// FIXME: this only extracts the final component of the path, the path prefix can be anything. this
// should be simplified
fn get_child_external_id(dir_path: &Path) -> Result<String, ConversionError> {
match dir_path.file_name() {
Some(child_id) => {
let child_id = child_id.to_string_lossy().to_string();
Ok(child_id)
}
// only returned when path is empty, e.g. "/", in practice this shouldn't ever be given as
// input
None => Err(ConversionError::DirPathComponentError {
dir: dir_path.to_owned(),
}),
Expand Down Expand Up @@ -1361,10 +1340,11 @@ fn create_request_for_cloud_child_devices() -> Message {
impl CumulocityConverter {
/// Register on C8y an operation capability for a device.
fn register_operation(
&self,
device: &EntityMetadata,
&mut self,
target: &EntityTopicId,
c8y_operation_name: &str,
) -> Result<Vec<Message>, ConversionError> {
let device = self.entity_store.try_get(target)?;
let ops_dir = match device.r#type {
EntityType::MainDevice => self.ops_dir.clone(),
EntityType::ChildDevice => {
Expand All @@ -1380,20 +1360,50 @@ impl CumulocityConverter {
let ops_file = ops_dir.join(c8y_operation_name);
create_directory_with_defaults(&ops_dir)?;
create_file_with_defaults(ops_file, None)?;
let device_operations = self.create_supported_operations(&ops_dir)?;
Ok(vec![device_operations])

let need_cloud_update = self.update_operations(&ops_dir)?;

if need_cloud_update {
let device_operations = self.create_supported_operations(&ops_dir)?;
return Ok(vec![device_operations]);
}

Ok(vec![])
}

/// Saves a new supported operation set for a given device.
///
/// If the supported operation set changed, `Ok(true)` is returned to denote that this change
/// should be sent to the cloud.
fn update_operations(&mut self, dir: &Path) -> Result<bool, ConversionError> {
let operations = get_operations(dir)?;
let current_operations = if is_child_operation_path(dir) {
let child_id = get_child_external_id(dir)?;
let Some(current_operations) = self.children.get_mut(&child_id) else {
self.children.insert(child_id, operations);
return Ok(true);
};
current_operations
} else {
&mut self.operations
};

let modified = *current_operations != operations;
*current_operations = operations;

Ok(modified)
}

async fn register_restart_operation(
&self,
&mut self,
target: &EntityTopicId,
) -> Result<Vec<Message>, ConversionError> {
match self.entity_store.get(target) {
None => {
match self.register_operation(target, "c8y_Restart") {
Err(_) => {
error!("Fail to register `restart` operation for unknown device: {target}");
Ok(vec![])
}
Some(device) => self.register_operation(device, "c8y_Restart"),
Ok(messages) => Ok(messages),
}
}

Expand Down Expand Up @@ -1462,20 +1472,19 @@ impl CumulocityConverter {
}

async fn register_software_update_operation(
&self,
&mut self,
target: &EntityTopicId,
) -> Result<Vec<Message>, ConversionError> {
match self.entity_store.get(target) {
None => {
let mut registration = match self.register_operation(target, "c8y_SoftwareUpdate") {
Err(_) => {
error!("Fail to register `software-list` operation for unknown device: {target}");
Ok(vec![])
}
Some(device) => {
let mut registration = self.register_operation(device, "c8y_SoftwareUpdate")?;
registration.push(self.request_software_list(target));
Ok(registration)
return Ok(vec![]);
}
}
Ok(messages) => messages,
};

registration.push(self.request_software_list(target));
Ok(registration)
}

async fn publish_software_update_status(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ Test Setup Custom Setup
Test Teardown Get Logs

*** Test Cases ***

Full supported operations message has no duplicates
Should Have MQTT Messages c8y/s/us message_pattern=114,c8y_DownloadConfigFile,c8y_LogfileRequest,c8y_RemoteAccessConnect,c8y_Restart,c8y_SoftwareUpdate,c8y_UploadConfigFile minimum=1 maximum=1

Create and publish the tedge agent supported operations on mapper restart
# stop mapper and remove the supported operations
ThinEdgeIO.Stop Service tedge-mapper-c8y
Expand Down