Skip to content

Commit

Permalink
Merge pull request #2522 from albinsuresh/feat/2428/entity-store-pers…
Browse files Browse the repository at this point in the history
…istence

Make entity store persistent #2428
  • Loading branch information
albinsuresh authored Dec 19, 2023
2 parents 460f3eb + 93c0cda commit 141c1d0
Show file tree
Hide file tree
Showing 16 changed files with 789 additions and 87 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/common/mqtt_channel/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ zeroize = { workspace = true }
[dev-dependencies]
anyhow = { workspace = true }
mqtt_tests = { workspace = true }
serde_json = { workspace = true }
serial_test = { workspace = true }
95 changes: 94 additions & 1 deletion crates/common/mqtt_channel/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,44 @@ use crate::errors::MqttError;
use crate::topics::Topic;
use rumqttc::Publish;
use rumqttc::QoS;
use serde::Deserialize;
use serde::Deserializer;
use serde::Serialize;
use serde::Serializer;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::fmt::Write;

/// A message to be sent to or received from MQTT.
#[derive(Debug, Clone, Eq, PartialEq)]
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct Message {
pub topic: Topic,
pub payload: DebugPayload,
#[serde(serialize_with = "serialize_qos", deserialize_with = "deserialize_qos")]
pub qos: QoS,
pub retain: bool,
}

fn serialize_qos<S>(qos: &QoS, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
(*qos as u8).serialize(serializer)
}

fn deserialize_qos<'de, D>(deserializer: D) -> Result<QoS, D::Error>
where
D: serde::Deserializer<'de>,
{
let value = u8::deserialize(deserializer)?;
match value {
0 => Ok(QoS::AtMostOnce),
1 => Ok(QoS::AtLeastOnce),
2 => Ok(QoS::ExactlyOnce),
_ => Err(serde::de::Error::custom("Invalid QoS value")),
}
}

#[derive(Clone, Eq, PartialEq)]
pub struct DebugPayload(Payload);

Expand Down Expand Up @@ -43,6 +68,57 @@ impl AsRef<Payload> for DebugPayload {
}
}

impl Serialize for DebugPayload {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match std::str::from_utf8(&self.0) {
Ok(payload_str) => {
// Serialize as a string if all characters are valid UTF-8
serializer.serialize_str(payload_str)
}
Err(_) => {
// Serialize as a byte array otherwise
serializer.serialize_bytes(&self.0)
}
}
}
}

impl<'de> Deserialize<'de> for DebugPayload {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
struct DebugPayloadVisitor;

impl<'de> serde::de::Visitor<'de> for DebugPayloadVisitor {
type Value = DebugPayload;

fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
formatter.write_str("a string or a sequence of bytes")
}

fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
Ok(DebugPayload(value.as_bytes().to_vec()))
}

fn visit_bytes<E>(self, value: &[u8]) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
Ok(DebugPayload(value.to_vec()))
}
}

deserializer.deserialize_any(DebugPayloadVisitor)
}
}

impl DebugPayload {
/// The payload string (unless this payload is not UTF8)
pub fn as_str(&self) -> Result<&str, MqttError> {
Expand Down Expand Up @@ -138,6 +214,8 @@ where

#[cfg(test)]
mod tests {
use serde_json::json;

use super::*;

#[test]
Expand Down Expand Up @@ -188,4 +266,19 @@ mod tests {
"Invalid UTF8 payload: invalid utf-8 sequence of 1 bytes from index 0: ..."
);
}

#[test]
fn message_serialize_deserialize() {
let message = Message {
topic: Topic::new("test").unwrap(),
payload: DebugPayload("test-payload".as_bytes().to_vec()),
qos: QoS::AtMostOnce,
retain: true,
};

let json = serde_json::to_value(&message).expect("Serialization failed");
assert_eq!(json.get("payload").unwrap(), &json!("test-payload"));
let deserialized: Message = serde_json::from_value(json).expect("Deserialization failed");
assert_eq!(deserialized, message);
}
}
15 changes: 13 additions & 2 deletions crates/core/c8y_api/src/json_c8y.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ mod tests {
use tedge_api::event::ThinEdgeEventData;
use tedge_api::messages::SoftwareListCommandPayload;
use tedge_api::mqtt_topics::EntityTopicId;
use tedge_api::mqtt_topics::MqttSchema;
use test_case::test_case;
use time::macros::datetime;

Expand Down Expand Up @@ -768,11 +769,16 @@ mod tests {
;"convert to clear alarm"
)]
fn check_alarm_translation(tedge_alarm: ThinEdgeAlarm, expected_c8y_alarm: C8yAlarm) {
let temp_dir = tempfile::tempdir().unwrap();
let main_device = EntityRegistrationMessage::main_device("test-main".into());
let mut entity_store = EntityStore::with_main_device(
let mut entity_store = EntityStore::with_main_device_and_default_service_type(
MqttSchema::default(),
main_device,
"service".into(),
dummy_external_id_mapper,
dummy_external_id_validator,
5,
&temp_dir,
)
.unwrap();

Expand Down Expand Up @@ -800,11 +806,16 @@ mod tests {
}),
};

let temp_dir = tempfile::tempdir().unwrap();
let main_device = EntityRegistrationMessage::main_device("test-main".into());
let entity_store = EntityStore::with_main_device(
let entity_store = EntityStore::with_main_device_and_default_service_type(
MqttSchema::default(),
main_device,
"service".into(),
dummy_external_id_mapper,
dummy_external_id_validator,
5,
&temp_dir,
)
.unwrap();

Expand Down
1 change: 1 addition & 0 deletions crates/core/tedge_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ clock = { workspace = true }
maplit = { workspace = true }
mockall = { workspace = true }
regex = { workspace = true }
tempfile = { workspace = true }
test-case = { workspace = true }
time = { workspace = true, features = ["macros"] }
toml = { workspace = true }
Expand Down
Loading

1 comment on commit 141c1d0

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Robot Results

✅ Passed ❌ Failed ⏭️ Skipped Total Pass % ⏱️ Duration
376 0 3 376 100 1h1m14.032999999s

Please sign in to comment.