Skip to content

Commit

Permalink
fixup! fixup! fixup! Make entity store persistent thin-edge#2428
Browse files Browse the repository at this point in the history
  • Loading branch information
albinsuresh committed Dec 16, 2023
1 parent 1e2f9ee commit 3001ea3
Show file tree
Hide file tree
Showing 3 changed files with 171 additions and 34 deletions.
187 changes: 161 additions & 26 deletions crates/core/tedge_api/src/entity_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::pending_entity_store::PendingEntityData;
use crate::pending_entity_store::PendingEntityStore;
use log::debug;
use log::error;
use log::info;
use log::warn;
use mqtt_channel::Message;
use serde_json::json;
Expand Down Expand Up @@ -205,6 +206,7 @@ impl EntityStore {
}

pub fn load_from_message_log(&mut self) {
info!("Loading the entity store from the log");
match self.message_log.reader() {
Err(err) => {
error!(
Expand All @@ -218,7 +220,10 @@ impl EntityStore {
error!("Parsing log entry failed with {err}");
continue;
}
Ok(None) => return,
Ok(None) => {
info!("Finished loading the entity store from the log");
return;
}
Ok(Some(message)) => {
if let Ok((source, channel)) =
self.mqtt_schema.entity_channel_of(&message.topic)
Expand All @@ -228,7 +233,9 @@ impl EntityStore {
if let Ok(register_message) =
EntityRegistrationMessage::try_from(&message)
{
if let Err(err) = self.update(register_message) {
if let Err(err) =
self.register_entity(register_message, false)
{
error!("Failed to re-register {source} from the persistent entity store due to {err}");
continue;
}
Expand All @@ -254,7 +261,9 @@ impl EntityStore {
fragment_key,
fragment_value,
);
if let Err(err) = self.update_twin_data(twin_data.clone()) {
if let Err(err) =
self.register_twin_data(twin_data.clone(), false)
{
error!("Failed to restore twin fragment: {twin_data:?} from the persistent entity store due to {err}");
continue;
}
Expand All @@ -271,9 +280,9 @@ impl EntityStore {
}
} else {
warn!(
"Ignoring unsupported message retrieved from entity store: {:?}",
message
);
"Ignoring unsupported message retrieved from entity store: {:?}",
message
);
}
}
}
Expand Down Expand Up @@ -419,7 +428,7 @@ impl EntityStore {
&mut self,
message: EntityRegistrationMessage,
) -> Result<(Vec<EntityTopicId>, Vec<PendingEntityData>), Error> {
match self.register_entity(message.clone()) {
match self.register_entity(message.clone(), true) {
Ok(affected_entities) => {
if affected_entities.is_empty() {
Ok((vec![], vec![]))
Expand All @@ -434,7 +443,7 @@ impl EntityStore {
.take_cached_child_entities_data(&topic_id);
for pending_child in pending_children {
let child_reg_message = pending_child.reg_message.clone();
self.register_entity(child_reg_message)?;
self.register_entity(child_reg_message.clone(), true)?;
pending_entities.push(pending_child);
}

Expand All @@ -455,6 +464,7 @@ impl EntityStore {
fn register_entity(
&mut self,
message: EntityRegistrationMessage,
persist: bool,
) -> Result<Vec<EntityTopicId>, Error> {
let message_clone = message.clone();
debug!("Processing entity registration message, {:?}", message);
Expand All @@ -478,6 +488,7 @@ impl EntityStore {
// parent device is affected if new device is its child
if let Some(parent) = &parent {
if !self.entities.contains_key(parent) {
dbg!("No parent for {}", &topic_id);
return Err(Error::NoParent(parent.to_string().into_boxed_str()));
}

Expand Down Expand Up @@ -518,30 +529,39 @@ impl EntityStore {
match previous {
Entry::Occupied(mut occupied) => {
// if there is no change, no entities were affected
let mut existing_entity = occupied.get().clone();
if existing_entity == entity_metadata {
return Ok(vec![]);
}
let existing_entity = occupied.get().clone();
dbg!(&existing_entity);
dbg!(&entity_metadata);

existing_entity.other.extend(entity_metadata.other.clone());
let mut merged_other = existing_entity.other.clone();
merged_other.extend(entity_metadata.other.clone());
let merged_entity = EntityMetadata {
twin_data: existing_entity.twin_data,
other: existing_entity.other,
twin_data: existing_entity.twin_data.clone(),
other: merged_other,
..entity_metadata
};

if existing_entity == merged_entity {
dbg!("Ignoring duplicate entry");
return Ok(vec![]);
}

dbg!("Entity metadata updated");
occupied.insert(merged_entity);
affected_entities.push(topic_id);
}
Entry::Vacant(vacant) => {
dbg!("New entity added");
vacant.insert(entity_metadata);
self.entity_id_index.insert(external_id, topic_id);
}
}
debug!("Updated entity map: {:?}", self.entities);
debug!("Updated external id map: {:?}", self.entity_id_index);

if !affected_entities.is_empty() {
dbg!(&persist);
dbg!(!affected_entities.is_empty());
if persist {
self.message_log
.append_message(&message_clone.to_mqtt_message(&self.mqtt_schema))?
}
Expand Down Expand Up @@ -628,6 +648,14 @@ impl EntityStore {
pub fn update_twin_data(
&mut self,
twin_message: EntityTwinMessage,
) -> Result<bool, entity_store::Error> {
self.register_twin_data(twin_message.clone(), true)
}

pub fn register_twin_data(
&mut self,
twin_message: EntityTwinMessage,
persist: bool,
) -> Result<bool, entity_store::Error> {
let fragment_key = twin_message.fragment_key.clone();
let fragment_value = twin_message.fragment_value.clone();
Expand All @@ -646,8 +674,10 @@ impl EntityStore {
}
}

self.message_log
.append_message(&twin_message.to_mqtt_message(&self.mqtt_schema))?;
if persist {
self.message_log
.append_message(&twin_message.to_mqtt_message(&self.mqtt_schema))?;
}
Ok(true)
}

Expand Down Expand Up @@ -1643,54 +1673,159 @@ mod tests {
other: Map::new(),
};

store.update(reg_message.clone()).unwrap();
let affected_entities = store.update(reg_message.clone()).unwrap();
assert!(!affected_entities.0.is_empty());

let affected_entities = store.update(reg_message.clone()).unwrap();
assert!(affected_entities.0.is_empty());

// Duplicate registration ignore even after the entity store is restored from the disk
let mut store = new_entity_store(&temp_dir);
let affected_entities = store.update(reg_message).unwrap();
assert!(affected_entities.0.is_empty());
}

#[test]
fn entities_persisted_and_restored() {
fn duplicate_registration_message_ignored_after_twin_update() {
let temp_dir = tempfile::tempdir().unwrap();

let topic_id = EntityTopicId::default_child_device("child1").unwrap();
let mut store = new_entity_store(&temp_dir);
let entity_topic_id = EntityTopicId::default_child_device("child1").unwrap();
let reg_message = EntityRegistrationMessage {
topic_id: topic_id.clone(),
topic_id: entity_topic_id.clone(),
r#type: EntityType::ChildDevice,
external_id: Some("child1".into()),
parent: None,
other: Map::new(),
};

let affected_entities = store.update(reg_message.clone()).unwrap();
assert!(!affected_entities.0.is_empty());

// Update the entity twin data
store
.update_twin_data(EntityTwinMessage::new(
entity_topic_id.clone(),
"foo".into(),
json!("bar"),
))
.unwrap();

// Assert that the duplicate registration message is still ignored
let affected_entities = store.update(reg_message.clone()).unwrap();
assert!(affected_entities.0.is_empty());

// Duplicate registration ignore even after the entity store is restored from the disk
let mut store = new_entity_store(&temp_dir);
let affected_entities = store.update(reg_message).unwrap();
assert!(affected_entities.0.is_empty());
}

#[test]
fn early_child_device_registrations_processed_only_after_parent_registration() {
let temp_dir = tempfile::tempdir().unwrap();
let mut store = new_entity_store(&temp_dir);

let child0_topic_id = EntityTopicId::default_child_device("child0").unwrap();
let child000_topic_id = EntityTopicId::default_child_device("child000").unwrap();
let child00_topic_id = EntityTopicId::default_child_device("child00").unwrap();

// Register great-grand-child before grand-child and child
let child000_reg_message = EntityRegistrationMessage::new_custom(
child000_topic_id.clone(),
EntityType::ChildDevice,
)
.with_parent(child00_topic_id.clone());
let affected_entities = store.update(child000_reg_message.clone()).unwrap();
assert!(affected_entities.0.is_empty());

// Register grand-child before child
let child00_reg_message = EntityRegistrationMessage::new_custom(
child00_topic_id.clone(),
EntityType::ChildDevice,
)
.with_parent(child0_topic_id.clone());
let affected_entities = store.update(child00_reg_message).unwrap();
assert!(affected_entities.0.is_empty());

// Register the immediate child device which will trigger the registration of its children as well
let child0_reg_message =
EntityRegistrationMessage::new_custom(child0_topic_id.clone(), EntityType::ChildDevice);
let affected_entities = store.update(child0_reg_message).unwrap();

// Assert that the affected entities include all the children
assert!(!affected_entities.0.is_empty());

let affected_entities = store.update(child000_reg_message.clone()).unwrap();
assert!(affected_entities.0.is_empty());

// Reload the entity store from the persistent log
let mut store = new_entity_store(&temp_dir);

// Assert that duplicate registrations are still ignored
let affected_entities = store.update(child000_reg_message).unwrap();
assert!(affected_entities.0.is_empty());
}

#[test]
fn entities_persisted_and_restored() {
let temp_dir = tempfile::tempdir().unwrap();

let child1_topic_id = EntityTopicId::default_child_device("child1").unwrap();
let child2_topic_id = EntityTopicId::default_child_device("child2").unwrap();

let twin_fragment_key = "foo".to_string();
let twin_fragment_value = json!("bar");

{
let mut store = new_entity_store(&temp_dir);
store.update(reg_message.clone()).unwrap();
store
.update(
EntityRegistrationMessage::new_custom(
child1_topic_id.clone(),
EntityType::ChildDevice,
)
.with_external_id("child1".into()),
)
.unwrap();
store
.update_twin_data(EntityTwinMessage::new(
topic_id.clone(),
child1_topic_id.clone(),
twin_fragment_key.clone(),
twin_fragment_value.clone(),
))
.unwrap();

store
.update(
EntityRegistrationMessage::new_custom(
child2_topic_id.clone(),
EntityType::ChildDevice,
)
.with_external_id("child2".into()),
)
.unwrap();
}

{
// Reload the entity store using the same persistent file
let store = new_entity_store(&temp_dir);
let entity_metadata = store.get(&topic_id).unwrap();
let mut expected_entity_metadata =
EntityMetadata::child_device("child1".into()).unwrap();
expected_entity_metadata
.twin_data
.insert(twin_fragment_key.clone(), twin_fragment_value.clone());

let entity_metadata = store.get(&child1_topic_id).unwrap();
assert_eq!(entity_metadata, &expected_entity_metadata);
assert_eq!(
entity_metadata.twin_data.get(&twin_fragment_key).unwrap(),
&twin_fragment_value
);

assert_eq!(
store.get(&child2_topic_id).unwrap(),
&EntityMetadata::child_device("child2".into()).unwrap()
);
}
}

Expand Down
1 change: 0 additions & 1 deletion crates/extensions/c8y_mapper_ext/src/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3247,7 +3247,6 @@ pub(crate) mod tests {
.to_string(),
);
let messages = converter.convert(&reg_message).await;
dbg!(&messages);

// Assert that the registration message, the twin updates and the cached measurement messages are converted
assert_messages_matching(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,16 +152,19 @@ Early data messages cached and processed
Device Should Have Fragments maintenance_mode
END

Restart Service tedge-mapper-c8y
Service Health Status Should Be Up tedge-mapper-c8y


Entities persisted and restored
${timestamp}= Get Unix Timestamp
Execute Command tedge mqtt pub --retain 'te/factory1/shop1/plc1/' '{"@type":"child-device","@id":"plc1"}'
Execute Command tedge mqtt pub --retain 'te/factory1/shop1/plc2/' '{"@type":"child-device","@id":"plc2"}'
Execute Command tedge mqtt pub --retain 'te/factory1/shop1/plc1/sensor1' '{"@type":"child-device","@id":"plc1-sensor1","@parent":"factory1/shop1/plc1/"}'
Execute Command tedge mqtt pub --retain 'te/factory1/shop1/plc1/sensor2' '{"@type":"child-device","@id":"plc1-sensor2","@parent":"factory1/shop1/plc1/"}'
Execute Command tedge mqtt pub --retain 'te/factory1/shop1/plc2/sensor1' '{"@type":"child-device","@id":"plc2-sensor1","@parent":"factory1/shop1/plc2/"}'
Execute Command tedge mqtt pub --retain 'te/factory1/shop1/plc1/metrics' '{"@type":"service","@id":"plc1-metrics","@parent":"factory1/shop1/plc1/"}'
Execute Command tedge mqtt pub --retain 'te/factory1/shop1/plc2/metrics' '{"@type":"service","@id":"plc2-metrics","@parent":"factory1/shop1/plc2/"}'
Execute Command tedge mqtt pub --retain 'te/factory/shop/plc1/' '{"@type":"child-device","@id":"plc1"}'
Execute Command tedge mqtt pub --retain 'te/factory/shop/plc2/' '{"@type":"child-device","@id":"plc2"}'
Execute Command tedge mqtt pub --retain 'te/factory/shop/plc1/sensor1' '{"@type":"child-device","@id":"plc1-sensor1","@parent":"factory/shop/plc1/"}'
Execute Command tedge mqtt pub --retain 'te/factory/shop/plc1/sensor2' '{"@type":"child-device","@id":"plc1-sensor2","@parent":"factory/shop/plc1/"}'
Execute Command tedge mqtt pub --retain 'te/factory/shop/plc2/sensor1' '{"@type":"child-device","@id":"plc2-sensor1","@parent":"factory/shop/plc2/"}'
Execute Command tedge mqtt pub --retain 'te/factory/shop/plc1/metrics' '{"@type":"service","@id":"plc1-metrics","@parent":"factory/shop/plc1/"}'
Execute Command tedge mqtt pub --retain 'te/factory/shop/plc2/metrics' '{"@type":"service","@id":"plc2-metrics","@parent":"factory/shop/plc2/"}'

Should Have MQTT Messages c8y/s/us message_contains=101,plc1 date_from=${timestamp} minimum=1 maximum=1
Should Have MQTT Messages c8y/s/us message_contains=101,plc2 date_from=${timestamp} minimum=1 maximum=1
Expand Down

0 comments on commit 3001ea3

Please sign in to comment.