Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow generalising over RPC implementation #634

Merged
merged 31 commits into from
Aug 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
68abe93
WIP generalising RPC client
jsdw Aug 22, 2022
3440b79
WIP: non-object-safe RpcClientT.. aah generics everywhere
jsdw Aug 23, 2022
eab9fc0
WIP object-safe RpcClientT trait and no more extra generics
jsdw Aug 23, 2022
642facd
Get core things compiling again with object-safe RpcClientT trait
jsdw Aug 24, 2022
0a29484
Make jsonrpsee optional and get test-runtime working again
jsdw Aug 24, 2022
19dd1b0
cargo fmt
jsdw Aug 24, 2022
6deb5f8
add RpcParams object to enforce correct formatting of rps params
jsdw Aug 24, 2022
1583351
Wee tweaks
jsdw Aug 24, 2022
6b6de23
clippy fixes
jsdw Aug 24, 2022
69a0696
cargo fmt
jsdw Aug 24, 2022
5c4a846
TWeak a few types
jsdw Aug 24, 2022
9e30f78
make sure we get jsonrpsee-types, too
jsdw Aug 24, 2022
c2bd26c
Add examples for rpc_params/RpcParams
jsdw Aug 24, 2022
c70c599
more doc tweaks
jsdw Aug 24, 2022
e61b342
remove a now unneeded dev note
jsdw Aug 24, 2022
f60931d
Option<Box<RawValue>> instead to avoid allocations in some cases
jsdw Aug 25, 2022
8b86509
update docs
jsdw Aug 25, 2022
5a5924a
tweak RpcClientT trait docs
jsdw Aug 25, 2022
a9ad2d8
Tweak docs around RpcClient and RpcClientT. Don't expose RpcClientT d…
jsdw Aug 25, 2022
0ebfe08
more doc tweaking about RpcParams and undo decision not to expose Rpc…
jsdw Aug 25, 2022
beb959d
Doc tweak
jsdw Aug 25, 2022
439c43f
more doc tweaks
jsdw Aug 25, 2022
4dd18c2
Fix doc thing
jsdw Aug 25, 2022
1f393ba
Merge branch 'master' into jsdw-generic-rpc
jsdw Aug 25, 2022
ea6d6a3
Add an example of injecting a custom RPC client
jsdw Aug 25, 2022
f59c0a1
Fix a typo
jsdw Aug 25, 2022
ec08976
Address clippy things in example
jsdw Aug 26, 2022
0fdc5e0
Fix a silly typo
jsdw Aug 26, 2022
8a342b9
another clippy fix
jsdw Aug 26, 2022
e028c94
rpc_params to panic instead of returning a result, like serde_json::j…
jsdw Aug 30, 2022
682be15
fix docs
jsdw Aug 30, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,3 @@ futures = "0.3.13"
codec = { package = "parity-scale-codec", version = "3.0.0", default-features = false, features = ["derive", "full", "bit-vec"] }
hex = "0.4.3"
tracing-subscriber = "0.3.11"

2 changes: 1 addition & 1 deletion examples/examples/custom_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.

//! This example should compile but should aos fail to work, since we've modified the
//! This example should compile but should fail to work, since we've modified the
//! config to not align with a Polkadot node.

use sp_keyring::AccountKeyring;
Expand Down
96 changes: 96 additions & 0 deletions examples/examples/custom_rpc_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Copyright 2019-2022 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.

use std::{
fmt::Write,
pin::Pin,
sync::{
Arc,
Mutex,
},
};
use subxt::{
rpc::{
RawValue,
RpcClientT,
RpcFuture,
RpcSubscription,
},
OnlineClient,
PolkadotConfig,
};

// A dummy RPC client that doesn't actually handle requests properly
// at all, but instead just logs what requests to it were made.
struct MyLoggingClient {
log: Arc<Mutex<String>>,
}

