Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Changed how relay loops are started (#840)
Browse files Browse the repository at this point in the history
* slightly changed relay loop initialization

* git mv

* clippy

* more clippy

* loop_run -> run_loop

* review and clippy

* clippy
  • Loading branch information
svyatonik authored Mar 24, 2021
1 parent 3866646 commit 68541c2
Show file tree
Hide file tree
Showing 21 changed files with 373 additions and 342 deletions.
2 changes: 1 addition & 1 deletion relays/bin-ethereum/src/ethereum_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ async fn run_auto_transactions_relay_loop(
metrics_params,
futures::future::pending(),
)
.await;
.await?;

Ok(())
}
7 changes: 4 additions & 3 deletions relays/bin-ethereum/src/ethereum_sync_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,8 @@ pub async fn run(params: EthereumSyncParams) -> Result<(), RpcError> {
instance,
} = params;

let eth_client = async_std::task::block_on(EthereumClient::new(eth_params))?;
let sub_client = async_std::task::block_on(SubstrateClient::<Rialto>::new(sub_params))?;
let eth_client = EthereumClient::new(eth_params).await?;
let sub_client = SubstrateClient::<Rialto>::new(sub_params).await?;

let sign_sub_transactions = match sync_params.target_tx_mode {
TargetTransactionMode::Signed | TargetTransactionMode::Backup => true,
Expand All @@ -279,7 +279,8 @@ pub async fn run(params: EthereumSyncParams) -> Result<(), RpcError> {
metrics_params,
futures::future::pending(),
)
.await;
.await
.map_err(RpcError::SyncLoop)?;

Ok(())
}
3 changes: 3 additions & 0 deletions relays/bin-ethereum/src/rpc_errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ pub enum RpcError {
Ethereum(EthereumNodeError),
/// An error occured when interacting with a Substrate node.
Substrate(SubstrateNodeError),
/// Error running relay loop.
SyncLoop(String),
}

impl From<RpcError> for String {
Expand All @@ -37,6 +39,7 @@ impl From<RpcError> for String {
RpcError::Serialization(e) => e.to_string(),
RpcError::Ethereum(e) => e.to_string(),
RpcError::Substrate(e) => e.to_string(),
RpcError::SyncLoop(e) => e,
}
}
}
Expand Down
7 changes: 4 additions & 3 deletions relays/bin-ethereum/src/substrate_sync_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,8 @@ pub async fn run(params: SubstrateSyncParams) -> Result<(), RpcError> {
metrics_params,
} = params;

let eth_client = async_std::task::block_on(EthereumClient::new(eth_params))?;
let sub_client = async_std::task::block_on(SubstrateClient::<Rialto>::new(sub_params))?;
let eth_client = EthereumClient::new(eth_params).await?;
let sub_client = SubstrateClient::<Rialto>::new(sub_params).await?;

let target = EthereumHeadersTarget::new(eth_client, eth_contract_address, eth_sign);
let source = SubstrateHeadersSource::new(sub_client);
Expand All @@ -189,7 +189,8 @@ pub async fn run(params: SubstrateSyncParams) -> Result<(), RpcError> {
metrics_params,
futures::future::pending(),
)
.await;
.await
.map_err(RpcError::SyncLoop)?;

