Skip to content

Commit

Permalink
C8y mapper handling c8y_LogfileRequest thin-edge#2017
Browse files Browse the repository at this point in the history
  • Loading branch information
albinsuresh authored and didier-wenzek committed Aug 17, 2023
1 parent f1d0017 commit 60c2705
Show file tree
Hide file tree
Showing 10 changed files with 297 additions and 5 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ pub struct SmartRestLogRequest {
pub date_from: OffsetDateTime,
#[serde(deserialize_with = "to_datetime")]
pub date_to: OffsetDateTime,
pub needle: Option<String>,
pub search_text: Option<String>,
pub lines: usize,
}

Expand Down
51 changes: 51 additions & 0 deletions crates/core/tedge_api/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use mqtt_channel::Topic;
use nanoid::nanoid;
use serde::Deserialize;
use serde::Serialize;
use time::OffsetDateTime;

const SOFTWARE_LIST_REQUEST_TOPIC: &str = "tedge/commands/req/software/list";
const SOFTWARE_LIST_RESPONSE_TOPIC: &str = "tedge/commands/res/software/list";
Expand Down Expand Up @@ -534,6 +535,56 @@ impl RestartOperationResponse {
}
}

#[derive(Debug, Deserialize, Serialize, PartialEq, Copy, Eq, Clone)]
#[serde(rename_all = "camelCase")]
pub enum CommandStatus {
Init,
Executing,
Successful,
Failed,
}

#[derive(Debug, Deserialize, Serialize, Eq, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct LogUploadCmdPayload {
pub status: CommandStatus, //Define a different enum if this op needs more states,
#[serde(rename = "type")]
pub log_type: String,
pub tedge_url: String,
pub date_from: OffsetDateTime,
pub date_to: OffsetDateTime,
pub search_text: Option<String>,
pub max_lines: usize,
#[serde(skip_serializing_if = "Option::is_none")]
pub reason: Option<String>,
}

impl<'a> Jsonify<'a> for LogUploadCmdPayload {}

