Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): abstract HttpFetch trait for raw http client #5184

Merged
merged 11 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 45 additions & 14 deletions core/src/raw/http_util/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,26 @@ use std::fmt::Formatter;
use std::future;
use std::mem;
use std::str::FromStr;
use std::sync::Arc;

use futures::TryStreamExt;
use http::Request;
use http::Response;
use once_cell::sync::Lazy;
use raw::oio::Read;

use super::parse_content_encoding;
use super::parse_content_length;
use super::HttpBody;
use crate::*;

/// Http client used across opendal for loading credentials.
pub static CREDENTIAL_CLIENT: Lazy<reqwest::Client> = Lazy::new(reqwest::Client::new);
Xuanwo marked this conversation as resolved.
Show resolved Hide resolved

/// HttpClient that used across opendal.
#[derive(Clone)]
pub struct HttpClient {
client: reqwest::Client,
fetcher: Arc<dyn HttpFetch>,
}

/// We don't want users to know details about our clients.
Expand All @@ -47,26 +52,24 @@ impl Debug for HttpClient {
impl HttpClient {
/// Create a new http client in async context.
pub fn new() -> Result<Self> {
Self::build(reqwest::ClientBuilder::new())
let fetcher = Arc::new(ReqwestHttpFetcher::new());
Ok(Self { fetcher })
}

/// Construct `Self` with given [`reqwest::Client`]
pub fn with(client: reqwest::Client) -> Self {
Self { client }
pub fn with(client: impl HttpFetch + 'static) -> Self {
Xuanwo marked this conversation as resolved.
Show resolved Hide resolved
let fetcher = Arc::new(client);
Self { fetcher }
}

/// Build a new http client in async context.
#[deprecated]
pub fn build(builder: reqwest::ClientBuilder) -> Result<Self> {
Ok(Self {
client: builder.build().map_err(|err| {
Error::new(ErrorKind::Unexpected, "http client build failed").set_source(err)
})?,
})
}

/// Get the async client from http client.
pub fn client(&self) -> reqwest::Client {
self.client.clone()
let client = builder.build().map_err(|err| {
Error::new(ErrorKind::Unexpected, "http client build failed").set_source(err)
})?;
let fetcher = Arc::new(ReqwestHttpFetcher::with(client));
Ok(Self { fetcher })
}

/// Send a request in async way.
Expand All @@ -78,6 +81,34 @@ impl HttpClient {

/// Fetch a request in async way.
pub async fn fetch(&self, req: Request<Buffer>) -> Result<Response<HttpBody>> {
self.fetcher.fetch(req).await
}
}

#[async_trait::async_trait]
Xuanwo marked this conversation as resolved.
Show resolved Hide resolved
pub trait HttpFetch: Send + Sync {
Xuanwo marked this conversation as resolved.
Show resolved Hide resolved
/// Fetch a request in async way.
async fn fetch(&self, req: Request<Buffer>) -> Result<Response<HttpBody>>;
}

#[derive(Clone)]
struct ReqwestHttpFetcher {
Xuanwo marked this conversation as resolved.
Show resolved Hide resolved
client: reqwest::Client,
}

impl ReqwestHttpFetcher {
pub fn new() -> Self {
Self::with(reqwest::Client::new())
}

pub fn with(client: reqwest::Client) -> Self {
Self { client }
}
}

#[async_trait::async_trait]
impl HttpFetch for ReqwestHttpFetcher {
async fn fetch(&self, req: Request<Buffer>) -> Result<Response<HttpBody>> {
// Uri stores all string alike data in `Bytes` which means
// the clone here is cheap.
let uri = req.uri().clone();
Expand Down
1 change: 1 addition & 0 deletions core/src/raw/http_util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

mod client;
pub use client::HttpClient;
pub use client::CREDENTIAL_CLIENT;
Xuanwo marked this conversation as resolved.
Show resolved Hide resolved

mod body;
pub use body::HttpBody;
Expand Down
2 changes: 1 addition & 1 deletion core/src/services/cos/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ impl Builder for CosBuilder {
cfg.secret_key = Some(v);
}

let cred_loader = TencentCosCredentialLoader::new(client.client(), cfg);
let cred_loader = TencentCosCredentialLoader::new(CREDENTIAL_CLIENT.clone(), cfg);

let signer = TencentCosSigner::new();

Expand Down
2 changes: 1 addition & 1 deletion core/src/services/gcs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ impl Builder for GcsBuilder {
DEFAULT_GCS_SCOPE
};

let mut token_loader = GoogleTokenLoader::new(scope, client.client());
let mut token_loader = GoogleTokenLoader::new(scope, CREDENTIAL_CLIENT.clone());
if let Some(account) = &self.config.service_account {
token_loader = token_loader.with_service_account(account);
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/services/oss/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ impl Builder for OssBuilder {
})?
};

let loader = AliyunLoader::new(client.client(), cfg);
let loader = AliyunLoader::new(CREDENTIAL_CLIENT.clone(), cfg);

let signer = AliyunOssSigner::new(bucket);

Expand Down
8 changes: 5 additions & 3 deletions core/src/services/s3/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -800,7 +800,8 @@ impl Builder for S3Builder {
// If role_arn is set, we must use AssumeRoleLoad.
if let Some(role_arn) = self.config.role_arn {
// use current env as source credential loader.
let default_loader = AwsDefaultLoader::new(client.client(), cfg.clone());
let default_loader =
AwsDefaultLoader::new(CREDENTIAL_CLIENT.clone().clone(), cfg.clone());

// Build the config for assume role.
let mut assume_role_cfg = AwsConfig {
Expand All @@ -817,7 +818,7 @@ impl Builder for S3Builder {
}

let assume_role_loader = AwsAssumeRoleLoader::new(
client.client(),
CREDENTIAL_CLIENT.clone().clone(),
assume_role_cfg,
Box::new(default_loader),
)
Expand All @@ -835,7 +836,8 @@ impl Builder for S3Builder {
let loader = match loader {
Some(v) => v,
None => {
let mut default_loader = AwsDefaultLoader::new(client.client(), cfg);
let mut default_loader =
AwsDefaultLoader::new(CREDENTIAL_CLIENT.clone().clone(), cfg);
if self.config.disable_ec2_metadata {
default_loader = default_loader.with_disable_ec2_metadata();
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/services/s3/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ impl S3Core {
async fn load_credential(&self) -> Result<Option<AwsCredential>> {
let cred = self
.loader
.load_credential(self.client.client())
.load_credential(CREDENTIAL_CLIENT.clone())
.await
.map_err(new_request_credential_error)?;

Expand Down
Loading