Skip to content

Commit

Permalink
C8y mapper handling c8y_LogfileRequest thin-edge#2017
Browse files Browse the repository at this point in the history
  • Loading branch information
albinsuresh committed Jul 28, 2023
1 parent 3610178 commit da5c7b7
Show file tree
Hide file tree
Showing 13 changed files with 359 additions and 26 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ pub struct SmartRestLogRequest {
pub date_from: OffsetDateTime,
#[serde(deserialize_with = "to_datetime")]
pub date_to: OffsetDateTime,
pub needle: Option<String>,
pub search_text: Option<String>,
pub lines: usize,
}

Expand Down
51 changes: 51 additions & 0 deletions crates/core/tedge_api/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use mqtt_channel::Topic;
use nanoid::nanoid;
use serde::Deserialize;
use serde::Serialize;
use time::OffsetDateTime;

const SOFTWARE_LIST_REQUEST_TOPIC: &str = "tedge/commands/req/software/list";
const SOFTWARE_LIST_RESPONSE_TOPIC: &str = "tedge/commands/res/software/list";
Expand Down Expand Up @@ -534,6 +535,56 @@ impl RestartOperationResponse {
}
}

#[derive(Debug, Deserialize, Serialize, PartialEq, Copy, Eq, Clone)]
#[serde(rename_all = "camelCase")]
pub enum CommandStatus {
Init,
Executing,
Successful,
Failed,
}

#[derive(Debug, Deserialize, Serialize, Eq, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct LogUploadCmdPayload {
pub status: CommandStatus, //Define a different enum if this op needs more states,
#[serde(rename = "type")]
pub log_type: String,
pub tedge_url: String,
pub date_from: OffsetDateTime,
pub date_to: OffsetDateTime,
pub search_text: Option<String>,
pub max_lines: usize,
#[serde(skip_serializing_if = "Option::is_none")]
pub reason: Option<String>,
}

impl<'a> Jsonify<'a> for LogUploadCmdPayload {}

