Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Middleware to Logger #845

Merged
merged 14 commits into from
Aug 4, 2022
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ Support `WebSocket` and `HTTP` transports for both client and server.
- [WebSocket](./examples/examples/ws.rs)
- [WebSocket pubsub](./examples/examples/ws_pubsub_broadcast.rs)
- [API generation with proc macro](./examples/examples/proc_macro.rs)
- [Middleware](./examples/examples/multi_middleware.rs)
- [Logger](./examples/examples/multi_logger.rs)
- [CORS server](./examples/examples/cors_server.rs)
- [Core client](./examples/examples/core_client.rs)

Expand Down
1 change: 1 addition & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ server = [
"lazy_static",
"unicase",
"http",
"hyper",
]
client = ["futures-util/sink", "futures-channel/sink", "futures-channel/std"]
async-client = [
Expand Down
2 changes: 1 addition & 1 deletion core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ cfg_http_helpers! {
cfg_server! {
pub mod id_providers;
pub mod server;
pub mod middleware;
pub mod logger;
}

cfg_client! {
Expand Down
44 changes: 23 additions & 21 deletions core/src/middleware.rs → core/src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

//! Middleware for `jsonrpsee` servers.
//! Logger for `jsonrpsee` servers.

use std::net::SocketAddr;

pub use http::request::Request;
pub use http::HeaderMap as Headers;
pub use hyper::Body;
pub use jsonrpsee_types::Params;

/// The type JSON-RPC v2 call, it can be a subscription, method call or unknown.
Expand Down Expand Up @@ -57,18 +59,18 @@ impl std::fmt::Display for MethodKind {
}
}

/// Defines a middleware specifically for HTTP requests with callbacks during the RPC request life-cycle.
/// Defines a logger specifically for HTTP requests with callbacks during the RPC request life-cycle.
/// The primary use case for this is to collect timings for a larger metrics collection solution.
///
/// See [`HttpServerBuilder::set_middleware`](../../jsonrpsee_http_server/struct.HttpServerBuilder.html#method.set_middleware) method
/// See [`HttpServerBuilder::set_logger`](../../jsonrpsee_http_server/struct.HttpServerBuilder.html#method.set_logger) method
/// for examples.
pub trait HttpMiddleware: Send + Sync + Clone + 'static {
/// Intended to carry timestamp of a request, for example `std::time::Instant`. How the middleware
pub trait HttpLogger: Send + Sync + Clone + 'static {
/// Intended to carry timestamp of a request, for example `std::time::Instant`. How the trait
/// measures time, if at all, is entirely up to the implementation.
type Instant: std::fmt::Debug + Send + Sync + Copy;

/// Called when a new JSON-RPC request comes to the server.
fn on_request(&self, remote_addr: SocketAddr, headers: &Headers) -> Self::Instant;
fn on_request(&self, remote_addr: SocketAddr, request: &Request<Body>) -> Self::Instant;

/// Called on each JSON-RPC method call, batch requests will trigger `on_call` multiple times.
fn on_call(&self, method_name: &str, params: Params, kind: MethodKind);
Expand All @@ -80,13 +82,13 @@ pub trait HttpMiddleware: Send + Sync + Clone + 'static {
fn on_response(&self, result: &str, _started_at: Self::Instant);
}

/// Defines a middleware specifically for WebSocket connections with callbacks during the RPC request life-cycle.
/// Defines a logger specifically for WebSocket connections with callbacks during the RPC request life-cycle.
/// The primary use case for this is to collect timings for a larger metrics collection solution.
///
/// See the [`WsServerBuilder::set_middleware`](../../jsonrpsee_ws_server/struct.WsServerBuilder.html#method.set_middleware)
/// See the [`WsServerBuilder::set_logger`](../../jsonrpsee_ws_server/struct.WsServerBuilder.html#method.set_logger)
/// for examples.
pub trait WsMiddleware: Send + Sync + Clone + 'static {
/// Intended to carry timestamp of a request, for example `std::time::Instant`. How the middleware
pub trait WsLogger: Send + Sync + Clone + 'static {
/// Intended to carry timestamp of a request, for example `std::time::Instant`. How the trait
/// measures time, if at all, is entirely up to the implementation.
type Instant: std::fmt::Debug + Send + Sync + Copy;

Expand All @@ -109,10 +111,10 @@ pub trait WsMiddleware: Send + Sync + Clone + 'static {
fn on_disconnect(&self, remote_addr: std::net::SocketAddr);
}

