Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: Register raw method with connection ID #1297

Merged
merged 35 commits into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
26ec635
server: Add a raw method
lexnv Feb 21, 2024
ef24148
server: Register raw methods, blocking or unblocking
lexnv Feb 21, 2024
4f8eb03
proc-macros: Add with-context attribute
lexnv Feb 21, 2024
9eba899
server: Register sync and nonblocking methods for raw API
lexnv Feb 21, 2024
239cafe
examples: Add with context example
lexnv Feb 21, 2024
0de6d95
core: Adjust docs for the raw method registering
lexnv Feb 21, 2024
4ce51c6
proc-macros: Cargo fmt
lexnv Feb 21, 2024
5f2b40a
server: Request Arc<Context> for the raw method callback
lexnv Feb 22, 2024
753224a
proc-macros: Per method raw-method attribute
lexnv Feb 22, 2024
d526b70
examples: Add server raw method
lexnv Feb 22, 2024
c8ff969
tests/ui: Check correct proc-macro behavior
lexnv Feb 22, 2024
b4880b9
tests/ui: Negative test for async with raw methods
lexnv Feb 22, 2024
d2da4b0
tests/ui: Negative test for blocking with raw methods
lexnv Feb 22, 2024
0dc48e9
tests/proc-macros: Ensure unique connection IDs from different clients
lexnv Feb 22, 2024
0029418
tests/integration: Ensure unique connection IDs from different clients
lexnv Feb 22, 2024
3a09a7e
proc-macros: Apply cargo fmt
lexnv Feb 22, 2024
1565927
Register raw method as async method
lexnv Feb 23, 2024
f5202ef
Fix testing
lexnv Feb 23, 2024
8cf1947
core: Fix documentation
lexnv Feb 23, 2024
bd571e1
Merge remote-tracking branch 'origin/master' into lexnv/low-level-con…
lexnv Mar 4, 2024
04dfd25
server: Rename raw method to `module.register_async_with_details`
lexnv Mar 4, 2024
e02ba61
server: Add connection details wrapper
lexnv Mar 4, 2024
7b72d8a
server: Add asyncWithDetails and connection details
lexnv Mar 4, 2024
87585ad
proc-macros: Provide connection details to methods
lexnv Mar 4, 2024
2c50375
Update core/src/server/rpc_module.rs
lexnv Mar 4, 2024
66b6a39
server: Remove connection details builder
lexnv Mar 4, 2024
cfe1aec
server: Refactor `.register_async_with_details` to `.register_async_m…
lexnv Mar 4, 2024
7327d8b
proc-macro: Clarify comment
lexnv Mar 4, 2024
6605409
Merge remote-tracking branch 'origin/lexnv/low-level-context-api-v2' …
lexnv Mar 4, 2024
650e24a
core: Doc hidden for async with details
lexnv Mar 5, 2024
f4d9085
Rename example
lexnv Mar 5, 2024
40d2bcf
Update core/src/server/rpc_module.rs
lexnv Mar 5, 2024
8185193
Merge remote-tracking branch 'origin/lexnv/low-level-context-api-v2' …
lexnv Mar 5, 2024
11ef18c
core: Remove doc(hidden) from ConnectionDetails::id
lexnv Mar 5, 2024
4f459ac
Update core/src/server/rpc_module.rs
niklasad1 Mar 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions core/src/server/rpc_module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ pub type SyncMethod = Arc<dyn Send + Sync + Fn(Id, Params, MaxResponseSize) -> M
/// Similar to [`SyncMethod`], but represents an asynchronous handler.
pub type AsyncMethod<'a> =
Arc<dyn Send + Sync + Fn(Id<'a>, Params<'a>, ConnectionId, MaxResponseSize) -> BoxFuture<'a, MethodResponse>>;

/// Similar to [`AsyncMethod`], but represents an asynchronous handler with connection details.
#[doc(hidden)]
pub type AsyncMethodWithDetails<'a> =
Arc<dyn Send + Sync + Fn(Id<'a>, Params<'a>, ConnectionDetails, MaxResponseSize) -> BoxFuture<'a, MethodResponse>>;
/// Method callback for subscriptions.
pub type SubscriptionMethod<'a> =
Arc<dyn Send + Sync + Fn(Id, Params, MethodSink, SubscriptionState) -> BoxFuture<'a, MethodResponse>>;
Expand All @@ -79,6 +84,27 @@ pub type MaxResponseSize = usize;
/// - a [`mpsc::UnboundedReceiver<String>`] to receive future subscription results
pub type RawRpcResponse = (String, mpsc::Receiver<String>);

/// The connection details exposed to the server methods.
#[derive(Debug, Clone)]
#[allow(missing_copy_implementations)]
#[doc(hidden)]
pub struct ConnectionDetails {
id: ConnectionId,
}

impl ConnectionDetails {
/// Construct a new [`ConnectionDetails`].
#[doc(hidden)]
pub fn _new(id: ConnectionId) -> ConnectionDetails {
Self { id }
}

/// Get the connection ID.
pub fn id(&self) -> ConnectionId {
self.id
}
}

/// The error that can occur when [`Methods::call`] or [`Methods::subscribe`] is invoked.
#[derive(thiserror::Error, Debug)]
pub enum MethodsError {
Expand Down Expand Up @@ -131,6 +157,9 @@ pub enum MethodCallback {
Sync(SyncMethod),
/// Asynchronous method handler.
Async(AsyncMethod<'static>),
/// Asynchronous method handler with details.
#[doc(hidden)]
AsyncWithDetails(AsyncMethodWithDetails<'static>),
lexnv marked this conversation as resolved.
Show resolved Hide resolved
/// Subscription method handler.
Subscription(SubscriptionMethod<'static>),
/// Unsubscription method handler.
Expand Down Expand Up @@ -184,6 +213,7 @@ impl Debug for MethodCallback {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Async(_) => write!(f, "Async"),
Self::AsyncWithDetails(_) => write!(f, "AsyncWithDetails"),
Self::Sync(_) => write!(f, "Sync"),
Self::Subscription(_) => write!(f, "Subscription"),
Self::Unsubscription(_) => write!(f, "Unsubscription"),
Expand Down Expand Up @@ -355,6 +385,9 @@ impl Methods {
None => MethodResponse::error(req.id, ErrorObject::from(ErrorCode::MethodNotFound)),
Some(MethodCallback::Sync(cb)) => (cb)(id, params, usize::MAX),
Some(MethodCallback::Async(cb)) => (cb)(id.into_owned(), params.into_owned(), 0, usize::MAX).await,
Some(MethodCallback::AsyncWithDetails(cb)) => {
(cb)(id.into_owned(), params.into_owned(), ConnectionDetails::_new(0), usize::MAX).await
}
Some(MethodCallback::Subscription(cb)) => {
let conn_state =
SubscriptionState { conn_id: 0, id_provider: &RandomIntegerIdProvider, subscription_permit };
Expand Down Expand Up @@ -598,6 +631,43 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
Ok(callback)
}

/// Register a new raw RPC method, which computes the response with the given callback.
///
/// ## Examples
///
/// ```
/// use jsonrpsee_core::server::RpcModule;
///
/// let mut module = RpcModule::new(());
/// module.register_async_method_with_details("say_hello", |_params, _connection_details, _ctx| async { "lo" }).unwrap();
/// ```
#[doc(hidden)]
pub fn register_async_method_with_details<R, Fun, Fut>(
&mut self,
method_name: &'static str,
callback: Fun,
) -> Result<&mut MethodCallback, RegisterMethodError>
where
R: IntoResponse + 'static,
Fut: Future<Output = R> + Send,
Fun: (Fn(Params<'static>, ConnectionDetails, Arc<Context>) -> Fut) + Clone + Send + Sync + 'static,
{
let ctx = self.ctx.clone();
self.methods.verify_and_insert(
method_name,
MethodCallback::AsyncWithDetails(Arc::new(move |id, params, connection_details, max_response_size| {
let ctx = ctx.clone();
let callback = callback.clone();

let future = async move {
let rp = callback(params, connection_details, ctx).await.into_response();
MethodResponse::response(id, rp, max_response_size)
};
future.boxed()
})),
)
}

/// Register a new publish/subscribe interface using JSON-RPC notifications.
///
/// It implements the [ethereum pubsub specification](https://geth.ethereum.org/docs/rpc/pubsub)
Expand Down
126 changes: 126 additions & 0 deletions examples/examples/server_with_connection_details.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any
// person obtaining a copy of this software and associated
// documentation files (the "Software"), to deal in the
// Software without restriction, including without
// limitation the rights to use, copy, modify, merge,
// publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software
// is furnished to do so, subject to the following
// conditions:
//
// The above copyright notice and this permission notice
// shall be included in all copies or substantial portions
// of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use std::net::SocketAddr;

use jsonrpsee::core::{async_trait, client::Subscription};
use jsonrpsee::proc_macros::rpc;
use jsonrpsee::server::{PendingSubscriptionSink, Server, SubscriptionMessage};
use jsonrpsee::types::ErrorObjectOwned;
use jsonrpsee::ws_client::WsClientBuilder;
use jsonrpsee::ConnectionDetails;

#[rpc(server, client)]
pub trait Rpc {
/// Raw method with connection ID.
#[method(name = "connectionIdMethod", raw_method)]
async fn raw_method(&self, first_param: usize, second_param: u16) -> Result<usize, ErrorObjectOwned>;

/// Normal method call example.
#[method(name = "normalMethod")]
fn normal_method(&self, first_param: usize, second_param: u16) -> Result<usize, ErrorObjectOwned>;

/// Subscriptions expose the connection ID on the subscription sink.
#[subscription(name = "subscribeSync" => "sync", item = usize)]
fn sub(&self, first_param: usize);
}

pub struct RpcServerImpl;

#[async_trait]
impl RpcServer for RpcServerImpl {
async fn raw_method(
&self,
connection_details: ConnectionDetails,
_first_param: usize,
_second_param: u16,
) -> Result<usize, ErrorObjectOwned> {
// Return the connection ID from which this method was called.
Ok(connection_details.id())
}

fn normal_method(&self, _first_param: usize, _second_param: u16) -> Result<usize, ErrorObjectOwned> {
// The normal method does not have access to the connection ID.
Ok(usize::MAX)
}

fn sub(&self, pending: PendingSubscriptionSink, _first_param: usize) {
tokio::spawn(async move {
// The connection ID can be obtained before or after accepting the subscription
let pending_connection_id = pending.connection_id();
let sink = pending.accept().await.unwrap();
let sink_connection_id = sink.connection_id();

assert_eq!(pending_connection_id, sink_connection_id);

let msg = SubscriptionMessage::from_json(&sink_connection_id).unwrap();
sink.send(msg).await.unwrap();
});
}
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::FmtSubscriber::builder()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.try_init()
.expect("setting default subscriber failed");

let server_addr = run_server().await?;
let url = format!("ws://{}", server_addr);

let client = WsClientBuilder::default().build(&url).await?;
let connection_id_first = client.raw_method(1, 2).await.unwrap();

// Second call from the same connection ID.
assert_eq!(client.raw_method(1, 2).await.unwrap(), connection_id_first);

// Second client will increment the connection ID.
let client_second = WsClientBuilder::default().build(&url).await?;
let connection_id_second = client_second.raw_method(1, 2).await.unwrap();
assert_ne!(connection_id_first, connection_id_second);

let mut sub: Subscription<usize> = RpcClient::sub(&client, 0).await.unwrap();
assert_eq!(connection_id_first, sub.next().await.transpose().unwrap().unwrap());

let mut sub: Subscription<usize> = RpcClient::sub(&client_second, 0).await.unwrap();
assert_eq!(connection_id_second, sub.next().await.transpose().unwrap().unwrap());

Ok(())
}

async fn run_server() -> anyhow::Result<SocketAddr> {
let server = Server::builder().build("127.0.0.1:0").await?;

let addr = server.local_addr()?;
let handle = server.start(RpcServerImpl.into_rpc());

// In this example we don't care about doing shutdown so let's it run forever.
// You may use the `ServerHandle` to shut it down or manage it yourself.
tokio::spawn(handle.stopped());

Ok(addr)
}
39 changes: 28 additions & 11 deletions proc-macros/src/render_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,15 @@
fn render_methods(&self) -> Result<TokenStream2, syn::Error> {
let methods = self.methods.iter().map(|method| {
let docs = &method.docs;
let method_sig = &method.signature;
let mut method_sig = method.signature.clone();

if method.raw_method {
let context_ty = self.jrps_server_item(quote! { ConnectionDetails });
// Add `ConnectionDetails` as the second parameter to the signature.
let context: syn::FnArg = syn::parse_quote!(connection_details: #context_ty);
method_sig.sig.inputs.insert(1, context);
}

quote! {
#docs
#method_sig
Expand Down Expand Up @@ -132,24 +140,33 @@

check_name(&rpc_method_name, rust_method_name.span());

if method.signature.sig.asyncness.is_some() {
if method.raw_method {
handle_register_result(quote! {
rpc.register_async_method(#rpc_method_name, |params, context| async move {
rpc.register_async_method_with_details(#rpc_method_name, |params, connection_details, context| async move {
#parsing
#into_response::into_response(context.as_ref().#rust_method_name(#params_seq).await)
#into_response::into_response(context.as_ref().#rust_method_name(connection_details, #params_seq).await)
})
})
} else {
let register_kind =
if method.blocking { quote!(register_blocking_method) } else { quote!(register_method) };
if method.signature.sig.asyncness.is_some() {
handle_register_result(quote! {
rpc.register_async_method(#rpc_method_name, |params, context| async move {
#parsing
#into_response::into_response(context.as_ref().#rust_method_name(#params_seq).await)
})
})
} else {
let register_kind =
if method.blocking { quote!(register_blocking_method) } else { quote!(register_method) };

handle_register_result(quote! {
rpc.#register_kind(#rpc_method_name, |params, context| {
#parsing
#into_response::into_response(context.#rust_method_name(#params_seq))
handle_register_result(quote! {
rpc.#register_kind(#rpc_method_name, |params, context| {
#parsing
#into_response::into_response(context.#rust_method_name(#params_seq))
})
})
})
}
}

Check warning on line 169 in proc-macros/src/render_server.rs

View workflow job for this annotation

GitHub Actions / clippy

this `else { if .. }` block can be collapsed

warning: this `else { if .. }` block can be collapsed --> proc-macros/src/render_server.rs:150:12 | 150 | } else { | ________________________^ 151 | | if method.signature.sig.asyncness.is_some() { 152 | | handle_register_result(quote! { 153 | | rpc.register_async_method(#rpc_method_name, |params, context| async move { ... | 168 | | } 169 | | } | |_________________^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#collapsible_else_if = note: `#[warn(clippy::collapsible_else_if)]` on by default help: collapse nested if block | 150 ~ } else if method.signature.sig.asyncness.is_some() { 151 ~ handle_register_result(quote! { 152 ~ rpc.register_async_method(#rpc_method_name, |params, context| async move { 153 ~ #parsing 154 ~ #into_response::into_response(context.as_ref().#rust_method_name(#params_seq).await) 155 ~ }) 156 ~ }) 157 ~ } else { 158 ~ let register_kind = 159 ~ if method.blocking { quote!(register_blocking_method) } else { quote!(register_method) }; 160 + 161 ~ handle_register_result(quote! { 162 ~ rpc.#register_kind(#rpc_method_name, |params, context| { 163 ~ #parsing 164 ~ #into_response::into_response(context.#rust_method_name(#params_seq)) 165 ~ }) 166 ~ }) 167 ~ } |

Check warning on line 169 in proc-macros/src/render_server.rs

View workflow job for this annotation

GitHub Actions / clippy

this `else { if .. }` block can be collapsed

warning: this `else { if .. }` block can be collapsed --> proc-macros/src/render_server.rs:150:12 | 150 | } else { | ________________________^ 151 | | if method.signature.sig.asyncness.is_some() { 152 | | handle_register_result(quote! { 153 | | rpc.register_async_method(#rpc_method_name, |params, context| async move { ... | 168 | | } 169 | | } | |_________________^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#collapsible_else_if = note: `#[warn(clippy::collapsible_else_if)]` on by default help: collapse nested if block | 150 ~ } else if method.signature.sig.asyncness.is_some() { 151 ~ handle_register_result(quote! { 152 ~ rpc.register_async_method(#rpc_method_name, |params, context| async move { 153 ~ #parsing 154 ~ #into_response::into_response(context.as_ref().#rust_method_name(#params_seq).await) 155 ~ }) 156 ~ }) 157 ~ } else { 158 ~ let register_kind = 159 ~ if method.blocking { quote!(register_blocking_method) } else { quote!(register_method) }; 160 + 161 ~ handle_register_result(quote! { 162 ~ rpc.#register_kind(#rpc_method_name, |params, context| { 163 ~ #parsing 164 ~ #into_response::into_response(context.#rust_method_name(#params_seq)) 165 ~ }) 166 ~ }) 167 ~ } |

Check warning on line 169 in proc-macros/src/render_server.rs

View workflow job for this annotation

GitHub Actions / clippy

this `else { if .. }` block can be collapsed

warning: this `else { if .. }` block can be collapsed --> proc-macros/src/render_server.rs:150:12 | 150 | } else { | ________________________^ 151 | | if method.signature.sig.asyncness.is_some() { 152 | | handle_register_result(quote! { 153 | | rpc.register_async_method(#rpc_method_name, |params, context| async move { ... | 168 | | } 169 | | } | |_________________^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#collapsible_else_if = note: `#[warn(clippy::collapsible_else_if)]` on by default help: collapse nested if block | 150 ~ } else if method.signature.sig.asyncness.is_some() { 151 ~ handle_register_result(quote! { 152 ~ rpc.register_async_method(#rpc_method_name, |params, context| async move { 153 ~ #parsing 154 ~ #into_response::into_response(context.as_ref().#rust_method_name(#params_seq).await) 155 ~ }) 156 ~ }) 157 ~ } else { 158 ~ let register_kind = 159 ~ if method.blocking { quote!(register_blocking_method) } else { quote!(register_method) }; 160 + 161 ~ handle_register_result(quote! { 162 ~ rpc.#register_kind(#rpc_method_name, |params, context| { 163 ~ #parsing 164 ~ #into_response::into_response(context.#rust_method_name(#params_seq)) 165 ~ }) 166 ~ }) 167 ~ } |
})
.collect::<Vec<_>>();

Expand Down
42 changes: 38 additions & 4 deletions proc-macros/src/rpc_macro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,19 @@ pub struct RpcMethod {
pub returns: Option<syn::Type>,
pub signature: syn::TraitItemFn,
pub aliases: Vec<String>,
pub raw_method: bool,
}

impl RpcMethod {
pub fn from_item(attr: Attribute, mut method: syn::TraitItemFn) -> syn::Result<Self> {
let [aliases, blocking, name, param_kind] =
AttributeMeta::parse(attr)?.retain(["aliases", "blocking", "name", "param_kind"])?;
let [aliases, blocking, name, param_kind, raw_method] =
AttributeMeta::parse(attr)?.retain(["aliases", "blocking", "name", "param_kind", "raw_method"])?;

let aliases = parse_aliases(aliases)?;
let blocking = optional(blocking, Argument::flag)?.is_some();
let name = name?.string()?;
let param_kind = parse_param_kind(param_kind)?;
let raw_method = optional(raw_method, Argument::flag)?.is_some();

let sig = method.sig.clone();
let docs = extract_doc_comments(&method.attrs);
Expand Down Expand Up @@ -98,7 +100,18 @@ impl RpcMethod {
// We've analyzed attributes and don't need them anymore.
method.attrs.clear();

Ok(Self { aliases, blocking, name, params, param_kind, returns, signature: method, docs, deprecated })
Ok(Self {
aliases,
blocking,
name,
params,
param_kind,
returns,
signature: method,
docs,
deprecated,
raw_method,
})
}
}

Expand Down Expand Up @@ -212,7 +225,6 @@ impl RpcDescription {
let namespace = optional(namespace, Argument::string)?;
let client_bounds = optional(client_bounds, Argument::group)?;
let server_bounds = optional(server_bounds, Argument::group)?;

if !needs_server && !needs_client {
return Err(syn::Error::new_spanned(&item.ident, "Either 'server' or 'client' attribute must be applied"));
}
Expand Down Expand Up @@ -260,6 +272,28 @@ impl RpcDescription {
is_method = true;

let method_data = RpcMethod::from_item(attr.clone(), method.clone())?;

if method_data.blocking && method_data.raw_method {
return Err(syn::Error::new_spanned(
method,
"Methods cannot be blocking when used with `raw_method`; remove `blocking` attribute or `raw_method` attribute",
));
}

if !needs_server && method_data.raw_method {
return Err(syn::Error::new_spanned(
&item.ident,
"Attribute 'raw_method' must be specified with 'server'",
));
}

if method.sig.asyncness.is_none() && method_data.raw_method {
return Err(syn::Error::new_spanned(
method,
"Methods must be asynchronous when used with `raw_method`; use `async fn` instead of `fn`",
));
}

methods.push(method_data);
}
if let Some(attr) = find_attr(&method.attrs, "subscription") {
Expand Down
14 changes: 14 additions & 0 deletions proc-macros/tests/ui/correct/server_with_raw_methods.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
//! Example of using proc macro to generate working client.

use jsonrpsee::{core::RpcResult, proc_macros::rpc, types::ErrorObjectOwned};

#[rpc(server)]
pub trait Rpc {
#[method(name = "foo", raw_method)]
async fn async_method(&self, param_a: u8, param_b: String) -> RpcResult<u16>;

#[method(name = "bar")]
fn sync_method(&self) -> Result<u16, ErrorObjectOwned>;
}

fn main() {}
Loading
Loading