Skip to content

Commit

Permalink
Allow generalising over RPC implementation (#634)
Browse files Browse the repository at this point in the history
* WIP generalising RPC client

* WIP: non-object-safe RpcClientT.. aah generics everywhere

* WIP object-safe RpcClientT trait and no more extra generics

* Get core things compiling again with object-safe RpcClientT trait

* Make jsonrpsee optional and get test-runtime working again

* cargo fmt

* add RpcParams object to enforce correct formatting of rps params

* Wee tweaks

* clippy fixes

* cargo fmt

* TWeak a few types

* make sure we get jsonrpsee-types, too

* Add examples for rpc_params/RpcParams

* more doc tweaks

* remove a now unneeded dev note

* Option<Box<RawValue>> instead to avoid allocations in some cases

* update docs

* tweak RpcClientT trait docs

* Tweak docs around RpcClient and RpcClientT. Don't expose RpcClientT directly

* more doc tweaking about RpcParams and undo decision not to expose RpcParamsT

* Doc tweak

Co-authored-by: Alexandru Vasile <60601340+lexnv@users.noreply.github.com>

* more doc tweaks

* Fix doc thing

* Add an example of injecting a custom RPC client

* Fix a typo

* Address clippy things in example

* Fix a silly typo

* another clippy fix

* rpc_params to panic instead of returning a result, like serde_json::json, and deref on Rpc<T>

* fix docs

Co-authored-by: Alexandru Vasile <60601340+lexnv@users.noreply.github.com>
  • Loading branch information
jsdw and lexnv authored Aug 31, 2022
1 parent 5ff8493 commit 599107b
Show file tree
Hide file tree
Showing 17 changed files with 737 additions and 143 deletions.
1 change: 0 additions & 1 deletion examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,3 @@ futures = "0.3.13"
codec = { package = "parity-scale-codec", version = "3.0.0", default-features = false, features = ["derive", "full", "bit-vec"] }
hex = "0.4.3"
tracing-subscriber = "0.3.11"

2 changes: 1 addition & 1 deletion examples/examples/custom_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.

//! This example should compile but should aos fail to work, since we've modified the
//! This example should compile but should fail to work, since we've modified the
//! config to not align with a Polkadot node.
use sp_keyring::AccountKeyring;
Expand Down
96 changes: 96 additions & 0 deletions examples/examples/custom_rpc_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Copyright 2019-2022 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.

use std::{
fmt::Write,
pin::Pin,
sync::{
Arc,
Mutex,
},
};
use subxt::{
rpc::{
RawValue,
RpcClientT,
RpcFuture,
RpcSubscription,
},
OnlineClient,
PolkadotConfig,
};

// A dummy RPC client that doesn't actually handle requests properly
// at all, but instead just logs what requests to it were made.
struct MyLoggingClient {
log: Arc<Mutex<String>>,
}

// We have to implement this fairly low level trait to turn [`MyLoggingClient`]
// into an RPC client that we can make use of in Subxt. Here we just log the requests
// made but don't forward them to any real node, and instead just return nonsense.
impl RpcClientT for MyLoggingClient {
fn request_raw<'a>(
&'a self,
method: &'a str,
params: Option<Box<RawValue>>,
) -> RpcFuture<'a, Box<RawValue>> {
writeln!(
self.log.lock().unwrap(),
"{method}({})",
params.as_ref().map(|p| p.get()).unwrap_or("[]")
)
.unwrap();

// We've logged the request; just return garbage. Because a boxed future is returned,
// you're able to run whatever async code you'd need to actually talk to a node.
let res = RawValue::from_string("[]".to_string()).unwrap();
Box::pin(std::future::ready(Ok(res)))
}

fn subscribe_raw<'a>(
&'a self,
sub: &'a str,
params: Option<Box<RawValue>>,
unsub: &'a str,
) -> RpcFuture<'a, RpcSubscription> {
writeln!(
self.log.lock().unwrap(),
"{sub}({}) (unsub: {unsub})",
params.as_ref().map(|p| p.get()).unwrap_or("[]")
)
.unwrap();

// We've logged the request; just return garbage. Because a boxed future is returned,
// and that will return a boxed Stream impl, you have a bunch of flexibility to build
// and return whatever type of Stream you see fit.
let res = RawValue::from_string("[]".to_string()).unwrap();
let stream = futures::stream::once(async move { Ok(res) });
let stream: Pin<Box<dyn futures::Stream<Item = _> + Send>> = Box::pin(stream);
Box::pin(std::future::ready(Ok(stream)))
}
}

#[subxt::subxt(runtime_metadata_path = "../artifacts/polkadot_metadata.scale")]
pub mod polkadot {}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt::init();

// Instantiate our replacement RPC client.
let log = Arc::default();
let rpc_client = MyLoggingClient {
log: Arc::clone(&log),
};

// Pass this into our OnlineClient to instantiate it. This will lead to some
// RPC calls being made to fetch chain details/metadata, which will immediately
// fail..
let _ = OnlineClient::<PolkadotConfig>::from_rpc_client(rpc_client).await;

