Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): abstract HttpFetch trait for raw http client #5184

Merged
merged 11 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 64 additions & 15 deletions core/src/raw/http_util/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,36 @@ use std::fmt::Debug;
use std::fmt::Formatter;
use std::future;
use std::mem;
use std::ops::Deref;
use std::str::FromStr;
use std::sync::Arc;

use futures::Future;
use futures::TryStreamExt;
use http::Request;
use http::Response;
use once_cell::sync::Lazy;
use raw::oio::Read;

use super::parse_content_encoding;
use super::parse_content_length;
use super::HttpBody;
use crate::raw::*;
use crate::*;

/// Http client used across opendal for loading credentials.
/// This is merely a temporary solution because reqsign requires a reqwest client to be passed.
/// We will remove it after the next major version of reqsign, which will enable users to provide their own client.
#[allow(dead_code)]
pub(crate) static GLOBAL_REQWEST_CLIENT: Lazy<reqwest::Client> = Lazy::new(reqwest::Client::new);

/// HttpFetcher is a type erased [`HttpFetch`].
pub type HttpFetcher = Arc<dyn HttpFetchDyn>;

/// HttpClient that used across opendal.
#[derive(Clone)]
pub struct HttpClient {
client: reqwest::Client,
fetcher: HttpFetcher,
}