impl LogUploadCmdPayload {
pub fn new(
status: CommandStatus,
log_type: String,
tedge_url: String,
date_from: OffsetDateTime,
date_to: OffsetDateTime,
search_text: Option<String>,
max_lines: usize,
reason: Option<String>,
) -> Self {
Self {
status,
log_type,
tedge_url,
date_from,
date_to,
search_text,
max_lines,
reason,
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
90 changes: 90 additions & 0 deletions crates/core/tedge_api/src/topic.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
use crate::error::TopicError;
use mqtt_channel::Topic;
use mqtt_channel::TopicFilter;
use std::convert::TryFrom;

const CMD_TOPIC_FILTER: &str = "te/device/+/+/+/cmd/+/+";

#[derive(Debug, Clone, Eq, PartialEq)]
pub enum DeviceKind {
Main,
Child(String),
}

#[derive(Debug, Clone, Eq, PartialEq)]
pub enum ResponseTopic {
SoftwareListResponse,
Expand Down Expand Up @@ -76,6 +87,85 @@ pub fn get_child_id_from_child_topic(topic: &str) -> Option<String> {
})
}

pub fn get_target_ids_from_cmd_topic(topic: &Topic) -> Option<(DeviceKind, String)> {
let cmd_topic_filter: TopicFilter = CMD_TOPIC_FILTER.try_into().unwrap();

if cmd_topic_filter.accept_topic(topic) {
// with the topic scheme te/device/<device-id>///cmd/<cmd-id>

let mut topic_split = topic.name.split('/');
// the 3rd level is the device id
let device_id = topic_split.nth(2).unwrap();
// the 6th element is the child id
let cmd_id = topic_split.nth(6).unwrap();

if device_id == "main" {
Some((DeviceKind::Main, cmd_id.into()))
} else {
Some((DeviceKind::Child(device_id.into()), cmd_id.into()))
}
} else {
None
}
}

#[derive(Debug, Clone, Eq, PartialEq)]
pub enum CmdPublishTopic {
// Restart(Target),
// SoftwareList(Target),
// SoftwareUpdate(Target),
// ConfigSnapshot(Target),
// ConfigUpdate(Target),
LogUpload(Target),
}

impl From<CmdPublishTopic> for Topic {
fn from(value: CmdPublishTopic) -> Self {
let topic = match value {
CmdPublishTopic::LogUpload(target) => {
format!("te/device/{}///cmd/{}", target.device_id, target.cmd_id)
}
};
Topic::new_unchecked(&topic)
}
}

#[derive(Debug, Clone, Eq, PartialEq)]
pub enum CmdSubscribeTopic {
// Restart,
// SoftwareList,
// SoftwareUpdate,
LogUpload,
// ConfigSnapshot,
// ConfigUpdate,
}

impl From<CmdSubscribeTopic> for &str {
fn from(value: CmdSubscribeTopic) -> Self {
match value {
CmdSubscribeTopic::LogUpload => "te/device/+///cmd/log_upload/+",
}
}
}

impl From<CmdSubscribeTopic> for TopicFilter {
fn from(value: CmdSubscribeTopic) -> Self {
TopicFilter::new_unchecked(value.into())
}
}

#[derive(Debug, Clone, Eq, PartialEq)]
pub struct Target {
device_id: String,
cmd_id: String,
}

impl Target {
pub fn new(device_id: String, cmd_id: String) -> Self {
Target { device_id, cmd_id }
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
4 changes: 2 additions & 2 deletions crates/extensions/c8y_log_manager/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ impl LogManagerActor {
logfile.as_path(),
line_counter,
smartrest_obj.lines,
&smartrest_obj.needle,
&smartrest_obj.search_text,
) {
Ok((lines, file_content)) => {
line_counter = lines;
Expand Down Expand Up @@ -501,7 +501,7 @@ mod tests {
log_type,
date_from: datetime!(1970-01-01 00:00:03 +00:00),
date_to: datetime!(1970-01-01 00:00:00 +00:00), // not used
needle,
search_text: needle,
lines,
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/extensions/c8y_mapper_ext/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ camino = "1.1"
clock = { path = "../../common/clock" }
json-writer = { path = "../../common/json_writer" }
logged_command = { path = "../../common/logged_command" }
nanoid = "0.4"
plugin_sm = { path = "../../core/plugin_sm" }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
Expand Down
20 changes: 20 additions & 0 deletions crates/extensions/c8y_mapper_ext/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ use c8y_api::smartrest::topic::C8yTopic;
use camino::Utf8PathBuf;
use std::path::Path;
use std::path::PathBuf;
use tedge_api::topic::CmdSubscribeTopic;
use tedge_api::topic::ResponseTopic;
use tedge_api::DEFAULT_FILE_TRANSFER_DIR_NAME;
use tedge_config::ConfigNotSet;
use tedge_config::ReadError;
use tedge_config::TEdgeConfig;
Expand All @@ -16,34 +18,44 @@ pub const MQTT_MESSAGE_SIZE_THRESHOLD: usize = 16184;
pub struct C8yMapperConfig {
pub config_dir: PathBuf,
pub logs_path: Utf8PathBuf,
pub data_dir: Utf8PathBuf,
pub device_id: String,
pub device_type: String,
pub service_type: String,
pub ops_dir: PathBuf,
pub file_transfer_dir: Utf8PathBuf,
pub c8y_host: String,
pub tedge_http_host: String,
pub topics: TopicFilter,
}

impl C8yMapperConfig {
#[allow(clippy::too_many_arguments)]
pub fn new(
config_dir: PathBuf,
logs_path: Utf8PathBuf,
data_dir: Utf8PathBuf,
device_id: String,
device_type: String,
service_type: String,
c8y_host: String,
tedge_http_host: String,
topics: TopicFilter,
) -> Self {
let ops_dir = config_dir.join("operations").join("c8y");
let file_transfer_dir = data_dir.join(DEFAULT_FILE_TRANSFER_DIR_NAME);

Self {
config_dir,
logs_path,
data_dir,
device_id,
device_type,
service_type,
ops_dir,
file_transfer_dir,
c8y_host,
tedge_http_host,
topics,
}
}
Expand All @@ -55,10 +67,15 @@ impl C8yMapperConfig {
let config_dir: PathBuf = config_dir.as_ref().into();

let logs_path = tedge_config.logs.path.clone();
let data_dir = tedge_config.data.path.clone();
let device_id = tedge_config.device.id.try_read(tedge_config)?.to_string();
let device_type = tedge_config.device.ty.clone();
let service_type = tedge_config.service.ty.clone();
let c8y_host = tedge_config.c8y_url().or_config_not_set()?.to_string();
let tedge_http_address = tedge_config.http.bind.address;
let tedge_http_port = tedge_config.http.bind.port;

let tedge_http_host = format!("{}:{}", tedge_http_address, tedge_http_port);

// The topics to subscribe = default internal topics + user configurable external topics
let mut topics = Self::internal_topic_filter(&config_dir)?;
Expand All @@ -71,10 +88,12 @@ impl C8yMapperConfig {
Ok(C8yMapperConfig::new(
config_dir,
logs_path,
data_dir,
device_id,
device_type,
service_type,
c8y_host,
tedge_http_host,
topics,
))
}
Expand All @@ -87,6 +106,7 @@ impl C8yMapperConfig {
ResponseTopic::SoftwareListResponse.as_str(),
ResponseTopic::SoftwareUpdateResponse.as_str(),
ResponseTopic::RestartResponse.as_str(),
CmdSubscribeTopic::LogUpload.into(),
]
.try_into()
.expect("topics that mapper should subscribe to");
Expand Down
Loading

0 comments on commit 60c2705

Please sign in to comment.