Skip to content

Commit

Permalink
Addressing review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
albinsuresh committed Sep 26, 2023
1 parent 525e47f commit cdb1143
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 68 deletions.
13 changes: 0 additions & 13 deletions crates/core/c8y_api/src/smartrest/topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,19 +103,6 @@ impl From<C8yTopic> for TopicFilter {
}
}

// FIXME this From conversion is error prone as this can only be used for responses.
impl From<&EntityMetadata> for C8yTopic {
fn from(value: &EntityMetadata) -> Self {
match value.r#type {
EntityType::MainDevice => Self::SmartRestResponse,
EntityType::ChildDevice => {
Self::ChildSmartRestResponse(value.external_id.clone().into())
}
EntityType::Service => Self::SmartRestResponse, // TODO how services are handled by c8y?
}
}
}

impl From<&C8yAlarm> for C8yTopic {
fn from(value: &C8yAlarm) -> Self {
match value {
Expand Down
49 changes: 1 addition & 48 deletions crates/core/tedge_api/src/entity_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,7 @@ use crate::entity_store;
use crate::mqtt_topics::EntityTopicId;
use crate::mqtt_topics::TopicIdError;
use mqtt_channel::Message;
use mqtt_channel::Topic;
use serde_json::json;
use serde_json::Map;
use serde_json::Value;

/// Represents an "Entity topic identifier" portion of the MQTT topic
///
Expand Down Expand Up @@ -491,39 +488,6 @@ impl TryFrom<&Message> for EntityRegistrationMessage {
}
}

impl From<&EntityRegistrationMessage> for Message {
fn from(value: &EntityRegistrationMessage) -> Self {
let entity_topic_id = value.topic_id.clone();

let mut register_payload: Map<String, Value> = Map::new();

let entity_type = match value.r#type {
EntityType::MainDevice => "device",
EntityType::ChildDevice => "child-device",
EntityType::Service => "service",
};
register_payload.insert("@type".into(), Value::String(entity_type.to_string()));

if let Some(external_id) = &value.external_id {
register_payload.insert("@id".into(), Value::String(external_id.as_ref().into()));
}

if let Some(parent_id) = &value.parent {
register_payload.insert("@parent".into(), Value::String(parent_id.to_string()));
}

if let Value::Object(other_keys) = value.payload.clone() {
register_payload.extend(other_keys)
}

Message::new(
&Topic::new(&format!("{MQTT_ROOT}/{entity_topic_id}")).unwrap(),
serde_json::to_string(&Value::Object(register_payload)).unwrap(),
)
.with_retain()
}
}

/// Parse a MQTT message payload as an entity registration payload.
///
/// Returns `Some(register_payload)` if a payload is valid JSON and is a
Expand All @@ -543,6 +507,7 @@ mod tests {
use std::str::FromStr;

use assert_matches::assert_matches;
use mqtt_channel::Topic;
use serde_json::json;

use super::*;
Expand Down Expand Up @@ -935,18 +900,6 @@ mod tests {
);
}

#[test]
fn entity_registration_message_into_mqtt_message() {
let entity_reg_message = EntityRegistrationMessage::new(&Message::new(
&Topic::new("te/device/child2/service/collectd").unwrap(),
json!({"@type": "service"}).to_string(),
))
.unwrap();

let message: Message = (&entity_reg_message).into();
println!("{}", message.payload_str().unwrap());
}

#[test]
fn auto_register_service() {
let mut store = EntityStore::with_main_device(
Expand Down
39 changes: 37 additions & 2 deletions crates/extensions/c8y_mapper_ext/src/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ use plugin_sm::operation_logs::OperationLogsError;
use serde::Deserialize;
use serde::Serialize;
use serde_json::json;
use serde_json::Map;
use serde_json::Value;
use service_monitor::convert_health_status_message;
use std::collections::HashMap;
use std::fs;
Expand Down Expand Up @@ -287,7 +289,9 @@ impl CumulocityConverter {
}
}

pub fn publish_topic_for_entity(
/// Return the SmartREST publish topic for the given entity
/// derived from its ancestors.
pub fn smartrest_publish_topic_for_entity(
&self,
entity_topic_id: &EntityTopicId,
) -> Result<Topic, ConversionError> {
Expand Down Expand Up @@ -904,13 +908,44 @@ impl CumulocityConverter {
}

let mut registration_messages = vec![];
registration_messages.push(registration_message.into());
registration_messages.push(self.convert_entity_registration_message(registration_message));
let mut c8y_message = self.try_convert_entity_registration(registration_message)?;
registration_messages.append(&mut c8y_message);

Ok(registration_messages)
}

fn convert_entity_registration_message(&self, value: &EntityRegistrationMessage) -> Message {
let entity_topic_id = value.topic_id.clone();

let mut register_payload: Map<String, Value> = Map::new();

let entity_type = match value.r#type {
EntityType::MainDevice => "device",
EntityType::ChildDevice => "child-device",
EntityType::Service => "service",
};
register_payload.insert("@type".into(), Value::String(entity_type.to_string()));

if let Some(external_id) = &value.external_id {
register_payload.insert("@id".into(), Value::String(external_id.as_ref().into()));
}

if let Some(parent_id) = &value.parent {
register_payload.insert("@parent".into(), Value::String(parent_id.to_string()));
}

if let Value::Object(other_keys) = value.payload.clone() {
register_payload.extend(other_keys)
}

Message::new(
&Topic::new(&format!("{}/{entity_topic_id}", self.mqtt_schema.root)).unwrap(),
serde_json::to_string(&Value::Object(register_payload)).unwrap(),
)
.with_retain()
}

async fn try_convert_tedge_topics(
&mut self,
message: &Message,
Expand Down
6 changes: 2 additions & 4 deletions crates/extensions/c8y_mapper_ext/src/log_upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use c8y_api::smartrest::smartrest_serializer::SmartRestSerializer;
use c8y_api::smartrest::smartrest_serializer::SmartRestSetOperationToExecuting;
use c8y_api::smartrest::smartrest_serializer::SmartRestSetOperationToFailed;
use c8y_api::smartrest::smartrest_serializer::SmartRestSetOperationToSuccessful;
use c8y_api::smartrest::topic::C8yTopic;
use nanoid::nanoid;
use tedge_api::entity_store::EntityType;
use tedge_api::messages::CommandStatus;
Expand Down Expand Up @@ -112,8 +111,7 @@ impl CumulocityConverter {
})?;
let external_id = &device.external_id;

let c8y_topic: C8yTopic = device.into();
let smartrest_topic = c8y_topic.to_topic()?;
let smartrest_topic = self.smartrest_publish_topic_for_entity(topic_id)?;

let payload = message.payload_str()?;
let response = &LogUploadCmdPayload::from_json(payload)?;
Expand Down Expand Up @@ -228,7 +226,7 @@ impl CumulocityConverter {
let supported_log_types = types.join(",");
let payload = format!("118,{supported_log_types}");

let c8y_topic = self.publish_topic_for_entity(topic_id)?;
let c8y_topic = self.smartrest_publish_topic_for_entity(topic_id)?;
Ok(vec![MqttMessage::new(&c8y_topic, payload)])
}
}
2 changes: 1 addition & 1 deletion crates/extensions/c8y_mapper_ext/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1776,7 +1776,7 @@ async fn mapper_converts_log_upload_cmd_to_supported_op_and_types_for_child_devi
// Validate if the supported operation file is created
assert!(ttd
.path()
.join("operations/c8y/test-device:device:child1/c8y_LogfileRequest")
.join("operations/c8y/child1/c8y_LogfileRequest")
.exists());
}

Expand Down

0 comments on commit cdb1143

Please sign in to comment.