Skip to content

Commit

Permalink
Make entity store persistent #2428
Browse files Browse the repository at this point in the history
Persist the entity store as a JSON lines file.
Every registration message and twin data message is persisted as JSON lines.
On startup the in-memory entity store is rebuilt by replaying these messages.
  • Loading branch information
albinsuresh committed Dec 14, 2023
1 parent b4974b9 commit 4315a60
Show file tree
Hide file tree
Showing 17 changed files with 475 additions and 76 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/common/mqtt_channel/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ zeroize = { workspace = true }
[dev-dependencies]
anyhow = { workspace = true }
mqtt_tests = { workspace = true }
serde_json = { workspace = true }
serial_test = { workspace = true }
41 changes: 39 additions & 2 deletions crates/common/mqtt_channel/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,43 @@ use crate::errors::MqttError;
use crate::topics::Topic;
use rumqttc::Publish;
use rumqttc::QoS;
use serde::Deserialize;
use serde::Serialize;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::fmt::Write;

/// A message to be sent to or received from MQTT.
#[derive(Debug, Clone, Eq, PartialEq)]
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct Message {
pub topic: Topic,
pub payload: DebugPayload,
#[serde(serialize_with = "serialize_qos", deserialize_with = "deserialize_qos")]
pub qos: QoS,
pub retain: bool,
}

#[derive(Clone, Eq, PartialEq)]
fn serialize_qos<S>(qos: &QoS, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
(*qos as u8).serialize(serializer)
}

fn deserialize_qos<'de, D>(deserializer: D) -> Result<QoS, D::Error>
where
D: serde::Deserializer<'de>,
{
let value = u8::deserialize(deserializer)?;
match value {
0 => Ok(QoS::AtMostOnce),
1 => Ok(QoS::AtLeastOnce),
2 => Ok(QoS::ExactlyOnce),
_ => Err(serde::de::Error::custom("Invalid QoS value")),
}
}

#[derive(Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct DebugPayload(Payload);

impl Debug for DebugPayload {
Expand Down Expand Up @@ -188,4 +211,18 @@ mod tests {
"Invalid UTF8 payload: invalid utf-8 sequence of 1 bytes from index 0: ..."
);
}

#[test]
fn message_serialize_deserialize() {
let message = Message {
topic: Topic::new("test").unwrap(),
payload: DebugPayload("payload".as_bytes().to_vec()),
qos: QoS::AtMostOnce,
retain: true,
};

let json = serde_json::to_string(&message).expect("Serialization failed");
let deserialized: Message = serde_json::from_str(&json).expect("Deserialization failed");
assert_eq!(deserialized, message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,13 @@ define_tedge_config! {
path: Utf8PathBuf,
},

db: {
/// The directory used to store any persistent data that survives tedge updates and even firmware updates
#[tedge_config(example = "/data/tedge", default(value = "/data/tedge"))]
#[doku(as = "PathBuf")]
path: Utf8PathBuf,
},

data: {
/// The directory used to store data like cached files, runtime metadata, etc.
#[tedge_config(example = "/var/tedge", default(value = "/var/tedge"))]
Expand Down
15 changes: 13 additions & 2 deletions crates/core/c8y_api/src/json_c8y.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ mod tests {
use tedge_api::event::ThinEdgeEventData;
use tedge_api::messages::SoftwareListCommandPayload;
use tedge_api::mqtt_topics::EntityTopicId;
use tedge_api::mqtt_topics::MqttSchema;
use test_case::test_case;
use time::macros::datetime;

Expand Down Expand Up @@ -768,11 +769,16 @@ mod tests {
;"convert to clear alarm"
)]
fn check_alarm_translation(tedge_alarm: ThinEdgeAlarm, expected_c8y_alarm: C8yAlarm) {
let temp_dir = tempfile::tempdir().unwrap();
let main_device = EntityRegistrationMessage::main_device("test-main".into());
let mut entity_store = EntityStore::with_main_device(
let mut entity_store = EntityStore::with_main_device_and_default_service_type(
MqttSchema::default(),
main_device,
"service".into(),
dummy_external_id_mapper,
dummy_external_id_validator,
5,
&temp_dir,
)
.unwrap();

Expand Down Expand Up @@ -800,11 +806,16 @@ mod tests {
}),
};

let temp_dir = tempfile::tempdir().unwrap();
let main_device = EntityRegistrationMessage::main_device("test-main".into());
let entity_store = EntityStore::with_main_device(
let entity_store = EntityStore::with_main_device_and_default_service_type(
MqttSchema::default(),
main_device,
"service".into(),
dummy_external_id_mapper,
dummy_external_id_validator,
5,
&temp_dir,
)
.unwrap();

Expand Down
9 changes: 9 additions & 0 deletions crates/core/tedge/src/cli/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,15 @@ impl TEdgeInitCmd {
),
)?;

create_directory(
&config.db.path,
PermissionEntry::new(
Some(self.user.clone()),
Some(self.group.clone()),
Some(0o775),
),
)?;

Ok(())
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/core/tedge_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ clock = { workspace = true }
maplit = { workspace = true }
mockall = { workspace = true }
regex = { workspace = true }
tempfile = { workspace = true }
test-case = { workspace = true }
time = { workspace = true, features = ["macros"] }
toml = { workspace = true }
Expand Down
Loading

0 comments on commit 4315a60

Please sign in to comment.