Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RFC] Server: add optional connection_guard in config #1301

Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 22 additions & 6 deletions server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,17 +121,21 @@ where

let stop_handle = StopHandle::new(stop_rx);

let connection_guard = match self.server_cfg.connection_guard.take() {
Some(guard) => guard,
None => ConnectionGuard::new(self.server_cfg.max_connections as usize)
};

match self.server_cfg.tokio_runtime.take() {
Some(rt) => rt.spawn(self.start_inner(methods, stop_handle)),
None => tokio::spawn(self.start_inner(methods, stop_handle)),
Some(rt) => rt.spawn(self.start_inner(methods, stop_handle, connection_guard)),
None => tokio::spawn(self.start_inner(methods, stop_handle, connection_guard)),
};

ServerHandle::new(stop_tx)
}

async fn start_inner(self, methods: Methods, stop_handle: StopHandle) {
async fn start_inner(self, methods: Methods, stop_handle: StopHandle, connection_guard: ConnectionGuard) {
let mut id: u32 = 0;
let connection_guard = ConnectionGuard::new(self.server_cfg.max_connections as usize);
let listener = self.listener;

let stopped = stop_handle.clone().shutdown();
Expand Down Expand Up @@ -186,6 +190,8 @@ pub struct ServerConfig {
pub(crate) max_response_body_size: u32,
/// Maximum number of incoming connections allowed.
pub(crate) max_connections: u32,
/// Custom/shared connection guard. If not `None`, this will override `max_connections`.
pub(crate) connection_guard: Option<ConnectionGuard>,
/// Maximum number of subscriptions per connection.
pub(crate) max_subscriptions_per_connection: u32,
/// Whether batch requests are supported by this server or not.
Expand Down Expand Up @@ -344,6 +350,7 @@ impl Default for ServerConfig {
max_request_body_size: TEN_MB_SIZE_BYTES,
max_response_body_size: TEN_MB_SIZE_BYTES,
max_connections: MAX_CONNECTIONS,
connection_guard: None,
max_subscriptions_per_connection: 1024,
batch_requests_config: BatchRequestConfig::Unlimited,
tokio_runtime: None,
Expand Down Expand Up @@ -651,6 +658,12 @@ impl<HttpMiddleware, RpcMiddleware> Builder<HttpMiddleware, RpcMiddleware> {
self
}

/// Configure a custom [`ConnectionGuard`] for the server to use.
pub fn custom_connection_guard(mut self, guard: ConnectionGuard) -> Self {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From my perspective, this is similar to having a .max_connections() on the server builder.

If I understood the problem right, please correct me if I'm wrong, the only purpose of passing this object to the server is to later call ConnectionGuard::available_connections to check how many connections the server has active.

I think a much better approach for this would be writing a custom middleware, similar to the one we have for the logger layer:

pub struct RpcLogger<S> {
max: u32,
service: S,
}
impl<'a, S> RpcServiceT<'a> for RpcLogger<S>
where
S: RpcServiceT<'a>,
{
type Future = Instrumented<ResponseFuture<S::Future>>;
#[tracing::instrument(name = "method_call", skip_all, fields(method = request.method_name()), level = "trace")]
fn call(&self, request: Request<'a>) -> Self::Future {
rx_log_from_json(&request, self.max);
ResponseFuture { fut: self.service.call(request), max: self.max }.in_current_span()
}
}

If the number of connections is all you wish to monitor, a similar middleware was implemented for substrate chains:
https://github.com/paritytech/polkadot-sdk/blob/6c5a42a690f96d59dbdf94640188f5d5b5cc47e2/substrate/client/rpc-servers/src/middleware/metrics.rs#L61-L63

Copy link
Member

@niklasad1 niklasad1 Feb 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR.

I think it's an easy way to get the number connections on the server without writing some custom logic such as manual metrics, correct?

With this PR one could just to do:

let rpc_conns = ConnectionGuard::new(100);

tokio::spawn(async move  {
    let server = Server::builder().custom_connection_guard(rpc_conns.clone()).build(...).await.unwrap();
    server.start(...)
}

// some other task
do something with rpc_conns....

So, the idea now is that jsonrpsee now exposes "a low-level API" where the users can inject whatever data they want, have a look at this example: https://github.com/paritytech/jsonrpsee/blob/master/examples/examples/jsonrpsee_as_service.rs

Then it's possible to inject that data in specific JSON-RPC middleware (which @lexnv tried to explain) or somewhere else (up to each specific use-case).

However, utilizing the low-level API may be a little annoying for simple use-cases where one only wants a server and no custom stuff and then this PR could be useful I guess.

Thus, technically it's possible implement such thing already but much harder to do currently. Hmm, could be useful what does @lexnv @jsdw think?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understood the problem right, please correct me if I'm wrong, the only purpose of passing this object to the server is to later call ConnectionGuard::available_connections to check how many connections the server has active.

Yes, that is the reason.

If the number of connections is all you wish to monitor, a similar middleware was implemented for substrate chains:
https://github.com/paritytech/polkadot-sdk/blob/6c5a42a690f96d59dbdf94640188f5d5b5cc47e2/substrate/client/rpc-servers/src/middleware/metrics.rs#L61-L63

Thanks, I wasn't aware of this.

However, utilizing the low-level API may be a little annoying for simple use-cases where one only wants a server and no custom stuff and then this PR could be useful I guess.

I agree. While it might be easier to use the low-level api if you are just starting, it is annoying if you are already using the intended api.

self.server_cfg.connection_guard = Some(guard);
self
}

/// Enable WebSocket ping/pong on the server.
///
/// Default: pings are disabled.
Expand Down Expand Up @@ -841,14 +854,17 @@ impl<HttpMiddleware, RpcMiddleware> Builder<HttpMiddleware, RpcMiddleware> {
/// }
/// ```
pub fn to_service_builder(self) -> TowerServiceBuilder<RpcMiddleware, HttpMiddleware> {
let max_conns = self.server_cfg.max_connections as usize;
let conn_guard = match &self.server_cfg.connection_guard {
Some(guard) => guard.clone(),
None => ConnectionGuard::new(self.server_cfg.max_connections as usize),
};

TowerServiceBuilder {
server_cfg: self.server_cfg,
rpc_middleware: self.rpc_middleware,
http_middleware: self.http_middleware,
conn_id: Arc::new(AtomicU32::new(0)),
conn_guard: ConnectionGuard::new(max_conns),
conn_guard,
}
}

Expand Down