-
Notifications
You must be signed in to change notification settings - Fork 2.6k
rpc server with HTTP/WS on the same socket #12663
Changes from 2 commits
45faece
981ccd2
f79c774
e5f4678
e676d4a
9eb9006
ad5ba6e
eca4168
4fd2eb2
8202ad0
f2cfadd
9b0b6c6
05001fe
90ac76b
8601c4d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,17 +21,21 @@ | |
#![warn(missing_docs)] | ||
|
||
use jsonrpsee::{ | ||
http_server::{AccessControlBuilder, HttpServerBuilder, HttpServerHandle}, | ||
ws_server::{WsServerBuilder, WsServerHandle}, | ||
server::{ | ||
middleware::proxy_get_request::ProxyGetRequestLayer, AllowHosts, ServerBuilder, | ||
ServerHandle, | ||
}, | ||
RpcModule, | ||
}; | ||
use std::{error::Error as StdError, net::SocketAddr}; | ||
|
||
pub use crate::middleware::{RpcMetrics, RpcMiddleware}; | ||
pub use crate::middleware::RpcMetrics; | ||
use http::header::HeaderValue; | ||
pub use jsonrpsee::core::{ | ||
id_providers::{RandomIntegerIdProvider, RandomStringIdProvider}, | ||
traits::IdProvider, | ||
}; | ||
use tower_http::cors::{AllowOrigin, CorsLayer}; | ||
|
||
const MEGABYTE: usize = 1024 * 1024; | ||
|
||
|
@@ -46,12 +50,11 @@ const WS_MAX_SUBS_PER_CONN: usize = 1024; | |
|
||
pub mod middleware; | ||
|
||
/// Type alias for http server | ||
pub type HttpServer = HttpServerHandle; | ||
/// Type alias for ws server | ||
pub type WsServer = WsServerHandle; | ||
/// Type alias JSON-RPC server | ||
pub type Server = ServerHandle; | ||
|
||
/// WebSocket specific settings on the server. | ||
/// Server config. | ||
#[derive(Debug, Clone)] | ||
pub struct WsConfig { | ||
/// Maximum connections. | ||
pub max_connections: Option<usize>, | ||
|
@@ -67,8 +70,8 @@ impl WsConfig { | |
// Deconstructs the config to get the finalized inner values. | ||
// | ||
// `Payload size` or `max subs per connection` bigger than u32::MAX will be truncated. | ||
fn deconstruct(self) -> (u32, u32, u64, u32) { | ||
let max_conns = self.max_connections.unwrap_or(WS_MAX_CONNECTIONS) as u64; | ||
fn deconstruct(self) -> (u32, u32, u32, u32) { | ||
let max_conns = self.max_connections.unwrap_or(WS_MAX_CONNECTIONS) as u32; | ||
let max_payload_in_mb = payload_size_or_default(self.max_payload_in_mb) as u32; | ||
let max_payload_out_mb = payload_size_or_default(self.max_payload_out_mb) as u32; | ||
let max_subs_per_conn = self.max_subs_per_conn.unwrap_or(WS_MAX_SUBS_PER_CONN) as u32; | ||
|
@@ -86,31 +89,28 @@ pub async fn start_http<M: Send + Sync + 'static>( | |
metrics: Option<RpcMetrics>, | ||
rpc_api: RpcModule<M>, | ||
rt: tokio::runtime::Handle, | ||
) -> Result<HttpServerHandle, Box<dyn StdError + Send + Sync>> { | ||
let max_payload_in = payload_size_or_default(max_payload_in_mb); | ||
let max_payload_out = payload_size_or_default(max_payload_out_mb); | ||
|
||
let mut acl = AccessControlBuilder::new(); | ||
|
||
if let Some(cors) = cors { | ||
// Whitelist listening address. | ||
// NOTE: set_allowed_hosts will whitelist both ports but only one will used. | ||
acl = acl.set_allowed_hosts(format_allowed_hosts(&addrs[..]))?; | ||
acl = acl.set_allowed_origins(cors)?; | ||
}; | ||
|
||
let builder = HttpServerBuilder::new() | ||
.max_request_body_size(max_payload_in as u32) | ||
.max_response_body_size(max_payload_out as u32) | ||
.set_access_control(acl.build()) | ||
.health_api("/health", "system_health")? | ||
.custom_tokio_runtime(rt); | ||
) -> Result<ServerHandle, Box<dyn StdError + Send + Sync>> { | ||
let max_payload_in = payload_size_or_default(max_payload_in_mb) as u32; | ||
let max_payload_out = payload_size_or_default(max_payload_out_mb) as u32; | ||
let host_filter = hosts_filter(cors.is_some(), &addrs); | ||
let cors = try_into_cors(cors)?; | ||
|
||
let middleware = tower::ServiceBuilder::new() | ||
// Proxy `GET /health` requests to internal `system_health` method. | ||
.layer(ProxyGetRequestLayer::new("/health", "system_health")?) | ||
.layer(cors.clone()); | ||
|
||
let builder = ServerBuilder::new() | ||
.max_request_body_size(max_payload_in) | ||
.max_response_body_size(max_payload_out) | ||
.set_host_filtering(host_filter) | ||
.set_middleware(middleware) | ||
.custom_tokio_runtime(rt) | ||
.http_only(); | ||
|
||
let rpc_api = build_rpc_api(rpc_api); | ||
let (handle, addr) = if let Some(metrics) = metrics { | ||
let middleware = RpcMiddleware::new(metrics, "http".into()); | ||
let builder = builder.set_middleware(middleware); | ||
let server = builder.build(&addrs[..]).await?; | ||
let server = builder.set_logger(metrics).build(&addrs[..]).await?; | ||
let addr = server.local_addr(); | ||
(server.start(rpc_api)?, addr) | ||
} else { | ||
|
@@ -120,44 +120,44 @@ pub async fn start_http<M: Send + Sync + 'static>( | |
}; | ||
|
||
log::info!( | ||
"Running JSON-RPC HTTP server: addr={}, allowed origins={:?}", | ||
"Running JSON-RPC HTTP server: addr={}, cors={:?}", | ||
addr.map_or_else(|_| "unknown".to_string(), |a| a.to_string()), | ||
cors | ||
); | ||
|
||
Ok(handle) | ||
} | ||
|
||
/// Start WS server listening on given address. | ||
pub async fn start_ws<M: Send + Sync + 'static>( | ||
/// Start a JSON-RPC server listening on given address that supports both HTTP and WS. | ||
pub async fn start<M: Send + Sync + 'static>( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I reckon that it would be useful for folks to inject their own tower middleware here but not possible currently. We could do it in another PR but it's a bit tricky to control the CORS then.... |
||
addrs: [SocketAddr; 2], | ||
cors: Option<&Vec<String>>, | ||
ws_config: WsConfig, | ||
metrics: Option<RpcMetrics>, | ||
rpc_api: RpcModule<M>, | ||
rt: tokio::runtime::Handle, | ||
id_provider: Option<Box<dyn IdProvider>>, | ||
) -> Result<WsServerHandle, Box<dyn StdError + Send + Sync>> { | ||
) -> Result<ServerHandle, Box<dyn StdError + Send + Sync>> { | ||
let (max_payload_in, max_payload_out, max_connections, max_subs_per_conn) = | ||
ws_config.deconstruct(); | ||
|
||
let mut acl = AccessControlBuilder::new(); | ||
let host_filter = hosts_filter(cors.is_some(), &addrs); | ||
let cors = try_into_cors(cors)?; | ||
|
||
if let Some(cors) = cors { | ||
// Whitelist listening address. | ||
// NOTE: set_allowed_hosts will whitelist both ports but only one will used. | ||
acl = acl.set_allowed_hosts(format_allowed_hosts(&addrs[..]))?; | ||
acl = acl.set_allowed_origins(cors)?; | ||
}; | ||
let middleware = tower::ServiceBuilder::new() | ||
// Proxy `GET /health` requests to internal `system_health` method. | ||
.layer(ProxyGetRequestLayer::new("/health", "system_health")?) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. DQ: previously we exposed the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
.layer(cors.clone()); | ||
|
||
let mut builder = WsServerBuilder::new() | ||
let mut builder = ServerBuilder::new() | ||
.max_request_body_size(max_payload_in) | ||
.max_response_body_size(max_payload_out) | ||
.max_connections(max_connections) | ||
.max_subscriptions_per_connection(max_subs_per_conn) | ||
.ping_interval(std::time::Duration::from_secs(30)) | ||
.custom_tokio_runtime(rt) | ||
.set_access_control(acl.build()); | ||
.set_host_filtering(host_filter) | ||
.set_middleware(middleware) | ||
.custom_tokio_runtime(rt); | ||
|
||
if let Some(provider) = id_provider { | ||
builder = builder.set_id_provider(provider); | ||
|
@@ -167,9 +167,7 @@ pub async fn start_ws<M: Send + Sync + 'static>( | |
|
||
let rpc_api = build_rpc_api(rpc_api); | ||
let (handle, addr) = if let Some(metrics) = metrics { | ||
let middleware = RpcMiddleware::new(metrics, "ws".into()); | ||
let builder = builder.set_middleware(middleware); | ||
let server = builder.build(&addrs[..]).await?; | ||
let server = builder.set_logger(metrics).build(&addrs[..]).await?; | ||
let addr = server.local_addr(); | ||
(server.start(rpc_api)?, addr) | ||
} else { | ||
|
@@ -179,23 +177,14 @@ pub async fn start_ws<M: Send + Sync + 'static>( | |
}; | ||
|
||
log::info!( | ||
"Running JSON-RPC WS server: addr={}, allowed origins={:?}", | ||
"Running JSON-RPC WS server: addr={}, cors={:?}", | ||
niklasad1 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
addr.map_or_else(|_| "unknown".to_string(), |a| a.to_string()), | ||
cors | ||
); | ||
|
||
Ok(handle) | ||
} | ||
|
||
fn format_allowed_hosts(addrs: &[SocketAddr]) -> Vec<String> { | ||
let mut hosts = Vec::with_capacity(addrs.len() * 2); | ||
for addr in addrs { | ||
hosts.push(format!("localhost:{}", addr.port())); | ||
hosts.push(format!("127.0.0.1:{}", addr.port())); | ||
} | ||
hosts | ||
} | ||
|
||
fn build_rpc_api<M: Send + Sync + 'static>(mut rpc_api: RpcModule<M>) -> RpcModule<M> { | ||
let mut available_methods = rpc_api.method_names().collect::<Vec<_>>(); | ||
available_methods.sort(); | ||
|
@@ -214,3 +203,32 @@ fn build_rpc_api<M: Send + Sync + 'static>(mut rpc_api: RpcModule<M>) -> RpcModu | |
fn payload_size_or_default(size_mb: Option<usize>) -> usize { | ||
size_mb.map_or(RPC_MAX_PAYLOAD_DEFAULT, |mb| mb.saturating_mul(MEGABYTE)) | ||
} | ||
|
||
fn hosts_filter(enabled: bool, addrs: &[SocketAddr]) -> AllowHosts { | ||
if enabled { | ||
// NOTE The listening addresses are whitelisted by default. | ||
let mut hosts = Vec::with_capacity(addrs.len() * 2); | ||
for addr in addrs { | ||
hosts.push(format!("localhost:{}", addr.port()).into()); | ||
hosts.push(format!("127.0.0.1:{}", addr.port()).into()); | ||
} | ||
AllowHosts::Only(hosts) | ||
} else { | ||
AllowHosts::Any | ||
} | ||
} | ||
|
||
fn try_into_cors( | ||
maybe_cors: Option<&Vec<String>>, | ||
) -> Result<CorsLayer, Box<dyn StdError + Send + Sync>> { | ||
if let Some(cors) = maybe_cors { | ||
let mut list = Vec::new(); | ||
for origin in cors { | ||
list.push(HeaderValue::from_str(origin)?); | ||
} | ||
Ok(CorsLayer::new().allow_origin(AllowOrigin::list(list))) | ||
} else { | ||
// allow all cors | ||
Ok(CorsLayer::permissive()) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this runs only on HTTP and will be deprecated/removed in a few releases.
/cc @PierreBesson @jam206 would be great if you can migrate the
health checks
to the--ws-port
when this is deployed that should workI think the most reasonable is to keep
--rpc-port
and similar flags and just deprecate thews-related
flags.Then make the
9944
the default port.