Skip to content

Commit

Permalink
Module API refactor (#412)
Browse files Browse the repository at this point in the history
* Do not register methods on servers

* fmt

* Infallible `to_rpc` proc macro

* Remove dead code

* Check for duplicate names at compile time

* Add a UI test for name conflicts

* Apply suggestions from code review

Co-authored-by: David <dvdplm@gmail.com>

Co-authored-by: David Palm <dvdplm@gmail.com>
  • Loading branch information
maciejhirsz and dvdplm authored Jul 12, 2021
1 parent b8af4cc commit 8db65b4
Show file tree
Hide file tree
Showing 20 changed files with 185 additions and 183 deletions.
10 changes: 4 additions & 6 deletions benches/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,13 @@ pub(crate) const UNSUB_METHOD_NAME: &str = "unsub";
pub async fn http_server() -> String {
let (server_started_tx, server_started_rx) = oneshot::channel();
tokio::spawn(async move {
let mut server =
let server =
HttpServerBuilder::default().max_request_body_size(u32::MAX).build("127.0.0.1:0".parse().unwrap()).unwrap();
let mut module = RpcModule::new(());
module.register_method(SYNC_METHOD_NAME, |_, _| Ok("lo")).unwrap();
module.register_async_method(ASYNC_METHOD_NAME, |_, _| (async { Ok("lo") }).boxed()).unwrap();
server.register_module(module).unwrap();
server_started_tx.send(server.local_addr().unwrap()).unwrap();
server.start().await
server.start(module).await
});
format!("http://{}", server_started_rx.await.unwrap())
}
Expand All @@ -30,7 +29,7 @@ pub async fn http_server() -> String {
pub async fn ws_server() -> String {
let (server_started_tx, server_started_rx) = oneshot::channel();
tokio::spawn(async move {
let mut server = WsServerBuilder::default().build("127.0.0.1:0").await.unwrap();
let server = WsServerBuilder::default().build("127.0.0.1:0").await.unwrap();
let mut module = RpcModule::new(());
module.register_method(SYNC_METHOD_NAME, |_, _| Ok("lo")).unwrap();
module.register_async_method(ASYNC_METHOD_NAME, |_, _| (async { Ok("lo") }).boxed()).unwrap();
Expand All @@ -42,9 +41,8 @@ pub async fn ws_server() -> String {
})
.unwrap();

server.register_module(module).unwrap();
server_started_tx.send(server.local_addr().unwrap()).unwrap();
server.start().await
server.start(module).await
});
format!("ws://{}", server_started_rx.await.unwrap())
}
Expand Down
5 changes: 2 additions & 3 deletions examples/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,11 @@ async fn main() -> anyhow::Result<()> {
}

async fn run_server() -> anyhow::Result<SocketAddr> {
let mut server = HttpServerBuilder::default().build("127.0.0.1:0".parse()?)?;
let server = HttpServerBuilder::default().build("127.0.0.1:0".parse()?)?;
let mut module = RpcModule::new(());
module.register_method("say_hello", |_, _| Ok("lo"))?;
server.register_module(module).unwrap();

let addr = server.local_addr()?;
tokio::spawn(async move { server.start().await });
tokio::spawn(server.start(module));
Ok(addr)
}
5 changes: 2 additions & 3 deletions examples/proc_macro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,11 @@ async fn main() -> anyhow::Result<()> {
}

async fn run_server() -> anyhow::Result<SocketAddr> {
let mut server = HttpServerBuilder::default().build("127.0.0.1:0".parse()?)?;
let server = HttpServerBuilder::default().build("127.0.0.1:0".parse()?)?;
let mut module = RpcModule::new(());
module.register_method("state_getPairs", |_, _| Ok(vec![1, 2, 3]))?;
server.register_module(module).unwrap();

let addr = server.local_addr()?;
tokio::spawn(async move { server.start().await });
tokio::spawn(server.start(module));
Ok(addr)
}
6 changes: 2 additions & 4 deletions examples/weather.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ struct WeatherApiCx {
}

async fn run_server() -> anyhow::Result<SocketAddr> {
let mut server = WsServerBuilder::default().build("127.0.0.1:0").await?;
let server = WsServerBuilder::default().build("127.0.0.1:0").await?;

let api_client = restson::RestClient::new("http://api.openweathermap.org").unwrap();
let last_weather = Weather::default();
Expand All @@ -126,9 +126,7 @@ async fn run_server() -> anyhow::Result<SocketAddr> {
})
.unwrap();

server.register_module(module).unwrap();

let addr = server.local_addr()?;
tokio::spawn(async move { server.start().await });
tokio::spawn(server.start(module));
Ok(addr)
}
5 changes: 2 additions & 3 deletions examples/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,10 @@ async fn main() -> anyhow::Result<()> {
}

