From 6ffc4d2b8c2d5ddc623b7c2900d6d9d1f7376d8c Mon Sep 17 00:00:00 2001 From: Maciej Hirsz Date: Wed, 17 Nov 2021 09:11:43 +0000 Subject: [PATCH 01/24] Squashed MethodSink --- http-server/src/server.rs | 13 ++-- utils/src/server/helpers.rs | 138 +++++++++++++++++++++------------ utils/src/server/rpc_module.rs | 73 ++++++++--------- ws-server/src/server.rs | 27 +++---- 4 files changed, 143 insertions(+), 108 deletions(-) diff --git a/http-server/src/server.rs b/http-server/src/server.rs index 44e2bfcd45..5f21e27c9e 100644 --- a/http-server/src/server.rs +++ b/http-server/src/server.rs @@ -39,7 +39,7 @@ use jsonrpsee_types::{ }; use jsonrpsee_utils::http_helpers::read_body; use jsonrpsee_utils::server::{ - helpers::{collect_batch_response, prepare_error, send_error}, + helpers::{collect_batch_response, prepare_error, MethodSink}, resource_limiting::Resources, rpc_module::Methods, }; @@ -244,6 +244,7 @@ impl Server { // NOTE(niklasad1): it's a channel because it's needed for batch requests. let (tx, mut rx) = mpsc::unbounded::(); + let sink = MethodSink::new_with_limit(tx, max_request_body_size); type Notif<'a> = Notification<'a, Option<&'a RawValue>>; @@ -252,7 +253,7 @@ impl Server { if let Ok(req) = serde_json::from_slice::(&body) { // NOTE: we don't need to track connection id on HTTP, so using hardcoded 0 here. if let Some(fut) = - methods.execute_with_resources(&tx, req, 0, &resources, max_request_body_size) + methods.execute_with_resources(&sink, req, 0, &resources) { fut.await; } @@ -260,14 +261,14 @@ impl Server { return Ok::<_, HyperError>(response::ok_response("".into())); } else { let (id, code) = prepare_error(&body); - send_error(id, &tx, code.into()); + sink.send_error(id, code.into()); } // Batch of requests or notifications } else if let Ok(batch) = serde_json::from_slice::>(&body) { if !batch.is_empty() { join_all(batch.into_iter().filter_map(|req| { - methods.execute_with_resources(&tx, req, 0, &resources, max_request_body_size) + methods.execute_with_resources(&sink, req, 0, &resources) })) .await; } else { @@ -275,7 +276,7 @@ impl Server { // Array with at least one value, the response from the Server MUST be a single // Response object." – The Spec. is_single = true; - send_error(Id::Null, &tx, ErrorCode::InvalidRequest.into()); + sink.send_error(Id::Null, ErrorCode::InvalidRequest.into()); } } else if let Ok(_batch) = serde_json::from_slice::>(&body) { return Ok::<_, HyperError>(response::ok_response("".into())); @@ -285,7 +286,7 @@ impl Server { // Response object." – The Spec. is_single = true; let (id, code) = prepare_error(&body); - send_error(id, &tx, code.into()); + sink.send_error(id, code.into()); } // Closes the receiving half of a channel without dropping it. This prevents any further diff --git a/utils/src/server/helpers.rs b/utils/src/server/helpers.rs index 99d2256f62..bdb7305c64 100644 --- a/utils/src/server/helpers.rs +++ b/utils/src/server/helpers.rs @@ -24,7 +24,6 @@ // IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::server::rpc_module::MethodSink; use futures_channel::mpsc; use futures_util::stream::StreamExt; use jsonrpsee_types::error::{CallError, Error}; @@ -82,67 +81,106 @@ impl<'a> io::Write for &'a mut BoundedWriter { } } -/// Helper for sending JSON-RPC responses to the client -pub fn send_response(id: Id, tx: &MethodSink, result: impl Serialize, max_response_size: u32) { - let mut writer = BoundedWriter::new(max_response_size as usize); +/// Sink that is used to send back the result to the server for a specific method. +#[derive(Clone, Debug)] +pub struct MethodSink { + /// Channel sender + tx: mpsc::UnboundedSender, + /// Max response size in bytes for a executed call. + max_response_size: u32, +} - let json = match serde_json::to_writer(&mut writer, &Response { jsonrpc: TwoPointZero, id: id.clone(), result }) { - Ok(_) => { - // Safety - serde_json does not emit invalid UTF-8. - unsafe { String::from_utf8_unchecked(writer.into_bytes()) } - } - Err(err) => { - tracing::error!("Error serializing response: {:?}", err); - - if err.is_io() { - let data = to_json_raw_value(&format!("Exceeded max limit {}", max_response_size)).ok(); - let err = ErrorObject { - code: ErrorCode::ServerError(OVERSIZED_RESPONSE_CODE), - message: OVERSIZED_RESPONSE_MSG, - data: data.as_deref(), - }; - return send_error(id, tx, err); - } else { - return send_error(id, tx, ErrorCode::InternalError.into()); - } +impl MethodSink { + /// Create a new `MethodSink` with unlimited response size + pub fn new(tx: mpsc::UnboundedSender) -> Self { + MethodSink { + tx, + max_response_size: u32::MAX, } - }; + } - if let Err(err) = tx.unbounded_send(json) { - tracing::error!("Error sending response to the client: {:?}", err) + /// Create a new `MethodSink` with a limited response size + pub fn new_with_limit(tx: mpsc::UnboundedSender, max_response_size: u32) -> Self { + MethodSink { + tx, + max_response_size, + } } -} -/// Helper for sending JSON-RPC errors to the client -pub fn send_error(id: Id, tx: &MethodSink, error: ErrorObject) { - let json = match serde_json::to_string(&RpcError { jsonrpc: TwoPointZero, error, id }) { - Ok(json) => json, - Err(err) => { - tracing::error!("Error serializing error message: {:?}", err); + /// Send a JSON-RPC response to the client. If the serialization of `result` exceeds `max_response_size`, + /// an error will be sent instead. + pub fn send_response(&self, id: Id, result: impl Serialize) { + let mut writer = BoundedWriter::new(self.max_response_size as usize); - return; + let json = match serde_json::to_writer(&mut writer, &Response { jsonrpc: TwoPointZero, id: id.clone(), result }) { + Ok(_) => { + // Safety - serde_json does not emit invalid UTF-8. + unsafe { String::from_utf8_unchecked(writer.into_bytes()) } + } + Err(err) => { + tracing::error!("Error serializing response: {:?}", err); + + if err.is_io() { + let data = to_json_raw_value(&format!("Exceeded max limit {}", self.max_response_size)).ok(); + let err = ErrorObject { + code: ErrorCode::ServerError(OVERSIZED_RESPONSE_CODE), + message: OVERSIZED_RESPONSE_MSG, + data: data.as_deref(), + }; + return self.send_error(id, err); + } else { + return self.send_error(id, ErrorCode::InternalError.into()); + } + } + }; + + if let Err(err) = self.tx.unbounded_send(json) { + tracing::error!("Error sending response to the client: {:?}", err) } - }; + } - if let Err(err) = tx.unbounded_send(json) { - tracing::error!("Could not send error response to the client: {:?}", err) + /// Send a JSON-RPC error to the client + pub fn send_error(&self, id: Id, error: ErrorObject) { + let json = match serde_json::to_string(&RpcError { jsonrpc: TwoPointZero, error, id }) { + Ok(json) => json, + Err(err) => { + tracing::error!("Error serializing error message: {:?}", err); + + return; + } + }; + + if let Err(err) = self.tx.unbounded_send(json) { + tracing::error!("Could not send error response to the client: {:?}", err) + } } -} -/// Helper for sending the general purpose `Error` as a JSON-RPC errors to the client -pub fn send_call_error(id: Id, tx: &MethodSink, err: Error) { - let (code, message, data) = match err { - Error::Call(CallError::InvalidParams(e)) => (ErrorCode::InvalidParams, e.to_string(), None), - Error::Call(CallError::Failed(e)) => (ErrorCode::ServerError(CALL_EXECUTION_FAILED_CODE), e.to_string(), None), - Error::Call(CallError::Custom { code, message, data }) => (code.into(), message, data), - // This should normally not happen because the most common use case is to - // return `Error::Call` in `register_async_method`. - e => (ErrorCode::ServerError(UNKNOWN_ERROR_CODE), e.to_string(), None), - }; + /// Helper for sending the general purpose `Error` as a JSON-RPC errors to the client + pub fn send_call_error(&self, id: Id, err: Error) { + let (code, message, data) = match err { + Error::Call(CallError::InvalidParams(e)) => (ErrorCode::InvalidParams, e.to_string(), None), + Error::Call(CallError::Failed(e)) => (ErrorCode::ServerError(CALL_EXECUTION_FAILED_CODE), e.to_string(), None), + Error::Call(CallError::Custom { code, message, data }) => (code.into(), message, data), + // This should normally not happen because the most common use case is to + // return `Error::Call` in `register_async_method`. + e => (ErrorCode::ServerError(UNKNOWN_ERROR_CODE), e.to_string(), None), + }; - let err = ErrorObject { code, message: &message, data: data.as_deref() }; + let err = ErrorObject { code, message: &message, data: data.as_deref() }; - send_error(id, tx, err) + self.send_error(id, err) + } + + /// Send a raw JSON-RPC message to the client, `MethodSink` does not check verify the validity + /// of the JSON being sent. + pub fn send_raw(&self, raw_json: String) -> Result<(), mpsc::TrySendError> { + self.tx.unbounded_send(raw_json) + } + + /// Close the channel for any further messages. + pub fn close(&self) { + self.tx.close_channel(); + } } /// Figure out if this is a sufficiently complete request that we can extract an [`Id`] out of, or just plain diff --git a/utils/src/server/rpc_module.rs b/utils/src/server/rpc_module.rs index fe2aadfb64..a90ec96455 100644 --- a/utils/src/server/rpc_module.rs +++ b/utils/src/server/rpc_module.rs @@ -24,7 +24,7 @@ // IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::server::helpers::{send_call_error, send_error, send_response}; +use crate::server::helpers::MethodSink; use crate::server::resource_limiting::{ResourceGuard, ResourceTable, ResourceVec, Resources}; use beef::Cow; use futures_channel::{mpsc, oneshot}; @@ -53,20 +53,16 @@ use std::sync::Arc; /// implemented as a function pointer to a `Fn` function taking four arguments: /// the `id`, `params`, a channel the function uses to communicate the result (or error) /// back to `jsonrpsee`, and the connection ID (useful for the websocket transport). -pub type SyncMethod = Arc; +pub type SyncMethod = Arc; /// Similar to [`SyncMethod`], but represents an asynchronous handler and takes an additional argument containing a [`ResourceGuard`] if configured. pub type AsyncMethod<'a> = Arc< - dyn Send + Sync + Fn(Id<'a>, Params<'a>, MethodSink, Option, MaxResponseSize) -> BoxFuture<'a, ()>, + dyn Send + Sync + Fn(Id<'a>, Params<'a>, MethodSink, Option) -> BoxFuture<'a, ()>, >; /// Connection ID, used for stateful protocol such as WebSockets. /// For stateless protocols such as http it's unused, so feel free to set it some hardcoded value. pub type ConnectionId = usize; /// Subscription ID. pub type SubscriptionId = u64; -/// Sink that is used to send back the result to the server for a specific method. -pub type MethodSink = mpsc::UnboundedSender; -/// Max response size in bytes for a executed call. -pub type MaxResponseSize = u32; type Subscribers = Arc)>>>; @@ -149,7 +145,6 @@ impl MethodCallback { req: Request<'_>, conn_id: ConnectionId, claimed: Option, - max_response_size: MaxResponseSize, ) -> Option> { let id = req.id.clone(); let params = Params::new(req.params.map(|params| params.get())); @@ -162,7 +157,7 @@ impl MethodCallback { id, conn_id ); - (callback)(id, params, tx, conn_id, max_response_size); + (callback)(id, params, tx, conn_id); // Release claimed resources drop(claimed); @@ -180,7 +175,7 @@ impl MethodCallback { conn_id ); - Some((callback)(id, params, tx, claimed, max_response_size)) + Some((callback)(id, params, tx, claimed)) } } } @@ -289,16 +284,15 @@ impl Methods { /// Attempt to execute a callback, sending the resulting JSON (success or error) to the specified sink. pub fn execute( &self, - tx: &MethodSink, + sink: &MethodSink, req: Request, conn_id: ConnectionId, - max_response_size: MaxResponseSize, ) -> Option> { tracing::trace!("[Methods::execute] Executing request: {:?}", req); match self.callbacks.get(&*req.method) { - Some(callback) => callback.execute(tx, req, conn_id, None, max_response_size), + Some(callback) => callback.execute(sink, req, conn_id, None), None => { - send_error(req.id, tx, ErrorCode::MethodNotFound.into()); + sink.send_error(req.id, ErrorCode::MethodNotFound.into()); None } } @@ -307,24 +301,23 @@ impl Methods { /// Attempt to execute a callback while checking that the call does not exhaust the available resources, sending the resulting JSON (success or error) to the specified sink. pub fn execute_with_resources( &self, - tx: &MethodSink, + sink: &MethodSink, req: Request, conn_id: ConnectionId, resources: &Resources, - max_response_size: MaxResponseSize, ) -> Option> { tracing::trace!("[Methods::execute_with_resources] Executing request: {:?}", req); match self.callbacks.get(&*req.method) { Some(callback) => match callback.claim(&req.method, resources) { - Ok(guard) => callback.execute(tx, req, conn_id, Some(guard), max_response_size), + Ok(guard) => callback.execute(sink, req, conn_id, Some(guard)), Err(err) => { tracing::error!("[Methods::execute_with_resources] failed to lock resources: {:?}", err); - send_error(req.id, tx, ErrorCode::ServerIsBusy.into()); + sink.send_error(req.id, ErrorCode::ServerIsBusy.into()); None } }, None => { - send_error(req.id, tx, ErrorCode::MethodNotFound.into()); + sink.send_error(req.id, ErrorCode::MethodNotFound.into()); None } } @@ -349,8 +342,9 @@ impl Methods { }; let (tx, mut rx) = mpsc::unbounded(); + let sink = MethodSink::new(tx); - if let Some(fut) = self.execute(&tx, req, 0, MaxResponseSize::MAX) { + if let Some(fut) = self.execute(&sink, req, 0) { fut.await; } @@ -366,8 +360,9 @@ impl Methods { Request { jsonrpc: TwoPointZero, id: Id::Number(0), method: Cow::borrowed(method), params: Some(¶ms) }; let (tx, mut rx) = mpsc::unbounded(); + let sink = MethodSink::new(tx.clone()); - if let Some(fut) = self.execute(&tx, req, 0, MaxResponseSize::MAX) { + if let Some(fut) = self.execute(&sink, req, 0) { fut.await; } let response = rx.next().await.expect("Could not establish subscription."); @@ -434,10 +429,10 @@ impl RpcModule { let ctx = self.ctx.clone(); let callback = self.methods.verify_and_insert( method_name, - MethodCallback::new_sync(Arc::new(move |id, params, tx, _, max_response_size| { + MethodCallback::new_sync(Arc::new(move |id, params, sink, _| { match callback(params, &*ctx) { - Ok(res) => send_response(id, tx, res, max_response_size), - Err(err) => send_call_error(id, tx, err), + Ok(res) => sink.send_response(id, res), + Err(err) => sink.send_call_error(id, err), }; })), )?; @@ -459,12 +454,12 @@ impl RpcModule { let ctx = self.ctx.clone(); let callback = self.methods.verify_and_insert( method_name, - MethodCallback::new_async(Arc::new(move |id, params, tx, claimed, max_response_size| { + MethodCallback::new_async(Arc::new(move |id, params, sink, claimed| { let ctx = ctx.clone(); let future = async move { match callback(params, ctx).await { - Ok(res) => send_response(id, &tx, res, max_response_size), - Err(err) => send_call_error(id, &tx, err), + Ok(res) => sink.send_response(id, res), + Err(err) => sink.send_call_error(id, err), }; // Release claimed resources @@ -492,13 +487,13 @@ impl RpcModule { let ctx = self.ctx.clone(); let callback = self.methods.verify_and_insert( method_name, - MethodCallback::new_async(Arc::new(move |id, params, tx, claimed, max_response_size| { + MethodCallback::new_async(Arc::new(move |id, params, sink, claimed| { let ctx = ctx.clone(); tokio::task::spawn_blocking(move || { match callback(params, ctx) { - Ok(res) => send_response(id, &tx, res, max_response_size), - Err(err) => send_call_error(id, &tx, err), + Ok(res) => sink.send_response(id, res), + Err(err) => sink.send_call_error(id, err), }; // Release claimed resources @@ -559,7 +554,7 @@ impl RpcModule { let subscribers = subscribers.clone(); self.methods.mut_callbacks().insert( subscribe_method_name, - MethodCallback::new_sync(Arc::new(move |id, params, method_sink, conn_id, max_response_size| { + MethodCallback::new_sync(Arc::new(move |id, params, method_sink, conn_id| { let (conn_tx, conn_rx) = oneshot::channel::<()>(); let sub_id = { const JS_NUM_MASK: SubscriptionId = !0 >> 11; @@ -571,7 +566,7 @@ impl RpcModule { sub_id }; - send_response(id.clone(), method_sink, sub_id, max_response_size); + method_sink.send_response(id.clone(), sub_id); let sink = SubscriptionSink { inner: method_sink.clone(), @@ -587,7 +582,7 @@ impl RpcModule { err, id ); - send_error(id, method_sink, ErrorCode::ServerError(-1).into()); + method_sink.send_error(id, ErrorCode::ServerError(-1).into()); } })), ); @@ -596,7 +591,7 @@ impl RpcModule { { self.methods.mut_callbacks().insert( unsubscribe_method_name, - MethodCallback::new_sync(Arc::new(move |id, params, tx, conn_id, max_response_size| { + MethodCallback::new_sync(Arc::new(move |id, params, sink, conn_id| { let sub_id = match params.one() { Ok(sub_id) => sub_id, Err(_) => { @@ -605,12 +600,12 @@ impl RpcModule { unsubscribe_method_name, id ); - send_error(id, tx, ErrorCode::ServerError(-1).into()); + sink.send_error(id, ErrorCode::ServerError(-1).into()); return; } }; subscribers.lock().remove(&SubscriptionKey { conn_id, sub_id }); - send_response(id, tx, "Unsubscribed", max_response_size); + sink.send_response(id, "Unsubscribed"); })), ); } @@ -637,7 +632,7 @@ impl RpcModule { #[derive(Debug)] pub struct SubscriptionSink { /// Sink. - inner: mpsc::UnboundedSender, + inner: MethodSink, /// MethodCallback. method: &'static str, /// Unique subscription. @@ -670,7 +665,7 @@ impl SubscriptionSink { let res = match self.is_connected.as_ref() { Some(conn) if !conn.is_canceled() => { // unbounded send only fails if the receiver has been dropped. - self.inner.unbounded_send(msg).map_err(|_| { + self.inner.send_raw(msg).map_err(|_| { Some(SubscriptionClosedError::new("Closed by the client (connection reset)", self.uniq_sub.sub_id)) }) } @@ -699,7 +694,7 @@ impl SubscriptionSink { self.is_connected.take(); if let Some((sink, _)) = self.subscribers.lock().remove(&self.uniq_sub) { let msg = self.build_message(err).expect("valid json infallible; qed"); - let _ = sink.unbounded_send(msg); + let _ = sink.send_raw(msg); } } } diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index bb950871aa..b43782060b 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -45,7 +45,7 @@ use soketto::Sender; use tokio::net::{TcpListener, TcpStream, ToSocketAddrs}; use tokio_util::compat::{Compat, TokioAsyncReadCompatExt}; -use jsonrpsee_utils::server::helpers::{collect_batch_response, prepare_error, send_error}; +use jsonrpsee_utils::server::helpers::{collect_batch_response, prepare_error, MethodSink}; use jsonrpsee_utils::server::resource_limiting::Resources; use jsonrpsee_utils::server::rpc_module::{ConnectionId, Methods}; @@ -269,6 +269,7 @@ async fn background_task( let (mut sender, mut receiver) = builder.finish(); let (tx, mut rx) = mpsc::unbounded::(); let stop_server2 = stop_server.clone(); + let sink = MethodSink::new_with_limit(tx, max_request_body_size); // Send results back to the client. tokio::spawn(async move { @@ -307,7 +308,7 @@ async fn background_task( match err { MonitoredError::Selector(SokettoError::Closed) => { tracing::debug!("WS transport error: remote peer terminated the connection: {}", conn_id); - tx.close_channel(); + sink.close(); return Ok(()); } MonitoredError::Selector(SokettoError::MessageTooLarge { current, maximum }) => { @@ -316,13 +317,13 @@ async fn background_task( current, maximum ); - send_error(Id::Null, &tx, ErrorCode::OversizedRequest.into()); + sink.send_error(Id::Null, ErrorCode::OversizedRequest.into()); continue; } // These errors can not be gracefully handled, so just log them and terminate the connection. MonitoredError::Selector(err) => { tracing::error!("WS transport error: {:?} => terminating connection {}", err, conn_id); - tx.close_channel(); + sink.close(); return Err(err.into()); } MonitoredError::Shutdown => break, @@ -338,13 +339,13 @@ async fn background_task( tracing::debug!("recv method call={}", req.method); tracing::trace!("recv: req={:?}", req); if let Some(fut) = - methods.execute_with_resources(&tx, req, conn_id, &resources, max_request_body_size) + methods.execute_with_resources(&sink, req, conn_id, &resources) { method_executors.add(fut); } } else { let (id, code) = prepare_error(&data); - send_error(id, &tx, code.into()); + sink.send_error(id, code.into()); } } Some(b'[') => { @@ -352,24 +353,24 @@ async fn background_task( let d = std::mem::take(&mut data); let resources = &resources; let methods = &methods; - let tx2 = tx.clone(); + let sink = sink.clone(); let fut = async move { // Batch responses must be sent back as a single message so we read the results from each // request in the batch and read the results off of a new channel, `rx_batch`, and then send the // complete batch response back to the client over `tx`. let (tx_batch, mut rx_batch) = mpsc::unbounded(); + let sink_batch = MethodSink::new_with_limit(tx_batch, max_request_body_size); if let Ok(batch) = serde_json::from_slice::>(&d) { tracing::debug!("recv batch len={}", batch.len()); tracing::trace!("recv: batch={:?}", batch); if !batch.is_empty() { let methods_stream = stream::iter(batch.into_iter().filter_map(|req| { methods.execute_with_resources( - &tx_batch, + &sink_batch, req, conn_id, resources, - max_request_body_size, ) })); @@ -381,21 +382,21 @@ async fn background_task( }) .await; - if let Err(err) = tx2.unbounded_send(results) { + if let Err(err) = sink.send_raw(results) { tracing::error!("Error sending batch response to the client: {:?}", err) } } else { - send_error(Id::Null, &tx2, ErrorCode::InvalidRequest.into()); + sink.send_error(Id::Null, ErrorCode::InvalidRequest.into()); } } else { let (id, code) = prepare_error(&d); - send_error(id, &tx2, code.into()); + sink.send_error(id, code.into()); } }; method_executors.add(Box::pin(fut)); } - _ => send_error(Id::Null, &tx, ErrorCode::ParseError.into()), + _ => sink.send_error(Id::Null, ErrorCode::ParseError.into()), } } From 5ddf9f8d0b8a88889015de9e24708b79e9f32340 Mon Sep 17 00:00:00 2001 From: Maciej Hirsz Date: Tue, 23 Nov 2021 15:08:10 +0000 Subject: [PATCH 02/24] Middleware WIP --- http-server/src/server.rs | 12 ++--- utils/src/server/helpers.rs | 17 +++---- utils/src/server/middleware.rs | 85 ++++++++++++++++++++++++++++++++++ utils/src/server/mod.rs | 2 + utils/src/server/rpc_module.rs | 17 ++++--- ws-server/src/server.rs | 82 +++++++++++++++++++++----------- 6 files changed, 163 insertions(+), 52 deletions(-) create mode 100644 utils/src/server/middleware.rs diff --git a/http-server/src/server.rs b/http-server/src/server.rs index 5f21e27c9e..c8f275439c 100644 --- a/http-server/src/server.rs +++ b/http-server/src/server.rs @@ -252,9 +252,7 @@ impl Server { if is_single { if let Ok(req) = serde_json::from_slice::(&body) { // NOTE: we don't need to track connection id on HTTP, so using hardcoded 0 here. - if let Some(fut) = - methods.execute_with_resources(&sink, req, 0, &resources) - { + if let Some(fut) = methods.execute_with_resources(&sink, req, 0, &resources) { fut.await; } } else if let Ok(_req) = serde_json::from_slice::(&body) { @@ -267,9 +265,11 @@ impl Server { // Batch of requests or notifications } else if let Ok(batch) = serde_json::from_slice::>(&body) { if !batch.is_empty() { - join_all(batch.into_iter().filter_map(|req| { - methods.execute_with_resources(&sink, req, 0, &resources) - })) + join_all( + batch + .into_iter() + .filter_map(|req| methods.execute_with_resources(&sink, req, 0, &resources)), + ) .await; } else { // "If the batch rpc call itself fails to be recognized as an valid JSON or as an diff --git a/utils/src/server/helpers.rs b/utils/src/server/helpers.rs index bdb7305c64..9a5f4cc6c5 100644 --- a/utils/src/server/helpers.rs +++ b/utils/src/server/helpers.rs @@ -93,18 +93,12 @@ pub struct MethodSink { impl MethodSink { /// Create a new `MethodSink` with unlimited response size pub fn new(tx: mpsc::UnboundedSender) -> Self { - MethodSink { - tx, - max_response_size: u32::MAX, - } + MethodSink { tx, max_response_size: u32::MAX } } /// Create a new `MethodSink` with a limited response size pub fn new_with_limit(tx: mpsc::UnboundedSender, max_response_size: u32) -> Self { - MethodSink { - tx, - max_response_size, - } + MethodSink { tx, max_response_size } } /// Send a JSON-RPC response to the client. If the serialization of `result` exceeds `max_response_size`, @@ -112,7 +106,8 @@ impl MethodSink { pub fn send_response(&self, id: Id, result: impl Serialize) { let mut writer = BoundedWriter::new(self.max_response_size as usize); - let json = match serde_json::to_writer(&mut writer, &Response { jsonrpc: TwoPointZero, id: id.clone(), result }) { + let json = match serde_json::to_writer(&mut writer, &Response { jsonrpc: TwoPointZero, id: id.clone(), result }) + { Ok(_) => { // Safety - serde_json does not emit invalid UTF-8. unsafe { String::from_utf8_unchecked(writer.into_bytes()) } @@ -159,7 +154,9 @@ impl MethodSink { pub fn send_call_error(&self, id: Id, err: Error) { let (code, message, data) = match err { Error::Call(CallError::InvalidParams(e)) => (ErrorCode::InvalidParams, e.to_string(), None), - Error::Call(CallError::Failed(e)) => (ErrorCode::ServerError(CALL_EXECUTION_FAILED_CODE), e.to_string(), None), + Error::Call(CallError::Failed(e)) => { + (ErrorCode::ServerError(CALL_EXECUTION_FAILED_CODE), e.to_string(), None) + } Error::Call(CallError::Custom { code, message, data }) => (code.into(), message, data), // This should normally not happen because the most common use case is to // return `Error::Call` in `register_async_method`. diff --git a/utils/src/server/middleware.rs b/utils/src/server/middleware.rs new file mode 100644 index 0000000000..ad64eecb25 --- /dev/null +++ b/utils/src/server/middleware.rs @@ -0,0 +1,85 @@ +// Copyright 2019-2021 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any +// person obtaining a copy of this software and associated +// documentation files (the "Software"), to deal in the +// Software without restriction, including without +// limitation the rights to use, copy, modify, merge, +// publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software +// is furnished to do so, subject to the following +// conditions: +// +// The above copyright notice and this permission notice +// shall be included in all copies or substantial portions +// of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! TODO + +/// TODO +pub trait Middleware: Send + Sync + Clone + 'static { + /// Intended to carry timestamp of a request, for example `std::time::Instant`. How the middleware + /// measures time, if at all, is entirely up to the implementation. + type Instant: Send + Copy + 'static; + + /// Called when a new JSON-RPC comes to the server. + fn on_request(&self) -> Self::Instant; + + /// Called on each JSON-RPC method call, batch requests will trigger `on_call` multiple times. + fn on_call(&self, name: &str); + + /// Called on each JSON-RPC method completion, batch requests will trigger `on_result` multiple times. + fn on_result(&self, name: &str, succeess: bool, started_at: Self::Instant); + + /// Called once the JSON-RPC request is finished and response is sent to the output buffer. + fn on_response(&self, started_at: Self::Instant); +} + +impl Middleware for () { + type Instant = (); + + fn on_request(&self) -> Self::Instant {} + + fn on_call(&self, _name: &str) {} + + fn on_result(&self, _name: &str, _succeess: bool, _started_at: Self::Instant) {} + + fn on_response(&self, _started_at: Self::Instant) {} +} + +impl Middleware for (A, B) +where + A: Middleware, + B: Middleware, +{ + type Instant = (A::Instant, B::Instant); + + fn on_request(&self) -> Self::Instant { + (self.0.on_request(), self.1.on_request()) + } + + fn on_call(&self, name: &str) { + self.0.on_call(name); + self.1.on_call(name); + } + + fn on_result(&self, name: &str, success: bool, started_at: Self::Instant) { + self.0.on_result(name, success, started_at.0); + self.1.on_result(name, success, started_at.1); + } + + fn on_response(&self, started_at: Self::Instant) { + self.0.on_response(started_at.0); + self.1.on_response(started_at.1); + } +} diff --git a/utils/src/server/mod.rs b/utils/src/server/mod.rs index fe1a99277b..f05454ba48 100644 --- a/utils/src/server/mod.rs +++ b/utils/src/server/mod.rs @@ -32,3 +32,5 @@ pub mod helpers; pub mod resource_limiting; /// JSON-RPC "modules" group sets of methods that belong together and handles method/subscription registration. pub mod rpc_module; + +pub mod middleware; diff --git a/utils/src/server/rpc_module.rs b/utils/src/server/rpc_module.rs index a90ec96455..d9da24db56 100644 --- a/utils/src/server/rpc_module.rs +++ b/utils/src/server/rpc_module.rs @@ -55,9 +55,8 @@ use std::sync::Arc; /// back to `jsonrpsee`, and the connection ID (useful for the websocket transport). pub type SyncMethod = Arc; /// Similar to [`SyncMethod`], but represents an asynchronous handler and takes an additional argument containing a [`ResourceGuard`] if configured. -pub type AsyncMethod<'a> = Arc< - dyn Send + Sync + Fn(Id<'a>, Params<'a>, MethodSink, Option) -> BoxFuture<'a, ()>, ->; +pub type AsyncMethod<'a> = + Arc, Params<'a>, MethodSink, Option) -> BoxFuture<'a, ()>>; /// Connection ID, used for stateful protocol such as WebSockets. /// For stateless protocols such as http it's unused, so feel free to set it some hardcoded value. pub type ConnectionId = usize; @@ -281,13 +280,13 @@ impl Methods { self.callbacks.get(method_name) } + /// TODO + pub fn method_with_name(&self, method_name: &str) -> Option<(&'static str, &MethodCallback)> { + self.callbacks.get_key_value(method_name).map(|(k, v)| (*k, v)) + } + /// Attempt to execute a callback, sending the resulting JSON (success or error) to the specified sink. - pub fn execute( - &self, - sink: &MethodSink, - req: Request, - conn_id: ConnectionId, - ) -> Option> { + pub fn execute(&self, sink: &MethodSink, req: Request, conn_id: ConnectionId) -> Option> { tracing::trace!("[Methods::execute] Executing request: {:?}", req); match self.callbacks.get(&*req.method) { Some(callback) => callback.execute(sink, req, conn_id, None), diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index b43782060b..b63b581744 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -36,9 +36,10 @@ use crate::types::{ TEN_MB_SIZE_BYTES, }; use futures_channel::mpsc; +use futures_util::future::join_all; use futures_util::future::FutureExt; use futures_util::io::{BufReader, BufWriter}; -use futures_util::stream::{self, StreamExt}; +use futures_util::stream::StreamExt; use soketto::connection::Error as SokettoError; use soketto::handshake::{server::Response, Server as SokettoServer}; use soketto::Sender; @@ -46,6 +47,7 @@ use tokio::net::{TcpListener, TcpStream, ToSocketAddrs}; use tokio_util::compat::{Compat, TokioAsyncReadCompatExt}; use jsonrpsee_utils::server::helpers::{collect_batch_response, prepare_error, MethodSink}; +use jsonrpsee_utils::server::middleware::Middleware; use jsonrpsee_utils::server::resource_limiting::Resources; use jsonrpsee_utils::server::rpc_module::{ConnectionId, Methods}; @@ -54,14 +56,15 @@ const MAX_CONNECTIONS: u64 = 100; /// A WebSocket JSON RPC server. #[derive(Debug)] -pub struct Server { +pub struct Server { listener: TcpListener, cfg: Settings, stop_monitor: StopMonitor, resources: Resources, + middleware: M, } -impl Server { +impl Server { /// Returns socket address to which the server is bound. pub fn local_addr(&self) -> Result { self.listener.local_addr().map_err(Into::into) @@ -88,6 +91,7 @@ impl Server { async fn start_inner(self, methods: Methods) { let stop_monitor = self.stop_monitor; let resources = self.resources; + let middleware = self.middleware; let mut id = 0; let mut connections = FutureDriver::default(); @@ -118,6 +122,7 @@ impl Server { resources: &resources, cfg, stop_monitor: &stop_monitor, + middleware: middleware.clone(), }, ))); @@ -184,7 +189,7 @@ where } } -enum HandshakeResponse<'a> { +enum HandshakeResponse<'a, M> { Reject { status_code: u16, }, @@ -194,10 +199,14 @@ enum HandshakeResponse<'a> { resources: &'a Resources, cfg: &'a Settings, stop_monitor: &'a StopMonitor, + middleware: M, }, } -async fn handshake(socket: tokio::net::TcpStream, mode: HandshakeResponse<'_>) -> Result<(), Error> { +async fn handshake(socket: tokio::net::TcpStream, mode: HandshakeResponse<'_, M>) -> Result<(), Error> +where + M: Middleware, +{ // For each incoming background_task we perform a handshake. let mut server = SokettoServer::new(BufReader::new(BufWriter::new(socket.compat()))); @@ -214,7 +223,7 @@ async fn handshake(socket: tokio::net::TcpStream, mode: HandshakeResponse<'_>) - Ok(()) } - HandshakeResponse::Accept { conn_id, methods, resources, cfg, stop_monitor } => { + HandshakeResponse::Accept { conn_id, methods, resources, cfg, stop_monitor, middleware } => { tracing::debug!("Accepting new connection: {}", conn_id); let key = { let req = server.receive_request().await?; @@ -244,6 +253,7 @@ async fn handshake(socket: tokio::net::TcpStream, mode: HandshakeResponse<'_>) - resources.clone(), cfg.max_request_body_size, stop_monitor.clone(), + middleware, )) .await; @@ -262,6 +272,7 @@ async fn background_task( resources: Resources, max_request_body_size: u32, stop_server: StopMonitor, + middleware: impl Middleware, ) -> Result<(), Error> { // And we can finally transition to a websocket background_task. let mut builder = server.into_builder(); @@ -294,6 +305,7 @@ async fn background_task( // Buffer for incoming data. let mut data = Vec::with_capacity(100); let mut method_executors = FutureDriver::default(); + let middleware = &middleware; loop { data.clear(); @@ -333,19 +345,33 @@ async fn background_task( tracing::debug!("recv {} bytes", data.len()); + let request_start = middleware.on_request(); + match data.get(0) { Some(b'{') => { if let Ok(req) = serde_json::from_slice::(&data) { + middleware.on_call(req.method.as_ref()); + tracing::debug!("recv method call={}", req.method); tracing::trace!("recv: req={:?}", req); - if let Some(fut) = - methods.execute_with_resources(&sink, req, conn_id, &resources) - { - method_executors.add(fut); + if let Some(fut) = methods.execute_with_resources(&sink, req, conn_id, &resources) { + let request_start = request_start; + + let fut = async move { + fut.await; + middleware.on_result("TODO", true, request_start); + middleware.on_response(request_start); + }; + + method_executors.add(fut.boxed()); + } else { + middleware.on_result("TODO", true, request_start); + middleware.on_response(request_start); } } else { let (id, code) = prepare_error(&data); sink.send_error(id, code.into()); + middleware.on_response(request_start); } } Some(b'[') => { @@ -365,32 +391,34 @@ async fn background_task( tracing::debug!("recv batch len={}", batch.len()); tracing::trace!("recv: batch={:?}", batch); if !batch.is_empty() { - let methods_stream = stream::iter(batch.into_iter().filter_map(|req| { - methods.execute_with_resources( - &sink_batch, - req, - conn_id, - resources, - ) - })); - - let results = methods_stream - .for_each_concurrent(None, |item| item) - .then(|_| { - rx_batch.close(); - collect_batch_response(rx_batch) - }) - .await; + join_all(batch.into_iter().filter_map(move |req| { + if let Some(fut) = methods.execute_with_resources(&sink_batch, req, conn_id, resources) + { + Some(async move { + fut.await; + middleware.on_result("TODO", true, request_start); + }) + } else { + middleware.on_result("TODO", true, request_start); + None + } + })) + .await; + + rx_batch.close(); + let results = collect_batch_response(rx_batch).await; if let Err(err) = sink.send_raw(results) { tracing::error!("Error sending batch response to the client: {:?}", err) } } else { sink.send_error(Id::Null, ErrorCode::InvalidRequest.into()); + middleware.on_response(request_start); } } else { let (id, code) = prepare_error(&d); sink.send_error(id, code.into()); + middleware.on_response(request_start); } }; @@ -568,7 +596,7 @@ impl Builder { let listener = TcpListener::bind(addr).await?; let stop_monitor = StopMonitor::new(); let resources = self.resources; - Ok(Server { listener, cfg: self.settings, stop_monitor, resources }) + Ok(Server { listener, cfg: self.settings, stop_monitor, resources, middleware: () }) } } From 2e7663807029e54d7207239768fa152168e7f7bc Mon Sep 17 00:00:00 2001 From: Maciej Hirsz Date: Tue, 23 Nov 2021 18:11:53 +0000 Subject: [PATCH 03/24] Passing all the information through --- http-server/src/server.rs | 17 +++--- utils/src/server/helpers.rs | 15 ++++-- utils/src/server/middleware.rs | 2 +- utils/src/server/rpc_module.rs | 94 ++++++++++++++++++++++------------ ws-server/src/server.rs | 51 ++++++++++-------- 5 files changed, 112 insertions(+), 67 deletions(-) diff --git a/http-server/src/server.rs b/http-server/src/server.rs index c8f275439c..caddf65a97 100644 --- a/http-server/src/server.rs +++ b/http-server/src/server.rs @@ -41,7 +41,7 @@ use jsonrpsee_utils::http_helpers::read_body; use jsonrpsee_utils::server::{ helpers::{collect_batch_response, prepare_error, MethodSink}, resource_limiting::Resources, - rpc_module::Methods, + rpc_module::{MethodResult, Methods}, }; use serde_json::value::RawValue; @@ -252,7 +252,9 @@ impl Server { if is_single { if let Ok(req) = serde_json::from_slice::(&body) { // NOTE: we don't need to track connection id on HTTP, so using hardcoded 0 here. - if let Some(fut) = methods.execute_with_resources(&sink, req, 0, &resources) { + if let Some((_, MethodResult::Async(fut))) = + methods.execute_with_resources(&sink, req, 0, &resources) + { fut.await; } } else if let Ok(_req) = serde_json::from_slice::(&body) { @@ -265,11 +267,12 @@ impl Server { // Batch of requests or notifications } else if let Ok(batch) = serde_json::from_slice::>(&body) { if !batch.is_empty() { - join_all( - batch - .into_iter() - .filter_map(|req| methods.execute_with_resources(&sink, req, 0, &resources)), - ) + join_all(batch.into_iter().filter_map(|req| { + match methods.execute_with_resources(&sink, req, 0, &resources) { + Some((_, MethodResult::Async(fut))) => Some(fut), + _ => None, + } + })) .await; } else { // "If the batch rpc call itself fails to be recognized as an valid JSON or as an diff --git a/utils/src/server/helpers.rs b/utils/src/server/helpers.rs index 9a5f4cc6c5..3f2d3df9dc 100644 --- a/utils/src/server/helpers.rs +++ b/utils/src/server/helpers.rs @@ -103,7 +103,7 @@ impl MethodSink { /// Send a JSON-RPC response to the client. If the serialization of `result` exceeds `max_response_size`, /// an error will be sent instead. - pub fn send_response(&self, id: Id, result: impl Serialize) { + pub fn send_response(&self, id: Id, result: impl Serialize) -> bool { let mut writer = BoundedWriter::new(self.max_response_size as usize); let json = match serde_json::to_writer(&mut writer, &Response { jsonrpc: TwoPointZero, id: id.clone(), result }) @@ -130,28 +130,33 @@ impl MethodSink { }; if let Err(err) = self.tx.unbounded_send(json) { - tracing::error!("Error sending response to the client: {:?}", err) + tracing::error!("Error sending response to the client: {:?}", err); + false + } else { + true } } /// Send a JSON-RPC error to the client - pub fn send_error(&self, id: Id, error: ErrorObject) { + pub fn send_error(&self, id: Id, error: ErrorObject) -> bool { let json = match serde_json::to_string(&RpcError { jsonrpc: TwoPointZero, error, id }) { Ok(json) => json, Err(err) => { tracing::error!("Error serializing error message: {:?}", err); - return; + return false; } }; if let Err(err) = self.tx.unbounded_send(json) { tracing::error!("Could not send error response to the client: {:?}", err) } + + false } /// Helper for sending the general purpose `Error` as a JSON-RPC errors to the client - pub fn send_call_error(&self, id: Id, err: Error) { + pub fn send_call_error(&self, id: Id, err: Error) -> bool { let (code, message, data) = match err { Error::Call(CallError::InvalidParams(e)) => (ErrorCode::InvalidParams, e.to_string(), None), Error::Call(CallError::Failed(e)) => { diff --git a/utils/src/server/middleware.rs b/utils/src/server/middleware.rs index ad64eecb25..40098c67df 100644 --- a/utils/src/server/middleware.rs +++ b/utils/src/server/middleware.rs @@ -30,7 +30,7 @@ pub trait Middleware: Send + Sync + Clone + 'static { /// Intended to carry timestamp of a request, for example `std::time::Instant`. How the middleware /// measures time, if at all, is entirely up to the implementation. - type Instant: Send + Copy + 'static; + type Instant: Send + Copy; /// Called when a new JSON-RPC comes to the server. fn on_request(&self) -> Self::Instant; diff --git a/utils/src/server/rpc_module.rs b/utils/src/server/rpc_module.rs index d9da24db56..f3874ad567 100644 --- a/utils/src/server/rpc_module.rs +++ b/utils/src/server/rpc_module.rs @@ -44,7 +44,7 @@ use rustc_hash::FxHashMap; use serde::Serialize; use serde_json::value::RawValue; use std::collections::hash_map::Entry; -use std::fmt::Debug; +use std::fmt::{self, Debug}; use std::future::Future; use std::ops::{Deref, DerefMut}; use std::sync::Arc; @@ -53,10 +53,10 @@ use std::sync::Arc; /// implemented as a function pointer to a `Fn` function taking four arguments: /// the `id`, `params`, a channel the function uses to communicate the result (or error) /// back to `jsonrpsee`, and the connection ID (useful for the websocket transport). -pub type SyncMethod = Arc; +pub type SyncMethod = Arc bool>; /// Similar to [`SyncMethod`], but represents an asynchronous handler and takes an additional argument containing a [`ResourceGuard`] if configured. pub type AsyncMethod<'a> = - Arc, Params<'a>, MethodSink, Option) -> BoxFuture<'a, ()>>; + Arc, Params<'a>, MethodSink, Option) -> BoxFuture<'a, bool>>; /// Connection ID, used for stateful protocol such as WebSockets. /// For stateless protocols such as http it's unused, so feel free to set it some hardcoded value. pub type ConnectionId = usize; @@ -98,6 +98,23 @@ pub struct MethodCallback { resources: MethodResources, } +/// Result of a method, either direct value or a future of one. +pub enum MethodResult { + /// Result by value + Sync(T), + /// Future of a value + Async(BoxFuture<'static, T>), +} + +impl Debug for MethodResult { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + MethodResult::Sync(result) => result.fmt(f), + MethodResult::Async(_) => f.write_str(""), + } + } +} + /// Builder for configuring resources used by a method. #[derive(Debug)] pub struct MethodResourcesBuilder<'a> { @@ -140,15 +157,15 @@ impl MethodCallback { /// Execute the callback, sending the resulting JSON (success or error) to the specified sink. pub fn execute( &self, - tx: &MethodSink, + sink: &MethodSink, req: Request<'_>, conn_id: ConnectionId, claimed: Option, - ) -> Option> { + ) -> MethodResult { let id = req.id.clone(); let params = Params::new(req.params.map(|params| params.get())); - match &self.callback { + let result = match &self.callback { MethodKind::Sync(callback) => { tracing::trace!( "[MethodCallback::execute] Executing sync callback, params={:?}, req.id={:?}, conn_id={:?}", @@ -156,15 +173,16 @@ impl MethodCallback { id, conn_id ); - (callback)(id, params, tx, conn_id); + + let result = (callback)(id, params, sink, conn_id); // Release claimed resources drop(claimed); - None + MethodResult::Sync(result) } MethodKind::Async(callback) => { - let tx = tx.clone(); + let sink = sink.clone(); let params = params.into_owned(); let id = id.into_owned(); tracing::trace!( @@ -174,9 +192,11 @@ impl MethodCallback { conn_id ); - Some((callback)(id, params, tx, claimed)) + MethodResult::Async((callback)(id, params, sink, claimed)) } - } + }; + + result } } @@ -280,19 +300,20 @@ impl Methods { self.callbacks.get(method_name) } - /// TODO + /// Returns the method callback along with its name. The returned name is same as the + /// `method_name`, but its lifetime bound is `'static`. pub fn method_with_name(&self, method_name: &str) -> Option<(&'static str, &MethodCallback)> { self.callbacks.get_key_value(method_name).map(|(k, v)| (*k, v)) } /// Attempt to execute a callback, sending the resulting JSON (success or error) to the specified sink. - pub fn execute(&self, sink: &MethodSink, req: Request, conn_id: ConnectionId) -> Option> { + pub fn execute(&self, sink: &MethodSink, req: Request, conn_id: ConnectionId) -> MethodResult { tracing::trace!("[Methods::execute] Executing request: {:?}", req); match self.callbacks.get(&*req.method) { Some(callback) => callback.execute(sink, req, conn_id, None), None => { sink.send_error(req.id, ErrorCode::MethodNotFound.into()); - None + MethodResult::Sync(false) } } } @@ -304,15 +325,15 @@ impl Methods { req: Request, conn_id: ConnectionId, resources: &Resources, - ) -> Option> { + ) -> Option<(&'static str, MethodResult)> { tracing::trace!("[Methods::execute_with_resources] Executing request: {:?}", req); - match self.callbacks.get(&*req.method) { - Some(callback) => match callback.claim(&req.method, resources) { - Ok(guard) => callback.execute(sink, req, conn_id, Some(guard)), + match self.callbacks.get_key_value(&*req.method) { + Some((&name, callback)) => match callback.claim(&req.method, resources) { + Ok(guard) => Some((name, callback.execute(sink, req, conn_id, Some(guard)))), Err(err) => { tracing::error!("[Methods::execute_with_resources] failed to lock resources: {:?}", err); sink.send_error(req.id, ErrorCode::ServerIsBusy.into()); - None + Some((name, MethodResult::Sync(false))) } }, None => { @@ -343,7 +364,7 @@ impl Methods { let (tx, mut rx) = mpsc::unbounded(); let sink = MethodSink::new(tx); - if let Some(fut) = self.execute(&sink, req, 0) { + if let MethodResult::Async(fut) = self.execute(&sink, req, 0) { fut.await; } @@ -361,7 +382,7 @@ impl Methods { let (tx, mut rx) = mpsc::unbounded(); let sink = MethodSink::new(tx.clone()); - if let Some(fut) = self.execute(&sink, req, 0) { + if let MethodResult::Async(fut) = self.execute(&sink, req, 0) { fut.await; } let response = rx.next().await.expect("Could not establish subscription."); @@ -428,11 +449,9 @@ impl RpcModule { let ctx = self.ctx.clone(); let callback = self.methods.verify_and_insert( method_name, - MethodCallback::new_sync(Arc::new(move |id, params, sink, _| { - match callback(params, &*ctx) { - Ok(res) => sink.send_response(id, res), - Err(err) => sink.send_call_error(id, err), - }; + MethodCallback::new_sync(Arc::new(move |id, params, sink, _| match callback(params, &*ctx) { + Ok(res) => sink.send_response(id, res), + Err(err) => sink.send_call_error(id, err), })), )?; @@ -456,13 +475,15 @@ impl RpcModule { MethodCallback::new_async(Arc::new(move |id, params, sink, claimed| { let ctx = ctx.clone(); let future = async move { - match callback(params, ctx).await { + let result = match callback(params, ctx).await { Ok(res) => sink.send_response(id, res), Err(err) => sink.send_call_error(id, err), }; // Release claimed resources drop(claimed); + + result }; future.boxed() })), @@ -490,16 +511,22 @@ impl RpcModule { let ctx = ctx.clone(); tokio::task::spawn_blocking(move || { - match callback(params, ctx) { + let result = match callback(params, ctx) { Ok(res) => sink.send_response(id, res), Err(err) => sink.send_call_error(id, err), }; // Release claimed resources drop(claimed); + + result }) - .map(|err| { - tracing::error!("Join error for blocking RPC method: {:?}", err); + .map(|result| match result { + Ok(r) => r, + Err(err) => { + tracing::error!("Join error for blocking RPC method: {:?}", err); + false + } }) .boxed() })), @@ -582,6 +609,9 @@ impl RpcModule { id ); method_sink.send_error(id, ErrorCode::ServerError(-1).into()); + false + } else { + true } })), ); @@ -600,11 +630,11 @@ impl RpcModule { id ); sink.send_error(id, ErrorCode::ServerError(-1).into()); - return; + return false; } }; subscribers.lock().remove(&SubscriptionKey { conn_id, sub_id }); - sink.send_response(id, "Unsubscribed"); + sink.send_response(id, "Unsubscribed") })), ); } diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index b63b581744..348ba73cd9 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -49,7 +49,7 @@ use tokio_util::compat::{Compat, TokioAsyncReadCompatExt}; use jsonrpsee_utils::server::helpers::{collect_batch_response, prepare_error, MethodSink}; use jsonrpsee_utils::server::middleware::Middleware; use jsonrpsee_utils::server::resource_limiting::Resources; -use jsonrpsee_utils::server::rpc_module::{ConnectionId, Methods}; +use jsonrpsee_utils::server::rpc_module::{ConnectionId, MethodResult, Methods}; /// Default maximum connections allowed. const MAX_CONNECTIONS: u64 = 100; @@ -354,19 +354,23 @@ async fn background_task( tracing::debug!("recv method call={}", req.method); tracing::trace!("recv: req={:?}", req); - if let Some(fut) = methods.execute_with_resources(&sink, req, conn_id, &resources) { - let request_start = request_start; - - let fut = async move { - fut.await; - middleware.on_result("TODO", true, request_start); + match methods.execute_with_resources(&sink, req, conn_id, &resources) { + Some((name, MethodResult::Sync(success))) => { + middleware.on_result(name, success, request_start); middleware.on_response(request_start); - }; + } + Some((name, MethodResult::Async(fut))) => { + let request_start = request_start; - method_executors.add(fut.boxed()); - } else { - middleware.on_result("TODO", true, request_start); - middleware.on_response(request_start); + let fut = async move { + let success = fut.await; + middleware.on_result(name, success, request_start); + middleware.on_response(request_start); + }; + + method_executors.add(fut.boxed()); + } + None => (), } } else { let (id, code) = prepare_error(&data); @@ -392,15 +396,16 @@ async fn background_task( tracing::trace!("recv: batch={:?}", batch); if !batch.is_empty() { join_all(batch.into_iter().filter_map(move |req| { - if let Some(fut) = methods.execute_with_resources(&sink_batch, req, conn_id, resources) - { - Some(async move { - fut.await; - middleware.on_result("TODO", true, request_start); - }) - } else { - middleware.on_result("TODO", true, request_start); - None + match methods.execute_with_resources(&sink_batch, req, conn_id, resources) { + Some((name, MethodResult::Sync(success))) => { + middleware.on_result(name, success, request_start); + None + } + Some((name, MethodResult::Async(fut))) => Some(async move { + let success = fut.await; + middleware.on_result(name, success, request_start); + }), + None => None, } })) .await; @@ -424,7 +429,9 @@ async fn background_task( method_executors.add(Box::pin(fut)); } - _ => sink.send_error(Id::Null, ErrorCode::ParseError.into()), + _ => { + sink.send_error(Id::Null, ErrorCode::ParseError.into()); + } } } From a700902475b836a7748425afbaa2a51b1412aadc Mon Sep 17 00:00:00 2001 From: Maciej Hirsz Date: Tue, 23 Nov 2021 18:24:21 +0000 Subject: [PATCH 04/24] Unnecessary `false` --- utils/src/server/rpc_module.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/utils/src/server/rpc_module.rs b/utils/src/server/rpc_module.rs index aaa99e2967..aacf5f20e8 100644 --- a/utils/src/server/rpc_module.rs +++ b/utils/src/server/rpc_module.rs @@ -623,8 +623,7 @@ impl RpcModule { err, id ); - method_sink.send_error(id, ErrorCode::ServerError(CALL_EXECUTION_FAILED_CODE).into()); - false + method_sink.send_error(id, ErrorCode::ServerError(CALL_EXECUTION_FAILED_CODE).into()) } else { true } From 53b1ac209557de0c4788a27f71f540c29f845300 Mon Sep 17 00:00:00 2001 From: Maciej Hirsz <1096222+maciejhirsz@users.noreply.github.com> Date: Wed, 24 Nov 2021 13:08:49 +0100 Subject: [PATCH 05/24] Apply suggestions from code review Co-authored-by: David --- utils/src/server/middleware.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/src/server/middleware.rs b/utils/src/server/middleware.rs index 40098c67df..f2b929ada5 100644 --- a/utils/src/server/middleware.rs +++ b/utils/src/server/middleware.rs @@ -39,7 +39,7 @@ pub trait Middleware: Send + Sync + Clone + 'static { fn on_call(&self, name: &str); /// Called on each JSON-RPC method completion, batch requests will trigger `on_result` multiple times. - fn on_result(&self, name: &str, succeess: bool, started_at: Self::Instant); + fn on_result(&self, name: &str, success: bool, started_at: Self::Instant); /// Called once the JSON-RPC request is finished and response is sent to the output buffer. fn on_response(&self, started_at: Self::Instant); From c8eac375af04bd0b38e9984f5b34b679a8b3fe73 Mon Sep 17 00:00:00 2001 From: David Date: Wed, 24 Nov 2021 14:27:56 +0100 Subject: [PATCH 06/24] Add a setter for middleware (#577) * Fix try-build tests * Add a middleware setter and an example * Actually add the example * Grumbles * Use an atomic * Set middleware with a constructor instead * Resolve a todo * Update ws-server/src/server.rs Co-authored-by: Maciej Hirsz <1096222+maciejhirsz@users.noreply.github.com> * Update ws-server/src/server.rs Co-authored-by: Maciej Hirsz <1096222+maciejhirsz@users.noreply.github.com> * Update ws-server/src/server.rs Co-authored-by: Maciej Hirsz <1096222+maciejhirsz@users.noreply.github.com> Co-authored-by: Maciej Hirsz <1096222+maciejhirsz@users.noreply.github.com> --- examples/Cargo.toml | 4 + examples/middleware.rs | 94 +++++++++++++++++++ jsonrpsee/src/lib.rs | 4 + .../method/method_unexpected_field.stderr | 2 +- .../sub/sub_dup_name_override.stderr | 2 +- .../ui/incorrect/sub/sub_name_override.stderr | 2 +- .../sub/sub_unsupported_field.stderr | 2 +- utils/src/server/middleware.rs | 2 +- ws-server/src/server.rs | 29 +++++- 9 files changed, 131 insertions(+), 10 deletions(-) create mode 100644 examples/middleware.rs diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 7eb987a911..33a9ddc67e 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -18,6 +18,10 @@ tokio = { version = "1", features = ["full"] } name = "http" path = "http.rs" +[[example]] +name = "middleware" +path = "middleware.rs" + [[example]] name = "ws" path = "ws.rs" diff --git a/examples/middleware.rs b/examples/middleware.rs new file mode 100644 index 0000000000..6b65559631 --- /dev/null +++ b/examples/middleware.rs @@ -0,0 +1,94 @@ +// Copyright 2019-2021 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any +// person obtaining a copy of this software and associated +// documentation files (the "Software"), to deal in the +// Software without restriction, including without +// limitation the rights to use, copy, modify, merge, +// publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software +// is furnished to do so, subject to the following +// conditions: +// +// The above copyright notice and this permission notice +// shall be included in all copies or substantial portions +// of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use jsonrpsee::{ + types::traits::Client, + utils::server::middleware, + ws_client::WsClientBuilder, + ws_server::{RpcModule, WsServerBuilder}, +}; +use std::net::SocketAddr; +use std::sync::atomic; + +#[derive(Default)] +struct ManInTheMiddle { + when: atomic::AtomicU64, +} + +impl Clone for ManInTheMiddle { + fn clone(&self) -> Self { + ManInTheMiddle { when: atomic::AtomicU64::new(self.when.load(atomic::Ordering::SeqCst)) } + } +} + +impl middleware::Middleware for ManInTheMiddle { + type Instant = u64; + fn on_request(&self) -> Self::Instant { + self.when.fetch_add(1, atomic::Ordering::SeqCst) + } + + fn on_call(&self, name: &str) { + println!("They called '{}'", name); + } + + fn on_result(&self, name: &str, succeess: bool, started_at: Self::Instant) { + println!("call={}, worked? {}, when? {}", name, succeess, started_at); + } + + fn on_response(&self, started_at: Self::Instant) { + println!("Response started_at={}", started_at); + } +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::FmtSubscriber::builder() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .try_init() + .expect("setting default subscriber failed"); + + let addr = run_server().await?; + let url = format!("ws://{}", addr); + + let client = WsClientBuilder::default().build(&url).await?; + let response: String = client.request("say_hello", None).await?; + tracing::info!("response: {:?}", response); + // TODO: This prints `They called 'blabla'` but nothing more. I expected the `on_response` callback to be called too? + let _response: Result = client.request("blabla", None).await; + let _ = client.request::("say_hello", None).await?; + + Ok(()) +} + +async fn run_server() -> anyhow::Result { + let m = ManInTheMiddle::default(); + let server = WsServerBuilder::with_middleware(m).build("127.0.0.1:0").await?; + let mut module = RpcModule::new(()); + module.register_method("say_hello", |_, _| Ok("lo"))?; + let addr = server.local_addr()?; + server.start(module)?; + Ok(addr) +} diff --git a/jsonrpsee/src/lib.rs b/jsonrpsee/src/lib.rs index 4fc8cbddc5..2ffe413ef7 100644 --- a/jsonrpsee/src/lib.rs +++ b/jsonrpsee/src/lib.rs @@ -76,6 +76,10 @@ pub use jsonrpsee_types as types; #[cfg(any(feature = "http-server", feature = "ws-server"))] pub use jsonrpsee_utils::server::rpc_module::{RpcModule, SubscriptionSink}; +/// TODO: (dp) any reason not to export this? narrow the scope to `jsonrpsee_utils::server`? +#[cfg(any(feature = "http-server", feature = "ws-server"))] +pub use jsonrpsee_utils as utils; + #[cfg(feature = "http-server")] pub use http_server::tracing; diff --git a/proc-macros/tests/ui/incorrect/method/method_unexpected_field.stderr b/proc-macros/tests/ui/incorrect/method/method_unexpected_field.stderr index 81b031b034..57c82ce5eb 100644 --- a/proc-macros/tests/ui/incorrect/method/method_unexpected_field.stderr +++ b/proc-macros/tests/ui/incorrect/method/method_unexpected_field.stderr @@ -1,5 +1,5 @@ error: Unknown argument `magic`, expected one of: `aliases`, `blocking`, `name`, `param_kind`, `resources` - --> tests/ui/incorrect/method/method_unexpected_field.rs:6:25 + --> $DIR/method_unexpected_field.rs:6:25 | 6 | #[method(name = "foo", magic = false)] | ^^^^^ diff --git a/proc-macros/tests/ui/incorrect/sub/sub_dup_name_override.stderr b/proc-macros/tests/ui/incorrect/sub/sub_dup_name_override.stderr index a34210fe70..45e3a50301 100644 --- a/proc-macros/tests/ui/incorrect/sub/sub_dup_name_override.stderr +++ b/proc-macros/tests/ui/incorrect/sub/sub_dup_name_override.stderr @@ -1,5 +1,5 @@ error: "override" is already defined - --> tests/ui/incorrect/sub/sub_dup_name_override.rs:9:5 + --> $DIR/sub_dup_name_override.rs:9:5 | 9 | fn two(&self) -> RpcResult<()>; | ^^^ diff --git a/proc-macros/tests/ui/incorrect/sub/sub_name_override.stderr b/proc-macros/tests/ui/incorrect/sub/sub_name_override.stderr index 719b2e88cf..0a46b0bcd0 100644 --- a/proc-macros/tests/ui/incorrect/sub/sub_name_override.stderr +++ b/proc-macros/tests/ui/incorrect/sub/sub_name_override.stderr @@ -1,5 +1,5 @@ error: "one" is already defined - --> tests/ui/incorrect/sub/sub_name_override.rs:7:5 + --> $DIR/sub_name_override.rs:7:5 | 7 | fn one(&self) -> RpcResult<()>; | ^^^ diff --git a/proc-macros/tests/ui/incorrect/sub/sub_unsupported_field.stderr b/proc-macros/tests/ui/incorrect/sub/sub_unsupported_field.stderr index 87e90136fe..d0613d1c12 100644 --- a/proc-macros/tests/ui/incorrect/sub/sub_unsupported_field.stderr +++ b/proc-macros/tests/ui/incorrect/sub/sub_unsupported_field.stderr @@ -1,5 +1,5 @@ error: Unknown argument `magic`, expected one of: `aliases`, `item`, `name`, `param_kind`, `unsubscribe_aliases` - --> tests/ui/incorrect/sub/sub_unsupported_field.rs:6:42 + --> $DIR/sub_unsupported_field.rs:6:42 | 6 | #[subscription(name = "sub", item = u8, magic = true)] | ^^^^^ diff --git a/utils/src/server/middleware.rs b/utils/src/server/middleware.rs index f2b929ada5..8cbe03c0bc 100644 --- a/utils/src/server/middleware.rs +++ b/utils/src/server/middleware.rs @@ -27,7 +27,7 @@ //! TODO /// TODO -pub trait Middleware: Send + Sync + Clone + 'static { +pub trait Middleware: Default + Send + Sync + Clone + 'static { /// Intended to carry timestamp of a request, for example `std::time::Instant`. How the middleware /// measures time, if at all, is entirely up to the implementation. type Instant: Send + Copy; diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index 3198ce7b80..84a79d2108 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -56,7 +56,7 @@ const MAX_CONNECTIONS: u64 = 100; /// A WebSocket JSON RPC server. #[derive(Debug)] -pub struct Server { +pub struct Server { listener: TcpListener, cfg: Settings, stop_monitor: StopMonitor, @@ -489,13 +489,32 @@ impl Default for Settings { } /// Builder to configure and create a JSON-RPC Websocket server -#[derive(Debug, Default)] -pub struct Builder { +#[derive(Debug)] +pub struct Builder { settings: Settings, resources: Resources, + middleware: M, +} + +impl Default for Builder<()> { + fn default() -> Self { + Self { settings: Default::default(), resources: Default::default(), middleware: () } + } } impl Builder { + /// Build a default server. + pub fn new() -> Self { + Default::default() + } +} + +impl Builder { + /// Build a server with the specified [`Middleware`]. + pub fn with_middleware(middleware: M) -> Self { + Builder { settings: Default::default(), resources: Default::default(), middleware } + } + /// Set the maximum size of a request body in bytes. Default is 10 MiB. pub fn max_request_body_size(mut self, size: u32) -> Self { self.settings.max_request_body_size = size; @@ -614,11 +633,11 @@ impl Builder { /// } /// ``` /// - pub async fn build(self, addrs: impl ToSocketAddrs) -> Result { + pub async fn build(self, addrs: impl ToSocketAddrs) -> Result, Error> { let listener = TcpListener::bind(addrs).await?; let stop_monitor = StopMonitor::new(); let resources = self.resources; - Ok(Server { listener, cfg: self.settings, stop_monitor, resources, middleware: () }) + Ok(Server { listener, cfg: self.settings, stop_monitor, resources, middleware: self.middleware }) } } From cc7b3f5bdcb9e38ad09ec136fb7fb2b4bed1da65 Mon Sep 17 00:00:00 2001 From: Maciej Hirsz Date: Wed, 24 Nov 2021 13:57:44 +0000 Subject: [PATCH 07/24] Middleware::on_response for batches --- ws-server/src/server.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index 84a79d2108..4e1e11db56 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -415,6 +415,8 @@ async fn background_task( if let Err(err) = sink.send_raw(results) { tracing::error!("Error sending batch response to the client: {:?}", err) + } else { + middleware.on_response(request_start); } } else { sink.send_error(Id::Null, ErrorCode::InvalidRequest.into()); From 040d384126d9a42077b6dff6611fbf012fea204a Mon Sep 17 00:00:00 2001 From: Maciej Hirsz Date: Thu, 25 Nov 2021 09:56:48 +0000 Subject: [PATCH 08/24] Middleware in HTTP --- http-server/src/server.rs | 54 ++++++++++++++++++++++++++++++--------- ws-server/src/server.rs | 2 +- 2 files changed, 43 insertions(+), 13 deletions(-) diff --git a/http-server/src/server.rs b/http-server/src/server.rs index 9047531538..0c1c81107c 100644 --- a/http-server/src/server.rs +++ b/http-server/src/server.rs @@ -39,6 +39,7 @@ use jsonrpsee_types::{ }; use jsonrpsee_utils::http_helpers::read_body; use jsonrpsee_utils::server::{ + middleware::Middleware, helpers::{collect_batch_response, prepare_error, MethodSink}, resource_limiting::Resources, rpc_module::{MethodResult, Methods}, @@ -56,16 +57,17 @@ use std::{ /// Builder to create JSON-RPC HTTP server. #[derive(Debug)] -pub struct Builder { +pub struct Builder { access_control: AccessControl, resources: Resources, max_request_body_size: u32, keep_alive: bool, /// Custom tokio runtime to run the server on. tokio_runtime: Option, + middleware: M, } -impl Builder { +impl Builder { /// Sets the maximum size of a request body in bytes (default is 10 MiB). pub fn max_request_body_size(mut self, size: u32) -> Self { self.max_request_body_size = size; @@ -120,7 +122,7 @@ impl Builder { /// assert!(jsonrpsee_http_server::HttpServerBuilder::default().build(addrs).is_ok()); /// } /// ``` - pub fn build(self, addrs: impl ToSocketAddrs) -> Result { + pub fn build(self, addrs: impl ToSocketAddrs) -> Result, Error> { let mut err: Option = None; for addr in addrs.to_socket_addrs()? { @@ -139,6 +141,7 @@ impl Builder { max_request_body_size: self.max_request_body_size, resources: self.resources, tokio_runtime: self.tokio_runtime, + middleware: self.middleware, }); } @@ -175,6 +178,7 @@ impl Default for Builder { access_control: AccessControl::default(), keep_alive: true, tokio_runtime: None, + middleware: (), } } } @@ -212,7 +216,7 @@ impl Future for ServerHandle { /// An HTTP JSON RPC server. #[derive(Debug)] -pub struct Server { +pub struct Server { /// Hyper server. listener: HyperBuilder, /// Local address @@ -225,9 +229,10 @@ pub struct Server { resources: Resources, /// Custom tokio runtime to run the server on. tokio_runtime: Option, + middleware: M, } -impl Server { +impl Server { /// Returns socket address to which the server is bound. pub fn local_addr(&self) -> Result { self.local_addr.ok_or_else(|| Error::Custom("Local address not found".into())) @@ -240,18 +245,21 @@ impl Server { let (tx, mut rx) = mpsc::channel(1); let listener = self.listener; let resources = self.resources; + let middleware = self.middleware; let methods = methods.into().initialize_resources(&resources)?; let make_service = make_service_fn(move |_| { let methods = methods.clone(); let access_control = access_control.clone(); let resources = resources.clone(); + let middleware = middleware.clone(); async move { Ok::<_, HyperError>(service_fn(move |request| { let methods = methods.clone(); let access_control = access_control.clone(); let resources = resources.clone(); + let middleware = middleware.clone(); // Run some validation on the http request, then read the body and try to deserialize it into one of // two cases: a single RPC request or a batch of RPC requests. @@ -276,6 +284,8 @@ impl Server { } }; + let request_start = middleware.on_request(); + // NOTE(niklasad1): it's a channel because it's needed for batch requests. let (tx, mut rx) = mpsc::unbounded::(); let sink = MethodSink::new_with_limit(tx, max_request_body_size); @@ -285,11 +295,21 @@ impl Server { // Single request or notification if is_single { if let Ok(req) = serde_json::from_slice::(&body) { + middleware.on_call(req.method.as_ref()); + // NOTE: we don't need to track connection id on HTTP, so using hardcoded 0 here. - if let Some((_, MethodResult::Async(fut))) = - methods.execute_with_resources(&sink, req, 0, &resources) - { - fut.await; + match methods.execute_with_resources(&sink, req, 0, &resources) { + Some((name, MethodResult::Sync(success))) => { + middleware.on_result(name, success, request_start); + middleware.on_response(request_start); + }, + Some((name, MethodResult::Async(fut))) => { + let success = fut.await; + + middleware.on_result(name, success, request_start); + middleware.on_response(request_start); + }, + None => (), } } else if let Ok(_req) = serde_json::from_slice::(&body) { return Ok::<_, HyperError>(response::ok_response("".into())); @@ -301,10 +321,19 @@ impl Server { // Batch of requests or notifications } else if let Ok(batch) = serde_json::from_slice::>(&body) { if !batch.is_empty() { - join_all(batch.into_iter().filter_map(|req| { + let middleware = &middleware; + + join_all(batch.into_iter().filter_map(move |req| { match methods.execute_with_resources(&sink, req, 0, &resources) { - Some((_, MethodResult::Async(fut))) => Some(fut), - _ => None, + Some((name, MethodResult::Sync(success))) => { + middleware.on_result(name, success, request_start); + None + } + Some((name, MethodResult::Async(fut))) => Some(async move { + let success = fut.await; + middleware.on_result(name, success, request_start); + }), + None => None, } })) .await; @@ -335,6 +364,7 @@ impl Server { collect_batch_response(rx).await }; tracing::debug!("[service_fn] sending back: {:?}", &response[..cmp::min(response.len(), 1024)]); + middleware.on_response(request_start); Ok::<_, HyperError>(response::ok_response(response)) } })) diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index 4e1e11db56..eacda866dc 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -498,7 +498,7 @@ pub struct Builder { middleware: M, } -impl Default for Builder<()> { +impl Default for Builder { fn default() -> Self { Self { settings: Default::default(), resources: Default::default(), middleware: () } } From 189fe0bd1962361ef3f4dd5a0db33ad0cc4f5221 Mon Sep 17 00:00:00 2001 From: Maciej Hirsz Date: Thu, 25 Nov 2021 10:18:04 +0000 Subject: [PATCH 09/24] fmt --- http-server/src/server.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/http-server/src/server.rs b/http-server/src/server.rs index 0c1c81107c..3b6e3e97c2 100644 --- a/http-server/src/server.rs +++ b/http-server/src/server.rs @@ -39,8 +39,8 @@ use jsonrpsee_types::{ }; use jsonrpsee_utils::http_helpers::read_body; use jsonrpsee_utils::server::{ - middleware::Middleware, helpers::{collect_batch_response, prepare_error, MethodSink}, + middleware::Middleware, resource_limiting::Resources, rpc_module::{MethodResult, Methods}, }; @@ -302,13 +302,13 @@ impl Server { Some((name, MethodResult::Sync(success))) => { middleware.on_result(name, success, request_start); middleware.on_response(request_start); - }, + } Some((name, MethodResult::Async(fut))) => { let success = fut.await; middleware.on_result(name, success, request_start); middleware.on_response(request_start); - }, + } None => (), } } else if let Ok(_req) = serde_json::from_slice::(&body) { From 20e2ee2c1add765e09c256bba0e0875546165f8b Mon Sep 17 00:00:00 2001 From: Maciej Hirsz Date: Thu, 25 Nov 2021 10:37:03 +0000 Subject: [PATCH 10/24] Server builder for HTTP --- http-server/src/server.rs | 38 +++++++++++++++++++++++++------------- ws-server/src/server.rs | 8 ++++---- 2 files changed, 29 insertions(+), 17 deletions(-) diff --git a/http-server/src/server.rs b/http-server/src/server.rs index 3b6e3e97c2..6b97924ca7 100644 --- a/http-server/src/server.rs +++ b/http-server/src/server.rs @@ -67,7 +67,32 @@ pub struct Builder { middleware: M, } +impl Default for Builder { + fn default() -> Self { + Self::with_middleware(()) + } +} + +impl Builder { + /// Create a default server builder. + pub fn new() -> Self { + Self::with_middleware(()) + } +} + impl Builder { + /// Create a server builder with the specified [`Middleware`]. + pub fn with_middleware(middleware: M) -> Self { + Self { + max_request_body_size: TEN_MB_SIZE_BYTES, + resources: Resources::default(), + access_control: AccessControl::default(), + keep_alive: true, + tokio_runtime: None, + middleware, + } + } + /// Sets the maximum size of a request body in bytes (default is 10 MiB). pub fn max_request_body_size(mut self, size: u32) -> Self { self.max_request_body_size = size; @@ -170,19 +195,6 @@ impl Builder { } } -impl Default for Builder { - fn default() -> Self { - Self { - max_request_body_size: TEN_MB_SIZE_BYTES, - resources: Resources::default(), - access_control: AccessControl::default(), - keep_alive: true, - tokio_runtime: None, - middleware: (), - } - } -} - /// Handle used to run or stop the server. #[derive(Debug)] pub struct ServerHandle { diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index eacda866dc..5d51414065 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -500,19 +500,19 @@ pub struct Builder { impl Default for Builder { fn default() -> Self { - Self { settings: Default::default(), resources: Default::default(), middleware: () } + Self::with_middleware(()) } } impl Builder { - /// Build a default server. + /// Create a default server builder. pub fn new() -> Self { - Default::default() + Self::with_middleware(()) } } impl Builder { - /// Build a server with the specified [`Middleware`]. + /// Create a server builder with the specified [`Middleware`]. pub fn with_middleware(middleware: M) -> Self { Builder { settings: Default::default(), resources: Default::default(), middleware } } From 17ab865b0b840fb29c698317118b53ae33927899 Mon Sep 17 00:00:00 2001 From: Maciej Hirsz Date: Mon, 29 Nov 2021 09:40:21 +0000 Subject: [PATCH 11/24] Use actual time in the example --- examples/middleware.rs | 26 ++++++++++---------------- ws-server/src/server.rs | 2 +- 2 files changed, 11 insertions(+), 17 deletions(-) diff --git a/examples/middleware.rs b/examples/middleware.rs index 6b65559631..624b25a480 100644 --- a/examples/middleware.rs +++ b/examples/middleware.rs @@ -31,23 +31,16 @@ use jsonrpsee::{ ws_server::{RpcModule, WsServerBuilder}, }; use std::net::SocketAddr; -use std::sync::atomic; +use std::time::Instant; -#[derive(Default)] -struct ManInTheMiddle { - when: atomic::AtomicU64, -} - -impl Clone for ManInTheMiddle { - fn clone(&self) -> Self { - ManInTheMiddle { when: atomic::AtomicU64::new(self.when.load(atomic::Ordering::SeqCst)) } - } -} +#[derive(Default, Clone)] +struct ManInTheMiddle; impl middleware::Middleware for ManInTheMiddle { - type Instant = u64; + type Instant = Instant; + fn on_request(&self) -> Self::Instant { - self.when.fetch_add(1, atomic::Ordering::SeqCst) + Instant::now() } fn on_call(&self, name: &str) { @@ -55,11 +48,12 @@ impl middleware::Middleware for ManInTheMiddle { } fn on_result(&self, name: &str, succeess: bool, started_at: Self::Instant) { - println!("call={}, worked? {}, when? {}", name, succeess, started_at); + // println!("call={}, worked? {}, when? {}", name, succeess, started_at); + println!("call={}, worked? {}, duration {:?}", name, succeess, started_at.elapsed()); } fn on_response(&self, started_at: Self::Instant) { - println!("Response started_at={}", started_at); + println!("Response duration {:?}", started_at.elapsed()); } } @@ -75,7 +69,7 @@ async fn main() -> anyhow::Result<()> { let client = WsClientBuilder::default().build(&url).await?; let response: String = client.request("say_hello", None).await?; - tracing::info!("response: {:?}", response); + println!("response: {:?}", response); // TODO: This prints `They called 'blabla'` but nothing more. I expected the `on_response` callback to be called too? let _response: Result = client.request("blabla", None).await; let _ = client.request::("say_hello", None).await?; diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index 5d51414065..f5ca33760a 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -370,7 +370,7 @@ async fn background_task( method_executors.add(fut.boxed()); } - None => (), + None => middleware.on_response(request_start), } } else { let (id, code) = prepare_error(&data); From 53bf4d1d9c22429fe9eec08ef8504ed8a728fea7 Mon Sep 17 00:00:00 2001 From: Maciej Hirsz Date: Mon, 29 Nov 2021 09:54:48 +0000 Subject: [PATCH 12/24] HTTP example --- examples/Cargo.toml | 8 +- examples/middleware_http.rs | 85 ++++++++++++++++++++ examples/{middleware.rs => middleware_ws.rs} | 13 ++- http-server/src/server.rs | 2 - utils/src/server/middleware.rs | 2 +- 5 files changed, 97 insertions(+), 13 deletions(-) create mode 100644 examples/middleware_http.rs rename examples/{middleware.rs => middleware_ws.rs} (84%) diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 33a9ddc67e..53f42a590f 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -19,8 +19,12 @@ name = "http" path = "http.rs" [[example]] -name = "middleware" -path = "middleware.rs" +name = "middleware_ws" +path = "middleware_ws.rs" + +[[example]] +name = "middleware_http" +path = "middleware_http.rs" [[example]] name = "ws" diff --git a/examples/middleware_http.rs b/examples/middleware_http.rs new file mode 100644 index 0000000000..258a59e3ad --- /dev/null +++ b/examples/middleware_http.rs @@ -0,0 +1,85 @@ +// Copyright 2019-2021 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any +// person obtaining a copy of this software and associated +// documentation files (the "Software"), to deal in the +// Software without restriction, including without +// limitation the rights to use, copy, modify, merge, +// publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software +// is furnished to do so, subject to the following +// conditions: +// +// The above copyright notice and this permission notice +// shall be included in all copies or substantial portions +// of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use jsonrpsee::{ + http_client::HttpClientBuilder, + http_server::{HttpServerBuilder, HttpServerHandle, RpcModule}, + types::traits::Client, + utils::server::middleware, +}; +use std::net::SocketAddr; +use std::time::Instant; + +#[derive(Clone)] +struct Timings; + +impl middleware::Middleware for Timings { + type Instant = Instant; + + fn on_request(&self) -> Self::Instant { + Instant::now() + } + + fn on_call(&self, name: &str) { + println!("They called '{}'", name); + } + + fn on_result(&self, name: &str, succeess: bool, started_at: Self::Instant) { + println!("call={}, worked? {}, duration {:?}", name, succeess, started_at.elapsed()); + } + + fn on_response(&self, started_at: Self::Instant) { + println!("Response duration {:?}", started_at.elapsed()); + } +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::FmtSubscriber::builder() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .try_init() + .expect("setting default subscriber failed"); + + let (addr, _handle) = run_server().await?; + let url = format!("http://{}", addr); + + let client = HttpClientBuilder::default().build(&url)?; + let response: String = client.request("say_hello", None).await?; + println!("response: {:?}", response); + let _response: Result = client.request("unknown_method", None).await; + let _ = client.request::("say_hello", None).await?; + + Ok(()) +} + +async fn run_server() -> anyhow::Result<(SocketAddr, HttpServerHandle)> { + let server = HttpServerBuilder::with_middleware(Timings).build("127.0.0.1:0")?; + let mut module = RpcModule::new(()); + module.register_method("say_hello", |_, _| Ok("lo"))?; + let addr = server.local_addr()?; + let server_handle = server.start(module)?; + Ok((addr, server_handle)) +} diff --git a/examples/middleware.rs b/examples/middleware_ws.rs similarity index 84% rename from examples/middleware.rs rename to examples/middleware_ws.rs index 624b25a480..e48a5d55ba 100644 --- a/examples/middleware.rs +++ b/examples/middleware_ws.rs @@ -33,10 +33,10 @@ use jsonrpsee::{ use std::net::SocketAddr; use std::time::Instant; -#[derive(Default, Clone)] -struct ManInTheMiddle; +#[derive(Clone)] +struct Timings; -impl middleware::Middleware for ManInTheMiddle { +impl middleware::Middleware for Timings { type Instant = Instant; fn on_request(&self) -> Self::Instant { @@ -48,7 +48,6 @@ impl middleware::Middleware for ManInTheMiddle { } fn on_result(&self, name: &str, succeess: bool, started_at: Self::Instant) { - // println!("call={}, worked? {}, when? {}", name, succeess, started_at); println!("call={}, worked? {}, duration {:?}", name, succeess, started_at.elapsed()); } @@ -70,16 +69,14 @@ async fn main() -> anyhow::Result<()> { let client = WsClientBuilder::default().build(&url).await?; let response: String = client.request("say_hello", None).await?; println!("response: {:?}", response); - // TODO: This prints `They called 'blabla'` but nothing more. I expected the `on_response` callback to be called too? - let _response: Result = client.request("blabla", None).await; + let _response: Result = client.request("unknown_method", None).await; let _ = client.request::("say_hello", None).await?; Ok(()) } async fn run_server() -> anyhow::Result { - let m = ManInTheMiddle::default(); - let server = WsServerBuilder::with_middleware(m).build("127.0.0.1:0").await?; + let server = WsServerBuilder::with_middleware(Timings).build("127.0.0.1:0").await?; let mut module = RpcModule::new(()); module.register_method("say_hello", |_, _| Ok("lo"))?; let addr = server.local_addr()?; diff --git a/http-server/src/server.rs b/http-server/src/server.rs index 6b97924ca7..d2b06ce626 100644 --- a/http-server/src/server.rs +++ b/http-server/src/server.rs @@ -313,13 +313,11 @@ impl Server { match methods.execute_with_resources(&sink, req, 0, &resources) { Some((name, MethodResult::Sync(success))) => { middleware.on_result(name, success, request_start); - middleware.on_response(request_start); } Some((name, MethodResult::Async(fut))) => { let success = fut.await; middleware.on_result(name, success, request_start); - middleware.on_response(request_start); } None => (), } diff --git a/utils/src/server/middleware.rs b/utils/src/server/middleware.rs index 8cbe03c0bc..f2b929ada5 100644 --- a/utils/src/server/middleware.rs +++ b/utils/src/server/middleware.rs @@ -27,7 +27,7 @@ //! TODO /// TODO -pub trait Middleware: Default + Send + Sync + Clone + 'static { +pub trait Middleware: Send + Sync + Clone + 'static { /// Intended to carry timestamp of a request, for example `std::time::Instant`. How the middleware /// measures time, if at all, is entirely up to the implementation. type Instant: Send + Copy; From 27d700d4bb41e03bed60023a1aa0b20293be5698 Mon Sep 17 00:00:00 2001 From: Maciej Hirsz Date: Mon, 29 Nov 2021 10:49:30 +0000 Subject: [PATCH 13/24] Middleware to capture method not found calls --- examples/middleware_http.rs | 6 +++--- examples/middleware_ws.rs | 6 +++--- http-server/src/server.rs | 17 +++++++++++------ utils/src/server/rpc_module.rs | 15 ++++++++------- ws-server/src/server.rs | 18 ++++++++++++------ 5 files changed, 37 insertions(+), 25 deletions(-) diff --git a/examples/middleware_http.rs b/examples/middleware_http.rs index 258a59e3ad..884a854e3c 100644 --- a/examples/middleware_http.rs +++ b/examples/middleware_http.rs @@ -44,15 +44,15 @@ impl middleware::Middleware for Timings { } fn on_call(&self, name: &str) { - println!("They called '{}'", name); + println!("[Middleware::on_call] '{}'", name); } fn on_result(&self, name: &str, succeess: bool, started_at: Self::Instant) { - println!("call={}, worked? {}, duration {:?}", name, succeess, started_at.elapsed()); + println!("[Middleware::on_result] '{}', worked? {}, time elapsed {:?}", name, succeess, started_at.elapsed()); } fn on_response(&self, started_at: Self::Instant) { - println!("Response duration {:?}", started_at.elapsed()); + println!("[Middleware::on_response] time elapsed {:?}", started_at.elapsed()); } } diff --git a/examples/middleware_ws.rs b/examples/middleware_ws.rs index e48a5d55ba..2ac5ca837a 100644 --- a/examples/middleware_ws.rs +++ b/examples/middleware_ws.rs @@ -44,15 +44,15 @@ impl middleware::Middleware for Timings { } fn on_call(&self, name: &str) { - println!("They called '{}'", name); + println!("[Middleware::on_call] '{}'", name); } fn on_result(&self, name: &str, succeess: bool, started_at: Self::Instant) { - println!("call={}, worked? {}, duration {:?}", name, succeess, started_at.elapsed()); + println!("[Middleware::on_result] '{}', worked? {}, time elapsed {:?}", name, succeess, started_at.elapsed()); } fn on_response(&self, started_at: Self::Instant) { - println!("Response duration {:?}", started_at.elapsed()); + println!("[Middleware::on_response] time elapsed {:?}", started_at.elapsed()); } } diff --git a/http-server/src/server.rs b/http-server/src/server.rs index d2b06ce626..d5d1714e44 100644 --- a/http-server/src/server.rs +++ b/http-server/src/server.rs @@ -311,15 +311,17 @@ impl Server { // NOTE: we don't need to track connection id on HTTP, so using hardcoded 0 here. match methods.execute_with_resources(&sink, req, 0, &resources) { - Some((name, MethodResult::Sync(success))) => { + Ok((name, MethodResult::Sync(success))) => { middleware.on_result(name, success, request_start); } - Some((name, MethodResult::Async(fut))) => { + Ok((name, MethodResult::Async(fut))) => { let success = fut.await; middleware.on_result(name, success, request_start); } - None => (), + Err(name) => { + middleware.on_result(name.as_ref(), false, request_start); + } } } else if let Ok(_req) = serde_json::from_slice::(&body) { return Ok::<_, HyperError>(response::ok_response("".into())); @@ -335,15 +337,18 @@ impl Server { join_all(batch.into_iter().filter_map(move |req| { match methods.execute_with_resources(&sink, req, 0, &resources) { - Some((name, MethodResult::Sync(success))) => { + Ok((name, MethodResult::Sync(success))) => { middleware.on_result(name, success, request_start); None } - Some((name, MethodResult::Async(fut))) => Some(async move { + Ok((name, MethodResult::Async(fut))) => Some(async move { let success = fut.await; middleware.on_result(name, success, request_start); }), - None => None, + Err(name) => { + middleware.on_result(name.as_ref(), false, request_start); + None + } } })) .await; diff --git a/utils/src/server/rpc_module.rs b/utils/src/server/rpc_module.rs index aacf5f20e8..9e941b9b37 100644 --- a/utils/src/server/rpc_module.rs +++ b/utils/src/server/rpc_module.rs @@ -320,27 +320,28 @@ impl Methods { } } - /// Attempt to execute a callback while checking that the call does not exhaust the available resources, sending the resulting JSON (success or error) to the specified sink. - pub fn execute_with_resources( + /// Attempt to execute a callback while checking that the call does not exhaust the available resources, + // sending the resulting JSON (success or error) to the specified sink. + pub fn execute_with_resources<'r>( &self, sink: &MethodSink, - req: Request, + req: Request<'r>, conn_id: ConnectionId, resources: &Resources, - ) -> Option<(&'static str, MethodResult)> { + ) -> Result<(&'static str, MethodResult), Cow<'r, str>> { tracing::trace!("[Methods::execute_with_resources] Executing request: {:?}", req); match self.callbacks.get_key_value(&*req.method) { Some((&name, callback)) => match callback.claim(&req.method, resources) { - Ok(guard) => Some((name, callback.execute(sink, req, conn_id, Some(guard)))), + Ok(guard) => Ok((name, callback.execute(sink, req, conn_id, Some(guard)))), Err(err) => { tracing::error!("[Methods::execute_with_resources] failed to lock resources: {:?}", err); sink.send_error(req.id, ErrorCode::ServerIsBusy.into()); - Some((name, MethodResult::Sync(false))) + Ok((name, MethodResult::Sync(false))) } }, None => { sink.send_error(req.id, ErrorCode::MethodNotFound.into()); - None + Err(req.method) } } } diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index f5ca33760a..8dbbaf6c6f 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -355,11 +355,11 @@ async fn background_task( tracing::debug!("recv method call={}", req.method); tracing::trace!("recv: req={:?}", req); match methods.execute_with_resources(&sink, req, conn_id, &resources) { - Some((name, MethodResult::Sync(success))) => { + Ok((name, MethodResult::Sync(success))) => { middleware.on_result(name, success, request_start); middleware.on_response(request_start); } - Some((name, MethodResult::Async(fut))) => { + Ok((name, MethodResult::Async(fut))) => { let request_start = request_start; let fut = async move { @@ -370,7 +370,10 @@ async fn background_task( method_executors.add(fut.boxed()); } - None => middleware.on_response(request_start), + Err(name) => { + middleware.on_result(name.as_ref(), false, request_start); + middleware.on_response(request_start); + } } } else { let (id, code) = prepare_error(&data); @@ -397,15 +400,18 @@ async fn background_task( if !batch.is_empty() { join_all(batch.into_iter().filter_map(move |req| { match methods.execute_with_resources(&sink_batch, req, conn_id, resources) { - Some((name, MethodResult::Sync(success))) => { + Ok((name, MethodResult::Sync(success))) => { middleware.on_result(name, success, request_start); None } - Some((name, MethodResult::Async(fut))) => Some(async move { + Ok((name, MethodResult::Async(fut))) => Some(async move { let success = fut.await; middleware.on_result(name, success, request_start); }), - None => None, + Err(name) => { + middleware.on_result(name.as_ref(), false, request_start); + None + } } })) .await; From 012a6d8b1ce979fb249c23effa25f539cd3121c7 Mon Sep 17 00:00:00 2001 From: David Date: Mon, 29 Nov 2021 15:23:04 +0100 Subject: [PATCH 14/24] An example of adding multiple middlewares. (#581) * Add an example of adding multiple middlewares. * Update examples/multi-middleware.rs Co-authored-by: Maciej Hirsz <1096222+maciejhirsz@users.noreply.github.com> * Update examples/Cargo.toml Co-authored-by: Maciej Hirsz <1096222+maciejhirsz@users.noreply.github.com> Co-authored-by: Maciej Hirsz <1096222+maciejhirsz@users.noreply.github.com> --- examples/Cargo.toml | 5 ++ examples/multi-middleware.rs | 117 +++++++++++++++++++++++++++++++++ utils/src/server/middleware.rs | 6 +- 3 files changed, 125 insertions(+), 3 deletions(-) create mode 100644 examples/multi-middleware.rs diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 53f42a590f..4858efcbd9 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -13,6 +13,7 @@ jsonrpsee = { path = "../jsonrpsee", features = ["full"] } tracing = "0.1" tracing-subscriber = "0.2" tokio = { version = "1", features = ["full"] } +palaver = "0.2" [[example]] name = "http" @@ -26,6 +27,10 @@ path = "middleware_ws.rs" name = "middleware_http" path = "middleware_http.rs" +[[example]] +name = "multi_middleware" +path = "multi_middleware.rs" + [[example]] name = "ws" path = "ws.rs" diff --git a/examples/multi-middleware.rs b/examples/multi-middleware.rs new file mode 100644 index 0000000000..4e9e4bc5d5 --- /dev/null +++ b/examples/multi-middleware.rs @@ -0,0 +1,117 @@ +// Copyright 2019-2021 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any +// person obtaining a copy of this software and associated +// documentation files (the "Software"), to deal in the +// Software without restriction, including without +// limitation the rights to use, copy, modify, merge, +// publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software +// is furnished to do so, subject to the following +// conditions: +// +// The above copyright notice and this permission notice +// shall be included in all copies or substantial portions +// of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! Example showing how to add multiple middlewares to the same server. + +use jsonrpsee::{ + rpc_params, + types::traits::Client, + utils::server::middleware, + ws_client::WsClientBuilder, + ws_server::{RpcModule, WsServerBuilder}, +}; +use std::net::SocketAddr; +use std::time::Instant; + +/// Example middleware to measure call execution time. +#[derive(Clone)] +struct Timings; + +impl middleware::Middleware for Timings { + type Instant = Instant; + + fn on_request(&self) -> Self::Instant { + Instant::now() + } + + fn on_call(&self, name: &str) { + println!("[Timings] They called '{}'", name); + } + + fn on_result(&self, name: &str, succeess: bool, started_at: Self::Instant) { + println!("[Timings] call={}, worked? {}, duration {:?}", name, succeess, started_at.elapsed()); + } + + fn on_response(&self, started_at: Self::Instant) { + println!("[Timings] Response duration {:?}", started_at.elapsed()); + } +} + +/// Example middleware to keep a watch on the number of total threads started in the system. +#[derive(Clone)] +struct ThreadWatcher; + +impl middleware::Middleware for ThreadWatcher { + type Instant = isize; + + fn on_request(&self) -> Self::Instant { + let threads = palaver::process::count_threads(); + println!("[ThreadWatcher] Threads running on the machine at the start of a call: {}", threads); + threads as isize + } + + fn on_response(&self, started_at: Self::Instant) { + let current_nr_threads = palaver::process::count_threads() as isize; + println!("[ThreadWatcher] Request started {} threads", current_nr_threads - started_at); + } +} + + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::FmtSubscriber::builder() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .try_init() + .expect("setting default subscriber failed"); + + let addr = run_server().await?; + let url = format!("ws://{}", addr); + + let client = WsClientBuilder::default().build(&url).await?; + let response: String = client.request("say_hello", None).await?; + println!("response: {:?}", response); + let _response: Result = client.request("unknown_method", None).await; + let _ = client.request::("say_hello", None).await?; + let _ = client.request::<()>("thready", rpc_params![4]).await?; + + Ok(()) +} + +async fn run_server() -> anyhow::Result { + let server = WsServerBuilder::with_middleware((Timings, ThreadWatcher)).build("127.0.0.1:0").await?; + let mut module = RpcModule::new(()); + module.register_method("say_hello", |_, _| Ok("lo"))?; + module.register_method("thready", |params, _| { + let thread_count: usize = params.one().unwrap(); + for _ in 0..thread_count { + std::thread::spawn(|| {std::thread::sleep(std::time::Duration::from_secs(1))}); + } + Ok(()) + })?; + let addr = server.local_addr()?; + server.start(module)?; + Ok(addr) +} diff --git a/utils/src/server/middleware.rs b/utils/src/server/middleware.rs index f2b929ada5..8edcc10465 100644 --- a/utils/src/server/middleware.rs +++ b/utils/src/server/middleware.rs @@ -36,13 +36,13 @@ pub trait Middleware: Send + Sync + Clone + 'static { fn on_request(&self) -> Self::Instant; /// Called on each JSON-RPC method call, batch requests will trigger `on_call` multiple times. - fn on_call(&self, name: &str); + fn on_call(&self, _name: &str) {} /// Called on each JSON-RPC method completion, batch requests will trigger `on_result` multiple times. - fn on_result(&self, name: &str, success: bool, started_at: Self::Instant); + fn on_result(&self, _name: &str, _success: bool, _started_at: Self::Instant) {} /// Called once the JSON-RPC request is finished and response is sent to the output buffer. - fn on_response(&self, started_at: Self::Instant); + fn on_response(&self, _started_at: Self::Instant) {} } impl Middleware for () { From 9d2be2108537fcae5a8e57432ed3a8397c27ada8 Mon Sep 17 00:00:00 2001 From: David Date: Mon, 29 Nov 2021 15:48:30 +0100 Subject: [PATCH 15/24] Move `Middleware` to jsonrpsee-types (#582) * Move `Middleware` to jsonrpsee-types * Move Middleware trait to jsonrpsee-types * Add some docs. --- examples/middleware_http.rs | 3 +-- examples/middleware_ws.rs | 3 +-- examples/{multi-middleware.rs => multi_middleware.rs} | 6 ++---- http-server/src/server.rs | 2 +- types/src/lib.rs | 3 +++ {utils/src/server => types/src}/middleware.rs | 7 +++++-- utils/src/server/mod.rs | 2 -- ws-server/src/server.rs | 10 ++++++---- 8 files changed, 19 insertions(+), 17 deletions(-) rename examples/{multi-middleware.rs => multi_middleware.rs} (96%) rename {utils/src/server => types/src}/middleware.rs (88%) diff --git a/examples/middleware_http.rs b/examples/middleware_http.rs index 258a59e3ad..5876512e68 100644 --- a/examples/middleware_http.rs +++ b/examples/middleware_http.rs @@ -27,8 +27,7 @@ use jsonrpsee::{ http_client::HttpClientBuilder, http_server::{HttpServerBuilder, HttpServerHandle, RpcModule}, - types::traits::Client, - utils::server::middleware, + types::{middleware, traits::Client}, }; use std::net::SocketAddr; use std::time::Instant; diff --git a/examples/middleware_ws.rs b/examples/middleware_ws.rs index e48a5d55ba..302843fae7 100644 --- a/examples/middleware_ws.rs +++ b/examples/middleware_ws.rs @@ -25,8 +25,7 @@ // DEALINGS IN THE SOFTWARE. use jsonrpsee::{ - types::traits::Client, - utils::server::middleware, + types::{middleware, traits::Client}, ws_client::WsClientBuilder, ws_server::{RpcModule, WsServerBuilder}, }; diff --git a/examples/multi-middleware.rs b/examples/multi_middleware.rs similarity index 96% rename from examples/multi-middleware.rs rename to examples/multi_middleware.rs index 4e9e4bc5d5..d755b78ada 100644 --- a/examples/multi-middleware.rs +++ b/examples/multi_middleware.rs @@ -28,8 +28,7 @@ use jsonrpsee::{ rpc_params, - types::traits::Client, - utils::server::middleware, + types::{middleware, traits::Client}, ws_client::WsClientBuilder, ws_server::{RpcModule, WsServerBuilder}, }; @@ -79,7 +78,6 @@ impl middleware::Middleware for ThreadWatcher { } } - #[tokio::main] async fn main() -> anyhow::Result<()> { tracing_subscriber::FmtSubscriber::builder() @@ -107,7 +105,7 @@ async fn run_server() -> anyhow::Result { module.register_method("thready", |params, _| { let thread_count: usize = params.one().unwrap(); for _ in 0..thread_count { - std::thread::spawn(|| {std::thread::sleep(std::time::Duration::from_secs(1))}); + std::thread::spawn(|| std::thread::sleep(std::time::Duration::from_secs(1))); } Ok(()) })?; diff --git a/http-server/src/server.rs b/http-server/src/server.rs index d2b06ce626..4eafe8a737 100644 --- a/http-server/src/server.rs +++ b/http-server/src/server.rs @@ -34,13 +34,13 @@ use hyper::{ }; use jsonrpsee_types::{ error::{Error, GenericTransportError}, + middleware::Middleware, v2::{ErrorCode, Id, Notification, Request}, TEN_MB_SIZE_BYTES, }; use jsonrpsee_utils::http_helpers::read_body; use jsonrpsee_utils::server::{ helpers::{collect_batch_response, prepare_error, MethodSink}, - middleware::Middleware, resource_limiting::Resources, rpc_module::{MethodResult, Methods}, }; diff --git a/types/src/lib.rs b/types/src/lib.rs index 649096da57..a932e94d5c 100644 --- a/types/src/lib.rs +++ b/types/src/lib.rs @@ -46,6 +46,9 @@ mod client; /// Traits pub mod traits; +/// Middleware trait and implementation. +pub mod middleware; + pub use async_trait::async_trait; pub use beef::Cow; pub use client::*; diff --git a/utils/src/server/middleware.rs b/types/src/middleware.rs similarity index 88% rename from utils/src/server/middleware.rs rename to types/src/middleware.rs index 8edcc10465..0e208d7db5 100644 --- a/utils/src/server/middleware.rs +++ b/types/src/middleware.rs @@ -24,9 +24,12 @@ // IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -//! TODO +//! Middleware for `jsonrpsee` servers. -/// TODO +/// Defines a middleware with callbacks during the RPC request life-cycle. The primary use case for +/// this is to collect timings for a larger metrics collection solution but the only constraints on +/// the associated type is that it be [`Send`] and [`Copy`], giving users some freedom to do what +/// they need to do. pub trait Middleware: Send + Sync + Clone + 'static { /// Intended to carry timestamp of a request, for example `std::time::Instant`. How the middleware /// measures time, if at all, is entirely up to the implementation. diff --git a/utils/src/server/mod.rs b/utils/src/server/mod.rs index f05454ba48..fe1a99277b 100644 --- a/utils/src/server/mod.rs +++ b/utils/src/server/mod.rs @@ -32,5 +32,3 @@ pub mod helpers; pub mod resource_limiting; /// JSON-RPC "modules" group sets of methods that belong together and handles method/subscription registration. pub mod rpc_module; - -pub mod middleware; diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index f5ca33760a..b25e8e3329 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -32,6 +32,7 @@ use std::task::{Context, Poll}; use crate::future::{FutureDriver, ServerHandle, StopMonitor}; use crate::types::{ error::Error, + middleware::Middleware, v2::{ErrorCode, Id, Request}, TEN_MB_SIZE_BYTES, }; @@ -46,10 +47,11 @@ use soketto::Sender; use tokio::net::{TcpListener, TcpStream, ToSocketAddrs}; use tokio_util::compat::{Compat, TokioAsyncReadCompatExt}; -use jsonrpsee_utils::server::helpers::{collect_batch_response, prepare_error, MethodSink}; -use jsonrpsee_utils::server::middleware::Middleware; -use jsonrpsee_utils::server::resource_limiting::Resources; -use jsonrpsee_utils::server::rpc_module::{ConnectionId, MethodResult, Methods}; +use jsonrpsee_utils::server::{ + helpers::{collect_batch_response, prepare_error, MethodSink}, + resource_limiting::Resources, + rpc_module::{ConnectionId, MethodResult, Methods}, +}; /// Default maximum connections allowed. const MAX_CONNECTIONS: u64 = 100; From 3c4aff443543e3f2830597a5eadeaff78e513b19 Mon Sep 17 00:00:00 2001 From: Maciej Hirsz Date: Mon, 29 Nov 2021 14:50:54 +0000 Subject: [PATCH 16/24] Link middleware to `with_middleware` methods in docs --- types/src/middleware.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/types/src/middleware.rs b/types/src/middleware.rs index 0e208d7db5..bc72d420fa 100644 --- a/types/src/middleware.rs +++ b/types/src/middleware.rs @@ -30,6 +30,10 @@ /// this is to collect timings for a larger metrics collection solution but the only constraints on /// the associated type is that it be [`Send`] and [`Copy`], giving users some freedom to do what /// they need to do. +/// +/// See the [`WsServerBuilder::with_middleware`](../../jsonrpsee_ws_server/struct.WsServerBuilder.html#method.with_middleware) +/// or the [`HttpServerBuilder::with_middleware`](../../jsonrpsee_ws_server/struct.WsServerBuilder.html#method.with_middleware) method +/// for examples. pub trait Middleware: Send + Sync + Clone + 'static { /// Intended to carry timestamp of a request, for example `std::time::Instant`. How the middleware /// measures time, if at all, is entirely up to the implementation. From ef1c90d59be1bebafe567a6eee932e405c1ae952 Mon Sep 17 00:00:00 2001 From: Maciej Hirsz Date: Mon, 29 Nov 2021 16:36:26 +0100 Subject: [PATCH 17/24] Doctests --- ws-server/src/server.rs | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index a0f1193142..61fed4df36 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -520,7 +520,30 @@ impl Builder { } impl Builder { - /// Create a server builder with the specified [`Middleware`]. + /// Create a server builder with the specified [`Middleware`](../jsonrpsee_types/middleware/trait.Middleware.html). + /// + /// ``` + /// use jsonrpsee_types::middleware::Middleware; + /// use jsonrpsee_ws_server::WsServerBuilder; + /// use std::time::Instant; + /// + /// struct MyMiddleware; + /// + /// impl Middelware for MyMiddleware { + /// type Instant = Instant; + /// + /// fn on_request(&self) -> Instant { + /// Instant::now() + /// } + /// + /// fn on_result(&self, method: &str, success: bool, started_at: Instant) { + /// println!("Call to '{}' took {:?}", started_at.elapsed()); + /// } + /// } + /// + /// let builder = WsServerBuilder::with_middleware(()); + /// builder.set_allowed_origins(["https://example.com"]); + /// ``` pub fn with_middleware(middleware: M) -> Self { Builder { settings: Default::default(), resources: Default::default(), middleware } } From d8b7e07c56ff263bede7937297df58e5efe23bcb Mon Sep 17 00:00:00 2001 From: Maciej Hirsz Date: Mon, 29 Nov 2021 16:27:53 +0000 Subject: [PATCH 18/24] Doc comment fixed --- http-server/src/server.rs | 25 ++++++++++++++++++++++++- types/src/middleware.rs | 2 +- utils/src/server/resource_limiting.rs | 2 +- ws-server/src/server.rs | 10 +++++----- 4 files changed, 31 insertions(+), 8 deletions(-) diff --git a/http-server/src/server.rs b/http-server/src/server.rs index 72add1e92f..aa3cf23cc6 100644 --- a/http-server/src/server.rs +++ b/http-server/src/server.rs @@ -81,7 +81,30 @@ impl Builder { } impl Builder { - /// Create a server builder with the specified [`Middleware`]. + /// Create a server builder with the specified [`Middleware`](../jsonrpsee_types/middleware/trait.Middleware.html). + /// + /// ``` + /// use jsonrpsee_types::middleware::Middleware; + /// use jsonrpsee_http_server::HttpServerBuilder; + /// use std::time::Instant; + /// + /// #[derive(Clone)] + /// struct MyMiddleware; + /// + /// impl Middleware for MyMiddleware { + /// type Instant = Instant; + /// + /// fn on_request(&self) -> Instant { + /// Instant::now() + /// } + /// + /// fn on_result(&self, name: &str, success: bool, started_at: Instant) { + /// println!("Call to '{}' took {:?}", name, started_at.elapsed()); + /// } + /// } + /// + /// let builder = HttpServerBuilder::with_middleware(MyMiddleware); + /// ``` pub fn with_middleware(middleware: M) -> Self { Self { max_request_body_size: TEN_MB_SIZE_BYTES, diff --git a/types/src/middleware.rs b/types/src/middleware.rs index bc72d420fa..19347edb3f 100644 --- a/types/src/middleware.rs +++ b/types/src/middleware.rs @@ -32,7 +32,7 @@ /// they need to do. /// /// See the [`WsServerBuilder::with_middleware`](../../jsonrpsee_ws_server/struct.WsServerBuilder.html#method.with_middleware) -/// or the [`HttpServerBuilder::with_middleware`](../../jsonrpsee_ws_server/struct.WsServerBuilder.html#method.with_middleware) method +/// or the [`HttpServerBuilder::with_middleware`](../../jsonrpsee_http_server/struct.HttpServerBuilder.html#method.with_middleware) method /// for examples. pub trait Middleware: Send + Sync + Clone + 'static { /// Intended to carry timestamp of a request, for example `std::time::Instant`. How the middleware diff --git a/utils/src/server/resource_limiting.rs b/utils/src/server/resource_limiting.rs index a7fe3dd71c..3127176e5e 100644 --- a/utils/src/server/resource_limiting.rs +++ b/utils/src/server/resource_limiting.rs @@ -38,7 +38,7 @@ //! and then defining your units such that the limits (`capacity`) can be adjusted for different hardware configurations. //! //! Up to 8 resources can be defined using the [`WsServerBuilder::register_resource`](../../../jsonrpsee_ws_server/struct.WsServerBuilder.html#method.register_resource) -//! or [`HttpServerBuilder::register_resource`](../../../jsonrpsee_ws_server/struct.WsServerBuilder.html#method.register_resource) method +//! or [`HttpServerBuilder::register_resource`](../../../jsonrpsee_http_server/struct.HttpServerBuilder.html#method.register_resource) method //! for the WebSocket and HTTP server respectively. //! //! Each method will claim the specified number of units (or the default) for the duration of its execution. diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index 61fed4df36..acfea6f1f8 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -527,22 +527,22 @@ impl Builder { /// use jsonrpsee_ws_server::WsServerBuilder; /// use std::time::Instant; /// + /// #[derive(Clone)] /// struct MyMiddleware; /// - /// impl Middelware for MyMiddleware { + /// impl Middleware for MyMiddleware { /// type Instant = Instant; /// /// fn on_request(&self) -> Instant { /// Instant::now() /// } /// - /// fn on_result(&self, method: &str, success: bool, started_at: Instant) { - /// println!("Call to '{}' took {:?}", started_at.elapsed()); + /// fn on_result(&self, name: &str, success: bool, started_at: Instant) { + /// println!("Call to '{}' took {:?}", name, started_at.elapsed()); /// } /// } /// - /// let builder = WsServerBuilder::with_middleware(()); - /// builder.set_allowed_origins(["https://example.com"]); + /// let builder = WsServerBuilder::with_middleware(MyMiddleware); /// ``` pub fn with_middleware(middleware: M) -> Self { Builder { settings: Default::default(), resources: Default::default(), middleware } From e6d47b54e3b11043d9e0e46c576c8aa5d4db18ba Mon Sep 17 00:00:00 2001 From: Maciej Hirsz Date: Mon, 29 Nov 2021 16:29:24 +0000 Subject: [PATCH 19/24] Clean up a TODO --- jsonrpsee/src/lib.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/jsonrpsee/src/lib.rs b/jsonrpsee/src/lib.rs index 2ffe413ef7..5d2e97ca5f 100644 --- a/jsonrpsee/src/lib.rs +++ b/jsonrpsee/src/lib.rs @@ -76,7 +76,6 @@ pub use jsonrpsee_types as types; #[cfg(any(feature = "http-server", feature = "ws-server"))] pub use jsonrpsee_utils::server::rpc_module::{RpcModule, SubscriptionSink}; -/// TODO: (dp) any reason not to export this? narrow the scope to `jsonrpsee_utils::server`? #[cfg(any(feature = "http-server", feature = "ws-server"))] pub use jsonrpsee_utils as utils; From 245b011c0ba4ce53d7a42ad1515c0e589f43cedb Mon Sep 17 00:00:00 2001 From: Maciej Hirsz Date: Tue, 30 Nov 2021 11:38:32 +0000 Subject: [PATCH 20/24] Switch back to `set_middleware` --- examples/middleware_http.rs | 2 +- examples/middleware_ws.rs | 2 +- examples/multi_middleware.rs | 2 +- http-server/src/server.rs | 29 ++++++++++------- types/src/middleware.rs | 4 +-- ws-server/src/server.rs | 60 ++++++++++++++++++------------------ 6 files changed, 53 insertions(+), 46 deletions(-) diff --git a/examples/middleware_http.rs b/examples/middleware_http.rs index 441549670f..c734ca5d47 100644 --- a/examples/middleware_http.rs +++ b/examples/middleware_http.rs @@ -75,7 +75,7 @@ async fn main() -> anyhow::Result<()> { } async fn run_server() -> anyhow::Result<(SocketAddr, HttpServerHandle)> { - let server = HttpServerBuilder::with_middleware(Timings).build("127.0.0.1:0")?; + let server = HttpServerBuilder::new().set_middleware(Timings).build("127.0.0.1:0")?; let mut module = RpcModule::new(()); module.register_method("say_hello", |_, _| Ok("lo"))?; let addr = server.local_addr()?; diff --git a/examples/middleware_ws.rs b/examples/middleware_ws.rs index 564af452b4..19d262a2bd 100644 --- a/examples/middleware_ws.rs +++ b/examples/middleware_ws.rs @@ -75,7 +75,7 @@ async fn main() -> anyhow::Result<()> { } async fn run_server() -> anyhow::Result { - let server = WsServerBuilder::with_middleware(Timings).build("127.0.0.1:0").await?; + let server = WsServerBuilder::new().set_middleware(Timings).build("127.0.0.1:0").await?; let mut module = RpcModule::new(()); module.register_method("say_hello", |_, _| Ok("lo"))?; let addr = server.local_addr()?; diff --git a/examples/multi_middleware.rs b/examples/multi_middleware.rs index d755b78ada..b240fffcff 100644 --- a/examples/multi_middleware.rs +++ b/examples/multi_middleware.rs @@ -99,7 +99,7 @@ async fn main() -> anyhow::Result<()> { } async fn run_server() -> anyhow::Result { - let server = WsServerBuilder::with_middleware((Timings, ThreadWatcher)).build("127.0.0.1:0").await?; + let server = WsServerBuilder::new().set_middleware((Timings, ThreadWatcher)).build("127.0.0.1:0").await?; let mut module = RpcModule::new(()); module.register_method("say_hello", |_, _| Ok("lo"))?; module.register_method("thready", |params, _| { diff --git a/http-server/src/server.rs b/http-server/src/server.rs index aa3cf23cc6..f58828a39f 100644 --- a/http-server/src/server.rs +++ b/http-server/src/server.rs @@ -69,19 +69,26 @@ pub struct Builder { impl Default for Builder { fn default() -> Self { - Self::with_middleware(()) + Self { + max_request_body_size: TEN_MB_SIZE_BYTES, + resources: Resources::default(), + access_control: AccessControl::default(), + keep_alive: true, + tokio_runtime: None, + middleware: () + } } } impl Builder { /// Create a default server builder. pub fn new() -> Self { - Self::with_middleware(()) + Self::default() } } impl Builder { - /// Create a server builder with the specified [`Middleware`](../jsonrpsee_types/middleware/trait.Middleware.html). + /// Add a middleware to the builder [`Middleware`](../jsonrpsee_types/middleware/trait.Middleware.html). /// /// ``` /// use jsonrpsee_types::middleware::Middleware; @@ -103,15 +110,15 @@ impl Builder { /// } /// } /// - /// let builder = HttpServerBuilder::with_middleware(MyMiddleware); + /// let builder = HttpServerBuilder::new().set_middleware(MyMiddleware); /// ``` - pub fn with_middleware(middleware: M) -> Self { - Self { - max_request_body_size: TEN_MB_SIZE_BYTES, - resources: Resources::default(), - access_control: AccessControl::default(), - keep_alive: true, - tokio_runtime: None, + pub fn set_middleware(self, middleware: T) -> Builder { + Builder { + max_request_body_size: self.max_request_body_size, + resources: self.resources, + access_control: self.access_control, + keep_alive: self.keep_alive, + tokio_runtime: self.tokio_runtime, middleware, } } diff --git a/types/src/middleware.rs b/types/src/middleware.rs index 19347edb3f..3b0f0ce8c0 100644 --- a/types/src/middleware.rs +++ b/types/src/middleware.rs @@ -31,8 +31,8 @@ /// the associated type is that it be [`Send`] and [`Copy`], giving users some freedom to do what /// they need to do. /// -/// See the [`WsServerBuilder::with_middleware`](../../jsonrpsee_ws_server/struct.WsServerBuilder.html#method.with_middleware) -/// or the [`HttpServerBuilder::with_middleware`](../../jsonrpsee_http_server/struct.HttpServerBuilder.html#method.with_middleware) method +/// See the [`WsServerBuilder::set_middleware`](../../jsonrpsee_ws_server/struct.WsServerBuilder.html#method.set_middleware) +/// or the [`HttpServerBuilder::set_middleware`](../../jsonrpsee_http_server/struct.HttpServerBuilder.html#method.set_middleware) method /// for examples. pub trait Middleware: Send + Sync + Clone + 'static { /// Intended to carry timestamp of a request, for example `std::time::Instant`. How the middleware diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index acfea6f1f8..76cfb84e06 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -508,46 +508,18 @@ pub struct Builder { impl Default for Builder { fn default() -> Self { - Self::with_middleware(()) + Builder { settings: Settings::default(), resources: Resources::default(), middleware: () } } } impl Builder { /// Create a default server builder. pub fn new() -> Self { - Self::with_middleware(()) + Self::default() } } impl Builder { - /// Create a server builder with the specified [`Middleware`](../jsonrpsee_types/middleware/trait.Middleware.html). - /// - /// ``` - /// use jsonrpsee_types::middleware::Middleware; - /// use jsonrpsee_ws_server::WsServerBuilder; - /// use std::time::Instant; - /// - /// #[derive(Clone)] - /// struct MyMiddleware; - /// - /// impl Middleware for MyMiddleware { - /// type Instant = Instant; - /// - /// fn on_request(&self) -> Instant { - /// Instant::now() - /// } - /// - /// fn on_result(&self, name: &str, success: bool, started_at: Instant) { - /// println!("Call to '{}' took {:?}", name, started_at.elapsed()); - /// } - /// } - /// - /// let builder = WsServerBuilder::with_middleware(MyMiddleware); - /// ``` - pub fn with_middleware(middleware: M) -> Self { - Builder { settings: Default::default(), resources: Default::default(), middleware } - } - /// Set the maximum size of a request body in bytes. Default is 10 MiB. pub fn max_request_body_size(mut self, size: u32) -> Self { self.settings.max_request_body_size = size; @@ -599,6 +571,34 @@ impl Builder { Ok(self) } + /// Add a middleware to the builder [`Middleware`](../jsonrpsee_types/middleware/trait.Middleware.html). + /// + /// ``` + /// use jsonrpsee_types::middleware::Middleware; + /// use jsonrpsee_ws_server::WsServerBuilder; + /// use std::time::Instant; + /// + /// #[derive(Clone)] + /// struct MyMiddleware; + /// + /// impl Middleware for MyMiddleware { + /// type Instant = Instant; + /// + /// fn on_request(&self) -> Instant { + /// Instant::now() + /// } + /// + /// fn on_result(&self, name: &str, success: bool, started_at: Instant) { + /// println!("Call to '{}' took {:?}", name, started_at.elapsed()); + /// } + /// } + /// + /// let builder = WsServerBuilder::new().set_middleware(MyMiddleware); + /// ``` + pub fn set_middleware(self, middleware: T) -> Builder { + Builder { settings: self.settings, resources: self.resources, middleware } + } + /// Restores the default behavior of allowing connections with `Origin` header /// containing any value. This will undo any list set by [`set_allowed_origins`](Builder::set_allowed_origins). pub fn allow_all_origins(mut self) -> Self { From 9544dd1bef400213619055bf5ba4bc17c0fe87b0 Mon Sep 17 00:00:00 2001 From: Maciej Hirsz Date: Tue, 30 Nov 2021 11:45:20 +0000 Subject: [PATCH 21/24] fmt --- http-server/src/server.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/http-server/src/server.rs b/http-server/src/server.rs index f58828a39f..7ecb9f7025 100644 --- a/http-server/src/server.rs +++ b/http-server/src/server.rs @@ -75,7 +75,7 @@ impl Default for Builder { access_control: AccessControl::default(), keep_alive: true, tokio_runtime: None, - middleware: () + middleware: (), } } } From 9fefc84e27e13123cb171f1befc10c5c87afa871 Mon Sep 17 00:00:00 2001 From: Maciej Hirsz Date: Tue, 30 Nov 2021 14:32:37 +0000 Subject: [PATCH 22/24] Tests --- tests/tests/middleware.rs | 177 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 177 insertions(+) create mode 100644 tests/tests/middleware.rs diff --git a/tests/tests/middleware.rs b/tests/tests/middleware.rs new file mode 100644 index 0000000000..9326e93fed --- /dev/null +++ b/tests/tests/middleware.rs @@ -0,0 +1,177 @@ +// Copyright 2019-2021 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any +// person obtaining a copy of this software and associated +// documentation files (the "Software"), to deal in the +// Software without restriction, including without +// limitation the rights to use, copy, modify, merge, +// publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software +// is furnished to do so, subject to the following +// conditions: +// +// The above copyright notice and this permission notice +// shall be included in all copies or substantial portions +// of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use jsonrpsee::{ + http_client::HttpClientBuilder, + http_server::{HttpServerBuilder, HttpServerHandle}, + proc_macros::rpc, + types::{middleware::Middleware, traits::Client, Error}, + ws_client::WsClientBuilder, + ws_server::{WsServerBuilder, WsServerHandle}, + RpcModule, +}; +use tokio::time::sleep; + +use std::collections::HashMap; +use std::net::SocketAddr; +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +#[derive(Clone, Default)] +struct Counter { + inner: Arc>, +} + +#[derive(Default)] +struct CounterInner { + /// (Number of started requests, number of finished requests) + requests: (u32, u32), + /// Mapping method names to (number of calls, ids of successfully completed calls) + calls: HashMap)>, +} + +impl Middleware for Counter { + /// Auto-incremented id of the call + type Instant = u32; + + fn on_request(&self) -> u32 { + let mut inner = self.inner.lock().unwrap(); + let n = inner.requests.0; + + inner.requests.0 += 1; + + n + } + + fn on_call(&self, name: &str) { + let mut inner = self.inner.lock().unwrap(); + let entry = inner.calls.entry(name.into()).or_insert((0, Vec::new())); + + entry.0 += 1; + } + + fn on_result(&self, name: &str, success: bool, n: u32) { + if success { + self.inner.lock().unwrap().calls.get_mut(name).unwrap().1.push(n); + } + } + + fn on_response(&self, _: u32) { + self.inner.lock().unwrap().requests.1 += 1; + } +} + +fn test_module() -> RpcModule<()> { + #[rpc(server)] + pub trait Rpc { + #[method(name = "say_hello")] + async fn hello(&self) -> Result<&'static str, Error> { + sleep(Duration::from_millis(50)).await; + Ok("hello") + } + } + + impl RpcServer for () {} + + ().into_rpc() +} + +async fn websocket_server(module: RpcModule<()>, counter: Counter) -> Result<(SocketAddr, WsServerHandle), Error> { + let server = WsServerBuilder::default() + .register_resource("CPU", 6, 2)? + .register_resource("MEM", 10, 1)? + .set_middleware(counter) + .build("127.0.0.1:0") + .await?; + + let addr = server.local_addr()?; + let handle = server.start(module)?; + + Ok((addr, handle)) +} + +async fn http_server(module: RpcModule<()>, counter: Counter) -> Result<(SocketAddr, HttpServerHandle), Error> { + let server = HttpServerBuilder::default() + .register_resource("CPU", 6, 2)? + .register_resource("MEM", 10, 1)? + .set_middleware(counter) + .build("127.0.0.1:0")?; + + let addr = server.local_addr()?; + let handle = server.start(module)?; + + Ok((addr, handle)) +} + +#[tokio::test] +async fn ws_server_middleware() { + let counter = Counter::default(); + let (server_addr, server_handle) = websocket_server(test_module(), counter.clone()).await.unwrap(); + + let server_url = format!("ws://{}", server_addr); + let client = WsClientBuilder::default().build(&server_url).await.unwrap(); + + assert_eq!(client.request::("say_hello", None).await.unwrap(), "hello"); + + assert!(client.request::("unknown_method", None).await.is_err()); + + assert_eq!(client.request::("say_hello", None).await.unwrap(), "hello"); + assert_eq!(client.request::("say_hello", None).await.unwrap(), "hello"); + + assert!(client.request::("unknown_method", None).await.is_err()); + + let inner = counter.inner.lock().unwrap(); + + assert_eq!(inner.requests, (5, 5)); + assert_eq!(inner.calls["say_hello"], (3, vec![0, 2, 3])); + assert_eq!(inner.calls["unknown_method"], (2, vec![])); + + server_handle.stop().unwrap().await; +} + +#[tokio::test] +async fn http_server_middleware() { + let counter = Counter::default(); + let (server_addr, _server_handle) = http_server(test_module(), counter.clone()).await.unwrap(); + + let server_url = format!("http://{}", server_addr); + let client = HttpClientBuilder::default().build(&server_url).unwrap(); + + assert_eq!(client.request::("say_hello", None).await.unwrap(), "hello"); + + assert!(client.request::("unknown_method", None).await.is_err()); + + assert_eq!(client.request::("say_hello", None).await.unwrap(), "hello"); + assert_eq!(client.request::("say_hello", None).await.unwrap(), "hello"); + + assert!(client.request::("unknown_method", None).await.is_err()); + + let inner = counter.inner.lock().unwrap(); + + assert_eq!(inner.requests, (5, 5)); + assert_eq!(inner.calls["say_hello"], (3, vec![0, 2, 3])); + assert_eq!(inner.calls["unknown_method"], (2, vec![])); +} From 10e586af7baa3fbbfb71a95ae42aae809c35d773 Mon Sep 17 00:00:00 2001 From: Maciej Hirsz Date: Tue, 30 Nov 2021 15:47:36 +0000 Subject: [PATCH 23/24] Add `on_connect` and `on_disconnect` --- tests/tests/middleware.rs | 30 +++++++++++++++++++++++++----- types/src/middleware.rs | 12 ++++++------ ws-server/src/server.rs | 16 ++++++++++------ 3 files changed, 41 insertions(+), 17 deletions(-) diff --git a/tests/tests/middleware.rs b/tests/tests/middleware.rs index 9326e93fed..6767b3968f 100644 --- a/tests/tests/middleware.rs +++ b/tests/tests/middleware.rs @@ -47,6 +47,8 @@ struct Counter { #[derive(Default)] struct CounterInner { + /// (Number of started connections, number of finished connections) + connections: (u32, u32), /// (Number of started requests, number of finished requests) requests: (u32, u32), /// Mapping method names to (number of calls, ids of successfully completed calls) @@ -57,6 +59,10 @@ impl Middleware for Counter { /// Auto-incremented id of the call type Instant = u32; + fn on_connect(&self) { + self.inner.lock().unwrap().connections.0 += 1; + } + fn on_request(&self) -> u32 { let mut inner = self.inner.lock().unwrap(); let n = inner.requests.0; @@ -82,6 +88,10 @@ impl Middleware for Counter { fn on_response(&self, _: u32) { self.inner.lock().unwrap().requests.1 += 1; } + + fn on_disconnect(&self) { + self.inner.lock().unwrap().connections.1 += 1; + } } fn test_module() -> RpcModule<()> { @@ -143,19 +153,24 @@ async fn ws_server_middleware() { assert!(client.request::("unknown_method", None).await.is_err()); - let inner = counter.inner.lock().unwrap(); + { + let inner = counter.inner.lock().unwrap(); - assert_eq!(inner.requests, (5, 5)); - assert_eq!(inner.calls["say_hello"], (3, vec![0, 2, 3])); - assert_eq!(inner.calls["unknown_method"], (2, vec![])); + assert_eq!(inner.connections, (1, 0)); + assert_eq!(inner.requests, (5, 5)); + assert_eq!(inner.calls["say_hello"], (3, vec![0, 2, 3])); + assert_eq!(inner.calls["unknown_method"], (2, vec![])); + } server_handle.stop().unwrap().await; + + assert_eq!(counter.inner.lock().unwrap().connections, (1, 1)); } #[tokio::test] async fn http_server_middleware() { let counter = Counter::default(); - let (server_addr, _server_handle) = http_server(test_module(), counter.clone()).await.unwrap(); + let (server_addr, server_handle) = http_server(test_module(), counter.clone()).await.unwrap(); let server_url = format!("http://{}", server_addr); let client = HttpClientBuilder::default().build(&server_url).unwrap(); @@ -174,4 +189,9 @@ async fn http_server_middleware() { assert_eq!(inner.requests, (5, 5)); assert_eq!(inner.calls["say_hello"], (3, vec![0, 2, 3])); assert_eq!(inner.calls["unknown_method"], (2, vec![])); + + server_handle.stop().unwrap().await.unwrap(); + + // HTTP server doesn't track connections + assert_eq!(inner.connections, (0, 0)); } diff --git a/types/src/middleware.rs b/types/src/middleware.rs index 3b0f0ce8c0..2ca8f3db5f 100644 --- a/types/src/middleware.rs +++ b/types/src/middleware.rs @@ -39,6 +39,9 @@ pub trait Middleware: Send + Sync + Clone + 'static { /// measures time, if at all, is entirely up to the implementation. type Instant: Send + Copy; + /// Called when a new client connects (WebSocket only) + fn on_connect(&self) {} + /// Called when a new JSON-RPC comes to the server. fn on_request(&self) -> Self::Instant; @@ -50,18 +53,15 @@ pub trait Middleware: Send + Sync + Clone + 'static { /// Called once the JSON-RPC request is finished and response is sent to the output buffer. fn on_response(&self, _started_at: Self::Instant) {} + + /// Called when a client disconnects (WebSocket only) + fn on_disconnect(&self) {} } impl Middleware for () { type Instant = (); fn on_request(&self) -> Self::Instant {} - - fn on_call(&self, _name: &str) {} - - fn on_result(&self, _name: &str, _succeess: bool, _started_at: Self::Instant) {} - - fn on_response(&self, _started_at: Self::Instant) {} } impl Middleware for (A, B) diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index 76cfb84e06..6e8c00474a 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -284,6 +284,8 @@ async fn background_task( let stop_server2 = stop_server.clone(); let sink = MethodSink::new_with_limit(tx, max_request_body_size); + middleware.on_connect(); + // Send results back to the client. tokio::spawn(async move { while !stop_server2.shutdown_requested() { @@ -309,7 +311,7 @@ async fn background_task( let mut method_executors = FutureDriver::default(); let middleware = &middleware; - loop { + let result = loop { data.clear(); { @@ -323,7 +325,7 @@ async fn background_task( MonitoredError::Selector(SokettoError::Closed) => { tracing::debug!("WS transport error: remote peer terminated the connection: {}", conn_id); sink.close(); - return Ok(()); + break Ok(()); } MonitoredError::Selector(SokettoError::MessageTooLarge { current, maximum }) => { tracing::warn!( @@ -338,9 +340,9 @@ async fn background_task( MonitoredError::Selector(err) => { tracing::error!("WS transport error: {:?} => terminating connection {}", err, conn_id); sink.close(); - return Err(err.into()); + break Err(err.into()); } - MonitoredError::Shutdown => break, + MonitoredError::Shutdown => break Ok(()), }; }; }; @@ -443,12 +445,14 @@ async fn background_task( sink.send_error(Id::Null, ErrorCode::ParseError.into()); } } - } + }; + + middleware.on_disconnect(); // Drive all running methods to completion method_executors.await; - Ok(()) + result } #[derive(Debug, Clone)] From 4204cb68ed21aed5d716b67c6195f7dbc3fdc429 Mon Sep 17 00:00:00 2001 From: David Palm Date: Wed, 1 Dec 2021 10:17:35 +0100 Subject: [PATCH 24/24] Add note to future selves --- ws-server/src/server.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index 6e8c00474a..f60df7e682 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -449,7 +449,9 @@ async fn background_task( middleware.on_disconnect(); - // Drive all running methods to completion + // Drive all running methods to completion. + // **NOTE** Do not return early in this function. This `await` needs to run to guarantee + // proper drop behaviour. method_executors.await; result