// But, we can see that the calls were made via our custom RPC client:
println!("Log of calls made:\n\n{}", log.lock().unwrap().as_str());
Ok(())
}
8 changes: 7 additions & 1 deletion subxt/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,17 @@ description = "Submit extrinsics (transactions) to a substrate node via RPC"
keywords = ["parity", "substrate", "blockchain"]

[features]
default = ["jsonrpsee"]

# Activate this to expose functionality only used for integration testing.
# The exposed functionality is subject to breaking changes at any point,
# and should not be relied upon.
integration-tests = []

# Jsonrpsee if the default RPC provider used in Subxt. However, it can be
# swapped out for an alternative implementation, and so is optional.
jsonrpsee = ["dep:jsonrpsee"]

[dependencies]
bitvec = { version = "1.0.0", default-features = false, features = ["alloc"] }
codec = { package = "parity-scale-codec", version = "3.0.0", default-features = false, features = ["derive", "full", "bit-vec"] }
Expand All @@ -26,7 +32,7 @@ scale-value = "0.5.0"
scale-decode = "0.3.0"
futures = "0.3.13"
hex = "0.4.3"
jsonrpsee = { version = "0.15.1", features = ["async-client", "client-ws-transport"] }
jsonrpsee = { version = "0.15.1", features = ["async-client", "client-ws-transport", "jsonrpsee-types"], optional = true }
serde = { version = "1.0.124", features = ["derive"] }
serde_json = "1.0.64"
thiserror = "1.0.24"
Expand Down
61 changes: 53 additions & 8 deletions subxt/src/client/online_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{
events::EventsClient,
rpc::{
Rpc,
RpcClient,
RpcClientT,
RuntimeVersion,
},
storage::StorageClient,
Expand Down Expand Up @@ -52,12 +52,14 @@ struct Inner<T: Config> {
impl<T: Config> std::fmt::Debug for OnlineClient<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Client")
.field("rpc", &"<Rpc>")
.field("rpc", &"RpcClient")
.field("inner", &self.inner)
.finish()
}
}