impl LogUploadCmdPayload {
pub fn new(
status: CommandStatus,
log_type: String,
tedge_url: String,
date_from: OffsetDateTime,
date_to: OffsetDateTime,
search_text: Option<String>,
max_lines: usize,
reason: Option<String>,
) -> Self {
Self {
status,
log_type,
tedge_url,
date_from,
date_to,
search_text,
max_lines,
reason,
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
90 changes: 90 additions & 0 deletions crates/core/tedge_api/src/topic.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
use crate::error::TopicError;
use mqtt_channel::Topic;
use mqtt_channel::TopicFilter;
use std::convert::TryFrom;

const CMD_TOPIC_FILTER: &str = "te/device/+/+/+/cmd/+/+";

#[derive(Debug, Clone, Eq, PartialEq)]
pub enum DeviceKind {
Main,
Child(String),
}

#[derive(Debug, Clone, Eq, PartialEq)]
pub enum ResponseTopic {
SoftwareListResponse,
Expand Down Expand Up @@ -76,6 +87,85 @@ pub fn get_child_id_from_child_topic(topic: &str) -> Option<String> {
})
}

pub fn get_target_ids_from_cmd_topic(topic: &Topic) -> Option<(DeviceKind, String)> {
let cmd_topic_filter: TopicFilter = CMD_TOPIC_FILTER.try_into().unwrap();

if cmd_topic_filter.accept_topic(topic) {
// with the topic scheme te/device/<device-id>///cmd/<cmd-id>

let mut topic_split = topic.name.split('/');
// the 3rd level is the device id
let device_id = topic_split.nth(2).unwrap();
// the 6th element is the child id
let cmd_id = topic_split.nth(6).unwrap();

if device_id == "main" {
Some((DeviceKind::Main, cmd_id.into()))
} else {
Some((DeviceKind::Child(device_id.into()), cmd_id.into()))
}
} else {
None
}
}

#[derive(Debug, Clone, Eq, PartialEq)]
pub enum CmdPublishTopic {
// Restart(Target),
// SoftwareList(Target),
// SoftwareUpdate(Target),
// ConfigSnapshot(Target),
// ConfigUpdate(Target),
LogUpload(Target),
}

impl From<CmdPublishTopic> for Topic {
fn from(value: CmdPublishTopic) -> Self {
let topic = match value {
CmdPublishTopic::LogUpload(target) => {
format!("te/device/{}///cmd/{}", target.device_id, target.cmd_id)
}
};
Topic::new_unchecked(&topic)
}
}

#[derive(Debug, Clone, Eq, PartialEq)]
pub enum CmdSubscribeTopic {
// Restart,
// SoftwareList,
// SoftwareUpdate,
LogUpload,
// ConfigSnapshot,
// ConfigUpdate,
}

impl From<CmdSubscribeTopic> for &str {
fn from(value: CmdSubscribeTopic) -> Self {
match value {
CmdSubscribeTopic::LogUpload => "te/device/+///cmd/log_upload/+",
}
}
}

impl From<CmdSubscribeTopic> for TopicFilter {
fn from(value: CmdSubscribeTopic) -> Self {
TopicFilter::new_unchecked(value.into())
}
}

#[derive(Debug, Clone, Eq, PartialEq)]
pub struct Target {
device_id: String,
cmd_id: String,
}

impl Target {
pub fn new(device_id: String, cmd_id: String) -> Self {
Target { device_id, cmd_id }
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
22 changes: 22 additions & 0 deletions crates/extensions/c8y_http_proxy/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::messages::SoftwareListResponse;
use crate::messages::Unit;
use crate::messages::UploadConfigFile;
use crate::messages::UploadLogBinary;
use crate::messages::UploadLogFile;
use crate::C8YHttpConfig;
use async_trait::async_trait;
use c8y_api::http_proxy::C8yEndPoint;
Expand Down Expand Up @@ -99,6 +100,12 @@ impl Actor for C8YHttpProxyActor {
.upload_log_binary(request)
.await
.map(|response| response.into()),

C8YRestRequest::UploadLogFile(request) => self
.upload_log_file(request)
.await
.map(|response| response.into()),

C8YRestRequest::UploadConfigFile(request) => self
.upload_config_file(request)
.await
Expand Down Expand Up @@ -345,11 +352,26 @@ impl C8YHttpProxyActor {
}
}

async fn upload_log_file(&mut self, request: UploadLogFile) -> Result<EventId, C8YRestError> {
// read the log file contents
let log_content = std::fs::read_to_string(request.log_path)?;

// TODO: Upload the file as a multi-part stream
let request = UploadLogBinary {
log_type: request.log_type,
log_content,
device_id: request.device_id,
};

self.upload_log_binary(request).await
}

async fn upload_config_file(
&mut self,
request: UploadConfigFile,
) -> Result<EventId, C8YRestError> {
let device_id = request.device_id;
// TODO: Upload the file as a multi-part stream
// read the config file contents
let config_content = std::fs::read_to_string(request.config_path)
.map_err(<std::io::Error as Into<SMCumulocityMapperError>>::into)?;
Expand Down
19 changes: 19 additions & 0 deletions crates/extensions/c8y_http_proxy/src/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::messages::GetJwtToken;
use crate::messages::SoftwareListResponse;
use crate::messages::UploadConfigFile;
use crate::messages::UploadLogBinary;
use crate::messages::UploadLogFile;
use c8y_api::json_c8y::C8yCreateEvent;
use c8y_api::json_c8y::C8yUpdateSoftwareListResponse;
use std::path::Path;
Expand Down Expand Up @@ -82,6 +83,24 @@ impl C8YHttpProxy {
}
}

pub async fn upload_log_file(
&mut self,
log_type: impl AsRef<str>,
log_path: impl AsRef<Path>,
device_id: String,
) -> Result<String, C8YRestError> {
let request: C8YRestRequest = UploadLogFile {
log_type: log_type.as_ref().into(),
log_path: log_path.as_ref().to_owned(),
device_id,
}
.into();
match self.c8y.await_response(request).await? {
Ok(C8YRestResponse::EventId(id)) => Ok(id),
unexpected => Err(unexpected.into()),
}
}

pub async fn upload_config_file(
&mut self,
config_path: &Path,
Expand Down
14 changes: 12 additions & 2 deletions crates/extensions/c8y_http_proxy/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use tedge_actors::ChannelError;
use tedge_http_ext::HttpError;
use tedge_utils::file::PermissionEntry;

fan_in_message_type!(C8YRestRequest[GetJwtToken, C8yCreateEvent, SoftwareListResponse, UploadLogBinary, UploadConfigFile, DownloadFile]: Debug, PartialEq, Eq);
//HIPPO Rename EventId to String as there could be many other String responses as well and this macro doesn't allow another String variant
fan_in_message_type!(C8YRestRequest[GetJwtToken, C8yCreateEvent, SoftwareListResponse, UploadLogBinary, UploadLogFile, UploadConfigFile, DownloadFile]: Debug, PartialEq, Eq);
//TODO Rename EventId to String as there could be many other String responses as well and this macro doesn't allow another String variant
fan_in_message_type!(C8YRestResponse[EventId, Unit]: Debug);

#[derive(thiserror::Error, Debug)]
Expand Down Expand Up @@ -36,6 +36,9 @@ pub enum C8YRestError {

#[error(transparent)]
FromFileError(#[from] tedge_utils::file::FileError),

#[error(transparent)]
FromIOError(#[from] std::io::Error),
}

pub type C8YRestResult = Result<C8YRestResponse, C8YRestError>;
Expand All @@ -56,6 +59,13 @@ pub struct UploadLogBinary {
pub device_id: String,
}

#[derive(Debug, PartialEq, Eq)]
pub struct UploadLogFile {
pub log_type: String,
pub log_path: PathBuf,
pub device_id: String,
}

#[derive(Debug, PartialEq, Eq)]
pub struct UploadConfigFile {
pub config_path: PathBuf,
Expand Down
4 changes: 2 additions & 2 deletions crates/extensions/c8y_log_manager/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ impl LogManagerActor {
logfile.as_path(),
line_counter,
smartrest_obj.lines,
&smartrest_obj.needle,
&smartrest_obj.search_text,
) {
Ok((lines, file_content)) => {
line_counter = lines;
Expand Down Expand Up @@ -501,7 +501,7 @@ mod tests {
log_type,
date_from: datetime!(1970-01-01 00:00:03 +00:00),
date_to: datetime!(1970-01-01 00:00:00 +00:00), // not used
needle,
search_text: needle,
lines,
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/extensions/c8y_mapper_ext/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ camino = "1.1"
clock = { path = "../../common/clock" }
json-writer = { path = "../../common/json_writer" }
logged_command = { path = "../../common/logged_command" }
nanoid = "0.4"
plugin_sm = { path = "../../core/plugin_sm" }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
Expand Down
Loading

0 comments on commit da5c7b7

Please sign in to comment.