From 047e24ef0de1b39c761c026e11d8af9a8f74a639 Mon Sep 17 00:00:00 2001 From: Nan Li Date: Sun, 15 Jan 2023 10:34:37 +0800 Subject: [PATCH] storage: Add LocalHttpProxy backend LocalHttpProxy is a storage backend driver to access blobs stored in the local machine through a http proxy server over unix socket. `LocalHttpProxy` uses two API endpoints to access the blobs: - `HEAD /` to get the blob size - `GET /` to read the blob The http proxy server should respect [the `Range` header](https://www.rfc-editor.org/rfc/rfc9110.html#name-range) to compute the offset and length of the blob. Signed-off-by: Nan Li --- Cargo.lock | 2 + Cargo.toml | 2 +- api/src/config.rs | 27 ++ storage/Cargo.toml | 5 +- storage/src/backend/local_http_proxy.rs | 380 ++++++++++++++++++++++++ storage/src/backend/mod.rs | 7 + storage/src/factory.rs | 7 + storage/src/meta/toc.rs | 3 + 8 files changed, 431 insertions(+), 2 deletions(-) create mode 100644 storage/src/backend/local_http_proxy.rs diff --git a/Cargo.lock b/Cargo.lock index 1179b9c6a16..20a95230cb5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1106,6 +1106,8 @@ dependencies = [ "hmac-sha1-compact", "http", "httpdate", + "hyper", + "hyperlocal", "lazy_static", "leaky-bucket", "libc", diff --git a/Cargo.toml b/Cargo.toml index 657b697a8ed..8cbd2ef8376 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -66,7 +66,7 @@ nydus-rafs = { version = "0.1.0", path = "rafs", features = [ "backend-oss", "backend-s3", ] } -nydus-storage = { version = "0.5.0", path = "storage" } +nydus-storage = { version = "0.5.0", path = "storage", features = ["backend-local-http-proxy"] } nydus-utils = { version = "0.3.0", path = "utils" } nydus-blobfs = { version = "0.1.0", path = "blobfs", features = [ "virtiofs", diff --git a/api/src/config.rs b/api/src/config.rs index a030ee0acf2..4bfccb6b8db 100644 --- a/api/src/config.rs +++ b/api/src/config.rs @@ -238,6 +238,8 @@ pub struct BackendConfigV2 { pub s3: Option, /// Configuration for container registry backend. 
pub registry: Option, + /// Configuration for local http proxy. + pub local_http_proxy: Option, } impl BackendConfigV2 { @@ -276,6 +278,7 @@ impl BackendConfigV2 { } None => return false, }, + _ => return false, } @@ -325,6 +328,17 @@ impl BackendConfigV2 { .ok_or_else(|| einval!("no configuration information for registry")) } } + + /// Get configuration information for local http proxy + pub fn get_local_http_proxy_config(&self) -> Result<&LocalHttpProxyConfig> { + if &self.backend_type != "local-http-proxy" { + Err(einval!("backend type is not 'local-http-proxy'")) + } else { + self.local_http_proxy + .as_ref() + .ok_or_else(|| einval!("no configuration information for local-http-proxy")) + } + } } /// Configuration information for localfs storage backend. @@ -427,6 +441,14 @@ pub struct S3Config { pub mirrors: Vec, } +/// LocalHttpProxy configuration information to access blobs. +#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)] +pub struct LocalHttpProxyConfig { + pub socket_path: String, + #[serde(default = "default_local_http_proxy_thread_num")] + pub thread_num: usize, +} + /// Container registry configuration information to access blobs. 
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)] pub struct RegistryConfig { @@ -878,6 +900,10 @@ fn default_rafs_mode() -> String { "direct".to_string() } +fn default_local_http_proxy_thread_num() -> usize { + 1 +} + //////////////////////////////////////////////////////////////////////////////////////////////////// // For backward compatibility //////////////////////////////////////////////////////////////////////////////////////////////////// @@ -904,6 +930,7 @@ impl TryFrom<&BackendConfig> for BackendConfigV2 { oss: None, s3: None, registry: None, + local_http_proxy: None, }; match value.backend_type.as_str() { diff --git a/storage/Cargo.toml b/storage/Cargo.toml index 3c4881bb0dd..616c886d158 100644 --- a/storage/Cargo.toml +++ b/storage/Cargo.toml @@ -17,6 +17,8 @@ hmac = { version = "0.12.1", optional = true } hmac-sha1-compact = { version = "1.1.1", optional = true } http = { version = "0.2.8", optional = true } httpdate = { version = "1.0", optional = true } +hyper = {version = "0.14.11", optional = true} +hyperlocal = {version = "0.8.0", optional = true} lazy_static = "1.4.0" leaky-bucket = "0.12.1" libc = "0.2" @@ -28,7 +30,7 @@ serde_json = "1.0.53" sha2 = { version = "0.10.2", optional = true } tar = "0.4.38" time = { version = "0.3.14", features = ["formatting"], optional = true } -tokio = { version = "1.19.0", features = ["rt", "rt-multi-thread", "sync", "time"] } +tokio = { version = "1.19.0", features = ["macros", "rt", "rt-multi-thread", "sync", "time"] } url = { version = "2.1.1", optional = true } vm-memory = "0.9" fuse-backend-rs = "0.10" @@ -47,6 +49,7 @@ backend-localfs = [] backend-oss = ["base64", "httpdate", "hmac-sha1-compact", "reqwest", "url"] backend-registry = ["base64", "reqwest", "url"] backend-s3 = ["base64", "hmac", "http", "reqwest", "sha2", "time", "url"] +backend-local-http-proxy = ["hyper", "hyperlocal", "http"] [package.metadata.docs.rs] all-features = true diff --git 
a/storage/src/backend/local_http_proxy.rs b/storage/src/backend/local_http_proxy.rs new file mode 100644 index 00000000000..96432bd5abc --- /dev/null +++ b/storage/src/backend/local_http_proxy.rs @@ -0,0 +1,380 @@ +// Copyright 2022 Ant Group. All rights reserved. + +// SPDX-License-Identifier: Apache-2.0 + +//! Storage backend driver to access the blobs on the local machine using a http proxy over unix socket. + +use http::{Method, Request}; +use hyper::{body, Body, Client, Response}; +use hyperlocal::{UnixClientExt, UnixConnector, Uri}; +use nydus_api::LocalHttpProxyConfig; +use nydus_utils::metrics::BackendMetrics; +use tokio::runtime::Runtime; + +use super::{BackendError, BackendResult, BlobBackend, BlobReader}; +use std::{ + fmt, + io::{Error, Result}, + num::ParseIntError, + str::{self}, + sync::Arc, +}; + +#[derive(Debug)] +pub enum LocalHttpProxyError { + /// Failed to parse string to integer. + ParseStringToInteger(ParseIntError), + ParseContentLengthFromHeader(http::header::ToStrError), + /// Failed to get response from the local http server. + Request(hyper::Error), + /// Failed to build the tokio runtime. + BuildTokioRuntime(Error), + /// Failed to build local http request. + BuildHttpRequest(http::Error), + /// Failed to read the response body. + ReadResponseBody(hyper::Error), + /// Failed to copy the buffer. 
+ CopyBuffer(Error), +} + +impl fmt::Display for LocalHttpProxyError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + LocalHttpProxyError::ParseStringToInteger(e) => { + write!(f, "failed to parse string to integer, {}", e) + } + LocalHttpProxyError::ParseContentLengthFromHeader(e) => { + write!(f, "failed to parse content length from header, {}", e) + } + LocalHttpProxyError::Request(e) => write!(f, "failed to get response, {}", e), + LocalHttpProxyError::BuildTokioRuntime(e) => { + write!(f, "failed to build tokio runtime, {}", e) + } + LocalHttpProxyError::BuildHttpRequest(e) => { + write!(f, "failed to build http request, {}", e) + } + LocalHttpProxyError::ReadResponseBody(e) => { + write!(f, "failed to read response body, {}", e) + } + LocalHttpProxyError::CopyBuffer(e) => write!(f, "failed to copy buffer, {}", e), + } + } +} + +impl From for BackendError { + fn from(error: LocalHttpProxyError) -> Self { + BackendError::LocalHttpProxy(error) + } +} + +/// A storage backend driver to access blobs stored in the local machine +/// through a http proxy server over unix socket. +/// +/// `LocalHttpProxy` uses two API endpoints to access the blobs: +/// - `HEAD /` to get the blob size +/// - `GET /` to read the blob +/// +/// The http proxy server should respect [the `Range` header](https://www.rfc-editor.org/rfc/rfc9110.html#name-range) to compute the offset and length of the blob. +pub struct LocalHttpProxy { + socket_path: String, + thread_num: usize, + metrics: Option>, +} + +/// LocalHttpProxyReader is a BlobReader to implement the LocalHttpProxy backend driver. 
+pub struct LocalHttpProxyReader { + uri: Arc, + client: Arc>, + runtime: Arc, + metrics: Arc, +} + +fn range_str_for_header(offset: u64, len: Option) -> String { + match len { + Some(len) => format!("bytes={}-{}", offset, offset + len as u64 - 1), + None => format!("bytes={}-", offset), + } +} + +fn build_tokio_runtime(name: &str, thread_num: usize) -> BackendResult { + let runtime = tokio::runtime::Builder::new_multi_thread() + .thread_name(name) + .worker_threads(thread_num) + .enable_all() + .build() + .map_err(LocalHttpProxyError::BuildTokioRuntime)?; + Ok(runtime) +} + +impl LocalHttpProxyReader { + async fn do_req( + &self, + only_head: bool, + offset: u64, + len: Option, + ) -> BackendResult> { + let method = if only_head { Method::HEAD } else { Method::GET }; + let req = Request::builder() + .method(method) + .uri(self.uri.as_ref()) + .header(http::header::RANGE, range_str_for_header(offset, len)) + .body(Body::default()) + .map_err(LocalHttpProxyError::BuildHttpRequest)?; + let resp = self + .client + .request(req) + .await + .map_err(LocalHttpProxyError::Request)?; + Ok(resp) + } + + async fn blob_size_async(&self) -> BackendResult { + let content_length = self.do_req(true, 0, None).await?.headers() + [http::header::CONTENT_LENGTH] + .to_str() + .map_err(LocalHttpProxyError::ParseContentLengthFromHeader)? + .parse::() + .map_err(LocalHttpProxyError::ParseStringToInteger)?; + Ok(content_length) + } + + async fn try_read_async(&self, offset: u64, len: usize) -> BackendResult> { + let resp = self.do_req(false, offset, Some(len)).await?; + let bytes = body::to_bytes(resp.into_body()) + .await + .map_err(LocalHttpProxyError::ReadResponseBody)?; + Ok(bytes.to_vec()) + } +} + +impl BlobReader for LocalHttpProxyReader { + // Note! If the blob is compressed, the function will always return 0 + // as we cannot get the actual size by stream reading. 
+ fn blob_size(&self) -> super::BackendResult { + let size = self.runtime.block_on(self.blob_size_async())?; + Ok(size) + } + + fn try_read(&self, mut buf: &mut [u8], offset: u64) -> BackendResult { + let content = self + .runtime + .block_on(self.try_read_async(offset, buf.len()))?; + let copied_size = std::io::copy(&mut content.as_slice(), &mut buf) + .map_err(LocalHttpProxyError::CopyBuffer)?; + Ok(copied_size as usize) + } + + fn metrics(&self) -> &nydus_utils::metrics::BackendMetrics { + &self.metrics + } +} + +impl LocalHttpProxy { + pub fn new(config: &LocalHttpProxyConfig, id: Option<&str>) -> Result { + Ok(LocalHttpProxy { + socket_path: config.socket_path.to_string(), + thread_num: config.thread_num, + metrics: id.map(|i| BackendMetrics::new(i, "local-http-proxy")), + }) + } +} + +impl BlobBackend for LocalHttpProxy { + fn shutdown(&self) { + // do nothing + } + + fn metrics(&self) -> &nydus_utils::metrics::BackendMetrics { + // `metrics()` is only used for nydusd, which will always provide valid `blob_id`, thus + // `self.metrics` has valid value. 
+ self.metrics.as_ref().unwrap() + } + + fn get_reader( + &self, + _blob_id: &str, + ) -> super::BackendResult> { + let client = Client::unix(); + let runtime = build_tokio_runtime("local-http-proxy", self.thread_num)?; + let reader = Arc::new(LocalHttpProxyReader { + uri: Arc::new(Uri::new(self.socket_path.clone(), "/").into()), + client: Arc::new(client), + runtime: Arc::new(runtime), + metrics: self.metrics.as_ref().unwrap().clone(), + }); + Ok(reader) + } +} + +impl Drop for LocalHttpProxy { + fn drop(&mut self) { + self.shutdown(); + if let Some(metrics) = self.metrics.as_ref() { + metrics.release().unwrap_or_else(|e| error!("{:?}", e)); + } + } +} + +#[cfg(test)] +mod tests { + + use crate::{ + backend::{local_http_proxy::LocalHttpProxy, BlobBackend}, + utils::alloc_buf, + }; + + use http::status; + use hyper::{ + service::{make_service_fn, service_fn}, + Body, Response, Server, + }; + use hyperlocal::UnixServerExt; + use nydus_api::LocalHttpProxyConfig; + use std::{ + cmp, + fs::{self, File}, + io::Write, + path::Path, + thread, + time::Duration, + }; + + use super::build_tokio_runtime; + + const CONTENT: &str = "some content for test"; + const SOCKET_PATH: &str = "/tmp/nydus-test-local-http-proxy.sock"; + + fn parse_range_header(range_str: &str) -> (u64, Option) { + let range_str = range_str.trim_start_matches("bytes="); + let range: Vec<&str> = range_str.split('-').collect(); + let start = range[0].parse::().unwrap(); + let end = match range[1] { + "" => None, + _ => Some(cmp::min( + range[1].parse::().unwrap(), + (CONTENT.len() - 1) as u64, + )), + }; + (start, end) + } + + #[test] + fn test_head_and_get() { + // create a temp file for test and write some content + let src_file_path = Path::new("/tmp/nydus-test-local-http-proxy.src.txt"); + if src_file_path.exists() { + fs::remove_file(src_file_path).unwrap(); + } + File::create(src_file_path) + .unwrap() + .write_all(CONTENT.as_bytes()) + .unwrap(); + + thread::spawn(|| { + let rt = 
build_tokio_runtime("test-local-http-proxy-server", 1).unwrap(); + rt.block_on(async { + println!("\nstarting server......"); + let path = Path::new(SOCKET_PATH); + if path.exists() { + fs::remove_file(path).unwrap(); + } + Server::bind_unix(path) + .unwrap() + .serve(make_service_fn(|_| async { + Ok::<_, hyper::Error>(service_fn(|req| async move { + let range = req.headers()[http::header::RANGE].to_str().unwrap(); + println!("range: {}", range); + let (start, end) = parse_range_header(range); + let length = match end { + Some(e) => e - start + 1, + None => CONTENT.len() as u64, + }; + println!("start: {}, end: {:?}, length: {}", start, end, length); + + return match *req.method() { + hyper::Method::HEAD => Ok::<_, hyper::Error>( + Response::builder() + .status(200) + .header(http::header::CONTENT_LENGTH, length) + .body(Body::empty()) + .unwrap(), + ), + hyper::Method::GET => { + let end = match end { + Some(e) => e, + None => (CONTENT.len() - 1) as u64, + }; + let content = CONTENT.as_bytes() + [start as usize..(end + 1) as usize] + .to_vec(); + Ok::<_, hyper::Error>( + Response::builder() + .status(200) + .header(http::header::CONTENT_LENGTH, length) + .body(Body::from(content)) + .unwrap(), + ) + } + _ => Ok::<_, hyper::Error>( + Response::builder() + .status(status::StatusCode::METHOD_NOT_ALLOWED) + .body(Body::empty()) + .unwrap(), + ), + }; + })) + })) + .await + .unwrap(); + }); + }); + + // wait for server to start + thread::sleep(Duration::from_secs(5)); + + // start the client and test + let cfg = LocalHttpProxyConfig { + socket_path: SOCKET_PATH.to_string(), + thread_num: 1, + }; + let backend = LocalHttpProxy::new(&cfg, Some("test-local-http-proxy")).unwrap(); + let reader = backend.get_reader("blob_id").unwrap(); + + println!(); + println!("testing blob_size()......"); + let blob_size = reader + .blob_size() + .map_err(|e| { + println!("blob_size() failed: {}", e); + e + }) + .unwrap(); + assert_eq!(blob_size, CONTENT.len() as u64); + + println!(); + 
println!("testing read() range......"); + let mut buf = alloc_buf(3); + let size = reader + .try_read(&mut buf, 0) + .map_err(|e| { + println!("read() range failed: {}", e); + e + }) + .unwrap(); + assert_eq!(size, 3); + assert_eq!(buf, CONTENT.as_bytes()[0..3]); + + println!(); + println!("testing read() full......"); + let mut buf = alloc_buf(80); + let size = reader + .try_read(&mut buf, 0) + .map_err(|e| { + println!("read() range failed: {}", e); + e + }) + .unwrap(); + assert_eq!(size, CONTENT.len() as usize); + assert_eq!(&buf[0..CONTENT.len()], CONTENT.as_bytes()); + } +} diff --git a/storage/src/backend/mod.rs b/storage/src/backend/mod.rs index faff36dc3ed..c43c61e8901 100644 --- a/storage/src/backend/mod.rs +++ b/storage/src/backend/mod.rs @@ -32,6 +32,8 @@ use crate::StorageError; feature = "backend-s3" ))] pub mod connection; +#[cfg(feature = "backend-local-http-proxy")] +pub mod local_http_proxy; #[cfg(feature = "backend-localfs")] pub mod localfs; #[cfg(any(feature = "backend-oss", feature = "backend-s3"))] @@ -59,6 +61,9 @@ pub enum BackendError { #[cfg(any(feature = "backend-oss", feature = "backend-s3"))] /// Error from object storage backend. ObjectStorage(self::object_storage::ObjectStorageError), + #[cfg(feature = "backend-local-http-proxy")] + /// Error from local http proxy backend. 
+ LocalHttpProxy(self::local_http_proxy::LocalHttpProxyError), } impl fmt::Display for BackendError { @@ -72,6 +77,8 @@ impl fmt::Display for BackendError { BackendError::LocalFs(e) => write!(f, "{}", e), #[cfg(any(feature = "backend-oss", feature = "backend-s3"))] BackendError::ObjectStorage(e) => write!(f, "{}", e), + #[cfg(feature = "backend-local-http-proxy")] + BackendError::LocalHttpProxy(e) => write!(f, "{}", e), } } } diff --git a/storage/src/factory.rs b/storage/src/factory.rs index ce28d9203c8..67908246da0 100644 --- a/storage/src/factory.rs +++ b/storage/src/factory.rs @@ -21,6 +21,8 @@ use nydus_api::{BackendConfigV2, ConfigV2}; use tokio::runtime::{Builder, Runtime}; use tokio::time; +#[cfg(feature = "backend-local-http-proxy")] +use crate::backend::local_http_proxy; #[cfg(feature = "backend-localfs")] use crate::backend::localfs; #[cfg(feature = "backend-oss")] @@ -213,6 +215,11 @@ impl BlobFactory { config.get_localfs_config()?, Some(blob_id), )?)), + #[cfg(feature = "backend-local-http-proxy")] + "local-http-proxy" => Ok(Arc::new(local_http_proxy::LocalHttpProxy::new( + config.get_local_http_proxy_config()?, + Some(blob_id), + )?)), _ => Err(einval!(format!( "unsupported backend type '{}'", config.backend_type diff --git a/storage/src/meta/toc.rs b/storage/src/meta/toc.rs index ed4599a8d3d..a0ab273a2be 100644 --- a/storage/src/meta/toc.rs +++ b/storage/src/meta/toc.rs @@ -768,6 +768,7 @@ mod tests { oss: None, registry: None, s3: None, + local_http_proxy: None, }; let blob_mgr = BlobFactory::new_backend(&config, id).unwrap(); let blob = blob_mgr.get_reader(id).unwrap(); @@ -815,6 +816,7 @@ mod tests { oss: None, registry: None, s3: None, + local_http_proxy: None, }; let blob_mgr = BlobFactory::new_backend(&config, id).unwrap(); let blob = blob_mgr.get_reader(id).unwrap(); @@ -848,6 +850,7 @@ mod tests { oss: None, registry: None, s3: None, + local_http_proxy: None, }; let blob_mgr = BlobFactory::new_backend(&config, id).unwrap(); let blob = 
blob_mgr.get_reader(id).unwrap();