Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

C8y mapping of entity registration messages #2266

Merged
merged 13 commits into from
Sep 27, 2023
Merged
4 changes: 2 additions & 2 deletions crates/core/c8y_api/src/json_c8y.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,8 @@ impl C8yAlarm {
fn convert_source(entity: &EntityMetadata) -> Option<SourceInfo> {
match entity.r#type {
EntityType::MainDevice => None,
EntityType::ChildDevice => Some(make_c8y_source_fragment(entity.entity_id.as_ref())),
EntityType::Service => Some(make_c8y_source_fragment(entity.entity_id.as_ref())),
EntityType::ChildDevice => Some(make_c8y_source_fragment(entity.external_id.as_ref())),
EntityType::Service => Some(make_c8y_source_fragment(entity.external_id.as_ref())),
}
}

Expand Down
6 changes: 4 additions & 2 deletions crates/core/c8y_api/src/smartrest/topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ impl C8yTopic {
match entity.r#type {
EntityType::MainDevice => Some(C8yTopic::upstream_topic()),
EntityType::ChildDevice | EntityType::Service => {
Self::ChildSmartRestResponse(entity.entity_id.clone().into())
Self::ChildSmartRestResponse(entity.external_id.clone().into())
.to_topic()
.ok()
}
Expand Down Expand Up @@ -108,7 +108,9 @@ impl From<&EntityMetadata> for C8yTopic {
fn from(value: &EntityMetadata) -> Self {
match value.r#type {
EntityType::MainDevice => Self::SmartRestResponse,
EntityType::ChildDevice => Self::ChildSmartRestResponse(value.entity_id.clone().into()),
EntityType::ChildDevice => {
Self::ChildSmartRestResponse(value.external_id.clone().into())
}
EntityType::Service => Self::SmartRestResponse, // TODO how services are handled by c8y?
albinsuresh marked this conversation as resolved.
Show resolved Hide resolved
}
}
Expand Down
107 changes: 77 additions & 30 deletions crates/core/tedge_api/src/entity_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ use crate::mqtt_topics::EntityTopicId;
use crate::mqtt_topics::TopicIdError;
use mqtt_channel::Message;
use mqtt_channel::Topic;
use serde_json::Map;
use serde_json::Value;

/// Represents an "Entity topic identifier" portion of the MQTT topic
///
Expand Down Expand Up @@ -109,10 +111,10 @@ impl EntityStore {
return None;
}

let entity_id: EntityExternalId = main_device.entity_id?;
let entity_id: EntityExternalId = main_device.external_id?;
let metadata = EntityMetadata {
topic_id: main_device.topic_id.clone(),
entity_id: entity_id.clone(),
external_id: entity_id.clone(),
r#type: main_device.r#type,
parent: None,
other: main_device.payload,
Expand Down Expand Up @@ -146,7 +148,7 @@ impl EntityStore {

/// Returns the external id of the main device.
pub fn main_device_external_id(&self) -> EntityExternalId {
self.get(&self.main_device).unwrap().entity_id.clone()
self.get(&self.main_device).unwrap().external_id.clone()
}

/// Returns an ordered list of ancestors of the given entity
Expand Down Expand Up @@ -186,7 +188,7 @@ impl EntityStore {
.map(|tid| {
self.entities
.get(tid)
.map(|e| e.entity_id.clone().into())
.map(|e| e.external_id.clone().into())
.unwrap()
})
.collect();
Expand Down Expand Up @@ -258,13 +260,13 @@ impl EntityStore {
affected_entities.push(parent.clone());
}

let external_id = message.entity_id.unwrap_or_else(|| {
let external_id = message.external_id.unwrap_or_else(|| {
(self.external_id_mapper)(&topic_id, &self.main_device_external_id())
});
let entity_metadata = EntityMetadata {
topic_id: topic_id.clone(),
r#type: message.r#type,
entity_id: external_id.clone(),
external_id: external_id.clone(),
parent,
other: message.payload,
};
Expand Down Expand Up @@ -300,7 +302,7 @@ impl EntityStore {
pub fn auto_register_entity(
albinsuresh marked this conversation as resolved.
Show resolved Hide resolved
&mut self,
entity_topic_id: &EntityTopicId,
) -> Result<Vec<Message>, entity_store::Error> {
) -> Result<Vec<EntityRegistrationMessage>, entity_store::Error> {
if entity_topic_id.matches_default_topic_scheme() {
if entity_topic_id.is_default_main_device() {
return Ok(vec![]); // Do nothing as the main device is always pre-registered
Expand All @@ -327,10 +329,10 @@ impl EntityStore {
let topic = Topic::new(&format!("{MQTT_ROOT}/{parent_device_id}")).unwrap();
let device_register_message =
Message::new(&topic, device_register_payload).with_retain();
let device_register_message =
EntityRegistrationMessage::try_from(&device_register_message).unwrap();
albinsuresh marked this conversation as resolved.
Show resolved Hide resolved
register_messages.push(device_register_message.clone());
self.update(
EntityRegistrationMessage::try_from(&device_register_message).unwrap(),
)?;
self.update(device_register_message)?;
}

// if the entity is a service, register the service as well
Expand All @@ -349,10 +351,10 @@ impl EntityStore {
service_register_payload,
)
.with_retain();
let service_register_message =
EntityRegistrationMessage::try_from(&service_register_message).unwrap();
register_messages.push(service_register_message.clone());
self.update(
EntityRegistrationMessage::try_from(&service_register_message).unwrap(),
)?;
self.update(service_register_message)?;
}

Ok(register_messages)
Expand All @@ -367,7 +369,7 @@ pub struct EntityMetadata {
pub topic_id: EntityTopicId,
pub parent: Option<EntityTopicId>,
pub r#type: EntityType,
pub entity_id: EntityExternalId,
pub external_id: EntityExternalId,
pub other: serde_json::Value,
}

Expand All @@ -383,7 +385,7 @@ impl EntityMetadata {
pub fn main_device(device_id: String) -> Self {
Self {
topic_id: EntityTopicId::default_main_device(),
entity_id: device_id.into(),
external_id: device_id.into(),
r#type: EntityType::MainDevice,
parent: None,
other: serde_json::json!({}),
Expand All @@ -394,7 +396,7 @@ impl EntityMetadata {
pub fn child_device(child_device_id: String) -> Result<Self, TopicIdError> {
Ok(Self {
topic_id: EntityTopicId::default_child_device(&child_device_id)?,
entity_id: child_device_id.into(),
external_id: child_device_id.into(),
r#type: EntityType::ChildDevice,
parent: Some(EntityTopicId::default_main_device()),
other: serde_json::json!({}),
Expand Down Expand Up @@ -425,7 +427,7 @@ pub enum Error {
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EntityRegistrationMessage {
pub topic_id: EntityTopicId,
pub entity_id: Option<EntityExternalId>,
pub external_id: Option<EntityExternalId>,
pub r#type: EntityType,
pub parent: Option<EntityTopicId>,
pub payload: serde_json::Value,
Expand Down Expand Up @@ -474,7 +476,7 @@ impl EntityRegistrationMessage {

Some(Self {
topic_id: topic_id.parse().ok()?,
entity_id,
external_id: entity_id,
r#type,
parent,
payload,
Expand All @@ -485,7 +487,7 @@ impl EntityRegistrationMessage {
pub fn main_device(main_device_id: String) -> Self {
Self {
topic_id: EntityTopicId::default_main_device(),
entity_id: Some(main_device_id.into()),
external_id: Some(main_device_id.into()),
r#type: EntityType::MainDevice,
parent: None,
payload: serde_json::json!({}),
Expand All @@ -501,6 +503,39 @@ impl TryFrom<&Message> for EntityRegistrationMessage {
}
}

impl From<&EntityRegistrationMessage> for Message {
fn from(value: &EntityRegistrationMessage) -> Self {
let entity_topic_id = value.topic_id.clone();

let mut register_payload: Map<String, Value> = Map::new();

let entity_type = match value.r#type {
EntityType::MainDevice => "device",
EntityType::ChildDevice => "child-device",
EntityType::Service => "service",
};
register_payload.insert("@type".into(), Value::String(entity_type.to_string()));

if let Some(external_id) = &value.external_id {
register_payload.insert("@id".into(), Value::String(external_id.as_ref().into()));
}

if let Some(parent_id) = &value.parent {
register_payload.insert("@parent".into(), Value::String(parent_id.to_string()));
}

if let Value::Object(other_keys) = value.payload.clone() {
register_payload.extend(other_keys)
}

Message::new(
&Topic::new(&format!("{MQTT_ROOT}/{entity_topic_id}")).unwrap(),
albinsuresh marked this conversation as resolved.
Show resolved Hide resolved
serde_json::to_string(&Value::Object(register_payload)).unwrap(),
)
.with_retain()
}
}

/// Parse a MQTT message payload as an entity registration payload.
///
/// Returns `Some(register_payload)` if a payload is valid JSON and is a
Expand Down Expand Up @@ -537,7 +572,7 @@ mod tests {
let store = EntityStore::with_main_device(
EntityRegistrationMessage {
topic_id: EntityTopicId::default_main_device(),
entity_id: Some("test-device".into()),
external_id: Some("test-device".into()),
r#type: EntityType::MainDevice,
parent: None,
payload: json!({"@type": "device"}),
Expand All @@ -555,7 +590,7 @@ mod tests {
let mut store = EntityStore::with_main_device(
EntityRegistrationMessage {
topic_id: EntityTopicId::default_main_device(),
entity_id: Some("test-device".into()),
external_id: Some("test-device".into()),
r#type: EntityType::MainDevice,
parent: None,
payload: json!({"@type": "device"}),
Expand Down Expand Up @@ -602,7 +637,7 @@ mod tests {
let mut store = EntityStore::with_main_device(
EntityRegistrationMessage {
r#type: EntityType::MainDevice,
entity_id: Some("test-device".into()),
external_id: Some("test-device".into()),
topic_id: EntityTopicId::default_main_device(),
parent: None,
payload: json!({}),
Expand All @@ -615,7 +650,7 @@ mod tests {
let updated_entities = store
.update(EntityRegistrationMessage {
r#type: EntityType::Service,
entity_id: None,
external_id: None,
topic_id: EntityTopicId::default_main_service("service1").unwrap(),
parent: None,
payload: json!({}),
Expand All @@ -631,7 +666,7 @@ mod tests {
let updated_entities = store
.update(EntityRegistrationMessage {
r#type: EntityType::Service,
entity_id: None,
external_id: None,
topic_id: EntityTopicId::default_main_service("service2").unwrap(),
parent: None,
payload: json!({}),
Expand Down Expand Up @@ -659,7 +694,7 @@ mod tests {
EntityRegistrationMessage {
topic_id: EntityTopicId::default_main_device(),
r#type: EntityType::MainDevice,
entity_id: Some("test-device".into()),
external_id: Some("test-device".into()),
parent: None,
payload: json!({}),
},
Expand All @@ -669,7 +704,7 @@ mod tests {

let res = store.update(EntityRegistrationMessage {
topic_id: EntityTopicId::default_child_device("another_main").unwrap(),
entity_id: Some("test-device".into()),
external_id: Some("test-device".into()),
r#type: EntityType::MainDevice,
parent: None,
payload: json!({}),
Expand All @@ -686,7 +721,7 @@ mod tests {
let mut store = EntityStore::with_main_device(
EntityRegistrationMessage {
topic_id: EntityTopicId::default_main_device(),
entity_id: Some("test-device".into()),
external_id: Some("test-device".into()),
r#type: EntityType::MainDevice,
parent: None,
payload: json!({}),
Expand All @@ -697,7 +732,7 @@ mod tests {

let res = store.update(EntityRegistrationMessage {
topic_id: EntityTopicId::default_main_device(),
entity_id: None,
external_id: None,
r#type: EntityType::ChildDevice,
parent: Some(EntityTopicId::default_child_device("myawesomeparent").unwrap()),
payload: json!({}),
Expand All @@ -711,7 +746,7 @@ mod tests {
let mut store = EntityStore::with_main_device(
EntityRegistrationMessage {
topic_id: EntityTopicId::default_main_device(),
entity_id: Some("test-device".into()),
external_id: Some("test-device".into()),
r#type: EntityType::MainDevice,
parent: None,
payload: json!({"@type": "device"}),
Expand Down Expand Up @@ -827,7 +862,7 @@ mod tests {
let mut store = EntityStore::with_main_device(
EntityRegistrationMessage {
topic_id: EntityTopicId::default_main_device(),
entity_id: Some("test-device".into()),
external_id: Some("test-device".into()),
r#type: EntityType::MainDevice,
parent: None,
payload: json!({"@type": "device"}),
Expand Down Expand Up @@ -941,4 +976,16 @@ mod tests {
["device:child2", "device:child1", "test-device"]
);
}

#[test]
fn entity_registration_message_into_mqtt_message() {
let entity_reg_message = EntityRegistrationMessage::new(&Message::new(
&Topic::new("te/device/child2/service/collectd").unwrap(),
json!({"@type": "service"}).to_string(),
))
.unwrap();

let message: Message = (&entity_reg_message).into();
println!("{}", message.payload_str().unwrap());
}
}
2 changes: 1 addition & 1 deletion crates/core/tedge_api/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl ThinEdgeEvent {
};

// Parent exists means the device is child device
let external_source = entity.parent.as_ref().map(|_| entity.entity_id.clone());
let external_source = entity.parent.as_ref().map(|_| entity.external_id.clone());

Ok(Self {
name: event_type.into(),
Expand Down
Loading