Skip to content

Commit

Permalink
Move external id aspects from entity store to mapper
Browse files Browse the repository at this point in the history
  • Loading branch information
albinsuresh committed Dec 17, 2024
1 parent f4b1c65 commit a3fa213
Show file tree
Hide file tree
Showing 30 changed files with 631 additions and 808 deletions.
131 changes: 35 additions & 96 deletions crates/core/c8y_api/src/json_c8y.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@ use std::fmt;
use tedge_api::alarm::ThinEdgeAlarm;
use tedge_api::alarm::ThinEdgeAlarmData;
use tedge_api::commands::SoftwareListCommand;
use tedge_api::entity_store::EntityMetadata;
use tedge_api::entity::EntityExternalId;
use tedge_api::entity_store::EntityType;
use tedge_api::event::ThinEdgeEvent;
use tedge_api::EntityStore;
use tedge_api::Jsonify;
use tedge_api::SoftwareModule;
use time::OffsetDateTime;
Expand Down Expand Up @@ -241,36 +240,34 @@ pub struct C8yClearAlarm {
impl C8yAlarm {
pub fn try_from(
alarm: &ThinEdgeAlarm,
entity_store: &EntityStore,
external_id: &EntityExternalId,
entity_type: &EntityType,
) -> Result<Self, C8yAlarmError> {
if let Some(entity) = entity_store.get(&alarm.source) {
let source = Self::convert_source(entity);
let alarm_type = Self::convert_alarm_type(&alarm.alarm_type);

let c8y_alarm = match alarm.data.as_ref() {
None => C8yAlarm::Clear(C8yClearAlarm { alarm_type, source }),
Some(tedge_alarm_data) => C8yAlarm::Create(C8yCreateAlarm {
alarm_type: alarm_type.clone(),
source,
severity: C8yCreateAlarm::convert_severity(tedge_alarm_data),
text: C8yCreateAlarm::convert_text(tedge_alarm_data, &alarm_type),
time: C8yCreateAlarm::convert_time(tedge_alarm_data),
fragments: C8yCreateAlarm::convert_extras(tedge_alarm_data),
}),
};
Ok(c8y_alarm)
} else {
Err(C8yAlarmError::UnsupportedDeviceTopicId(
alarm.source.to_string(),
))
}
let source = Self::convert_source(external_id, entity_type);
let alarm_type = Self::convert_alarm_type(&alarm.alarm_type);

let c8y_alarm = match alarm.data.as_ref() {
None => C8yAlarm::Clear(C8yClearAlarm { alarm_type, source }),
Some(tedge_alarm_data) => C8yAlarm::Create(C8yCreateAlarm {
alarm_type: alarm_type.clone(),
source,
severity: C8yCreateAlarm::convert_severity(tedge_alarm_data),
text: C8yCreateAlarm::convert_text(tedge_alarm_data, &alarm_type),
time: C8yCreateAlarm::convert_time(tedge_alarm_data),
fragments: C8yCreateAlarm::convert_extras(tedge_alarm_data),
}),
};
Ok(c8y_alarm)
}

fn convert_source(entity: &EntityMetadata) -> Option<SourceInfo> {
match entity.r#type {
fn convert_source(
external_id: &EntityExternalId,
entity_type: &EntityType,
) -> Option<SourceInfo> {
match entity_type {
EntityType::MainDevice => None,
EntityType::ChildDevice => Some(make_c8y_source_fragment(entity.external_id.as_ref())),
EntityType::Service => Some(make_c8y_source_fragment(entity.external_id.as_ref())),
EntityType::ChildDevice => Some(make_c8y_source_fragment(external_id.as_ref())),
EntityType::Service => Some(make_c8y_source_fragment(external_id.as_ref())),
}
}

Expand Down Expand Up @@ -360,19 +357,12 @@ mod tests {
use crate::json_c8y::AlarmSeverity;
use anyhow::Result;
use assert_matches::assert_matches;
use mqtt_channel::MqttMessage;
use mqtt_channel::Topic;
use serde_json::json;
use std::collections::HashSet;
use tedge_api::alarm::ThinEdgeAlarm;
use tedge_api::alarm::ThinEdgeAlarmData;
use tedge_api::commands::SoftwareListCommandPayload;
use tedge_api::entity_store::EntityExternalId;
use tedge_api::entity_store::EntityRegistrationMessage;
use tedge_api::entity_store::InvalidExternalIdError;
use tedge_api::event::ThinEdgeEventData;
use tedge_api::mqtt_topics::EntityTopicId;
use tedge_api::mqtt_topics::MqttSchema;
use test_case::test_case;
use time::macros::datetime;

Expand Down Expand Up @@ -709,28 +699,14 @@ mod tests {
;"convert to clear alarm"
)]
fn check_alarm_translation(tedge_alarm: ThinEdgeAlarm, expected_c8y_alarm: C8yAlarm) {
let temp_dir = tempfile::tempdir().unwrap();
let main_device = EntityRegistrationMessage::main_device("test-main".into());
let mut entity_store = EntityStore::with_main_device_and_default_service_type(
MqttSchema::default(),
main_device,
"service".into(),
dummy_external_id_mapper,
dummy_external_id_validator,
5,
&temp_dir,
true,
)
.unwrap();

let child_registration = EntityRegistrationMessage::new(&MqttMessage::new(
&Topic::new_unchecked("te/device/external_source//"),
r#"{"@id": "external_source", "@type": "child-device"}"#,
))
.unwrap();
entity_store.update(child_registration).unwrap();

let actual_c8y_alarm = C8yAlarm::try_from(&tedge_alarm, &entity_store).unwrap();
let (external_id, entity_type) = if tedge_alarm.source.is_default_main_device() {
("main_device".into(), EntityType::MainDevice)
} else {
("external_source".into(), EntityType::ChildDevice)
};

let actual_c8y_alarm =
C8yAlarm::try_from(&tedge_alarm, &external_id, &entity_type).unwrap();
assert_eq!(actual_c8y_alarm, expected_c8y_alarm);
}

Expand All @@ -746,50 +722,13 @@ mod tests {
extras: HashMap::new(),
}),
};
let external_id = "main".into();

let temp_dir = tempfile::tempdir().unwrap();
let main_device = EntityRegistrationMessage::main_device("test-main".into());
let entity_store = EntityStore::with_main_device_and_default_service_type(
MqttSchema::default(),
main_device,
"service".into(),
dummy_external_id_mapper,
dummy_external_id_validator,
5,
&temp_dir,
true,
)
.unwrap();

match C8yAlarm::try_from(&tedge_alarm, &entity_store).unwrap() {
match C8yAlarm::try_from(&tedge_alarm, &external_id, &EntityType::MainDevice).unwrap() {
C8yAlarm::Create(value) => {
assert!(value.time.millisecond() > 0);
}
C8yAlarm::Clear(_) => panic!("Must be C8yAlarm::Create"),
};
}

fn dummy_external_id_mapper(
entity_topic_id: &EntityTopicId,
_main_device_xid: &EntityExternalId,
) -> EntityExternalId {
entity_topic_id
.to_string()
.trim_end_matches('/')
.replace('/', ":")
.into()
}

fn dummy_external_id_validator(id: &str) -> Result<EntityExternalId, InvalidExternalIdError> {
let forbidden_chars = HashSet::from(['/', '+', '#']);
for c in id.chars() {
if forbidden_chars.contains(&c) {
return Err(InvalidExternalIdError {
external_id: id.into(),
invalid_char: c,
});
}
}
Ok(id.into())
}
}
2 changes: 1 addition & 1 deletion crates/core/c8y_api/src/smartrest/topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::json_c8y::C8yAlarm;
use mqtt_channel::MqttError;
use mqtt_channel::Topic;
use mqtt_channel::TopicFilter;
use tedge_api::entity_store::EntityExternalId;
use tedge_api::entity::EntityExternalId;
use tedge_api::entity_store::EntityType;
use tedge_config::TopicPrefix;

