Skip to content

Commit

Permalink
C8yHttpProxy generic file upload API
Browse files Browse the repository at this point in the history
  • Loading branch information
albinsuresh authored and rina23q committed Aug 29, 2023
1 parent 5bf727e commit 0ba8025
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 44 deletions.
27 changes: 14 additions & 13 deletions crates/extensions/c8y_config_manager/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use c8y_http_proxy::messages::C8YRestRequest;
use c8y_http_proxy::messages::C8YRestResponse;
use c8y_http_proxy::messages::C8YRestResult;
use c8y_http_proxy::messages::DownloadFile;
use c8y_http_proxy::messages::UploadConfigFile;
use c8y_http_proxy::messages::UploadFile;
use c8y_http_proxy::messages::Url;
use serde_json::json;
use std::net::Ipv4Addr;
use std::time::Duration;
Expand Down Expand Up @@ -102,16 +103,16 @@ async fn test_config_upload_tedge_device() -> Result<(), DynError> {

// Assert config file upload HTTP request
c8y_proxy_message_box
.assert_received([UploadConfigFile {
config_path: test_config_path.into(),
config_type: test_config_type.to_string(),
.assert_received([UploadFile {
file_path: test_config_path.into(),
file_type: test_config_type.to_string(),
device_id,
}])
.await;

// Provide mock config file upload HTTP response to continue
c8y_proxy_message_box
.send(Ok(C8YRestResponse::EventId("test-url".to_string())))
.send(Ok(Url("test-url".to_string()).into()))
.await?;

// Assert SUCCESSFUL SmartREST MQTT message
Expand Down Expand Up @@ -503,21 +504,21 @@ async fn test_child_device_successful_config_snapshot_response_mapping() -> Resu
mqtt_message_box.send(c8y_config_upload_msg).await?;

c8y_proxy_message_box
.assert_received([UploadConfigFile {
config_path: ttd
.assert_received([UploadFile {
file_path: ttd
.to_path_buf()
.join("file-transfer")
.join(child_device_id)
.join("config_snapshot")
.join(test_config_type),
config_type: test_config_type.into(),
file_type: test_config_type.into(),
device_id: child_device_id.into(),
}])
.await;

// Provide mock config file upload HTTP response to continue
c8y_proxy_message_box
.send(Ok(C8YRestResponse::EventId("test-url".to_string())))
.send(Ok(Url("test-url".to_string()).into()))
.await?;

mqtt_message_box
Expand Down Expand Up @@ -577,14 +578,14 @@ async fn test_child_config_snapshot_successful_response_without_uploaded_file_ma
mqtt_message_box.send(c8y_config_upload_msg).await?;

c8y_proxy_message_box
.assert_received([UploadConfigFile {
config_path: ttd
.assert_received([UploadFile {
file_path: ttd
.to_path_buf()
.join("file-transfer")
.join(child_device_id)
.join("config_snapshot")
.join(test_config_type),
config_type: test_config_type.into(),
file_type: test_config_type.into(),
device_id: child_device_id.into(),
}])
.await;
Expand Down Expand Up @@ -932,7 +933,7 @@ async fn test_multiline_smartrest_requests() -> Result<(), DynError> {
loop {
if let Some(_req) = c8y_proxy_message_box.recv().await {
c8y_proxy_message_box
.send(Ok(C8YRestResponse::EventId("test-url".to_string())))
.send(Ok(Url("test-url".to_string()).into()))
.await
.unwrap();
}
Expand Down
2 changes: 1 addition & 1 deletion crates/extensions/c8y_config_manager/src/upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ impl ConfigUploadManager {
) -> Result<String, ConfigManagementError> {
let url = message_box
.c8y_http_proxy
.upload_config_file(config_file_path, config_type, device_id)
.upload_file(config_file_path, config_type, device_id)
.await?;
Ok(url)
}
Expand Down
33 changes: 16 additions & 17 deletions crates/extensions/c8y_http_proxy/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ use crate::messages::DownloadFile;
use crate::messages::EventId;
use crate::messages::SoftwareListResponse;
use crate::messages::Unit;
use crate::messages::UploadConfigFile;
use crate::messages::UploadFile;
use crate::messages::UploadLogBinary;
use crate::messages::Url;
use crate::C8YHttpConfig;
use async_trait::async_trait;
use c8y_api::http_proxy::C8yEndPoint;
Expand Down Expand Up @@ -106,10 +107,10 @@ impl Actor for C8YHttpProxyActor {
.upload_log_binary(request)
.await
.map(|response| response.into()),
C8YRestRequest::UploadConfigFile(request) => self
.upload_config_file(request)
.await
.map(|response| response.into()),

C8YRestRequest::UploadFile(request) => {
self.upload_file(request).await.map(|url| url.into())
}

C8YRestRequest::DownloadFile(request) => self
.download_file(request)
Expand Down Expand Up @@ -352,48 +353,46 @@ impl C8YHttpProxyActor {
}
}

async fn upload_config_file(
&mut self,
request: UploadConfigFile,
) -> Result<EventId, C8YRestError> {
async fn upload_file(&mut self, request: UploadFile) -> Result<Url, C8YRestError> {
let device_id = request.device_id;
// TODO: Upload the file as a multi-part stream
// read the config file contents
let config_content = std::fs::read_to_string(request.config_path)
let file_content = std::fs::read_to_string(request.file_path)
.map_err(<std::io::Error as Into<SMCumulocityMapperError>>::into)?;

let create_event = |internal_id: String| -> C8yCreateEvent {
C8yCreateEvent {
source: Some(C8yManagedObject { id: internal_id }),
event_type: request.config_type.clone(),
event_type: request.file_type.clone(),
time: OffsetDateTime::now_utc(),
text: request.config_type.clone(),
text: request.file_type.clone(),
extras: HashMap::new(),
}
};

let event_response_id = self
.send_event_internal(device_id.clone(), create_event)
.await?;
debug!(target: self.name(), "Config event created with id: {:?}", event_response_id);
debug!(target: self.name(), "File event created with id: {:?}", event_response_id);

let build_request = |end_point: &C8yEndPoint| -> Result<HttpRequestBuilder, C8YRestError> {
let binary_upload_event_url =
end_point.get_url_for_event_binary_upload(&event_response_id);
Ok(HttpRequestBuilder::post(&binary_upload_event_url)
.header("Accept", "application/json")
.header("Content-Type", "text/plain")
.body(config_content.to_string()))
.body(file_content.to_string()))
};
info!(target: self.name(), "Uploading config file to URL: {}", self.end_point
info!(target: self.name(), "Uploading file to URL: {}", self.end_point
.get_url_for_event_binary_upload(&event_response_id));
let http_result = self.execute(device_id.clone(), build_request).await??;

if !http_result.status().is_success() {
Err(C8YRestError::CustomError("Upload failed".into()))
} else {
Ok(self
Ok(Url(self
.end_point
.get_url_for_event_binary_upload(&event_response_id))
.get_url_for_event_binary_upload(&event_response_id)))
}
}

Expand Down
16 changes: 8 additions & 8 deletions crates/extensions/c8y_http_proxy/src/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::messages::C8YRestResult;
use crate::messages::GetFreshJwtToken;
use crate::messages::GetJwtToken;
use crate::messages::SoftwareListResponse;
use crate::messages::UploadConfigFile;
use crate::messages::UploadFile;
use crate::messages::UploadLogBinary;
use c8y_api::json_c8y::C8yCreateEvent;
use c8y_api::json_c8y::C8yUpdateSoftwareListResponse;
Expand Down Expand Up @@ -93,20 +93,20 @@ impl C8YHttpProxy {
}
}

pub async fn upload_config_file(
pub async fn upload_file(
&mut self,
config_path: &Path,
config_type: &str,
file_path: &Path,
file_type: &str,
device_id: String,
) -> Result<String, C8YRestError> {
let request: C8YRestRequest = UploadConfigFile {
config_path: config_path.to_owned(),
config_type: config_type.to_string(),
let request: C8YRestRequest = UploadFile {
file_path: file_path.to_owned(),
file_type: file_type.to_string(),
device_id,
}
.into();
match self.c8y.await_response(request).await? {
Ok(C8YRestResponse::EventId(id)) => Ok(id),
Ok(C8YRestResponse::Url(url)) => Ok(url.0),
unexpected => Err(unexpected.into()),
}
}
Expand Down
16 changes: 11 additions & 5 deletions crates/extensions/c8y_http_proxy/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ use tedge_actors::ChannelError;
use tedge_http_ext::HttpError;
use tedge_utils::file::PermissionEntry;

fan_in_message_type!(C8YRestRequest[GetJwtToken, GetFreshJwtToken, C8yCreateEvent, SoftwareListResponse, UploadLogBinary, UploadConfigFile, DownloadFile]: Debug, PartialEq, Eq);
fan_in_message_type!(C8YRestRequest[GetJwtToken, GetFreshJwtToken, C8yCreateEvent, SoftwareListResponse, UploadLogBinary, UploadFile, DownloadFile]: Debug, PartialEq, Eq);
//HIPPO Rename EventId to String as there could be many other String responses as well and this macro doesn't allow another String variant
fan_in_message_type!(C8YRestResponse[EventId, Unit]: Debug);
fan_in_message_type!(C8YRestResponse[EventId, Url, Unit]: Debug);

#[derive(thiserror::Error, Debug)]
pub enum C8YRestError {
Expand Down Expand Up @@ -36,6 +36,9 @@ pub enum C8YRestError {

#[error(transparent)]
FromFileError(#[from] tedge_utils::file::FileError),

#[error(transparent)]
FromIOError(#[from] std::io::Error),
}

pub type C8YRestResult = Result<C8YRestResponse, C8YRestError>;
Expand All @@ -60,9 +63,9 @@ pub struct UploadLogBinary {
}

#[derive(Debug, PartialEq, Eq)]
pub struct UploadConfigFile {
pub config_path: PathBuf,
pub config_type: String,
pub struct UploadFile {
pub file_path: PathBuf,
pub file_type: String,
pub device_id: String,
}

Expand All @@ -77,6 +80,9 @@ pub type EventId = String;

pub type Unit = ();

#[derive(Debug, Clone, Eq, PartialEq)]
pub struct Url(pub String);

// Transform any unexpected message into an error
impl From<C8YRestResult> for C8YRestError {
fn from(result: C8YRestResult) -> Self {
Expand Down

0 comments on commit 0ba8025

Please sign in to comment.