Skip to content

Commit

Permalink
Middleware for metrics (#576)
Browse files Browse the repository at this point in the history
* Squashed MethodSink

* Middleware WIP

* Passing all the information through

* Unnecessary `false`

* Apply suggestions from code review

Co-authored-by: David <dvdplm@gmail.com>

* Add a setter for middleware (#577)

* Fix try-build tests

* Add a middleware setter and an example

* Actually add the example

* Grumbles

* Use an atomic

* Set middleware with a constructor instead

* Resolve a todo

* Update ws-server/src/server.rs

Co-authored-by: Maciej Hirsz <1096222+maciejhirsz@users.noreply.github.com>

* Update ws-server/src/server.rs

Co-authored-by: Maciej Hirsz <1096222+maciejhirsz@users.noreply.github.com>

* Update ws-server/src/server.rs

Co-authored-by: Maciej Hirsz <1096222+maciejhirsz@users.noreply.github.com>

Co-authored-by: Maciej Hirsz <1096222+maciejhirsz@users.noreply.github.com>

* Middleware::on_response for batches

* Middleware in HTTP

* fmt

* Server builder for HTTP

* Use actual time in the example

* HTTP example

* Middleware to capture method not found calls

* An example of adding multiple middlewares. (#581)

* Add an example of adding multiple middlewares.

* Update examples/multi-middleware.rs

Co-authored-by: Maciej Hirsz <1096222+maciejhirsz@users.noreply.github.com>

* Update examples/Cargo.toml

Co-authored-by: Maciej Hirsz <1096222+maciejhirsz@users.noreply.github.com>

Co-authored-by: Maciej Hirsz <1096222+maciejhirsz@users.noreply.github.com>

* Move `Middleware` to jsonrpsee-types (#582)

* Move `Middleware` to jsonrpsee-types

* Move Middleware trait to jsonrpsee-types

* Add some docs.

* Link middleware to `with_middleware` methods in docs

* Doctests

* Doc comment fixed

* Clean up a TODO

* Switch back to `set_middleware`

* fmt

* Tests

* Add `on_connect` and `on_disconnect`

* Add note to future selves

Co-authored-by: David <dvdplm@gmail.com>
  • Loading branch information
maciejhirsz and dvdplm authored Dec 1, 2021
1 parent 3c3f3ac commit 1657e26
Show file tree
Hide file tree
Showing 13 changed files with 1,026 additions and 199 deletions.
13 changes: 13 additions & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,24 @@ jsonrpsee = { path = "../jsonrpsee", features = ["full"] }
tracing = "0.1"
tracing-subscriber = "0.2"
tokio = { version = "1", features = ["full"] }
palaver = "0.2"

[[example]]
name = "http"
path = "http.rs"

[[example]]
name = "middleware_ws"
path = "middleware_ws.rs"

[[example]]
name = "middleware_http"
path = "middleware_http.rs"

[[example]]
name = "multi_middleware"
path = "multi_middleware.rs"

[[example]]
name = "ws"
path = "ws.rs"
Expand Down
84 changes: 84 additions & 0 deletions examples/middleware_http.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any
// person obtaining a copy of this software and associated
// documentation files (the "Software"), to deal in the
// Software without restriction, including without
// limitation the rights to use, copy, modify, merge,
// publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software
// is furnished to do so, subject to the following
// conditions:
//
// The above copyright notice and this permission notice
// shall be included in all copies or substantial portions
// of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use jsonrpsee::{
http_client::HttpClientBuilder,
http_server::{HttpServerBuilder, HttpServerHandle, RpcModule},
types::{middleware, traits::Client},
};
use std::net::SocketAddr;
use std::time::Instant;

#[derive(Clone)]
struct Timings;

impl middleware::Middleware for Timings {
type Instant = Instant;

fn on_request(&self) -> Self::Instant {
Instant::now()
}

fn on_call(&self, name: &str) {
println!("[Middleware::on_call] '{}'", name);
}

fn on_result(&self, name: &str, succeess: bool, started_at: Self::Instant) {
println!("[Middleware::on_result] '{}', worked? {}, time elapsed {:?}", name, succeess, started_at.elapsed());
}

fn on_response(&self, started_at: Self::Instant) {
println!("[Middleware::on_response] time elapsed {:?}", started_at.elapsed());
}
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::FmtSubscriber::builder()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.try_init()
.expect("setting default subscriber failed");

let (addr, _handle) = run_server().await?;
let url = format!("http://{}", addr);

let client = HttpClientBuilder::default().build(&url)?;
let response: String = client.request("say_hello", None).await?;
println!("response: {:?}", response);
let _response: Result<String, _> = client.request("unknown_method", None).await;
let _ = client.request::<String>("say_hello", None).await?;

Ok(())
}

async fn run_server() -> anyhow::Result<(SocketAddr, HttpServerHandle)> {
let server = HttpServerBuilder::new().set_middleware(Timings).build("127.0.0.1:0")?;
let mut module = RpcModule::new(());
module.register_method("say_hello", |_, _| Ok("lo"))?;
let addr = server.local_addr()?;
let server_handle = server.start(module)?;
Ok((addr, server_handle))
}
84 changes: 84 additions & 0 deletions examples/middleware_ws.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any
// person obtaining a copy of this software and associated
// documentation files (the "Software"), to deal in the
// Software without restriction, including without
// limitation the rights to use, copy, modify, merge,
// publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software
// is furnished to do so, subject to the following
// conditions:
//
// The above copyright notice and this permission notice
// shall be included in all copies or substantial portions
// of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use jsonrpsee::{
types::{middleware, traits::Client},
ws_client::WsClientBuilder,
ws_server::{RpcModule, WsServerBuilder},
};
use std::net::SocketAddr;
use std::time::Instant;

#[derive(Clone)]
struct Timings;

impl middleware::Middleware for Timings {
type Instant = Instant;

fn on_request(&self) -> Self::Instant {
Instant::now()
}

fn on_call(&self, name: &str) {
println!("[Middleware::on_call] '{}'", name);
}

fn on_result(&self, name: &str, succeess: bool, started_at: Self::Instant) {
println!("[Middleware::on_result] '{}', worked? {}, time elapsed {:?}", name, succeess, started_at.elapsed());
}

fn on_response(&self, started_at: Self::Instant) {
println!("[Middleware::on_response] time elapsed {:?}", started_at.elapsed());
}
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::FmtSubscriber::builder()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.try_init()
.expect("setting default subscriber failed");

let addr = run_server().await?;
let url = format!("ws://{}", addr);

let client = WsClientBuilder::default().build(&url).await?;
let response: String = client.request("say_hello", None).await?;
println!("response: {:?}", response);
let _response: Result<String, _> = client.request("unknown_method", None).await;
let _ = client.request::<String>("say_hello", None).await?;

Ok(())
}

async fn run_server() -> anyhow::Result<SocketAddr> {
let server = WsServerBuilder::new().set_middleware(Timings).build("127.0.0.1:0").await?;
let mut module = RpcModule::new(());
module.register_method("say_hello", |_, _| Ok("lo"))?;
let addr = server.local_addr()?;
server.start(module)?;
Ok(addr)
}
115 changes: 115 additions & 0 deletions examples/multi_middleware.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any
// person obtaining a copy of this software and associated
// documentation files (the "Software"), to deal in the
// Software without restriction, including without
// limitation the rights to use, copy, modify, merge,
// publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software
// is furnished to do so, subject to the following
// conditions:
//
// The above copyright notice and this permission notice
// shall be included in all copies or substantial portions
// of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

//! Example showing how to add multiple middlewares to the same server.
use jsonrpsee::{
rpc_params,
types::{middleware, traits::Client},
ws_client::WsClientBuilder,
ws_server::{RpcModule, WsServerBuilder},
};
use std::net::SocketAddr;
use std::time::Instant;

/// Example middleware to measure call execution time.
#[derive(Clone)]
struct Timings;

impl middleware::Middleware for Timings {
type Instant = Instant;

fn on_request(&self) -> Self::Instant {
Instant::now()
}

fn on_call(&self, name: &str) {
println!("[Timings] They called '{}'", name);
}

fn on_result(&self, name: &str, succeess: bool, started_at: Self::Instant) {
println!("[Timings] call={}, worked? {}, duration {:?}", name, succeess, started_at.elapsed());
}

fn on_response(&self, started_at: Self::Instant) {
println!("[Timings] Response duration {:?}", started_at.elapsed());
}
}

/// Example middleware to keep a watch on the number of total threads started in the system.
#[derive(Clone)]
struct ThreadWatcher;

impl middleware::Middleware for ThreadWatcher {
type Instant = isize;

fn on_request(&self) -> Self::Instant {
let threads = palaver::process::count_threads();
println!("[ThreadWatcher] Threads running on the machine at the start of a call: {}", threads);
threads as isize
}

fn on_response(&self, started_at: Self::Instant) {
let current_nr_threads = palaver::process::count_threads() as isize;
println!("[ThreadWatcher] Request started {} threads", current_nr_threads - started_at);
}
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::FmtSubscriber::builder()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.try_init()
.expect("setting default subscriber failed");

let addr = run_server().await?;
let url = format!("ws://{}", addr);

let client = WsClientBuilder::default().build(&url).await?;
let response: String = client.request("say_hello", None).await?;
println!("response: {:?}", response);
let _response: Result<String, _> = client.request("unknown_method", None).await;
let _ = client.request::<String>("say_hello", None).await?;
let _ = client.request::<()>("thready", rpc_params![4]).await?;

Ok(())
}

async fn run_server() -> anyhow::Result<SocketAddr> {
let server = WsServerBuilder::new().set_middleware((Timings, ThreadWatcher)).build("127.0.0.1:0").await?;
let mut module = RpcModule::new(());
module.register_method("say_hello", |_, _| Ok("lo"))?;
module.register_method("thready", |params, _| {
let thread_count: usize = params.one().unwrap();
for _ in 0..thread_count {
std::thread::spawn(|| std::thread::sleep(std::time::Duration::from_secs(1)));
}
Ok(())
})?;
let addr = server.local_addr()?;
server.start(module)?;
Ok(addr)
}
Loading

0 comments on commit 1657e26

Please sign in to comment.