// The default constructors assume Jsonrpsee.
#[cfg(feature = "jsonrpsee")]
impl<T: Config> OnlineClient<T> {
/// Construct a new [`OnlineClient`] using default settings which
/// point to a locally running node on `ws://127.0.0.1:9944`.
Expand All @@ -68,16 +70,20 @@ impl<T: Config> OnlineClient<T> {

/// Construct a new [`OnlineClient`], providing a URL to connect to.
pub async fn from_url(url: impl AsRef<str>) -> Result<OnlineClient<T>, Error> {
let client = crate::rpc::ws_client(url.as_ref()).await?;
let client = jsonrpsee_helpers::ws_client(url.as_ref())
.await
.map_err(|e| crate::error::RpcError(e.to_string()))?;
OnlineClient::from_rpc_client(client).await
}
}

/// Construct a new [`OnlineClient`] by providing the underlying [`RpcClient`]
/// to use to drive the connection.
pub async fn from_rpc_client(
rpc_client: impl Into<RpcClient>,
impl<T: Config> OnlineClient<T> {
/// Construct a new [`OnlineClient`] by providing an underlying [`RpcClientT`]
/// implementation to drive the connection.
pub async fn from_rpc_client<R: RpcClientT>(
rpc_client: R,
) -> Result<OnlineClient<T>, Error> {
let rpc = Rpc::new(rpc_client.into());
let rpc = Rpc::new(rpc_client);

let (genesis_hash, runtime_version, metadata) = future::join3(
rpc.genesis_hash(),
Expand Down Expand Up @@ -232,3 +238,42 @@ impl<T: Config> ClientRuntimeUpdater<T> {
Ok(())
}
}

// helpers for a jsonrpsee specific OnlineClient.
#[cfg(feature = "jsonrpsee")]
mod jsonrpsee_helpers {
pub use jsonrpsee::{
client_transport::ws::{
InvalidUri,
Receiver,
Sender,
Uri,
WsTransportClientBuilder,
},
core::{
client::{
Client,
ClientBuilder,
},
Error,
},
};

/// Build WS RPC client from URL
pub async fn ws_client(url: &str) -> Result<Client, Error> {
let (sender, receiver) = ws_transport(url).await?;
Ok(ClientBuilder::default()
.max_notifs_per_subscription(4096)
.build_with_tokio(sender, receiver))
}

async fn ws_transport(url: &str) -> Result<(Sender, Receiver), Error> {
let url: Uri = url
.parse()
.map_err(|e: InvalidUri| Error::Transport(e.into()))?;
WsTransportClientBuilder::default()
.build(url)
.await
.map_err(|e| Error::Transport(e.into()))
}
}
11 changes: 8 additions & 3 deletions subxt/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ pub use crate::metadata::{
InvalidMetadataError,
MetadataError,
};
pub use jsonrpsee::core::error::Error as RequestError;
pub use scale_value::scale::{
DecodeError,
EncodeError,
Expand All @@ -36,7 +35,7 @@ pub enum Error {
Codec(#[from] codec::Error),
/// Rpc error.
#[error("Rpc error: {0}")]
Rpc(#[from] RequestError),
Rpc(#[from] RpcError),
/// Serde serialization error
#[error("Serde json error: {0}")]
Serialization(#[from] serde_json::error::Error),
Expand Down Expand Up @@ -102,6 +101,12 @@ impl From<DispatchError> for Error {
}
}

/// An RPC error. Since we are generic over the RPC client that is used,
/// the error is any custom string.
#[derive(Debug, thiserror::Error)]
#[error("RPC error: {0}")]
pub struct RpcError(pub String);

/// This is our attempt to decode a runtime DispatchError. We either
/// successfully decode it into a [`ModuleError`], or we fail and keep
/// hold of the bytes, which we can attempt to decode if we have an
Expand Down Expand Up @@ -232,7 +237,7 @@ pub enum TransactionError {
/// block hasn't yet been finalized).
#[error("The finality subscription expired")]
FinalitySubscriptionTimeout,
/// The block hash that the tranaction was added to could not be found.
/// The block hash that the transaction was added to could not be found.
/// This is probably because the block was retracted before being finalized.
#[error("The block containing the transaction can no longer be found (perhaps it was on a non-finalized fork?)")]
BlockHashNotFound,
Expand Down
6 changes: 3 additions & 3 deletions subxt/src/events/event_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::{
client::OnlineClientT,
error::Error,
events::EventsClient,
rpc::Subscription,
Config,
};
use derivative::Derivative;
Expand All @@ -18,7 +19,6 @@ use futures::{
Stream,
StreamExt,
};
use jsonrpsee::core::client::Subscription;
use sp_runtime::traits::Header;
use std::{
marker::Unpin,
Expand All @@ -32,12 +32,12 @@ pub use super::{
FilterEvents,
};

/// A `jsonrpsee` Subscription. This forms a part of the `EventSubscription` type handed back
/// A Subscription. This forms a part of the `EventSubscription` type handed back
/// in codegen from `subscribe_finalized`, and is exposed to be used in codegen.
#[doc(hidden)]
pub type FinalizedEventSub<Header> = BoxStream<'static, Result<Header, Error>>;

/// A `jsonrpsee` Subscription. This forms a part of the `EventSubscription` type handed back
/// A Subscription. This forms a part of the `EventSubscription` type handed back
/// in codegen from `subscribe`, and is exposed to be used in codegen.
#[doc(hidden)]
pub type EventSub<Item> = Subscription<Item>;
Expand Down
86 changes: 86 additions & 0 deletions subxt/src/rpc/jsonrpsee_impl.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright 2019-2022 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.

use super::{
RpcClientT,
RpcFuture,
RpcSubscription,
};
use crate::error::RpcError;
use futures::stream::{
StreamExt,
TryStreamExt,
};
use jsonrpsee::{
core::client::{
Client,
ClientT,
SubscriptionClientT,
},
types::ParamsSer,
};
use serde_json::value::{
RawValue,
Value,
};

impl RpcClientT for Client {
fn request_raw<'a>(
&'a self,
method: &'a str,
params: Option<Box<RawValue>>,
) -> RpcFuture<'a, Box<RawValue>> {
Box::pin(async move {
let params = prep_params_for_jsonrpsee(params)?;
let res = ClientT::request(self, method, Some(params))
.await
.map_err(|e| RpcError(e.to_string()))?;
Ok(res)
})
}

fn subscribe_raw<'a>(
&'a self,
sub: &'a str,
params: Option<Box<RawValue>>,
unsub: &'a str,
) -> RpcFuture<'a, RpcSubscription> {
Box::pin(async move {
let params = prep_params_for_jsonrpsee(params)?;
let sub = SubscriptionClientT::subscribe::<Box<RawValue>>(
self,
sub,
Some(params),
unsub,
)
.await
.map_err(|e| RpcError(e.to_string()))?
.map_err(|e| RpcError(e.to_string()))
.boxed();
Ok(sub)
})
}
}

// This is ugly; we have to encode to Value's to be compat with the jsonrpc interface.
// Remove and simplify this once something like https://github.com/paritytech/jsonrpsee/issues/862 is in:
fn prep_params_for_jsonrpsee(
params: Option<Box<RawValue>>,
) -> Result<ParamsSer<'static>, RpcError> {
let params = match params {
Some(params) => params,
// No params? avoid any work and bail early.
None => return Ok(ParamsSer::Array(Vec::new())),
};
let val = serde_json::to_value(&params).expect("RawValue guarantees valid JSON");
let arr = match val {
Value::Array(arr) => Ok(arr),
_ => {
Err(RpcError(format!(
"RPC Params are expected to be an array but got {params}"
)))
}
}?;
Ok(ParamsSer::Array(arr))
}
Loading

0 comments on commit 599107b

Please sign in to comment.