diff --git a/object_store/src/client/mod.rs b/object_store/src/client/mod.rs index 2b58a77f2ce3..47e68637b663 100644 --- a/object_store/src/client/mod.rs +++ b/object_store/src/client/mod.rs @@ -24,7 +24,9 @@ pub mod pagination; pub mod retry; pub mod token; +use reqwest::header::{HeaderMap, HeaderValue}; use reqwest::{Client, ClientBuilder, Proxy}; +use std::time::Duration; fn map_client_error(e: reqwest::Error) -> super::Error { super::Error::Generic { @@ -33,11 +35,25 @@ fn map_client_error(e: reqwest::Error) -> super::Error { } } +static DEFAULT_USER_AGENT: &str = + concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),); + /// HTTP client configuration for remote object stores #[derive(Debug, Clone, Default)] pub struct ClientOptions { + user_agent: Option, + default_headers: Option, proxy_url: Option, allow_http: bool, + timeout: Option, + connect_timeout: Option, + pool_idle_timeout: Option, + pool_max_idle_per_host: Option, + http2_keep_alive_interval: Option, + http2_keep_alive_timeout: Option, + http2_keep_alive_while_idle: bool, + http1_only: bool, + http2_only: bool, } impl ClientOptions { @@ -46,6 +62,20 @@ impl ClientOptions { Default::default() } + /// Sets the User-Agent header to be used by this client + /// + /// Default is based on the version of this crate + pub fn with_user_agent(mut self, agent: HeaderValue) -> Self { + self.user_agent = Some(agent); + self + } + + /// Sets the default headers for every request + pub fn with_default_headers(mut self, headers: HeaderMap) -> Self { + self.default_headers = Some(headers); + self + } + /// Sets what protocol is allowed. If `allow_http` is : /// * false (default): Only HTTPS are allowed /// * true: HTTP and HTTPS are allowed @@ -54,19 +84,140 @@ impl ClientOptions { self } + /// Only use http1 connections + pub fn with_http1_only(mut self) -> Self { + self.http1_only = true; + self + } + + /// Only use http2 connections + pub fn with_http2_only(mut self) -> Self { + self.http2_only = true; + self + } + /// Set an HTTP proxy to use for requests pub fn with_proxy_url(mut self, proxy_url: impl Into) -> Self { self.proxy_url = Some(proxy_url.into()); self } + /// Set a request timeout + /// + /// The timeout is applied from when the request starts connecting until the + /// response body has finished + pub fn with_timeout(mut self, timeout: Duration) -> Self { + self.timeout = Some(timeout); + self + } + + /// Set a timeout for only the connect phase of a Client + pub fn with_connect_timeout(mut self, timeout: Duration) -> Self { + self.connect_timeout = Some(timeout); + self + } + + /// Set the pool max idle timeout + /// + /// This is the length of time an idle connection will be kept alive + /// + /// Default is 90 seconds + pub fn with_pool_idle_timeout(mut self, timeout: Duration) -> Self { + self.pool_idle_timeout = Some(timeout); + self + } + + /// Set the maximum number of idle connections per host + /// + /// Default is no limit + pub fn with_pool_max_idle_per_host(mut self, max: usize) -> Self { + self.pool_max_idle_per_host = Some(max); + self + } + + /// Sets an interval for HTTP2 Ping frames should be sent to keep a connection alive. + /// + /// Default is disabled + pub fn with_http2_keep_alive_interval(mut self, interval: Duration) -> Self { + self.http2_keep_alive_interval = Some(interval); + self + } + + /// Sets a timeout for receiving an acknowledgement of the keep-alive ping. + /// + /// If the ping is not acknowledged within the timeout, the connection will be closed. + /// Does nothing if http2_keep_alive_interval is disabled. + /// + /// Default is disabled + pub fn with_http2_keep_alive_timeout(mut self, interval: Duration) -> Self { + self.http2_keep_alive_timeout = Some(interval); + self + } + + /// Enable HTTP2 keep alive pings for idle connections + /// + /// If disabled, keep-alive pings are only sent while there are open request/response + /// streams. If enabled, pings are also sent when no streams are active + /// + /// Default is disabled + pub fn with_http2_keep_alive_while_idle(mut self) -> Self { + self.http2_keep_alive_while_idle = true; + self + } + pub(crate) fn client(&self) -> super::Result { let mut builder = ClientBuilder::new(); + + match &self.user_agent { + Some(user_agent) => builder = builder.user_agent(user_agent), + None => builder = builder.user_agent(DEFAULT_USER_AGENT), + } + + if let Some(headers) = &self.default_headers { + builder = builder.default_headers(headers.clone()) + } + if let Some(proxy) = &self.proxy_url { let proxy = Proxy::all(proxy).map_err(map_client_error)?; builder = builder.proxy(proxy); } + if let Some(timeout) = self.timeout { + builder = builder.timeout(timeout) + } + + if let Some(timeout) = self.connect_timeout { + builder = builder.connect_timeout(timeout) + } + + if let Some(timeout) = self.pool_idle_timeout { + builder = builder.pool_idle_timeout(timeout) + } + + if let Some(max) = self.pool_max_idle_per_host { + builder = builder.pool_max_idle_per_host(max) + } + + if let Some(interval) = self.http2_keep_alive_interval { + builder = builder.http2_keep_alive_interval(interval) + } + + if let Some(interval) = self.http2_keep_alive_timeout { + builder = builder.http2_keep_alive_timeout(interval) + } + + if self.http2_keep_alive_while_idle { + builder = builder.http2_keep_alive_while_idle(true) + } + + if self.http1_only { + builder = builder.http1_only() + } + + if self.http2_only { + builder = builder.http2_prior_knowledge() + } + builder .https_only(!self.allow_http) .build()