diff --git a/Cargo.lock b/Cargo.lock index 3447bed57ac3..6cafa0802c59 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4111,6 +4111,15 @@ dependencies = [ "regex-automata", ] +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata", +] + [[package]] name = "matches" version = "0.1.9" @@ -9170,7 +9179,7 @@ dependencies = [ "thiserror", "tracing", "tracing-log", - "tracing-subscriber", + "tracing-subscriber 0.2.25", ] [[package]] @@ -10264,7 +10273,7 @@ dependencies = [ "sp-std", "tracing", "tracing-core", - "tracing-subscriber", + "tracing-subscriber 0.2.25", ] [[package]] @@ -10379,7 +10388,6 @@ name = "staking-miner" version = "0.9.17" dependencies = [ "clap", - "env_logger 0.9.0", "frame-election-provider-support", "frame-support", "frame-system", @@ -10407,6 +10415,7 @@ dependencies = [ "sub-tokens", "thiserror", "tokio", + "tracing-subscriber 0.3.8", "westend-runtime", ] @@ -10782,9 +10791,9 @@ checksum = "3bf63baf9f5039dadc247375c29eb13706706cfde997d0330d05aa63a77d8820" [[package]] name = "thread_local" -version = "1.1.3" +version = "1.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8018d24e04c95ac8790716a5987d0fec4f8b27249ffa0f7d33f1369bdfb88cbd" +checksum = "5516c27b78311c50bf42c071425c560ac799b11c30b31f87e3081965fe5e0180" dependencies = [ "once_cell", ] @@ -11086,7 +11095,7 @@ dependencies = [ "ansi_term", "chrono", "lazy_static", - "matchers", + "matchers 0.0.1", "parking_lot 0.11.2", "regex", "serde", @@ -11100,6 +11109,24 @@ dependencies = [ "tracing-serde", ] +[[package]] +name = "tracing-subscriber" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74786ce43333fcf51efe947aed9718fbe46d5c7328ec3f1029e818083966d9aa" +dependencies = [ + "ansi_term", + "lazy_static", + "matchers 0.1.0", + "regex", + "sharded-slab", + "smallvec", + "thread_local", + "tracing", + "tracing-core", + "tracing-log", +] + [[package]] name = "trie-db" version = "0.23.1" diff --git a/utils/staking-miner/Cargo.toml b/utils/staking-miner/Cargo.toml index 685ea333c07e..c29db291e0fe 100644 --- a/utils/staking-miner/Cargo.toml +++ b/utils/staking-miner/Cargo.toml @@ -7,8 +7,8 @@ edition = "2018" [dependencies] clap = { version = "3.0", features = ["derive", "env"] } codec = { package = "parity-scale-codec", version = "2.0.0" } -env_logger = "0.9.0" -jsonrpsee = { version = "0.8", features = ["ws-client"] } +tracing-subscriber = { version = "0.3.8", features = ["env-filter"] } +jsonrpsee = { version = "0.8", features = ["ws-client", "macros"] } log = "0.4.11" paste = "1.0.6" serde = "1.0.136" diff --git a/utils/staking-miner/src/dry_run.rs b/utils/staking-miner/src/dry_run.rs index 225475956265..7c11f7ee106e 100644 --- a/utils/staking-miner/src/dry_run.rs +++ b/utils/staking-miner/src/dry_run.rs @@ -16,12 +16,10 @@ //! The dry-run command. -use crate::{ - prelude::*, rpc_helpers::*, signer::Signer, DryRunConfig, Error, SharedConfig, WsClient, -}; +use crate::{prelude::*, rpc::*, signer::Signer, DryRunConfig, Error, SharedRpcClient}; use codec::Encode; use frame_support::traits::Currency; -use jsonrpsee::rpc_params; +use sp_core::Bytes; use sp_npos_elections::ElectionScore; /// Forcefully create the snapshot. This can be used to compute the election at anytime. @@ -39,10 +37,10 @@ fn force_create_snapshot(ext: &mut Ext) -> Result<(), Error> /// Helper method to print the encoded size of the snapshot. async fn print_info( - client: &WsClient, + rpc: &SharedRpcClient, ext: &mut Ext, raw_solution: &EPM::RawSolution>, - extrinsic: sp_core::Bytes, + extrinsic: &Bytes, ) where ::Currency: Currency, { @@ -79,12 +77,8 @@ async fn print_info( ); }); - let info = rpc::>( - client, - "payment_queryInfo", - rpc_params! { extrinsic }, - ) - .await; + let info = rpc.payment_query_info(&extrinsic, None).await; + log::info!( target: LOG_TARGET, "payment_queryInfo: (fee = {}) {:?}", @@ -116,14 +110,13 @@ fn find_threshold(ext: &mut Ext, count: usize) { macro_rules! dry_run_cmd_for { ($runtime:ident) => { paste::paste! { /// Execute the dry-run command. pub(crate) async fn []( - client: &WsClient, - shared: SharedConfig, + rpc: SharedRpcClient, config: DryRunConfig, signer: Signer, ) -> Result<(), Error<$crate::[<$runtime _runtime_exports>]::Runtime>> { use $crate::[<$runtime _runtime_exports>]::*; let mut ext = crate::create_election_ext::( - shared.uri.clone(), + rpc.clone(), config.at, vec!["Staking".to_string(), "System".to_string()], ).await?; @@ -131,7 +124,7 @@ macro_rules! dry_run_cmd_for { ($runtime:ident) => { paste::paste! { let (raw_solution, witness) = crate::mine_with::(&config.solver, &mut ext, false)?; - let nonce = crate::get_account_info::(client, &signer.account, config.at) + let nonce = crate::get_account_info::(&rpc, &signer.account, config.at) .await? .map(|i| i.nonce) .expect("signer account is checked to exist upon startup; it can only die if it \ @@ -143,7 +136,7 @@ macro_rules! dry_run_cmd_for { ($runtime:ident) => { paste::paste! { let extrinsic = ext.execute_with(|| create_uxt(raw_solution.clone(), witness, signer.clone(), nonce, tip, era)); let bytes = sp_core::Bytes(extrinsic.encode().to_vec()); - print_info::(client, &mut ext, &raw_solution, bytes.clone()).await; + print_info::(&rpc, &mut ext, &raw_solution, &bytes).await; let feasibility_result = ext.execute_with(|| { EPM::Pallet::::feasibility_check(raw_solution.clone(), EPM::ElectionCompute::Signed) @@ -157,9 +150,8 @@ macro_rules! dry_run_cmd_for { ($runtime:ident) => { paste::paste! { }); log::info!(target: LOG_TARGET, "dispatch result is {:?}", dispatch_result); - let outcome = rpc_decode::(client, "system_dryRun", rpc_params!{ bytes }) - .await - .map_err::, _>(Into::into)?; + let dry_run_fut = rpc.dry_run(&bytes, None); + let outcome: sp_runtime::ApplyExtrinsicResult = await_request_and_decode(dry_run_fut).await.map_err::, _>(Into::into)?; log::info!(target: LOG_TARGET, "dry-run outcome is {:?}", outcome); Ok(()) } diff --git a/utils/staking-miner/src/emergency_solution.rs b/utils/staking-miner/src/emergency_solution.rs index 43bfc389cca9..a59aaa0acada 100644 --- a/utils/staking-miner/src/emergency_solution.rs +++ b/utils/staking-miner/src/emergency_solution.rs @@ -16,19 +16,19 @@ //! The emergency-solution command. -use crate::{prelude::*, EmergencySolutionConfig, Error, SharedConfig}; +use crate::{prelude::*, EmergencySolutionConfig, Error, SharedRpcClient}; use codec::Encode; use std::io::Write; macro_rules! emergency_solution_cmd_for { ($runtime:ident) => { paste::paste! { /// Execute the emergency-solution command. pub(crate) async fn []( - shared: SharedConfig, + client: SharedRpcClient, config: EmergencySolutionConfig, ) -> Result<(), Error<$crate::[<$runtime _runtime_exports>]::Runtime>> { use $crate::[<$runtime _runtime_exports>]::*; - let mut ext = crate::create_election_ext::(shared.uri.clone(), config.at, vec![]).await?; + let mut ext = crate::create_election_ext::(client, config.at, vec![]).await?; let (raw_solution, _witness) = crate::mine_with::(&config.solver, &mut ext, false)?; ext.execute_with(|| { diff --git a/utils/staking-miner/src/main.rs b/utils/staking-miner/src/main.rs index daf29d8c2809..045a849eee5e 100644 --- a/utils/staking-miner/src/main.rs +++ b/utils/staking-miner/src/main.rs @@ -32,7 +32,7 @@ mod dry_run; mod emergency_solution; mod monitor; mod prelude; -mod rpc_helpers; +mod rpc; mod signer; pub(crate) use prelude::*; @@ -43,8 +43,12 @@ use frame_election_provider_support::NposSolver; use frame_support::traits::Get; use jsonrpsee::ws_client::{WsClient, WsClientBuilder}; use remote_externalities::{Builder, Mode, OnlineConfig}; +use rpc::{RpcApiClient, SharedRpcClient}; use sp_npos_elections::ExtendedBalance; use sp_runtime::{traits::Block as BlockT, DeserializeOwned}; +use tracing_subscriber::{fmt, EnvFilter}; + +use std::{ops::Deref, sync::Arc}; pub(crate) enum AnyRuntime { Polkadot, @@ -90,10 +94,10 @@ macro_rules! construct_runtime_prelude { let extra: SignedExtra = crate::[](nonce, tip, era); let raw_payload = SignedPayload::new(call, extra).expect("creating signed payload infallible; qed."); let signature = raw_payload.using_encoded(|payload| { - pair.clone().sign(payload) + pair.sign(payload) }); let (call, extra, _) = raw_payload.deconstruct(); - let address = ::Lookup::unlookup(account.clone()); + let address = ::Lookup::unlookup(account); let extrinsic = UncheckedExtrinsic::new_signed(call, address, signature.into(), extra); log::debug!( target: crate::LOG_TARGET, "constructed extrinsic {} with length {}", @@ -229,7 +233,7 @@ macro_rules! any_runtime_unit { enum Error { Io(#[from] std::io::Error), JsonRpsee(#[from] jsonrpsee::core::Error), - RpcHelperError(#[from] rpc_helpers::RpcHelperError), + RpcHelperError(#[from] rpc::RpcHelperError), Codec(#[from] codec::Error), Crypto(sp_core::crypto::SecretStringError), RemoteExternalities(&'static str), @@ -368,7 +372,7 @@ struct Opt { /// Build the Ext at hash with all the data of `ElectionProviderMultiPhase` and any additional /// pallets. async fn create_election_ext( - uri: String, + client: SharedRpcClient, at: Option, additional: Vec, ) -> Result> { @@ -381,7 +385,7 @@ async fn create_election_ext( pallets.extend(additional); Builder::::new() .mode(Mode::Online(OnlineConfig { - transport: uri.into(), + transport: client.into_inner().into(), at, pallets, ..Default::default() @@ -493,13 +497,13 @@ fn mine_dpos(ext: &mut Ext) -> Result<(), Error> { } pub(crate) async fn check_versions( - client: &WsClient, + rpc: &SharedRpcClient, ) -> Result<(), Error> { let linked_version = T::Version::get(); - let on_chain_version = - rpc_helpers::rpc::(client, "state_getRuntimeVersion", None) - .await - .expect("runtime version RPC should always work; qed"); + let on_chain_version = rpc + .runtime_version(None) + .await + .expect("runtime version RPC should always work; qed"); log::debug!(target: LOG_TARGET, "linked version {:?}", linked_version); log::debug!(target: LOG_TARGET, "on-chain version {:?}", on_chain_version); @@ -517,20 +521,13 @@ pub(crate) async fn check_versions( #[tokio::main] async fn main() { - env_logger::Builder::from_default_env() - .format_module_path(true) - .format_level(true) - .init(); + fmt().with_env_filter(EnvFilter::from_default_env()).init(); + let Opt { shared, command } = Opt::parse(); log::debug!(target: LOG_TARGET, "attempting to connect to {:?}", shared.uri); - let client = loop { - let maybe_client = WsClientBuilder::default() - .connection_timeout(std::time::Duration::new(20, 0)) - .max_request_body_size(u32::MAX) - .build(&shared.uri) - .await; - match maybe_client { + let rpc = loop { + match SharedRpcClient::new(&shared.uri).await { Ok(client) => break client, Err(why) => { log::warn!( @@ -538,14 +535,12 @@ async fn main() { "failed to connect to client due to {:?}, retrying soon..", why ); - std::thread::sleep(std::time::Duration::from_millis(2500)); + tokio::time::sleep(std::time::Duration::from_millis(2500)).await; }, } }; - let chain = rpc_helpers::rpc::(&client, "system_chain", None) - .await - .expect("system_chain infallible; qed."); + let chain: String = rpc.system_chain().await.expect("system_chain infallible; qed."); match chain.to_lowercase().as_str() { "polkadot" | "development" => { sp_core::crypto::set_default_ss58_version( @@ -591,26 +586,26 @@ async fn main() { log::info!(target: LOG_TARGET, "connected to chain {:?}", chain); any_runtime_unit! { - check_versions::(&client).await + check_versions::(&rpc).await }; let signer_account = any_runtime! { - signer::signer_uri_from_string::(&shared.seed, &client) + signer::signer_uri_from_string::(&shared.seed, &rpc) .await .expect("Provided account is invalid, terminating.") }; let outcome = any_runtime! { - match command.clone() { - Command::Monitor(c) => monitor_cmd(client, shared, c, signer_account).await + match command { + Command::Monitor(cmd) => monitor_cmd(rpc, cmd, signer_account).await .map_err(|e| { log::error!(target: LOG_TARGET, "Monitor error: {:?}", e); }), - Command::DryRun(c) => dry_run_cmd(&client, shared, c, signer_account).await + Command::DryRun(cmd) => dry_run_cmd(rpc, cmd, signer_account).await .map_err(|e| { log::error!(target: LOG_TARGET, "DryRun error: {:?}", e); }), - Command::EmergencySolution(c) => emergency_solution_cmd(shared.clone(), c).await + Command::EmergencySolution(cmd) => emergency_solution_cmd(rpc, cmd).await .map_err(|e| { log::error!(target: LOG_TARGET, "EmergencySolution error: {:?}", e); }), diff --git a/utils/staking-miner/src/monitor.rs b/utils/staking-miner/src/monitor.rs index 8d71b242a411..1e7d3a3422fb 100644 --- a/utils/staking-miner/src/monitor.rs +++ b/utils/staking-miner/src/monitor.rs @@ -16,28 +16,21 @@ //! The monitor command. -use crate::{prelude::*, rpc_helpers::*, signer::Signer, Error, MonitorConfig, SharedConfig}; +use crate::{prelude::*, rpc::*, signer::Signer, Error, MonitorConfig, SharedRpcClient}; use codec::Encode; -use jsonrpsee::{ - core::{ - client::{Subscription, SubscriptionClientT}, - Error as RpcError, - }, - rpc_params, - ws_client::WsClient, -}; +use jsonrpsee::core::Error as RpcError; use sc_transaction_pool_api::TransactionStatus; use sp_core::storage::StorageKey; -use std::sync::Arc; use tokio::sync::mpsc; /// Ensure that now is the signed phase. -async fn ensure_signed_phase( - client: &WsClient, +async fn ensure_signed_phase>( + rpc: &SharedRpcClient, at: B::Hash, ) -> Result<(), Error> { let key = StorageKey(EPM::CurrentPhase::::hashed_key().to_vec()); - let phase = get_storage::>(client, rpc_params! {key, at}) + let phase = rpc + .get_storage_and_decode::>(&key, Some(at)) .await .map_err::, _>(Into::into)? .unwrap_or_default(); @@ -71,23 +64,21 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { /// The monitor command. pub(crate) async fn []( - client: WsClient, - shared: SharedConfig, + rpc: SharedRpcClient, config: MonitorConfig, signer: Signer, ) -> Result<(), Error<$crate::[<$runtime _runtime_exports>]::Runtime>> { use $crate::[<$runtime _runtime_exports>]::*; type StakingMinerError = Error<$crate::[<$runtime _runtime_exports>]::Runtime>; - let (sub, unsub) = if config.listen == "head" { - ("chain_subscribeNewHeads", "chain_unsubscribeNewHeads") - } else { - ("chain_subscribeFinalizedHeads", "chain_unsubscribeFinalizedHeads") - }; - - let mut subscription: Subscription
= client.subscribe(&sub, None, &unsub).await?; + let heads_subscription = || + if config.listen == "head" { + rpc.subscribe_new_heads() + } else { + rpc.subscribe_finalized_heads() + }; - let client = Arc::new(client); + let mut subscription = heads_subscription().await?; let (tx, mut rx) = mpsc::unbounded_channel::(); loop { @@ -97,8 +88,8 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { Some(Ok(r)) => r, // Custom `jsonrpsee` message sent by the server if the subscription was closed on the server side. Some(Err(RpcError::SubscriptionClosed(reason))) => { - log::warn!(target: LOG_TARGET, "subscription to {} terminated: {:?}. Retrying..", sub, reason); - subscription = client.subscribe(&sub, None, &unsub).await?; + log::warn!(target: LOG_TARGET, "subscription to `subscribeNewHeads/subscribeFinalizedHeads` terminated: {:?}. Retrying..", reason); + subscription = heads_subscription().await?; continue; } Some(Err(e)) => { @@ -109,8 +100,8 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { // - the connection was closed. // - the subscription could not keep up with the server. None => { - log::warn!(target: LOG_TARGET, "subscription to {} terminated. Retrying..", sub); - subscription = client.subscribe(&sub, None, &unsub).await?; + log::warn!(target: LOG_TARGET, "subscription to `subscribeNewHeads/subscribeFinalizedHeads` terminated. Retrying.."); + subscription = heads_subscription().await?; continue } } @@ -126,17 +117,17 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { // Spawn task and non-recoverable errors are sent back to the main task // such as if the connection has been closed. tokio::spawn( - send_and_watch_extrinsic(client.clone(), tx.clone(), at, signer.clone(), shared.clone(), config.clone()) + send_and_watch_extrinsic(rpc.clone(), tx.clone(), at, signer.clone(), config.clone()) ); + } /// Construct extrinsic at given block and watch it. async fn send_and_watch_extrinsic( - client: Arc, + rpc: SharedRpcClient, tx: mpsc::UnboundedSender, at: Header, signer: Signer, - shared: SharedConfig, config: MonitorConfig, ) { @@ -144,20 +135,20 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { log::trace!(target: LOG_TARGET, "new event at #{:?} ({:?})", at.number, hash); // if the runtime version has changed, terminate. - if let Err(err) = crate::check_versions::(&*client).await { + if let Err(err) = crate::check_versions::(&rpc).await { let _ = tx.send(err.into()); return; } // we prefer doing this check before fetching anything into a remote-ext. - if ensure_signed_phase::(&*client, hash).await.is_err() { + if ensure_signed_phase::(&rpc, hash).await.is_err() { log::debug!(target: LOG_TARGET, "phase closed, not interested in this block at all."); return; } // grab an externalities without staking, just the election snapshot. let mut ext = match crate::create_election_ext::( - shared.uri.clone(), + rpc.clone(), Some(hash), vec![], ).await { @@ -184,7 +175,7 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { log::info!(target: LOG_TARGET, "mined solution with {:?}", &raw_solution.score); - let nonce = match crate::get_account_info::(&*client, &signer.account, Some(hash)).await { + let nonce = match crate::get_account_info::(&rpc, &signer.account, Some(hash)).await { Ok(maybe_account) => { let acc = maybe_account.expect(crate::signer::SIGNER_ACCOUNT_WILL_EXIST); acc.nonce @@ -209,13 +200,7 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { let extrinsic = ext.execute_with(|| create_uxt(raw_solution, witness, signer.clone(), nonce, tip, era)); let bytes = sp_core::Bytes(extrinsic.encode()); - let mut tx_subscription: Subscription< - TransactionStatus<::Hash, ::Hash> - > = match client.subscribe( - "author_submitAndWatchExtrinsic", - rpc_params! { bytes }, - "author_unwatchExtrinsic" - ).await { + let mut tx_subscription = match rpc.watch_extrinsic(&bytes).await { Ok(sub) => sub, Err(RpcError::RestartNeeded(e)) => { let _ = tx.send(RpcError::RestartNeeded(e).into()); @@ -267,11 +252,10 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { let key = StorageKey( frame_support::storage::storage_prefix(b"System", b"Events").to_vec(), ); - let key2 = key.clone(); - let events = match get_storage::< + let events = match rpc.get_storage_and_decode::< Vec::Hash>>, - >(&*client, rpc_params! { key, hash }) + >(&key, Some(hash)) .await { Ok(rp) => rp.unwrap_or_default(), Err(RpcHelperError::JsonRpsee(RpcError::RestartNeeded(e))) => { @@ -281,7 +265,7 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { // Decoding or other RPC error => just terminate the task. Err(e) => { log::warn!(target: LOG_TARGET, "get_storage [key: {:?}, hash: {:?}] failed: {:?}; skip block: {}", - key2, hash, e, at.number + key, hash, e, at.number ); return; } diff --git a/utils/staking-miner/src/prelude.rs b/utils/staking-miner/src/prelude.rs index db9ce4e2c1e2..eeca85c9b704 100644 --- a/utils/staking-miner/src/prelude.rs +++ b/utils/staking-miner/src/prelude.rs @@ -30,11 +30,13 @@ pub type Balance = core_primitives::Balance; pub type Index = core_primitives::AccountIndex; /// The hash type. We re-export it here, but we can easily get it from block as well. pub type Hash = core_primitives::Hash; +/// The header type. We re-export it here, but we can easily get it from block as well. +pub type Header = core_primitives::Header; pub use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; /// Default URI to connect to. -pub const DEFAULT_URI: &str = "wss://rpc.polkadot.io"; +pub const DEFAULT_URI: &str = "wss://rpc.polkadot.io:443"; /// The logging target. pub const LOG_TARGET: &str = "staking-miner"; diff --git a/utils/staking-miner/src/rpc.rs b/utils/staking-miner/src/rpc.rs new file mode 100644 index 000000000000..ad973be2a32e --- /dev/null +++ b/utils/staking-miner/src/rpc.rs @@ -0,0 +1,166 @@ +// Copyright 2021 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! JSON-RPC related types and helpers. + +use super::*; +use jsonrpsee::{ + core::{Error as RpcError, RpcResult}, + proc_macros::rpc, +}; +use pallet_transaction_payment::RuntimeDispatchInfo; +use sc_transaction_pool_api::TransactionStatus; +use sp_core::{storage::StorageKey, Bytes}; +use sp_version::RuntimeVersion; +use std::{future::Future, time::Duration}; + +const MAX_CONNECTION_DURATION: Duration = Duration::from_secs(20); +const MAX_REQUEST_DURATION: Duration = Duration::from_secs(60); + +#[derive(frame_support::DebugNoBound, thiserror::Error)] +pub(crate) enum RpcHelperError { + JsonRpsee(#[from] jsonrpsee::core::Error), + Codec(#[from] codec::Error), +} + +impl std::fmt::Display for RpcHelperError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + ::fmt(self, f) + } +} + +#[rpc(client)] +pub trait RpcApi { + /// Fetch system name. + #[method(name = "system_chain")] + async fn system_chain(&self) -> RpcResult; + + /// Fetch a storage key. + #[method(name = "state_getStorage")] + async fn storage(&self, key: &StorageKey, hash: Option) -> RpcResult>; + + /// Fetch the runtime version. + #[method(name = "state_getRuntimeVersion")] + async fn runtime_version(&self, at: Option) -> RpcResult; + + /// Fetch the payment query info. + #[method(name = "payment_queryInfo")] + async fn payment_query_info( + &self, + encoded_xt: &Bytes, + at: Option<&Hash>, + ) -> RpcResult>; + + /// Dry run an extrinsic at a given block. Return SCALE encoded [`sp_runtine::ApplyExtrinsicResult`]. + #[method(name = "system_dryRun")] + async fn dry_run(&self, extrinsic: &Bytes, at: Option) -> RpcResult; + + /// Submit an extrinsic to watch. + /// + /// See [`TransactionStatus`](sc_transaction_pool_api::TransactionStatus) for details on + /// transaction life cycle. + // + // TODO: https://github.com/paritytech/jsonrpsee/issues/698. + #[subscription( + name = "author_submitAndWatchExtrinsic" => "author_extrinsicUpdate", + item = TransactionStatus, + )] + fn watch_extrinsic(&self, bytes: &Bytes) -> RpcResult<()>; + + /// New head subscription. + #[subscription( + name = "chain_subscribeNewHeads" => "newHead", + item = Header + )] + fn subscribe_new_heads(&self) -> RpcResult<()>; + + /// Finalized head subscription. + #[subscription( + name = "chain_subscribeFinalizedHeads" => "chain_finalizedHead", + item = Header + )] + fn subscribe_finalized_heads(&self) -> RpcResult<()>; +} + +/// Wraps a shared web-socket JSON-RPC client that can be cloned. +#[derive(Clone, Debug)] +pub(crate) struct SharedRpcClient(Arc); + +impl Deref for SharedRpcClient { + type Target = WsClient; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl SharedRpcClient { + /// Consume and extract the inner client. + pub fn into_inner(self) -> Arc { + self.0 + } + + /// Create a new shared JSON-RPC web-socket client. + pub(crate) async fn new(uri: &str) -> Result { + let client = WsClientBuilder::default() + .connection_timeout(MAX_CONNECTION_DURATION) + .max_request_body_size(u32::MAX) + .request_timeout(MAX_REQUEST_DURATION) + .build(uri) + .await?; + Ok(Self(Arc::new(client))) + } + + /// Get a storage item and decode it as `T`. + /// + /// # Return value: + /// + /// The function returns: + /// + /// * `Ok(Some(val))` if successful. + /// * `Ok(None)` if the storage item was not found. + /// * `Err(e)` if the JSON-RPC call failed. + pub(crate) async fn get_storage_and_decode<'a, T: codec::Decode>( + &self, + key: &StorageKey, + hash: Option, + ) -> Result, RpcHelperError> { + if let Some(bytes) = self.storage(key, hash).await? { + let decoded = ::decode(&mut &*bytes.0) + .map_err::(Into::into)?; + Ok(Some(decoded)) + } else { + Ok(None) + } + } +} + +/// Takes a future that returns `Bytes` and tries to decode those bytes into the type `Dec`. +/// Warning: don't use for storage, it will fail for non-existent storage items. +/// +/// # Return value: +/// +/// The function returns: +/// +/// * `Ok(val)` if successful. +/// * `Err(RpcHelperError::JsonRpsee)` if the JSON-RPC call failed. +/// * `Err(RpcHelperError::Codec)` if `Bytes` could not be decoded. +pub(crate) async fn await_request_and_decode<'a, Dec: codec::Decode>( + req: impl Future>, +) -> Result { + let bytes = req.await?; + Dec::decode(&mut &*bytes.0).map_err::(Into::into) +} diff --git a/utils/staking-miner/src/rpc_helpers.rs b/utils/staking-miner/src/rpc_helpers.rs deleted file mode 100644 index 153ca0e65c03..000000000000 --- a/utils/staking-miner/src/rpc_helpers.rs +++ /dev/null @@ -1,75 +0,0 @@ -// Copyright 2021 Parity Technologies (UK) Ltd. -// This file is part of Polkadot. - -// Polkadot is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Polkadot is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Polkadot. If not, see . - -//! Helper method for RPC. - -use super::*; -use jsonrpsee::core::client::ClientT; -pub(crate) use jsonrpsee::types::ParamsSer; - -#[derive(frame_support::DebugNoBound, thiserror::Error)] -pub(crate) enum RpcHelperError { - JsonRpsee(#[from] jsonrpsee::core::Error), - Codec(#[from] codec::Error), -} - -impl std::fmt::Display for RpcHelperError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - ::fmt(self, f) - } -} - -/// Make the rpc request, returning `Ret`. -pub(crate) async fn rpc<'a, Ret: serde::de::DeserializeOwned>( - client: &WsClient, - method: &'a str, - params: Option>, -) -> Result { - client - .request::(method, params) - .await - .map_err::(Into::into) -} - -/// Make the rpc request, decode the outcome into `Dec`. Don't use for storage, it will fail for -/// non-existent storage items. -pub(crate) async fn rpc_decode<'a, Dec: codec::Decode>( - client: &WsClient, - method: &'a str, - params: Option>, -) -> Result { - let bytes = rpc::(client, method, params) - .await - .map_err::(Into::into)?; - ::decode(&mut &*bytes.0).map_err::(Into::into) -} - -/// Get the storage item. -pub(crate) async fn get_storage<'a, T: codec::Decode>( - client: &WsClient, - params: Option>, -) -> Result, RpcHelperError> { - let maybe_bytes = rpc::>(client, "state_getStorage", params) - .await - .map_err::(Into::into)?; - if let Some(bytes) = maybe_bytes { - let decoded = ::decode(&mut &*bytes.0) - .map_err::(Into::into)?; - Ok(Some(decoded)) - } else { - Ok(None) - } -} diff --git a/utils/staking-miner/src/signer.rs b/utils/staking-miner/src/signer.rs index e976cb8f24aa..6a0e45eabb13 100644 --- a/utils/staking-miner/src/signer.rs +++ b/utils/staking-miner/src/signer.rs @@ -16,8 +16,9 @@ //! Wrappers around creating a signer account. -use crate::{prelude::*, rpc_helpers, AccountId, Error, Index, Pair, WsClient, LOG_TARGET}; -use sp_core::crypto::Pair as _; +use crate::{prelude::*, rpc::SharedRpcClient, AccountId, Error, Index, Pair, LOG_TARGET}; +use frame_system::AccountInfo; +use sp_core::{crypto::Pair as _, storage::StorageKey}; pub(crate) const SIGNER_ACCOUNT_WILL_EXIST: &str = "signer account is checked to exist upon startup; it can only die if it transfers funds out \ @@ -34,17 +35,14 @@ pub(crate) struct Signer { pub(crate) pair: Pair, } -pub(crate) async fn get_account_info( - client: &WsClient, +pub(crate) async fn get_account_info + EPM::Config>( + rpc: &SharedRpcClient, who: &T::AccountId, maybe_at: Option, -) -> Result>, Error> { - rpc_helpers::get_storage::>( - client, - jsonrpsee::rpc_params! { - sp_core::storage::StorageKey(>::hashed_key_for(&who)), - maybe_at - }, +) -> Result>, Error> { + rpc.get_storage_and_decode::>( + &StorageKey(>::hashed_key_for(&who)), + maybe_at, ) .await .map_err(Into::into) @@ -56,10 +54,11 @@ pub(crate) async fn signer_uri_from_string< AccountId = AccountId, Index = Index, AccountData = pallet_balances::AccountData, + Hash = Hash, > + EPM::Config, >( seed: &str, - client: &WsClient, + client: &SharedRpcClient, ) -> Result> { let seed = seed.trim();