From a000744bf8457518713e6777dd3d34d79a2ac1bd Mon Sep 17 00:00:00 2001 From: Nan Li Date: Wed, 1 Feb 2023 11:20:13 +0800 Subject: [PATCH] storage: add new backend `HttpProxy` This patch adds a new storage backend `HttpProxy` which can access blobs through a http proxy server. The http proxy server can be local (using unix socket) or remote (using `https://` or using `http://`). `HttpProxy` uses two API endpoints to access the blobs: - `HEAD /path/to/blobs` to get the blob size - `GET /path/to/blobs` to read the blob The http proxy server should respect [the `Range` header](https://www.rfc-editor.org/rfc/rfc9110.html#name-range) to compute the offset and length of the blob. The example config files for this new backend may be: ```jsonc // for remote usage { "backend": { "type": "http-proxy", "config": { "addr": "http://127.0.0.1:9977", "path": "/namespace//blobs" } } } ``` or ```jsonc // for local unix socket { "backend": { "type": "http-proxy", "config": { "addr": "/path/to/unix/socket/file" } } } ``` There is also a test in `http_proxy.rs` to make sure `HttpProxy` works well, which setups a simple http server and generates a `HttpProxy` backend to get contents from the server. Signed-off-by: Nan Li --- Cargo.lock | 2 + Cargo.toml | 1 + api/src/config.rs | 65 ++++ blobfs/Cargo.toml | 1 + clib/Cargo.toml | 1 + docs/nydusd.md | 45 +++ rafs/Cargo.toml | 1 + storage/Cargo.toml | 5 +- storage/src/backend/connection.rs | 15 +- storage/src/backend/http_proxy.rs | 539 ++++++++++++++++++++++++++++++ storage/src/backend/mod.rs | 10 +- storage/src/factory.rs | 7 + storage/src/meta/toc.rs | 3 + 13 files changed, 692 insertions(+), 3 deletions(-) create mode 100644 storage/src/backend/http_proxy.rs diff --git a/Cargo.lock b/Cargo.lock index 55bc40e8534..4bafac7d22d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1056,6 +1056,8 @@ dependencies = [ "hmac", "http", "httpdate", + "hyper", + "hyperlocal", "lazy_static", "leaky-bucket", "libc", diff --git a/Cargo.toml b/Cargo.toml index f7db58e18b5..d48d65d1c3a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -67,6 +67,7 @@ nydus-rafs = { version = "0.2.0", path = "rafs", features = [ "backend-registry", "backend-oss", "backend-s3", + "backend-http-proxy", ] } nydus-storage = { version = "0.6.0", path = "storage" } nydus-utils = { version = "0.4.0", path = "utils" } diff --git a/api/src/config.rs b/api/src/config.rs index a030ee0acf2..0140bc93c94 100644 --- a/api/src/config.rs +++ b/api/src/config.rs @@ -238,6 +238,8 @@ pub struct BackendConfigV2 { pub s3: Option, /// Configuration for container registry backend. pub registry: Option, + /// Configuration for local http proxy. + pub http_proxy: Option, } impl BackendConfigV2 { @@ -276,6 +278,28 @@ impl BackendConfigV2 { } None => return false, }, + + "http-proxy" => match self.http_proxy.as_ref() { + Some(v) => { + let is_valid_unix_socket_path = |path: &str| { + let path = Path::new(path); + path.is_absolute() && path.exists() + }; + if v.addr.is_empty() + || !(v.addr.starts_with("http://") + || v.addr.starts_with("https://") + || is_valid_unix_socket_path(&v.addr)) + { + return false; + } + + // check if v.path is valid url path format + if Path::new(&v.path).join("any_blob_id").to_str().is_none() { + return false; + } + } + None => return false, + }, _ => return false, } @@ -325,6 +349,17 @@ impl BackendConfigV2 { .ok_or_else(|| einval!("no configuration information for registry")) } } + + /// Get configuration information for http proxy + pub fn get_http_proxy_config(&self) -> Result<&HttpProxyConfig> { + if &self.backend_type != "http-proxy" { + Err(einval!("backend type is not 'http-proxy'")) + } else { + self.http_proxy + .as_ref() + .ok_or_else(|| einval!("no configuration information for http-proxy")) + } + } } /// Configuration information for localfs storage backend. @@ -427,6 +462,35 @@ pub struct S3Config { pub mirrors: Vec, } +/// Http proxy configuration information to access blobs. +#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)] +pub struct HttpProxyConfig { + /// Address of http proxy server, like `http://xxx.xxx` or `https://xxx.xxx` or `/path/to/unix.sock`. + pub addr: String, + /// Path to access the blobs, like `/<_namespace>/<_repo>/blobs`. + /// If the http proxy server is over unix socket, this field will be ignored. + #[serde(default)] + pub path: String, + /// Skip SSL certificate validation for HTTPS scheme. + #[serde(default)] + pub skip_verify: bool, + /// Drop the read request once http request timeout, in seconds. + #[serde(default = "default_http_timeout")] + pub timeout: u32, + /// Drop the read request once http connection timeout, in seconds. + #[serde(default = "default_http_timeout")] + pub connect_timeout: u32, + /// Retry count when read request failed. + #[serde(default)] + pub retry_limit: u8, + /// Enable HTTP proxy for the read request. + #[serde(default)] + pub proxy: ProxyConfig, + /// Enable mirrors for the read request. + #[serde(default)] + pub mirrors: Vec, +} + /// Container registry configuration information to access blobs. #[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)] pub struct RegistryConfig { @@ -904,6 +968,7 @@ impl TryFrom<&BackendConfig> for BackendConfigV2 { oss: None, s3: None, registry: None, + http_proxy: None, }; match value.backend_type.as_str() { diff --git a/blobfs/Cargo.toml b/blobfs/Cargo.toml index 5d1a5cdc47b..8213b6b67bb 100644 --- a/blobfs/Cargo.toml +++ b/blobfs/Cargo.toml @@ -31,6 +31,7 @@ virtiofs = ["fuse-backend-rs/virtiofs", "nydus-rafs/virtio-fs"] baekend-s3 = ["nydus-rafs/backend-s3"] backend-oss = ["nydus-rafs/backend-oss"] backend-registry = ["nydus-rafs/backend-registry"] +backend-http-proxy = ["nydus-rafs/backend-http-proxy"] [package.metadata.docs.rs] all-features = true diff --git a/clib/Cargo.toml b/clib/Cargo.toml index ea84528c91e..71d2ef1a50b 100644 --- a/clib/Cargo.toml +++ b/clib/Cargo.toml @@ -23,3 +23,4 @@ nydus-rafs = { version = "0.2.0", path = "../rafs" } baekend-s3 = ["nydus-rafs/backend-s3"] backend-oss = ["nydus-rafs/backend-oss"] backend-registry = ["nydus-rafs/backend-registry"] +backend-http-proxy = ["nydus-rafs/backend-http-proxy"] diff --git a/docs/nydusd.md b/docs/nydusd.md index d1956978afa..b768c51da32 100644 --- a/docs/nydusd.md +++ b/docs/nydusd.md @@ -293,6 +293,51 @@ Currently, the mirror mode is only tested in the registry backend, and in theory } ``` +#### HTTP proxy backend + +The `HttpProxy` backend can access blobs through a http proxy server which can be local (using unix socket) or remote (using `https://` or using `http://`). + +`HttpProxy` uses two API endpoints to access the blobs: +- `HEAD /path/to/blobs` to get the blob size +- `GET /path/to/blobs` to read the blob + +The http proxy server should respect [the `Range` header](https://www.rfc-editor.org/rfc/rfc9110.html#name-range) to compute the offset and length of the blob. + +The example config files for the `HttpProxy` backend may be: + +``` +// for remote usage +{ + "device": { + "backend": { + "type": "http-proxy", + "config": { + "addr": "http://127.0.0.1:9977", + "path": "/namespace//blobs" + } + } + } +} +``` + +or + +``` +// for remote usage +{ + "device": { + "backend": { + "type": "http-proxy", + "config": { + "addr": "/path/to/unix.sock", + } + } + } +} +``` + +The `HttpProxy` backend also supports the `Proxy` and `Mirrors` configurations for remote usage like the `Registry backend` described above. + ### Mount Bootstrap Via API To mount a bootstrap via api, first launch nydusd without a bootstrap: diff --git a/rafs/Cargo.toml b/rafs/Cargo.toml index 01788d0299b..7efe069fbad 100644 --- a/rafs/Cargo.toml +++ b/rafs/Cargo.toml @@ -42,6 +42,7 @@ vhost-user-fs = ["fuse-backend-rs/vhost-user-fs"] backend-oss = ["nydus-storage/backend-oss"] backend-s3 = ["nydus-storage/backend-s3"] backend-registry = ["nydus-storage/backend-registry"] +backend-http-proxy = ["nydus-storage/backend-http-proxy"] [package.metadata.docs.rs] all-features = true diff --git a/storage/Cargo.toml b/storage/Cargo.toml index b3450d8c6dc..5a782f61912 100644 --- a/storage/Cargo.toml +++ b/storage/Cargo.toml @@ -16,6 +16,8 @@ hex = "0.4.3" hmac = { version = "0.12.1", optional = true } http = { version = "0.2.8", optional = true } httpdate = { version = "1.0", optional = true } +hyper = {version = "0.14.11", optional = true} +hyperlocal = {version = "0.8.0", optional = true} lazy_static = "1.4.0" leaky-bucket = "0.12.1" libc = "0.2" @@ -27,7 +29,7 @@ serde_json = "1.0.53" sha2 = { version = "0.10.2", optional = true } tar = "0.4.38" time = { version = "0.3.14", features = ["formatting"], optional = true } -tokio = { version = "1.19.0", features = ["rt", "rt-multi-thread", "sync", "time"] } +tokio = { version = "1.19.0", features = ["macros", "rt", "rt-multi-thread", "sync", "time"] } url = { version = "2.1.1", optional = true } vm-memory = "0.9" fuse-backend-rs = "0.10" @@ -47,6 +49,7 @@ backend-localfs = [] backend-oss = ["base64", "httpdate", "hmac", "sha1", "reqwest", "url"] backend-registry = ["base64", "reqwest", "url"] backend-s3 = ["base64", "hmac", "http", "reqwest", "sha2", "time", "url"] +backend-http-proxy = ["hyper", "hyperlocal", "http", "reqwest", "url"] [package.metadata.docs.rs] all-features = true diff --git a/storage/src/backend/connection.rs b/storage/src/backend/connection.rs index 140938fd772..e324e4f4aa2 100644 --- a/storage/src/backend/connection.rs +++ b/storage/src/backend/connection.rs @@ -23,7 +23,7 @@ use reqwest::{ Method, StatusCode, Url, }; -use nydus_api::{MirrorConfig, OssConfig, ProxyConfig, RegistryConfig, S3Config}; +use nydus_api::{HttpProxyConfig, MirrorConfig, OssConfig, ProxyConfig, RegistryConfig, S3Config}; use url::ParseError; const HEADER_AUTHORIZATION: &str = "Authorization"; @@ -128,6 +128,19 @@ impl From for ConnectionConfig { } } +impl From for ConnectionConfig { + fn from(c: HttpProxyConfig) -> ConnectionConfig { + ConnectionConfig { + proxy: c.proxy, + mirrors: c.mirrors, + skip_verify: c.skip_verify, + timeout: c.timeout, + connect_timeout: c.connect_timeout, + retry_limit: c.retry_limit, + } + } +} + /// HTTP request data with progress callback. #[derive(Clone)] pub struct Progress { diff --git a/storage/src/backend/http_proxy.rs b/storage/src/backend/http_proxy.rs new file mode 100644 index 00000000000..8fd31df77a5 --- /dev/null +++ b/storage/src/backend/http_proxy.rs @@ -0,0 +1,539 @@ +// Copyright 2023 Ant Group. All rights reserved. + +// SPDX-License-Identifier: Apache-2.0 + +// ! Storage backend driver to access the blobs through a http proxy. + +use http::{HeaderMap, HeaderValue, Method, Request}; +use hyper::Client as HyperClient; +use hyper::{body, Body, Response}; +use hyperlocal::Uri as HyperLocalUri; +use hyperlocal::{UnixClientExt, UnixConnector}; +use nydus_api::HttpProxyConfig; +use nydus_utils::metrics::BackendMetrics; +use reqwest; +use tokio::runtime::Runtime; + +use super::connection::{Connection, ConnectionConfig, ConnectionError}; +use super::{BackendError, BackendResult, BlobBackend, BlobReader}; +use std::path::Path; +use std::{ + fmt, + io::{Error, Result}, + num::ParseIntError, + str::{self}, + sync::Arc, +}; + +const HYPER_LOCAL_CLIENT_RUNTIME_THREAD_NUM: usize = 1; + +#[derive(Debug)] +pub enum HttpProxyError { + /// Failed to parse string to integer. + ParseStringToInteger(ParseIntError), + ParseContentLengthFromHeader(http::header::ToStrError), + /// Failed to get response from the local http server. + LocalRequest(hyper::Error), + /// Failed to get response from the remote http server. + RemoteRequest(ConnectionError), + /// Failed to build the tokio runtime. + BuildTokioRuntime(Error), + /// Failed to build local http request. + BuildHttpRequest(http::Error), + /// Failed to read the response body. + ReadResponseBody(hyper::Error), + /// Failed to transport the remote response body. + Transport(reqwest::Error), + /// Failed to copy the buffer. + CopyBuffer(Error), + /// Invalid path. + InvalidPath, + /// Failed to build request header. + ConstructHeader(String), +} + +impl fmt::Display for HttpProxyError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + HttpProxyError::ParseStringToInteger(e) => { + write!(f, "failed to parse string to integer, {}", e) + } + HttpProxyError::ParseContentLengthFromHeader(e) => { + write!(f, "failed to parse content length from header, {}", e) + } + HttpProxyError::LocalRequest(e) => write!(f, "failed to get response, {}", e), + HttpProxyError::RemoteRequest(e) => write!(f, "failed to get response, {}", e), + HttpProxyError::BuildTokioRuntime(e) => { + write!(f, "failed to build tokio runtime, {}", e) + } + HttpProxyError::BuildHttpRequest(e) => { + write!(f, "failed to build http request, {}", e) + } + HttpProxyError::Transport(e) => { + write!(f, "failed to transport remote response body, {}", e) + } + HttpProxyError::ReadResponseBody(e) => { + write!(f, "failed to read response body, {}", e) + } + HttpProxyError::CopyBuffer(e) => write!(f, "failed to copy buffer, {}", e), + HttpProxyError::InvalidPath => write!(f, "invalid path"), + HttpProxyError::ConstructHeader(e) => { + write!(f, "failed to construct request header, {}", e) + } + } + } +} + +impl From for BackendError { + fn from(error: HttpProxyError) -> Self { + BackendError::HttpProxy(error) + } +} + +/// A storage backend driver to access blobs through a http proxy server. +/// The http proxy server may be local (using unix socket) or be remote (using `http://` or `https://`). +/// +/// `HttpProxy` uses two API endpoints to access the blobs: +/// - `HEAD /path/to/blob` to get the blob size +/// - `GET /path/to/blob` to read the blob +/// +/// The http proxy server should respect [the `Range` header](https://www.rfc-editor.org/rfc/rfc9110.html#name-range) to support range reading. +pub struct HttpProxy { + addr: String, + path: String, + client: Client, + metrics: Option>, +} + +/// HttpProxyReader is a BlobReader to implement the HttpProxy backend driver. +pub struct HttpProxyReader { + client: Client, + uri: Uri, + metrics: Arc, +} + +#[derive(Clone)] +struct LocalClient { + client: Arc>, + runtime: Arc, +} + +#[derive(Clone)] +enum Client { + Local(LocalClient), + Remote(Arc), +} + +enum Uri { + Local(Arc), + Remote(String), +} + +fn range_str_for_header(offset: u64, len: Option) -> String { + match len { + Some(len) => format!("bytes={}-{}", offset, offset + len as u64 - 1), + None => format!("bytes={}-", offset), + } +} + +fn build_tokio_runtime(name: &str, thread_num: usize) -> Result { + let runtime = tokio::runtime::Builder::new_multi_thread() + .thread_name(name) + .worker_threads(thread_num) + .enable_all() + .build()?; + Ok(runtime) +} + +impl LocalClient { + async fn do_req( + &self, + uri: Arc, + only_head: bool, + offset: u64, + len: Option, + ) -> BackendResult> { + let method = if only_head { Method::HEAD } else { Method::GET }; + let req = Request::builder() + .method(method) + .uri(uri.as_ref()) + .header(http::header::RANGE, range_str_for_header(offset, len)) + .body(Body::default()) + .map_err(HttpProxyError::BuildHttpRequest)?; + let resp = self + .client + .request(req) + .await + .map_err(HttpProxyError::LocalRequest)?; + Ok(resp) + } + + fn get_headers(&self, uri: Arc) -> BackendResult> { + let headers = self + .runtime + .block_on(self.do_req(uri, true, 0, None))? + .headers() + .to_owned(); + Ok(headers) + } + + fn try_read(&self, uri: Arc, offset: u64, len: usize) -> BackendResult> { + self.runtime.block_on(async { + let resp = self.do_req(uri, false, offset, Some(len)).await; + match resp { + Ok(resp) => body::to_bytes(resp) + .await + .map_err(|e| HttpProxyError::ReadResponseBody(e).into()) + .map(|bytes| bytes.to_vec()), + Err(e) => Err(e), + } + }) + } +} + +impl BlobReader for HttpProxyReader { + fn blob_size(&self) -> super::BackendResult { + let headers = match &self.client { + Client::Local(client) => { + let uri = match self.uri { + Uri::Local(ref uri) => uri.clone(), + Uri::Remote(_) => unreachable!(), + }; + client.get_headers(uri) + } + Client::Remote(connection) => { + let uri = match self.uri { + Uri::Local(_) => unreachable!(), + Uri::Remote(ref uri) => uri.clone(), + }; + connection + .call::<&[u8]>( + Method::HEAD, + uri.as_str(), + None, + None, + &mut HeaderMap::new(), + true, + false, + ) + .map(|resp| resp.headers().to_owned()) + .map_err(|e| HttpProxyError::RemoteRequest(e).into()) + } + }; + let content_length = headers?[http::header::CONTENT_LENGTH] + .to_str() + .map_err(HttpProxyError::ParseContentLengthFromHeader)? + .parse::() + .map_err(HttpProxyError::ParseStringToInteger)?; + Ok(content_length) + } + + fn try_read(&self, mut buf: &mut [u8], offset: u64) -> BackendResult { + match &self.client { + Client::Local(client) => { + let uri = match self.uri { + Uri::Local(ref uri) => uri.clone(), + Uri::Remote(_) => unreachable!(), + }; + let content = client.try_read(uri, offset, buf.len())?; + let copied_size = std::io::copy(&mut content.as_slice(), &mut buf) + .map_err(HttpProxyError::CopyBuffer)?; + Ok(copied_size as usize) + } + Client::Remote(connection) => { + let uri = match self.uri { + Uri::Local(_) => unreachable!(), + Uri::Remote(ref uri) => uri.clone(), + }; + let mut headers = HeaderMap::new(); + let range = range_str_for_header(offset, Some(buf.len())); + headers.insert( + http::header::RANGE, + range + .as_str() + .parse() + .map_err(|e| HttpProxyError::ConstructHeader(format!("{}", e)))?, + ); + let mut resp = connection + .call::<&[u8]>( + Method::GET, + uri.as_str(), + None, + None, + &mut headers, + true, + false, + ) + .map_err(HttpProxyError::RemoteRequest)?; + + Ok(resp + .copy_to(&mut buf) + .map_err(HttpProxyError::Transport) + .map(|size| size as usize)?) + } + } + } + + fn metrics(&self) -> &nydus_utils::metrics::BackendMetrics { + &self.metrics + } +} + +impl HttpProxy { + pub fn new(config: &HttpProxyConfig, id: Option<&str>) -> Result { + let client = if config.addr.starts_with("http://") || config.addr.starts_with("https://") { + let conn_cfg: ConnectionConfig = config.clone().into(); + let conn = Connection::new(&conn_cfg)?; + Client::Remote(conn) + } else { + let client = HyperClient::unix(); + let runtime = build_tokio_runtime("http-proxy", HYPER_LOCAL_CLIENT_RUNTIME_THREAD_NUM)?; + let local_client = LocalClient { + client: Arc::new(client), + runtime: Arc::new(runtime), + }; + Client::Local(local_client) + }; + Ok(HttpProxy { + addr: config.addr.to_string(), + path: config.path.to_string(), + client, + metrics: id.map(|i| BackendMetrics::new(i, "http-proxy")), + }) + } +} + +impl BlobBackend for HttpProxy { + fn shutdown(&self) { + match &self.client { + Client::Local(_) => { + // do nothing + } + Client::Remote(remote_client) => { + remote_client.shutdown(); + } + } + } + + fn metrics(&self) -> &nydus_utils::metrics::BackendMetrics { + // `metrics()` is only used for nydusd, which will always provide valid `blob_id`, thus + // `self.metrics` has valid value. + self.metrics.as_ref().unwrap() + } + + fn get_reader( + &self, + blob_id: &str, + ) -> super::BackendResult> { + let path = Path::new(&self.path).join(blob_id); + let path = path.to_str().ok_or(HttpProxyError::InvalidPath)?; + let uri = match &self.client { + Client::Local(_) => { + let uri: Arc = + Arc::new(HyperLocalUri::new(self.addr.clone(), "/").into()); + Uri::Local(uri) + } + Client::Remote(_) => { + let uri = format!("{}{}", self.addr, path); + Uri::Remote(uri) + } + }; + let reader = Arc::new(HttpProxyReader { + client: self.client.clone(), + uri, + metrics: self.metrics.as_ref().unwrap().clone(), + }); + Ok(reader) + } +} + +impl Drop for HttpProxy { + fn drop(&mut self) { + self.shutdown(); + if let Some(metrics) = self.metrics.as_ref() { + metrics.release().unwrap_or_else(|e| error!("{:?}", e)); + } + } +} + +#[cfg(test)] +mod tests { + + use crate::{ + backend::{http_proxy::HttpProxy, BlobBackend}, + utils::alloc_buf, + }; + + use http::{status, Request}; + use hyper::{ + service::{make_service_fn, service_fn}, + Body, Response, Server, + }; + use hyperlocal::UnixServerExt; + use nydus_api::HttpProxyConfig; + use std::{ + cmp, + fs::{self}, + net::{IpAddr, Ipv4Addr, SocketAddr}, + path::Path, + thread, + time::Duration, + }; + + use super::build_tokio_runtime; + + const CONTENT: &str = "some content for test"; + const SOCKET_PATH: &str = "/tmp/nydus-test-local-http-proxy.sock"; + + fn parse_range_header(range_str: &str) -> (u64, Option) { + let range_str = range_str.trim_start_matches("bytes="); + let range: Vec<&str> = range_str.split('-').collect(); + let start = range[0].parse::().unwrap(); + let end = match range[1] { + "" => None, + _ => Some(cmp::min( + range[1].parse::().unwrap(), + (CONTENT.len() - 1) as u64, + )), + }; + (start, end) + } + + async fn server_handler(req: Request) -> Result, hyper::Error> { + return match *req.method() { + hyper::Method::HEAD => Ok::<_, hyper::Error>( + Response::builder() + .status(200) + .header(http::header::CONTENT_LENGTH, CONTENT.len()) + .body(Body::empty()) + .unwrap(), + ), + hyper::Method::GET => { + let range = req.headers()[http::header::RANGE].to_str().unwrap(); + println!("range: {}", range); + let (start, end) = parse_range_header(range); + let length = match end { + Some(e) => e - start + 1, + None => CONTENT.len() as u64, + }; + println!("start: {}, end: {:?}, length: {}", start, end, length); + let end = match end { + Some(e) => e, + None => (CONTENT.len() - 1) as u64, + }; + let content = CONTENT.as_bytes()[start as usize..(end + 1) as usize].to_vec(); + Ok::<_, hyper::Error>( + Response::builder() + .status(200) + .header(http::header::CONTENT_LENGTH, length) + .body(Body::from(content)) + .unwrap(), + ) + } + _ => Ok::<_, hyper::Error>( + Response::builder() + .status(status::StatusCode::METHOD_NOT_ALLOWED) + .body(Body::empty()) + .unwrap(), + ), + }; + } + + #[test] + fn test_head_and_get() { + thread::spawn(|| { + let rt = build_tokio_runtime("test-local-http-proxy-server", 1).unwrap(); + rt.block_on(async { + println!("\nstarting local http proxy server......"); + let path = Path::new(SOCKET_PATH); + if path.exists() { + fs::remove_file(path).unwrap(); + } + Server::bind_unix(path) + .unwrap() + .serve(make_service_fn(|_| async { + Ok::<_, hyper::Error>(service_fn(server_handler)) + })) + .await + .unwrap(); + }); + }); + + thread::spawn(|| { + let rt = build_tokio_runtime("test-remote-http-proxy-server", 1).unwrap(); + rt.block_on(async { + println!("\nstarting remote http proxy server......"); + Server::bind(&SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), + 9977, + )) + .serve(make_service_fn(|_| async { + Ok::<_, hyper::Error>(service_fn(server_handler)) + })) + .await + .unwrap(); + }); + }); + + // wait for server to start + thread::sleep(Duration::from_secs(5)); + + // start the client and test + let test_list: Vec<(String, String)> = vec![ + ( + format!( + "{{\"addr\":\"{}\",\"path\":\"/namespace//blobs\"}}", + SOCKET_PATH, + ), + "test-local-http-proxy".to_string(), + ), + ( + "{\"addr\":\"http://127.0.0.1:9977\",\"path\":\"/namespace//blobs\"}" + .to_string(), + "test-remote-http-proxy".to_string(), + ), + ]; + for test_case in test_list.iter() { + let config: HttpProxyConfig = serde_json::from_str(test_case.0.as_str()).unwrap(); + let backend = HttpProxy::new(&config, Some(test_case.1.as_str())).unwrap(); + let reader = backend.get_reader("blob_id").unwrap(); + + println!(); + println!("testing blob_size()......"); + let blob_size = reader + .blob_size() + .map_err(|e| { + println!("blob_size() failed: {}", e); + e + }) + .unwrap(); + assert_eq!(blob_size, CONTENT.len() as u64); + + println!(); + println!("testing read() range......"); + let mut buf = alloc_buf(3); + let size = reader + .try_read(&mut buf, 0) + .map_err(|e| { + println!("read() range failed: {}", e); + e + }) + .unwrap(); + assert_eq!(size, 3); + assert_eq!(buf, CONTENT.as_bytes()[0..3]); + + println!(); + println!("testing read() full......"); + let mut buf = alloc_buf(80); + let size = reader + .try_read(&mut buf, 0) + .map_err(|e| { + println!("read() range failed: {}", e); + e + }) + .unwrap(); + assert_eq!(size, CONTENT.len() as usize); + assert_eq!(&buf[0..CONTENT.len()], CONTENT.as_bytes()); + } + } +} diff --git a/storage/src/backend/mod.rs b/storage/src/backend/mod.rs index faff36dc3ed..249e8c286d5 100644 --- a/storage/src/backend/mod.rs +++ b/storage/src/backend/mod.rs @@ -29,9 +29,12 @@ use crate::StorageError; #[cfg(any( feature = "backend-oss", feature = "backend-registry", - feature = "backend-s3" + feature = "backend-s3", + feature = "backend-http-proxy", ))] pub mod connection; +#[cfg(feature = "backend-http-proxy")] +pub mod http_proxy; #[cfg(feature = "backend-localfs")] pub mod localfs; #[cfg(any(feature = "backend-oss", feature = "backend-s3"))] @@ -59,6 +62,9 @@ pub enum BackendError { #[cfg(any(feature = "backend-oss", feature = "backend-s3"))] /// Error from object storage backend. ObjectStorage(self::object_storage::ObjectStorageError), + #[cfg(feature = "backend-http-proxy")] + /// Error from local http proxy backend. + HttpProxy(self::http_proxy::HttpProxyError), } impl fmt::Display for BackendError { @@ -72,6 +78,8 @@ impl fmt::Display for BackendError { BackendError::LocalFs(e) => write!(f, "{}", e), #[cfg(any(feature = "backend-oss", feature = "backend-s3"))] BackendError::ObjectStorage(e) => write!(f, "{}", e), + #[cfg(feature = "backend-http-proxy")] + BackendError::HttpProxy(e) => write!(f, "{}", e), } } } diff --git a/storage/src/factory.rs b/storage/src/factory.rs index ce28d9203c8..6bf2f071306 100644 --- a/storage/src/factory.rs +++ b/storage/src/factory.rs @@ -21,6 +21,8 @@ use nydus_api::{BackendConfigV2, ConfigV2}; use tokio::runtime::{Builder, Runtime}; use tokio::time; +#[cfg(feature = "backend-http-proxy")] +use crate::backend::http_proxy; #[cfg(feature = "backend-localfs")] use crate::backend::localfs; #[cfg(feature = "backend-oss")] @@ -213,6 +215,11 @@ impl BlobFactory { config.get_localfs_config()?, Some(blob_id), )?)), + #[cfg(feature = "backend-http-proxy")] + "http-proxy" => Ok(Arc::new(http_proxy::HttpProxy::new( + config.get_http_proxy_config()?, + Some(blob_id), + )?)), _ => Err(einval!(format!( "unsupported backend type '{}'", config.backend_type diff --git a/storage/src/meta/toc.rs b/storage/src/meta/toc.rs index ed4599a8d3d..1f5c3426d8d 100644 --- a/storage/src/meta/toc.rs +++ b/storage/src/meta/toc.rs @@ -768,6 +768,7 @@ mod tests { oss: None, registry: None, s3: None, + http_proxy: None, }; let blob_mgr = BlobFactory::new_backend(&config, id).unwrap(); let blob = blob_mgr.get_reader(id).unwrap(); @@ -815,6 +816,7 @@ mod tests { oss: None, registry: None, s3: None, + http_proxy: None, }; let blob_mgr = BlobFactory::new_backend(&config, id).unwrap(); let blob = blob_mgr.get_reader(id).unwrap(); @@ -848,6 +850,7 @@ mod tests { oss: None, registry: None, s3: None, + http_proxy: None, }; let blob_mgr = BlobFactory::new_backend(&config, id).unwrap(); let blob = blob_mgr.get_reader(id).unwrap();