Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Allow to specify multiple relay chain RPC urls for collator node #1880

Merged
merged 45 commits into from
Dec 15, 2022
Merged
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
8555f84
Allow specification of multiple urls for relay chain rpc nodes
skunert Oct 24, 2022
5a12179
Add pooled RPC client basics
skunert Oct 24, 2022
4cd0f65
Add list of clients to pooled client
skunert Oct 25, 2022
a1f54e3
Improve
skunert Nov 7, 2022
0f2b5bc
Merge remote-tracking branch 'origin/master' into multiple-rpc-urls
skunert Nov 7, 2022
1547ded
Merge branch 'master' into multiple-rpc-urls
skunert Nov 9, 2022
77babd2
Forward requests to dispatcher
skunert Nov 9, 2022
49efacc
Switch clients on error
skunert Nov 10, 2022
835204d
Implement rotation logic
skunert Nov 14, 2022
f3cee77
Merge branch 'master' into multiple-rpc-urls
skunert Nov 14, 2022
1821ae1
Improve subscription handling
skunert Nov 14, 2022
364b78c
Error handling cleanup
skunert Nov 15, 2022
3df52c3
Remove retry from rpc-client
skunert Nov 15, 2022
3274c8d
Improve naming
skunert Nov 15, 2022
5606d1c
Improve documentation
skunert Nov 15, 2022
2b518d2
Improve `ClientManager` abstraction
skunert Nov 15, 2022
79625dd
Adjust zombienet test
skunert Nov 15, 2022
5f6fe6b
Add more comments
skunert Nov 15, 2022
98341d7
fmt
skunert Nov 15, 2022
8d46204
Apply reviewers comments
skunert Nov 21, 2022
7225bb7
Extract reconnection to extra method
skunert Nov 22, 2022
8f97803
Merge branch 'master' into multiple-rpc-urls
skunert Nov 22, 2022
8e01060
Add comment to reconnection method
skunert Nov 22, 2022
619edb9
Clean up some dependencies
skunert Nov 22, 2022
2ca7c4b
Fix build
skunert Nov 22, 2022
0ad786f
fmt
skunert Nov 22, 2022
d3ee905
Provide alias for cli argument
skunert Nov 23, 2022
2b888be
Apply review comments
skunert Nov 25, 2022
70b59f0
Rename P* to Relay*
skunert Nov 25, 2022
5957a7a
Improve zombienet test
skunert Nov 29, 2022
e8352fa
Merge branch 'master' into multiple-rpc-urls
skunert Nov 29, 2022
2df5418
fmt
skunert Nov 29, 2022
b318e3f
Fix zombienet sleep
skunert Nov 30, 2022
878468a
Merge branch 'master' into multiple-rpc-urls
skunert Dec 6, 2022
8329760
Simplify zombienet test
skunert Dec 12, 2022
a5815f2
Merge branch 'master' into multiple-rpc-urls
skunert Dec 12, 2022
513bb13
Reduce log clutter and fix starting position
skunert Dec 12, 2022
df5f0ba
Merge branch 'master' into multiple-rpc-urls
skunert Dec 12, 2022
9c10469
Do not distribute duplicated imported and finalized blocks
skunert Dec 13, 2022
8a5f76e
Merge branch 'master' into multiple-rpc-urls
skunert Dec 13, 2022
3b27fd8
fmt
skunert Dec 13, 2022
7a18946
Apply code review suggestions
skunert Dec 14, 2022
56b1f7b
Move building of relay chain interface to `cumulus-client-service`
skunert Dec 14, 2022
b065a76
Refactoring to not push back into channel
skunert Dec 14, 2022
6431473
FMT
skunert Dec 15, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 5 additions & 18 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 6 additions & 4 deletions client/cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,9 +293,11 @@ pub struct RunCmd {
/// EXPERIMENTAL: Specify an URL to a relay chain full node to communicate with.
#[arg(
long,
value_parser = validate_relay_chain_url
value_parser = validate_relay_chain_url,
num_args = 0..,
alias = "relay-chain-rpc-url"
)]
pub relay_chain_rpc_url: Option<Url>,
pub relay_chain_rpc_urls: Vec<Url>,
}