impl HttpMiddleware for () {
impl HttpLogger for () {
type Instant = ();

fn on_request(&self, _: std::net::SocketAddr, _: &Headers) -> Self::Instant {}
fn on_request(&self, _: std::net::SocketAddr, _: &Request<Body>) -> Self::Instant {}

fn on_call(&self, _: &str, _: Params, _: MethodKind) {}

Expand All @@ -121,7 +123,7 @@ impl HttpMiddleware for () {
fn on_response(&self, _: &str, _: Self::Instant) {}
}

impl WsMiddleware for () {
impl WsLogger for () {
type Instant = ();

fn on_connect(&self, _: std::net::SocketAddr, _: &Headers) {}
Expand All @@ -137,10 +139,10 @@ impl WsMiddleware for () {
fn on_disconnect(&self, _: std::net::SocketAddr) {}
}

impl<A, B> WsMiddleware for (A, B)
impl<A, B> WsLogger for (A, B)
where
A: WsMiddleware,
B: WsMiddleware,
A: WsLogger,
B: WsLogger,
{
type Instant = (A::Instant, B::Instant);

Expand Down Expand Up @@ -172,15 +174,15 @@ where
}
}

impl<A, B> HttpMiddleware for (A, B)
impl<A, B> HttpLogger for (A, B)
where
A: HttpMiddleware,
B: HttpMiddleware,
A: HttpLogger,
B: HttpLogger,
{
type Instant = (A::Instant, B::Instant);

fn on_request(&self, remote_addr: std::net::SocketAddr, headers: &Headers) -> Self::Instant {
(self.0.on_request(remote_addr, headers), self.1.on_request(remote_addr, headers))
fn on_request(&self, remote_addr: std::net::SocketAddr, request: &Request<Body>) -> Self::Instant {
(self.0.on_request(remote_addr, request), self.1.on_request(remote_addr, request))
}

fn on_call(&self, method_name: &str, params: Params, kind: MethodKind) {
Expand Down
6 changes: 3 additions & 3 deletions core/src/server/rpc_module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ impl Methods {
let conn_state = ConnState { conn_id: 0, close_notify, id_provider: &RandomIntegerIdProvider };
let res = (cb)(id, params, sink.clone(), conn_state, None).await;

// This message is not used because it's used for middleware so we discard in other to
// This message is not used because it's used for metrics so we discard in other to
// not read once this is used for subscriptions.
//
// The same information is part of `res` above.
Expand Down Expand Up @@ -1045,9 +1045,9 @@ impl SubscriptionSink {

fn answer_subscription(&self, response: MethodResponse, subscribe_call: oneshot::Sender<MethodResponse>) -> bool {
let ws_send = self.inner.send_raw(response.result.clone()).is_ok();
let middleware_call = subscribe_call.send(response).is_ok();
let logger_call = subscribe_call.send(response).is_ok();

ws_send && middleware_call
ws_send && logger_call
}

fn build_message<T: Serialize>(&self, result: &T) -> Result<String, serde_json::Error> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,31 +28,31 @@ use std::net::SocketAddr;
use std::time::Instant;

use jsonrpsee::core::client::ClientT;
use jsonrpsee::core::middleware::{self, Headers, MethodKind, Params};
use jsonrpsee::core::logger::{self, Body, MethodKind, Params, Request};
use jsonrpsee::http_client::HttpClientBuilder;
use jsonrpsee::http_server::{HttpServerBuilder, HttpServerHandle, RpcModule};

#[derive(Clone)]
struct Timings;

impl middleware::HttpMiddleware for Timings {
impl logger::HttpLogger for Timings {
type Instant = Instant;

fn on_request(&self, remote_addr: SocketAddr, headers: &Headers) -> Self::Instant {
println!("[Middleware::on_request] remote_addr {}, headers: {:?}", remote_addr, headers);
fn on_request(&self, remote_addr: SocketAddr, request: &Request<Body>) -> Self::Instant {
println!("[Logger::on_request] remote_addr {}, request: {:?}", remote_addr, request);
Instant::now()
}

fn on_call(&self, name: &str, params: Params, kind: MethodKind) {
println!("[Middleware::on_call] method: '{}', params: {:?}, kind: {}", name, params, kind);
println!("[Logger::on_call] method: '{}', params: {:?}, kind: {}", name, params, kind);
}

fn on_result(&self, name: &str, succeess: bool, started_at: Self::Instant) {
println!("[Middleware::on_result] '{}', worked? {}, time elapsed {:?}", name, succeess, started_at.elapsed());
println!("[Logger::on_result] '{}', worked? {}, time elapsed {:?}", name, succeess, started_at.elapsed());
}

fn on_response(&self, result: &str, started_at: Self::Instant) {
println!("[Middleware::on_response] result: {}, time elapsed {:?}", result, started_at.elapsed());
println!("[Logger::on_response] result: {}, time elapsed {:?}", result, started_at.elapsed());
}
}

Expand All @@ -76,7 +76,7 @@ async fn main() -> anyhow::Result<()> {
}

async fn run_server() -> anyhow::Result<(SocketAddr, HttpServerHandle)> {
let server = HttpServerBuilder::new().set_middleware(Timings).build("127.0.0.1:0").await?;
let server = HttpServerBuilder::new().set_logger(Timings).build("127.0.0.1:0").await?;
let mut module = RpcModule::new(());
module.register_method("say_hello", |_, _| Ok("lo"))?;
let addr = server.local_addr()?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,39 +28,39 @@ use std::net::SocketAddr;
use std::time::Instant;

use jsonrpsee::core::client::ClientT;
use jsonrpsee::core::middleware::{self, Headers, MethodKind, Params};
use jsonrpsee::core::logger::{self, Headers, MethodKind, Params};
use jsonrpsee::ws_client::WsClientBuilder;
use jsonrpsee::ws_server::{RpcModule, WsServerBuilder};

#[derive(Clone)]
struct Timings;

impl middleware::WsMiddleware for Timings {
impl logger::WsLogger for Timings {
type Instant = Instant;

fn on_connect(&self, remote_addr: SocketAddr, headers: &Headers) {
println!("[Middleware::on_connect] remote_addr {}, headers: {:?}", remote_addr, headers);
println!("[Logger::on_connect] remote_addr {}, headers: {:?}", remote_addr, headers);
}

fn on_request(&self) -> Self::Instant {
println!("[Middleware::on_request]");
println!("[Logger::on_request]");
Instant::now()
}

fn on_call(&self, name: &str, params: Params, kind: MethodKind) {
println!("[Middleware::on_call] method: '{}', params: {:?}, kind: {}", name, params, kind);
println!("[Logger::on_call] method: '{}', params: {:?}, kind: {}", name, params, kind);
}

fn on_result(&self, name: &str, succeess: bool, started_at: Self::Instant) {
println!("[Middleware::on_result] '{}', worked? {}, time elapsed {:?}", name, succeess, started_at.elapsed());
println!("[Logger::on_result] '{}', worked? {}, time elapsed {:?}", name, succeess, started_at.elapsed());
}

fn on_response(&self, result: &str, started_at: Self::Instant) {
println!("[Middleware::on_response] result: {}, time elapsed {:?}", result, started_at.elapsed());
println!("[Logger::on_response] result: {}, time elapsed {:?}", result, started_at.elapsed());
}

fn on_disconnect(&self, remote_addr: SocketAddr) {
println!("[Middleware::on_disconnect] remote_addr: {}", remote_addr);
println!("[Logger::on_disconnect] remote_addr: {}", remote_addr);
}
}

Expand All @@ -84,7 +84,7 @@ async fn main() -> anyhow::Result<()> {
}

async fn run_server() -> anyhow::Result<SocketAddr> {
let server = WsServerBuilder::new().set_middleware(Timings).build("127.0.0.1:0").await?;
let server = WsServerBuilder::new().set_logger(Timings).build("127.0.0.1:0").await?;
let mut module = RpcModule::new(());
module.register_method("say_hello", |_, _| Ok("lo"))?;
let addr = server.local_addr()?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,24 @@
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

//! Example showing how to add multiple middlewares to the same server.
//! Example showing how to add multiple loggers to the same server.

use std::net::SocketAddr;
use std::process::Command;
use std::time::Instant;

use jsonrpsee::core::middleware::MethodKind;
use jsonrpsee::core::{client::ClientT, middleware, middleware::Headers};
use jsonrpsee::core::logger::MethodKind;
use jsonrpsee::core::{client::ClientT, logger, logger::Headers};
use jsonrpsee::rpc_params;
use jsonrpsee::types::Params;
use jsonrpsee::ws_client::WsClientBuilder;
use jsonrpsee::ws_server::{RpcModule, WsServerBuilder};

/// Example middleware to measure call execution time.
/// Example logger to measure call execution time.
#[derive(Clone)]
struct Timings;

impl middleware::WsMiddleware for Timings {
impl logger::WsLogger for Timings {
type Instant = Instant;

fn on_connect(&self, remote_addr: SocketAddr, headers: &Headers) {
Expand Down Expand Up @@ -69,7 +69,7 @@ impl middleware::WsMiddleware for Timings {
}
}

/// Example middleware to keep a watch on the number of total threads started in the system.
/// Example logger to keep a watch on the number of total threads started in the system.
#[derive(Clone)]
struct ThreadWatcher;

Expand All @@ -88,7 +88,7 @@ impl ThreadWatcher {
}
}

impl middleware::WsMiddleware for ThreadWatcher {
impl logger::WsLogger for ThreadWatcher {
type Instant = isize;

fn on_connect(&self, remote_addr: SocketAddr, headers: &Headers) {
Expand Down Expand Up @@ -142,7 +142,7 @@ async fn main() -> anyhow::Result<()> {
}

async fn run_server() -> anyhow::Result<SocketAddr> {
let server = WsServerBuilder::new().set_middleware((Timings, ThreadWatcher)).build("127.0.0.1:0").await?;
let server = WsServerBuilder::new().set_logger((Timings, ThreadWatcher)).build("127.0.0.1:0").await?;
let mut module = RpcModule::new(());
module.register_method("say_hello", |_, _| Ok("lo"))?;
module.register_method("thready", |params, _| {
Expand Down
Loading