// We have to implement this fairly low level trait to turn [`MyLoggingClient`]
// into an RPC client that we can make use of in Subxt. Here we just log the requests
// made but don't forward them to any real node, and instead just return nonsense.
impl RpcClientT for MyLoggingClient {
fn request_raw<'a>(
&'a self,
method: &'a str,
params: Option<Box<RawValue>>,
) -> RpcFuture<'a, Box<RawValue>> {
writeln!(
self.log.lock().unwrap(),
"{method}({})",
params.as_ref().map(|p| p.get()).unwrap_or("[]")
)
.unwrap();

// We've logged the request; just return garbage. Because a boxed future is returned,
// you're able to run whatever async code you'd need to actually talk to a node.
let res = RawValue::from_string("[]".to_string()).unwrap();
Box::pin(std::future::ready(Ok(res)))
}

fn subscribe_raw<'a>(
&'a self,
sub: &'a str,
params: Option<Box<RawValue>>,
unsub: &'a str,
) -> RpcFuture<'a, RpcSubscription> {
writeln!(
self.log.lock().unwrap(),
"{sub}({}) (unsub: {unsub})",
params.as_ref().map(|p| p.get()).unwrap_or("[]")
)
.unwrap();

// We've logged the request; just return garbage. Because a boxed future is returned,
// and that will return a boxed Stream impl, you have a bunch of flexibility to build
// and return whatever type of Stream you see fit.
let res = RawValue::from_string("[]".to_string()).unwrap();
let stream = futures::stream::once(async move { Ok(res) });
let stream: Pin<Box<dyn futures::Stream<Item = _> + Send>> = Box::pin(stream);
Box::pin(std::future::ready(Ok(stream)))
}
}

#[subxt::subxt(runtime_metadata_path = "../artifacts/polkadot_metadata.scale")]
pub mod polkadot {}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt::init();

// Instantiate our replacement RPC client.
let log = Arc::default();
let rpc_client = MyLoggingClient {
log: Arc::clone(&log),
};

// Pass this into our OnlineClient to instantiate it. This will lead to some
// RPC calls being made to fetch chain details/metadata, which will immediately
// fail..
let _ = OnlineClient::<PolkadotConfig>::from_rpc_client(rpc_client).await;

// But, we can see that the calls were made via our custom RPC client:
println!("Log of calls made:\n\n{}", log.lock().unwrap().as_str());
Ok(())
}
8 changes: 7 additions & 1 deletion subxt/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,17 @@ description = "Submit extrinsics (transactions) to a substrate node via RPC"
keywords = ["parity", "substrate", "blockchain"]

[features]
default = ["jsonrpsee"]

# Activate this to expose functionality only used for integration testing.
# The exposed functionality is subject to breaking changes at any point,
# and should not be relied upon.
integration-tests = []

# Jsonrpsee if the default RPC provider used in Subxt. However, it can be
# swapped out for an alternative implementation, and so is optional.
jsonrpsee = ["dep:jsonrpsee"]
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved

[dependencies]
bitvec = { version = "1.0.0", default-features = false, features = ["alloc"] }
codec = { package = "parity-scale-codec", version = "3.0.0", default-features = false, features = ["derive", "full", "bit-vec"] }
Expand All @@ -26,7 +32,7 @@ scale-value = "0.5.0"
scale-decode = "0.3.0"
futures = "0.3.13"
hex = "0.4.3"
jsonrpsee = { version = "0.15.1", features = ["async-client", "client-ws-transport"] }
jsonrpsee = { version = "0.15.1", features = ["async-client", "client-ws-transport", "jsonrpsee-types"], optional = true }
serde = { version = "1.0.124", features = ["derive"] }
serde_json = "1.0.64"
thiserror = "1.0.24"
Expand Down
61 changes: 53 additions & 8 deletions subxt/src/client/online_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{
events::EventsClient,
rpc::{
Rpc,
RpcClient,
RpcClientT,
RuntimeVersion,
},
storage::StorageClient,
Expand Down Expand Up @@ -52,12 +52,14 @@ struct Inner<T: Config> {
impl<T: Config> std::fmt::Debug for OnlineClient<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Client")
.field("rpc", &"<Rpc>")
.field("rpc", &"RpcClient")
.field("inner", &self.inner)
.finish()
}
}

