Skip to content

Commit 270bafd

Browse files
authored
unify usage of JSON via Box<RawValue> (#1545)
* feat: client rpc middleware * PoC works * cargo fmt * more refactoring * use Cow in Notification to avoid alloc * cleanup some todos * rpc trait: return Result<MethodResponse, Err> * remove infallible err * make it compile * fix tests * introduce client method response type * fix faulty imports * minor cleanup * introduce Batch/BatchEntry for middleware * remove ignore for batch test * fix rustdocs * add rpc middleware for the async client * remove serialize specific types * commit missing file * no serde_json::Value * more nit fixing * more cleanup * refactor method response client * fix some nits * add client middleware rpc * fix wasm build * add client middleware example * Update examples/examples/rpc_middleware_client.rs * ToJson -> RawValue * replace Future type with impl Trait * revert changelog * remove logger response future * some cleanup * move request timeout from transport to client * more nit fixing * have pass over examples * show proper batch middleware example * middleware: clean up batch type * fix wasm build * Update Cargo.toml * doc: fix typo * core: remove tracing mod * fix more clippy * refactor: use json rawvalue * replace StringError with SubscriptionErr * cleanup json APIs with RawValue * cleanup and fix tests * remove from_json api * revert faulty change in types * revert faulty change in types v2 * fix rustdocs * use explicit JSON null value instead of default * cleanup Rawvalue::from_string usage * fix tests * Update core/src/traits.rs * Update benches/bench.rs * Update core/src/params.rs * cleanup expects on RawValue * improve docs * replace manual serialization with to_raw_value It was much faster * fix clippy * fix nit
1 parent 3c40778 commit 270bafd

31 files changed

+312
-239
lines changed

benches/bench.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ pub fn jsonrpsee_types_v2(crit: &mut Criterion) {
113113
// Construct the serialized array request using the `RawValue` directly.
114114
crit.bench_function("jsonrpsee_types_array_params_baseline", |b| {
115115
b.iter(|| {
116-
let params = serde_json::value::RawValue::from_string("[1, 2]".to_string()).unwrap();
116+
let params = serde_json::value::to_raw_value(&[1, 2]).unwrap();
117117

118118
let request = Request::borrowed("say_hello", Some(&params), Id::Number(0));
119119
v2_serialize(request);
@@ -136,8 +136,7 @@ pub fn jsonrpsee_types_v2(crit: &mut Criterion) {
136136
// Construct the serialized object request using the `RawValue` directly.
137137
crit.bench_function("jsonrpsee_types_object_params_baseline", |b| {
138138
b.iter(|| {
139-
let params = serde_json::value::RawValue::from_string(r#"{"key": 1}"#.to_string()).unwrap();
140-
139+
let params = serde_json::value::to_raw_value(r#"{"key": 1}"#).unwrap();
141140
let request = Request::borrowed("say_hello", Some(&params), Id::Number(0));
142141
v2_serialize(request);
143142
})

benches/helpers.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ pub async fn http_server(handle: tokio::runtime::Handle) -> (String, jsonrpsee::
131131
/// Run jsonrpsee WebSocket server for benchmarks.
132132
#[cfg(not(feature = "jsonrpc-crate"))]
133133
pub async fn ws_server(handle: tokio::runtime::Handle) -> (String, jsonrpsee::server::ServerHandle) {
134-
use jsonrpsee::server::{ServerBuilder, ServerConfig, SubscriptionMessage};
134+
use jsonrpsee::server::{ServerBuilder, ServerConfig};
135135

136136
let config = ServerConfig::builder()
137137
.max_request_body_size(u32::MAX)
@@ -150,8 +150,8 @@ pub async fn ws_server(handle: tokio::runtime::Handle) -> (String, jsonrpsee::se
150150
UNSUB_METHOD_NAME,
151151
|_params, pending, _ctx, _| async move {
152152
let sink = pending.accept().await?;
153-
let msg = SubscriptionMessage::from("Hello");
154-
sink.send(msg).await?;
153+
let json = serde_json::value::to_raw_value(&"Hello").unwrap();
154+
sink.send(json).await?;
155155

156156
Ok(())
157157
},

core/src/client/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -787,7 +787,7 @@ impl ToJson for MethodResponse {
787787
fn to_json(&self) -> Result<Box<RawValue>, serde_json::Error> {
788788
match &self.inner {
789789
MethodResponseKind::MethodCall(call) => call.to_json(),
790-
MethodResponseKind::Notification => Ok(Box::<RawValue>::default()),
790+
MethodResponseKind::Notification => Ok(RawValue::NULL.to_owned()),
791791
MethodResponseKind::Batch(json) => serde_json::value::to_raw_value(json),
792792
MethodResponseKind::Subscription(s) => serde_json::value::to_raw_value(&s.rp),
793793
}

core/src/error.rs

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,53 @@
2424
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
2525
// DEALINGS IN THE SOFTWARE.
2626

27-
/// A type that returns the error as a `String` from `SubscriptionCallback`.
28-
#[derive(Debug)]
29-
pub struct StringError(pub(crate) String);
27+
use serde::Serialize;
28+
use serde_json::value::RawValue;
3029

31-
impl<T: ToString> From<T> for StringError {
30+
#[derive(Debug, Clone)]
31+
pub(crate) enum InnerSubscriptionErr {
32+
String(String),
33+
Json(Box<RawValue>),
34+
}
35+
36+
/// Error returned when a subscription fails where the error is returned
37+
/// as special error notification with the following format:
38+
///
39+
/// ```json
40+
/// {"jsonrpc":"2.0", "method":"subscription_error", "params": {"subscription": "sub_id", "error": <error message from this type>}}
41+
/// ```
42+
///
43+
/// It's recommended to use [`SubscriptionError::from_json`] to create a new instance of this error
44+
/// if the underlying error is a JSON value. That will ensure that the error is serialized correctly.
45+
///
46+
/// SubscriptionError::from will serialize the error as a string, which is not
47+
/// recommended and should only by used in the value of a `String` type.
48+
/// It's mainly provided for convenience and to allow for easy conversion any type that implements StdError.
49+
#[derive(Debug, Clone)]
50+
pub struct SubscriptionError(pub(crate) InnerSubscriptionErr);
51+
52+
impl Serialize for SubscriptionError {
53+
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
54+
where
55+
S: serde::Serializer,
56+
{
57+
match &self.0 {
58+
InnerSubscriptionErr::String(s) => serializer.serialize_str(s),
59+
InnerSubscriptionErr::Json(json) => json.serialize(serializer),
60+
}
61+
}
62+
}
63+
64+
impl<T: ToString> From<T> for SubscriptionError {
3265
fn from(val: T) -> Self {
33-
StringError(val.to_string())
66+
Self(InnerSubscriptionErr::String(val.to_string()))
67+
}
68+
}
69+
70+
impl SubscriptionError {
71+
/// Create a new `SubscriptionError` from a JSON value.
72+
pub fn from_json(json: Box<RawValue>) -> Self {
73+
Self(InnerSubscriptionErr::Json(json))
3474
}
3575
}
3676

core/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ cfg_client_or_server! {
6161
}
6262

6363
pub use async_trait::async_trait;
64-
pub use error::{RegisterMethodError, StringError};
64+
pub use error::{RegisterMethodError, SubscriptionError};
6565

6666
/// JSON-RPC result.
6767
pub type RpcResult<T> = std::result::Result<T, jsonrpsee_types::ErrorObjectOwned>;
@@ -99,7 +99,7 @@ pub use std::borrow::Cow;
9999
pub const TEN_MB_SIZE_BYTES: u32 = 10 * 1024 * 1024;
100100

101101
/// The return type if the subscription wants to return `Result`.
102-
pub type SubscriptionResult = Result<(), StringError>;
102+
pub type SubscriptionResult = Result<(), SubscriptionError>;
103103

104104
/// Type erased error.
105105
pub type BoxError = Box<dyn std::error::Error + Send + Sync>;

core/src/params.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use serde_json::value::RawValue;
3333
/// Helper module for building parameters.
3434
mod params_builder {
3535
use serde::Serialize;
36+
use serde_json::value::RawValue;
3637

3738
/// Initial number of bytes for a parameter length.
3839
const PARAM_BYTES_CAPACITY: usize = 128;
@@ -106,8 +107,8 @@ mod params_builder {
106107
Ok(())
107108
}
108109

109-
/// Finish the building process and return a JSON compatible string.
110-
pub(crate) fn build(mut self) -> Option<String> {
110+
/// Finish the building process and return a JSON object.
111+
pub(crate) fn build(mut self) -> Option<Box<RawValue>> {
111112
if self.bytes.is_empty() {
112113
return None;
113114
}
@@ -120,7 +121,8 @@ mod params_builder {
120121
}
121122

122123
// Safety: This is safe because JSON does not emit invalid UTF-8.
123-
Some(unsafe { String::from_utf8_unchecked(self.bytes) })
124+
let json_str = unsafe { String::from_utf8_unchecked(self.bytes) };
125+
Some(RawValue::from_string(json_str).expect("Valid JSON String; qed"))
124126
}
125127
}
126128
}
@@ -164,7 +166,7 @@ impl Default for ObjectParams {
164166

165167
impl ToRpcParams for ObjectParams {
166168
fn to_rpc_params(self) -> Result<Option<Box<RawValue>>, serde_json::Error> {
167-
if let Some(json) = self.0.build() { RawValue::from_string(json).map(Some) } else { Ok(None) }
169+
Ok(self.0.build())
168170
}
169171
}
170172

@@ -206,7 +208,7 @@ impl Default for ArrayParams {
206208

207209
impl ToRpcParams for ArrayParams {
208210
fn to_rpc_params(self) -> Result<Option<Box<RawValue>>, serde_json::Error> {
209-
if let Some(json) = self.0.build() { RawValue::from_string(json).map(Some) } else { Ok(None) }
211+
Ok(self.0.build())
210212
}
211213
}
212214

core/src/server/error.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
// DEALINGS IN THE SOFTWARE.
2626

2727
use crate::server::SubscriptionMessage;
28+
use serde_json::value::RawValue;
2829
use tokio::sync::mpsc;
2930

3031
/// Error that may occur during [`crate::server::MethodSink::try_send`] or [`crate::server::SubscriptionSink::try_send`].
@@ -60,23 +61,23 @@ pub enum SendTimeoutError {
6061
#[error("The remote peer closed the connection")]
6162
pub struct PendingSubscriptionAcceptError;
6263

63-
impl From<mpsc::error::SendError<String>> for DisconnectError {
64-
fn from(e: mpsc::error::SendError<String>) -> Self {
64+
impl From<mpsc::error::SendError<Box<RawValue>>> for DisconnectError {
65+
fn from(e: mpsc::error::SendError<Box<RawValue>>) -> Self {
6566
DisconnectError(SubscriptionMessage::from_complete_message(e.0))
6667
}
6768
}
6869

69-
impl From<mpsc::error::TrySendError<String>> for TrySendError {
70-
fn from(e: mpsc::error::TrySendError<String>) -> Self {
70+
impl From<mpsc::error::TrySendError<Box<RawValue>>> for TrySendError {
71+
fn from(e: mpsc::error::TrySendError<Box<RawValue>>) -> Self {
7172
match e {
7273
mpsc::error::TrySendError::Closed(m) => Self::Closed(SubscriptionMessage::from_complete_message(m)),
7374
mpsc::error::TrySendError::Full(m) => Self::Full(SubscriptionMessage::from_complete_message(m)),
7475
}
7576
}
7677
}
7778

78-
impl From<mpsc::error::SendTimeoutError<String>> for SendTimeoutError {
79-
fn from(e: mpsc::error::SendTimeoutError<String>) -> Self {
79+
impl From<mpsc::error::SendTimeoutError<Box<RawValue>>> for SendTimeoutError {
80+
fn from(e: mpsc::error::SendTimeoutError<Box<RawValue>>) -> Self {
8081
match e {
8182
mpsc::error::SendTimeoutError::Closed(m) => Self::Closed(SubscriptionMessage::from_complete_message(m)),
8283
mpsc::error::SendTimeoutError::Timeout(m) => Self::Timeout(SubscriptionMessage::from_complete_message(m)),

core/src/server/helpers.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,27 +27,28 @@
2727
use std::time::Duration;
2828

2929
use jsonrpsee_types::{ErrorCode, ErrorObject, Id, InvalidRequest, Response, ResponsePayload};
30+
use serde_json::value::RawValue;
3031
use tokio::sync::mpsc;
3132

32-
use super::{DisconnectError, SendTimeoutError, SubscriptionMessage, TrySendError};
33+
use super::{DisconnectError, SendTimeoutError, TrySendError};
3334

3435
/// Sink that is used to send back the result to the server for a specific method.
3536
#[derive(Clone, Debug)]
3637
pub struct MethodSink {
3738
/// Channel sender.
38-
tx: mpsc::Sender<String>,
39+
tx: mpsc::Sender<Box<RawValue>>,
3940
/// Max response size in bytes for a executed call.
4041
max_response_size: u32,
4142
}
4243

4344
impl MethodSink {
4445
/// Create a new `MethodSink` with unlimited response size.
45-
pub fn new(tx: mpsc::Sender<String>) -> Self {
46+
pub fn new(tx: mpsc::Sender<Box<RawValue>>) -> Self {
4647
MethodSink { tx, max_response_size: u32::MAX }
4748
}
4849

4950
/// Create a new `MethodSink` with a limited response size.
50-
pub fn new_with_limit(tx: mpsc::Sender<String>, max_response_size: u32) -> Self {
51+
pub fn new_with_limit(tx: mpsc::Sender<Box<RawValue>>, max_response_size: u32) -> Self {
5152
MethodSink { tx, max_response_size }
5253
}
5354

@@ -74,25 +75,25 @@ impl MethodSink {
7475
/// connection has been closed or if the message buffer is full.
7576
///
7677
/// Returns the message if the send fails such that either can be thrown away or re-sent later.
77-
pub fn try_send(&mut self, msg: String) -> Result<(), TrySendError> {
78+
pub fn try_send(&mut self, msg: Box<RawValue>) -> Result<(), TrySendError> {
7879
self.tx.try_send(msg).map_err(Into::into)
7980
}
8081

8182
/// Async send which will wait until there is space in channel buffer or that the subscription is disconnected.
82-
pub async fn send(&self, msg: String) -> Result<(), DisconnectError> {
83+
pub async fn send(&self, msg: Box<RawValue>) -> Result<(), DisconnectError> {
8384
self.tx.send(msg).await.map_err(Into::into)
8485
}
8586

8687
/// Send a JSON-RPC error to the client
8788
pub async fn send_error<'a>(&self, id: Id<'a>, err: ErrorObject<'a>) -> Result<(), DisconnectError> {
8889
let payload = ResponsePayload::<()>::error_borrowed(err);
89-
let json = serde_json::to_string(&Response::new(payload, id)).expect("valid JSON; qed");
90+
let json = serde_json::value::to_raw_value(&Response::new(payload, id)).expect("valid JSON; qed");
9091

9192
self.send(json).await
9293
}
9394

9495
/// Similar to `MethodSink::send` but only waits for a limited time.
95-
pub async fn send_timeout(&self, msg: String, timeout: Duration) -> Result<(), SendTimeoutError> {
96+
pub async fn send_timeout(&self, msg: Box<RawValue>, timeout: Duration) -> Result<(), SendTimeoutError> {
9697
self.tx.send_timeout(msg, timeout).await.map_err(Into::into)
9798
}
9899

@@ -112,7 +113,7 @@ impl MethodSink {
112113
// The permit is thrown away here because it's just
113114
// a way to ensure that the return buffer has space.
114115
Ok(_) => Ok(()),
115-
Err(_) => Err(DisconnectError(SubscriptionMessage::empty())),
116+
Err(_) => Err(DisconnectError(RawValue::NULL.to_owned().into())),
116117
}
117118
}
118119
}

core/src/server/method_response.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ impl MethodResponse {
191191
Ok(_) => {
192192
// Safety - serde_json does not emit invalid UTF-8.
193193
let result = unsafe { String::from_utf8_unchecked(writer.into_bytes()) };
194-
let json = RawValue::from_string(result).expect("JSON serialization infallible; qed");
194+
let json = RawValue::from_string(result).expect("Valid JSON String; qed");
195195

196196
Self { json, success_or_error, kind, on_close: rp.on_exit, extensions: Extensions::new() }
197197
}
@@ -261,7 +261,7 @@ impl MethodResponse {
261261
/// Create notification response which is a response that doesn't expect a reply.
262262
pub fn notification() -> Self {
263263
Self {
264-
json: Box::<RawValue>::default(),
264+
json: RawValue::NULL.to_owned(),
265265
success_or_error: MethodResponseResult::Success,
266266
kind: ResponseKind::Notification,
267267
on_close: None,
@@ -370,7 +370,7 @@ impl BatchResponseBuilder {
370370
} else {
371371
self.result.pop();
372372
self.result.push(']');
373-
let json = RawValue::from_string(self.result).expect("JSON serialization infallible; qed");
373+
let json = RawValue::from_string(self.result).expect("BatchResponse builds a valid JSON String; qed");
374374
BatchResponse { json, extensions: self.extensions }
375375
}
376376
}

0 commit comments

Comments
 (0)