Expand Down
39 changes: 1 addition & 38 deletions crates/core/tedge_agent/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use flockfile::check_another_instance_is_not_running;
use flockfile::Flockfile;
use flockfile::FlockfileError;
use reqwest::Identity;
use std::collections::HashSet;
use std::fmt::Debug;
use std::net::SocketAddr;
use std::sync::Arc;
Expand All @@ -40,9 +39,7 @@ use tedge_actors::Runtime;
use tedge_actors::Sequential;
use tedge_actors::ServerActorBuilder;
use tedge_actors::ServerConfig;
use tedge_api::entity_store::EntityExternalId;
use tedge_api::entity_store::EntityRegistrationMessage;
use tedge_api::entity_store::InvalidExternalIdError;
use tedge_api::mqtt_topics::DeviceTopicId;
use tedge_api::mqtt_topics::EntityTopicId;
use tedge_api::mqtt_topics::MqttSchema;
Expand Down Expand Up @@ -71,7 +68,6 @@ use tracing::instrument;
use tracing::warn;

pub const TEDGE_AGENT: &str = "tedge-agent";
const EARLY_MESSAGE_BUFFER_SIZE: usize = 100;

#[derive(Debug, Clone)]
pub(crate) struct AgentConfig {
Expand All @@ -89,7 +85,6 @@ pub(crate) struct AgentConfig {
pub data_dir: DataDir,
pub state_dir: Utf8PathBuf,
pub operations_dir: Utf8PathBuf,
pub device_id: Option<String>, //Some for main device, None for child devices
pub mqtt_device_topic_id: EntityTopicId,
pub mqtt_topic_root: Arc<str>,
pub tedge_http_host: Arc<str>,
Expand All @@ -108,8 +103,6 @@ impl AgentConfig {
) -> Result<Self, anyhow::Error> {
let tedge_config = tedge_config::TEdgeConfig::try_new(tedge_config_location.clone())?;

let device_id = tedge_config.device.id.try_read(&tedge_config).cloned().ok();

let config_dir = tedge_config_location.tedge_config_root_path.clone();
let tmp_dir = Arc::from(tedge_config.tmp.path.as_path());
let state_dir = tedge_config.agent.state.path.clone();
Expand Down Expand Up @@ -204,7 +197,6 @@ impl AgentConfig {
agent_log_dir,
operations_dir,
state_dir,
device_id,
mqtt_topic_root,
mqtt_device_topic_id,
tedge_http_host,
Expand Down Expand Up @@ -378,15 +370,11 @@ impl Agent {
let state_dir = agent_state_dir(self.config.state_dir, self.config.config_dir);
//TODO: Migrate the existing `clean_start` setting which is C8Y specific without breaking backward compatibility.
let clean_start = true;
let main_device =
EntityRegistrationMessage::main_device(self.config.device_id.unwrap());
let main_device = EntityRegistrationMessage::main_device(None);
let entity_store = EntityStore::with_main_device_and_default_service_type(
mqtt_schema.clone(),
main_device,
self.config.service.ty.clone(),
Self::dummy_external_id_mapper,
Self::dummy_external_id_validator,
EARLY_MESSAGE_BUFFER_SIZE,
state_dir,
clean_start,
)
Expand Down Expand Up @@ -449,31 +437,6 @@ impl Agent {

Ok(())
}

// TODO: Remove these dummy impls once external ID aspects are removed from entity store
fn dummy_external_id_mapper(
entity_topic_id: &EntityTopicId,
_main_device_xid: &EntityExternalId,
) -> EntityExternalId {
entity_topic_id
.to_string()
.trim_end_matches('/')
.replace('/', ":")
.into()
}

fn dummy_external_id_validator(id: &str) -> Result<EntityExternalId, InvalidExternalIdError> {
let forbidden_chars = HashSet::from(['/', '+', '#']);
for c in id.chars() {
if forbidden_chars.contains(&c) {
return Err(InvalidExternalIdError {
external_id: id.into(),
invalid_char: c,
});
}
}
Ok(id.into())
}
}

pub fn create_tedge_to_te_converter(
Expand Down
2 changes: 1 addition & 1 deletion crates/core/tedge_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ homepage = { workspace = true }
repository = { workspace = true }

[dependencies]
camino = { workspace = true }
camino = { workspace = true, features = ["serde1"] }
clock = { workspace = true }
csv = { workspace = true }
download = { workspace = true }
Expand Down
90 changes: 90 additions & 0 deletions crates/core/tedge_api/src/entity.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
use crate::entity_store::EntityType;
use crate::mqtt_topics::EntityTopicId;
use crate::mqtt_topics::TopicIdError;
use serde::Deserialize;
use serde::Serialize;

/// Represents externally provided unique ID of an entity.
///
/// Although this struct doesn't enforce any restrictions for the values,
/// the consumers may impose restrictions on the accepted values.
#[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
#[serde(transparent)]
pub struct EntityExternalId(String);

impl AsRef<str> for EntityExternalId {
fn as_ref(&self) -> &str {
&self.0
}
}

// XXX: As `EntityExternalId` is used as a part of cloudbound MQTT topic, it
// can't contain characters invalid in topics, i.e. `+` and `#`. ([MQTT-4.7]).
// If it's derived from a MQTT topic, this holds, but if created from a string,
// this isn't checked, which is invalid!
impl From<&str> for EntityExternalId {
fn from(val: &str) -> Self {
Self(val.to_string())
}
}

impl From<&String> for EntityExternalId {
fn from(val: &String) -> Self {
Self(val.to_string())
}
}

impl From<String> for EntityExternalId {
fn from(val: String) -> Self {
Self(val)
}
}

impl From<EntityExternalId> for String {
fn from(value: EntityExternalId) -> Self {
value.0
}
}

impl From<&EntityExternalId> for String {
fn from(value: &EntityExternalId) -> Self {
value.0.clone()
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EntityMetadata {
pub topic_id: EntityTopicId,
pub external_id: EntityExternalId,
pub r#type: EntityType,
pub parent: Option<EntityTopicId>,
pub display_name: Option<String>,
pub display_type: Option<String>,
}

impl EntityMetadata {
/// Creates a entity metadata for the main device.
pub fn main_device(device_id: String) -> Self {
Self {
topic_id: EntityTopicId::default_main_device(),
external_id: device_id.clone().into(),
r#type: EntityType::MainDevice,
parent: None,
display_name: Some(device_id),
display_type: None,
}
}

/// Creates a entity metadata for a child device.
pub fn child_device(child_device_id: String) -> Result<Self, TopicIdError> {
Ok(Self {
topic_id: EntityTopicId::default_child_device(&child_device_id)?,
external_id: child_device_id.clone().into(),
r#type: EntityType::ChildDevice,
parent: Some(EntityTopicId::default_main_device()),
display_name: Some(child_device_id),
display_type: None,
})
}
}
Loading

0 comments on commit a3fa213

Please sign in to comment.