/// We don't want users to know details about our clients.
Expand All @@ -47,26 +61,24 @@ impl Debug for HttpClient {
impl HttpClient {
/// Create a new http client in async context.
pub fn new() -> Result<Self> {
Self::build(reqwest::ClientBuilder::new())
let fetcher = Arc::new(reqwest::Client::new());
Ok(Self { fetcher })
}

/// Construct `Self` with given [`reqwest::Client`]
pub fn with(client: reqwest::Client) -> Self {
Self { client }
pub fn with(client: impl HttpFetch) -> Self {
let fetcher = Arc::new(client);
Self { fetcher }
}

/// Build a new http client in async context.
#[deprecated]
pub fn build(builder: reqwest::ClientBuilder) -> Result<Self> {
Ok(Self {
client: builder.build().map_err(|err| {
Error::new(ErrorKind::Unexpected, "http client build failed").set_source(err)
})?,
})
}

/// Get the async client from http client.
pub fn client(&self) -> reqwest::Client {
self.client.clone()
let client = builder.build().map_err(|err| {
Error::new(ErrorKind::Unexpected, "http client build failed").set_source(err)
})?;
let fetcher = Arc::new(client);
Ok(Self { fetcher })
}

/// Send a request in async way.
Expand All @@ -78,6 +90,44 @@ impl HttpClient {

/// Fetch a request in async way.
pub async fn fetch(&self, req: Request<Buffer>) -> Result<Response<HttpBody>> {
self.fetcher.fetch(req).await
}
}

/// HttpFetch is the trait to fetch a request in async way.
/// User should implement this trait to provide their own http client.
pub trait HttpFetch: Send + Sync + Unpin + 'static {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add comments to this trait. This trait is exposed to users, and we expect that users should never interact with HttpFetchDyn.

/// Fetch a request in async way.
fn fetch(
&self,
req: Request<Buffer>,
) -> impl Future<Output = Result<Response<HttpBody>>> + MaybeSend;
}

/// HttpFetchDyn is the dyn version of [`HttpFetch`]
/// which make it possible to use as `Arc<dyn HttpFetchDyn>`.
/// User should never implement this trait, but use `HttpFetch` instead.
pub trait HttpFetchDyn: Send + Sync + Unpin + 'static {
/// The dyn version of [`HttpFetch::fetch`].
///
/// This function returns a boxed future to make it object safe.
fn fetch_dyn(&self, req: Request<Buffer>) -> BoxedFuture<Result<Response<HttpBody>>>;
}

impl<T: HttpFetch + ?Sized> HttpFetchDyn for T {
fn fetch_dyn(&self, req: Request<Buffer>) -> BoxedFuture<Result<Response<HttpBody>>> {
Box::pin(self.fetch(req))
}
}

impl<T: HttpFetchDyn + ?Sized> HttpFetch for Arc<T> {
async fn fetch(&self, req: Request<Buffer>) -> Result<Response<HttpBody>> {
self.deref().fetch_dyn(req).await
}
}

impl HttpFetch for reqwest::Client {
async fn fetch(&self, req: Request<Buffer>) -> Result<Response<HttpBody>> {
// Uri stores all string alike data in `Bytes` which means
// the clone here is cheap.
let uri = req.uri().clone();
Expand All @@ -86,7 +136,6 @@ impl HttpClient {
let (parts, body) = req.into_parts();

let mut req_builder = self
.client
.request(
parts.method,
reqwest::Url::from_str(&uri.to_string()).expect("input request url must be valid"),
Expand Down
5 changes: 5 additions & 0 deletions core/src/raw/http_util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@

mod client;
pub use client::HttpClient;
pub use client::HttpFetch;

/// temporary client used by several features
#[allow(unused_imports)]
pub(crate) use client::GLOBAL_REQWEST_CLIENT;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please make HttpFetch public.


mod body;
pub use body::HttpBody;
Expand Down
2 changes: 1 addition & 1 deletion core/src/services/cos/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ impl Builder for CosBuilder {
cfg.secret_key = Some(v);
}

let cred_loader = TencentCosCredentialLoader::new(client.client(), cfg);
let cred_loader = TencentCosCredentialLoader::new(GLOBAL_REQWEST_CLIENT.clone(), cfg);

let signer = TencentCosSigner::new();

Expand Down
2 changes: 1 addition & 1 deletion core/src/services/gcs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ impl Builder for GcsBuilder {
DEFAULT_GCS_SCOPE
};

let mut token_loader = GoogleTokenLoader::new(scope, client.client());
let mut token_loader = GoogleTokenLoader::new(scope, GLOBAL_REQWEST_CLIENT.clone());
if let Some(account) = &self.config.service_account {
token_loader = token_loader.with_service_account(account);
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/services/oss/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ impl Builder for OssBuilder {
})?
};

let loader = AliyunLoader::new(client.client(), cfg);
let loader = AliyunLoader::new(GLOBAL_REQWEST_CLIENT.clone(), cfg);

let signer = AliyunOssSigner::new(bucket);

Expand Down
8 changes: 5 additions & 3 deletions core/src/services/s3/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -800,7 +800,8 @@ impl Builder for S3Builder {
// If role_arn is set, we must use AssumeRoleLoad.
if let Some(role_arn) = self.config.role_arn {
// use current env as source credential loader.
let default_loader = AwsDefaultLoader::new(client.client(), cfg.clone());
let default_loader =
AwsDefaultLoader::new(GLOBAL_REQWEST_CLIENT.clone().clone(), cfg.clone());

// Build the config for assume role.
let mut assume_role_cfg = AwsConfig {
Expand All @@ -817,7 +818,7 @@ impl Builder for S3Builder {
}

let assume_role_loader = AwsAssumeRoleLoader::new(
client.client(),
GLOBAL_REQWEST_CLIENT.clone().clone(),
assume_role_cfg,
Box::new(default_loader),
)
Expand All @@ -835,7 +836,8 @@ impl Builder for S3Builder {
let loader = match loader {
Some(v) => v,
None => {
let mut default_loader = AwsDefaultLoader::new(client.client(), cfg);
let mut default_loader =
AwsDefaultLoader::new(GLOBAL_REQWEST_CLIENT.clone().clone(), cfg);
if self.config.disable_ec2_metadata {
default_loader = default_loader.with_disable_ec2_metadata();
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/services/s3/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ impl S3Core {
async fn load_credential(&self) -> Result<Option<AwsCredential>> {
let cred = self
.loader
.load_credential(self.client.client())
.load_credential(GLOBAL_REQWEST_CLIENT.clone())
.await
.map_err(new_request_credential_error)?;

Expand Down
Loading