Ok(())
}
5 changes: 3 additions & 2 deletions relays/bin-substrate/src/finality_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ pub async fn run<SourceChain, TargetChain, P>(
source_client: Client<SourceChain>,
target_client: Client<TargetChain>,
metrics_params: Option<relay_utils::metrics::MetricsParams>,
) where
) -> Result<(), String>
where
P: SubstrateFinalitySyncPipeline<
Hash = HashOf<SourceChain>,
Number = BlockNumberOf<SourceChain>,
Expand Down Expand Up @@ -127,5 +128,5 @@ pub async fn run<SourceChain, TargetChain, P>(
metrics_params,
futures::future::pending(),
)
.await;
.await
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,12 @@ pub async fn run(
rialto_client: RialtoClient,
rialto_sign: RialtoSigningParams,
metrics_params: Option<relay_utils::metrics::MetricsParams>,
) {
) -> Result<(), String> {
crate::finality_pipeline::run(
MillauFinalityToRialto::new(rialto_client.clone(), rialto_sign),
millau_client,
rialto_client,
metrics_params,
)
.await;
.await
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ pub async fn run(
rialto_sign: RialtoSigningParams,
lane_id: LaneId,
metrics_params: Option<MetricsParams>,
) {
) -> Result<(), String> {
let stall_timeout = Duration::from_secs(5 * 60);
let relayer_id_at_millau = millau_sign.signer.public().as_array_ref().clone().into();

Expand Down Expand Up @@ -186,5 +186,5 @@ pub async fn run(
metrics_params,
futures::future::pending(),
)
.await;
.await
}
12 changes: 5 additions & 7 deletions relays/bin-substrate/src/rialto_millau/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ async fn run_relay_headers(command: cli::RelayHeaders) -> Result<(), String> {
let millau_client = millau.into_client().await?;
let rialto_client = rialto.into_client().await?;
let rialto_sign = rialto_sign.parse()?;
millau_headers_to_rialto::run(millau_client, rialto_client, rialto_sign, prometheus_params.into()).await;
millau_headers_to_rialto::run(millau_client, rialto_client, rialto_sign, prometheus_params.into()).await
}
cli::RelayHeaders::RialtoToMillau {
rialto,
Expand All @@ -162,7 +162,7 @@ async fn run_relay_headers(command: cli::RelayHeaders) -> Result<(), String> {
let rialto_client = rialto.into_client().await?;
let millau_client = millau.into_client().await?;
let millau_sign = millau_sign.parse()?;
rialto_headers_to_millau::run(rialto_client, millau_client, millau_sign, prometheus_params.into()).await;
rialto_headers_to_millau::run(rialto_client, millau_client, millau_sign, prometheus_params.into()).await
}
cli::RelayHeaders::WestendToMillau {
westend,
Expand All @@ -173,10 +173,9 @@ async fn run_relay_headers(command: cli::RelayHeaders) -> Result<(), String> {
let westend_client = westend.into_client().await?;
let millau_client = millau.into_client().await?;
let millau_sign = millau_sign.parse()?;
westend_headers_to_millau::run(westend_client, millau_client, millau_sign, prometheus_params.into()).await;
westend_headers_to_millau::run(westend_client, millau_client, millau_sign, prometheus_params.into()).await
}
}
Ok(())
}

async fn run_relay_messages(command: cli::RelayMessages) -> Result<(), String> {
Expand All @@ -202,7 +201,7 @@ async fn run_relay_messages(command: cli::RelayMessages) -> Result<(), String> {
lane.into(),
prometheus_params.into(),
)
.await;
.await
}
cli::RelayMessages::RialtoToMillau {
rialto,
Expand All @@ -225,10 +224,9 @@ async fn run_relay_messages(command: cli::RelayMessages) -> Result<(), String> {
lane.into(),
prometheus_params.into(),
)
.await;
.await
}
}
Ok(())
}

