Skip to content

Commit

Permalink
Impose a timeout on all requests
Browse files Browse the repository at this point in the history
Variant of #367

This PR takes a more opinionated stance than #367, where timeouts are optional. In this PR I suggest we make a all requests use a timeout and only let users choose the length.
  • Loading branch information
dvdplm committed Jul 6, 2021
1 parent 163f9df commit c9d1881
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 54 deletions.
30 changes: 8 additions & 22 deletions http-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use crate::v2::{
use crate::{Error, TEN_MB_SIZE_BYTES};
use async_trait::async_trait;
use fnv::FnvHashMap;
use futures::Future;
use serde::de::DeserializeOwned;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
Expand All @@ -18,7 +17,7 @@ use std::time::Duration;
#[derive(Debug)]
pub struct HttpClientBuilder {
max_request_body_size: u32,
request_timeout: Option<Duration>,
request_timeout: Duration,
}

impl HttpClientBuilder {
Expand All @@ -29,9 +28,7 @@ impl HttpClientBuilder {
}

/// Set request timeout (default is 60 seconds).
///
/// None - implies that no timeout is used.
pub fn request_timeout(mut self, timeout: Option<Duration>) -> Self {
pub fn request_timeout(mut self, timeout: Duration) -> Self {
self.request_timeout = timeout;
self
}
Expand All @@ -46,7 +43,7 @@ impl HttpClientBuilder {

impl Default for HttpClientBuilder {
fn default() -> Self {
Self { max_request_body_size: TEN_MB_SIZE_BYTES, request_timeout: Some(Duration::from_secs(60)) }
Self { max_request_body_size: TEN_MB_SIZE_BYTES, request_timeout: Duration::from_secs(60) }
}
}

Expand All @@ -57,16 +54,16 @@ pub struct HttpClient {
transport: HttpTransportClient,
/// Request ID that wraps around when overflowing.
request_id: AtomicU64,
/// Request timeout
request_timeout: Option<Duration>,
/// Request timeout. Defaults to 60sec.
request_timeout: Duration,
}

#[async_trait]
impl Client for HttpClient {
async fn notification<'a>(&self, method: &'a str, params: JsonRpcParams<'a>) -> Result<(), Error> {
let notif = JsonRpcNotificationSer::new(method, params);
let fut = self.transport.send(serde_json::to_string(&notif).map_err(Error::ParseError)?);
match call_with_maybe_timeout(fut, self.request_timeout).await {
match crate::tokio::timeout(self.request_timeout, fut).await {
Ok(Ok(ok)) => Ok(ok),
Err(_) => Err(Error::RequestTimeout),
Ok(Err(e)) => Err(Error::Transport(Box::new(e))),
Expand All @@ -83,7 +80,7 @@ impl Client for HttpClient {
let request = JsonRpcCallSer::new(Id::Number(id), method, params);

let fut = self.transport.send_and_read_body(serde_json::to_string(&request).map_err(Error::ParseError)?);
let body = match call_with_maybe_timeout(fut, self.request_timeout).await {
let body = match crate::tokio::timeout(self.request_timeout, fut).await {
Ok(Ok(body)) => body,
Err(_e) => return Err(Error::RequestTimeout),
Ok(Err(e)) => return Err(Error::Transport(Box::new(e))),
Expand Down Expand Up @@ -124,7 +121,7 @@ impl Client for HttpClient {

let fut = self.transport.send_and_read_body(serde_json::to_string(&batch_request).map_err(Error::ParseError)?);

let body = match call_with_maybe_timeout(fut, self.request_timeout).await {
let body = match crate::tokio::timeout(self.request_timeout, fut).await {
Ok(Ok(body)) => body,
Err(_e) => return Err(Error::RequestTimeout),
Ok(Err(e)) => return Err(Error::Transport(Box::new(e))),
Expand All @@ -151,14 +148,3 @@ impl Client for HttpClient {
Ok(responses)
}
}

async fn call_with_maybe_timeout<F>(fut: F, timeout: Option<Duration>) -> Result<F::Output, crate::tokio::Elapsed>
where
F: Future,
{
if let Some(dur) = timeout {
crate::tokio::timeout(dur, fut).await
} else {
Ok(fut.await)
}
}
2 changes: 0 additions & 2 deletions http-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ mod transport;

#[cfg(all(feature = "tokio1", not(feature = "tokio02")))]
mod tokio {
// Required for `tokio::test` to work correctly.
pub(crate) use tokioV1::time::error::Elapsed;
pub(crate) use tokioV1::time::timeout;
#[cfg(test)]
pub(crate) use tokioV1::{runtime, test};
Expand Down
32 changes: 14 additions & 18 deletions ws-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use crate::{
};
use crate::{
manager::RequestManager, BatchMessage, Error, FrontToBack, RegisterNotificationMessage, RequestMessage,
Subscription, SubscriptionMessage,
Subscription, SubscriptionMessage, helpers::call_with_timeout,
};
use async_trait::async_trait;
use futures::{
Expand Down Expand Up @@ -101,8 +101,8 @@ pub struct WsClient {
/// If the background thread terminates the error is sent to this channel.
// NOTE(niklasad1): This is a Mutex to circumvent that the async fns takes immutable references.
error: Mutex<ErrorFromBack>,
/// Request timeout
request_timeout: Option<Duration>,
/// Request timeout. Defaults to 60sec.
request_timeout: Duration,
/// Request ID manager.
id_guard: RequestIdGuard,
}
Expand Down Expand Up @@ -173,7 +173,7 @@ impl RequestIdGuard {
pub struct WsClientBuilder<'a> {
certificate_store: CertificateStore,
max_request_body_size: u32,
request_timeout: Option<Duration>,
request_timeout: Duration,
connection_timeout: Duration,
origin_header: Option<Cow<'a, str>>,
max_concurrent_requests: usize,
Expand All @@ -185,7 +185,7 @@ impl<'a> Default for WsClientBuilder<'a> {
Self {
certificate_store: CertificateStore::Native,
max_request_body_size: TEN_MB_SIZE_BYTES,
request_timeout: Some(Duration::from_secs(60)),
request_timeout: Duration::from_secs(60),
connection_timeout: Duration::from_secs(10),
origin_header: None,
max_concurrent_requests: 256,
Expand All @@ -210,7 +210,7 @@ impl<'a> WsClientBuilder<'a> {
/// Set request timeout (default is 60 seconds).
///
/// None - no timeout is used.
pub fn request_timeout(mut self, timeout: Option<Duration>) -> Self {
pub fn request_timeout(mut self, timeout: Duration) -> Self {
self.request_timeout = timeout;
self
}
Expand Down Expand Up @@ -318,17 +318,13 @@ impl Client for WsClient {
let mut sender = self.to_back.clone();
let fut = sender.send(FrontToBack::Notification(raw));

let res = if let Some(dur) = self.request_timeout {
let timeout = crate::tokio::sleep(dur);
futures::pin_mut!(fut, timeout);
let timeout = crate::tokio::sleep(self.request_timeout);
futures::pin_mut!(fut, timeout);
let res =
match futures::future::select(fut, timeout).await {
futures::future::Either::Left((res, _)) => res,
futures::future::Either::Right((_, _)) => return Err(Error::RequestTimeout),
}
} else {
fut.await
};

};
self.id_guard.reclaim_request_id();
match res {
Ok(()) => Ok(()),
Expand Down Expand Up @@ -359,7 +355,7 @@ impl Client for WsClient {
return Err(self.read_error_from_backend().await);
}

let res = crate::helpers::call_with_maybe_timeout(send_back_rx, self.request_timeout).await;
let res = call_with_timeout(self.request_timeout, send_back_rx).await;

self.id_guard.reclaim_request_id();
let json_value = match res {
Expand Down Expand Up @@ -399,7 +395,7 @@ impl Client for WsClient {
return Err(self.read_error_from_backend().await);
}

let res = crate::helpers::call_with_maybe_timeout(send_back_rx, self.request_timeout).await;
let res = call_with_timeout(self.request_timeout, send_back_rx).await;

self.id_guard.reclaim_request_id();
let json_values = match res {
Expand Down Expand Up @@ -460,7 +456,7 @@ impl SubscriptionClient for WsClient {
return Err(self.read_error_from_backend().await);
}

let res = crate::helpers::call_with_maybe_timeout(send_back_rx, self.request_timeout).await;
let res = call_with_timeout(self.request_timeout, send_back_rx).await;

self.id_guard.reclaim_request_id();
let (notifs_rx, id) = match res {
Expand Down Expand Up @@ -492,7 +488,7 @@ impl SubscriptionClient for WsClient {
return Err(self.read_error_from_backend().await);
}

let res = crate::helpers::call_with_maybe_timeout(send_back_rx, self.request_timeout).await;
let res = call_with_timeout(self.request_timeout, send_back_rx).await;

let (notifs_rx, method) = match res {
Ok(Ok(val)) => val,
Expand Down
20 changes: 8 additions & 12 deletions ws-client/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,19 +190,15 @@ pub fn process_error_response(manager: &mut RequestManager, err: JsonRpcError) -
}
}

/// Wait for a stream to complete with optional timeout.
pub async fn call_with_maybe_timeout<T>(
/// Wait for a stream to complete within the given timeout.
pub async fn call_with_timeout<T>(
timeout: Duration,
rx: oneshot::Receiver<Result<T, Error>>,
timeout: Option<Duration>,
) -> Result<Result<T, Error>, oneshot::Canceled> {
if let Some(dur) = timeout {
let timeout = crate::tokio::sleep(dur);
futures::pin_mut!(rx, timeout);
match futures::future::select(rx, timeout).await {
futures::future::Either::Left((res, _)) => res,
futures::future::Either::Right((_, _)) => Ok(Err(Error::RequestTimeout)),
}
} else {
rx.await
let timeout = crate::tokio::sleep(timeout);
futures::pin_mut!(rx, timeout);
match futures::future::select(rx, timeout).await {
futures::future::Either::Left((res, _)) => res,
futures::future::Either::Right((_, _)) => Ok(Err(Error::RequestTimeout)),
}
}

0 comments on commit c9d1881

Please sign in to comment.