impl RunCmd {
Expand All @@ -310,15 +312,15 @@ impl RunCmd {

/// Create [`CollatorOptions`] representing options only relevant to parachain collator nodes
pub fn collator_options(&self) -> CollatorOptions {
CollatorOptions { relay_chain_rpc_url: self.relay_chain_rpc_url.clone() }
CollatorOptions { relay_chain_rpc_urls: self.relay_chain_rpc_urls.clone() }
}
}

/// Options only relevant for collator nodes
#[derive(Clone, Debug)]
pub struct CollatorOptions {
/// Location of relay chain full node
pub relay_chain_rpc_url: Option<Url>,
pub relay_chain_rpc_urls: Vec<Url>,
}

/// A non-redundant version of the `RunCmd` that sets the `validator` field when the
Expand Down
1 change: 1 addition & 0 deletions client/relay-chain-interface/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "mas
sp-state-machine = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" }

tokio = { version = "1.21.2", features = ["sync"] }
futures = "0.3.25"
async-trait = "0.1.59"
thiserror = "1.0.37"
Expand Down
6 changes: 3 additions & 3 deletions client/relay-chain-interface/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ pub enum RelayChainError {
BlockchainError(#[from] sp_blockchain::Error),
#[error("State machine error occured: {0}")]
StateMachineError(Box<dyn sp_state_machine::Error>),
#[error("Unable to call RPC method '{0}' due to error: {1}")]
RpcCallError(String, JsonRpcError),
#[error("Unable to call RPC method '{0}'")]
RpcCallError(String),
#[error("RPC Error: '{0}'")]
JsonRpcError(#[from] JsonRpcError),
#[error("Unable to reach RpcStreamWorker: {0}")]
#[error("Unable to communicate with RPC worker: {0}")]
WorkerCommunicationError(String),
#[error("Scale codec deserialization error: {0}")]
DeserializationError(CodecError),
Expand Down
4 changes: 2 additions & 2 deletions client/relay-chain-minimal-node/src/blockchain_rpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,13 +363,13 @@ impl BlockChainRpcClient {
pub async fn import_notification_stream(
&self,
) -> RelayChainResult<Pin<Box<dyn Stream<Item = Header> + Send>>> {
Ok(self.rpc_client.get_imported_heads_stream().await?.boxed())
Ok(self.rpc_client.get_imported_heads_stream()?.boxed())
}

pub async fn finality_notification_stream(
&self,
) -> RelayChainResult<Pin<Box<dyn Stream<Item = Header> + Send>>> {
Ok(self.rpc_client.get_finalized_heads_stream().await?.boxed())
Ok(self.rpc_client.get_finalized_heads_stream()?.boxed())
}
}

Expand Down
10 changes: 6 additions & 4 deletions client/relay-chain-minimal-node/src/collator_overseer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,9 @@ async fn forward_collator_events(
f = finality.next() => {
match f {
Some(header) => {
tracing::info!(target: "minimal-polkadot-node", "Received finalized block via RPC: #{} ({})", header.number, header.hash());
let block_info = BlockInfo { hash: header.hash(), parent_hash: header.parent_hash, number: header.number };
tracing::info!(target: "minimal-polkadot-node", "Received finalized block via RPC: #{} ({} -> {})",
header.number, header.parent_hash, header.hash());
skunert marked this conversation as resolved.
Show resolved Hide resolved
let block_info = BlockInfo { hash: header.hash(), parent_hash: header.parent_hash, number: header.number };
handle.block_finalized(block_info).await;
}
None => return Err(RelayChainError::GenericError("Relay chain finality stream ended.".to_string())),
Expand All @@ -233,8 +234,9 @@ async fn forward_collator_events(
i = imports.next() => {
match i {
Some(header) => {
tracing::info!(target: "minimal-polkadot-node", "Received imported block via RPC: #{} ({})", header.number, header.hash());
let block_info = BlockInfo { hash: header.hash(), parent_hash: header.parent_hash, number: header.number };
tracing::info!(target: "minimal-polkadot-node", "Received imported block via RPC: #{} ({} -> {})",
header.number, header.parent_hash, header.hash());
let block_info = BlockInfo { hash: header.hash(), parent_hash: header.parent_hash, number: header.number };
handle.block_imported(block_info).await;
}
None => return Err(RelayChainError::GenericError("Relay chain import stream ended.".to_string())),
Expand Down
2 changes: 1 addition & 1 deletion client/relay-chain-minimal-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ fn build_authority_discovery_service<Block: BlockT>(
pub async fn build_minimal_relay_chain_node(
polkadot_config: Configuration,
task_manager: &mut TaskManager,
relay_chain_url: Url,
relay_chain_url: Vec<Url>,
) -> RelayChainResult<(Arc<(dyn RelayChainInterface + 'static)>, Option<CollatorPair>)> {
let client = cumulus_relay_chain_rpc_interface::create_client_and_start_worker(
relay_chain_url,
Expand Down
8 changes: 4 additions & 4 deletions client/relay-chain-rpc-interface/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,20 @@ cumulus-relay-chain-interface = { path = "../relay-chain-interface" }

sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-consensus-babe = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-authority-discovery = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-state-machine = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-storage = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-rpc-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
tokio = { version = "1.22.0", features = ["sync"] }

tokio = { version = "1.21.2", features = ["sync", "time"] }
futures = "0.3.25"
futures-timer = "3.0.2"
parity-scale-codec = "3.2.1"
jsonrpsee = { version = "0.15.1", features = ["ws-client"] }
tracing = "0.1.37"
async-trait = "0.1.59"
url = "2.3.1"
backoff = { version = "0.4.0", features = ["tokio"] }
serde_json = "1.0.87"
serde = "1.0.147"
37 changes: 19 additions & 18 deletions client/relay-chain-rpc-interface/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use core::time::Duration;
use cumulus_primitives_core::{
relay_chain::{
v2::{CommittedCandidateReceipt, OccupiedCoreAssumption, SessionIndex, ValidatorId},
Hash as PHash, Header as PHeader, InboundHrmpMessage,
Hash as RelayHash, Header as RelayHeader, InboundHrmpMessage,
},
InboundDownwardMessage, ParaId, PersistedValidationData,
};
Expand All @@ -34,6 +34,7 @@ use std::pin::Pin;

pub use url::Url;

mod reconnecting_ws_client;
mod rpc_client;
pub use rpc_client::{create_client_and_start_worker, RelayChainRpcClient};

Expand All @@ -58,15 +59,15 @@ impl RelayChainInterface for RelayChainRpcInterface {
async fn retrieve_dmq_contents(
&self,
para_id: ParaId,
relay_parent: PHash,
relay_parent: RelayHash,
) -> RelayChainResult<Vec<InboundDownwardMessage>> {
self.rpc_client.parachain_host_dmq_contents(para_id, relay_parent).await
}

async fn retrieve_all_inbound_hrmp_channel_contents(
&self,
para_id: ParaId,
relay_parent: PHash,
relay_parent: RelayHash,
) -> RelayChainResult<BTreeMap<ParaId, Vec<InboundHrmpMessage>>> {
self.rpc_client
.parachain_host_inbound_hrmp_channels_contents(para_id, relay_parent)
Expand All @@ -75,7 +76,7 @@ impl RelayChainInterface for RelayChainRpcInterface {

async fn persisted_validation_data(
&self,
hash: PHash,
hash: RelayHash,
para_id: ParaId,
occupied_core_assumption: OccupiedCoreAssumption,
) -> RelayChainResult<Option<PersistedValidationData>> {
Expand All @@ -86,39 +87,39 @@ impl RelayChainInterface for RelayChainRpcInterface {

async fn candidate_pending_availability(
&self,
hash: PHash,
hash: RelayHash,
para_id: ParaId,
) -> RelayChainResult<Option<CommittedCandidateReceipt>> {
self.rpc_client
.parachain_host_candidate_pending_availability(hash, para_id)
.await
}

async fn session_index_for_child(&self, hash: PHash) -> RelayChainResult<SessionIndex> {
async fn session_index_for_child(&self, hash: RelayHash) -> RelayChainResult<SessionIndex> {
self.rpc_client.parachain_host_session_index_for_child(hash).await
}

async fn validators(&self, block_id: PHash) -> RelayChainResult<Vec<ValidatorId>> {
async fn validators(&self, block_id: RelayHash) -> RelayChainResult<Vec<ValidatorId>> {
self.rpc_client.parachain_host_validators(block_id).await
}

async fn import_notification_stream(
&self,
) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
let imported_headers_stream = self.rpc_client.get_imported_heads_stream().await?;
) -> RelayChainResult<Pin<Box<dyn Stream<Item = RelayHeader> + Send>>> {
let imported_headers_stream = self.rpc_client.get_imported_heads_stream()?;

Ok(imported_headers_stream.boxed())
}

async fn finality_notification_stream(
&self,
) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
let imported_headers_stream = self.rpc_client.get_finalized_heads_stream().await?;
) -> RelayChainResult<Pin<Box<dyn Stream<Item = RelayHeader> + Send>>> {
let imported_headers_stream = self.rpc_client.get_finalized_heads_stream()?;

Ok(imported_headers_stream.boxed())
}

async fn best_block_hash(&self) -> RelayChainResult<PHash> {
async fn best_block_hash(&self) -> RelayChainResult<RelayHash> {
self.rpc_client.chain_get_head(None).await
}

Expand All @@ -132,7 +133,7 @@ impl RelayChainInterface for RelayChainRpcInterface {

async fn get_storage_by_key(
&self,
relay_parent: PHash,
relay_parent: RelayHash,
key: &[u8],
) -> RelayChainResult<Option<StorageValue>> {
let storage_key = StorageKey(key.to_vec());
Expand All @@ -144,7 +145,7 @@ impl RelayChainInterface for RelayChainRpcInterface {

async fn prove_read(
&self,
relay_parent: PHash,
relay_parent: RelayHash,
relevant_keys: &Vec<Vec<u8>>,
) -> RelayChainResult<StorageProof> {
let cloned = relevant_keys.clone();
Expand All @@ -167,8 +168,8 @@ impl RelayChainInterface for RelayChainRpcInterface {
/// 2. Check if the block is already in chain. If yes, succeed early.
/// 3. Wait for the block to be imported via subscription.
/// 4. If timeout is reached, we return an error.
async fn wait_for_block(&self, wait_for_hash: PHash) -> RelayChainResult<()> {
let mut head_stream = self.rpc_client.get_imported_heads_stream().await?;
async fn wait_for_block(&self, wait_for_hash: RelayHash) -> RelayChainResult<()> {
let mut head_stream = self.rpc_client.get_imported_heads_stream()?;

if self.rpc_client.chain_get_header(Some(wait_for_hash)).await?.is_some() {
return Ok(())
Expand All @@ -191,8 +192,8 @@ impl RelayChainInterface for RelayChainRpcInterface {

async fn new_best_notification_stream(
&self,
) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
let imported_headers_stream = self.rpc_client.get_best_heads_stream().await?;
) -> RelayChainResult<Pin<Box<dyn Stream<Item = RelayHeader> + Send>>> {
let imported_headers_stream = self.rpc_client.get_best_heads_stream()?;
Ok(imported_headers_stream.boxed())
}
}
Loading