async fn run_send_message(command: cli::SendMessage) -> Result<(), String> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,12 @@ pub async fn run(
millau_client: MillauClient,
millau_sign: MillauSigningParams,
metrics_params: Option<relay_utils::metrics::MetricsParams>,
) {
) -> Result<(), String> {
crate::finality_pipeline::run(
RialtoFinalityToMillau::new(millau_client.clone(), millau_sign),
rialto_client,
millau_client,
metrics_params,
)
.await;
.await
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ pub async fn run(
millau_sign: MillauSigningParams,
lane_id: LaneId,
metrics_params: Option<MetricsParams>,
) {
) -> Result<(), String> {
let stall_timeout = Duration::from_secs(5 * 60);
let relayer_id_at_rialto = rialto_sign.signer.public().as_array_ref().clone().into();

Expand Down Expand Up @@ -185,5 +185,5 @@ pub async fn run(
metrics_params,
futures::future::pending(),
)
.await;
.await
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,12 @@ pub async fn run(
millau_client: MillauClient,
millau_sign: MillauSigningParams,
metrics_params: Option<relay_utils::metrics::MetricsParams>,
) {
) -> Result<(), String> {
crate::finality_pipeline::run(
WestendFinalityToMillau::new(millau_client.clone(), millau_sign),
westend_client,
millau_client,
metrics_params,
)
.await;
.await
}
50 changes: 14 additions & 36 deletions relays/exchange/src/exchange_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use backoff::backoff::Backoff;
use futures::{future::FutureExt, select};
use num_traits::One;
use relay_utils::{
metrics::{start as metrics_start, GlobalMetrics, MetricsParams},
metrics::{GlobalMetrics, MetricsParams},
retry_backoff, FailedClient, MaybeConnectionError,
};
use std::future::Future;
Expand Down Expand Up @@ -85,50 +85,32 @@ pub async fn run<P: TransactionProofPipeline>(
target_client: impl TargetClient<P>,
metrics_params: Option<MetricsParams>,
exit_signal: impl Future<Output = ()>,
) {
) -> Result<(), String> {
let exit_signal = exit_signal.shared();
let metrics_global = GlobalMetrics::default();
let metrics_exch = ExchangeLoopMetrics::default();
let metrics_enabled = metrics_params.is_some();
metrics_start(
format!("{}_to_{}_Exchange", P::SOURCE_NAME, P::TARGET_NAME),
metrics_params,
&metrics_global,
&metrics_exch,
);

relay_utils::relay_loop::run(
relay_utils::relay_loop::RECONNECT_DELAY,
source_client,
target_client,
|source_client, target_client| {

relay_utils::relay_loop(source_client, target_client)
.with_metrics(format!("{}_to_{}_Exchange", P::SOURCE_NAME, P::TARGET_NAME))
.loop_metric(ExchangeLoopMetrics::default())?
.standalone_metric(GlobalMetrics::default())?
.expose(metrics_params)
.await?
.run(|source_client, target_client, metrics| {
run_until_connection_lost(
storage.clone(),
source_client,
target_client,
if metrics_enabled {
Some(metrics_global.clone())
} else {
None
},
if metrics_enabled {
Some(metrics_exch.clone())
} else {
None
},
metrics,
exit_signal.clone(),
)
},
)
.await;
})
.await
}

/// Run proofs synchronization.
async fn run_until_connection_lost<P: TransactionProofPipeline>(
mut storage: impl TransactionProofsRelayStorage<BlockNumber = BlockNumberOf<P>>,
source_client: impl SourceClient<P>,
target_client: impl TargetClient<P>,
metrics_global: Option<GlobalMetrics>,
metrics_exch: Option<ExchangeLoopMetrics>,
exit_signal: impl Future<Output = ()>,
) -> Result<(), FailedClient> {
Expand All @@ -151,10 +133,6 @@ async fn run_until_connection_lost<P: TransactionProofPipeline>(
)
.await;

if let Some(ref metrics_global) = metrics_global {
metrics_global.update().await;
}

if let Err((is_connection_error, failed_client)) = iteration_result {
if is_connection_error {
return Err(failed_client);
Expand Down Expand Up @@ -321,7 +299,7 @@ mod tests {
}
}));

async_std::task::block_on(run(
let _ = async_std::task::block_on(run(
storage,
source,
target,
Expand Down
49 changes: 12 additions & 37 deletions relays/finality/src/finality_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use futures::{select, Future, FutureExt, Stream, StreamExt};
use headers_relay::sync_loop_metrics::SyncLoopMetrics;
use num_traits::{One, Saturating};
use relay_utils::{
metrics::{start as metrics_start, GlobalMetrics, MetricsParams},
metrics::{GlobalMetrics, MetricsParams},
relay_loop::Client as RelayClient,
retry_backoff, FailedClient, MaybeConnectionError,
};
Expand Down Expand Up @@ -97,43 +97,24 @@ pub async fn run<P: FinalitySyncPipeline>(
sync_params: FinalitySyncParams,
metrics_params: Option<MetricsParams>,
exit_signal: impl Future<Output = ()>,
) {
) -> Result<(), String> {
let exit_signal = exit_signal.shared();

let metrics_global = GlobalMetrics::default();
let metrics_sync = SyncLoopMetrics::default();
let metrics_enabled = metrics_params.is_some();
metrics_start(
format!("{}_to_{}_Sync", P::SOURCE_NAME, P::TARGET_NAME),
metrics_params,
&metrics_global,
&metrics_sync,
);

relay_utils::relay_loop::run(
relay_utils::relay_loop::RECONNECT_DELAY,
source_client,
target_client,
|source_client, target_client| {
relay_utils::relay_loop(source_client, target_client)
.with_metrics(format!("{}_to_{}_Sync", P::SOURCE_NAME, P::TARGET_NAME))
.loop_metric(SyncLoopMetrics::default())?
.standalone_metric(GlobalMetrics::default())?
.expose(metrics_params)
.await?
.run(|source_client, target_client, metrics| {
run_until_connection_lost(
source_client,
target_client,
sync_params.clone(),
if metrics_enabled {
Some(metrics_global.clone())
} else {
None
},
if metrics_enabled {
Some(metrics_sync.clone())
} else {
None
},
metrics,
exit_signal.clone(),
)
},
)
.await;
})
.await
}

/// Unjustified headers container. Ordered by header number.
Expand Down Expand Up @@ -221,7 +202,6 @@ async fn run_until_connection_lost<P: FinalitySyncPipeline>(
source_client: impl SourceClient<P>,
target_client: impl TargetClient<P>,
sync_params: FinalitySyncParams,
metrics_global: Option<GlobalMetrics>,
metrics_sync: Option<SyncLoopMetrics>,
exit_signal: impl Future<Output = ()>,
) -> Result<(), FailedClient> {
Expand Down Expand Up @@ -267,11 +247,6 @@ async fn run_until_connection_lost<P: FinalitySyncPipeline>(
)
.await;

// update global metrics
if let Some(ref metrics_global) = metrics_global {
metrics_global.update().await;
}

// deal with errors
let next_tick = match iteration_result {
Ok(updated_last_transaction) => {
Expand Down
2 changes: 1 addition & 1 deletion relays/finality/src/finality_loop_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ fn run_sync_loop(state_function: impl Fn(&mut ClientsData) -> bool + Send + Sync
stall_timeout: Duration::from_secs(1),
};

async_std::task::block_on(run(
let _ = async_std::task::block_on(run(
source_client,
target_client,
sync_params,
Expand Down
Loading

0 comments on commit 68541c2

Please sign in to comment.