Skip to content

Commit

Permalink
enable custom subid gen through spawn_tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
niklasad1 committed Jan 25, 2022
1 parent 0a61c83 commit 5c5eb70
Show file tree
Hide file tree
Showing 8 changed files with 36 additions and 13 deletions.
2 changes: 2 additions & 0 deletions bin/node-template/node/src/service.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Service and ServiceFactory implementation. Specialized wrapper over substrate service.
use jsonrpsee::ws_server::RandomStringIdProvider;
use node_template_runtime::{self, opaque::Block, RuntimeApi};
use sc_client_api::{BlockBackend, ExecutorProvider};
use sc_consensus_aura::{ImportQueueParams, SlotProportion, StartAuraParams};
Expand Down Expand Up @@ -244,6 +245,7 @@ pub fn new_full(mut config: Configuration) -> Result<TaskManager, ServiceError>
system_rpc_tx,
config,
telemetry: telemetry.as_mut(),
rpc_id_provider: RandomStringIdProvider::new(16),
})?;

if role.is_authority() {
Expand Down
4 changes: 3 additions & 1 deletion bin/node/cli/benches/block_production.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

use criterion::{criterion_group, criterion_main, BatchSize, Criterion, Throughput};

use jsonrpsee::ws_server::RandomStringIdProvider;
use node_cli::service::{create_extrinsic, FullClient};
use node_runtime::{constants::currency::*, BalancesCall};
use sc_block_builder::{BlockBuilderProvider, BuiltBlock, RecordProof};
Expand Down Expand Up @@ -111,7 +112,8 @@ fn new_node(tokio_handle: Handle) -> node_cli::service::NewFullBase {
wasm_runtime_overrides: None,
};

node_cli::service::new_full_base(config, |_, _| ()).expect("creating a full node doesn't fail")
node_cli::service::new_full_base(config, RandomStringIdProvider::new(16), |_, _| ())
.expect("creating a full node doesn't fail")
}

fn extrinsic_set_time(now: u64) -> OpaqueExtrinsic {
Expand Down
4 changes: 3 additions & 1 deletion bin/node/cli/benches/transaction_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

use criterion::{criterion_group, criterion_main, BatchSize, Criterion, Throughput};
use futures::{future, StreamExt};
use jsonrpsee::ws_server::RandomStringIdProvider;
use node_cli::service::{create_extrinsic, fetch_nonce, FullClient, TransactionPool};
use node_primitives::AccountId;
use node_runtime::{constants::currency::*, BalancesCall, SudoCall};
Expand Down Expand Up @@ -103,7 +104,8 @@ fn new_node(tokio_handle: Handle) -> node_cli::service::NewFullBase {
wasm_runtime_overrides: None,
};

node_cli::service::new_full_base(config, |_, _| ()).expect("Creates node")
node_cli::service::new_full_base(config, RandomStringIdProvider::new(16), |_, _| ())
.expect("Creates node")
}

