Skip to content

Commit

Permalink
C8y mapper handling c8y_LogfileRequest thin-edge#2017
Browse files Browse the repository at this point in the history
  • Loading branch information
albinsuresh authored and rina23q committed Aug 8, 2023
1 parent 4833d2f commit d6a2d15
Showing 10 changed files with 297 additions and 5 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -241,7 +241,7 @@ pub struct SmartRestLogRequest {
pub date_from: OffsetDateTime,
#[serde(deserialize_with = "to_datetime")]
pub date_to: OffsetDateTime,
pub needle: Option<String>,
pub search_text: Option<String>,
pub lines: usize,
}

51 changes: 51 additions & 0 deletions crates/core/tedge_api/src/messages.rs
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@ use mqtt_channel::Topic;
use nanoid::nanoid;
use serde::Deserialize;
use serde::Serialize;
use time::OffsetDateTime;

const SOFTWARE_LIST_REQUEST_TOPIC: &str = "tedge/commands/req/software/list";
const SOFTWARE_LIST_RESPONSE_TOPIC: &str = "tedge/commands/res/software/list";
@@ -534,6 +535,56 @@ impl RestartOperationResponse {
}
}

#[derive(Debug, Deserialize, Serialize, PartialEq, Copy, Eq, Clone)]
#[serde(rename_all = "camelCase")]
pub enum CommandStatus {
Init,
Executing,
Successful,
Failed,
}

#[derive(Debug, Deserialize, Serialize, Eq, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct LogUploadCmdPayload {
pub status: CommandStatus, //Define a different enum if this op needs more states,
#[serde(rename = "type")]
pub log_type: String,
pub tedge_url: String,
pub date_from: OffsetDateTime,
pub date_to: OffsetDateTime,
pub search_text: Option<String>,
pub max_lines: usize,
#[serde(skip_serializing_if = "Option::is_none")]
pub reason: Option<String>,
}

impl<'a> Jsonify<'a> for LogUploadCmdPayload {}

impl LogUploadCmdPayload {
pub fn new(
status: CommandStatus,
log_type: String,
tedge_url: String,
date_from: OffsetDateTime,
date_to: OffsetDateTime,
search_text: Option<String>,
max_lines: usize,
reason: Option<String>,
) -> Self {
Self {
status,
log_type,
tedge_url,
date_from,
date_to,
search_text,
max_lines,
reason,
}
}
}

#[cfg(test)]
mod tests {
use super::*;
90 changes: 90 additions & 0 deletions crates/core/tedge_api/src/topic.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
use crate::error::TopicError;
use mqtt_channel::Topic;
use mqtt_channel::TopicFilter;
use std::convert::TryFrom;

const CMD_TOPIC_FILTER: &str = "te/device/+/+/+/cmd/+/+";

#[derive(Debug, Clone, Eq, PartialEq)]
pub enum DeviceKind {
Main,
Child(String),
}

#[derive(Debug, Clone, Eq, PartialEq)]
pub enum ResponseTopic {
SoftwareListResponse,
@@ -76,6 +87,85 @@ pub fn get_child_id_from_child_topic(topic: &str) -> Option<String> {
})
}

pub fn get_target_ids_from_cmd_topic(topic: &Topic) -> Option<(DeviceKind, String)> {
let cmd_topic_filter: TopicFilter = CMD_TOPIC_FILTER.try_into().unwrap();

if cmd_topic_filter.accept_topic(topic) {
// with the topic scheme te/device/<device-id>///cmd/<cmd-id>

let mut topic_split = topic.name.split('/');
// the 3rd level is the device id
let device_id = topic_split.nth(2).unwrap();
// the 6th element is the child id
let cmd_id = topic_split.nth(6).unwrap();

if device_id == "main" {
Some((DeviceKind::Main, cmd_id.into()))
} else {
Some((DeviceKind::Child(device_id.into()), cmd_id.into()))
}
} else {
None
}
}

#[derive(Debug, Clone, Eq, PartialEq)]
pub enum CmdPublishTopic {
// Restart(Target),
// SoftwareList(Target),
// SoftwareUpdate(Target),
// ConfigSnapshot(Target),
// ConfigUpdate(Target),
LogUpload(Target),
}

impl From<CmdPublishTopic> for Topic {
fn from(value: CmdPublishTopic) -> Self {
let topic = match value {
CmdPublishTopic::LogUpload(target) => {
format!("te/device/{}///cmd/{}", target.device_id, target.cmd_id)
}
};
Topic::new_unchecked(&topic)
}
}

#[derive(Debug, Clone, Eq, PartialEq)]
pub enum CmdSubscribeTopic {
// Restart,
// SoftwareList,
// SoftwareUpdate,
LogUpload,
// ConfigSnapshot,
// ConfigUpdate,
}

impl From<CmdSubscribeTopic> for &str {
fn from(value: CmdSubscribeTopic) -> Self {
match value {
CmdSubscribeTopic::LogUpload => "te/device/+///cmd/log_upload/+",
}
}
}

