Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Allow to specify multiple relay chain RPC urls for collator node (#1880)
Browse files Browse the repository at this point in the history
* Allow specification of multiple urls for relay chain rpc nodes

* Add pooled RPC client basics

* Add list of clients to pooled client

* Improve

* Forward requests to dispatcher

* Switch clients on error

* Implement rotation logic

* Improve subscription handling

* Error handling cleanup

* Remove retry from rpc-client

* Improve naming

* Improve documentation

* Improve `ClientManager` abstraction

* Adjust zombienet test

* Add more comments

* fmt

* Apply reviewers comments

* Extract reconnection to extra method

* Add comment to reconnection method

* Clean up some dependencies

* Fix build

* fmt

* Provide alias for cli argument

* Apply review comments

* Rename P* to Relay*

* Improve zombienet test

* fmt

* Fix zombienet sleep

* Simplify zombienet test

* Reduce log clutter and fix starting position

* Do not distribute duplicated imported and finalized blocks

* fmt

* Apply code review suggestions

* Move building of relay chain interface to `cumulus-client-service`

* Refactoring to not push back into channel

* FMT
  • Loading branch information
skunert authored and vieira-giulia committed Jan 17, 2023
1 parent 16e144b commit f5f980d
Show file tree
Hide file tree
Showing 22 changed files with 720 additions and 428 deletions.
40 changes: 13 additions & 27 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 6 additions & 4 deletions client/cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,9 +293,11 @@ pub struct RunCmd {
/// EXPERIMENTAL: Specify an URL to a relay chain full node to communicate with.
#[arg(
long,
value_parser = validate_relay_chain_url
value_parser = validate_relay_chain_url,
num_args = 0..,
alias = "relay-chain-rpc-url"
)]
pub relay_chain_rpc_url: Option<Url>,
pub relay_chain_rpc_urls: Vec<Url>,
}