async fn run_server() -> anyhow::Result<SocketAddr> {
let mut server = WsServerBuilder::default().build("127.0.0.1:0").await?;
let server = WsServerBuilder::default().build("127.0.0.1:0").await?;
let mut module = RpcModule::new(());
module.register_method("say_hello", |_, _| Ok("lo"))?;
server.register_module(module).unwrap();
let addr = server.local_addr()?;
tokio::spawn(async move { server.start().await });
tokio::spawn(server.start(module));
Ok(addr)
}
5 changes: 2 additions & 3 deletions examples/ws_sub_with_params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ async fn main() -> anyhow::Result<()> {

async fn run_server() -> anyhow::Result<SocketAddr> {
const LETTERS: &str = "abcdefghijklmnopqrstuvxyz";
let mut server = WsServerBuilder::default().build("127.0.0.1:0").await?;
let server = WsServerBuilder::default().build("127.0.0.1:0").await?;
let mut module = RpcModule::new(());
module
.register_subscription("sub_one_param", "unsub_one_param", |params, mut sink, _| {
Expand All @@ -77,8 +77,7 @@ async fn run_server() -> anyhow::Result<SocketAddr> {
})
.unwrap();

server.register_module(module).unwrap();
let addr = server.local_addr()?;
tokio::spawn(async move { server.start().await });
tokio::spawn(server.start(module));
Ok(addr)
}
5 changes: 2 additions & 3 deletions examples/ws_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ async fn main() -> anyhow::Result<()> {
}

async fn run_server() -> anyhow::Result<SocketAddr> {
let mut server = WsServerBuilder::default().build("127.0.0.1:0").await?;
let server = WsServerBuilder::default().build("127.0.0.1:0").await?;
let mut module = RpcModule::new(());
module.register_subscription("subscribe_hello", "unsubscribe_hello", |_, mut sink, _| {
std::thread::spawn(move || loop {
Expand All @@ -65,8 +65,7 @@ async fn run_server() -> anyhow::Result<SocketAddr> {
});
Ok(())
})?;
server.register_module(module).unwrap();
let addr = server.local_addr()?;
tokio::spawn(async move { server.start().await });
tokio::spawn(server.start(module));
Ok(addr)
}
18 changes: 2 additions & 16 deletions http-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ use jsonrpsee_types::{
TEN_MB_SIZE_BYTES,
};
use jsonrpsee_utils::hyper_helpers::read_response_to_body;
use jsonrpsee_utils::server::rpc_module::RpcModule;
use jsonrpsee_utils::server::{
helpers::{collect_batch_response, send_error},
rpc_module::Methods,
Expand Down Expand Up @@ -160,19 +159,6 @@ pub struct Server {
}

impl Server {
/// Register all methods from a [`Methods`] of provided [`RpcModule`] on this server.
/// In case a method already is registered with the same name, no method is added and a [`Error::MethodAlreadyRegistered`]
/// is returned. Note that the [`RpcModule`] is consumed after this call.
pub fn register_module<Context: Send + Sync + 'static>(&mut self, module: RpcModule<Context>) -> Result<(), Error> {
self.methods.merge(module.into_methods())?;
Ok(())
}

/// Returns a `Vec` with all the method names registered on this server.
pub fn method_names(&self) -> Vec<&'static str> {
self.methods.method_names()
}

/// Returns socket address to which the server is bound.
pub fn local_addr(&self) -> Result<SocketAddr, Error> {
self.local_addr.ok_or_else(|| Error::Custom("Local address not found".into()))
Expand All @@ -184,15 +170,15 @@ impl Server {
}

/// Start the server.
pub async fn start(self) -> Result<(), Error> {
pub async fn start(self, methods: impl Into<Methods>) -> Result<(), Error> {
// Lock the stop mutex so existing stop handles can wait for server to stop.
// It will be unlocked once this function returns.
let _stop_handle = self.stop_handle.lock().await;

let methods = Arc::new(self.methods);
let max_request_body_size = self.max_request_body_size;
let access_control = self.access_control;
let mut stop_receiver = self.stop_pair.1;
let methods = methods.into();

let make_service = make_service_fn(move |_| {
let methods = methods.clone();
Expand Down
15 changes: 6 additions & 9 deletions http-server/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ async fn server() -> SocketAddr {
}

async fn server_with_handles() -> (SocketAddr, JoinHandle<Result<(), Error>>, StopHandle) {
let mut server = HttpServerBuilder::default().build("127.0.0.1:0".parse().unwrap()).unwrap();
let server = HttpServerBuilder::default().build("127.0.0.1:0".parse().unwrap()).unwrap();
let ctx = TestContext;
let mut module = RpcModule::new(ctx);
let addr = server.local_addr().unwrap();
Expand Down Expand Up @@ -87,9 +87,8 @@ async fn server_with_handles() -> (SocketAddr, JoinHandle<Result<(), Error>>, St
})
.unwrap();

server.register_module(module).unwrap();
let stop_handle = server.stop_handle();
let join_handle = tokio::spawn(async move { server.start().with_default_timeout().await.unwrap() });
let join_handle = tokio::spawn(async move { server.start(module).with_default_timeout().await.unwrap() });
(addr, join_handle, stop_handle)
}

Expand Down Expand Up @@ -323,23 +322,21 @@ async fn can_register_modules() {
let cx2 = Vec::<u8>::new();
let mut mod2 = RpcModule::new(cx2);

let mut server = HttpServerBuilder::default().build("127.0.0.1:0".parse().unwrap()).unwrap();
assert_eq!(server.method_names().len(), 0);
assert_eq!(mod1.method_names().count(), 0);
mod1.register_method("bla", |_, cx| Ok(format!("Gave me {}", cx))).unwrap();
mod1.register_method("bla2", |_, cx| Ok(format!("Gave me {}", cx))).unwrap();
mod2.register_method("yada", |_, cx| Ok(format!("Gave me {:?}", cx))).unwrap();

// Won't register, name clashes
mod2.register_method("bla", |_, cx| Ok(format!("Gave me {:?}", cx))).unwrap();

server.register_module(mod1).unwrap();
assert_eq!(server.method_names().len(), 2);
assert_eq!(mod1.method_names().count(), 2);

let err = server.register_module(mod2).unwrap_err();
let err = mod1.merge(mod2).unwrap_err();

let expected_err = Error::MethodAlreadyRegistered(String::from("bla"));
assert_eq!(err.to_string(), expected_err.to_string());
assert_eq!(server.method_names().len(), 2);
assert_eq!(mod1.method_names().count(), 2);
}

#[tokio::test]
Expand Down
5 changes: 2 additions & 3 deletions proc-macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,14 +325,13 @@ pub fn rpc_client_api(input_token_stream: TokenStream) -> TokenStream {
///
/// std::thread::spawn(move || {
/// let rt = tokio::runtime::Runtime::new().unwrap();
/// let mut server = rt.block_on(WsServerBuilder::default().build("127.0.0.1:0")).unwrap();
/// let server = rt.block_on(WsServerBuilder::default().build("127.0.0.1:0")).unwrap();
/// // `into_rpc()` method was generated inside of the `RpcServer` trait under the hood.
/// server.register_module(RpcServerImpl.into_rpc().unwrap()).unwrap();
///
/// rt.block_on(async move {
/// server_started_tx.send(server.local_addr().unwrap()).unwrap();
///
/// server.start().await
/// server.start(RpcServerImpl.into_rpc()).await
/// });
/// });
///
Expand Down
Loading

0 comments on commit 8db65b4

Please sign in to comment.