Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Entity registration HTTP API #3230

Draft
wants to merge 17 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions crates/common/tedge_config/src/tedge_config_cli/tedge_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1010,6 +1010,18 @@ define_tedge_config! {
#[tedge_config(example = "true", default(value = true))]
log_upload: bool,
},

entity_store: {
/// Enable auto registration feature
#[tedge_config(example = "true", default(value = true))]
auto_register: bool,

/// On a clean start, the whole state of the device, services and child-devices is resent to the cloud
#[tedge_config(example = "true", default(value = true))]
clean_start: bool,
},


},

software: {
Expand Down
131 changes: 35 additions & 96 deletions crates/core/c8y_api/src/json_c8y.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@ use std::fmt;
use tedge_api::alarm::ThinEdgeAlarm;
use tedge_api::alarm::ThinEdgeAlarmData;
use tedge_api::commands::SoftwareListCommand;
use tedge_api::entity_store::EntityMetadata;
use tedge_api::entity::EntityExternalId;
use tedge_api::entity_store::EntityType;
use tedge_api::event::ThinEdgeEvent;
use tedge_api::EntityStore;
use tedge_api::Jsonify;
use tedge_api::SoftwareModule;
use time::OffsetDateTime;
Expand Down Expand Up @@ -241,36 +240,34 @@ pub struct C8yClearAlarm {
impl C8yAlarm {
pub fn try_from(
alarm: &ThinEdgeAlarm,
entity_store: &EntityStore,
external_id: &EntityExternalId,
entity_type: &EntityType,
) -> Result<Self, C8yAlarmError> {
if let Some(entity) = entity_store.get(&alarm.source) {
let source = Self::convert_source(entity);
let alarm_type = Self::convert_alarm_type(&alarm.alarm_type);

let c8y_alarm = match alarm.data.as_ref() {
None => C8yAlarm::Clear(C8yClearAlarm { alarm_type, source }),
Some(tedge_alarm_data) => C8yAlarm::Create(C8yCreateAlarm {
alarm_type: alarm_type.clone(),
source,
severity: C8yCreateAlarm::convert_severity(tedge_alarm_data),
text: C8yCreateAlarm::convert_text(tedge_alarm_data, &alarm_type),
time: C8yCreateAlarm::convert_time(tedge_alarm_data),
fragments: C8yCreateAlarm::convert_extras(tedge_alarm_data),
}),
};
Ok(c8y_alarm)
} else {
Err(C8yAlarmError::UnsupportedDeviceTopicId(
alarm.source.to_string(),
))
}
let source = Self::convert_source(external_id, entity_type);
let alarm_type = Self::convert_alarm_type(&alarm.alarm_type);

let c8y_alarm = match alarm.data.as_ref() {
None => C8yAlarm::Clear(C8yClearAlarm { alarm_type, source }),
Some(tedge_alarm_data) => C8yAlarm::Create(C8yCreateAlarm {
alarm_type: alarm_type.clone(),
source,
severity: C8yCreateAlarm::convert_severity(tedge_alarm_data),
text: C8yCreateAlarm::convert_text(tedge_alarm_data, &alarm_type),
time: C8yCreateAlarm::convert_time(tedge_alarm_data),
fragments: C8yCreateAlarm::convert_extras(tedge_alarm_data),
}),
};
Ok(c8y_alarm)
}

fn convert_source(entity: &EntityMetadata) -> Option<SourceInfo> {
match entity.r#type {
fn convert_source(
external_id: &EntityExternalId,
entity_type: &EntityType,
) -> Option<SourceInfo> {
match entity_type {
EntityType::MainDevice => None,
EntityType::ChildDevice => Some(make_c8y_source_fragment(entity.external_id.as_ref())),
EntityType::Service => Some(make_c8y_source_fragment(entity.external_id.as_ref())),
EntityType::ChildDevice => Some(make_c8y_source_fragment(external_id.as_ref())),
EntityType::Service => Some(make_c8y_source_fragment(external_id.as_ref())),
}
}

Expand Down Expand Up @@ -360,19 +357,12 @@ mod tests {
use crate::json_c8y::AlarmSeverity;
use anyhow::Result;
use assert_matches::assert_matches;
use mqtt_channel::MqttMessage;
use mqtt_channel::Topic;
use serde_json::json;
use std::collections::HashSet;
use tedge_api::alarm::ThinEdgeAlarm;
use tedge_api::alarm::ThinEdgeAlarmData;
use tedge_api::commands::SoftwareListCommandPayload;
use tedge_api::entity_store::EntityExternalId;
use tedge_api::entity_store::EntityRegistrationMessage;
use tedge_api::entity_store::InvalidExternalIdError;
use tedge_api::event::ThinEdgeEventData;
use tedge_api::mqtt_topics::EntityTopicId;
use tedge_api::mqtt_topics::MqttSchema;
use test_case::test_case;
use time::macros::datetime;

Expand Down Expand Up @@ -709,28 +699,14 @@ mod tests {
;"convert to clear alarm"
)]
fn check_alarm_translation(tedge_alarm: ThinEdgeAlarm, expected_c8y_alarm: C8yAlarm) {
let temp_dir = tempfile::tempdir().unwrap();
let main_device = EntityRegistrationMessage::main_device("test-main".into());
let mut entity_store = EntityStore::with_main_device_and_default_service_type(
MqttSchema::default(),
main_device,
"service".into(),
dummy_external_id_mapper,
dummy_external_id_validator,
5,
&temp_dir,
true,
)
.unwrap();

let child_registration = EntityRegistrationMessage::new(&MqttMessage::new(
&Topic::new_unchecked("te/device/external_source//"),
r#"{"@id": "external_source", "@type": "child-device"}"#,
))
.unwrap();
entity_store.update(child_registration).unwrap();

let actual_c8y_alarm = C8yAlarm::try_from(&tedge_alarm, &entity_store).unwrap();
let (external_id, entity_type) = if tedge_alarm.source.is_default_main_device() {
("main_device".into(), EntityType::MainDevice)
} else {
("external_source".into(), EntityType::ChildDevice)
};

let actual_c8y_alarm =
C8yAlarm::try_from(&tedge_alarm, &external_id, &entity_type).unwrap();
assert_eq!(actual_c8y_alarm, expected_c8y_alarm);
}

Expand All @@ -746,50 +722,13 @@ mod tests {
extras: HashMap::new(),
}),
};
let external_id = "main".into();

let temp_dir = tempfile::tempdir().unwrap();
let main_device = EntityRegistrationMessage::main_device("test-main".into());
let entity_store = EntityStore::with_main_device_and_default_service_type(
MqttSchema::default(),
main_device,
"service".into(),
dummy_external_id_mapper,
dummy_external_id_validator,
5,
&temp_dir,
true,
)
.unwrap();

match C8yAlarm::try_from(&tedge_alarm, &entity_store).unwrap() {
match C8yAlarm::try_from(&tedge_alarm, &external_id, &EntityType::MainDevice).unwrap() {
C8yAlarm::Create(value) => {
assert!(value.time.millisecond() > 0);
}
C8yAlarm::Clear(_) => panic!("Must be C8yAlarm::Create"),
};
}

fn dummy_external_id_mapper(
entity_topic_id: &EntityTopicId,
_main_device_xid: &EntityExternalId,
) -> EntityExternalId {
entity_topic_id
.to_string()
.trim_end_matches('/')
.replace('/', ":")
.into()
}

fn dummy_external_id_validator(id: &str) -> Result<EntityExternalId, InvalidExternalIdError> {
let forbidden_chars = HashSet::from(['/', '+', '#']);
for c in id.chars() {
if forbidden_chars.contains(&c) {
return Err(InvalidExternalIdError {
external_id: id.into(),
invalid_char: c,
});
}
}
Ok(id.into())
}
}
12 changes: 7 additions & 5 deletions crates/core/c8y_api/src/smartrest/inventory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
// smartrest messages are sent. There should be one comprehensive API for
// generating them.

use crate::smartrest::topic::publish_topic_from_ancestors;
use crate::smartrest::topic::publish_topic_from_parent;
use crate::smartrest::topic::C8yTopic;
use mqtt_channel::MqttMessage;
use std::time::Duration;
Expand All @@ -29,7 +29,8 @@ pub fn child_device_creation_message(
child_id: &str,
device_name: Option<&str>,
device_type: Option<&str>,
ancestors: &[String],
parent: Option<&str>,
main_device_id: &str,
prefix: &TopicPrefix,
) -> Result<MqttMessage, InvalidValueError> {
if child_id.is_empty() {
Expand Down Expand Up @@ -60,7 +61,7 @@ pub fn child_device_creation_message(
.expect("child_id, device_name, device_type should not increase payload size over the limit");

Ok(MqttMessage::new(
&publish_topic_from_ancestors(ancestors, prefix),
&publish_topic_from_parent(parent, main_device_id, prefix),
payload.into_inner(),
))
}
Expand All @@ -73,11 +74,12 @@ pub fn service_creation_message(
service_name: &str,
service_type: &str,
service_status: &str,
ancestors: &[String],
parent: Option<&str>,
main_device_id: &str,
prefix: &TopicPrefix,
) -> Result<MqttMessage, InvalidValueError> {
Ok(MqttMessage::new(
&publish_topic_from_ancestors(ancestors, prefix),
&publish_topic_from_parent(parent, main_device_id, prefix),
service_creation_message_payload(service_id, service_name, service_type, service_status)?
.into_inner(),
))
Expand Down
58 changes: 30 additions & 28 deletions crates/core/c8y_api/src/smartrest/topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::json_c8y::C8yAlarm;
use mqtt_channel::MqttError;
use mqtt_channel::Topic;
use mqtt_channel::TopicFilter;
use tedge_api::entity_store::EntityMetadata;
use tedge_api::entity::EntityExternalId;
use tedge_api::entity_store::EntityType;
use tedge_config::TopicPrefix;

Expand All @@ -19,13 +19,14 @@ pub enum C8yTopic {
impl C8yTopic {
/// Return the c8y SmartRest response topic for the given entity
pub fn smartrest_response_topic(
entity: &EntityMetadata,
external_id: &EntityExternalId,
entity_type: &EntityType,
prefix: &TopicPrefix,
) -> Option<Topic> {
match entity.r#type {
match entity_type {
EntityType::MainDevice => Some(C8yTopic::upstream_topic(prefix)),
EntityType::ChildDevice | EntityType::Service => {
Self::ChildSmartRestResponse(entity.external_id.clone().into())
Self::ChildSmartRestResponse(external_id.clone().into())
.to_topic(prefix)
.ok()
}
Expand Down Expand Up @@ -77,28 +78,30 @@ impl From<&C8yAlarm> for C8yTopic {
}
}

/// Generates the SmartREST topic to publish to, for a given managed object
/// from the list of external IDs of itself and all its parents.
///
/// The parents are appended in the reverse order,
/// starting from the main device at the end of the list.
/// The main device itself is represented by the root topic c8y/s/us,
/// with the rest of the children appended to it at each topic level.
/// Generates the SmartREST topic to publish to, from the external ID of its parent.
/// If the parent is the main device, the topic would be `<prefix>/s/us`.
/// For all other parent devices, the target topic would be `<prefix>/s/us/<parent-xid>`.
/// For the main device with no parent, and the topic would be `<prefix>/s/us` in that case as well.
///
/// # Examples
///
/// - `["main"]` -> `c8y/s/us`
/// - `["child1", "main"]` -> `c8y/s/us/child1`
/// - `["child2", "child1", "main"]` -> `c8y/s/us/child1/child2`
pub fn publish_topic_from_ancestors(ancestors: &[impl AsRef<str>], prefix: &TopicPrefix) -> Topic {
let mut target_topic = format!("{prefix}/{SMARTREST_PUBLISH_TOPIC}");
for ancestor in ancestors.iter().rev().skip(1) {
// Skipping the last ancestor as it is the main device represented by the root topic itself
target_topic.push('/');
target_topic.push_str(ancestor.as_ref());
/// - `(Some("main"), "main", "c8y")` -> `c8y/s/us`
/// - `[Some("child1"), "main", "c8y"]` -> `c8y/s/us/child1`
/// - `[Some("service1"), "main", "c8y"]` -> `c8y/s/us/service1`
/// - `(None, "main", "c8y")` -> `c8y/s/us`
pub fn publish_topic_from_parent(
parent_xid: Option<&str>,
main_device_xid: &str,
prefix: &TopicPrefix,
) -> Topic {
if let Some(parent) = parent_xid {
if parent != main_device_xid {
return C8yTopic::ChildSmartRestResponse(parent.to_string())
.to_topic(prefix)
.unwrap();
}
}

Topic::new_unchecked(&target_topic)
C8yTopic::upstream_topic(prefix)
}

#[cfg(test)]
Expand Down Expand Up @@ -135,13 +138,12 @@ mod tests {
)
}

#[test_case(& ["main"], "c8y2/s/us")]
#[test_case(& ["foo"], "c8y2/s/us")]
#[test_case(& ["child1", "main"], "c8y2/s/us/child1")]
#[test_case(& ["child3", "child2", "child1", "main"], "c8y2/s/us/child1/child2/child3")]
fn topic_from_ancestors(ancestors: &[&str], topic: &str) {
#[test_case(None, "main-device", "c8y2/s/us")]
#[test_case(Some("child01"), "main-device", "c8y2/s/us/child01")]
#[test_case(Some("main-device"), "main-device", "c8y2/s/us")]
fn topic_from_parent(parent_xid: Option<&str>, main_device_xid: &str, topic: &str) {
let nested_child_topic =
publish_topic_from_ancestors(ancestors, &"c8y2".try_into().unwrap());
publish_topic_from_parent(parent_xid, main_device_xid, &"c8y2".try_into().unwrap());
assert_eq!(nested_child_topic, Topic::new_unchecked(topic));
}
}
Loading
Loading