impl RunCmd {
Expand All @@ -310,15 +312,15 @@ impl RunCmd {

/// Create [`CollatorOptions`] representing options only relevant to parachain collator nodes
pub fn collator_options(&self) -> CollatorOptions {
CollatorOptions { relay_chain_rpc_url: self.relay_chain_rpc_url.clone() }
CollatorOptions { relay_chain_rpc_urls: self.relay_chain_rpc_urls.clone() }
}
}

/// Options only relevant for collator nodes
#[derive(Clone, Debug)]
pub struct CollatorOptions {
/// Location of relay chain full node
pub relay_chain_rpc_url: Option<Url>,
pub relay_chain_rpc_urls: Vec<Url>,
}

/// A non-redundant version of the `RunCmd` that sets the `validator` field when the
Expand Down
1 change: 1 addition & 0 deletions client/relay-chain-interface/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "mas
sp-state-machine = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" }

tokio = { version = "1.21.2", features = ["sync"] }
futures = "0.3.25"
async-trait = "0.1.59"
thiserror = "1.0.37"
Expand Down
6 changes: 3 additions & 3 deletions client/relay-chain-interface/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ pub enum RelayChainError {
BlockchainError(#[from] sp_blockchain::Error),
#[error("State machine error occured: {0}")]
StateMachineError(Box<dyn sp_state_machine::Error>),
#[error("Unable to call RPC method '{0}' due to error: {1}")]
RpcCallError(String, JsonRpcError),
#[error("Unable to call RPC method '{0}'")]
RpcCallError(String),
#[error("RPC Error: '{0}'")]
JsonRpcError(#[from] JsonRpcError),
#[error("Unable to reach RpcStreamWorker: {0}")]
#[error("Unable to communicate with RPC worker: {0}")]
WorkerCommunicationError(String),
#[error("Scale codec deserialization error: {0}")]
DeserializationError(CodecError),
Expand Down
4 changes: 2 additions & 2 deletions client/relay-chain-minimal-node/src/blockchain_rpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,13 +363,13 @@ impl BlockChainRpcClient {
pub async fn import_notification_stream(
&self,
) -> RelayChainResult<Pin<Box<dyn Stream<Item = Header> + Send>>> {
Ok(self.rpc_client.get_imported_heads_stream().await?.boxed())
Ok(self.rpc_client.get_imported_heads_stream()?.boxed())
}

pub async fn finality_notification_stream(
&self,
) -> RelayChainResult<Pin<Box<dyn Stream<Item = Header> + Send>>> {
Ok(self.rpc_client.get_finalized_heads_stream().await?.boxed())
Ok(self.rpc_client.get_finalized_heads_stream()?.boxed())
}
}

Expand Down
20 changes: 16 additions & 4 deletions client/relay-chain-minimal-node/src/collator_overseer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,14 @@ async fn forward_collator_events(
f = finality.next() => {
match f {
Some(header) => {
tracing::info!(target: "minimal-polkadot-node", "Received finalized block via RPC: #{} ({})", header.number, header.hash());
let block_info = BlockInfo { hash: header.hash(), parent_hash: header.parent_hash, number: header.number };
tracing::info!(
target: "minimal-polkadot-node",
"Received finalized block via RPC: #{} ({} -> {})",
header.number,
header.parent_hash,
header.hash()
);
let block_info = BlockInfo { hash: header.hash(), parent_hash: header.parent_hash, number: header.number };
handle.block_finalized(block_info).await;
}
None => return Err(RelayChainError::GenericError("Relay chain finality stream ended.".to_string())),
Expand All @@ -233,8 +239,14 @@ async fn forward_collator_events(
i = imports.next() => {
match i {
Some(header) => {
tracing::info!(target: "minimal-polkadot-node", "Received imported block via RPC: #{} ({})", header.number, header.hash());
let block_info = BlockInfo { hash: header.hash(), parent_hash: header.parent_hash, number: header.number };
tracing::info!(
target: "minimal-polkadot-node",
"Received imported block via RPC: #{} ({} -> {})",
header.number,
header.parent_hash,
header.hash()
);
let block_info = BlockInfo { hash: header.hash(), parent_hash: header.parent_hash, number: header.number };
handle.block_imported(block_info).await;
}
None => return Err(RelayChainError::GenericError("Relay chain import stream ended.".to_string())),
Expand Down
2 changes: 1 addition & 1 deletion client/relay-chain-minimal-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ fn build_authority_discovery_service<Block: BlockT>(
pub async fn build_minimal_relay_chain_node(
polkadot_config: Configuration,
task_manager: &mut TaskManager,
relay_chain_url: Url,
relay_chain_url: Vec<Url>,
) -> RelayChainResult<(Arc<(dyn RelayChainInterface + 'static)>, Option<CollatorPair>)> {
let client = cumulus_relay_chain_rpc_interface::create_client_and_start_worker(
relay_chain_url,
Expand Down
7 changes: 4 additions & 3 deletions client/relay-chain-rpc-interface/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,11 @@ cumulus-relay-chain-interface = { path = "../relay-chain-interface" }

sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-consensus-babe = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-authority-discovery = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-state-machine = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-storage = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-rpc-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
tokio = { version = "1.23.0", features = ["sync"] }

Expand All @@ -29,4 +28,6 @@ jsonrpsee = { version = "0.16.2", features = ["ws-client"] }
tracing = "0.1.37"
async-trait = "0.1.59"
url = "2.3.1"
backoff = { version = "0.4.0", features = ["tokio"] }
serde_json = "1.0.87"
serde = "1.0.147"
lru = "0.8.1"
37 changes: 19 additions & 18 deletions client/relay-chain-rpc-interface/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use core::time::Duration;
use cumulus_primitives_core::{
relay_chain::{
v2::{CommittedCandidateReceipt, OccupiedCoreAssumption, SessionIndex, ValidatorId},
Hash as PHash, Header as PHeader, InboundHrmpMessage,
Hash as RelayHash, Header as RelayHeader, InboundHrmpMessage,
},
InboundDownwardMessage, ParaId, PersistedValidationData,
};
Expand All @@ -34,6 +34,7 @@ use std::pin::Pin;

pub use url::Url;

mod reconnecting_ws_client;
mod rpc_client;
pub use rpc_client::{create_client_and_start_worker, RelayChainRpcClient};

Expand All @@ -58,15 +59,15 @@ impl RelayChainInterface for RelayChainRpcInterface {
async fn retrieve_dmq_contents(
&self,
para_id: ParaId,
relay_parent: PHash,
relay_parent: RelayHash,
) -> RelayChainResult<Vec<InboundDownwardMessage>> {
self.rpc_client.parachain_host_dmq_contents(para_id, relay_parent).await
}

async fn retrieve_all_inbound_hrmp_channel_contents(
&self,
para_id: ParaId,
relay_parent: PHash,
relay_parent: RelayHash,
) -> RelayChainResult<BTreeMap<ParaId, Vec<InboundHrmpMessage>>> {
self.rpc_client
.parachain_host_inbound_hrmp_channels_contents(para_id, relay_parent)
Expand All @@ -75,7 +76,7 @@ impl RelayChainInterface for RelayChainRpcInterface {

async fn persisted_validation_data(
&self,
hash: PHash,
hash: RelayHash,
para_id: ParaId,
occupied_core_assumption: OccupiedCoreAssumption,
) -> RelayChainResult<Option<PersistedValidationData>> {
Expand All @@ -86,39 +87,39 @@ impl RelayChainInterface for RelayChainRpcInterface {

async fn candidate_pending_availability(
&self,
hash: PHash,
hash: RelayHash,
para_id: ParaId,
) -> RelayChainResult<Option<CommittedCandidateReceipt>> {
self.rpc_client
.parachain_host_candidate_pending_availability(hash, para_id)
.await
}

async fn session_index_for_child(&self, hash: PHash) -> RelayChainResult<SessionIndex> {
async fn session_index_for_child(&self, hash: RelayHash) -> RelayChainResult<SessionIndex> {
self.rpc_client.parachain_host_session_index_for_child(hash).await
}

async fn validators(&self, block_id: PHash) -> RelayChainResult<Vec<ValidatorId>> {
async fn validators(&self, block_id: RelayHash) -> RelayChainResult<Vec<ValidatorId>> {
self.rpc_client.parachain_host_validators(block_id).await
}

async fn import_notification_stream(
&self,
) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
let imported_headers_stream = self.rpc_client.get_imported_heads_stream().await?;
) -> RelayChainResult<Pin<Box<dyn Stream<Item = RelayHeader> + Send>>> {
let imported_headers_stream = self.rpc_client.get_imported_heads_stream()?;

Ok(imported_headers_stream.boxed())
}

async fn finality_notification_stream(
&self,
) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
let imported_headers_stream = self.rpc_client.get_finalized_heads_stream().await?;
) -> RelayChainResult<Pin<Box<dyn Stream<Item = RelayHeader> + Send>>> {
let imported_headers_stream = self.rpc_client.get_finalized_heads_stream()?;

Ok(imported_headers_stream.boxed())
}

async fn best_block_hash(&self) -> RelayChainResult<PHash> {
async fn best_block_hash(&self) -> RelayChainResult<RelayHash> {
self.rpc_client.chain_get_head(None).await
}

Expand All @@ -132,7 +133,7 @@ impl RelayChainInterface for RelayChainRpcInterface {

async fn get_storage_by_key(
&self,
relay_parent: PHash,
relay_parent: RelayHash,
key: &[u8],
) -> RelayChainResult<Option<StorageValue>> {
let storage_key = StorageKey(key.to_vec());
Expand All @@ -144,7 +145,7 @@ impl RelayChainInterface for RelayChainRpcInterface {

async fn prove_read(
&self,
relay_parent: PHash,
relay_parent: RelayHash,
relevant_keys: &Vec<Vec<u8>>,
) -> RelayChainResult<StorageProof> {
let cloned = relevant_keys.clone();
Expand All @@ -167,8 +168,8 @@ impl RelayChainInterface for RelayChainRpcInterface {
/// 2. Check if the block is already in chain. If yes, succeed early.
/// 3. Wait for the block to be imported via subscription.
/// 4. If timeout is reached, we return an error.
async fn wait_for_block(&self, wait_for_hash: PHash) -> RelayChainResult<()> {
let mut head_stream = self.rpc_client.get_imported_heads_stream().await?;
async fn wait_for_block(&self, wait_for_hash: RelayHash) -> RelayChainResult<()> {
let mut head_stream = self.rpc_client.get_imported_heads_stream()?;

if self.rpc_client.chain_get_header(Some(wait_for_hash)).await?.is_some() {
return Ok(())
Expand All @@ -191,8 +192,8 @@ impl RelayChainInterface for RelayChainRpcInterface {

async fn new_best_notification_stream(
&self,
) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
let imported_headers_stream = self.rpc_client.get_best_heads_stream().await?;
) -> RelayChainResult<Pin<Box<dyn Stream<Item = RelayHeader> + Send>>> {
let imported_headers_stream = self.rpc_client.get_best_heads_stream()?;
Ok(imported_headers_stream.boxed())
}
}
Loading

0 comments on commit f5f980d

Please sign in to comment.