Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[client]: add timeouts to all request kinds + default timeout #367

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions http-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ documentation = "https://docs.rs/jsonrpsee-http-client"

[dependencies]
async-trait = "0.1"
futures = { version = "0.3.14", default-features = false, features = ["std"] }
hyper13-rustls = { package = "hyper-rustls", version = "0.21", optional = true }
hyper14-rustls = { package = "hyper-rustls", version = "0.22", optional = true }
hyper14 = { package = "hyper", version = "0.14", features = ["client", "http1", "http2", "tcp"], optional = true }
Expand All @@ -20,15 +21,17 @@ jsonrpsee-utils = { path = "../utils", version = "=0.2.0-alpha.7", optional = tr
log = "0.4"
serde = { version = "1.0", default-features = false, features = ["derive"] }
serde_json = "1.0"
tokioV1 = { package = "tokio", version = "1", features = ["time"], optional = true }
tokioV02 = { package = "tokio", version = "0.2", features = ["time"], optional = true }
thiserror = "1.0"
url = "2.2"
fnv = "1"

[features]
default = ["tokio1"]
tokio1 = ["hyper14", "hyper14-rustls", "jsonrpsee-utils/hyper_14"]
tokio02 = ["hyper13", "hyper13-rustls", "jsonrpsee-utils/hyper_13"]
tokio1 = ["hyper14", "hyper14-rustls", "jsonrpsee-utils/hyper_14", "tokioV1" ]
tokio02 = ["hyper13", "hyper13-rustls", "jsonrpsee-utils/hyper_13", "tokioV02" ]

[dev-dependencies]
jsonrpsee-test-utils = { path = "../test-utils" }
tokio = { version = "1.0", features = ["net", "rt-multi-thread", "macros"] }
tokioV1 = { package = "tokio", version = "1", features = ["net", "rt-multi-thread", "macros"] }
61 changes: 45 additions & 16 deletions http-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@ use crate::v2::{
use crate::{Error, TEN_MB_SIZE_BYTES};
use async_trait::async_trait;
use fnv::FnvHashMap;
use futures::Future;
use serde::de::DeserializeOwned;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;

/// Http Client Builder.
#[derive(Debug)]
pub struct HttpClientBuilder {
max_request_body_size: u32,
request_timeout: Option<Duration>,
}

impl HttpClientBuilder {
Expand All @@ -25,17 +28,25 @@ impl HttpClientBuilder {
self
}

/// Set request timeout (default is 60 seconds).
///
/// None - implies that no timeout is used.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// None - implies that no timeout is used.
/// `None` - no timeout is used.

It's explicit, not implied :)

pub fn request_timeout(mut self, timeout: Option<Duration>) -> Self {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

little annoying with option here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reckon that's fine.

self.request_timeout = timeout;
self
}

/// Build the HTTP client with target to connect to.
pub fn build(self, target: impl AsRef<str>) -> Result<HttpClient, Error> {
let transport =
HttpTransportClient::new(target, self.max_request_body_size).map_err(|e| Error::Transport(Box::new(e)))?;
Ok(HttpClient { transport, request_id: AtomicU64::new(0) })
Ok(HttpClient { transport, request_id: AtomicU64::new(0), request_timeout: self.request_timeout })
}
}

impl Default for HttpClientBuilder {
fn default() -> Self {
Self { max_request_body_size: TEN_MB_SIZE_BYTES }
Self { max_request_body_size: TEN_MB_SIZE_BYTES, request_timeout: Some(Duration::from_secs(60)) }
}
}

Expand All @@ -46,16 +57,20 @@ pub struct HttpClient {
transport: HttpTransportClient,
/// Request ID that wraps around when overflowing.
request_id: AtomicU64,
/// Request timeout
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Request timeout
/// Request timeout. Defaults to 60sec.

request_timeout: Option<Duration>,
}

#[async_trait]
impl Client for HttpClient {
async fn notification<'a>(&self, method: &'a str, params: JsonRpcParams<'a>) -> Result<(), Error> {
let notif = JsonRpcNotificationSer::new(method, params);
self.transport
.send(serde_json::to_string(&notif).map_err(Error::ParseError)?)
.await
.map_err(|e| Error::Transport(Box::new(e)))
let fut = self.transport.send(serde_json::to_string(&notif).map_err(Error::ParseError)?);
match call_with_maybe_timeout(fut, self.request_timeout).await {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can drop the maybe here; call_with_timeout is verbose enough! :)

Ok(Ok(ok)) => Ok(ok),
Err(_) => Err(Error::RequestTimeout),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is tokio::time::Elapsed

Ok(Err(e)) => Err(Error::Transport(Box::new(e))),
}
}

/// Perform a request towards the server.
Expand All @@ -67,11 +82,12 @@ impl Client for HttpClient {
let id = self.request_id.fetch_add(1, Ordering::SeqCst);
let request = JsonRpcCallSer::new(Id::Number(id), method, params);

let body = self
.transport
.send_and_read_body(serde_json::to_string(&request).map_err(Error::ParseError)?)
.await
.map_err(|e| Error::Transport(Box::new(e)))?;
let fut = self.transport.send_and_read_body(serde_json::to_string(&request).map_err(Error::ParseError)?);
let body = match call_with_maybe_timeout(fut, self.request_timeout).await {
Ok(Ok(body)) => body,
Err(_e) => return Err(Error::RequestTimeout),
Ok(Err(e)) => return Err(Error::Transport(Box::new(e))),
};

let response: JsonRpcResponse<_> = match serde_json::from_slice(&body) {
Ok(response) => response,
Expand Down Expand Up @@ -106,11 +122,13 @@ impl Client for HttpClient {
request_set.insert(id, pos);
}

let body = self
.transport
.send_and_read_body(serde_json::to_string(&batch_request).map_err(Error::ParseError)?)
.await
.map_err(|e| Error::Transport(Box::new(e)))?;
let fut = self.transport.send_and_read_body(serde_json::to_string(&batch_request).map_err(Error::ParseError)?);

let body = match call_with_maybe_timeout(fut, self.request_timeout).await {
Ok(Ok(body)) => body,
Err(_e) => return Err(Error::RequestTimeout),
Ok(Err(e)) => return Err(Error::Transport(Box::new(e))),
};

let rps: Vec<JsonRpcResponse<_>> = match serde_json::from_slice(&body) {
Ok(response) => response,
Expand All @@ -133,3 +151,14 @@ impl Client for HttpClient {
Ok(responses)
}
}

async fn call_with_maybe_timeout<F>(fut: F, timeout: Option<Duration>) -> Result<F::Output, crate::tokio::Elapsed>
where
F: Future,
{
if let Some(dur) = timeout {
crate::tokio::timeout(dur, fut).await
} else {
Ok(fut.await)
}
}
15 changes: 15 additions & 0 deletions http-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,21 @@ extern crate hyper13_rustls as hyper_rustls;
mod client;
mod transport;

#[cfg(all(feature = "tokio1", not(feature = "tokio02")))]
mod tokio {
// Required for `tokio::test` to work correctly.
pub(crate) use tokioV1::time::error::Elapsed;
pub(crate) use tokioV1::time::timeout;
#[cfg(test)]
pub(crate) use tokioV1::{runtime, test};
}

#[cfg(all(feature = "tokio02", not(feature = "tokio1")))]
mod tokio {
pub(crate) use tokioV02::time::timeout;
pub(crate) use tokioV02::time::Elapsed;
}

#[cfg(test)]
mod tests;

Expand Down
2 changes: 1 addition & 1 deletion http-client/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::v2::{
error::{JsonRpcError, JsonRpcErrorCode, JsonRpcErrorObject},
params::JsonRpcParams,
};
use crate::{traits::Client, Error, HttpClientBuilder, JsonValue};
use crate::{tokio, traits::Client, Error, HttpClientBuilder, JsonValue};
use jsonrpsee_test_utils::helpers::*;
use jsonrpsee_test_utils::types::Id;
use jsonrpsee_test_utils::TimeoutFutureExt;
Expand Down
1 change: 1 addition & 0 deletions http-client/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ where
#[cfg(test)]
mod tests {
use super::{Error, HttpTransportClient};
use crate::tokio;

#[test]
fn invalid_http_url_rejected() {
Expand Down
12 changes: 6 additions & 6 deletions ws-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ documentation = "https://docs.rs/jsonrpsee-ws-client"

[dependencies]
# Tokio v1 deps
tokioV1 = { package="tokio", version = "1", features = ["net", "time"], optional=true }
tokioV1-rustls = { package="tokio-rustls", version = "0.22", optional=true }
tokioV1-util = { package="tokio-util", version = "0.6", features = ["compat"], optional=true }
tokioV1 = { package = "tokio", version = "1", features = ["net", "time"], optional = true }
tokioV1-rustls = { package ="tokio-rustls", version = "0.22", optional = true }
tokioV1-util = { package= "tokio-util", version = "0.6", features = ["compat"], optional = true }

# Tokio v0.2 deps
tokioV02 = { package="tokio", version = "0.2", features = ["net", "time"], optional=true }
tokioV02-rustls = { package="tokio-rustls", version = "0.15", optional=true }
tokioV02-util = { package="tokio-util", version = "0.3", features = ["compat"], optional=true }
tokioV02 = { package = "tokio", version = "0.2", features = ["net", "time"], optional=true }
tokioV02-rustls = { package = "tokio-rustls", version = "0.15", optional = true }
tokioV02-util = { package = "tokio-util", version = "0.3", features = ["compat"], optional = true }

async-trait = "0.1"
fnv = "1"
Expand Down
48 changes: 29 additions & 19 deletions ws-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ impl<'a> Default for WsClientBuilder<'a> {
Self {
certificate_store: CertificateStore::Native,
max_request_body_size: TEN_MB_SIZE_BYTES,
request_timeout: None,
request_timeout: Some(Duration::from_secs(60)),
connection_timeout: Duration::from_secs(10),
origin_header: None,
max_concurrent_requests: 256,
Expand All @@ -207,9 +207,11 @@ impl<'a> WsClientBuilder<'a> {
self
}

/// Set request timeout.
pub fn request_timeout(mut self, timeout: Duration) -> Self {
self.request_timeout = Some(timeout);
/// Set request timeout (default is 60 seconds).
///
/// None - no timeout is used.
pub fn request_timeout(mut self, timeout: Option<Duration>) -> Self {
self.request_timeout = timeout;
self
}

Expand Down Expand Up @@ -312,7 +314,21 @@ impl Client for WsClient {
Error::ParseError(e)
})?;
log::trace!("[frontend]: send notification: {:?}", raw);
let res = self.to_back.clone().send(FrontToBack::Notification(raw)).await;

let mut sender = self.to_back.clone();
let fut = sender.send(FrontToBack::Notification(raw));

let res = if let Some(dur) = self.request_timeout {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this annoying of boiler plate but I found no good way of having a helper for Result<T>, and T when the receiver is either expects T or Result<T>

on option is to use generic as the http client is using with F: Fut -> Result<Fut::Output, Error> but that became super complicated with lots of nested results.

let timeout = crate::tokio::sleep(dur);
futures::pin_mut!(fut, timeout);
match futures::future::select(fut, timeout).await {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can use select! macro for a bit less boilerplate, but it's fine either way.

futures::future::Either::Left((res, _)) => res,
futures::future::Either::Right((_, _)) => return Err(Error::RequestTimeout),
}
} else {
fut.await
};

self.id_guard.reclaim_request_id();
match res {
Ok(()) => Ok(()),
Expand Down Expand Up @@ -343,19 +359,10 @@ impl Client for WsClient {
return Err(self.read_error_from_backend().await);
}

let send_back_rx_out = if let Some(duration) = self.request_timeout {
let timeout = crate::tokio::sleep(duration);
futures::pin_mut!(send_back_rx, timeout);
match future::select(send_back_rx, timeout).await {
future::Either::Left((send_back_rx_out, _)) => send_back_rx_out,
future::Either::Right((_, _)) => Ok(Err(Error::RequestTimeout)),
}
} else {
send_back_rx.await
};
let res = crate::helpers::call_with_maybe_timeout(send_back_rx, self.request_timeout).await;

self.id_guard.reclaim_request_id();
let json_value = match send_back_rx_out {
let json_value = match res {
Ok(Ok(v)) => v,
Ok(Err(err)) => return Err(err),
Err(_) => return Err(self.read_error_from_backend().await),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is oneshot::Cancled err in that case the background thread was terminated and we read the error message from the backend.

Expand Down Expand Up @@ -392,7 +399,8 @@ impl Client for WsClient {
return Err(self.read_error_from_backend().await);
}

let res = send_back_rx.await;
let res = crate::helpers::call_with_maybe_timeout(send_back_rx, self.request_timeout).await;

self.id_guard.reclaim_request_id();
let json_values = match res {
Ok(Ok(v)) => v,
Expand Down Expand Up @@ -452,7 +460,8 @@ impl SubscriptionClient for WsClient {
return Err(self.read_error_from_backend().await);
}

let res = send_back_rx.await;
let res = crate::helpers::call_with_maybe_timeout(send_back_rx, self.request_timeout).await;

self.id_guard.reclaim_request_id();
let (notifs_rx, id) = match res {
Ok(Ok(val)) => val,
Expand Down Expand Up @@ -483,7 +492,8 @@ impl SubscriptionClient for WsClient {
return Err(self.read_error_from_backend().await);
}

let res = send_back_rx.await;
let res = crate::helpers::call_with_maybe_timeout(send_back_rx, self.request_timeout).await;

let (notifs_rx, method) = match res {
Ok(Ok(val)) => val,
Ok(Err(err)) => return Err(err),
Expand Down
20 changes: 19 additions & 1 deletion ws-client/src/helpers.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use crate::manager::{RequestManager, RequestStatus};
use crate::transport::Sender as WsSender;
use futures::channel::mpsc;
use futures::channel::{mpsc, oneshot};
use jsonrpsee_types::v2::params::{Id, JsonRpcParams, SubscriptionId};
use jsonrpsee_types::v2::request::JsonRpcCallSer;
use jsonrpsee_types::v2::response::{JsonRpcNotifResponse, JsonRpcResponse, JsonRpcSubscriptionResponseAlloc};
use jsonrpsee_types::{v2::error::JsonRpcError, Error, RequestMessage};
use serde_json::Value as JsonValue;
use std::time::Duration;

/// Attempts to process a batch response.
///
Expand Down Expand Up @@ -188,3 +189,20 @@ pub fn process_error_response(manager: &mut RequestManager, err: JsonRpcError) -
_ => Err(Error::InvalidRequestId),
}
}

/// Wait for a stream to complete with optional timeout.
pub async fn call_with_maybe_timeout<T>(
rx: oneshot::Receiver<Result<T, Error>>,
timeout: Option<Duration>,
) -> Result<Result<T, Error>, oneshot::Canceled> {
if let Some(dur) = timeout {
let timeout = crate::tokio::sleep(dur);
futures::pin_mut!(rx, timeout);
match futures::future::select(rx, timeout).await {
futures::future::Either::Left((res, _)) => res,
futures::future::Either::Right((_, _)) => Ok(Err(Error::RequestTimeout)),
}
} else {
rx.await
}
}