Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Middleware for metrics #576

Merged
merged 28 commits into from
Dec 1, 2021
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
6ffc4d2
Squashed MethodSink
maciejhirsz Nov 17, 2021
5ddf9f8
Middleware WIP
maciejhirsz Nov 23, 2021
2e76638
Passing all the information through
maciejhirsz Nov 23, 2021
e580f44
Merge branch 'master' into mh-metrics-middleware
maciejhirsz Nov 23, 2021
a700902
Unnecessary `false`
maciejhirsz Nov 23, 2021
53b1ac2
Apply suggestions from code review
maciejhirsz Nov 24, 2021
c8eac37
Add a setter for middleware (#577)
dvdplm Nov 24, 2021
cc7b3f5
Middleware::on_response for batches
maciejhirsz Nov 24, 2021
040d384
Middleware in HTTP
maciejhirsz Nov 25, 2021
189fe0b
fmt
maciejhirsz Nov 25, 2021
20e2ee2
Server builder for HTTP
maciejhirsz Nov 25, 2021
17ab865
Use actual time in the example
maciejhirsz Nov 29, 2021
53bf4d1
HTTP example
maciejhirsz Nov 29, 2021
27d700d
Middleware to capture method not found calls
maciejhirsz Nov 29, 2021
012a6d8
An example of adding multiple middlewares. (#581)
dvdplm Nov 29, 2021
9d2be21
Move `Middleware` to jsonrpsee-types (#582)
dvdplm Nov 29, 2021
f481464
Merge branch 'mh-metrics-middleware' of github.com:paritytech/jsonrps…
maciejhirsz Nov 29, 2021
3c4aff4
Link middleware to `with_middleware` methods in docs
maciejhirsz Nov 29, 2021
ef1c90d
Doctests
maciejhirsz Nov 29, 2021
d8b7e07
Doc comment fixed
maciejhirsz Nov 29, 2021
e6d47b5
Clean up a TODO
maciejhirsz Nov 29, 2021
2b5da63
Merge branch 'master' into mh-metrics-middleware
dvdplm Nov 30, 2021
245b011
Switch back to `set_middleware`
maciejhirsz Nov 30, 2021
4d1313f
Merge branch 'mh-metrics-middleware' of github.com:paritytech/jsonrps…
maciejhirsz Nov 30, 2021
9544dd1
fmt
maciejhirsz Nov 30, 2021
9fefc84
Tests
maciejhirsz Nov 30, 2021
10e586a
Add `on_connect` and `on_disconnect`
maciejhirsz Nov 30, 2021
4204cb6
Add note to future selves
dvdplm Dec 1, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ tokio = { version = "1", features = ["full"] }
name = "http"
path = "http.rs"

[[example]]
name = "middleware"
path = "middleware.rs"

[[example]]
name = "ws"
path = "ws.rs"
Expand Down
94 changes: 94 additions & 0 deletions examples/middleware.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any
// person obtaining a copy of this software and associated
// documentation files (the "Software"), to deal in the
// Software without restriction, including without
// limitation the rights to use, copy, modify, merge,
// publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software
// is furnished to do so, subject to the following
// conditions:
//
// The above copyright notice and this permission notice
// shall be included in all copies or substantial portions
// of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use jsonrpsee::{
types::traits::Client,
utils::server::middleware,
ws_client::WsClientBuilder,
ws_server::{RpcModule, WsServerBuilder},
};
use std::net::SocketAddr;
use std::sync::atomic;

#[derive(Default)]
struct ManInTheMiddle {
when: atomic::AtomicU64,
}

impl Clone for ManInTheMiddle {
fn clone(&self) -> Self {
ManInTheMiddle { when: atomic::AtomicU64::new(self.when.load(atomic::Ordering::SeqCst)) }
}
}

impl middleware::Middleware for ManInTheMiddle {
type Instant = u64;
fn on_request(&self) -> Self::Instant {
self.when.fetch_add(1, atomic::Ordering::SeqCst)
}

fn on_call(&self, name: &str) {
println!("They called '{}'", name);
dvdplm marked this conversation as resolved.
Show resolved Hide resolved
}

fn on_result(&self, name: &str, succeess: bool, started_at: Self::Instant) {
println!("call={}, worked? {}, when? {}", name, succeess, started_at);
}

fn on_response(&self, started_at: Self::Instant) {
println!("Response started_at={}", started_at);
}
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::FmtSubscriber::builder()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.try_init()
.expect("setting default subscriber failed");

let addr = run_server().await?;
let url = format!("ws://{}", addr);

let client = WsClientBuilder::default().build(&url).await?;
let response: String = client.request("say_hello", None).await?;
tracing::info!("response: {:?}", response);
dvdplm marked this conversation as resolved.
Show resolved Hide resolved
// TODO: This prints `They called 'blabla'` but nothing more. I expected the `on_response` callback to be called too?
let _response: Result<String, _> = client.request("blabla", None).await;
let _ = client.request::<String>("say_hello", None).await?;

Ok(())
}

async fn run_server() -> anyhow::Result<SocketAddr> {
let m = ManInTheMiddle::default();
let server = WsServerBuilder::with_middleware(m).build("127.0.0.1:0").await?;
let mut module = RpcModule::new(());
module.register_method("say_hello", |_, _| Ok("lo"))?;
let addr = server.local_addr()?;
server.start(module)?;
Ok(addr)
}
20 changes: 12 additions & 8 deletions http-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ use jsonrpsee_types::{
};
use jsonrpsee_utils::http_helpers::read_body;
use jsonrpsee_utils::server::{
helpers::{collect_batch_response, prepare_error, send_error},
helpers::{collect_batch_response, prepare_error, MethodSink},
resource_limiting::Resources,
rpc_module::Methods,
rpc_module::{MethodResult, Methods},
};

use serde_json::value::RawValue;
Expand Down Expand Up @@ -278,38 +278,42 @@ impl Server {

// NOTE(niklasad1): it's a channel because it's needed for batch requests.
let (tx, mut rx) = mpsc::unbounded::<String>();
let sink = MethodSink::new_with_limit(tx, max_request_body_size);

type Notif<'a> = Notification<'a, Option<&'a RawValue>>;

// Single request or notification
if is_single {
if let Ok(req) = serde_json::from_slice::<Request>(&body) {
// NOTE: we don't need to track connection id on HTTP, so using hardcoded 0 here.
if let Some(fut) =
methods.execute_with_resources(&tx, req, 0, &resources, max_request_body_size)
if let Some((_, MethodResult::Async(fut))) =
methods.execute_with_resources(&sink, req, 0, &resources)
{
fut.await;
}
} else if let Ok(_req) = serde_json::from_slice::<Notif>(&body) {
return Ok::<_, HyperError>(response::ok_response("".into()));
} else {
let (id, code) = prepare_error(&body);
send_error(id, &tx, code.into());
sink.send_error(id, code.into());
}

// Batch of requests or notifications
} else if let Ok(batch) = serde_json::from_slice::<Vec<Request>>(&body) {
if !batch.is_empty() {
join_all(batch.into_iter().filter_map(|req| {
methods.execute_with_resources(&tx, req, 0, &resources, max_request_body_size)
match methods.execute_with_resources(&sink, req, 0, &resources) {
Some((_, MethodResult::Async(fut))) => Some(fut),
_ => None,
}
}))
.await;
} else {
// "If the batch rpc call itself fails to be recognized as an valid JSON or as an
// Array with at least one value, the response from the Server MUST be a single
// Response object." – The Spec.
is_single = true;
send_error(Id::Null, &tx, ErrorCode::InvalidRequest.into());
sink.send_error(Id::Null, ErrorCode::InvalidRequest.into());
}
} else if let Ok(_batch) = serde_json::from_slice::<Vec<Notif>>(&body) {
return Ok::<_, HyperError>(response::ok_response("".into()));
Expand All @@ -319,7 +323,7 @@ impl Server {
// Response object." – The Spec.
is_single = true;
let (id, code) = prepare_error(&body);
send_error(id, &tx, code.into());
sink.send_error(id, code.into());
}

// Closes the receiving half of a channel without dropping it. This prevents any further
Expand Down
4 changes: 4 additions & 0 deletions jsonrpsee/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ pub use jsonrpsee_types as types;
#[cfg(any(feature = "http-server", feature = "ws-server"))]
pub use jsonrpsee_utils::server::rpc_module::{RpcModule, SubscriptionSink};

/// TODO: (dp) any reason not to export this? narrow the scope to `jsonrpsee_utils::server`?
#[cfg(any(feature = "http-server", feature = "ws-server"))]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can remove these feature guards, the utils by itself have conditional compilation that should be sufficient but I think you have to change utils to be a non-optional dependency for that to work.

pub use jsonrpsee_utils as utils;

#[cfg(feature = "http-server")]
pub use http_server::tracing;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
error: Unknown argument `magic`, expected one of: `aliases`, `blocking`, `name`, `param_kind`, `resources`
--> tests/ui/incorrect/method/method_unexpected_field.rs:6:25
--> $DIR/method_unexpected_field.rs:6:25
|
6 | #[method(name = "foo", magic = false)]
| ^^^^^
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
error: "override" is already defined
--> tests/ui/incorrect/sub/sub_dup_name_override.rs:9:5
--> $DIR/sub_dup_name_override.rs:9:5
|
9 | fn two(&self) -> RpcResult<()>;
| ^^^
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
error: "one" is already defined
--> tests/ui/incorrect/sub/sub_name_override.rs:7:5
--> $DIR/sub_name_override.rs:7:5
|
7 | fn one(&self) -> RpcResult<()>;
| ^^^
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
error: Unknown argument `magic`, expected one of: `aliases`, `item`, `name`, `param_kind`, `unsubscribe_aliases`
--> tests/ui/incorrect/sub/sub_unsupported_field.rs:6:42
--> $DIR/sub_unsupported_field.rs:6:42
|
6 | #[subscription(name = "sub", item = u8, magic = true)]
| ^^^^^
140 changes: 90 additions & 50 deletions utils/src/server/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::server::rpc_module::MethodSink;
use futures_channel::mpsc;
use futures_util::stream::StreamExt;
use jsonrpsee_types::error::{CallError, Error};
Expand Down Expand Up @@ -82,67 +81,108 @@ impl<'a> io::Write for &'a mut BoundedWriter {
}
}

/// Helper for sending JSON-RPC responses to the client
pub fn send_response(id: Id, tx: &MethodSink, result: impl Serialize, max_response_size: u32) {
let mut writer = BoundedWriter::new(max_response_size as usize);
/// Sink that is used to send back the result to the server for a specific method.
#[derive(Clone, Debug)]
pub struct MethodSink {
/// Channel sender
tx: mpsc::UnboundedSender<String>,
/// Max response size in bytes for a executed call.
max_response_size: u32,
}

let json = match serde_json::to_writer(&mut writer, &Response { jsonrpc: TwoPointZero, id: id.clone(), result }) {
Ok(_) => {
// Safety - serde_json does not emit invalid UTF-8.
unsafe { String::from_utf8_unchecked(writer.into_bytes()) }
}
Err(err) => {
tracing::error!("Error serializing response: {:?}", err);

if err.is_io() {
let data = to_json_raw_value(&format!("Exceeded max limit {}", max_response_size)).ok();
let err = ErrorObject {
code: ErrorCode::ServerError(OVERSIZED_RESPONSE_CODE),
message: OVERSIZED_RESPONSE_MSG,
data: data.as_deref(),
};
return send_error(id, tx, err);
} else {
return send_error(id, tx, ErrorCode::InternalError.into());
impl MethodSink {
/// Create a new `MethodSink` with unlimited response size
pub fn new(tx: mpsc::UnboundedSender<String>) -> Self {
MethodSink { tx, max_response_size: u32::MAX }
}

/// Create a new `MethodSink` with a limited response size
pub fn new_with_limit(tx: mpsc::UnboundedSender<String>, max_response_size: u32) -> Self {
Copy link
Member

@niklasad1 niklasad1 Nov 24, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice 👍

MethodSink { tx, max_response_size }
}

/// Send a JSON-RPC response to the client. If the serialization of `result` exceeds `max_response_size`,
/// an error will be sent instead.
pub fn send_response(&self, id: Id, result: impl Serialize) -> bool {
let mut writer = BoundedWriter::new(self.max_response_size as usize);

let json = match serde_json::to_writer(&mut writer, &Response { jsonrpc: TwoPointZero, id: id.clone(), result })
{
Ok(_) => {
// Safety - serde_json does not emit invalid UTF-8.
unsafe { String::from_utf8_unchecked(writer.into_bytes()) }
}
}
};
Err(err) => {
tracing::error!("Error serializing response: {:?}", err);

if err.is_io() {
let data = to_json_raw_value(&format!("Exceeded max limit {}", self.max_response_size)).ok();
let err = ErrorObject {
code: ErrorCode::ServerError(OVERSIZED_RESPONSE_CODE),
message: OVERSIZED_RESPONSE_MSG,
data: data.as_deref(),
};
return self.send_error(id, err);
} else {
return self.send_error(id, ErrorCode::InternalError.into());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume that the max size error is an error based on user input being too large?

I wonder whether the tracing::error!("Error serializing response: {:?}", err); line should instead be above this InternalError bit, so we only get output for the unexpected errors?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It means that output from an executed call exceeded the max limit but the "request" itself was below the limit.
Thus, it depends what you mean with the user but the registered callback created a response bigger than the max limit.

Yes, or downgrade the error above to warn or something but the InternalError "should" be infallible/bug IIRC.

}
}
};

if let Err(err) = tx.unbounded_send(json) {
tracing::error!("Error sending response to the client: {:?}", err)
if let Err(err) = self.tx.unbounded_send(json) {
tracing::error!("Error sending response to the client: {:?}", err);
false
} else {
true
}
}
}

/// Helper for sending JSON-RPC errors to the client
pub fn send_error(id: Id, tx: &MethodSink, error: ErrorObject) {
let json = match serde_json::to_string(&RpcError { jsonrpc: TwoPointZero, error, id }) {
Ok(json) => json,
Err(err) => {
tracing::error!("Error serializing error message: {:?}", err);
/// Send a JSON-RPC error to the client
pub fn send_error(&self, id: Id, error: ErrorObject) -> bool {
let json = match serde_json::to_string(&RpcError { jsonrpc: TwoPointZero, error, id }) {
Ok(json) => json,
Err(err) => {
tracing::error!("Error serializing error message: {:?}", err);

return false;
}
};

return;
if let Err(err) = self.tx.unbounded_send(json) {
tracing::error!("Could not send error response to the client: {:?}", err)
}
};

if let Err(err) = tx.unbounded_send(json) {
tracing::error!("Could not send error response to the client: {:?}", err)
false
}
}

/// Helper for sending the general purpose `Error` as a JSON-RPC errors to the client
pub fn send_call_error(id: Id, tx: &MethodSink, err: Error) {
let (code, message, data) = match err {
Error::Call(CallError::InvalidParams(e)) => (ErrorCode::InvalidParams, e.to_string(), None),
Error::Call(CallError::Failed(e)) => (ErrorCode::ServerError(CALL_EXECUTION_FAILED_CODE), e.to_string(), None),
Error::Call(CallError::Custom { code, message, data }) => (code.into(), message, data),
// This should normally not happen because the most common use case is to
// return `Error::Call` in `register_async_method`.
e => (ErrorCode::ServerError(UNKNOWN_ERROR_CODE), e.to_string(), None),
};
/// Helper for sending the general purpose `Error` as a JSON-RPC errors to the client
pub fn send_call_error(&self, id: Id, err: Error) -> bool {
let (code, message, data) = match err {
Error::Call(CallError::InvalidParams(e)) => (ErrorCode::InvalidParams, e.to_string(), None),
Error::Call(CallError::Failed(e)) => {
(ErrorCode::ServerError(CALL_EXECUTION_FAILED_CODE), e.to_string(), None)
}
Error::Call(CallError::Custom { code, message, data }) => (code.into(), message, data),
// This should normally not happen because the most common use case is to
// return `Error::Call` in `register_async_method`.
e => (ErrorCode::ServerError(UNKNOWN_ERROR_CODE), e.to_string(), None),
};

let err = ErrorObject { code, message: &message, data: data.as_deref() };

let err = ErrorObject { code, message: &message, data: data.as_deref() };
self.send_error(id, err)
}

send_error(id, tx, err)
/// Send a raw JSON-RPC message to the client, `MethodSink` does not check verify the validity
/// of the JSON being sent.
pub fn send_raw(&self, raw_json: String) -> Result<(), mpsc::TrySendError<String>> {
self.tx.unbounded_send(raw_json)
}

/// Close the channel for any further messages.
pub fn close(&self) {
self.tx.close_channel();
}
}

/// Figure out if this is a sufficiently complete request that we can extract an [`Id`] out of, or just plain
Expand Down
Loading