diff --git a/Cargo.lock b/Cargo.lock index 76427451fda..f0280882af9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -744,6 +744,7 @@ version = "1.3.1" dependencies = [ "anyhow", "async-trait", + "base64 0.13.1", "c8y_api", "certificate", "download", diff --git a/crates/common/download/src/download.rs b/crates/common/download/src/download.rs index cd3399458c5..fb19ab956f2 100644 --- a/crates/common/download/src/download.rs +++ b/crates/common/download/src/download.rs @@ -11,6 +11,7 @@ use log::warn; use nix::sys::statvfs; pub use partial_response::InvalidResponseError; use reqwest::header; +use reqwest::header::HeaderMap; use reqwest::Client; use reqwest::Identity; use serde::Deserialize; @@ -20,8 +21,6 @@ use std::fs::File; use std::io::Seek; use std::io::SeekFrom; use std::io::Write; -#[cfg(target_os = "linux")] -use std::os::unix::prelude::AsRawFd; use std::path::Path; use std::path::PathBuf; use std::time::Duration; @@ -31,6 +30,8 @@ use tedge_utils::file::FileError; use nix::fcntl::fallocate; #[cfg(target_os = "linux")] use nix::fcntl::FallocateFlags; +#[cfg(target_os = "linux")] +use std::os::unix::prelude::AsRawFd; fn default_backoff() -> ExponentialBackoff { // Default retry is an exponential retry with a limit of 15 minutes total. @@ -49,8 +50,8 @@ fn default_backoff() -> ExponentialBackoff { #[serde(deny_unknown_fields)] pub struct DownloadInfo { pub url: String, - #[serde(skip_serializing_if = "Option::is_none")] - pub auth: Option, + #[serde(skip)] + pub headers: HeaderMap, } impl From<&str> for DownloadInfo { @@ -64,14 +65,14 @@ impl DownloadInfo { pub fn new(url: &str) -> Self { Self { url: url.into(), - auth: None, + headers: HeaderMap::new(), } } /// Creates new [`DownloadInfo`] from a URL with authentication. - pub fn with_auth(self, auth: Auth) -> Self { + pub fn with_headers(self, header_map: HeaderMap) -> Self { Self { - auth: Some(auth), + headers: header_map, ..self } } @@ -85,21 +86,6 @@ impl DownloadInfo { } } -/// Possible authentication schemes -#[derive(Debug, Clone, Deserialize, PartialEq, Eq, Serialize)] -#[serde(rename_all = "camelCase")] -#[serde(deny_unknown_fields)] -pub enum Auth { - /// HTTP Bearer authentication - Bearer(String), -} - -impl Auth { - pub fn new_bearer(token: &str) -> Self { - Self::Bearer(token.into()) - } -} - /// A struct which manages file downloads. #[derive(Debug)] pub struct Downloader { @@ -384,9 +370,7 @@ impl Downloader { let operation = || async { let mut request = self.client.get(url.url()); - if let Some(Auth::Bearer(token)) = &url.auth { - request = request.bearer_auth(token) - } + request = request.headers(url.headers.clone()); if range_start != 0 { request = request.header("Range", format!("bytes={range_start}-")); @@ -482,6 +466,7 @@ fn try_pre_allocate_space(file: &File, path: &Path, file_len: u64) -> Result<(), #[allow(deprecated)] mod tests { use super::*; + use hyper::header::AUTHORIZATION; use std::io::Write; use tempfile::tempdir; use tempfile::NamedTempFile; @@ -923,10 +908,12 @@ mod tests { } }; - // applying token if `with_token` = true + // applying http auth header let url = { if with_token { - url.with_auth(Auth::Bearer(String::from("token"))) + let mut headers = HeaderMap::new(); + headers.append(AUTHORIZATION, "Bearer token".parse().unwrap()); + url.with_headers(headers) } else { url } diff --git a/crates/common/download/src/lib.rs b/crates/common/download/src/lib.rs index 829ae45736a..56c141b55c9 100644 --- a/crates/common/download/src/lib.rs +++ b/crates/common/download/src/lib.rs @@ -47,7 +47,6 @@ mod download; mod error; -pub use crate::download::Auth; pub use crate::download::DownloadInfo; pub use crate::download::Downloader; pub use crate::error::DownloadError; diff --git a/crates/core/c8y_api/src/http_proxy.rs b/crates/core/c8y_api/src/http_proxy.rs index 390e7aba274..cfc03dfb85a 100644 --- a/crates/core/c8y_api/src/http_proxy.rs +++ b/crates/core/c8y_api/src/http_proxy.rs @@ -5,6 +5,7 @@ use mqtt_channel::PubChannel; use mqtt_channel::StreamExt; use mqtt_channel::Topic; use mqtt_channel::TopicFilter; +use reqwest::header::HeaderMap; use reqwest::Url; use std::collections::HashMap; use std::time::Duration; @@ -27,7 +28,7 @@ pub struct C8yEndPoint { c8y_host: String, c8y_mqtt_host: String, pub device_id: String, - pub token: Option, + pub headers: HeaderMap, devices_internal_id: HashMap, } @@ -37,7 +38,7 @@ impl C8yEndPoint { c8y_host: c8y_host.into(), c8y_mqtt_host: c8y_mqtt_host.into(), device_id: device_id.into(), - token: None, + headers: HeaderMap::new(), devices_internal_id: HashMap::new(), } } diff --git a/crates/extensions/c8y_auth_proxy/src/actor.rs b/crates/extensions/c8y_auth_proxy/src/actor.rs index 0b0639acf50..07871b64d24 100644 --- a/crates/extensions/c8y_auth_proxy/src/actor.rs +++ b/crates/extensions/c8y_auth_proxy/src/actor.rs @@ -1,20 +1,18 @@ -use std::convert::Infallible; -use std::net::IpAddr; - use axum::async_trait; -use c8y_http_proxy::credentials::C8YJwtRetriever; -use c8y_http_proxy::credentials::JwtRetriever; +use c8y_http_proxy::credentials::HttpHeaderResult; +use c8y_http_proxy::credentials::HttpHeaderRetriever; use camino::Utf8PathBuf; use futures::channel::mpsc; use futures::StreamExt; +use std::convert::Infallible; +use std::net::IpAddr; use tedge_actors::Actor; use tedge_actors::Builder; use tedge_actors::DynSender; use tedge_actors::RuntimeError; use tedge_actors::RuntimeRequest; use tedge_actors::RuntimeRequestSink; -use tedge_actors::Sequential; -use tedge_actors::ServerActorBuilder; +use tedge_actors::Service; use tedge_config::TEdgeConfig; use tedge_config_macros::OptionalConfig; use tracing::info; @@ -40,14 +38,14 @@ impl C8yAuthProxyBuilder { pub fn try_from_config( config: &TEdgeConfig, c8y_profile: Option<&str>, - jwt: &mut ServerActorBuilder, + header_retriever: &mut impl Service<(), HttpHeaderResult>, ) -> anyhow::Result { let reqwest_client = config.cloud_root_certs().client(); let c8y = config.c8y.try_get(c8y_profile)?; let app_data = AppData { is_https: true, host: c8y.http.or_config_not_set()?.to_string(), - token_manager: TokenManager::new(JwtRetriever::new(jwt)).shared(), + token_manager: TokenManager::new(HttpHeaderRetriever::new(header_retriever)).shared(), client: reqwest_client, }; let bind = &c8y.proxy.bind; diff --git a/crates/extensions/c8y_auth_proxy/src/server.rs b/crates/extensions/c8y_auth_proxy/src/server.rs index c3c54468efb..933afa7d51f 100644 --- a/crates/extensions/c8y_auth_proxy/src/server.rs +++ b/crates/extensions/c8y_auth_proxy/src/server.rs @@ -23,6 +23,7 @@ use futures::Sink; use futures::SinkExt; use futures::Stream; use futures::StreamExt; +use hyper::header::AUTHORIZATION; use hyper::header::HOST; use hyper::HeaderMap; use reqwest::Method; @@ -226,7 +227,7 @@ fn tungstenite_to_axum(message: tungstenite::Message) -> axum::extract::ws::Mess } async fn connect_to_websocket( - token: &str, + auth_value: &str, headers: &HeaderMap, uri: &str, host: &TargetHost, @@ -235,7 +236,7 @@ async fn connect_to_websocket( for (name, value) in headers { req = req.header(name.as_str(), value); } - req = req.header("Authorization", format!("Bearer {token}")); + req = req.header(AUTHORIZATION, auth_value); let req = req .uri(uri) .header(HOST, host.without_scheme.as_ref()) @@ -404,10 +405,10 @@ async fn respond_to( None => "", }; let auth: fn(reqwest::RequestBuilder, &str) -> reqwest::RequestBuilder = - if headers.contains_key("Authorization") { - |req, _token| req + if headers.contains_key(AUTHORIZATION) { + |req, _auth_value| req } else { - |req, token| req.bearer_auth(token) + |req, auth_value| req.header(AUTHORIZATION, auth_value) }; headers.remove(HOST); @@ -436,7 +437,7 @@ async fn respond_to( let destination = format!("{}/tenant/currentTenant", host.http); let response = client .head(&destination) - .bearer_auth(&token) + .header(AUTHORIZATION, token.to_string()) .send() .await .with_context(|| format!("making HEAD request to {destination}"))?; @@ -496,12 +497,13 @@ mod tests { use axum::body::Bytes; use axum::headers::authorization::Bearer; use axum::headers::Authorization; + use axum::http::header::AUTHORIZATION; use axum::http::Request; use axum::middleware::Next; use axum::TypedHeader; - use c8y_http_proxy::credentials::JwtRequest; - use c8y_http_proxy::credentials::JwtResult; - use c8y_http_proxy::credentials::JwtRetriever; + use c8y_http_proxy::credentials::HttpHeaderRequest; + use c8y_http_proxy::credentials::HttpHeaderResult; + use c8y_http_proxy::credentials::HttpHeaderRetriever; use camino::Utf8PathBuf; use futures::channel::mpsc; use futures::future::ready; @@ -1113,7 +1115,7 @@ mod tests { let state = AppData { is_https: false, host: target_host.into(), - token_manager: TokenManager::new(JwtRetriever::new(&mut retriever)).shared(), + token_manager: TokenManager::new(HttpHeaderRetriever::new(&mut retriever)).shared(), client: reqwest::Client::new(), }; let trust_store = ca_dir @@ -1147,15 +1149,22 @@ mod tests { #[async_trait] impl Server for IterJwtRetriever { - type Request = JwtRequest; - type Response = JwtResult; + type Request = HttpHeaderRequest; + type Response = HttpHeaderResult; fn name(&self) -> &str { "IterJwtRetriever" } async fn handle(&mut self, _request: Self::Request) -> Self::Response { - Ok(self.tokens.next().unwrap().into()) + let mut header_map = HeaderMap::new(); + header_map.insert( + AUTHORIZATION, + format!("Bearer {}", self.tokens.next().unwrap()) + .parse() + .unwrap(), + ); + Ok(header_map) } } diff --git a/crates/extensions/c8y_auth_proxy/src/tokens.rs b/crates/extensions/c8y_auth_proxy/src/tokens.rs index ab9e2d02d27..96b61b27a0b 100644 --- a/crates/extensions/c8y_auth_proxy/src/tokens.rs +++ b/crates/extensions/c8y_auth_proxy/src/tokens.rs @@ -1,6 +1,8 @@ +use anyhow::Context; +use hyper::header::AUTHORIZATION; use std::sync::Arc; -use c8y_http_proxy::credentials::JwtRetriever; +use c8y_http_proxy::credentials::HttpHeaderRetriever; use tokio::sync::Mutex; #[derive(Clone)] @@ -16,12 +18,12 @@ impl SharedTokenManager { } pub struct TokenManager { - recv: JwtRetriever, + recv: HttpHeaderRetriever, cached: Option>, } impl TokenManager { - pub fn new(recv: JwtRetriever) -> Self { + pub fn new(recv: HttpHeaderRetriever) -> Self { Self { recv, cached: None } } @@ -41,7 +43,11 @@ impl TokenManager { } async fn refresh(&mut self) -> Result, anyhow::Error> { - self.cached = Some(self.recv.await_response(()).await??.into()); + let header_map = self.recv.await_response(()).await??; + let auth_header_value = header_map + .get(AUTHORIZATION) + .context("Authorization is missing from header")?; + self.cached = Some(auth_header_value.to_str()?.into()); Ok(self.cached.as_ref().unwrap().clone()) } } diff --git a/crates/extensions/c8y_firmware_manager/src/actor.rs b/crates/extensions/c8y_firmware_manager/src/actor.rs index eee23688121..c1132913f07 100644 --- a/crates/extensions/c8y_firmware_manager/src/actor.rs +++ b/crates/extensions/c8y_firmware_manager/src/actor.rs @@ -12,7 +12,7 @@ use c8y_api::smartrest::message::collect_smartrest_messages; use c8y_api::smartrest::message::get_smartrest_template_id; use c8y_api::smartrest::smartrest_deserializer::SmartRestFirmwareRequest; use c8y_api::smartrest::smartrest_deserializer::SmartRestRequestGeneric; -use c8y_http_proxy::credentials::JwtRetriever; +use c8y_http_proxy::credentials::HttpHeaderRetriever; use log::error; use log::info; use log::warn; @@ -84,7 +84,7 @@ impl FirmwareManagerActor { config: FirmwareManagerConfig, input_receiver: LoggingReceiver, mqtt_publisher: DynSender, - jwt_retriever: JwtRetriever, + header_retriever: HttpHeaderRetriever, download_sender: ClientMessageBox, progress_sender: DynSender, ) -> Self { @@ -93,7 +93,7 @@ impl FirmwareManagerActor { worker: FirmwareManagerWorker::new( config, mqtt_publisher, - jwt_retriever, + header_retriever, download_sender, progress_sender, ), diff --git a/crates/extensions/c8y_firmware_manager/src/lib.rs b/crates/extensions/c8y_firmware_manager/src/lib.rs index 04b3f8838ac..fb74d004984 100644 --- a/crates/extensions/c8y_firmware_manager/src/lib.rs +++ b/crates/extensions/c8y_firmware_manager/src/lib.rs @@ -10,8 +10,8 @@ mod tests; use actor::FirmwareInput; use actor::FirmwareManagerActor; -use c8y_http_proxy::credentials::JwtResult; -use c8y_http_proxy::credentials::JwtRetriever; +use c8y_http_proxy::credentials::HttpHeaderResult; +use c8y_http_proxy::credentials::HttpHeaderRetriever; pub use config::*; use tedge_actors::futures::channel::mpsc; use tedge_actors::Builder; @@ -39,7 +39,7 @@ pub struct FirmwareManagerBuilder { config: FirmwareManagerConfig, input_receiver: LoggingReceiver, mqtt_publisher: DynSender, - jwt_retriever: JwtRetriever, + header_retriever: HttpHeaderRetriever, download_sender: ClientMessageBox, progress_sender: DynSender, signal_sender: mpsc::Sender, @@ -49,7 +49,7 @@ impl FirmwareManagerBuilder { pub fn try_new( config: FirmwareManagerConfig, mqtt_actor: &mut (impl MessageSource + MessageSink), - jwt_actor: &mut impl Service<(), JwtResult>, + header_actor: &mut impl Service<(), HttpHeaderResult>, downloader_actor: &mut impl Service, ) -> Result { Self::init(&config.data_dir)?; @@ -65,14 +65,14 @@ impl FirmwareManagerBuilder { mqtt_actor.connect_sink(Self::subscriptions(&config.c8y_prefix), &mqtt_sender); let mqtt_publisher = mqtt_actor.get_sender(); - let jwt_retriever = JwtRetriever::new(jwt_actor); + let header_retriever = HttpHeaderRetriever::new(header_actor); let download_sender = ClientMessageBox::new(downloader_actor); let progress_sender = input_sender.into(); Ok(Self { config, input_receiver, mqtt_publisher, - jwt_retriever, + header_retriever, download_sender, progress_sender, signal_sender, @@ -110,7 +110,7 @@ impl Builder for FirmwareManagerBuilder { self.config, self.input_receiver, self.mqtt_publisher, - self.jwt_retriever, + self.header_retriever, self.download_sender, self.progress_sender, )) diff --git a/crates/extensions/c8y_firmware_manager/src/tests.rs b/crates/extensions/c8y_firmware_manager/src/tests.rs index 7e8d87eb153..8ed58ccbf06 100644 --- a/crates/extensions/c8y_firmware_manager/src/tests.rs +++ b/crates/extensions/c8y_firmware_manager/src/tests.rs @@ -1,7 +1,8 @@ use super::*; use assert_json_diff::assert_json_include; use c8y_api::smartrest::topic::C8yTopic; -use c8y_http_proxy::credentials::JwtRequest; +use c8y_http_proxy::credentials::HttpHeaderRequest; +use c8y_http_proxy::HeaderMap; use serde_json::json; use sha256::digest; use std::io; @@ -17,7 +18,6 @@ use tedge_actors::RuntimeError; use tedge_actors::Sender; use tedge_actors::SimpleMessageBox; use tedge_actors::SimpleMessageBoxBuilder; -use tedge_api::Auth; use tedge_api::DownloadError; use tedge_downloader_ext::DownloadResponse; use tedge_mqtt_ext::Topic; @@ -184,7 +184,7 @@ async fn handle_request_child_device_with_new_download() -> Result<(), DynError> download_request.file_path, ttd.path().join("cache").join(DOWNLOADED_FILE_NAME) ); - assert_eq!(download_request.auth, None); + assert!(download_request.headers.is_empty()); // Simulate downloading a file is completed. ttd.dir("cache").file(DOWNLOADED_FILE_NAME); @@ -274,7 +274,7 @@ async fn create_download_request_with_c8y_auth() -> Result<(), DynError> { spawn_firmware_manager(&mut ttd, DEFAULT_REQUEST_TIMEOUT_SEC, false).await?; let c8y_download_url = format!("http://{C8Y_HOST}/file/end/point"); - let token = "token"; + let auth_header_value = "Bearer token"; // Publish firmware update operation to child device. let c8y_firmware_update_msg = MqttMessage::new( @@ -288,7 +288,9 @@ async fn create_download_request_with_c8y_auth() -> Result<(), DynError> { assert!(jwt_request.is_some()); // Return JWT token. - jwt_message_box.send(Ok(token.to_string())).await?; + let mut headers = HeaderMap::new(); + headers.insert("Authorization", auth_header_value.parse().unwrap()); + jwt_message_box.send(Ok(headers.clone())).await?; // Assert firmware download request. let (_id, download_request) = downloader_message_box.recv().await.unwrap(); @@ -297,10 +299,7 @@ async fn create_download_request_with_c8y_auth() -> Result<(), DynError> { download_request.file_path, ttd.path().join("cache").join(digest(c8y_download_url)) ); - assert_eq!( - download_request.auth, - Some(Auth::Bearer(String::from(token))) - ); + assert_eq!(download_request.headers, headers); Ok(()) } @@ -622,7 +621,7 @@ async fn spawn_firmware_manager( ( JoinHandle>, TimedMessageBox>, - TimedMessageBox>, + TimedMessageBox>, TimedMessageBox>, ), DynError, @@ -649,7 +648,8 @@ async fn spawn_firmware_manager( let mut mqtt_builder: SimpleMessageBoxBuilder = SimpleMessageBoxBuilder::new("MQTT", 5); - let mut jwt_builder: FakeServerBoxBuilder = FakeServerBox::builder(); + let mut jwt_builder: FakeServerBoxBuilder = + FakeServerBox::builder(); let mut downloader_builder: FakeServerBoxBuilder = FakeServerBox::builder(); diff --git a/crates/extensions/c8y_firmware_manager/src/worker.rs b/crates/extensions/c8y_firmware_manager/src/worker.rs index b70f2129e91..f9999763e7f 100644 --- a/crates/extensions/c8y_firmware_manager/src/worker.rs +++ b/crates/extensions/c8y_firmware_manager/src/worker.rs @@ -11,7 +11,7 @@ use c8y_api::smartrest::smartrest_serializer::set_operation_executing_with_name; use c8y_api::smartrest::smartrest_serializer::succeed_operation_with_name_no_parameters; use c8y_api::smartrest::smartrest_serializer::CumulocitySupportedOperations; use c8y_api::smartrest::topic::C8yTopic; -use c8y_http_proxy::credentials::JwtRetriever; +use c8y_http_proxy::credentials::HttpHeaderRetriever; use camino::Utf8PathBuf; use log::error; use log::info; @@ -26,7 +26,6 @@ use tedge_actors::ClientMessageBox; use tedge_actors::CloneSender; use tedge_actors::DynSender; use tedge_actors::Sender; -use tedge_api::Auth; use tedge_api::OperationStatus; use tedge_downloader_ext::DownloadRequest; use tedge_downloader_ext::DownloadResult; @@ -53,7 +52,7 @@ pub(crate) struct FirmwareManagerWorker { pub(crate) config: Arc, executing: bool, mqtt_publisher: DynSender, - jwt_retriever: JwtRetriever, + header_retriever: HttpHeaderRetriever, download_sender: ClientMessageBox, progress_sender: DynSender, } @@ -64,7 +63,7 @@ impl Clone for FirmwareManagerWorker { config: self.config.clone(), executing: false, mqtt_publisher: self.mqtt_publisher.sender_clone(), - jwt_retriever: self.jwt_retriever.clone(), + header_retriever: self.header_retriever.clone(), download_sender: self.download_sender.clone(), progress_sender: self.progress_sender.sender_clone(), } @@ -75,7 +74,7 @@ impl FirmwareManagerWorker { pub(crate) fn new( config: FirmwareManagerConfig, mqtt_publisher: DynSender, - jwt_retriever: JwtRetriever, + header_retriever: HttpHeaderRetriever, download_sender: ClientMessageBox, progress_sender: DynSender, ) -> Self { @@ -83,7 +82,7 @@ impl FirmwareManagerWorker { config: Arc::new(config), executing: false, mqtt_publisher, - jwt_retriever, + header_retriever, download_sender, progress_sender, } @@ -206,9 +205,9 @@ impl FirmwareManagerWorker { .maybe_tenant_url(firmware_url) .is_some() { - if let Ok(token) = self.jwt_retriever.await_response(()).await? { + if let Ok(header_map) = self.header_retriever.await_response(()).await? { DownloadRequest::new(firmware_url, cache_file_path.as_std_path()) - .with_auth(Auth::new_bearer(&token)) + .with_headers(header_map) } else { return Err(FirmwareManagementError::NoJwtToken); } diff --git a/crates/extensions/c8y_http_proxy/Cargo.toml b/crates/extensions/c8y_http_proxy/Cargo.toml index ca26ba78de6..6a12a63d4f0 100644 --- a/crates/extensions/c8y_http_proxy/Cargo.toml +++ b/crates/extensions/c8y_http_proxy/Cargo.toml @@ -12,6 +12,7 @@ repository = { workspace = true } [dependencies] anyhow = { workspace = true } async-trait = { workspace = true } +base64 = { workspace = true } c8y_api = { workspace = true } certificate = { workspace = true, features = ["reqwest"] } download = { workspace = true } diff --git a/crates/extensions/c8y_http_proxy/src/actor.rs b/crates/extensions/c8y_http_proxy/src/actor.rs index 30a60ceb6bb..6eddadb73c1 100644 --- a/crates/extensions/c8y_http_proxy/src/actor.rs +++ b/crates/extensions/c8y_http_proxy/src/actor.rs @@ -1,18 +1,15 @@ -use crate::credentials::JwtRequest; -use crate::credentials::JwtResult; -use crate::credentials::JwtRetriever; +use crate::credentials::HttpHeaderRequest; +use crate::credentials::HttpHeaderResult; +use crate::credentials::HttpHeaderRetriever; use crate::messages::C8YConnectionError; use crate::messages::C8YRestError; use crate::messages::C8YRestRequest; use crate::messages::C8YRestResult; use crate::messages::CreateEvent; -use crate::messages::DownloadFile; use crate::messages::EventId; use crate::messages::SoftwareListResponse; use crate::messages::Unit; -use crate::messages::UploadFile; use crate::messages::UploadLogBinary; -use crate::messages::Url; use crate::C8YHttpConfig; use anyhow::Context; use async_trait::async_trait; @@ -22,11 +19,7 @@ use c8y_api::json_c8y::C8yEventResponse; use c8y_api::json_c8y::C8yManagedObject; use c8y_api::json_c8y::InternalIdResponse; use c8y_api::OffsetDateTime; -use download::Auth; -use download::DownloadInfo; -use download::Downloader; use http::status::StatusCode; -use log::debug; use log::error; use log::info; use std::collections::HashMap; @@ -63,14 +56,14 @@ pub struct C8YHttpProxyMessageBox { /// Connection to an HTTP actor pub(crate) http: ClientMessageBox, - /// Connection to a JWT token retriever - pub(crate) jwt: JwtRetriever, + /// Connection to an HTTP header value retriever + pub(crate) header_retriever: HttpHeaderRetriever, } pub type C8YRestRequestEnvelope = RequestEnvelope; -fan_in_message_type!(C8YHttpProxyInput[C8YRestRequestEnvelope, HttpResult, JwtResult] : Debug); -fan_in_message_type!(C8YHttpProxyOutput[HttpRequest, JwtRequest] : Debug); +fan_in_message_type!(C8YHttpProxyInput[C8YRestRequestEnvelope, HttpResult, HttpHeaderResult] : Debug); +fan_in_message_type!(C8YHttpProxyOutput[HttpRequest, HttpHeaderRequest] : Debug); #[async_trait] impl Actor for C8YHttpProxyActor { @@ -87,18 +80,6 @@ impl Actor for C8YHttpProxyActor { }) = self.peers.clients.recv().await { let result = match request { - C8YRestRequest::GetJwtToken(_) => self - .get_and_set_jwt_token() - .await - .map(|response| response.into()), - - C8YRestRequest::GetFreshJwtToken(_) => { - self.end_point.token = None; - self.get_and_set_jwt_token() - .await - .map(|response| response.into()) - } - C8YRestRequest::CreateEvent(request) => self .create_event(request) .await @@ -113,15 +94,6 @@ impl Actor for C8YHttpProxyActor { .upload_log_binary(request) .await .map(|response| response.into()), - - C8YRestRequest::UploadFile(request) => { - self.upload_file(request).await.map(|url| url.into()) - } - - C8YRestRequest::DownloadFile(request) => self - .download_file(request) - .await - .map(|response| response.into()), }; reply_to.send(result).await?; } @@ -198,16 +170,14 @@ impl C8YHttpProxyActor { device_id: String, ) -> Result { let url_get_id: String = self.end_point.get_url_for_internal_id(&device_id); - if self.end_point.token.is_none() { - self.get_fresh_token().await?; - } + self.refresh_headers().await?; let mut attempt = 0; let mut token_refreshed = false; loop { attempt += 1; let request = HttpRequestBuilder::get(&url_get_id) - .bearer_auth(self.end_point.token.clone().unwrap_or_default()) + .headers(&self.end_point.headers) .build()?; let endpoint = request.uri().path().to_owned(); let method = request.method().to_owned(); @@ -236,7 +206,7 @@ impl C8YHttpProxyActor { response.error_for_status()?; } info!("Re-fetching internal id for {device_id} with fresh token"); - self.get_fresh_token().await?; + self.refresh_headers().await?; token_refreshed = true; continue; } @@ -265,7 +235,7 @@ impl C8YHttpProxyActor { let request_builder = build_request(&self.end_point); let request = request_builder .await? - .bearer_auth(self.end_point.token.clone().unwrap_or_default()) + .headers(&self.end_point.headers) .build()?; let endpoint = request.uri().path().to_owned(); let method = request.method().to_owned(); @@ -294,11 +264,6 @@ impl C8YHttpProxyActor { } } - async fn get_fresh_token(&mut self) -> Result { - self.end_point.token = None; - self.get_and_set_jwt_token().await - } - async fn try_request_with_fresh_token< Fut: Future>, >( @@ -306,12 +271,12 @@ impl C8YHttpProxyActor { build_request: impl Fn(&C8yEndPoint) -> Fut, ) -> Result { // get new token not the cached one - self.get_fresh_token().await?; + self.refresh_headers().await?; // build the request let request_builder = build_request(&self.end_point); let request = request_builder .await? - .bearer_auth(self.end_point.token.clone().unwrap_or_default()) + .headers(&self.end_point.headers) .build()?; // retry the request Ok(self.peers.http.await_response(request).await?) @@ -330,7 +295,7 @@ impl C8YHttpProxyActor { let request_builder = build_request(&self.end_point); let request = request_builder .await? - .bearer_auth(self.end_point.token.clone().unwrap_or_default()) + .headers(&self.end_point.headers) .build()?; Ok(self.peers.http.await_response(request).await?) } @@ -428,101 +393,22 @@ impl C8YHttpProxyActor { } } - async fn upload_file(&mut self, request: UploadFile) -> Result { - let device_id = request.device_id; - - let create_event = |internal_id: String| -> C8yCreateEvent { - C8yCreateEvent { - source: Some(C8yManagedObject { id: internal_id }), - event_type: request.file_type.clone(), - time: OffsetDateTime::now_utc(), - text: request.file_type.clone(), - extras: HashMap::new(), - } - }; - - let event_response_id = self - .send_event_internal(device_id.clone(), create_event) - .await?; - debug!(target: self.name(), "File event created with id: {:?}", event_response_id); - - let build_request = |end_point: &C8yEndPoint| { - let binary_upload_event_url = - end_point.get_url_for_event_binary_upload(&event_response_id); - - async { - // TODO: Upload the file as a multi-part stream - let file_content = - tokio::fs::read(&request.file_path).await.with_context(|| { - format!( - "Reading file {} for upload failed", - request.file_path.display() - ) - })?; - - let (content_type, body) = match String::from_utf8(file_content) { - Ok(text) => ("text/plain", hyper::Body::from(text)), - Err(not_text) => ("application/octet-stream", not_text.into_bytes().into()), - }; - - Ok::<_, C8YRestError>( - HttpRequestBuilder::post(binary_upload_event_url) - .header("Accept", "application/json") - .header("Content-Type", content_type) - .body(body), - ) - } - }; - info!(target: self.name(), "Uploading file to URL: {}", self.end_point - .get_url_for_event_binary_upload(&event_response_id)); - let http_result = self.execute(device_id.clone(), build_request).await??; - - if !http_result.status().is_success() { - Err(C8YRestError::CustomError("Upload failed".into())) - } else { - Ok(Url(self - .end_point - .get_url_for_event_binary_upload(&event_response_id))) - } - } - - async fn get_and_set_jwt_token(&mut self) -> Result { - match self.end_point.token.clone() { - Some(token) => Ok(token), - None => { - if let Ok(token) = self.peers.jwt.await_response(()).await? { - self.end_point.token = Some(token.clone()); - Ok(token) - } else { - Err(C8YRestError::CustomError("JWT token not available".into())) + /// Update HTTP headers with the ones retried from the HttpHeaderRetriever actor + async fn refresh_headers(&mut self) -> Result<(), C8YRestError> { + match self.peers.header_retriever.await_response(()).await? { + Ok(headers) => { + self.end_point.headers.clear(); + for (key, value) in headers { + self.end_point.headers.insert(key.unwrap(), value); } + Ok(()) } + Err(err) => Err(C8YRestError::CustomError(format!( + "Failed to retrieve headers with reason {err}" + ))), } } - async fn download_file(&mut self, request: DownloadFile) -> Result { - let mut download_info: DownloadInfo = request.download_url.as_str().into(); - // If the provided url is c8y, add auth - if self - .end_point - .maybe_tenant_url(download_info.url()) - .is_some() - { - let token = self.get_and_set_jwt_token().await?; - download_info.auth = Some(Auth::new_bearer(token.as_str())); - } - - info!(target: self.name(), "Downloading from: {:?}", download_info.url()); - let downloader: Downloader = Downloader::new( - request.file_path, - self.config.identity.clone(), - self.config.cloud_root_certs.clone(), - ); - downloader.download(&download_info).await?; - - Ok(()) - } - async fn send_event_internal( &mut self, device_id: String, diff --git a/crates/extensions/c8y_http_proxy/src/credentials.rs b/crates/extensions/c8y_http_proxy/src/credentials.rs index f12bbb5abde..3cc031c8107 100644 --- a/crates/extensions/c8y_http_proxy/src/credentials.rs +++ b/crates/extensions/c8y_http_proxy/src/credentials.rs @@ -1,6 +1,7 @@ use async_trait::async_trait; use c8y_api::http_proxy::C8yMqttJwtTokenRetriever; -use c8y_api::http_proxy::JwtError; +use http::header::AUTHORIZATION; +use http::HeaderMap; use tedge_actors::ClientMessageBox; use tedge_actors::Sequential; use tedge_actors::Server; @@ -8,13 +9,13 @@ use tedge_actors::ServerActorBuilder; use tedge_actors::ServerConfig; use tedge_config::TopicPrefix; -pub type JwtRequest = (); -pub type JwtResult = Result; +pub type HttpHeaderRequest = (); +pub type HttpHeaderResult = Result; -/// Retrieves JWT tokens authenticating the device -pub type JwtRetriever = ClientMessageBox; +/// Retrieves HTTP headers +pub type HttpHeaderRetriever = ClientMessageBox; -/// A JwtRetriever that gets JWT tokens from C8Y over MQTT +/// A JwtRetriever that gets JWT tokens from C8Y over MQTT and returns authorization header pub struct C8YJwtRetriever { mqtt_retriever: C8yMqttJwtTokenRetriever, } @@ -32,19 +33,75 @@ impl C8YJwtRetriever { #[async_trait] impl Server for C8YJwtRetriever { - type Request = JwtRequest; - type Response = JwtResult; + type Request = HttpHeaderRequest; + type Response = HttpHeaderResult; fn name(&self) -> &str { "C8YJwtRetriever" } async fn handle(&mut self, _request: Self::Request) -> Self::Response { + let mut heeader_map = HeaderMap::new(); let response = self.mqtt_retriever.get_jwt_token().await?; - Ok(response.token()) + heeader_map.insert( + AUTHORIZATION, + format!("Bearer {}", response.token()).parse()?, + ); + Ok(heeader_map) } } +/// Return base64 encoded Basic Auth header +pub struct C8YBasicAuthRetriever { + username: String, + password: String, +} + +impl C8YBasicAuthRetriever { + pub fn builder( + username: &str, + password: &str, + ) -> ServerActorBuilder { + let server = C8YBasicAuthRetriever { + username: username.into(), + password: password.into(), + }; + ServerActorBuilder::new(server, &ServerConfig::default(), Sequential) + } +} + +#[async_trait] +impl Server for C8YBasicAuthRetriever { + type Request = HttpHeaderRequest; + type Response = HttpHeaderResult; + + fn name(&self) -> &str { + "C8YBasicAuthRetriever" + } + + async fn handle(&mut self, _request: Self::Request) -> Self::Response { + let mut header_map = HeaderMap::new(); + header_map.insert( + AUTHORIZATION, + format!( + "Basic {}", + base64::encode(format!("{}:{}", self.username, self.password)) + ) + .parse()?, + ); + Ok(header_map) + } +} + +#[derive(thiserror::Error, Debug)] +pub enum HttpHeaderError { + #[error(transparent)] + JwtError(#[from] c8y_api::http_proxy::JwtError), + + #[error(transparent)] + InvalidHeaderValue(#[from] http::header::InvalidHeaderValue), +} + /// A JwtRetriever that simply always returns the same JWT token (possibly none) #[cfg(test)] pub(crate) struct ConstJwtRetriever { @@ -54,14 +111,16 @@ pub(crate) struct ConstJwtRetriever { #[async_trait] #[cfg(test)] impl Server for ConstJwtRetriever { - type Request = JwtRequest; - type Response = JwtResult; + type Request = HttpHeaderRequest; + type Response = HttpHeaderResult; fn name(&self) -> &str { "ConstJwtRetriever" } async fn handle(&mut self, _request: Self::Request) -> Self::Response { - Ok(self.token.clone()) + let mut header_map = HeaderMap::new(); + header_map.insert(AUTHORIZATION, format!("Bearer {}", self.token).parse()?); + Ok(header_map) } } diff --git a/crates/extensions/c8y_http_proxy/src/handle.rs b/crates/extensions/c8y_http_proxy/src/handle.rs index 6fec1e4bbed..b853e75ffa4 100644 --- a/crates/extensions/c8y_http_proxy/src/handle.rs +++ b/crates/extensions/c8y_http_proxy/src/handle.rs @@ -3,19 +3,12 @@ use crate::messages::C8YRestRequest; use crate::messages::C8YRestResponse; use crate::messages::C8YRestResult; use crate::messages::CreateEvent; -use crate::messages::GetFreshJwtToken; -use crate::messages::GetJwtToken; use crate::messages::SoftwareListResponse; -use crate::messages::UploadFile; use crate::messages::UploadLogBinary; use c8y_api::json_c8y::C8yUpdateSoftwareListResponse; -use std::path::Path; -use std::path::PathBuf; use tedge_actors::ClientMessageBox; use tedge_actors::Service; -use super::messages::DownloadFile; - /// Handle to the C8YHttpProxy #[derive(Clone)] pub struct C8YHttpProxy { @@ -28,24 +21,6 @@ impl C8YHttpProxy { C8YHttpProxy { c8y } } - pub async fn get_jwt_token(&mut self) -> Result { - let request: C8YRestRequest = GetJwtToken.into(); - - match self.c8y.await_response(request).await? { - Ok(C8YRestResponse::EventId(id)) => Ok(id), - unexpected => Err(unexpected.into()), - } - } - - pub async fn get_fresh_jwt_token(&mut self) -> Result { - let request: C8YRestRequest = GetFreshJwtToken.into(); - - match self.c8y.await_response(request).await? { - Ok(C8YRestResponse::EventId(id)) => Ok(id), - unexpected => Err(unexpected.into()), - } - } - pub async fn send_event(&mut self, c8y_event: CreateEvent) -> Result { let request: C8YRestRequest = c8y_event.into(); match self.c8y.await_response(request).await? { @@ -88,38 +63,4 @@ impl C8YHttpProxy { unexpected => Err(unexpected.into()), } } - - pub async fn upload_file( - &mut self, - file_path: &Path, - file_type: &str, - device_id: String, - ) -> Result { - let request: C8YRestRequest = UploadFile { - file_path: file_path.to_owned(), - file_type: file_type.to_string(), - device_id, - } - .into(); - match self.c8y.await_response(request).await? { - Ok(C8YRestResponse::Url(url)) => Ok(url.0), - unexpected => Err(unexpected.into()), - } - } - - pub async fn download_file( - &mut self, - download_url: &str, - file_path: PathBuf, - ) -> Result<(), C8YRestError> { - let request: C8YRestRequest = DownloadFile { - download_url: download_url.into(), - file_path, - } - .into(); - match self.c8y.await_response(request).await? { - Ok(C8YRestResponse::Unit(())) => Ok(()), - unexpected => Err(unexpected.into()), - } - } } diff --git a/crates/extensions/c8y_http_proxy/src/lib.rs b/crates/extensions/c8y_http_proxy/src/lib.rs index 008d948e38f..b4cdec36a85 100644 --- a/crates/extensions/c8y_http_proxy/src/lib.rs +++ b/crates/extensions/c8y_http_proxy/src/lib.rs @@ -1,11 +1,9 @@ use crate::actor::C8YHttpProxyActor; use crate::actor::C8YHttpProxyMessageBox; -use crate::credentials::JwtResult; -use crate::credentials::JwtRetriever; +use crate::credentials::HttpHeaderResult; +use crate::credentials::HttpHeaderRetriever; use crate::messages::C8YRestRequest; use crate::messages::C8YRestResult; -use certificate::CloudRootCerts; -use reqwest::Identity; use std::convert::Infallible; use std::path::PathBuf; use std::time::Duration; @@ -30,6 +28,8 @@ pub mod credentials; pub mod handle; pub mod messages; +pub use http::HeaderMap; + #[cfg(test)] mod tests; @@ -40,8 +40,6 @@ pub struct C8YHttpConfig { pub c8y_mqtt_host: String, pub device_id: String, pub tmp_dir: PathBuf, - identity: Option, - cloud_root_certs: CloudRootCerts, retry_interval: Duration, } @@ -64,8 +62,6 @@ impl C8YHttpConfig { .to_string(); let device_id = tedge_config.device.id.try_read(tedge_config)?.to_string(); let tmp_dir = tedge_config.tmp.path.as_std_path().to_path_buf(); - let identity = tedge_config.http.client.auth.identity()?; - let cloud_root_certs = tedge_config.cloud_root_certs(); let retry_interval = Duration::from_secs(5); Ok(Self { @@ -73,8 +69,6 @@ impl C8YHttpConfig { c8y_mqtt_host, device_id, tmp_dir, - identity, - cloud_root_certs, retry_interval, }) } @@ -107,24 +101,24 @@ pub struct C8YHttpProxyBuilder { /// Connection to an HTTP actor http: ClientMessageBox, - /// Connection to a JWT token retriever - jwt: JwtRetriever, + /// Connection to an HTTP header value retriever + header_retriever: HttpHeaderRetriever, } impl C8YHttpProxyBuilder { pub fn new( config: C8YHttpConfig, http: &mut impl Service, - jwt: &mut impl Service<(), JwtResult>, + header_retriever: &mut impl Service<(), HttpHeaderResult>, ) -> Self { let clients = ServerMessageBoxBuilder::new("C8Y-REST", 10); let http = ClientMessageBox::new(http); - let jwt = JwtRetriever::new(jwt); + let header_retriever = HttpHeaderRetriever::new(header_retriever); C8YHttpProxyBuilder { config, clients, http, - jwt, + header_retriever, } } } @@ -140,7 +134,7 @@ impl Builder for C8YHttpProxyBuilder { let message_box = C8YHttpProxyMessageBox { clients: self.clients.build(), http: self.http, - jwt: self.jwt, + header_retriever: self.header_retriever, }; C8YHttpProxyActor::new(self.config, message_box) diff --git a/crates/extensions/c8y_http_proxy/src/messages.rs b/crates/extensions/c8y_http_proxy/src/messages.rs index 2e9fe61c30e..7568220b33e 100644 --- a/crates/extensions/c8y_http_proxy/src/messages.rs +++ b/crates/extensions/c8y_http_proxy/src/messages.rs @@ -6,7 +6,7 @@ use tedge_actors::fan_in_message_type; use tedge_actors::ChannelError; use tedge_http_ext::HttpError; -fan_in_message_type!(C8YRestRequest[GetJwtToken, GetFreshJwtToken, CreateEvent, SoftwareListResponse, UploadLogBinary, UploadFile, DownloadFile]: Debug, PartialEq, Eq); +fan_in_message_type!(C8YRestRequest[CreateEvent, SoftwareListResponse, UploadLogBinary]: Debug, PartialEq, Eq); //HIPPO Rename EventId to String as there could be many other String responses as well and this macro doesn't allow another String variant fan_in_message_type!(C8YRestResponse[EventId, Url, Unit]: Debug); diff --git a/crates/extensions/c8y_http_proxy/src/tests.rs b/crates/extensions/c8y_http_proxy/src/tests.rs index a1a6fa5f56c..0c0c8d6818c 100644 --- a/crates/extensions/c8y_http_proxy/src/tests.rs +++ b/crates/extensions/c8y_http_proxy/src/tests.rs @@ -1,6 +1,6 @@ use crate::credentials::ConstJwtRetriever; -use crate::credentials::JwtRequest; -use crate::credentials::JwtResult; +use crate::credentials::HttpHeaderRequest; +use crate::credentials::HttpHeaderResult; use crate::handle::C8YHttpProxy; use crate::messages::CreateEvent; use crate::C8YHttpConfig; @@ -9,7 +9,8 @@ use async_trait::async_trait; use c8y_api::json_c8y::C8yEventResponse; use c8y_api::json_c8y::C8yUpdateSoftwareListResponse; use c8y_api::json_c8y::InternalIdResponse; -use certificate::CloudRootCerts; +use http::header::AUTHORIZATION; +use http::HeaderMap; use http::StatusCode; use mockito::Matcher; use std::collections::HashMap; @@ -32,23 +33,25 @@ use tedge_http_ext::HttpResult; use tedge_test_utils::fs::TempTedgeDir; use time::macros::datetime; +const JWT_TOKEN: &str = "JWT token"; +const BEARER_AUTH: &str = "Bearer JWT token"; + #[tokio::test] async fn c8y_http_proxy_requests_the_device_internal_id_on_start() { let c8y_host = "c8y.tenant.io"; let device_id = "device-001"; - let token = "some JWT token"; let external_id = "external-device-001"; let tmp_dir = "/tmp"; let (mut proxy, mut c8y) = - spawn_c8y_http_proxy(c8y_host.into(), device_id.into(), tmp_dir.into(), token).await; + spawn_c8y_http_proxy(c8y_host.into(), device_id.into(), tmp_dir.into(), JWT_TOKEN).await; // Even before any request is sent to the c8y_proxy // the proxy requests over HTTP the internal device id. let init_request = HttpRequestBuilder::get(format!( "https://{c8y_host}/identity/externalIds/c8y_Serial/{device_id}" )) - .bearer_auth(token) + .headers(&get_test_auth_header(BEARER_AUTH)) .build() .unwrap(); assert_recv(&mut c8y, Some(init_request)).await; @@ -76,7 +79,7 @@ async fn c8y_http_proxy_requests_the_device_internal_id_on_start() { &mut c8y, Some( HttpRequestBuilder::post(format!("https://{c8y_host}/event/events/")) - .bearer_auth(token) + .headers(&get_test_auth_header(BEARER_AUTH)) .header("content-type", "application/json") .header("accept", "application/json") .build() @@ -90,19 +93,18 @@ async fn c8y_http_proxy_requests_the_device_internal_id_on_start() { async fn retry_internal_id_on_expired_jwt() { let c8y_host = "c8y.tenant.io"; let device_id = "device-001"; - let token = "JWT token"; let external_id = "external-device-001"; let tmp_dir = "/tmp"; let (mut proxy, mut c8y) = - spawn_c8y_http_proxy(c8y_host.into(), device_id.into(), tmp_dir.into(), token).await; + spawn_c8y_http_proxy(c8y_host.into(), device_id.into(), tmp_dir.into(), JWT_TOKEN).await; // Even before any request is sent to the c8y_proxy // the proxy requests over HTTP the internal device id. let init_request = HttpRequestBuilder::get(format!( "https://{c8y_host}/identity/externalIds/c8y_Serial/{device_id}" )) - .bearer_auth(token) + .headers(&get_test_auth_header(BEARER_AUTH)) .build() .unwrap(); assert_recv(&mut c8y, Some(init_request)).await; @@ -116,7 +118,7 @@ async fn retry_internal_id_on_expired_jwt() { HttpRequestBuilder::get(format!( "https://{c8y_host}/identity/externalIds/c8y_Serial/{device_id}" )) - .bearer_auth(token) + .headers(&get_test_auth_header(BEARER_AUTH)) .build() .unwrap(), ), @@ -145,7 +147,7 @@ async fn retry_internal_id_on_expired_jwt() { &mut c8y, Some( HttpRequestBuilder::post(format!("https://{c8y_host}/event/events/")) - .bearer_auth(token) + .headers(&get_test_auth_header(BEARER_AUTH)) .header("content-type", "application/json") .header("accept", "application/json") .build() @@ -159,7 +161,6 @@ async fn retry_internal_id_on_expired_jwt() { async fn retry_get_internal_id_when_not_found() { let c8y_host = "c8y.tenant.io"; let main_device_id = "device-001"; - let token = "JWT token"; let tmp_dir = "/tmp"; let child_device_id = "child-101"; @@ -167,7 +168,7 @@ async fn retry_get_internal_id_when_not_found() { c8y_host.into(), main_device_id.into(), tmp_dir.into(), - token, + JWT_TOKEN, ) .await; @@ -177,7 +178,7 @@ async fn retry_get_internal_id_when_not_found() { let get_internal_id_url = format!("https://{c8y_host}/identity/externalIds/c8y_Serial/{main_device_id}"); let init_request = HttpRequestBuilder::get(get_internal_id_url) - .bearer_auth(token) + .headers(&get_test_auth_header(BEARER_AUTH)) .build() .unwrap(); assert_recv(&mut c8y, Some(init_request)).await; @@ -197,7 +198,7 @@ async fn retry_get_internal_id_when_not_found() { &mut c8y, Some( HttpRequestBuilder::get(&get_internal_id_url) - .bearer_auth(token) + .headers(&get_test_auth_header(BEARER_AUTH)) .build() .unwrap(), ), @@ -215,7 +216,7 @@ async fn retry_get_internal_id_when_not_found() { &mut c8y, Some( HttpRequestBuilder::get(&get_internal_id_url) - .bearer_auth(token) + .headers(&get_test_auth_header(BEARER_AUTH)) .build() .unwrap(), ), @@ -236,7 +237,7 @@ async fn retry_get_internal_id_when_not_found() { HttpRequestBuilder::put(format!("https://{c8y_host}/inventory/managedObjects/200")) .header("content-type", "application/json") .header("accept", "application/json") - .bearer_auth(token) + .headers(&get_test_auth_header(BEARER_AUTH)) .json(&c8y_software_list) .build() .unwrap(), @@ -260,7 +261,6 @@ async fn retry_get_internal_id_when_not_found() { async fn get_internal_id_retry_fails_after_exceeding_attempts_threshold() { let c8y_host = "c8y.tenant.io"; let main_device_id = "device-001"; - let token = "JWT token"; let tmp_dir = "/tmp"; let child_device_id = "child-101"; @@ -268,7 +268,7 @@ async fn get_internal_id_retry_fails_after_exceeding_attempts_threshold() { c8y_host.into(), main_device_id.into(), tmp_dir.into(), - token, + JWT_TOKEN, ) .await; @@ -278,7 +278,7 @@ async fn get_internal_id_retry_fails_after_exceeding_attempts_threshold() { let get_internal_id_url = format!("https://{c8y_host}/identity/externalIds/c8y_Serial/{main_device_id}"); let init_request = HttpRequestBuilder::get(get_internal_id_url) - .bearer_auth(token) + .headers(&get_test_auth_header(BEARER_AUTH)) .build() .unwrap(); assert_recv(&mut c8y, Some(init_request)).await; @@ -298,7 +298,7 @@ async fn get_internal_id_retry_fails_after_exceeding_attempts_threshold() { &mut c8y, Some( HttpRequestBuilder::get(&get_internal_id_url) - .bearer_auth(token) + .headers(&get_test_auth_header(BEARER_AUTH)) .build() .unwrap(), ), @@ -352,7 +352,7 @@ async fn retry_internal_id_on_expired_jwt_with_mock() { .create(); let target_url = server.url(); - let mut jwt = ServerMessageBoxBuilder::new("JWT Actor", 16); + let mut auth = ServerMessageBoxBuilder::new("Auth Actor", 16); let ttd = TempTedgeDir::new(); let config_loc = TEdgeConfigLocation::from_custom_root(ttd.path()); @@ -364,12 +364,10 @@ async fn retry_internal_id_on_expired_jwt_with_mock() { c8y_mqtt_host: target_url.clone(), device_id: external_id.into(), tmp_dir: tmp_dir.into(), - identity: None, - cloud_root_certs: CloudRootCerts::from([]), retry_interval: Duration::from_millis(100), }; - let c8y_proxy_actor = C8YHttpProxyBuilder::new(config, &mut http_actor, &mut jwt); - let jwt_actor = ServerActor::new(DynamicJwtRetriever { count: 0 }, jwt.build()); + let c8y_proxy_actor = C8YHttpProxyBuilder::new(config, &mut http_actor, &mut auth); + let jwt_actor = ServerActor::new(DynamicJwtRetriever { count: 0 }, auth.build()); tokio::spawn(async move { http_actor.run().await }); tokio::spawn(async move { jwt_actor.run().await }); @@ -433,8 +431,6 @@ async fn retry_create_event_on_expired_jwt_with_mock() { c8y_mqtt_host: target_url.clone(), device_id: external_id.into(), tmp_dir: tmp_dir.into(), - identity: None, - cloud_root_certs: CloudRootCerts::from([]), retry_interval: Duration::from_millis(100), }; let c8y_proxy_actor = C8YHttpProxyBuilder::new(config, &mut http_actor, &mut jwt); @@ -448,7 +444,8 @@ async fn retry_create_event_on_expired_jwt_with_mock() { proxy .end_point .set_internal_id(external_id.into(), internal_id.into()); - proxy.end_point.token = Some("Cached JWT Token".into()); + let headers = get_test_auth_header("Bearer Cached JWT Token"); + proxy.end_point.headers = headers; let result = proxy.create_event(event).await; assert_eq!(event_id, result.unwrap()); @@ -458,19 +455,18 @@ async fn retry_create_event_on_expired_jwt_with_mock() { async fn retry_software_list_once_with_fresh_internal_id() { let c8y_host = "c8y.tenant.io"; let device_id = "device-001"; - let token = "JWT token"; let external_id = "external-device-001"; let tmp_dir = "/tmp"; let (mut proxy, mut c8y) = - spawn_c8y_http_proxy(c8y_host.into(), device_id.into(), tmp_dir.into(), token).await; + spawn_c8y_http_proxy(c8y_host.into(), device_id.into(), tmp_dir.into(), JWT_TOKEN).await; // Even before any request is sent to the c8y_proxy // the proxy requests over HTTP the internal device id. let _init_request = HttpRequestBuilder::get(format!( "https://{c8y_host}/identity/externalIds/c8y_Serial/{device_id}" )) - .bearer_auth(token) + .headers(&get_test_auth_header(BEARER_AUTH)) .build() .unwrap(); // skip the message @@ -505,7 +501,7 @@ async fn retry_software_list_once_with_fresh_internal_id() { )) .header("content-type", "application/json") .header("accept", "application/json") - .bearer_auth(token) + .headers(&get_test_auth_header(BEARER_AUTH)) .json(&c8y_software_list) .build() .unwrap(), @@ -528,7 +524,7 @@ async fn retry_software_list_once_with_fresh_internal_id() { HttpRequestBuilder::get(format!( "https://{c8y_host}/identity/externalIds/c8y_Serial/{device_id}" )) - .bearer_auth(token) + .headers(&get_test_auth_header(BEARER_AUTH)) .build() .unwrap(), ), @@ -551,7 +547,7 @@ async fn retry_software_list_once_with_fresh_internal_id() { HttpRequestBuilder::put(format!( "https://{c8y_host}/inventory/managedObjects/{device_id}" )) - .bearer_auth(token) + .headers(&get_test_auth_header(BEARER_AUTH)) .header("content-type", "application/json") .header("accept", "application/json") .json(&c8y_software_list) @@ -566,19 +562,18 @@ async fn retry_software_list_once_with_fresh_internal_id() { async fn auto_retry_upload_log_binary_when_internal_id_expires() { let c8y_host = "c8y.tenant.io"; let device_id = "device-001"; - let token = "JWT token"; let external_id = "external-device-001"; let tmp_dir = "/tmp"; let (mut proxy, mut c8y) = - spawn_c8y_http_proxy(c8y_host.into(), device_id.into(), tmp_dir.into(), token).await; + spawn_c8y_http_proxy(c8y_host.into(), device_id.into(), tmp_dir.into(), JWT_TOKEN).await; // Even before any request is sent to the c8y_proxy // the proxy requests over HTTP the internal device id. let init_request = HttpRequestBuilder::get(format!( "https://{c8y_host}/identity/externalIds/c8y_Serial/{device_id}" )) - .bearer_auth(token) + .headers(&get_test_auth_header(BEARER_AUTH)) .build() .unwrap(); assert_recv(&mut c8y, Some(init_request)).await; @@ -603,7 +598,7 @@ async fn auto_retry_upload_log_binary_when_internal_id_expires() { &mut c8y, Some( HttpRequestBuilder::post(format!("https://{c8y_host}/event/events/")) - .bearer_auth(token) + .headers(&get_test_auth_header(BEARER_AUTH)) .header("content-type", "application/json") .header("accept", "application/json") .build() @@ -627,7 +622,7 @@ async fn auto_retry_upload_log_binary_when_internal_id_expires() { HttpRequestBuilder::get(format!( "https://{c8y_host}/identity/externalIds/c8y_Serial/{device_id}" )) - .bearer_auth(token) + .headers(&get_test_auth_header(BEARER_AUTH)) .build() .unwrap(), ), @@ -646,7 +641,7 @@ async fn auto_retry_upload_log_binary_when_internal_id_expires() { &mut c8y, Some( HttpRequestBuilder::post(format!("https://{c8y_host}/event/events/")) - .bearer_auth(token) + .headers(&get_test_auth_header(BEARER_AUTH)) .header("content-type", "application/json") .header("accept", "application/json") .build() @@ -678,8 +673,6 @@ async fn spawn_c8y_http_proxy( c8y_mqtt_host: c8y_host, device_id, tmp_dir, - identity: None, - cloud_root_certs: CloudRootCerts::from([]), retry_interval: Duration::from_millis(10), }; let mut c8y_proxy_actor = C8YHttpProxyBuilder::new(config, &mut http, &mut jwt); @@ -707,23 +700,31 @@ pub(crate) struct DynamicJwtRetriever { #[async_trait] impl Server for DynamicJwtRetriever { - type Request = JwtRequest; - type Response = JwtResult; + type Request = HttpHeaderRequest; + type Response = HttpHeaderResult; fn name(&self) -> &str { "DynamicJwtRetriever" } async fn handle(&mut self, _request: Self::Request) -> Self::Response { + let mut headers = HeaderMap::new(); if self.count == 0 { self.count += 1; - Ok("Cached JWT token".into()) + headers.insert(AUTHORIZATION, "Bearer Cached JWT token".parse().unwrap()); } else { - Ok("Fresh JWT token".into()) + headers.insert(AUTHORIZATION, "Bearer Fresh JWT token".parse().unwrap()); } + Ok(headers) } } +fn get_test_auth_header(value: &str) -> HeaderMap { + let mut headers = HeaderMap::new(); + headers.insert(AUTHORIZATION, value.parse().unwrap()); + headers +} + async fn assert_recv( from: &mut FakeServerBox, expected: Option, diff --git a/crates/extensions/c8y_mapper_ext/src/tests.rs b/crates/extensions/c8y_mapper_ext/src/tests.rs index a99124a7384..b298be87c7f 100644 --- a/crates/extensions/c8y_mapper_ext/src/tests.rs +++ b/crates/extensions/c8y_mapper_ext/src/tests.rs @@ -22,7 +22,6 @@ use std::fs::File; use std::io::Read; use std::path::Path; use std::time::Duration; -use std::time::SystemTime; use tedge_actors::test_helpers::FakeServerBox; use tedge_actors::test_helpers::FakeServerBoxBuilder; use tedge_actors::test_helpers::MessageReceiverExt; @@ -3055,21 +3054,6 @@ pub(crate) fn spawn_dummy_c8y_http_proxy(mut http: FakeServerBox { - let _ = http - .send(Ok(c8y_http_proxy::messages::C8YRestResponse::EventId( - "dummy-token".into(), - ))) - .await; - } - Some(C8YRestRequest::GetFreshJwtToken(_)) => { - let now = SystemTime::now(); - let _ = http - .send(Ok(c8y_http_proxy::messages::C8YRestResponse::EventId( - format!("dummy-token-{:?}", now), - ))) - .await; - } Some(C8YRestRequest::SoftwareListResponse(_)) => { let _ = http .send(Ok(c8y_http_proxy::messages::C8YRestResponse::Unit(()))) diff --git a/crates/extensions/tedge_config_manager/src/tests.rs b/crates/extensions/tedge_config_manager/src/tests.rs index 340b7109f6d..ec943d9f5eb 100644 --- a/crates/extensions/tedge_config_manager/src/tests.rs +++ b/crates/extensions/tedge_config_manager/src/tests.rs @@ -379,7 +379,7 @@ async fn config_manager_download_update() -> Result<(), anyhow::Error> { std::env::temp_dir().join("type_two") ); - assert_eq!(download_request.auth, None); + assert!(download_request.headers.is_empty()); // Simulate downloading a file is completed. std::fs::File::create(&download_request.file_path).unwrap(); diff --git a/crates/extensions/tedge_downloader_ext/src/actor.rs b/crates/extensions/tedge_downloader_ext/src/actor.rs index c4d839ec03a..bfcb2847f32 100644 --- a/crates/extensions/tedge_downloader_ext/src/actor.rs +++ b/crates/extensions/tedge_downloader_ext/src/actor.rs @@ -1,10 +1,10 @@ use async_trait::async_trait; use certificate::CloudRootCerts; -use download::Auth; use download::DownloadError; use download::DownloadInfo; use download::Downloader; use log::info; +use reqwest::header::HeaderMap; use reqwest::Identity; use std::marker::PhantomData; use std::path::Path; @@ -20,7 +20,7 @@ use tedge_utils::file::PermissionEntry; pub struct DownloadRequest { pub url: String, pub file_path: PathBuf, - pub auth: Option, + pub headers: HeaderMap, pub permission: Option, } @@ -29,14 +29,14 @@ impl DownloadRequest { Self { url: url.into(), file_path: file_path.into(), - auth: None, + headers: HeaderMap::new(), permission: None, } } - pub fn with_auth(self, auth: Auth) -> Self { + pub fn with_headers(self, header_map: HeaderMap) -> Self { Self { - auth: Some(auth), + headers: header_map, ..self } } @@ -112,11 +112,7 @@ impl Server for DownloaderActor { async fn handle(&mut self, id_request: Self::Request) -> Self::Response { let (id, request) = id_request; - let download_info = if let Some(auth) = request.auth { - DownloadInfo::new(&request.url).with_auth(auth) - } else { - DownloadInfo::new(&request.url) - }; + let download_info = DownloadInfo::new(&request.url).with_headers(request.headers); let downloader = Downloader::new( request.file_path.clone(), diff --git a/crates/extensions/tedge_downloader_ext/src/tests.rs b/crates/extensions/tedge_downloader_ext/src/tests.rs index 7b2fbebc640..73106ead093 100644 --- a/crates/extensions/tedge_downloader_ext/src/tests.rs +++ b/crates/extensions/tedge_downloader_ext/src/tests.rs @@ -1,6 +1,7 @@ use super::*; use certificate::CloudRootCerts; -use download::Auth; +use reqwest::header::HeaderMap; +use reqwest::header::AUTHORIZATION; use std::time::Duration; use tedge_actors::ClientMessageBox; use tedge_test_utils::fs::TempTedgeDir; @@ -52,8 +53,10 @@ async fn download_with_auth() { let target_path = ttd.path().join("downloaded_file"); let server_url = server.url(); - let download_request = - DownloadRequest::new(&server_url, &target_path).with_auth(Auth::Bearer("token".into())); + + let mut headers = HeaderMap::new(); + headers.append(AUTHORIZATION, "Bearer token".parse().unwrap()); + let download_request = DownloadRequest::new(&server_url, &target_path).with_headers(headers); let mut requester = spawn_downloader_actor().await; diff --git a/crates/extensions/tedge_http_ext/src/messages.rs b/crates/extensions/tedge_http_ext/src/messages.rs index 96c9f9209ab..46a2cf1819b 100644 --- a/crates/extensions/tedge_http_ext/src/messages.rs +++ b/crates/extensions/tedge_http_ext/src/messages.rs @@ -1,6 +1,7 @@ use async_trait::async_trait; use http::header::HeaderName; use http::header::HeaderValue; +use http::HeaderMap; use http::Method; use serde::de::DeserializeOwned; use thiserror::Error; @@ -109,6 +110,15 @@ impl HttpRequestBuilder { } } + /// Add multiple headers at once + pub fn headers(mut self, header_map: &HeaderMap) -> Self { + let request = self.inner.headers_mut().unwrap(); + for (key, value) in header_map { + request.insert(key, value.clone()); + } + self + } + /// Send a JSON body pub fn json(self, json: &T) -> Self { let body = serde_json::to_vec(json) @@ -122,15 +132,6 @@ impl HttpRequestBuilder { let body = Ok(content.into()); HttpRequestBuilder { body, ..self } } - - /// Add bearer authentication (e.g. a JWT token) - pub fn bearer_auth(self, token: T) -> Self - where - T: std::fmt::Display, - { - let header_value = format!("Bearer {}", token); - self.header(http::header::AUTHORIZATION, header_value) - } } #[async_trait]