impl From<CmdSubscribeTopic> for TopicFilter {
fn from(value: CmdSubscribeTopic) -> Self {
TopicFilter::new_unchecked(value.into())
}
}

#[derive(Debug, Clone, Eq, PartialEq)]
pub struct Target {
device_id: String,
cmd_id: String,
}

impl Target {
pub fn new(device_id: String, cmd_id: String) -> Self {
Target { device_id, cmd_id }
}
}

#[cfg(test)]
mod tests {
use super::*;
4 changes: 2 additions & 2 deletions crates/extensions/c8y_log_manager/src/actor.rs
Original file line number Diff line number Diff line change
@@ -174,7 +174,7 @@ impl LogManagerActor {
logfile.as_path(),
line_counter,
smartrest_obj.lines,
&smartrest_obj.needle,
&smartrest_obj.search_text,
) {
Ok((lines, file_content)) => {
line_counter = lines;
@@ -501,7 +501,7 @@ mod tests {
log_type,
date_from: datetime!(1970-01-01 00:00:03 +00:00),
date_to: datetime!(1970-01-01 00:00:00 +00:00), // not used
needle,
search_text: needle,
lines,
}
}
1 change: 1 addition & 0 deletions crates/extensions/c8y_mapper_ext/Cargo.toml
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@ camino = "1.1"
clock = { path = "../../common/clock" }
json-writer = { path = "../../common/json_writer" }
logged_command = { path = "../../common/logged_command" }
nanoid = "0.4"
plugin_sm = { path = "../../core/plugin_sm" }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
20 changes: 20 additions & 0 deletions crates/extensions/c8y_mapper_ext/src/config.rs
Original file line number Diff line number Diff line change
@@ -4,10 +4,12 @@ use c8y_api::smartrest::topic::C8yTopic;
use camino::Utf8PathBuf;
use std::path::Path;
use std::path::PathBuf;
use tedge_api::topic::CmdSubscribeTopic;
use tedge_api::topic::ResponseTopic;
use tedge_config::new::ConfigNotSet;
use tedge_config::new::ReadError;
use tedge_config::new::TEdgeConfig;
use tedge_config::DEFAULT_FILE_TRANSFER_DIR_NAME;
use tedge_mqtt_ext::TopicFilter;
use tracing::log::warn;

@@ -16,34 +18,44 @@ pub const MQTT_MESSAGE_SIZE_THRESHOLD: usize = 16184;
pub struct C8yMapperConfig {
pub config_dir: PathBuf,
pub logs_path: Utf8PathBuf,
pub data_dir: Utf8PathBuf,
pub device_id: String,
pub device_type: String,
pub service_type: String,
pub ops_dir: PathBuf,
pub file_transfer_dir: Utf8PathBuf,
pub c8y_host: String,
pub tedge_http_host: String,
pub topics: TopicFilter,
}

impl C8yMapperConfig {
#[allow(clippy::too_many_arguments)]
pub fn new(
config_dir: PathBuf,
logs_path: Utf8PathBuf,
data_dir: Utf8PathBuf,
device_id: String,
device_type: String,
service_type: String,
c8y_host: String,
tedge_http_host: String,
topics: TopicFilter,
) -> Self {
let ops_dir = config_dir.join("operations").join("c8y");
let file_transfer_dir = data_dir.join(DEFAULT_FILE_TRANSFER_DIR_NAME);

Self {
config_dir,
logs_path,
data_dir,
device_id,
device_type,
service_type,
ops_dir,
file_transfer_dir,
c8y_host,
tedge_http_host,
topics,
}
}
@@ -55,10 +67,15 @@ impl C8yMapperConfig {
let config_dir: PathBuf = config_dir.as_ref().into();

let logs_path = tedge_config.logs.path.clone();
let data_dir = tedge_config.data.path.clone();
let device_id = tedge_config.device.id.try_read(tedge_config)?.to_string();
let device_type = tedge_config.device.ty.clone();
let service_type = tedge_config.service.ty.clone();
let c8y_host = tedge_config.c8y_url().or_config_not_set()?.to_string();
let tedge_http_address = tedge_config.http.bind.address;
let tedge_http_port = tedge_config.http.bind.port;

let tedge_http_host = format!("{}:{}", tedge_http_address, tedge_http_port);

// The topics to subscribe = default internal topics + user configurable external topics
let mut topics = Self::internal_topic_filter(&config_dir)?;
@@ -71,10 +88,12 @@ impl C8yMapperConfig {
Ok(C8yMapperConfig::new(
config_dir,
logs_path,
data_dir,
device_id,
device_type,
service_type,
c8y_host,
tedge_http_host,
topics,
))
}
@@ -87,6 +106,7 @@ impl C8yMapperConfig {
ResponseTopic::SoftwareListResponse.as_str(),
ResponseTopic::SoftwareUpdateResponse.as_str(),
ResponseTopic::RestartResponse.as_str(),
CmdSubscribeTopic::LogUpload.into(),
]
.try_into()
.expect("topics that mapper should subscribe to");
Loading

0 comments on commit d6a2d15

Please sign in to comment.