Skip to content

Commit

Permalink
Extract (headers, exchange, messages) relay loops into separate crates (
Browse files Browse the repository at this point in the history
paritytech#357)

* extracted relay crates

* moved metrics to utils

* exchange-relay compilation

* fix compilation of headers-relay

* fixed messages-relay compilation

* fixed ethereum-poa-relay compilation

* cargo lock

* cargo fmt --all

* clippy

* cargo fmt --all

* fix tests compilation

* clippy

* eof

* module level docs

* removed obsolete comment

* #![warn(missing_docs)]

* .0 -> Deref

* post-merge fix

* cargo fmt

* Update relays/headers-relay/src/headers.rs

Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com>

* Update relays/headers-relay/src/headers.rs

Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com>

* Update relays/headers-relay/src/lib.rs

Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com>

Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com>
  • Loading branch information
2 people authored and bkchr committed Apr 10, 2024
1 parent fa2abfb commit d614cda
Show file tree
Hide file tree
Showing 35 changed files with 435 additions and 241 deletions.
10 changes: 4 additions & 6 deletions bridges/relays/ethereum/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@ license = "GPL-3.0-or-later WITH Classpath-exception-2.0"
[dependencies]
ansi_term = "0.12"
async-std = "1.6.2"
async-stream = "0.3.0"
async-trait = "0.1.40"
backoff = "0.2"
bp-currency-exchange = { path = "../../primitives/currency-exchange" }
bp-eth-poa = { path = "../../primitives/ethereum-poa" }
clap = { version = "2.33.3", features = ["yaml"] }
Expand All @@ -20,18 +18,18 @@ ethabi = "12.0"
ethabi-contract = "11.0"
ethabi-derive = "12.0"
ethereum-tx-sign = "3.0"
exchange-relay = { path = "../exchange-relay" }
futures = "0.3.5"
headers-relay = { path = "../headers-relay" }
hex = "0.4"
hex-literal = "0.3"
linked-hash-map = "0.5.3"
log = "0.4.11"
messages-relay = { path = "../messages-relay" }
num-traits = "0.2"
parity-crypto = { version = "0.6", features = ["publickey"] }
parking_lot = "0.11.0"
rustc-hex = "2.0.1"
relay-utils = { path = "../utils" }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.57"
sysinfo = "0.15"
time = "0.2"
web3 = "0.13"

Expand Down
30 changes: 16 additions & 14 deletions bridges/relays/ethereum/src/ethereum_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,16 @@ use crate::ethereum_types::{
use crate::rpc::{Ethereum, EthereumRpc};
use crate::rpc_errors::{EthereumNodeError, RpcError};
use crate::substrate_types::{GrandpaJustification, Hash as SubstrateHash, QueuedSubstrateHeader, SubstrateHeaderId};
use crate::sync_types::SubmittedHeaders;
use crate::utils::{HeaderId, MaybeConnectionError};

use async_trait::async_trait;
use codec::{Decode, Encode};
use ethabi::FunctionOutputDecoder;
use headers_relay::sync_types::SubmittedHeaders;
use jsonrpsee::raw::RawClient;
use jsonrpsee::transport::http::HttpTransportClient;
use jsonrpsee::Client;
use parity_crypto::publickey::KeyPair;

use relay_utils::{HeaderId, MaybeConnectionError};
use std::collections::HashSet;

// to encode/decode contract calls
Expand Down Expand Up @@ -693,17 +692,20 @@ mod tests {
}

fn header(number: SubstrateBlockNumber) -> QueuedSubstrateHeader {
QueuedSubstrateHeader::new(SubstrateHeader::new(
number,
Default::default(),
Default::default(),
if number == 0 {
Default::default()
} else {
header(number - 1).id().1
},
Default::default(),
))
QueuedSubstrateHeader::new(
SubstrateHeader::new(
number,
Default::default(),
Default::default(),
if number == 0 {
Default::default()
} else {
header(number - 1).id().1
},
Default::default(),
)
.into(),
)
}

#[test]
Expand Down
2 changes: 1 addition & 1 deletion bridges/relays/ethereum/src/ethereum_deploy_contract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ use crate::instances::BridgeInstance;
use crate::rpc::SubstrateRpc;
use crate::substrate_client::{SubstrateConnectionParams, SubstrateRpcClient};
use crate::substrate_types::{Hash as SubstrateHash, Header as SubstrateHeader, SubstrateHeaderId};
use crate::utils::HeaderId;

use codec::{Decode, Encode};
use num_traits::Zero;
use relay_utils::HeaderId;

/// Ethereum synchronization parameters.
#[derive(Debug)]
Expand Down
13 changes: 6 additions & 7 deletions bridges/relays/ethereum/src/ethereum_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,22 @@ use crate::ethereum_types::{
EthereumHeaderId, HeaderWithTransactions as EthereumHeaderWithTransactions, Transaction as EthereumTransaction,
TransactionHash as EthereumTransactionHash, H256,
};
use crate::exchange::{
relay_single_transaction_proof, SourceBlock, SourceClient, SourceTransaction, TargetClient,
TransactionProofPipeline,
};
use crate::exchange_loop::{run as run_loop, InMemoryStorage};
use crate::instances::BridgeInstance;
use crate::metrics::MetricsParams;
use crate::rpc::{EthereumRpc, SubstrateRpc};
use crate::rpc_errors::RpcError;
use crate::substrate_client::{
SubmitEthereumExchangeTransactionProof, SubstrateConnectionParams, SubstrateRpcClient, SubstrateSigningParams,
};
use crate::substrate_types::into_substrate_ethereum_receipt;
use crate::utils::HeaderId;

use async_trait::async_trait;
use bp_currency_exchange::MaybeLockFundsTransaction;
use exchange_relay::exchange::{
relay_single_transaction_proof, SourceBlock, SourceClient, SourceTransaction, TargetClient,
TransactionProofPipeline,
};
use exchange_relay::exchange_loop::{run as run_loop, InMemoryStorage};
use relay_utils::{metrics::MetricsParams, HeaderId};
use rialto_runtime::exchange::EthereumTransactionInclusionProof;
use std::time::Duration;

Expand Down
20 changes: 12 additions & 8 deletions bridges/relays/ethereum/src/ethereum_sync_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,24 @@
//! Ethereum PoA -> Substrate synchronization.
use crate::ethereum_client::{EthereumConnectionParams, EthereumHighLevelRpc, EthereumRpcClient};
use crate::ethereum_types::{EthereumHeaderId, EthereumHeadersSyncPipeline, Header, QueuedEthereumHeader, Receipt};
use crate::ethereum_types::{
EthereumHeaderId, EthereumHeadersSyncPipeline, EthereumSyncHeader as Header, QueuedEthereumHeader, Receipt,
};
use crate::instances::BridgeInstance;
use crate::metrics::MetricsParams;
use crate::rpc::{EthereumRpc, SubstrateRpc};
use crate::rpc_errors::RpcError;
use crate::substrate_client::{
SubmitEthereumHeaders, SubstrateConnectionParams, SubstrateRpcClient, SubstrateSigningParams,
};
use crate::substrate_types::into_substrate_ethereum_header;
use crate::sync::{HeadersSyncParams, TargetTransactionMode};
use crate::sync_loop::{SourceClient, TargetClient};
use crate::sync_types::{SourceHeader, SubmittedHeaders};

use async_trait::async_trait;
use headers_relay::{
sync::{HeadersSyncParams, TargetTransactionMode},
sync_loop::{SourceClient, TargetClient},
sync_types::{SourceHeader, SubmittedHeaders},
};
use relay_utils::metrics::MetricsParams;
use web3::types::H256;

use std::fmt::Debug;
Expand Down Expand Up @@ -95,11 +99,11 @@ impl SourceClient<EthereumHeadersSyncPipeline> for EthereumHeadersSource {
}

async fn header_by_hash(&self, hash: H256) -> Result<Header, Self::Error> {
self.client.header_by_hash(hash).await
self.client.header_by_hash(hash).await.map(Into::into)
}

async fn header_by_number(&self, number: u64) -> Result<Header, Self::Error> {
self.client.header_by_number(number).await
self.client.header_by_number(number).await.map(Into::into)
}

async fn header_completion(&self, id: EthereumHeaderId) -> Result<(EthereumHeaderId, Option<()>), Self::Error> {
Expand Down Expand Up @@ -199,7 +203,7 @@ pub fn run(params: EthereumSyncParams) -> Result<(), RpcError> {
let source = EthereumHeadersSource::new(eth_client);
let target = SubstrateHeadersTarget::new(sub_client, sign_sub_transactions, sub_sign);

crate::sync_loop::run(
headers_relay::sync_loop::run(
source,
consts::ETHEREUM_TICK_INTERVAL,
target,
Expand Down
26 changes: 22 additions & 4 deletions bridges/relays/ethereum/src/ethereum_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.

use crate::substrate_types::{into_substrate_ethereum_header, into_substrate_ethereum_receipts};
use crate::sync_types::{HeadersSyncPipeline, QueuedHeader, SourceHeader};
use crate::utils::HeaderId;

use codec::Encode;
use headers_relay::sync_types::{HeadersSyncPipeline, QueuedHeader, SourceHeader};
use relay_utils::HeaderId;

pub use web3::types::{Address, Bytes, CallRequest, H256, U128, U256, U64};

Expand All @@ -34,6 +35,17 @@ pub type Transaction = web3::types::Transaction;
/// Ethereum header type.
pub type Header = web3::types::Block<H256>;

/// Ethereum header type used in headers sync.
#[derive(Clone, Debug, PartialEq)]
pub struct EthereumSyncHeader(Header);

impl std::ops::Deref for EthereumSyncHeader {
type Target = Header;
fn deref(&self) -> &Self::Target {
&self.0
}
}

/// Ethereum header with transactions type.
pub type HeaderWithTransactions = web3::types::Block<Transaction>;

Expand All @@ -60,7 +72,7 @@ impl HeadersSyncPipeline for EthereumHeadersSyncPipeline {

type Hash = H256;
type Number = u64;
type Header = Header;
type Header = EthereumSyncHeader;
type Extra = Vec<Receipt>;
type Completion = ();

Expand All @@ -72,7 +84,13 @@ impl HeadersSyncPipeline for EthereumHeadersSyncPipeline {
}
}

impl SourceHeader<H256, u64> for Header {
impl From<Header> for EthereumSyncHeader {
fn from(header: Header) -> Self {
Self(header)
}
}

impl SourceHeader<H256, u64> for EthereumSyncHeader {
fn id(&self) -> EthereumHeaderId {
HeaderId(
self.number.expect(HEADER_ID_PROOF).as_u64(),
Expand Down
4 changes: 2 additions & 2 deletions bridges/relays/ethereum/src/instances.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl BridgeInstance for Rialto {
.into_iter()
.map(|header| {
(
into_substrate_ethereum_header(header.header()),
into_substrate_ethereum_header(&header.header()),
into_substrate_ethereum_receipts(header.extra()),
)
})
Expand All @@ -65,7 +65,7 @@ impl BridgeInstance for Rialto {

fn build_unsigned_header_call(&self, header: QueuedEthereumHeader) -> Call {
let pallet_call = rialto_runtime::BridgeEthPoACall::import_unsigned_header(
into_substrate_ethereum_header(header.header()),
into_substrate_ethereum_header(&header.header()),
into_substrate_ethereum_receipts(header.extra()),
);

Expand Down
54 changes: 29 additions & 25 deletions bridges/relays/ethereum/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,42 +22,28 @@ mod ethereum_exchange;
mod ethereum_exchange_submit;
mod ethereum_sync_loop;
mod ethereum_types;
mod exchange;
mod exchange_loop;
mod exchange_loop_metrics;
mod headers;
mod instances;
mod message_lane;
mod message_lane_loop;
mod message_race_delivery;
mod message_race_loop;
mod message_race_receiving;
mod metrics;
mod rpc;
mod rpc_errors;
mod substrate_client;
mod substrate_sync_loop;
mod substrate_types;
mod sync;
mod sync_loop;
mod sync_loop_metrics;
mod sync_loop_tests;
mod sync_types;
mod utils;

use ethereum_client::{EthereumConnectionParams, EthereumSigningParams};
use ethereum_deploy_contract::EthereumDeployContractParams;
use ethereum_exchange::EthereumExchangeParams;
use ethereum_exchange_submit::EthereumExchangeSubmitParams;
use ethereum_sync_loop::EthereumSyncParams;
use headers_relay::sync::TargetTransactionMode;
use hex_literal::hex;
use instances::{BridgeInstance, Kovan, Rialto};
use parity_crypto::publickey::{KeyPair, Secret};
use relay_utils::metrics::MetricsParams;
use sp_core::crypto::Pair;
use substrate_client::{SubstrateConnectionParams, SubstrateSigningParams};
use substrate_sync_loop::SubstrateSyncParams;
use sync::HeadersSyncParams;

use headers_relay::sync::HeadersSyncParams;
use std::io::Write;

fn main() {
Expand Down Expand Up @@ -223,19 +209,28 @@ fn substrate_signing_params(matches: &clap::ArgMatches) -> Result<SubstrateSigni
}

fn ethereum_sync_params(matches: &clap::ArgMatches) -> Result<EthereumSyncParams, String> {
let mut sync_params = HeadersSyncParams::ethereum_sync_default();
use crate::ethereum_sync_loop::consts::*;

let mut sync_params = HeadersSyncParams {
max_future_headers_to_download: MAX_FUTURE_HEADERS_TO_DOWNLOAD,
max_headers_in_submitted_status: MAX_SUBMITTED_HEADERS,
max_headers_in_single_submit: MAX_HEADERS_IN_SINGLE_SUBMIT,
max_headers_size_in_single_submit: MAX_HEADERS_SIZE_IN_SINGLE_SUBMIT,
prune_depth: PRUNE_DEPTH,
target_tx_mode: TargetTransactionMode::Signed,
};

match matches.value_of("sub-tx-mode") {
Some("signed") => sync_params.target_tx_mode = sync::TargetTransactionMode::Signed,
Some("signed") => sync_params.target_tx_mode = TargetTransactionMode::Signed,
Some("unsigned") => {
sync_params.target_tx_mode = sync::TargetTransactionMode::Unsigned;
sync_params.target_tx_mode = TargetTransactionMode::Unsigned;

// tx pool won't accept too much unsigned transactions
sync_params.max_headers_in_submitted_status = 10;
}
Some("backup") => sync_params.target_tx_mode = sync::TargetTransactionMode::Backup,
Some("backup") => sync_params.target_tx_mode = TargetTransactionMode::Backup,
Some(mode) => return Err(format!("Invalid sub-tx-mode: {}", mode)),
None => sync_params.target_tx_mode = sync::TargetTransactionMode::Signed,
None => sync_params.target_tx_mode = TargetTransactionMode::Signed,
}

let params = EthereumSyncParams {
Expand All @@ -253,6 +248,8 @@ fn ethereum_sync_params(matches: &clap::ArgMatches) -> Result<EthereumSyncParams
}

fn substrate_sync_params(matches: &clap::ArgMatches) -> Result<SubstrateSyncParams, String> {
use crate::substrate_sync_loop::consts::*;

let eth_contract_address: ethereum_types::Address = if let Some(eth_contract) = matches.value_of("eth-contract") {
eth_contract.parse().map_err(|e| format!("{}", e))?
} else {
Expand All @@ -267,7 +264,14 @@ fn substrate_sync_params(matches: &clap::ArgMatches) -> Result<SubstrateSyncPara
eth_sign: ethereum_signing_params(matches)?,
metrics_params: metrics_params(matches)?,
instance: instance_params(matches)?,
sync_params: HeadersSyncParams::substrate_sync_default(),
sync_params: HeadersSyncParams {
max_future_headers_to_download: MAX_FUTURE_HEADERS_TO_DOWNLOAD,
max_headers_in_submitted_status: MAX_SUBMITTED_HEADERS,
max_headers_in_single_submit: 4,
max_headers_size_in_single_submit: std::usize::MAX,
prune_depth: PRUNE_DEPTH,
target_tx_mode: TargetTransactionMode::Signed,
},
eth_contract_address,
};

Expand Down Expand Up @@ -388,12 +392,12 @@ fn ethereum_exchange_params(matches: &clap::ArgMatches) -> Result<EthereumExchan
Ok(params)
}

fn metrics_params(matches: &clap::ArgMatches) -> Result<Option<metrics::MetricsParams>, String> {
fn metrics_params(matches: &clap::ArgMatches) -> Result<Option<MetricsParams>, String> {
if matches.is_present("no-prometheus") {
return Ok(None);
}

let mut metrics_params = metrics::MetricsParams::default();
let mut metrics_params = MetricsParams::default();

if let Some(prometheus_host) = matches.value_of("prometheus-host") {
metrics_params.host = prometheus_host.into();
Expand Down
3 changes: 1 addition & 2 deletions bridges/relays/ethereum/src/rpc_errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,8 @@
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.

use crate::utils::MaybeConnectionError;

use jsonrpsee::client::RequestError;
use relay_utils::MaybeConnectionError;

/// Contains common errors that can occur when
/// interacting with a Substrate or Ethereum node
Expand Down
Loading

0 comments on commit d614cda

Please sign in to comment.