fn create_accounts(num: usize) -> Vec<sr25519::Pair> {
Expand Down
3 changes: 2 additions & 1 deletion bin/node/cli/src/chain_spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ pub fn local_testnet_config() -> ChainSpec {
pub(crate) mod tests {
use super::*;
use crate::service::{new_full_base, NewFullBase};
use jsonrpsee::ws_server::RandomStringIdProvider;
use sc_service_test;
use sp_runtime::BuildStorage;

Expand Down Expand Up @@ -472,7 +473,7 @@ pub(crate) mod tests {

sc_service_test::connectivity(integration_test_config_with_two_authorities(), |config| {
let NewFullBase { task_manager, client, network, transaction_pool, .. } =
new_full_base(config, |_, _| ())?;
new_full_base(config, RandomStringIdProvider::new(16), |_, _| ())?;
Ok(sc_service_test::TestNetComponents::new(
task_manager,
client,
Expand Down
14 changes: 11 additions & 3 deletions bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,17 @@
use codec::Encode;
use frame_system_rpc_runtime_api::AccountNonceApi;
use futures::prelude::*;
use jsonrpsee::ws_server::RandomStringIdProvider;
use node_executor::ExecutorDispatch;
use node_primitives::Block;
use node_runtime::RuntimeApi;
use sc_client_api::{BlockBackend, ExecutorProvider};
use sc_consensus_babe::{self, SlotProportion};
use sc_executor::NativeElseWasmExecutor;
use sc_network::{Event, NetworkService};
use sc_service::{config::Configuration, error::Error as ServiceError, RpcHandlers, TaskManager};
use sc_service::{
config::Configuration, error::Error as ServiceError, RpcHandlers, RpcIdProvider, TaskManager,
};
use sc_telemetry::{Telemetry, TelemetryWorker};
use sp_api::ProvideRuntimeApi;
use sp_core::crypto::Pair;
Expand Down Expand Up @@ -308,6 +311,7 @@ pub struct NewFullBase {
/// Creates a full service from the configuration.
pub fn new_full_base(
mut config: Configuration,
rpc_id_provider: impl RpcIdProvider + 'static,
with_startup_data: impl FnOnce(
&sc_consensus_babe::BabeBlockImport<Block, FullClient, FullGrandpaBlockImport>,
&sc_consensus_babe::BabeLink<Block>,
Expand Down Expand Up @@ -380,6 +384,7 @@ pub fn new_full_base(
task_manager: &mut task_manager,
system_rpc_tx,
telemetry: telemetry.as_mut(),
rpc_id_provider,
})?;

let (block_import, grandpa_link, babe_link) = import_setup;
Expand Down Expand Up @@ -530,13 +535,15 @@ pub fn new_full_base(

/// Builds a new service for a full client.
pub fn new_full(config: Configuration) -> Result<TaskManager, ServiceError> {
new_full_base(config, |_, _| ()).map(|NewFullBase { task_manager, .. }| task_manager)
new_full_base(config, RandomStringIdProvider::new(16), |_, _| ())
.map(|NewFullBase { task_manager, .. }| task_manager)
}

#[cfg(test)]
mod tests {
use crate::service::{new_full_base, NewFullBase};
use codec::Encode;
use jsonrpsee::ws_server::RandomStringIdProvider;
use node_primitives::{Block, DigestItem, Signature};
use node_runtime::{
constants::{currency::CENTS, time::SLOT_DURATION},
Expand Down Expand Up @@ -597,6 +604,7 @@ mod tests {
let NewFullBase { task_manager, client, network, transaction_pool, .. } =
new_full_base(
config,
RandomStringIdProvider::new(16),
|block_import: &sc_consensus_babe::BabeBlockImport<Block, _, _>,
babe_link: &sc_consensus_babe::BabeLink<Block>| {
setup_handles = Some((block_import.clone(), babe_link.clone()));
Expand Down Expand Up @@ -771,7 +779,7 @@ mod tests {
crate::chain_spec::tests::integration_test_config_with_two_authorities(),
|config| {
let NewFullBase { task_manager, client, network, transaction_pool, .. } =
new_full_base(config, |_, _| ())?;
new_full_base(config, RandomStringIdProvider::new(16), |_, _| ())?;
Ok(sc_service_test::TestNetComponents::new(
task_manager,
client,
Expand Down
5 changes: 3 additions & 2 deletions client/rpc-servers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

use jsonrpsee::{
http_server::{AccessControlBuilder, HttpServerBuilder, HttpServerHandle},
ws_server::{RandomStringIdProvider, WsServerBuilder, WsServerHandle},
ws_server::{IdProvider, WsServerBuilder, WsServerHandle},
RpcModule,
};
use std::net::SocketAddr;
Expand Down Expand Up @@ -95,6 +95,7 @@ pub fn start_ws<M: Send + Sync + 'static>(
metrics: Option<RpcMetrics>,
rpc_api: RpcModule<M>,
rt: tokio::runtime::Handle,
id_provider: impl IdProvider + 'static,
) -> Result<WsServerHandle, anyhow::Error> {
let max_request_body_size = max_payload_mb
.map(|mb| mb.saturating_mul(MEGABYTE))
Expand All @@ -104,7 +105,7 @@ pub fn start_ws<M: Send + Sync + 'static>(
let mut builder = WsServerBuilder::new()
.max_request_body_size(max_request_body_size as u32)
.max_connections(max_connections as u64)
.set_id_provider(RandomStringIdProvider::new(16))
.set_id_provider(id_provider)
.custom_tokio_runtime(rt.clone());

if let Some(cors) = cors {
Expand Down
14 changes: 9 additions & 5 deletions client/service/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::{
start_rpc_servers, RpcHandlers, SpawnTaskHandle, TaskManager, TransactionPoolAdapter,
};
use futures::{channel::oneshot, future::ready, FutureExt, StreamExt};
use jsonrpsee::RpcModule;
use jsonrpsee::{core::traits::IdProvider, RpcModule};
use log::info;
use prometheus_endpoint::Registry;
use sc_chain_spec::get_extension;
Expand Down Expand Up @@ -322,7 +322,7 @@ where
}

/// Parameters to pass into `build`.
pub struct SpawnTasksParams<'a, TBl: BlockT, TCl, TExPool, TRpc, Backend> {
pub struct SpawnTasksParams<'a, TBl: BlockT, TCl, TExPool, TRpc, Backend, TRpcId> {
/// The service configuration.
pub config: Configuration,
/// A shared client returned by `new_full_parts`.
Expand All @@ -344,6 +344,8 @@ pub struct SpawnTasksParams<'a, TBl: BlockT, TCl, TExPool, TRpc, Backend> {
pub system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<TBl>>,
/// Telemetry instance for this node.
pub telemetry: Option<&'a mut Telemetry>,
/// Custom subscription generator for JSON-RPC subscriptions.
pub rpc_id_provider: TRpcId,
}

/// Build a shared offchain workers instance.
Expand Down Expand Up @@ -379,8 +381,8 @@ where
}

/// Spawn the tasks that are required to run a node.
pub fn spawn_tasks<TBl, TBackend, TExPool, TRpc, TCl>(
params: SpawnTasksParams<TBl, TCl, TExPool, TRpc, TBackend>,
pub fn spawn_tasks<TBl, TBackend, TExPool, TRpc, TCl, TRpcId>(
params: SpawnTasksParams<TBl, TCl, TExPool, TRpc, TBackend, TRpcId>,
) -> Result<RpcHandlers, Error>
where
TCl: ProvideRuntimeApi<TBl>
Expand Down Expand Up @@ -409,6 +411,7 @@ where
TExPool: MaintainedTransactionPool<Block = TBl, Hash = <TBl as BlockT>::Hash>
+ parity_util_mem::MallocSizeOf
+ 'static,
TRpcId: IdProvider + 'static,
{
let SpawnTasksParams {
mut config,
Expand All @@ -421,6 +424,7 @@ where
network,
system_rpc_tx,
telemetry,
rpc_id_provider,
} = params;

let chain_info = client.usage_info().chain;
Expand Down Expand Up @@ -491,7 +495,7 @@ where
)
};

let rpc = start_rpc_servers(&config, gen_rpc_module)?;
let rpc = start_rpc_servers(&config, gen_rpc_module, rpc_id_provider)?;
let rpc_handlers = RpcHandlers(Arc::new(gen_rpc_module(sc_rpc::DenyUnsafe::No)?.into()));

// Spawn informant task
Expand Down
3 changes: 3 additions & 0 deletions client/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ pub use sc_chain_spec::{
Properties, RuntimeGenesis,
};

pub use jsonrpsee::core::traits::IdProvider as RpcIdProvider;
pub use sc_consensus::ImportQueue;
pub use sc_executor::NativeExecutionDispatch;
#[doc(hidden)]
Expand Down Expand Up @@ -314,6 +315,7 @@ mod waiting {
fn start_rpc_servers<R>(
config: &Configuration,
gen_rpc_module: R,
rpc_id_provider: impl RpcIdProvider + 'static,
) -> Result<Box<dyn std::any::Any + Send + Sync>, error::Error>
where
R: Fn(sc_rpc::DenyUnsafe) -> Result<RpcModule<()>, Error>,
Expand Down Expand Up @@ -360,6 +362,7 @@ where
metrics,
gen_rpc_module(deny_unsafe(http_addr, &config.rpc_methods))?,
config.tokio_handle.clone(),
rpc_id_provider,
)
.map_err(|e| Error::Application(e.into()))?;

Expand Down

0 comments on commit 5c5eb70

Please sign in to comment.