Skip to content

Commit

Permalink
Cache early twin data messages
Browse files Browse the repository at this point in the history
  • Loading branch information
albinsuresh committed Dec 6, 2023
1 parent d36302f commit 13564ef
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 1 deletion.
3 changes: 3 additions & 0 deletions crates/core/tedge_api/src/partial_entity_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@ pub struct PartialEntityStore {
}

pub struct PartialEntityCache {
pub twin_data: Vec<MqttMessage>,
pub telemetry: RingBuffer<MqttMessage>,
}

impl PartialEntityCache {
fn new() -> Self {
PartialEntityCache {
twin_data: vec![],
telemetry: RingBuffer::new(EARLY_MESSAGE_BUFFER_SIZE),
}
}
Expand Down Expand Up @@ -53,6 +55,7 @@ impl PartialEntityStore {
Channel::Measurement { .. } | Channel::Event { .. } | Channel::Alarm { .. } => {
entity_cache.telemetry.push(message);
}
Channel::EntityTwinData { .. } => entity_cache.twin_data.push(message),
_ => {
// Ignore
}
Expand Down
24 changes: 23 additions & 1 deletion crates/extensions/c8y_mapper_ext/src/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1163,7 +1163,11 @@ impl CumulocityConverter {
cached_entity: PartialEntityCache,
) -> Result<Vec<Message>, ConversionError> {
let mut converted_messages = vec![];
for message in cached_entity.telemetry {
for message in cached_entity
.twin_data
.into_iter()
.chain(cached_entity.telemetry.into_iter())
{
let (source, channel) = self.mqtt_schema.entity_channel_of(&message.topic).unwrap();
converted_messages.append(
&mut self
Expand Down Expand Up @@ -3153,6 +3157,17 @@ pub(crate) mod tests {
)
}

// Publish a twin message which is also cached
let twin_message = Message::new(
&Topic::new_unchecked("te/custom/child1///twin/foo"),
r#"5.6789"#,
);
let mapped_messages = converter.convert(&twin_message).await;
assert!(
mapped_messages.is_empty(),
"Expected the early twin messages to be cached and not mapped"
);

// Publish the registration message which will trigger the conversion of cached messages as well
let reg_message = Message::new(
&Topic::new_unchecked("te/custom/child1//"),
Expand All @@ -3165,6 +3180,13 @@ pub(crate) mod tests {
&messages,
[
("c8y/s/us", "101,child1,child1,thin-edge.io-child".into()),
(
"c8y/inventory/managedObjects/update/child1",
json!({
"foo": 5.6789
})
.into(),
),
(
"c8y/measurement/measurements/create",
json!({
Expand Down

0 comments on commit 13564ef

Please sign in to comment.