From 0a3980209b347864032d83db4ad509cd621bc6a7 Mon Sep 17 00:00:00 2001 From: Rina Fujino Date: Wed, 16 Oct 2024 19:40:40 +0200 Subject: [PATCH 1/3] refactor: flexible Authorization header type of HTTP request - Change JWT token retriever actor to return "Bearer {token}", previously it was just "{token}" - Deprecate Auth enum in downloader so that downloader can accept any authorization header value easily Signed-off-by: Rina Fujino --- crates/common/download/src/download.rs | 27 ++----- crates/common/download/src/lib.rs | 1 - crates/extensions/c8y_auth_proxy/src/actor.rs | 16 ++-- .../extensions/c8y_auth_proxy/src/server.rs | 25 +++--- .../extensions/c8y_auth_proxy/src/tokens.rs | 6 +- .../c8y_firmware_manager/src/actor.rs | 6 +- .../c8y_firmware_manager/src/lib.rs | 10 +-- .../c8y_firmware_manager/src/tests.rs | 18 ++--- .../c8y_firmware_manager/src/worker.rs | 15 ++-- crates/extensions/c8y_http_proxy/src/actor.rs | 37 +++++---- .../c8y_http_proxy/src/credentials.rs | 22 +++--- crates/extensions/c8y_http_proxy/src/lib.rs | 16 ++-- crates/extensions/c8y_http_proxy/src/tests.rs | 79 +++++++++---------- .../tedge_downloader_ext/src/actor.rs | 11 ++- .../tedge_downloader_ext/src/tests.rs | 3 +- .../extensions/tedge_http_ext/src/messages.rs | 7 +- 16 files changed, 137 insertions(+), 162 deletions(-) diff --git a/crates/common/download/src/download.rs b/crates/common/download/src/download.rs index cd3399458c5..d68a44863db 100644 --- a/crates/common/download/src/download.rs +++ b/crates/common/download/src/download.rs @@ -50,7 +50,7 @@ fn default_backoff() -> ExponentialBackoff { pub struct DownloadInfo { pub url: String, #[serde(skip_serializing_if = "Option::is_none")] - pub auth: Option, + pub auth: Option, } impl From<&str> for DownloadInfo { @@ -69,9 +69,9 @@ impl DownloadInfo { } /// Creates new [`DownloadInfo`] from a URL with authentication. - pub fn with_auth(self, auth: Auth) -> Self { + pub fn with_auth(self, auth: &str) -> Self { Self { - auth: Some(auth), + auth: Some(auth.into()), ..self } } @@ -85,21 +85,6 @@ impl DownloadInfo { } } -/// Possible authentication schemes -#[derive(Debug, Clone, Deserialize, PartialEq, Eq, Serialize)] -#[serde(rename_all = "camelCase")] -#[serde(deny_unknown_fields)] -pub enum Auth { - /// HTTP Bearer authentication - Bearer(String), -} - -impl Auth { - pub fn new_bearer(token: &str) -> Self { - Self::Bearer(token.into()) - } -} - /// A struct which manages file downloads. #[derive(Debug)] pub struct Downloader { @@ -384,8 +369,8 @@ impl Downloader { let operation = || async { let mut request = self.client.get(url.url()); - if let Some(Auth::Bearer(token)) = &url.auth { - request = request.bearer_auth(token) + if let Some(header_value) = &url.auth { + request = request.header("Authorization", header_value) } if range_start != 0 { @@ -926,7 +911,7 @@ mod tests { // applying token if `with_token` = true let url = { if with_token { - url.with_auth(Auth::Bearer(String::from("token"))) + url.with_auth("Bearer token") } else { url } diff --git a/crates/common/download/src/lib.rs b/crates/common/download/src/lib.rs index 829ae45736a..56c141b55c9 100644 --- a/crates/common/download/src/lib.rs +++ b/crates/common/download/src/lib.rs @@ -47,7 +47,6 @@ mod download; mod error; -pub use crate::download::Auth; pub use crate::download::DownloadInfo; pub use crate::download::Downloader; pub use crate::error::DownloadError; diff --git a/crates/extensions/c8y_auth_proxy/src/actor.rs b/crates/extensions/c8y_auth_proxy/src/actor.rs index 0b0639acf50..a0a2cb493ab 100644 --- a/crates/extensions/c8y_auth_proxy/src/actor.rs +++ b/crates/extensions/c8y_auth_proxy/src/actor.rs @@ -1,20 +1,18 @@ -use std::convert::Infallible; -use std::net::IpAddr; - use axum::async_trait; -use c8y_http_proxy::credentials::C8YJwtRetriever; -use c8y_http_proxy::credentials::JwtRetriever; +use c8y_http_proxy::credentials::AuthResult; +use c8y_http_proxy::credentials::AuthRetriever; use camino::Utf8PathBuf; use futures::channel::mpsc; use futures::StreamExt; +use std::convert::Infallible; +use std::net::IpAddr; use tedge_actors::Actor; use tedge_actors::Builder; use tedge_actors::DynSender; use tedge_actors::RuntimeError; use tedge_actors::RuntimeRequest; use tedge_actors::RuntimeRequestSink; -use tedge_actors::Sequential; -use tedge_actors::ServerActorBuilder; +use tedge_actors::Service; use tedge_config::TEdgeConfig; use tedge_config_macros::OptionalConfig; use tracing::info; @@ -40,14 +38,14 @@ impl C8yAuthProxyBuilder { pub fn try_from_config( config: &TEdgeConfig, c8y_profile: Option<&str>, - jwt: &mut ServerActorBuilder, + auth: &mut impl Service<(), AuthResult>, ) -> anyhow::Result { let reqwest_client = config.cloud_root_certs().client(); let c8y = config.c8y.try_get(c8y_profile)?; let app_data = AppData { is_https: true, host: c8y.http.or_config_not_set()?.to_string(), - token_manager: TokenManager::new(JwtRetriever::new(jwt)).shared(), + token_manager: TokenManager::new(AuthRetriever::new(auth)).shared(), client: reqwest_client, }; let bind = &c8y.proxy.bind; diff --git a/crates/extensions/c8y_auth_proxy/src/server.rs b/crates/extensions/c8y_auth_proxy/src/server.rs index c3c54468efb..03f8c57f0b7 100644 --- a/crates/extensions/c8y_auth_proxy/src/server.rs +++ b/crates/extensions/c8y_auth_proxy/src/server.rs @@ -226,7 +226,7 @@ fn tungstenite_to_axum(message: tungstenite::Message) -> axum::extract::ws::Mess } async fn connect_to_websocket( - token: &str, + auth_value: &str, headers: &HeaderMap, uri: &str, host: &TargetHost, @@ -235,7 +235,7 @@ async fn connect_to_websocket( for (name, value) in headers { req = req.header(name.as_str(), value); } - req = req.header("Authorization", format!("Bearer {token}")); + req = req.header("Authorization", auth_value); let req = req .uri(uri) .header(HOST, host.without_scheme.as_ref()) @@ -405,9 +405,9 @@ async fn respond_to( }; let auth: fn(reqwest::RequestBuilder, &str) -> reqwest::RequestBuilder = if headers.contains_key("Authorization") { - |req, _token| req + |req, _auth_value| req } else { - |req, token| req.bearer_auth(token) + |req, auth_value| req.header("Authorization", auth_value) }; headers.remove(HOST); @@ -436,7 +436,7 @@ async fn respond_to( let destination = format!("{}/tenant/currentTenant", host.http); let response = client .head(&destination) - .bearer_auth(&token) + .header("Authorization", token.to_string()) .send() .await .with_context(|| format!("making HEAD request to {destination}"))?; @@ -499,9 +499,9 @@ mod tests { use axum::http::Request; use axum::middleware::Next; use axum::TypedHeader; - use c8y_http_proxy::credentials::JwtRequest; - use c8y_http_proxy::credentials::JwtResult; - use c8y_http_proxy::credentials::JwtRetriever; + use c8y_http_proxy::credentials::AuthRequest; + use c8y_http_proxy::credentials::AuthResult; + use c8y_http_proxy::credentials::AuthRetriever; use camino::Utf8PathBuf; use futures::channel::mpsc; use futures::future::ready; @@ -1113,7 +1113,7 @@ mod tests { let state = AppData { is_https: false, host: target_host.into(), - token_manager: TokenManager::new(JwtRetriever::new(&mut retriever)).shared(), + token_manager: TokenManager::new(AuthRetriever::new(&mut retriever)).shared(), client: reqwest::Client::new(), }; let trust_store = ca_dir @@ -1147,15 +1147,16 @@ mod tests { #[async_trait] impl Server for IterJwtRetriever { - type Request = JwtRequest; - type Response = JwtResult; + type Request = AuthRequest; + type Response = AuthResult; fn name(&self) -> &str { "IterJwtRetriever" } async fn handle(&mut self, _request: Self::Request) -> Self::Response { - Ok(self.tokens.next().unwrap().into()) + let auth_value = format!("Bearer {}", self.tokens.next().unwrap()); + Ok(auth_value) } } diff --git a/crates/extensions/c8y_auth_proxy/src/tokens.rs b/crates/extensions/c8y_auth_proxy/src/tokens.rs index ab9e2d02d27..5252f3b77b8 100644 --- a/crates/extensions/c8y_auth_proxy/src/tokens.rs +++ b/crates/extensions/c8y_auth_proxy/src/tokens.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use c8y_http_proxy::credentials::JwtRetriever; +use c8y_http_proxy::credentials::AuthRetriever; use tokio::sync::Mutex; #[derive(Clone)] @@ -16,12 +16,12 @@ impl SharedTokenManager { } pub struct TokenManager { - recv: JwtRetriever, + recv: AuthRetriever, cached: Option>, } impl TokenManager { - pub fn new(recv: JwtRetriever) -> Self { + pub fn new(recv: AuthRetriever) -> Self { Self { recv, cached: None } } diff --git a/crates/extensions/c8y_firmware_manager/src/actor.rs b/crates/extensions/c8y_firmware_manager/src/actor.rs index eee23688121..9ac1b6272fd 100644 --- a/crates/extensions/c8y_firmware_manager/src/actor.rs +++ b/crates/extensions/c8y_firmware_manager/src/actor.rs @@ -12,7 +12,7 @@ use c8y_api::smartrest::message::collect_smartrest_messages; use c8y_api::smartrest::message::get_smartrest_template_id; use c8y_api::smartrest::smartrest_deserializer::SmartRestFirmwareRequest; use c8y_api::smartrest::smartrest_deserializer::SmartRestRequestGeneric; -use c8y_http_proxy::credentials::JwtRetriever; +use c8y_http_proxy::credentials::AuthRetriever; use log::error; use log::info; use log::warn; @@ -84,7 +84,7 @@ impl FirmwareManagerActor { config: FirmwareManagerConfig, input_receiver: LoggingReceiver, mqtt_publisher: DynSender, - jwt_retriever: JwtRetriever, + auth_retriever: AuthRetriever, download_sender: ClientMessageBox, progress_sender: DynSender, ) -> Self { @@ -93,7 +93,7 @@ impl FirmwareManagerActor { worker: FirmwareManagerWorker::new( config, mqtt_publisher, - jwt_retriever, + auth_retriever, download_sender, progress_sender, ), diff --git a/crates/extensions/c8y_firmware_manager/src/lib.rs b/crates/extensions/c8y_firmware_manager/src/lib.rs index 04b3f8838ac..c1e5bc2bd0f 100644 --- a/crates/extensions/c8y_firmware_manager/src/lib.rs +++ b/crates/extensions/c8y_firmware_manager/src/lib.rs @@ -10,8 +10,8 @@ mod tests; use actor::FirmwareInput; use actor::FirmwareManagerActor; -use c8y_http_proxy::credentials::JwtResult; -use c8y_http_proxy::credentials::JwtRetriever; +use c8y_http_proxy::credentials::AuthResult; +use c8y_http_proxy::credentials::AuthRetriever; pub use config::*; use tedge_actors::futures::channel::mpsc; use tedge_actors::Builder; @@ -39,7 +39,7 @@ pub struct FirmwareManagerBuilder { config: FirmwareManagerConfig, input_receiver: LoggingReceiver, mqtt_publisher: DynSender, - jwt_retriever: JwtRetriever, + jwt_retriever: AuthRetriever, download_sender: ClientMessageBox, progress_sender: DynSender, signal_sender: mpsc::Sender, @@ -49,7 +49,7 @@ impl FirmwareManagerBuilder { pub fn try_new( config: FirmwareManagerConfig, mqtt_actor: &mut (impl MessageSource + MessageSink), - jwt_actor: &mut impl Service<(), JwtResult>, + jwt_actor: &mut impl Service<(), AuthResult>, downloader_actor: &mut impl Service, ) -> Result { Self::init(&config.data_dir)?; @@ -65,7 +65,7 @@ impl FirmwareManagerBuilder { mqtt_actor.connect_sink(Self::subscriptions(&config.c8y_prefix), &mqtt_sender); let mqtt_publisher = mqtt_actor.get_sender(); - let jwt_retriever = JwtRetriever::new(jwt_actor); + let jwt_retriever = AuthRetriever::new(jwt_actor); let download_sender = ClientMessageBox::new(downloader_actor); let progress_sender = input_sender.into(); Ok(Self { diff --git a/crates/extensions/c8y_firmware_manager/src/tests.rs b/crates/extensions/c8y_firmware_manager/src/tests.rs index 7e8d87eb153..2ba3d87190b 100644 --- a/crates/extensions/c8y_firmware_manager/src/tests.rs +++ b/crates/extensions/c8y_firmware_manager/src/tests.rs @@ -1,7 +1,7 @@ use super::*; use assert_json_diff::assert_json_include; use c8y_api::smartrest::topic::C8yTopic; -use c8y_http_proxy::credentials::JwtRequest; +use c8y_http_proxy::credentials::AuthRequest; use serde_json::json; use sha256::digest; use std::io; @@ -17,7 +17,6 @@ use tedge_actors::RuntimeError; use tedge_actors::Sender; use tedge_actors::SimpleMessageBox; use tedge_actors::SimpleMessageBoxBuilder; -use tedge_api::Auth; use tedge_api::DownloadError; use tedge_downloader_ext::DownloadResponse; use tedge_mqtt_ext::Topic; @@ -274,7 +273,7 @@ async fn create_download_request_with_c8y_auth() -> Result<(), DynError> { spawn_firmware_manager(&mut ttd, DEFAULT_REQUEST_TIMEOUT_SEC, false).await?; let c8y_download_url = format!("http://{C8Y_HOST}/file/end/point"); - let token = "token"; + let auth_header_value = "Bearer token"; // Publish firmware update operation to child device. let c8y_firmware_update_msg = MqttMessage::new( @@ -288,7 +287,9 @@ async fn create_download_request_with_c8y_auth() -> Result<(), DynError> { assert!(jwt_request.is_some()); // Return JWT token. - jwt_message_box.send(Ok(token.to_string())).await?; + jwt_message_box + .send(Ok(auth_header_value.to_string())) + .await?; // Assert firmware download request. let (_id, download_request) = downloader_message_box.recv().await.unwrap(); @@ -297,10 +298,7 @@ async fn create_download_request_with_c8y_auth() -> Result<(), DynError> { download_request.file_path, ttd.path().join("cache").join(digest(c8y_download_url)) ); - assert_eq!( - download_request.auth, - Some(Auth::Bearer(String::from(token))) - ); + assert_eq!(download_request.auth, Some(auth_header_value.into())); Ok(()) } @@ -622,7 +620,7 @@ async fn spawn_firmware_manager( ( JoinHandle>, TimedMessageBox>, - TimedMessageBox>, + TimedMessageBox>, TimedMessageBox>, ), DynError, @@ -649,7 +647,7 @@ async fn spawn_firmware_manager( let mut mqtt_builder: SimpleMessageBoxBuilder = SimpleMessageBoxBuilder::new("MQTT", 5); - let mut jwt_builder: FakeServerBoxBuilder = FakeServerBox::builder(); + let mut jwt_builder: FakeServerBoxBuilder = FakeServerBox::builder(); let mut downloader_builder: FakeServerBoxBuilder = FakeServerBox::builder(); diff --git a/crates/extensions/c8y_firmware_manager/src/worker.rs b/crates/extensions/c8y_firmware_manager/src/worker.rs index b70f2129e91..f1a8da86391 100644 --- a/crates/extensions/c8y_firmware_manager/src/worker.rs +++ b/crates/extensions/c8y_firmware_manager/src/worker.rs @@ -11,7 +11,7 @@ use c8y_api::smartrest::smartrest_serializer::set_operation_executing_with_name; use c8y_api::smartrest::smartrest_serializer::succeed_operation_with_name_no_parameters; use c8y_api::smartrest::smartrest_serializer::CumulocitySupportedOperations; use c8y_api::smartrest::topic::C8yTopic; -use c8y_http_proxy::credentials::JwtRetriever; +use c8y_http_proxy::credentials::AuthRetriever; use camino::Utf8PathBuf; use log::error; use log::info; @@ -26,7 +26,6 @@ use tedge_actors::ClientMessageBox; use tedge_actors::CloneSender; use tedge_actors::DynSender; use tedge_actors::Sender; -use tedge_api::Auth; use tedge_api::OperationStatus; use tedge_downloader_ext::DownloadRequest; use tedge_downloader_ext::DownloadResult; @@ -53,7 +52,7 @@ pub(crate) struct FirmwareManagerWorker { pub(crate) config: Arc, executing: bool, mqtt_publisher: DynSender, - jwt_retriever: JwtRetriever, + auth_retriever: AuthRetriever, download_sender: ClientMessageBox, progress_sender: DynSender, } @@ -64,7 +63,7 @@ impl Clone for FirmwareManagerWorker { config: self.config.clone(), executing: false, mqtt_publisher: self.mqtt_publisher.sender_clone(), - jwt_retriever: self.jwt_retriever.clone(), + auth_retriever: self.auth_retriever.clone(), download_sender: self.download_sender.clone(), progress_sender: self.progress_sender.sender_clone(), } @@ -75,7 +74,7 @@ impl FirmwareManagerWorker { pub(crate) fn new( config: FirmwareManagerConfig, mqtt_publisher: DynSender, - jwt_retriever: JwtRetriever, + auth_retriever: AuthRetriever, download_sender: ClientMessageBox, progress_sender: DynSender, ) -> Self { @@ -83,7 +82,7 @@ impl FirmwareManagerWorker { config: Arc::new(config), executing: false, mqtt_publisher, - jwt_retriever, + auth_retriever, download_sender, progress_sender, } @@ -206,9 +205,9 @@ impl FirmwareManagerWorker { .maybe_tenant_url(firmware_url) .is_some() { - if let Ok(token) = self.jwt_retriever.await_response(()).await? { + if let Ok(header_value) = self.auth_retriever.await_response(()).await? { DownloadRequest::new(firmware_url, cache_file_path.as_std_path()) - .with_auth(Auth::new_bearer(&token)) + .with_auth(&header_value) } else { return Err(FirmwareManagementError::NoJwtToken); } diff --git a/crates/extensions/c8y_http_proxy/src/actor.rs b/crates/extensions/c8y_http_proxy/src/actor.rs index 30a60ceb6bb..6ed41850f8f 100644 --- a/crates/extensions/c8y_http_proxy/src/actor.rs +++ b/crates/extensions/c8y_http_proxy/src/actor.rs @@ -1,6 +1,6 @@ -use crate::credentials::JwtRequest; -use crate::credentials::JwtResult; -use crate::credentials::JwtRetriever; +use crate::credentials::AuthRequest; +use crate::credentials::AuthResult; +use crate::credentials::AuthRetriever; use crate::messages::C8YConnectionError; use crate::messages::C8YRestError; use crate::messages::C8YRestRequest; @@ -22,7 +22,6 @@ use c8y_api::json_c8y::C8yEventResponse; use c8y_api::json_c8y::C8yManagedObject; use c8y_api::json_c8y::InternalIdResponse; use c8y_api::OffsetDateTime; -use download::Auth; use download::DownloadInfo; use download::Downloader; use http::status::StatusCode; @@ -63,14 +62,14 @@ pub struct C8YHttpProxyMessageBox { /// Connection to an HTTP actor pub(crate) http: ClientMessageBox, - /// Connection to a JWT token retriever - pub(crate) jwt: JwtRetriever, + /// Connection to an HTTP auth header value retriever + pub(crate) auth: AuthRetriever, } pub type C8YRestRequestEnvelope = RequestEnvelope; -fan_in_message_type!(C8YHttpProxyInput[C8YRestRequestEnvelope, HttpResult, JwtResult] : Debug); -fan_in_message_type!(C8YHttpProxyOutput[HttpRequest, JwtRequest] : Debug); +fan_in_message_type!(C8YHttpProxyInput[C8YRestRequestEnvelope, HttpResult, AuthResult] : Debug); +fan_in_message_type!(C8YHttpProxyOutput[HttpRequest, AuthRequest] : Debug); #[async_trait] impl Actor for C8YHttpProxyActor { @@ -88,13 +87,13 @@ impl Actor for C8YHttpProxyActor { { let result = match request { C8YRestRequest::GetJwtToken(_) => self - .get_and_set_jwt_token() + .get_and_set_auth_header_value() .await .map(|response| response.into()), C8YRestRequest::GetFreshJwtToken(_) => { self.end_point.token = None; - self.get_and_set_jwt_token() + self.get_and_set_auth_header_value() .await .map(|response| response.into()) } @@ -207,7 +206,7 @@ impl C8YHttpProxyActor { loop { attempt += 1; let request = HttpRequestBuilder::get(&url_get_id) - .bearer_auth(self.end_point.token.clone().unwrap_or_default()) + .auth(self.end_point.token.clone().unwrap_or_default()) .build()?; let endpoint = request.uri().path().to_owned(); let method = request.method().to_owned(); @@ -265,7 +264,7 @@ impl C8YHttpProxyActor { let request_builder = build_request(&self.end_point); let request = request_builder .await? - .bearer_auth(self.end_point.token.clone().unwrap_or_default()) + .auth(self.end_point.token.clone().unwrap_or_default()) .build()?; let endpoint = request.uri().path().to_owned(); let method = request.method().to_owned(); @@ -296,7 +295,7 @@ impl C8YHttpProxyActor { async fn get_fresh_token(&mut self) -> Result { self.end_point.token = None; - self.get_and_set_jwt_token().await + self.get_and_set_auth_header_value().await } async fn try_request_with_fresh_token< @@ -311,7 +310,7 @@ impl C8YHttpProxyActor { let request_builder = build_request(&self.end_point); let request = request_builder .await? - .bearer_auth(self.end_point.token.clone().unwrap_or_default()) + .auth(self.end_point.token.clone().unwrap_or_default()) .build()?; // retry the request Ok(self.peers.http.await_response(request).await?) @@ -330,7 +329,7 @@ impl C8YHttpProxyActor { let request_builder = build_request(&self.end_point); let request = request_builder .await? - .bearer_auth(self.end_point.token.clone().unwrap_or_default()) + .auth(self.end_point.token.clone().unwrap_or_default()) .build()?; Ok(self.peers.http.await_response(request).await?) } @@ -486,11 +485,11 @@ impl C8YHttpProxyActor { } } - async fn get_and_set_jwt_token(&mut self) -> Result { + async fn get_and_set_auth_header_value(&mut self) -> Result { match self.end_point.token.clone() { Some(token) => Ok(token), None => { - if let Ok(token) = self.peers.jwt.await_response(()).await? { + if let Ok(token) = self.peers.auth.await_response(()).await? { self.end_point.token = Some(token.clone()); Ok(token) } else { @@ -508,8 +507,8 @@ impl C8YHttpProxyActor { .maybe_tenant_url(download_info.url()) .is_some() { - let token = self.get_and_set_jwt_token().await?; - download_info.auth = Some(Auth::new_bearer(token.as_str())); + let header_value = self.get_and_set_auth_header_value().await?; + download_info.auth = Some(header_value); } info!(target: self.name(), "Downloading from: {:?}", download_info.url()); diff --git a/crates/extensions/c8y_http_proxy/src/credentials.rs b/crates/extensions/c8y_http_proxy/src/credentials.rs index f12bbb5abde..a838aaff0d3 100644 --- a/crates/extensions/c8y_http_proxy/src/credentials.rs +++ b/crates/extensions/c8y_http_proxy/src/credentials.rs @@ -8,11 +8,11 @@ use tedge_actors::ServerActorBuilder; use tedge_actors::ServerConfig; use tedge_config::TopicPrefix; -pub type JwtRequest = (); -pub type JwtResult = Result; +pub type AuthRequest = (); +pub type AuthResult = Result; -/// Retrieves JWT tokens authenticating the device -pub type JwtRetriever = ClientMessageBox; +/// Retrieves Authorization header value authenticating the device +pub type AuthRetriever = ClientMessageBox; /// A JwtRetriever that gets JWT tokens from C8Y over MQTT pub struct C8YJwtRetriever { @@ -32,8 +32,8 @@ impl C8YJwtRetriever { #[async_trait] impl Server for C8YJwtRetriever { - type Request = JwtRequest; - type Response = JwtResult; + type Request = AuthRequest; + type Response = AuthResult; fn name(&self) -> &str { "C8YJwtRetriever" @@ -41,7 +41,8 @@ impl Server for C8YJwtRetriever { async fn handle(&mut self, _request: Self::Request) -> Self::Response { let response = self.mqtt_retriever.get_jwt_token().await?; - Ok(response.token()) + let auth_value = format!("Bearer {}", response.token()); + Ok(auth_value) } } @@ -54,14 +55,15 @@ pub(crate) struct ConstJwtRetriever { #[async_trait] #[cfg(test)] impl Server for ConstJwtRetriever { - type Request = JwtRequest; - type Response = JwtResult; + type Request = AuthRequest; + type Response = AuthResult; fn name(&self) -> &str { "ConstJwtRetriever" } async fn handle(&mut self, _request: Self::Request) -> Self::Response { - Ok(self.token.clone()) + let auth_value = format!("Bearer {}", self.token); + Ok(auth_value) } } diff --git a/crates/extensions/c8y_http_proxy/src/lib.rs b/crates/extensions/c8y_http_proxy/src/lib.rs index 008d948e38f..fa9b5fc93b9 100644 --- a/crates/extensions/c8y_http_proxy/src/lib.rs +++ b/crates/extensions/c8y_http_proxy/src/lib.rs @@ -1,7 +1,7 @@ use crate::actor::C8YHttpProxyActor; use crate::actor::C8YHttpProxyMessageBox; -use crate::credentials::JwtResult; -use crate::credentials::JwtRetriever; +use crate::credentials::AuthResult; +use crate::credentials::AuthRetriever; use crate::messages::C8YRestRequest; use crate::messages::C8YRestResult; use certificate::CloudRootCerts; @@ -107,24 +107,24 @@ pub struct C8YHttpProxyBuilder { /// Connection to an HTTP actor http: ClientMessageBox, - /// Connection to a JWT token retriever - jwt: JwtRetriever, + /// Connection to an HTTP auth header value retriever + auth: AuthRetriever, } impl C8YHttpProxyBuilder { pub fn new( config: C8YHttpConfig, http: &mut impl Service, - jwt: &mut impl Service<(), JwtResult>, + auth: &mut impl Service<(), AuthResult>, ) -> Self { let clients = ServerMessageBoxBuilder::new("C8Y-REST", 10); let http = ClientMessageBox::new(http); - let jwt = JwtRetriever::new(jwt); + let auth = AuthRetriever::new(auth); C8YHttpProxyBuilder { config, clients, http, - jwt, + auth, } } } @@ -140,7 +140,7 @@ impl Builder for C8YHttpProxyBuilder { let message_box = C8YHttpProxyMessageBox { clients: self.clients.build(), http: self.http, - jwt: self.jwt, + auth: self.auth, }; C8YHttpProxyActor::new(self.config, message_box) diff --git a/crates/extensions/c8y_http_proxy/src/tests.rs b/crates/extensions/c8y_http_proxy/src/tests.rs index a1a6fa5f56c..2f61aa26cac 100644 --- a/crates/extensions/c8y_http_proxy/src/tests.rs +++ b/crates/extensions/c8y_http_proxy/src/tests.rs @@ -1,6 +1,6 @@ +use crate::credentials::AuthRequest; +use crate::credentials::AuthResult; use crate::credentials::ConstJwtRetriever; -use crate::credentials::JwtRequest; -use crate::credentials::JwtResult; use crate::handle::C8YHttpProxy; use crate::messages::CreateEvent; use crate::C8YHttpConfig; @@ -32,23 +32,25 @@ use tedge_http_ext::HttpResult; use tedge_test_utils::fs::TempTedgeDir; use time::macros::datetime; +const JWT_TOKEN: &str = "JWT token"; +const BEARER_AUTH: &str = "Bearer JWT token"; + #[tokio::test] async fn c8y_http_proxy_requests_the_device_internal_id_on_start() { let c8y_host = "c8y.tenant.io"; let device_id = "device-001"; - let token = "some JWT token"; let external_id = "external-device-001"; let tmp_dir = "/tmp"; let (mut proxy, mut c8y) = - spawn_c8y_http_proxy(c8y_host.into(), device_id.into(), tmp_dir.into(), token).await; + spawn_c8y_http_proxy(c8y_host.into(), device_id.into(), tmp_dir.into(), JWT_TOKEN).await; // Even before any request is sent to the c8y_proxy // the proxy requests over HTTP the internal device id. let init_request = HttpRequestBuilder::get(format!( "https://{c8y_host}/identity/externalIds/c8y_Serial/{device_id}" )) - .bearer_auth(token) + .auth(BEARER_AUTH) .build() .unwrap(); assert_recv(&mut c8y, Some(init_request)).await; @@ -76,7 +78,7 @@ async fn c8y_http_proxy_requests_the_device_internal_id_on_start() { &mut c8y, Some( HttpRequestBuilder::post(format!("https://{c8y_host}/event/events/")) - .bearer_auth(token) + .auth(BEARER_AUTH) .header("content-type", "application/json") .header("accept", "application/json") .build() @@ -90,19 +92,18 @@ async fn c8y_http_proxy_requests_the_device_internal_id_on_start() { async fn retry_internal_id_on_expired_jwt() { let c8y_host = "c8y.tenant.io"; let device_id = "device-001"; - let token = "JWT token"; let external_id = "external-device-001"; let tmp_dir = "/tmp"; let (mut proxy, mut c8y) = - spawn_c8y_http_proxy(c8y_host.into(), device_id.into(), tmp_dir.into(), token).await; + spawn_c8y_http_proxy(c8y_host.into(), device_id.into(), tmp_dir.into(), JWT_TOKEN).await; // Even before any request is sent to the c8y_proxy // the proxy requests over HTTP the internal device id. let init_request = HttpRequestBuilder::get(format!( "https://{c8y_host}/identity/externalIds/c8y_Serial/{device_id}" )) - .bearer_auth(token) + .auth(BEARER_AUTH) .build() .unwrap(); assert_recv(&mut c8y, Some(init_request)).await; @@ -116,7 +117,7 @@ async fn retry_internal_id_on_expired_jwt() { HttpRequestBuilder::get(format!( "https://{c8y_host}/identity/externalIds/c8y_Serial/{device_id}" )) - .bearer_auth(token) + .auth(BEARER_AUTH) .build() .unwrap(), ), @@ -145,7 +146,7 @@ async fn retry_internal_id_on_expired_jwt() { &mut c8y, Some( HttpRequestBuilder::post(format!("https://{c8y_host}/event/events/")) - .bearer_auth(token) + .auth(BEARER_AUTH) .header("content-type", "application/json") .header("accept", "application/json") .build() @@ -159,7 +160,6 @@ async fn retry_internal_id_on_expired_jwt() { async fn retry_get_internal_id_when_not_found() { let c8y_host = "c8y.tenant.io"; let main_device_id = "device-001"; - let token = "JWT token"; let tmp_dir = "/tmp"; let child_device_id = "child-101"; @@ -167,7 +167,7 @@ async fn retry_get_internal_id_when_not_found() { c8y_host.into(), main_device_id.into(), tmp_dir.into(), - token, + JWT_TOKEN, ) .await; @@ -177,7 +177,7 @@ async fn retry_get_internal_id_when_not_found() { let get_internal_id_url = format!("https://{c8y_host}/identity/externalIds/c8y_Serial/{main_device_id}"); let init_request = HttpRequestBuilder::get(get_internal_id_url) - .bearer_auth(token) + .auth(BEARER_AUTH) .build() .unwrap(); assert_recv(&mut c8y, Some(init_request)).await; @@ -197,7 +197,7 @@ async fn retry_get_internal_id_when_not_found() { &mut c8y, Some( HttpRequestBuilder::get(&get_internal_id_url) - .bearer_auth(token) + .auth(BEARER_AUTH) .build() .unwrap(), ), @@ -215,7 +215,7 @@ async fn retry_get_internal_id_when_not_found() { &mut c8y, Some( HttpRequestBuilder::get(&get_internal_id_url) - .bearer_auth(token) + .auth(BEARER_AUTH) .build() .unwrap(), ), @@ -236,7 +236,7 @@ async fn retry_get_internal_id_when_not_found() { HttpRequestBuilder::put(format!("https://{c8y_host}/inventory/managedObjects/200")) .header("content-type", "application/json") .header("accept", "application/json") - .bearer_auth(token) + .auth(BEARER_AUTH) .json(&c8y_software_list) .build() .unwrap(), @@ -260,7 +260,6 @@ async fn retry_get_internal_id_when_not_found() { async fn get_internal_id_retry_fails_after_exceeding_attempts_threshold() { let c8y_host = "c8y.tenant.io"; let main_device_id = "device-001"; - let token = "JWT token"; let tmp_dir = "/tmp"; let child_device_id = "child-101"; @@ -268,7 +267,7 @@ async fn get_internal_id_retry_fails_after_exceeding_attempts_threshold() { c8y_host.into(), main_device_id.into(), tmp_dir.into(), - token, + JWT_TOKEN, ) .await; @@ -278,7 +277,7 @@ async fn get_internal_id_retry_fails_after_exceeding_attempts_threshold() { let get_internal_id_url = format!("https://{c8y_host}/identity/externalIds/c8y_Serial/{main_device_id}"); let init_request = HttpRequestBuilder::get(get_internal_id_url) - .bearer_auth(token) + .auth(BEARER_AUTH) .build() .unwrap(); assert_recv(&mut c8y, Some(init_request)).await; @@ -298,7 +297,7 @@ async fn get_internal_id_retry_fails_after_exceeding_attempts_threshold() { &mut c8y, Some( HttpRequestBuilder::get(&get_internal_id_url) - .bearer_auth(token) + .auth(BEARER_AUTH) .build() .unwrap(), ), @@ -352,7 +351,7 @@ async fn retry_internal_id_on_expired_jwt_with_mock() { .create(); let target_url = server.url(); - let mut jwt = ServerMessageBoxBuilder::new("JWT Actor", 16); + let mut auth = ServerMessageBoxBuilder::new("Auth Actor", 16); let ttd = TempTedgeDir::new(); let config_loc = TEdgeConfigLocation::from_custom_root(ttd.path()); @@ -368,8 +367,8 @@ async fn retry_internal_id_on_expired_jwt_with_mock() { cloud_root_certs: CloudRootCerts::from([]), retry_interval: Duration::from_millis(100), }; - let c8y_proxy_actor = C8YHttpProxyBuilder::new(config, &mut http_actor, &mut jwt); - let jwt_actor = ServerActor::new(DynamicJwtRetriever { count: 0 }, jwt.build()); + let c8y_proxy_actor = C8YHttpProxyBuilder::new(config, &mut http_actor, &mut auth); + let jwt_actor = ServerActor::new(DynamicJwtRetriever { count: 0 }, auth.build()); tokio::spawn(async move { http_actor.run().await }); tokio::spawn(async move { jwt_actor.run().await }); @@ -448,7 +447,7 @@ async fn retry_create_event_on_expired_jwt_with_mock() { proxy .end_point .set_internal_id(external_id.into(), internal_id.into()); - proxy.end_point.token = Some("Cached JWT Token".into()); + proxy.end_point.token = Some("Bearer Cached JWT Token".into()); let result = proxy.create_event(event).await; assert_eq!(event_id, result.unwrap()); @@ -458,19 +457,18 @@ async fn retry_create_event_on_expired_jwt_with_mock() { async fn retry_software_list_once_with_fresh_internal_id() { let c8y_host = "c8y.tenant.io"; let device_id = "device-001"; - let token = "JWT token"; let external_id = "external-device-001"; let tmp_dir = "/tmp"; let (mut proxy, mut c8y) = - spawn_c8y_http_proxy(c8y_host.into(), device_id.into(), tmp_dir.into(), token).await; + spawn_c8y_http_proxy(c8y_host.into(), device_id.into(), tmp_dir.into(), JWT_TOKEN).await; // Even before any request is sent to the c8y_proxy // the proxy requests over HTTP the internal device id. let _init_request = HttpRequestBuilder::get(format!( "https://{c8y_host}/identity/externalIds/c8y_Serial/{device_id}" )) - .bearer_auth(token) + .auth(BEARER_AUTH) .build() .unwrap(); // skip the message @@ -505,7 +503,7 @@ async fn retry_software_list_once_with_fresh_internal_id() { )) .header("content-type", "application/json") .header("accept", "application/json") - .bearer_auth(token) + .auth(BEARER_AUTH) .json(&c8y_software_list) .build() .unwrap(), @@ -528,7 +526,7 @@ async fn retry_software_list_once_with_fresh_internal_id() { HttpRequestBuilder::get(format!( "https://{c8y_host}/identity/externalIds/c8y_Serial/{device_id}" )) - .bearer_auth(token) + .auth(BEARER_AUTH) .build() .unwrap(), ), @@ -551,7 +549,7 @@ async fn retry_software_list_once_with_fresh_internal_id() { HttpRequestBuilder::put(format!( "https://{c8y_host}/inventory/managedObjects/{device_id}" )) - .bearer_auth(token) + .auth(BEARER_AUTH) .header("content-type", "application/json") .header("accept", "application/json") .json(&c8y_software_list) @@ -566,19 +564,18 @@ async fn retry_software_list_once_with_fresh_internal_id() { async fn auto_retry_upload_log_binary_when_internal_id_expires() { let c8y_host = "c8y.tenant.io"; let device_id = "device-001"; - let token = "JWT token"; let external_id = "external-device-001"; let tmp_dir = "/tmp"; let (mut proxy, mut c8y) = - spawn_c8y_http_proxy(c8y_host.into(), device_id.into(), tmp_dir.into(), token).await; + spawn_c8y_http_proxy(c8y_host.into(), device_id.into(), tmp_dir.into(), JWT_TOKEN).await; // Even before any request is sent to the c8y_proxy // the proxy requests over HTTP the internal device id. let init_request = HttpRequestBuilder::get(format!( "https://{c8y_host}/identity/externalIds/c8y_Serial/{device_id}" )) - .bearer_auth(token) + .auth(BEARER_AUTH) .build() .unwrap(); assert_recv(&mut c8y, Some(init_request)).await; @@ -603,7 +600,7 @@ async fn auto_retry_upload_log_binary_when_internal_id_expires() { &mut c8y, Some( HttpRequestBuilder::post(format!("https://{c8y_host}/event/events/")) - .bearer_auth(token) + .auth(BEARER_AUTH) .header("content-type", "application/json") .header("accept", "application/json") .build() @@ -627,7 +624,7 @@ async fn auto_retry_upload_log_binary_when_internal_id_expires() { HttpRequestBuilder::get(format!( "https://{c8y_host}/identity/externalIds/c8y_Serial/{device_id}" )) - .bearer_auth(token) + .auth(BEARER_AUTH) .build() .unwrap(), ), @@ -646,7 +643,7 @@ async fn auto_retry_upload_log_binary_when_internal_id_expires() { &mut c8y, Some( HttpRequestBuilder::post(format!("https://{c8y_host}/event/events/")) - .bearer_auth(token) + .auth(BEARER_AUTH) .header("content-type", "application/json") .header("accept", "application/json") .build() @@ -707,8 +704,8 @@ pub(crate) struct DynamicJwtRetriever { #[async_trait] impl Server for DynamicJwtRetriever { - type Request = JwtRequest; - type Response = JwtResult; + type Request = AuthRequest; + type Response = AuthResult; fn name(&self) -> &str { "DynamicJwtRetriever" @@ -717,9 +714,9 @@ impl Server for DynamicJwtRetriever { async fn handle(&mut self, _request: Self::Request) -> Self::Response { if self.count == 0 { self.count += 1; - Ok("Cached JWT token".into()) + Ok("Bearer Cached JWT token".into()) } else { - Ok("Fresh JWT token".into()) + Ok("Bearer Fresh JWT token".into()) } } } diff --git a/crates/extensions/tedge_downloader_ext/src/actor.rs b/crates/extensions/tedge_downloader_ext/src/actor.rs index c4d839ec03a..c7c9aa8eb27 100644 --- a/crates/extensions/tedge_downloader_ext/src/actor.rs +++ b/crates/extensions/tedge_downloader_ext/src/actor.rs @@ -1,6 +1,5 @@ use async_trait::async_trait; use certificate::CloudRootCerts; -use download::Auth; use download::DownloadError; use download::DownloadInfo; use download::Downloader; @@ -20,7 +19,7 @@ use tedge_utils::file::PermissionEntry; pub struct DownloadRequest { pub url: String, pub file_path: PathBuf, - pub auth: Option, + pub auth: Option, pub permission: Option, } @@ -34,9 +33,9 @@ impl DownloadRequest { } } - pub fn with_auth(self, auth: Auth) -> Self { + pub fn with_auth(self, auth: &str) -> Self { Self { - auth: Some(auth), + auth: Some(auth.into()), ..self } } @@ -112,8 +111,8 @@ impl Server for DownloaderActor { async fn handle(&mut self, id_request: Self::Request) -> Self::Response { let (id, request) = id_request; - let download_info = if let Some(auth) = request.auth { - DownloadInfo::new(&request.url).with_auth(auth) + let download_info = if let Some(header_value) = request.auth { + DownloadInfo::new(&request.url).with_auth(&header_value) } else { DownloadInfo::new(&request.url) }; diff --git a/crates/extensions/tedge_downloader_ext/src/tests.rs b/crates/extensions/tedge_downloader_ext/src/tests.rs index 7b2fbebc640..227cff9b2d2 100644 --- a/crates/extensions/tedge_downloader_ext/src/tests.rs +++ b/crates/extensions/tedge_downloader_ext/src/tests.rs @@ -1,6 +1,5 @@ use super::*; use certificate::CloudRootCerts; -use download::Auth; use std::time::Duration; use tedge_actors::ClientMessageBox; use tedge_test_utils::fs::TempTedgeDir; @@ -53,7 +52,7 @@ async fn download_with_auth() { let target_path = ttd.path().join("downloaded_file"); let server_url = server.url(); let download_request = - DownloadRequest::new(&server_url, &target_path).with_auth(Auth::Bearer("token".into())); + DownloadRequest::new(&server_url, &target_path).with_auth("Bearer token"); let mut requester = spawn_downloader_actor().await; diff --git a/crates/extensions/tedge_http_ext/src/messages.rs b/crates/extensions/tedge_http_ext/src/messages.rs index 96c9f9209ab..8e936278867 100644 --- a/crates/extensions/tedge_http_ext/src/messages.rs +++ b/crates/extensions/tedge_http_ext/src/messages.rs @@ -123,13 +123,12 @@ impl HttpRequestBuilder { HttpRequestBuilder { body, ..self } } - /// Add bearer authentication (e.g. a JWT token) - pub fn bearer_auth(self, token: T) -> Self + /// Add an authentication header + pub fn auth(self, header_value: T) -> Self where T: std::fmt::Display, { - let header_value = format!("Bearer {}", token); - self.header(http::header::AUTHORIZATION, header_value) + self.header(http::header::AUTHORIZATION, header_value.to_string()) } } From a34f6ad78925ca1a1ccbbbc11e6284f890c95f6b Mon Sep 17 00:00:00 2001 From: Rina Fujino Date: Sun, 20 Oct 2024 15:29:01 +0200 Subject: [PATCH 2/3] refactor: remove unused C8YRestRequest items and functions Signed-off-by: Rina Fujino --- crates/extensions/c8y_http_proxy/src/actor.rs | 108 ------------------ .../extensions/c8y_http_proxy/src/handle.rs | 59 ---------- crates/extensions/c8y_http_proxy/src/lib.rs | 8 -- .../extensions/c8y_http_proxy/src/messages.rs | 2 +- crates/extensions/c8y_http_proxy/src/tests.rs | 7 -- crates/extensions/c8y_mapper_ext/src/tests.rs | 16 --- 6 files changed, 1 insertion(+), 199 deletions(-) diff --git a/crates/extensions/c8y_http_proxy/src/actor.rs b/crates/extensions/c8y_http_proxy/src/actor.rs index 6ed41850f8f..fc7ec58b48b 100644 --- a/crates/extensions/c8y_http_proxy/src/actor.rs +++ b/crates/extensions/c8y_http_proxy/src/actor.rs @@ -6,13 +6,10 @@ use crate::messages::C8YRestError; use crate::messages::C8YRestRequest; use crate::messages::C8YRestResult; use crate::messages::CreateEvent; -use crate::messages::DownloadFile; use crate::messages::EventId; use crate::messages::SoftwareListResponse; use crate::messages::Unit; -use crate::messages::UploadFile; use crate::messages::UploadLogBinary; -use crate::messages::Url; use crate::C8YHttpConfig; use anyhow::Context; use async_trait::async_trait; @@ -22,10 +19,7 @@ use c8y_api::json_c8y::C8yEventResponse; use c8y_api::json_c8y::C8yManagedObject; use c8y_api::json_c8y::InternalIdResponse; use c8y_api::OffsetDateTime; -use download::DownloadInfo; -use download::Downloader; use http::status::StatusCode; -use log::debug; use log::error; use log::info; use std::collections::HashMap; @@ -86,18 +80,6 @@ impl Actor for C8YHttpProxyActor { }) = self.peers.clients.recv().await { let result = match request { - C8YRestRequest::GetJwtToken(_) => self - .get_and_set_auth_header_value() - .await - .map(|response| response.into()), - - C8YRestRequest::GetFreshJwtToken(_) => { - self.end_point.token = None; - self.get_and_set_auth_header_value() - .await - .map(|response| response.into()) - } - C8YRestRequest::CreateEvent(request) => self .create_event(request) .await @@ -112,15 +94,6 @@ impl Actor for C8YHttpProxyActor { .upload_log_binary(request) .await .map(|response| response.into()), - - C8YRestRequest::UploadFile(request) => { - self.upload_file(request).await.map(|url| url.into()) - } - - C8YRestRequest::DownloadFile(request) => self - .download_file(request) - .await - .map(|response| response.into()), }; reply_to.send(result).await?; } @@ -427,64 +400,6 @@ impl C8YHttpProxyActor { } } - async fn upload_file(&mut self, request: UploadFile) -> Result { - let device_id = request.device_id; - - let create_event = |internal_id: String| -> C8yCreateEvent { - C8yCreateEvent { - source: Some(C8yManagedObject { id: internal_id }), - event_type: request.file_type.clone(), - time: OffsetDateTime::now_utc(), - text: request.file_type.clone(), - extras: HashMap::new(), - } - }; - - let event_response_id = self - .send_event_internal(device_id.clone(), create_event) - .await?; - debug!(target: self.name(), "File event created with id: {:?}", event_response_id); - - let build_request = |end_point: &C8yEndPoint| { - let binary_upload_event_url = - end_point.get_url_for_event_binary_upload(&event_response_id); - - async { - // TODO: Upload the file as a multi-part stream - let file_content = - tokio::fs::read(&request.file_path).await.with_context(|| { - format!( - "Reading file {} for upload failed", - request.file_path.display() - ) - })?; - - let (content_type, body) = match String::from_utf8(file_content) { - Ok(text) => ("text/plain", hyper::Body::from(text)), - Err(not_text) => ("application/octet-stream", not_text.into_bytes().into()), - }; - - Ok::<_, C8YRestError>( - HttpRequestBuilder::post(binary_upload_event_url) - .header("Accept", "application/json") - .header("Content-Type", content_type) - .body(body), - ) - } - }; - info!(target: self.name(), "Uploading file to URL: {}", self.end_point - .get_url_for_event_binary_upload(&event_response_id)); - let http_result = self.execute(device_id.clone(), build_request).await??; - - if !http_result.status().is_success() { - Err(C8YRestError::CustomError("Upload failed".into())) - } else { - Ok(Url(self - .end_point - .get_url_for_event_binary_upload(&event_response_id))) - } - } - async fn get_and_set_auth_header_value(&mut self) -> Result { match self.end_point.token.clone() { Some(token) => Ok(token), @@ -499,29 +414,6 @@ impl C8YHttpProxyActor { } } - async fn download_file(&mut self, request: DownloadFile) -> Result { - let mut download_info: DownloadInfo = request.download_url.as_str().into(); - // If the provided url is c8y, add auth - if self - .end_point - .maybe_tenant_url(download_info.url()) - .is_some() - { - let header_value = self.get_and_set_auth_header_value().await?; - download_info.auth = Some(header_value); - } - - info!(target: self.name(), "Downloading from: {:?}", download_info.url()); - let downloader: Downloader = Downloader::new( - request.file_path, - self.config.identity.clone(), - self.config.cloud_root_certs.clone(), - ); - downloader.download(&download_info).await?; - - Ok(()) - } - async fn send_event_internal( &mut self, device_id: String, diff --git a/crates/extensions/c8y_http_proxy/src/handle.rs b/crates/extensions/c8y_http_proxy/src/handle.rs index 6fec1e4bbed..b853e75ffa4 100644 --- a/crates/extensions/c8y_http_proxy/src/handle.rs +++ b/crates/extensions/c8y_http_proxy/src/handle.rs @@ -3,19 +3,12 @@ use crate::messages::C8YRestRequest; use crate::messages::C8YRestResponse; use crate::messages::C8YRestResult; use crate::messages::CreateEvent; -use crate::messages::GetFreshJwtToken; -use crate::messages::GetJwtToken; use crate::messages::SoftwareListResponse; -use crate::messages::UploadFile; use crate::messages::UploadLogBinary; use c8y_api::json_c8y::C8yUpdateSoftwareListResponse; -use std::path::Path; -use std::path::PathBuf; use tedge_actors::ClientMessageBox; use tedge_actors::Service; -use super::messages::DownloadFile; - /// Handle to the C8YHttpProxy #[derive(Clone)] pub struct C8YHttpProxy { @@ -28,24 +21,6 @@ impl C8YHttpProxy { C8YHttpProxy { c8y } } - pub async fn get_jwt_token(&mut self) -> Result { - let request: C8YRestRequest = GetJwtToken.into(); - - match self.c8y.await_response(request).await? { - Ok(C8YRestResponse::EventId(id)) => Ok(id), - unexpected => Err(unexpected.into()), - } - } - - pub async fn get_fresh_jwt_token(&mut self) -> Result { - let request: C8YRestRequest = GetFreshJwtToken.into(); - - match self.c8y.await_response(request).await? { - Ok(C8YRestResponse::EventId(id)) => Ok(id), - unexpected => Err(unexpected.into()), - } - } - pub async fn send_event(&mut self, c8y_event: CreateEvent) -> Result { let request: C8YRestRequest = c8y_event.into(); match self.c8y.await_response(request).await? { @@ -88,38 +63,4 @@ impl C8YHttpProxy { unexpected => Err(unexpected.into()), } } - - pub async fn upload_file( - &mut self, - file_path: &Path, - file_type: &str, - device_id: String, - ) -> Result { - let request: C8YRestRequest = UploadFile { - file_path: file_path.to_owned(), - file_type: file_type.to_string(), - device_id, - } - .into(); - match self.c8y.await_response(request).await? { - Ok(C8YRestResponse::Url(url)) => Ok(url.0), - unexpected => Err(unexpected.into()), - } - } - - pub async fn download_file( - &mut self, - download_url: &str, - file_path: PathBuf, - ) -> Result<(), C8YRestError> { - let request: C8YRestRequest = DownloadFile { - download_url: download_url.into(), - file_path, - } - .into(); - match self.c8y.await_response(request).await? { - Ok(C8YRestResponse::Unit(())) => Ok(()), - unexpected => Err(unexpected.into()), - } - } } diff --git a/crates/extensions/c8y_http_proxy/src/lib.rs b/crates/extensions/c8y_http_proxy/src/lib.rs index fa9b5fc93b9..8450542ccd6 100644 --- a/crates/extensions/c8y_http_proxy/src/lib.rs +++ b/crates/extensions/c8y_http_proxy/src/lib.rs @@ -4,8 +4,6 @@ use crate::credentials::AuthResult; use crate::credentials::AuthRetriever; use crate::messages::C8YRestRequest; use crate::messages::C8YRestResult; -use certificate::CloudRootCerts; -use reqwest::Identity; use std::convert::Infallible; use std::path::PathBuf; use std::time::Duration; @@ -40,8 +38,6 @@ pub struct C8YHttpConfig { pub c8y_mqtt_host: String, pub device_id: String, pub tmp_dir: PathBuf, - identity: Option, - cloud_root_certs: CloudRootCerts, retry_interval: Duration, } @@ -64,8 +60,6 @@ impl C8YHttpConfig { .to_string(); let device_id = tedge_config.device.id.try_read(tedge_config)?.to_string(); let tmp_dir = tedge_config.tmp.path.as_std_path().to_path_buf(); - let identity = tedge_config.http.client.auth.identity()?; - let cloud_root_certs = tedge_config.cloud_root_certs(); let retry_interval = Duration::from_secs(5); Ok(Self { @@ -73,8 +67,6 @@ impl C8YHttpConfig { c8y_mqtt_host, device_id, tmp_dir, - identity, - cloud_root_certs, retry_interval, }) } diff --git a/crates/extensions/c8y_http_proxy/src/messages.rs b/crates/extensions/c8y_http_proxy/src/messages.rs index 2e9fe61c30e..7568220b33e 100644 --- a/crates/extensions/c8y_http_proxy/src/messages.rs +++ b/crates/extensions/c8y_http_proxy/src/messages.rs @@ -6,7 +6,7 @@ use tedge_actors::fan_in_message_type; use tedge_actors::ChannelError; use tedge_http_ext::HttpError; -fan_in_message_type!(C8YRestRequest[GetJwtToken, GetFreshJwtToken, CreateEvent, SoftwareListResponse, UploadLogBinary, UploadFile, DownloadFile]: Debug, PartialEq, Eq); +fan_in_message_type!(C8YRestRequest[CreateEvent, SoftwareListResponse, UploadLogBinary]: Debug, PartialEq, Eq); //HIPPO Rename EventId to String as there could be many other String responses as well and this macro doesn't allow another String variant fan_in_message_type!(C8YRestResponse[EventId, Url, Unit]: Debug); diff --git a/crates/extensions/c8y_http_proxy/src/tests.rs b/crates/extensions/c8y_http_proxy/src/tests.rs index 2f61aa26cac..24f9bec9068 100644 --- a/crates/extensions/c8y_http_proxy/src/tests.rs +++ b/crates/extensions/c8y_http_proxy/src/tests.rs @@ -9,7 +9,6 @@ use async_trait::async_trait; use c8y_api::json_c8y::C8yEventResponse; use c8y_api::json_c8y::C8yUpdateSoftwareListResponse; use c8y_api::json_c8y::InternalIdResponse; -use certificate::CloudRootCerts; use http::StatusCode; use mockito::Matcher; use std::collections::HashMap; @@ -363,8 +362,6 @@ async fn retry_internal_id_on_expired_jwt_with_mock() { c8y_mqtt_host: target_url.clone(), device_id: external_id.into(), tmp_dir: tmp_dir.into(), - identity: None, - cloud_root_certs: CloudRootCerts::from([]), retry_interval: Duration::from_millis(100), }; let c8y_proxy_actor = C8YHttpProxyBuilder::new(config, &mut http_actor, &mut auth); @@ -432,8 +429,6 @@ async fn retry_create_event_on_expired_jwt_with_mock() { c8y_mqtt_host: target_url.clone(), device_id: external_id.into(), tmp_dir: tmp_dir.into(), - identity: None, - cloud_root_certs: CloudRootCerts::from([]), retry_interval: Duration::from_millis(100), }; let c8y_proxy_actor = C8YHttpProxyBuilder::new(config, &mut http_actor, &mut jwt); @@ -675,8 +670,6 @@ async fn spawn_c8y_http_proxy( c8y_mqtt_host: c8y_host, device_id, tmp_dir, - identity: None, - cloud_root_certs: CloudRootCerts::from([]), retry_interval: Duration::from_millis(10), }; let mut c8y_proxy_actor = C8YHttpProxyBuilder::new(config, &mut http, &mut jwt); diff --git a/crates/extensions/c8y_mapper_ext/src/tests.rs b/crates/extensions/c8y_mapper_ext/src/tests.rs index a99124a7384..b298be87c7f 100644 --- a/crates/extensions/c8y_mapper_ext/src/tests.rs +++ b/crates/extensions/c8y_mapper_ext/src/tests.rs @@ -22,7 +22,6 @@ use std::fs::File; use std::io::Read; use std::path::Path; use std::time::Duration; -use std::time::SystemTime; use tedge_actors::test_helpers::FakeServerBox; use tedge_actors::test_helpers::FakeServerBoxBuilder; use tedge_actors::test_helpers::MessageReceiverExt; @@ -3055,21 +3054,6 @@ pub(crate) fn spawn_dummy_c8y_http_proxy(mut http: FakeServerBox { - let _ = http - .send(Ok(c8y_http_proxy::messages::C8YRestResponse::EventId( - "dummy-token".into(), - ))) - .await; - } - Some(C8YRestRequest::GetFreshJwtToken(_)) => { - let now = SystemTime::now(); - let _ = http - .send(Ok(c8y_http_proxy::messages::C8YRestResponse::EventId( - format!("dummy-token-{:?}", now), - ))) - .await; - } Some(C8YRestRequest::SoftwareListResponse(_)) => { let _ = http .send(Ok(c8y_http_proxy::messages::C8YRestResponse::Unit(()))) From 9711f8119d8bda416d51eec3e8f3b85a1a47e9c0 Mon Sep 17 00:00:00 2001 From: Rina Fujino Date: Sun, 20 Oct 2024 19:49:13 +0200 Subject: [PATCH 3/3] refactor: Introduce HttpHeaderRetriever, abstraction of auth retriever HttpHeaderRetriever returns HeaderMap. So, C8YJwtRetriever now returns HeaderMap. The retrieved JWT token is included in the map. In the future, there will be C8YBasicAuthRetriever, which will also returns HeaderMap. Signed-off-by: Rina Fujino --- Cargo.lock | 1 + crates/common/download/src/download.rs | 26 +++--- crates/core/c8y_api/src/http_proxy.rs | 5 +- crates/extensions/c8y_auth_proxy/src/actor.rs | 8 +- .../extensions/c8y_auth_proxy/src/server.rs | 32 ++++--- .../extensions/c8y_auth_proxy/src/tokens.rs | 14 ++- .../c8y_firmware_manager/src/actor.rs | 6 +- .../c8y_firmware_manager/src/lib.rs | 14 +-- .../c8y_firmware_manager/src/tests.rs | 18 ++-- .../c8y_firmware_manager/src/worker.rs | 14 +-- crates/extensions/c8y_http_proxy/Cargo.toml | 1 + crates/extensions/c8y_http_proxy/src/actor.rs | 55 ++++++------ .../c8y_http_proxy/src/credentials.rs | 85 ++++++++++++++++--- crates/extensions/c8y_http_proxy/src/lib.rs | 18 ++-- crates/extensions/c8y_http_proxy/src/tests.rs | 63 ++++++++------ .../tedge_config_manager/src/tests.rs | 2 +- .../tedge_downloader_ext/src/actor.rs | 15 ++-- .../tedge_downloader_ext/src/tests.rs | 8 +- .../extensions/tedge_http_ext/src/messages.rs | 18 ++-- 19 files changed, 246 insertions(+), 157 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 76427451fda..f0280882af9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -744,6 +744,7 @@ version = "1.3.1" dependencies = [ "anyhow", "async-trait", + "base64 0.13.1", "c8y_api", "certificate", "download", diff --git a/crates/common/download/src/download.rs b/crates/common/download/src/download.rs index d68a44863db..fb19ab956f2 100644 --- a/crates/common/download/src/download.rs +++ b/crates/common/download/src/download.rs @@ -11,6 +11,7 @@ use log::warn; use nix::sys::statvfs; pub use partial_response::InvalidResponseError; use reqwest::header; +use reqwest::header::HeaderMap; use reqwest::Client; use reqwest::Identity; use serde::Deserialize; @@ -20,8 +21,6 @@ use std::fs::File; use std::io::Seek; use std::io::SeekFrom; use std::io::Write; -#[cfg(target_os = "linux")] -use std::os::unix::prelude::AsRawFd; use std::path::Path; use std::path::PathBuf; use std::time::Duration; @@ -31,6 +30,8 @@ use tedge_utils::file::FileError; use nix::fcntl::fallocate; #[cfg(target_os = "linux")] use nix::fcntl::FallocateFlags; +#[cfg(target_os = "linux")] +use std::os::unix::prelude::AsRawFd; fn default_backoff() -> ExponentialBackoff { // Default retry is an exponential retry with a limit of 15 minutes total. @@ -49,8 +50,8 @@ fn default_backoff() -> ExponentialBackoff { #[serde(deny_unknown_fields)] pub struct DownloadInfo { pub url: String, - #[serde(skip_serializing_if = "Option::is_none")] - pub auth: Option, + #[serde(skip)] + pub headers: HeaderMap, } impl From<&str> for DownloadInfo { @@ -64,14 +65,14 @@ impl DownloadInfo { pub fn new(url: &str) -> Self { Self { url: url.into(), - auth: None, + headers: HeaderMap::new(), } } /// Creates new [`DownloadInfo`] from a URL with authentication. - pub fn with_auth(self, auth: &str) -> Self { + pub fn with_headers(self, header_map: HeaderMap) -> Self { Self { - auth: Some(auth.into()), + headers: header_map, ..self } } @@ -369,9 +370,7 @@ impl Downloader { let operation = || async { let mut request = self.client.get(url.url()); - if let Some(header_value) = &url.auth { - request = request.header("Authorization", header_value) - } + request = request.headers(url.headers.clone()); if range_start != 0 { request = request.header("Range", format!("bytes={range_start}-")); @@ -467,6 +466,7 @@ fn try_pre_allocate_space(file: &File, path: &Path, file_len: u64) -> Result<(), #[allow(deprecated)] mod tests { use super::*; + use hyper::header::AUTHORIZATION; use std::io::Write; use tempfile::tempdir; use tempfile::NamedTempFile; @@ -908,10 +908,12 @@ mod tests { } }; - // applying token if `with_token` = true + // applying http auth header let url = { if with_token { - url.with_auth("Bearer token") + let mut headers = HeaderMap::new(); + headers.append(AUTHORIZATION, "Bearer token".parse().unwrap()); + url.with_headers(headers) } else { url } diff --git a/crates/core/c8y_api/src/http_proxy.rs b/crates/core/c8y_api/src/http_proxy.rs index 390e7aba274..cfc03dfb85a 100644 --- a/crates/core/c8y_api/src/http_proxy.rs +++ b/crates/core/c8y_api/src/http_proxy.rs @@ -5,6 +5,7 @@ use mqtt_channel::PubChannel; use mqtt_channel::StreamExt; use mqtt_channel::Topic; use mqtt_channel::TopicFilter; +use reqwest::header::HeaderMap; use reqwest::Url; use std::collections::HashMap; use std::time::Duration; @@ -27,7 +28,7 @@ pub struct C8yEndPoint { c8y_host: String, c8y_mqtt_host: String, pub device_id: String, - pub token: Option, + pub headers: HeaderMap, devices_internal_id: HashMap, } @@ -37,7 +38,7 @@ impl C8yEndPoint { c8y_host: c8y_host.into(), c8y_mqtt_host: c8y_mqtt_host.into(), device_id: device_id.into(), - token: None, + headers: HeaderMap::new(), devices_internal_id: HashMap::new(), } } diff --git a/crates/extensions/c8y_auth_proxy/src/actor.rs b/crates/extensions/c8y_auth_proxy/src/actor.rs index a0a2cb493ab..07871b64d24 100644 --- a/crates/extensions/c8y_auth_proxy/src/actor.rs +++ b/crates/extensions/c8y_auth_proxy/src/actor.rs @@ -1,6 +1,6 @@ use axum::async_trait; -use c8y_http_proxy::credentials::AuthResult; -use c8y_http_proxy::credentials::AuthRetriever; +use c8y_http_proxy::credentials::HttpHeaderResult; +use c8y_http_proxy::credentials::HttpHeaderRetriever; use camino::Utf8PathBuf; use futures::channel::mpsc; use futures::StreamExt; @@ -38,14 +38,14 @@ impl C8yAuthProxyBuilder { pub fn try_from_config( config: &TEdgeConfig, c8y_profile: Option<&str>, - auth: &mut impl Service<(), AuthResult>, + header_retriever: &mut impl Service<(), HttpHeaderResult>, ) -> anyhow::Result { let reqwest_client = config.cloud_root_certs().client(); let c8y = config.c8y.try_get(c8y_profile)?; let app_data = AppData { is_https: true, host: c8y.http.or_config_not_set()?.to_string(), - token_manager: TokenManager::new(AuthRetriever::new(auth)).shared(), + token_manager: TokenManager::new(HttpHeaderRetriever::new(header_retriever)).shared(), client: reqwest_client, }; let bind = &c8y.proxy.bind; diff --git a/crates/extensions/c8y_auth_proxy/src/server.rs b/crates/extensions/c8y_auth_proxy/src/server.rs index 03f8c57f0b7..933afa7d51f 100644 --- a/crates/extensions/c8y_auth_proxy/src/server.rs +++ b/crates/extensions/c8y_auth_proxy/src/server.rs @@ -23,6 +23,7 @@ use futures::Sink; use futures::SinkExt; use futures::Stream; use futures::StreamExt; +use hyper::header::AUTHORIZATION; use hyper::header::HOST; use hyper::HeaderMap; use reqwest::Method; @@ -235,7 +236,7 @@ async fn connect_to_websocket( for (name, value) in headers { req = req.header(name.as_str(), value); } - req = req.header("Authorization", auth_value); + req = req.header(AUTHORIZATION, auth_value); let req = req .uri(uri) .header(HOST, host.without_scheme.as_ref()) @@ -404,10 +405,10 @@ async fn respond_to( None => "", }; let auth: fn(reqwest::RequestBuilder, &str) -> reqwest::RequestBuilder = - if headers.contains_key("Authorization") { + if headers.contains_key(AUTHORIZATION) { |req, _auth_value| req } else { - |req, auth_value| req.header("Authorization", auth_value) + |req, auth_value| req.header(AUTHORIZATION, auth_value) }; headers.remove(HOST); @@ -436,7 +437,7 @@ async fn respond_to( let destination = format!("{}/tenant/currentTenant", host.http); let response = client .head(&destination) - .header("Authorization", token.to_string()) + .header(AUTHORIZATION, token.to_string()) .send() .await .with_context(|| format!("making HEAD request to {destination}"))?; @@ -496,12 +497,13 @@ mod tests { use axum::body::Bytes; use axum::headers::authorization::Bearer; use axum::headers::Authorization; + use axum::http::header::AUTHORIZATION; use axum::http::Request; use axum::middleware::Next; use axum::TypedHeader; - use c8y_http_proxy::credentials::AuthRequest; - use c8y_http_proxy::credentials::AuthResult; - use c8y_http_proxy::credentials::AuthRetriever; + use c8y_http_proxy::credentials::HttpHeaderRequest; + use c8y_http_proxy::credentials::HttpHeaderResult; + use c8y_http_proxy::credentials::HttpHeaderRetriever; use camino::Utf8PathBuf; use futures::channel::mpsc; use futures::future::ready; @@ -1113,7 +1115,7 @@ mod tests { let state = AppData { is_https: false, host: target_host.into(), - token_manager: TokenManager::new(AuthRetriever::new(&mut retriever)).shared(), + token_manager: TokenManager::new(HttpHeaderRetriever::new(&mut retriever)).shared(), client: reqwest::Client::new(), }; let trust_store = ca_dir @@ -1147,16 +1149,22 @@ mod tests { #[async_trait] impl Server for IterJwtRetriever { - type Request = AuthRequest; - type Response = AuthResult; + type Request = HttpHeaderRequest; + type Response = HttpHeaderResult; fn name(&self) -> &str { "IterJwtRetriever" } async fn handle(&mut self, _request: Self::Request) -> Self::Response { - let auth_value = format!("Bearer {}", self.tokens.next().unwrap()); - Ok(auth_value) + let mut header_map = HeaderMap::new(); + header_map.insert( + AUTHORIZATION, + format!("Bearer {}", self.tokens.next().unwrap()) + .parse() + .unwrap(), + ); + Ok(header_map) } } diff --git a/crates/extensions/c8y_auth_proxy/src/tokens.rs b/crates/extensions/c8y_auth_proxy/src/tokens.rs index 5252f3b77b8..96b61b27a0b 100644 --- a/crates/extensions/c8y_auth_proxy/src/tokens.rs +++ b/crates/extensions/c8y_auth_proxy/src/tokens.rs @@ -1,6 +1,8 @@ +use anyhow::Context; +use hyper::header::AUTHORIZATION; use std::sync::Arc; -use c8y_http_proxy::credentials::AuthRetriever; +use c8y_http_proxy::credentials::HttpHeaderRetriever; use tokio::sync::Mutex; #[derive(Clone)] @@ -16,12 +18,12 @@ impl SharedTokenManager { } pub struct TokenManager { - recv: AuthRetriever, + recv: HttpHeaderRetriever, cached: Option>, } impl TokenManager { - pub fn new(recv: AuthRetriever) -> Self { + pub fn new(recv: HttpHeaderRetriever) -> Self { Self { recv, cached: None } } @@ -41,7 +43,11 @@ impl TokenManager { } async fn refresh(&mut self) -> Result, anyhow::Error> { - self.cached = Some(self.recv.await_response(()).await??.into()); + let header_map = self.recv.await_response(()).await??; + let auth_header_value = header_map + .get(AUTHORIZATION) + .context("Authorization is missing from header")?; + self.cached = Some(auth_header_value.to_str()?.into()); Ok(self.cached.as_ref().unwrap().clone()) } } diff --git a/crates/extensions/c8y_firmware_manager/src/actor.rs b/crates/extensions/c8y_firmware_manager/src/actor.rs index 9ac1b6272fd..c1132913f07 100644 --- a/crates/extensions/c8y_firmware_manager/src/actor.rs +++ b/crates/extensions/c8y_firmware_manager/src/actor.rs @@ -12,7 +12,7 @@ use c8y_api::smartrest::message::collect_smartrest_messages; use c8y_api::smartrest::message::get_smartrest_template_id; use c8y_api::smartrest::smartrest_deserializer::SmartRestFirmwareRequest; use c8y_api::smartrest::smartrest_deserializer::SmartRestRequestGeneric; -use c8y_http_proxy::credentials::AuthRetriever; +use c8y_http_proxy::credentials::HttpHeaderRetriever; use log::error; use log::info; use log::warn; @@ -84,7 +84,7 @@ impl FirmwareManagerActor { config: FirmwareManagerConfig, input_receiver: LoggingReceiver, mqtt_publisher: DynSender, - auth_retriever: AuthRetriever, + header_retriever: HttpHeaderRetriever, download_sender: ClientMessageBox, progress_sender: DynSender, ) -> Self { @@ -93,7 +93,7 @@ impl FirmwareManagerActor { worker: FirmwareManagerWorker::new( config, mqtt_publisher, - auth_retriever, + header_retriever, download_sender, progress_sender, ), diff --git a/crates/extensions/c8y_firmware_manager/src/lib.rs b/crates/extensions/c8y_firmware_manager/src/lib.rs index c1e5bc2bd0f..fb74d004984 100644 --- a/crates/extensions/c8y_firmware_manager/src/lib.rs +++ b/crates/extensions/c8y_firmware_manager/src/lib.rs @@ -10,8 +10,8 @@ mod tests; use actor::FirmwareInput; use actor::FirmwareManagerActor; -use c8y_http_proxy::credentials::AuthResult; -use c8y_http_proxy::credentials::AuthRetriever; +use c8y_http_proxy::credentials::HttpHeaderResult; +use c8y_http_proxy::credentials::HttpHeaderRetriever; pub use config::*; use tedge_actors::futures::channel::mpsc; use tedge_actors::Builder; @@ -39,7 +39,7 @@ pub struct FirmwareManagerBuilder { config: FirmwareManagerConfig, input_receiver: LoggingReceiver, mqtt_publisher: DynSender, - jwt_retriever: AuthRetriever, + header_retriever: HttpHeaderRetriever, download_sender: ClientMessageBox, progress_sender: DynSender, signal_sender: mpsc::Sender, @@ -49,7 +49,7 @@ impl FirmwareManagerBuilder { pub fn try_new( config: FirmwareManagerConfig, mqtt_actor: &mut (impl MessageSource + MessageSink), - jwt_actor: &mut impl Service<(), AuthResult>, + header_actor: &mut impl Service<(), HttpHeaderResult>, downloader_actor: &mut impl Service, ) -> Result { Self::init(&config.data_dir)?; @@ -65,14 +65,14 @@ impl FirmwareManagerBuilder { mqtt_actor.connect_sink(Self::subscriptions(&config.c8y_prefix), &mqtt_sender); let mqtt_publisher = mqtt_actor.get_sender(); - let jwt_retriever = AuthRetriever::new(jwt_actor); + let header_retriever = HttpHeaderRetriever::new(header_actor); let download_sender = ClientMessageBox::new(downloader_actor); let progress_sender = input_sender.into(); Ok(Self { config, input_receiver, mqtt_publisher, - jwt_retriever, + header_retriever, download_sender, progress_sender, signal_sender, @@ -110,7 +110,7 @@ impl Builder for FirmwareManagerBuilder { self.config, self.input_receiver, self.mqtt_publisher, - self.jwt_retriever, + self.header_retriever, self.download_sender, self.progress_sender, )) diff --git a/crates/extensions/c8y_firmware_manager/src/tests.rs b/crates/extensions/c8y_firmware_manager/src/tests.rs index 2ba3d87190b..8ed58ccbf06 100644 --- a/crates/extensions/c8y_firmware_manager/src/tests.rs +++ b/crates/extensions/c8y_firmware_manager/src/tests.rs @@ -1,7 +1,8 @@ use super::*; use assert_json_diff::assert_json_include; use c8y_api::smartrest::topic::C8yTopic; -use c8y_http_proxy::credentials::AuthRequest; +use c8y_http_proxy::credentials::HttpHeaderRequest; +use c8y_http_proxy::HeaderMap; use serde_json::json; use sha256::digest; use std::io; @@ -183,7 +184,7 @@ async fn handle_request_child_device_with_new_download() -> Result<(), DynError> download_request.file_path, ttd.path().join("cache").join(DOWNLOADED_FILE_NAME) ); - assert_eq!(download_request.auth, None); + assert!(download_request.headers.is_empty()); // Simulate downloading a file is completed. ttd.dir("cache").file(DOWNLOADED_FILE_NAME); @@ -287,9 +288,9 @@ async fn create_download_request_with_c8y_auth() -> Result<(), DynError> { assert!(jwt_request.is_some()); // Return JWT token. - jwt_message_box - .send(Ok(auth_header_value.to_string())) - .await?; + let mut headers = HeaderMap::new(); + headers.insert("Authorization", auth_header_value.parse().unwrap()); + jwt_message_box.send(Ok(headers.clone())).await?; // Assert firmware download request. let (_id, download_request) = downloader_message_box.recv().await.unwrap(); @@ -298,7 +299,7 @@ async fn create_download_request_with_c8y_auth() -> Result<(), DynError> { download_request.file_path, ttd.path().join("cache").join(digest(c8y_download_url)) ); - assert_eq!(download_request.auth, Some(auth_header_value.into())); + assert_eq!(download_request.headers, headers); Ok(()) } @@ -620,7 +621,7 @@ async fn spawn_firmware_manager( ( JoinHandle>, TimedMessageBox>, - TimedMessageBox>, + TimedMessageBox>, TimedMessageBox>, ), DynError, @@ -647,7 +648,8 @@ async fn spawn_firmware_manager( let mut mqtt_builder: SimpleMessageBoxBuilder = SimpleMessageBoxBuilder::new("MQTT", 5); - let mut jwt_builder: FakeServerBoxBuilder = FakeServerBox::builder(); + let mut jwt_builder: FakeServerBoxBuilder = + FakeServerBox::builder(); let mut downloader_builder: FakeServerBoxBuilder = FakeServerBox::builder(); diff --git a/crates/extensions/c8y_firmware_manager/src/worker.rs b/crates/extensions/c8y_firmware_manager/src/worker.rs index f1a8da86391..f9999763e7f 100644 --- a/crates/extensions/c8y_firmware_manager/src/worker.rs +++ b/crates/extensions/c8y_firmware_manager/src/worker.rs @@ -11,7 +11,7 @@ use c8y_api::smartrest::smartrest_serializer::set_operation_executing_with_name; use c8y_api::smartrest::smartrest_serializer::succeed_operation_with_name_no_parameters; use c8y_api::smartrest::smartrest_serializer::CumulocitySupportedOperations; use c8y_api::smartrest::topic::C8yTopic; -use c8y_http_proxy::credentials::AuthRetriever; +use c8y_http_proxy::credentials::HttpHeaderRetriever; use camino::Utf8PathBuf; use log::error; use log::info; @@ -52,7 +52,7 @@ pub(crate) struct FirmwareManagerWorker { pub(crate) config: Arc, executing: bool, mqtt_publisher: DynSender, - auth_retriever: AuthRetriever, + header_retriever: HttpHeaderRetriever, download_sender: ClientMessageBox, progress_sender: DynSender, } @@ -63,7 +63,7 @@ impl Clone for FirmwareManagerWorker { config: self.config.clone(), executing: false, mqtt_publisher: self.mqtt_publisher.sender_clone(), - auth_retriever: self.auth_retriever.clone(), + header_retriever: self.header_retriever.clone(), download_sender: self.download_sender.clone(), progress_sender: self.progress_sender.sender_clone(), } @@ -74,7 +74,7 @@ impl FirmwareManagerWorker { pub(crate) fn new( config: FirmwareManagerConfig, mqtt_publisher: DynSender, - auth_retriever: AuthRetriever, + header_retriever: HttpHeaderRetriever, download_sender: ClientMessageBox, progress_sender: DynSender, ) -> Self { @@ -82,7 +82,7 @@ impl FirmwareManagerWorker { config: Arc::new(config), executing: false, mqtt_publisher, - auth_retriever, + header_retriever, download_sender, progress_sender, } @@ -205,9 +205,9 @@ impl FirmwareManagerWorker { .maybe_tenant_url(firmware_url) .is_some() { - if let Ok(header_value) = self.auth_retriever.await_response(()).await? { + if let Ok(header_map) = self.header_retriever.await_response(()).await? { DownloadRequest::new(firmware_url, cache_file_path.as_std_path()) - .with_auth(&header_value) + .with_headers(header_map) } else { return Err(FirmwareManagementError::NoJwtToken); } diff --git a/crates/extensions/c8y_http_proxy/Cargo.toml b/crates/extensions/c8y_http_proxy/Cargo.toml index ca26ba78de6..6a12a63d4f0 100644 --- a/crates/extensions/c8y_http_proxy/Cargo.toml +++ b/crates/extensions/c8y_http_proxy/Cargo.toml @@ -12,6 +12,7 @@ repository = { workspace = true } [dependencies] anyhow = { workspace = true } async-trait = { workspace = true } +base64 = { workspace = true } c8y_api = { workspace = true } certificate = { workspace = true, features = ["reqwest"] } download = { workspace = true } diff --git a/crates/extensions/c8y_http_proxy/src/actor.rs b/crates/extensions/c8y_http_proxy/src/actor.rs index fc7ec58b48b..6eddadb73c1 100644 --- a/crates/extensions/c8y_http_proxy/src/actor.rs +++ b/crates/extensions/c8y_http_proxy/src/actor.rs @@ -1,6 +1,6 @@ -use crate::credentials::AuthRequest; -use crate::credentials::AuthResult; -use crate::credentials::AuthRetriever; +use crate::credentials::HttpHeaderRequest; +use crate::credentials::HttpHeaderResult; +use crate::credentials::HttpHeaderRetriever; use crate::messages::C8YConnectionError; use crate::messages::C8YRestError; use crate::messages::C8YRestRequest; @@ -56,14 +56,14 @@ pub struct C8YHttpProxyMessageBox { /// Connection to an HTTP actor pub(crate) http: ClientMessageBox, - /// Connection to an HTTP auth header value retriever - pub(crate) auth: AuthRetriever, + /// Connection to an HTTP header value retriever + pub(crate) header_retriever: HttpHeaderRetriever, } pub type C8YRestRequestEnvelope = RequestEnvelope; -fan_in_message_type!(C8YHttpProxyInput[C8YRestRequestEnvelope, HttpResult, AuthResult] : Debug); -fan_in_message_type!(C8YHttpProxyOutput[HttpRequest, AuthRequest] : Debug); +fan_in_message_type!(C8YHttpProxyInput[C8YRestRequestEnvelope, HttpResult, HttpHeaderResult] : Debug); +fan_in_message_type!(C8YHttpProxyOutput[HttpRequest, HttpHeaderRequest] : Debug); #[async_trait] impl Actor for C8YHttpProxyActor { @@ -170,16 +170,14 @@ impl C8YHttpProxyActor { device_id: String, ) -> Result { let url_get_id: String = self.end_point.get_url_for_internal_id(&device_id); - if self.end_point.token.is_none() { - self.get_fresh_token().await?; - } + self.refresh_headers().await?; let mut attempt = 0; let mut token_refreshed = false; loop { attempt += 1; let request = HttpRequestBuilder::get(&url_get_id) - .auth(self.end_point.token.clone().unwrap_or_default()) + .headers(&self.end_point.headers) .build()?; let endpoint = request.uri().path().to_owned(); let method = request.method().to_owned(); @@ -208,7 +206,7 @@ impl C8YHttpProxyActor { response.error_for_status()?; } info!("Re-fetching internal id for {device_id} with fresh token"); - self.get_fresh_token().await?; + self.refresh_headers().await?; token_refreshed = true; continue; } @@ -237,7 +235,7 @@ impl C8YHttpProxyActor { let request_builder = build_request(&self.end_point); let request = request_builder .await? - .auth(self.end_point.token.clone().unwrap_or_default()) + .headers(&self.end_point.headers) .build()?; let endpoint = request.uri().path().to_owned(); let method = request.method().to_owned(); @@ -266,11 +264,6 @@ impl C8YHttpProxyActor { } } - async fn get_fresh_token(&mut self) -> Result { - self.end_point.token = None; - self.get_and_set_auth_header_value().await - } - async fn try_request_with_fresh_token< Fut: Future>, >( @@ -278,12 +271,12 @@ impl C8YHttpProxyActor { build_request: impl Fn(&C8yEndPoint) -> Fut, ) -> Result { // get new token not the cached one - self.get_fresh_token().await?; + self.refresh_headers().await?; // build the request let request_builder = build_request(&self.end_point); let request = request_builder .await? - .auth(self.end_point.token.clone().unwrap_or_default()) + .headers(&self.end_point.headers) .build()?; // retry the request Ok(self.peers.http.await_response(request).await?) @@ -302,7 +295,7 @@ impl C8YHttpProxyActor { let request_builder = build_request(&self.end_point); let request = request_builder .await? - .auth(self.end_point.token.clone().unwrap_or_default()) + .headers(&self.end_point.headers) .build()?; Ok(self.peers.http.await_response(request).await?) } @@ -400,17 +393,19 @@ impl C8YHttpProxyActor { } } - async fn get_and_set_auth_header_value(&mut self) -> Result { - match self.end_point.token.clone() { - Some(token) => Ok(token), - None => { - if let Ok(token) = self.peers.auth.await_response(()).await? { - self.end_point.token = Some(token.clone()); - Ok(token) - } else { - Err(C8YRestError::CustomError("JWT token not available".into())) + /// Update HTTP headers with the ones retried from the HttpHeaderRetriever actor + async fn refresh_headers(&mut self) -> Result<(), C8YRestError> { + match self.peers.header_retriever.await_response(()).await? { + Ok(headers) => { + self.end_point.headers.clear(); + for (key, value) in headers { + self.end_point.headers.insert(key.unwrap(), value); } + Ok(()) } + Err(err) => Err(C8YRestError::CustomError(format!( + "Failed to retrieve headers with reason {err}" + ))), } } diff --git a/crates/extensions/c8y_http_proxy/src/credentials.rs b/crates/extensions/c8y_http_proxy/src/credentials.rs index a838aaff0d3..3cc031c8107 100644 --- a/crates/extensions/c8y_http_proxy/src/credentials.rs +++ b/crates/extensions/c8y_http_proxy/src/credentials.rs @@ -1,6 +1,7 @@ use async_trait::async_trait; use c8y_api::http_proxy::C8yMqttJwtTokenRetriever; -use c8y_api::http_proxy::JwtError; +use http::header::AUTHORIZATION; +use http::HeaderMap; use tedge_actors::ClientMessageBox; use tedge_actors::Sequential; use tedge_actors::Server; @@ -8,13 +9,13 @@ use tedge_actors::ServerActorBuilder; use tedge_actors::ServerConfig; use tedge_config::TopicPrefix; -pub type AuthRequest = (); -pub type AuthResult = Result; +pub type HttpHeaderRequest = (); +pub type HttpHeaderResult = Result; -/// Retrieves Authorization header value authenticating the device -pub type AuthRetriever = ClientMessageBox; +/// Retrieves HTTP headers +pub type HttpHeaderRetriever = ClientMessageBox; -/// A JwtRetriever that gets JWT tokens from C8Y over MQTT +/// A JwtRetriever that gets JWT tokens from C8Y over MQTT and returns authorization header pub struct C8YJwtRetriever { mqtt_retriever: C8yMqttJwtTokenRetriever, } @@ -32,20 +33,75 @@ impl C8YJwtRetriever { #[async_trait] impl Server for C8YJwtRetriever { - type Request = AuthRequest; - type Response = AuthResult; + type Request = HttpHeaderRequest; + type Response = HttpHeaderResult; fn name(&self) -> &str { "C8YJwtRetriever" } async fn handle(&mut self, _request: Self::Request) -> Self::Response { + let mut heeader_map = HeaderMap::new(); let response = self.mqtt_retriever.get_jwt_token().await?; - let auth_value = format!("Bearer {}", response.token()); - Ok(auth_value) + heeader_map.insert( + AUTHORIZATION, + format!("Bearer {}", response.token()).parse()?, + ); + Ok(heeader_map) } } +/// Return base64 encoded Basic Auth header +pub struct C8YBasicAuthRetriever { + username: String, + password: String, +} + +impl C8YBasicAuthRetriever { + pub fn builder( + username: &str, + password: &str, + ) -> ServerActorBuilder { + let server = C8YBasicAuthRetriever { + username: username.into(), + password: password.into(), + }; + ServerActorBuilder::new(server, &ServerConfig::default(), Sequential) + } +} + +#[async_trait] +impl Server for C8YBasicAuthRetriever { + type Request = HttpHeaderRequest; + type Response = HttpHeaderResult; + + fn name(&self) -> &str { + "C8YBasicAuthRetriever" + } + + async fn handle(&mut self, _request: Self::Request) -> Self::Response { + let mut header_map = HeaderMap::new(); + header_map.insert( + AUTHORIZATION, + format!( + "Basic {}", + base64::encode(format!("{}:{}", self.username, self.password)) + ) + .parse()?, + ); + Ok(header_map) + } +} + +#[derive(thiserror::Error, Debug)] +pub enum HttpHeaderError { + #[error(transparent)] + JwtError(#[from] c8y_api::http_proxy::JwtError), + + #[error(transparent)] + InvalidHeaderValue(#[from] http::header::InvalidHeaderValue), +} + /// A JwtRetriever that simply always returns the same JWT token (possibly none) #[cfg(test)] pub(crate) struct ConstJwtRetriever { @@ -55,15 +111,16 @@ pub(crate) struct ConstJwtRetriever { #[async_trait] #[cfg(test)] impl Server for ConstJwtRetriever { - type Request = AuthRequest; - type Response = AuthResult; + type Request = HttpHeaderRequest; + type Response = HttpHeaderResult; fn name(&self) -> &str { "ConstJwtRetriever" } async fn handle(&mut self, _request: Self::Request) -> Self::Response { - let auth_value = format!("Bearer {}", self.token); - Ok(auth_value) + let mut header_map = HeaderMap::new(); + header_map.insert(AUTHORIZATION, format!("Bearer {}", self.token).parse()?); + Ok(header_map) } } diff --git a/crates/extensions/c8y_http_proxy/src/lib.rs b/crates/extensions/c8y_http_proxy/src/lib.rs index 8450542ccd6..b4cdec36a85 100644 --- a/crates/extensions/c8y_http_proxy/src/lib.rs +++ b/crates/extensions/c8y_http_proxy/src/lib.rs @@ -1,7 +1,7 @@ use crate::actor::C8YHttpProxyActor; use crate::actor::C8YHttpProxyMessageBox; -use crate::credentials::AuthResult; -use crate::credentials::AuthRetriever; +use crate::credentials::HttpHeaderResult; +use crate::credentials::HttpHeaderRetriever; use crate::messages::C8YRestRequest; use crate::messages::C8YRestResult; use std::convert::Infallible; @@ -28,6 +28,8 @@ pub mod credentials; pub mod handle; pub mod messages; +pub use http::HeaderMap; + #[cfg(test)] mod tests; @@ -99,24 +101,24 @@ pub struct C8YHttpProxyBuilder { /// Connection to an HTTP actor http: ClientMessageBox, - /// Connection to an HTTP auth header value retriever - auth: AuthRetriever, + /// Connection to an HTTP header value retriever + header_retriever: HttpHeaderRetriever, } impl C8YHttpProxyBuilder { pub fn new( config: C8YHttpConfig, http: &mut impl Service, - auth: &mut impl Service<(), AuthResult>, + header_retriever: &mut impl Service<(), HttpHeaderResult>, ) -> Self { let clients = ServerMessageBoxBuilder::new("C8Y-REST", 10); let http = ClientMessageBox::new(http); - let auth = AuthRetriever::new(auth); + let header_retriever = HttpHeaderRetriever::new(header_retriever); C8YHttpProxyBuilder { config, clients, http, - auth, + header_retriever, } } } @@ -132,7 +134,7 @@ impl Builder for C8YHttpProxyBuilder { let message_box = C8YHttpProxyMessageBox { clients: self.clients.build(), http: self.http, - auth: self.auth, + header_retriever: self.header_retriever, }; C8YHttpProxyActor::new(self.config, message_box) diff --git a/crates/extensions/c8y_http_proxy/src/tests.rs b/crates/extensions/c8y_http_proxy/src/tests.rs index 24f9bec9068..0c0c8d6818c 100644 --- a/crates/extensions/c8y_http_proxy/src/tests.rs +++ b/crates/extensions/c8y_http_proxy/src/tests.rs @@ -1,6 +1,6 @@ -use crate::credentials::AuthRequest; -use crate::credentials::AuthResult; use crate::credentials::ConstJwtRetriever; +use crate::credentials::HttpHeaderRequest; +use crate::credentials::HttpHeaderResult; use crate::handle::C8YHttpProxy; use crate::messages::CreateEvent; use crate::C8YHttpConfig; @@ -9,6 +9,8 @@ use async_trait::async_trait; use c8y_api::json_c8y::C8yEventResponse; use c8y_api::json_c8y::C8yUpdateSoftwareListResponse; use c8y_api::json_c8y::InternalIdResponse; +use http::header::AUTHORIZATION; +use http::HeaderMap; use http::StatusCode; use mockito::Matcher; use std::collections::HashMap; @@ -49,7 +51,7 @@ async fn c8y_http_proxy_requests_the_device_internal_id_on_start() { let init_request = HttpRequestBuilder::get(format!( "https://{c8y_host}/identity/externalIds/c8y_Serial/{device_id}" )) - .auth(BEARER_AUTH) + .headers(&get_test_auth_header(BEARER_AUTH)) .build() .unwrap(); assert_recv(&mut c8y, Some(init_request)).await; @@ -77,7 +79,7 @@ async fn c8y_http_proxy_requests_the_device_internal_id_on_start() { &mut c8y, Some( HttpRequestBuilder::post(format!("https://{c8y_host}/event/events/")) - .auth(BEARER_AUTH) + .headers(&get_test_auth_header(BEARER_AUTH)) .header("content-type", "application/json") .header("accept", "application/json") .build() @@ -102,7 +104,7 @@ async fn retry_internal_id_on_expired_jwt() { let init_request = HttpRequestBuilder::get(format!( "https://{c8y_host}/identity/externalIds/c8y_Serial/{device_id}" )) - .auth(BEARER_AUTH) + .headers(&get_test_auth_header(BEARER_AUTH)) .build() .unwrap(); assert_recv(&mut c8y, Some(init_request)).await; @@ -116,7 +118,7 @@ async fn retry_internal_id_on_expired_jwt() { HttpRequestBuilder::get(format!( "https://{c8y_host}/identity/externalIds/c8y_Serial/{device_id}" )) - .auth(BEARER_AUTH) + .headers(&get_test_auth_header(BEARER_AUTH)) .build() .unwrap(), ), @@ -145,7 +147,7 @@ async fn retry_internal_id_on_expired_jwt() { &mut c8y, Some( HttpRequestBuilder::post(format!("https://{c8y_host}/event/events/")) - .auth(BEARER_AUTH) + .headers(&get_test_auth_header(BEARER_AUTH)) .header("content-type", "application/json") .header("accept", "application/json") .build() @@ -176,7 +178,7 @@ async fn retry_get_internal_id_when_not_found() { let get_internal_id_url = format!("https://{c8y_host}/identity/externalIds/c8y_Serial/{main_device_id}"); let init_request = HttpRequestBuilder::get(get_internal_id_url) - .auth(BEARER_AUTH) + .headers(&get_test_auth_header(BEARER_AUTH)) .build() .unwrap(); assert_recv(&mut c8y, Some(init_request)).await; @@ -196,7 +198,7 @@ async fn retry_get_internal_id_when_not_found() { &mut c8y, Some( HttpRequestBuilder::get(&get_internal_id_url) - .auth(BEARER_AUTH) + .headers(&get_test_auth_header(BEARER_AUTH)) .build() .unwrap(), ), @@ -214,7 +216,7 @@ async fn retry_get_internal_id_when_not_found() { &mut c8y, Some( HttpRequestBuilder::get(&get_internal_id_url) - .auth(BEARER_AUTH) + .headers(&get_test_auth_header(BEARER_AUTH)) .build() .unwrap(), ), @@ -235,7 +237,7 @@ async fn retry_get_internal_id_when_not_found() { HttpRequestBuilder::put(format!("https://{c8y_host}/inventory/managedObjects/200")) .header("content-type", "application/json") .header("accept", "application/json") - .auth(BEARER_AUTH) + .headers(&get_test_auth_header(BEARER_AUTH)) .json(&c8y_software_list) .build() .unwrap(), @@ -276,7 +278,7 @@ async fn get_internal_id_retry_fails_after_exceeding_attempts_threshold() { let get_internal_id_url = format!("https://{c8y_host}/identity/externalIds/c8y_Serial/{main_device_id}"); let init_request = HttpRequestBuilder::get(get_internal_id_url) - .auth(BEARER_AUTH) + .headers(&get_test_auth_header(BEARER_AUTH)) .build() .unwrap(); assert_recv(&mut c8y, Some(init_request)).await; @@ -296,7 +298,7 @@ async fn get_internal_id_retry_fails_after_exceeding_attempts_threshold() { &mut c8y, Some( HttpRequestBuilder::get(&get_internal_id_url) - .auth(BEARER_AUTH) + .headers(&get_test_auth_header(BEARER_AUTH)) .build() .unwrap(), ), @@ -442,7 +444,8 @@ async fn retry_create_event_on_expired_jwt_with_mock() { proxy .end_point .set_internal_id(external_id.into(), internal_id.into()); - proxy.end_point.token = Some("Bearer Cached JWT Token".into()); + let headers = get_test_auth_header("Bearer Cached JWT Token"); + proxy.end_point.headers = headers; let result = proxy.create_event(event).await; assert_eq!(event_id, result.unwrap()); @@ -463,7 +466,7 @@ async fn retry_software_list_once_with_fresh_internal_id() { let _init_request = HttpRequestBuilder::get(format!( "https://{c8y_host}/identity/externalIds/c8y_Serial/{device_id}" )) - .auth(BEARER_AUTH) + .headers(&get_test_auth_header(BEARER_AUTH)) .build() .unwrap(); // skip the message @@ -498,7 +501,7 @@ async fn retry_software_list_once_with_fresh_internal_id() { )) .header("content-type", "application/json") .header("accept", "application/json") - .auth(BEARER_AUTH) + .headers(&get_test_auth_header(BEARER_AUTH)) .json(&c8y_software_list) .build() .unwrap(), @@ -521,7 +524,7 @@ async fn retry_software_list_once_with_fresh_internal_id() { HttpRequestBuilder::get(format!( "https://{c8y_host}/identity/externalIds/c8y_Serial/{device_id}" )) - .auth(BEARER_AUTH) + .headers(&get_test_auth_header(BEARER_AUTH)) .build() .unwrap(), ), @@ -544,7 +547,7 @@ async fn retry_software_list_once_with_fresh_internal_id() { HttpRequestBuilder::put(format!( "https://{c8y_host}/inventory/managedObjects/{device_id}" )) - .auth(BEARER_AUTH) + .headers(&get_test_auth_header(BEARER_AUTH)) .header("content-type", "application/json") .header("accept", "application/json") .json(&c8y_software_list) @@ -570,7 +573,7 @@ async fn auto_retry_upload_log_binary_when_internal_id_expires() { let init_request = HttpRequestBuilder::get(format!( "https://{c8y_host}/identity/externalIds/c8y_Serial/{device_id}" )) - .auth(BEARER_AUTH) + .headers(&get_test_auth_header(BEARER_AUTH)) .build() .unwrap(); assert_recv(&mut c8y, Some(init_request)).await; @@ -595,7 +598,7 @@ async fn auto_retry_upload_log_binary_when_internal_id_expires() { &mut c8y, Some( HttpRequestBuilder::post(format!("https://{c8y_host}/event/events/")) - .auth(BEARER_AUTH) + .headers(&get_test_auth_header(BEARER_AUTH)) .header("content-type", "application/json") .header("accept", "application/json") .build() @@ -619,7 +622,7 @@ async fn auto_retry_upload_log_binary_when_internal_id_expires() { HttpRequestBuilder::get(format!( "https://{c8y_host}/identity/externalIds/c8y_Serial/{device_id}" )) - .auth(BEARER_AUTH) + .headers(&get_test_auth_header(BEARER_AUTH)) .build() .unwrap(), ), @@ -638,7 +641,7 @@ async fn auto_retry_upload_log_binary_when_internal_id_expires() { &mut c8y, Some( HttpRequestBuilder::post(format!("https://{c8y_host}/event/events/")) - .auth(BEARER_AUTH) + .headers(&get_test_auth_header(BEARER_AUTH)) .header("content-type", "application/json") .header("accept", "application/json") .build() @@ -697,23 +700,31 @@ pub(crate) struct DynamicJwtRetriever { #[async_trait] impl Server for DynamicJwtRetriever { - type Request = AuthRequest; - type Response = AuthResult; + type Request = HttpHeaderRequest; + type Response = HttpHeaderResult; fn name(&self) -> &str { "DynamicJwtRetriever" } async fn handle(&mut self, _request: Self::Request) -> Self::Response { + let mut headers = HeaderMap::new(); if self.count == 0 { self.count += 1; - Ok("Bearer Cached JWT token".into()) + headers.insert(AUTHORIZATION, "Bearer Cached JWT token".parse().unwrap()); } else { - Ok("Bearer Fresh JWT token".into()) + headers.insert(AUTHORIZATION, "Bearer Fresh JWT token".parse().unwrap()); } + Ok(headers) } } +fn get_test_auth_header(value: &str) -> HeaderMap { + let mut headers = HeaderMap::new(); + headers.insert(AUTHORIZATION, value.parse().unwrap()); + headers +} + async fn assert_recv( from: &mut FakeServerBox, expected: Option, diff --git a/crates/extensions/tedge_config_manager/src/tests.rs b/crates/extensions/tedge_config_manager/src/tests.rs index 340b7109f6d..ec943d9f5eb 100644 --- a/crates/extensions/tedge_config_manager/src/tests.rs +++ b/crates/extensions/tedge_config_manager/src/tests.rs @@ -379,7 +379,7 @@ async fn config_manager_download_update() -> Result<(), anyhow::Error> { std::env::temp_dir().join("type_two") ); - assert_eq!(download_request.auth, None); + assert!(download_request.headers.is_empty()); // Simulate downloading a file is completed. std::fs::File::create(&download_request.file_path).unwrap(); diff --git a/crates/extensions/tedge_downloader_ext/src/actor.rs b/crates/extensions/tedge_downloader_ext/src/actor.rs index c7c9aa8eb27..bfcb2847f32 100644 --- a/crates/extensions/tedge_downloader_ext/src/actor.rs +++ b/crates/extensions/tedge_downloader_ext/src/actor.rs @@ -4,6 +4,7 @@ use download::DownloadError; use download::DownloadInfo; use download::Downloader; use log::info; +use reqwest::header::HeaderMap; use reqwest::Identity; use std::marker::PhantomData; use std::path::Path; @@ -19,7 +20,7 @@ use tedge_utils::file::PermissionEntry; pub struct DownloadRequest { pub url: String, pub file_path: PathBuf, - pub auth: Option, + pub headers: HeaderMap, pub permission: Option, } @@ -28,14 +29,14 @@ impl DownloadRequest { Self { url: url.into(), file_path: file_path.into(), - auth: None, + headers: HeaderMap::new(), permission: None, } } - pub fn with_auth(self, auth: &str) -> Self { + pub fn with_headers(self, header_map: HeaderMap) -> Self { Self { - auth: Some(auth.into()), + headers: header_map, ..self } } @@ -111,11 +112,7 @@ impl Server for DownloaderActor { async fn handle(&mut self, id_request: Self::Request) -> Self::Response { let (id, request) = id_request; - let download_info = if let Some(header_value) = request.auth { - DownloadInfo::new(&request.url).with_auth(&header_value) - } else { - DownloadInfo::new(&request.url) - }; + let download_info = DownloadInfo::new(&request.url).with_headers(request.headers); let downloader = Downloader::new( request.file_path.clone(), diff --git a/crates/extensions/tedge_downloader_ext/src/tests.rs b/crates/extensions/tedge_downloader_ext/src/tests.rs index 227cff9b2d2..73106ead093 100644 --- a/crates/extensions/tedge_downloader_ext/src/tests.rs +++ b/crates/extensions/tedge_downloader_ext/src/tests.rs @@ -1,5 +1,7 @@ use super::*; use certificate::CloudRootCerts; +use reqwest::header::HeaderMap; +use reqwest::header::AUTHORIZATION; use std::time::Duration; use tedge_actors::ClientMessageBox; use tedge_test_utils::fs::TempTedgeDir; @@ -51,8 +53,10 @@ async fn download_with_auth() { let target_path = ttd.path().join("downloaded_file"); let server_url = server.url(); - let download_request = - DownloadRequest::new(&server_url, &target_path).with_auth("Bearer token"); + + let mut headers = HeaderMap::new(); + headers.append(AUTHORIZATION, "Bearer token".parse().unwrap()); + let download_request = DownloadRequest::new(&server_url, &target_path).with_headers(headers); let mut requester = spawn_downloader_actor().await; diff --git a/crates/extensions/tedge_http_ext/src/messages.rs b/crates/extensions/tedge_http_ext/src/messages.rs index 8e936278867..46a2cf1819b 100644 --- a/crates/extensions/tedge_http_ext/src/messages.rs +++ b/crates/extensions/tedge_http_ext/src/messages.rs @@ -1,6 +1,7 @@ use async_trait::async_trait; use http::header::HeaderName; use http::header::HeaderValue; +use http::HeaderMap; use http::Method; use serde::de::DeserializeOwned; use thiserror::Error; @@ -109,6 +110,15 @@ impl HttpRequestBuilder { } } + /// Add multiple headers at once + pub fn headers(mut self, header_map: &HeaderMap) -> Self { + let request = self.inner.headers_mut().unwrap(); + for (key, value) in header_map { + request.insert(key, value.clone()); + } + self + } + /// Send a JSON body pub fn json(self, json: &T) -> Self { let body = serde_json::to_vec(json) @@ -122,14 +132,6 @@ impl HttpRequestBuilder { let body = Ok(content.into()); HttpRequestBuilder { body, ..self } } - - /// Add an authentication header - pub fn auth(self, header_value: T) -> Self - where - T: std::fmt::Display, - { - self.header(http::header::AUTHORIZATION, header_value.to_string()) - } } #[async_trait]