// The default constructors assume Jsonrpsee.
#[cfg(feature = "jsonrpsee")]
impl<T: Config> OnlineClient<T> {
/// Construct a new [`OnlineClient`] using default settings which
/// point to a locally running node on `ws://127.0.0.1:9944`.
Expand All @@ -68,16 +70,20 @@ impl<T: Config> OnlineClient<T> {

/// Construct a new [`OnlineClient`], providing a URL to connect to.
pub async fn from_url(url: impl AsRef<str>) -> Result<OnlineClient<T>, Error> {
let client = crate::rpc::ws_client(url.as_ref()).await?;
let client = jsonrpsee_helpers::ws_client(url.as_ref())
.await
.map_err(|e| crate::error::RpcError(e.to_string()))?;
OnlineClient::from_rpc_client(client).await
}
}

/// Construct a new [`OnlineClient`] by providing the underlying [`RpcClient`]
/// to use to drive the connection.
pub async fn from_rpc_client(
rpc_client: impl Into<RpcClient>,
impl<T: Config> OnlineClient<T> {
/// Construct a new [`OnlineClient`] by providing an underlying [`RpcClientT`]
/// implementation to drive the connection.
pub async fn from_rpc_client<R: RpcClientT>(
rpc_client: R,
) -> Result<OnlineClient<T>, Error> {
let rpc = Rpc::new(rpc_client.into());
let rpc = Rpc::new(rpc_client);

let (genesis_hash, runtime_version, metadata) = future::join3(
rpc.genesis_hash(),
Expand Down Expand Up @@ -232,3 +238,42 @@ impl<T: Config> ClientRuntimeUpdater<T> {
Ok(())
}
}

// helpers for a jsonrpsee specific OnlineClient.
#[cfg(feature = "jsonrpsee")]
mod jsonrpsee_helpers {
pub use jsonrpsee::{
client_transport::ws::{
InvalidUri,
Receiver,
Sender,
Uri,
WsTransportClientBuilder,
},
core::{
client::{
Client,
ClientBuilder,
},
Error,
},
};

/// Build WS RPC client from URL
pub async fn ws_client(url: &str) -> Result<Client, Error> {
let (sender, receiver) = ws_transport(url).await?;
Ok(ClientBuilder::default()
.max_notifs_per_subscription(4096)
.build_with_tokio(sender, receiver))
}

async fn ws_transport(url: &str) -> Result<(Sender, Receiver), Error> {
let url: Uri = url
.parse()
.map_err(|e: InvalidUri| Error::Transport(e.into()))?;
WsTransportClientBuilder::default()
.build(url)
.await
.map_err(|e| Error::Transport(e.into()))
}
}
11 changes: 8 additions & 3 deletions subxt/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ pub use crate::metadata::{
InvalidMetadataError,
MetadataError,
};
pub use jsonrpsee::core::error::Error as RequestError;
pub use scale_value::scale::{
DecodeError,
EncodeError,
Expand All @@ -36,7 +35,7 @@ pub enum Error {
Codec(#[from] codec::Error),
/// Rpc error.
#[error("Rpc error: {0}")]
Rpc(#[from] RequestError),
Rpc(#[from] RpcError),
/// Serde serialization error
#[error("Serde json error: {0}")]
Serialization(#[from] serde_json::error::Error),
Expand Down Expand Up @@ -102,6 +101,12 @@ impl From<DispatchError> for Error {
}
}

/// An RPC error. Since we are generic over the RPC client that is used,
/// the error is any custom string.
#[derive(Debug, thiserror::Error)]
#[error("RPC error: {0}")]
pub struct RpcError(pub String);

/// This is our attempt to decode a runtime DispatchError. We either
/// successfully decode it into a [`ModuleError`], or we fail and keep
/// hold of the bytes, which we can attempt to decode if we have an
Expand Down Expand Up @@ -232,7 +237,7 @@ pub enum TransactionError {
/// block hasn't yet been finalized).
#[error("The finality subscription expired")]
FinalitySubscriptionTimeout,
/// The block hash that the tranaction was added to could not be found.
/// The block hash that the transaction was added to could not be found.
/// This is probably because the block was retracted before being finalized.
#[error("The block containing the transaction can no longer be found (perhaps it was on a non-finalized fork?)")]
BlockHashNotFound,
Expand Down
6 changes: 3 additions & 3 deletions subxt/src/events/event_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::{
client::OnlineClientT,
error::Error,
events::EventsClient,
rpc::Subscription,
Config,
};
use derivative::Derivative;
Expand All @@ -18,7 +19,6 @@ use futures::{
Stream,
StreamExt,
};
use jsonrpsee::core::client::Subscription;
use sp_runtime::traits::Header;
use std::{
marker::Unpin,
Expand All @@ -32,12 +32,12 @@ pub use super::{
FilterEvents,
};

/// A `jsonrpsee` Subscription. This forms a part of the `EventSubscription` type handed back
/// A Subscription. This forms a part of the `EventSubscription` type handed back
/// in codegen from `subscribe_finalized`, and is exposed to be used in codegen.
#[doc(hidden)]
pub type FinalizedEventSub<Header> = BoxStream<'static, Result<Header, Error>>;

/// A `jsonrpsee` Subscription. This forms a part of the `EventSubscription` type handed back
/// A Subscription. This forms a part of the `EventSubscription` type handed back
/// in codegen from `subscribe`, and is exposed to be used in codegen.
#[doc(hidden)]
pub type EventSub<Item> = Subscription<Item>;
Expand Down
86 changes: 86 additions & 0 deletions subxt/src/rpc/jsonrpsee_impl.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright 2019-2022 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.

use super::{
RpcClientT,
RpcFuture,
RpcSubscription,
};
use crate::error::RpcError;
use futures::stream::{
StreamExt,
TryStreamExt,
};
use jsonrpsee::{
core::client::{
Client,
ClientT,
SubscriptionClientT,
},
types::ParamsSer,
};
use serde_json::value::{
RawValue,
Value,
};

impl RpcClientT for Client {
fn request_raw<'a>(
&'a self,
method: &'a str,
params: Option<Box<RawValue>>,
) -> RpcFuture<'a, Box<RawValue>> {
Box::pin(async move {
let params = prep_params_for_jsonrpsee(params)?;
let res = ClientT::request(self, method, Some(params))
.await
.map_err(|e| RpcError(e.to_string()))?;
Ok(res)
})
}

fn subscribe_raw<'a>(
&'a self,
sub: &'a str,
params: Option<Box<RawValue>>,
unsub: &'a str,
) -> RpcFuture<'a, RpcSubscription> {
Box::pin(async move {
let params = prep_params_for_jsonrpsee(params)?;
let sub = SubscriptionClientT::subscribe::<Box<RawValue>>(
self,
sub,
Some(params),
unsub,
)
.await
.map_err(|e| RpcError(e.to_string()))?
.map_err(|e| RpcError(e.to_string()))
.boxed();
Ok(sub)
})
}
}

// This is ugly; we have to encode to Value's to be compat with the jsonrpc interface.
// Remove and simplify this once something like https://github.com/paritytech/jsonrpsee/issues/862 is in:
fn prep_params_for_jsonrpsee(
params: Option<Box<RawValue>>,
) -> Result<ParamsSer<'static>, RpcError> {
let params = match params {
Some(params) => params,
// No params? avoid any work and bail early.
None => return Ok(ParamsSer::Array(Vec::new())),
};
let val = serde_json::to_value(&params).expect("RawValue guarantees valid JSON");
let arr = match val {
Value::Array(arr) => Ok(arr),
_ => {
Err(RpcError(format!(
"RPC Params are expected to be an array but got {params}"
)))
}
}?;
Ok(ParamsSer::Array(arr))
}
Loading