From 68541c2d116873daef120687374307f898ae9a47 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Wed, 24 Mar 2021 11:46:30 +0300 Subject: [PATCH] Changed how relay loops are started (#840) * slightly changed relay loop initialization * git mv * clippy * more clippy * loop_run -> run_loop * review and clippy * clippy --- relays/bin-ethereum/src/ethereum_exchange.rs | 2 +- relays/bin-ethereum/src/ethereum_sync_loop.rs | 7 +- relays/bin-ethereum/src/rpc_errors.rs | 3 + .../bin-ethereum/src/substrate_sync_loop.rs | 7 +- relays/bin-substrate/src/finality_pipeline.rs | 5 +- .../rialto_millau/millau_headers_to_rialto.rs | 4 +- .../millau_messages_to_rialto.rs | 4 +- relays/bin-substrate/src/rialto_millau/mod.rs | 12 +- .../rialto_millau/rialto_headers_to_millau.rs | 4 +- .../rialto_messages_to_millau.rs | 4 +- .../westend_headers_to_millau.rs | 4 +- relays/exchange/src/exchange_loop.rs | 50 ++--- relays/finality/src/finality_loop.rs | 49 ++--- relays/finality/src/finality_loop_tests.rs | 2 +- relays/headers/src/sync_loop.rs | 47 ++-- relays/headers/src/sync_loop_tests.rs | 2 +- relays/messages/src/message_lane_loop.rs | 51 ++--- relays/utils/src/lib.rs | 2 + relays/utils/src/metrics.rs | 143 ++---------- relays/utils/src/metrics/global.rs | 109 ++++++++++ relays/utils/src/relay_loop.rs | 204 +++++++++++++----- 21 files changed, 373 insertions(+), 342 deletions(-) create mode 100644 relays/utils/src/metrics/global.rs diff --git a/relays/bin-ethereum/src/ethereum_exchange.rs b/relays/bin-ethereum/src/ethereum_exchange.rs index 5fed62b9ca68..c075ecb7f81b 100644 --- a/relays/bin-ethereum/src/ethereum_exchange.rs +++ b/relays/bin-ethereum/src/ethereum_exchange.rs @@ -385,7 +385,7 @@ async fn run_auto_transactions_relay_loop( metrics_params, futures::future::pending(), ) - .await; + .await?; Ok(()) } diff --git a/relays/bin-ethereum/src/ethereum_sync_loop.rs b/relays/bin-ethereum/src/ethereum_sync_loop.rs index b4fd788f9f67..ab6483cdb881 100644 --- a/relays/bin-ethereum/src/ethereum_sync_loop.rs +++ b/relays/bin-ethereum/src/ethereum_sync_loop.rs @@ -258,8 +258,8 @@ pub async fn run(params: EthereumSyncParams) -> Result<(), RpcError> { instance, } = params; - let eth_client = async_std::task::block_on(EthereumClient::new(eth_params))?; - let sub_client = async_std::task::block_on(SubstrateClient::::new(sub_params))?; + let eth_client = EthereumClient::new(eth_params).await?; + let sub_client = SubstrateClient::::new(sub_params).await?; let sign_sub_transactions = match sync_params.target_tx_mode { TargetTransactionMode::Signed | TargetTransactionMode::Backup => true, @@ -279,7 +279,8 @@ pub async fn run(params: EthereumSyncParams) -> Result<(), RpcError> { metrics_params, futures::future::pending(), ) - .await; + .await + .map_err(RpcError::SyncLoop)?; Ok(()) } diff --git a/relays/bin-ethereum/src/rpc_errors.rs b/relays/bin-ethereum/src/rpc_errors.rs index 9f7f14cf9a1f..6f1c06040e11 100644 --- a/relays/bin-ethereum/src/rpc_errors.rs +++ b/relays/bin-ethereum/src/rpc_errors.rs @@ -29,6 +29,8 @@ pub enum RpcError { Ethereum(EthereumNodeError), /// An error occured when interacting with a Substrate node. Substrate(SubstrateNodeError), + /// Error running relay loop. + SyncLoop(String), } impl From for String { @@ -37,6 +39,7 @@ impl From for String { RpcError::Serialization(e) => e.to_string(), RpcError::Ethereum(e) => e.to_string(), RpcError::Substrate(e) => e.to_string(), + RpcError::SyncLoop(e) => e, } } } diff --git a/relays/bin-ethereum/src/substrate_sync_loop.rs b/relays/bin-ethereum/src/substrate_sync_loop.rs index a0ff44d4d9be..f72fcd96324d 100644 --- a/relays/bin-ethereum/src/substrate_sync_loop.rs +++ b/relays/bin-ethereum/src/substrate_sync_loop.rs @@ -173,8 +173,8 @@ pub async fn run(params: SubstrateSyncParams) -> Result<(), RpcError> { metrics_params, } = params; - let eth_client = async_std::task::block_on(EthereumClient::new(eth_params))?; - let sub_client = async_std::task::block_on(SubstrateClient::::new(sub_params))?; + let eth_client = EthereumClient::new(eth_params).await?; + let sub_client = SubstrateClient::::new(sub_params).await?; let target = EthereumHeadersTarget::new(eth_client, eth_contract_address, eth_sign); let source = SubstrateHeadersSource::new(sub_client); @@ -189,7 +189,8 @@ pub async fn run(params: SubstrateSyncParams) -> Result<(), RpcError> { metrics_params, futures::future::pending(), ) - .await; + .await + .map_err(RpcError::SyncLoop)?; Ok(()) } diff --git a/relays/bin-substrate/src/finality_pipeline.rs b/relays/bin-substrate/src/finality_pipeline.rs index 574db6a3f533..0ddc4a9a8de9 100644 --- a/relays/bin-substrate/src/finality_pipeline.rs +++ b/relays/bin-substrate/src/finality_pipeline.rs @@ -98,7 +98,8 @@ pub async fn run( source_client: Client, target_client: Client, metrics_params: Option, -) where +) -> Result<(), String> +where P: SubstrateFinalitySyncPipeline< Hash = HashOf, Number = BlockNumberOf, @@ -127,5 +128,5 @@ pub async fn run( metrics_params, futures::future::pending(), ) - .await; + .await } diff --git a/relays/bin-substrate/src/rialto_millau/millau_headers_to_rialto.rs b/relays/bin-substrate/src/rialto_millau/millau_headers_to_rialto.rs index 934c77830a68..a14fb691275a 100644 --- a/relays/bin-substrate/src/rialto_millau/millau_headers_to_rialto.rs +++ b/relays/bin-substrate/src/rialto_millau/millau_headers_to_rialto.rs @@ -58,12 +58,12 @@ pub async fn run( rialto_client: RialtoClient, rialto_sign: RialtoSigningParams, metrics_params: Option, -) { +) -> Result<(), String> { crate::finality_pipeline::run( MillauFinalityToRialto::new(rialto_client.clone(), rialto_sign), millau_client, rialto_client, metrics_params, ) - .await; + .await } diff --git a/relays/bin-substrate/src/rialto_millau/millau_messages_to_rialto.rs b/relays/bin-substrate/src/rialto_millau/millau_messages_to_rialto.rs index 5b6d080d5a74..df2cb3a03dee 100644 --- a/relays/bin-substrate/src/rialto_millau/millau_messages_to_rialto.rs +++ b/relays/bin-substrate/src/rialto_millau/millau_messages_to_rialto.rs @@ -132,7 +132,7 @@ pub async fn run( rialto_sign: RialtoSigningParams, lane_id: LaneId, metrics_params: Option, -) { +) -> Result<(), String> { let stall_timeout = Duration::from_secs(5 * 60); let relayer_id_at_millau = millau_sign.signer.public().as_array_ref().clone().into(); @@ -186,5 +186,5 @@ pub async fn run( metrics_params, futures::future::pending(), ) - .await; + .await } diff --git a/relays/bin-substrate/src/rialto_millau/mod.rs b/relays/bin-substrate/src/rialto_millau/mod.rs index d3e85ca14eac..eadee1484f7b 100644 --- a/relays/bin-substrate/src/rialto_millau/mod.rs +++ b/relays/bin-substrate/src/rialto_millau/mod.rs @@ -151,7 +151,7 @@ async fn run_relay_headers(command: cli::RelayHeaders) -> Result<(), String> { let millau_client = millau.into_client().await?; let rialto_client = rialto.into_client().await?; let rialto_sign = rialto_sign.parse()?; - millau_headers_to_rialto::run(millau_client, rialto_client, rialto_sign, prometheus_params.into()).await; + millau_headers_to_rialto::run(millau_client, rialto_client, rialto_sign, prometheus_params.into()).await } cli::RelayHeaders::RialtoToMillau { rialto, @@ -162,7 +162,7 @@ async fn run_relay_headers(command: cli::RelayHeaders) -> Result<(), String> { let rialto_client = rialto.into_client().await?; let millau_client = millau.into_client().await?; let millau_sign = millau_sign.parse()?; - rialto_headers_to_millau::run(rialto_client, millau_client, millau_sign, prometheus_params.into()).await; + rialto_headers_to_millau::run(rialto_client, millau_client, millau_sign, prometheus_params.into()).await } cli::RelayHeaders::WestendToMillau { westend, @@ -173,10 +173,9 @@ async fn run_relay_headers(command: cli::RelayHeaders) -> Result<(), String> { let westend_client = westend.into_client().await?; let millau_client = millau.into_client().await?; let millau_sign = millau_sign.parse()?; - westend_headers_to_millau::run(westend_client, millau_client, millau_sign, prometheus_params.into()).await; + westend_headers_to_millau::run(westend_client, millau_client, millau_sign, prometheus_params.into()).await } } - Ok(()) } async fn run_relay_messages(command: cli::RelayMessages) -> Result<(), String> { @@ -202,7 +201,7 @@ async fn run_relay_messages(command: cli::RelayMessages) -> Result<(), String> { lane.into(), prometheus_params.into(), ) - .await; + .await } cli::RelayMessages::RialtoToMillau { rialto, @@ -225,10 +224,9 @@ async fn run_relay_messages(command: cli::RelayMessages) -> Result<(), String> { lane.into(), prometheus_params.into(), ) - .await; + .await } } - Ok(()) } async fn run_send_message(command: cli::SendMessage) -> Result<(), String> { diff --git a/relays/bin-substrate/src/rialto_millau/rialto_headers_to_millau.rs b/relays/bin-substrate/src/rialto_millau/rialto_headers_to_millau.rs index 046883f11c32..3a6e7a8fc302 100644 --- a/relays/bin-substrate/src/rialto_millau/rialto_headers_to_millau.rs +++ b/relays/bin-substrate/src/rialto_millau/rialto_headers_to_millau.rs @@ -61,12 +61,12 @@ pub async fn run( millau_client: MillauClient, millau_sign: MillauSigningParams, metrics_params: Option, -) { +) -> Result<(), String> { crate::finality_pipeline::run( RialtoFinalityToMillau::new(millau_client.clone(), millau_sign), rialto_client, millau_client, metrics_params, ) - .await; + .await } diff --git a/relays/bin-substrate/src/rialto_millau/rialto_messages_to_millau.rs b/relays/bin-substrate/src/rialto_millau/rialto_messages_to_millau.rs index df94c25f3ce6..97df3734ff7d 100644 --- a/relays/bin-substrate/src/rialto_millau/rialto_messages_to_millau.rs +++ b/relays/bin-substrate/src/rialto_millau/rialto_messages_to_millau.rs @@ -132,7 +132,7 @@ pub async fn run( millau_sign: MillauSigningParams, lane_id: LaneId, metrics_params: Option, -) { +) -> Result<(), String> { let stall_timeout = Duration::from_secs(5 * 60); let relayer_id_at_rialto = rialto_sign.signer.public().as_array_ref().clone().into(); @@ -185,5 +185,5 @@ pub async fn run( metrics_params, futures::future::pending(), ) - .await; + .await } diff --git a/relays/bin-substrate/src/rialto_millau/westend_headers_to_millau.rs b/relays/bin-substrate/src/rialto_millau/westend_headers_to_millau.rs index ded1df32ce73..4f2ed8a21173 100644 --- a/relays/bin-substrate/src/rialto_millau/westend_headers_to_millau.rs +++ b/relays/bin-substrate/src/rialto_millau/westend_headers_to_millau.rs @@ -61,12 +61,12 @@ pub async fn run( millau_client: MillauClient, millau_sign: MillauSigningParams, metrics_params: Option, -) { +) -> Result<(), String> { crate::finality_pipeline::run( WestendFinalityToMillau::new(millau_client.clone(), millau_sign), westend_client, millau_client, metrics_params, ) - .await; + .await } diff --git a/relays/exchange/src/exchange_loop.rs b/relays/exchange/src/exchange_loop.rs index d3282d70f664..101a655c2576 100644 --- a/relays/exchange/src/exchange_loop.rs +++ b/relays/exchange/src/exchange_loop.rs @@ -26,7 +26,7 @@ use backoff::backoff::Backoff; use futures::{future::FutureExt, select}; use num_traits::One; use relay_utils::{ - metrics::{start as metrics_start, GlobalMetrics, MetricsParams}, + metrics::{GlobalMetrics, MetricsParams}, retry_backoff, FailedClient, MaybeConnectionError, }; use std::future::Future; @@ -85,42 +85,25 @@ pub async fn run( target_client: impl TargetClient

, metrics_params: Option, exit_signal: impl Future, -) { +) -> Result<(), String> { let exit_signal = exit_signal.shared(); - let metrics_global = GlobalMetrics::default(); - let metrics_exch = ExchangeLoopMetrics::default(); - let metrics_enabled = metrics_params.is_some(); - metrics_start( - format!("{}_to_{}_Exchange", P::SOURCE_NAME, P::TARGET_NAME), - metrics_params, - &metrics_global, - &metrics_exch, - ); - - relay_utils::relay_loop::run( - relay_utils::relay_loop::RECONNECT_DELAY, - source_client, - target_client, - |source_client, target_client| { + + relay_utils::relay_loop(source_client, target_client) + .with_metrics(format!("{}_to_{}_Exchange", P::SOURCE_NAME, P::TARGET_NAME)) + .loop_metric(ExchangeLoopMetrics::default())? + .standalone_metric(GlobalMetrics::default())? + .expose(metrics_params) + .await? + .run(|source_client, target_client, metrics| { run_until_connection_lost( storage.clone(), source_client, target_client, - if metrics_enabled { - Some(metrics_global.clone()) - } else { - None - }, - if metrics_enabled { - Some(metrics_exch.clone()) - } else { - None - }, + metrics, exit_signal.clone(), ) - }, - ) - .await; + }) + .await } /// Run proofs synchronization. @@ -128,7 +111,6 @@ async fn run_until_connection_lost( mut storage: impl TransactionProofsRelayStorage>, source_client: impl SourceClient

, target_client: impl TargetClient

, - metrics_global: Option, metrics_exch: Option, exit_signal: impl Future, ) -> Result<(), FailedClient> { @@ -151,10 +133,6 @@ async fn run_until_connection_lost( ) .await; - if let Some(ref metrics_global) = metrics_global { - metrics_global.update().await; - } - if let Err((is_connection_error, failed_client)) = iteration_result { if is_connection_error { return Err(failed_client); @@ -321,7 +299,7 @@ mod tests { } })); - async_std::task::block_on(run( + let _ = async_std::task::block_on(run( storage, source, target, diff --git a/relays/finality/src/finality_loop.rs b/relays/finality/src/finality_loop.rs index 7aafce075e60..512c51b4b242 100644 --- a/relays/finality/src/finality_loop.rs +++ b/relays/finality/src/finality_loop.rs @@ -27,7 +27,7 @@ use futures::{select, Future, FutureExt, Stream, StreamExt}; use headers_relay::sync_loop_metrics::SyncLoopMetrics; use num_traits::{One, Saturating}; use relay_utils::{ - metrics::{start as metrics_start, GlobalMetrics, MetricsParams}, + metrics::{GlobalMetrics, MetricsParams}, relay_loop::Client as RelayClient, retry_backoff, FailedClient, MaybeConnectionError, }; @@ -97,43 +97,24 @@ pub async fn run( sync_params: FinalitySyncParams, metrics_params: Option, exit_signal: impl Future, -) { +) -> Result<(), String> { let exit_signal = exit_signal.shared(); - - let metrics_global = GlobalMetrics::default(); - let metrics_sync = SyncLoopMetrics::default(); - let metrics_enabled = metrics_params.is_some(); - metrics_start( - format!("{}_to_{}_Sync", P::SOURCE_NAME, P::TARGET_NAME), - metrics_params, - &metrics_global, - &metrics_sync, - ); - - relay_utils::relay_loop::run( - relay_utils::relay_loop::RECONNECT_DELAY, - source_client, - target_client, - |source_client, target_client| { + relay_utils::relay_loop(source_client, target_client) + .with_metrics(format!("{}_to_{}_Sync", P::SOURCE_NAME, P::TARGET_NAME)) + .loop_metric(SyncLoopMetrics::default())? + .standalone_metric(GlobalMetrics::default())? + .expose(metrics_params) + .await? + .run(|source_client, target_client, metrics| { run_until_connection_lost( source_client, target_client, sync_params.clone(), - if metrics_enabled { - Some(metrics_global.clone()) - } else { - None - }, - if metrics_enabled { - Some(metrics_sync.clone()) - } else { - None - }, + metrics, exit_signal.clone(), ) - }, - ) - .await; + }) + .await } /// Unjustified headers container. Ordered by header number. @@ -221,7 +202,6 @@ async fn run_until_connection_lost( source_client: impl SourceClient

, target_client: impl TargetClient

, sync_params: FinalitySyncParams, - metrics_global: Option, metrics_sync: Option, exit_signal: impl Future, ) -> Result<(), FailedClient> { @@ -267,11 +247,6 @@ async fn run_until_connection_lost( ) .await; - // update global metrics - if let Some(ref metrics_global) = metrics_global { - metrics_global.update().await; - } - // deal with errors let next_tick = match iteration_result { Ok(updated_last_transaction) => { diff --git a/relays/finality/src/finality_loop_tests.rs b/relays/finality/src/finality_loop_tests.rs index 64ab91420fb9..e19f924376d7 100644 --- a/relays/finality/src/finality_loop_tests.rs +++ b/relays/finality/src/finality_loop_tests.rs @@ -202,7 +202,7 @@ fn run_sync_loop(state_function: impl Fn(&mut ClientsData) -> bool + Send + Sync stall_timeout: Duration::from_secs(1), }; - async_std::task::block_on(run( + let _ = async_std::task::block_on(run( source_client, target_client, sync_params, diff --git a/relays/headers/src/sync_loop.rs b/relays/headers/src/sync_loop.rs index 7da8fd4f42fe..9b55456f7900 100644 --- a/relays/headers/src/sync_loop.rs +++ b/relays/headers/src/sync_loop.rs @@ -25,7 +25,7 @@ use futures::{future::FutureExt, stream::StreamExt}; use num_traits::{Saturating, Zero}; use relay_utils::{ format_ids, interval, - metrics::{start as metrics_start, GlobalMetrics, MetricsParams}, + metrics::{GlobalMetrics, MetricsParams}, process_future_result, relay_loop::Client as RelayClient, retry_backoff, FailedClient, MaybeConnectionError, StringifiedMaybeConnectionError, @@ -121,24 +121,15 @@ pub async fn run>( sync_params: HeadersSyncParams, metrics_params: Option, exit_signal: impl Future, -) { +) -> Result<(), String> { let exit_signal = exit_signal.shared(); - - let metrics_global = GlobalMetrics::default(); - let metrics_sync = SyncLoopMetrics::default(); - let metrics_enabled = metrics_params.is_some(); - metrics_start( - format!("{}_to_{}_Sync", P::SOURCE_NAME, P::TARGET_NAME), - metrics_params, - &metrics_global, - &metrics_sync, - ); - - relay_utils::relay_loop::run( - relay_utils::relay_loop::RECONNECT_DELAY, - source_client, - target_client, - |source_client, target_client| { + relay_utils::relay_loop(source_client, target_client) + .with_metrics(format!("{}_to_{}_Sync", P::SOURCE_NAME, P::TARGET_NAME)) + .loop_metric(SyncLoopMetrics::default())? + .standalone_metric(GlobalMetrics::default())? + .expose(metrics_params) + .await? + .run(|source_client, target_client, metrics| { run_until_connection_lost( source_client, source_tick, @@ -146,21 +137,11 @@ pub async fn run>( target_tick, sync_maintain.clone(), sync_params.clone(), - if metrics_enabled { - Some(metrics_global.clone()) - } else { - None - }, - if metrics_enabled { - Some(metrics_sync.clone()) - } else { - None - }, + metrics, exit_signal.clone(), ) - }, - ) - .await; + }) + .await } /// Run headers synchronization. @@ -172,7 +153,6 @@ async fn run_until_connection_lost>( target_tick: Duration, sync_maintain: impl SyncMaintain

, sync_params: HeadersSyncParams, - metrics_global: Option, metrics_sync: Option, exit_signal: impl Future, ) -> Result<(), FailedClient> { @@ -438,9 +418,6 @@ async fn run_until_connection_lost>( } // update metrics - if let Some(ref metrics_global) = metrics_global { - metrics_global.update().await; - } if let Some(ref metrics_sync) = metrics_sync { metrics_sync.update(&sync); } diff --git a/relays/headers/src/sync_loop_tests.rs b/relays/headers/src/sync_loop_tests.rs index aec9f1fa883f..03303214bc52 100644 --- a/relays/headers/src/sync_loop_tests.rs +++ b/relays/headers/src/sync_loop_tests.rs @@ -493,7 +493,7 @@ fn run_sync_loop_test(params: SyncLoopTestParams) { target.data.lock().requires_extra = target_requires_extra; target.data.lock().requires_completion = target_requires_completion; - async_std::task::block_on(run( + let _ = async_std::task::block_on(run( source, test_tick(), target, diff --git a/relays/messages/src/message_lane_loop.rs b/relays/messages/src/message_lane_loop.rs index 44d31aec975b..77265f4ac96b 100644 --- a/relays/messages/src/message_lane_loop.rs +++ b/relays/messages/src/message_lane_loop.rs @@ -34,7 +34,7 @@ use bp_messages::{LaneId, MessageNonce, UnrewardedRelayersState, Weight}; use futures::{channel::mpsc::unbounded, future::FutureExt, stream::StreamExt}; use relay_utils::{ interval, - metrics::{start as metrics_start, GlobalMetrics, MetricsParams}, + metrics::{GlobalMetrics, MetricsParams}, process_future_result, relay_loop::Client as RelayClient, retry_backoff, FailedClient, @@ -212,47 +212,29 @@ pub async fn run( target_client: impl TargetClient

, metrics_params: Option, exit_signal: impl Future, -) { +) -> Result<(), String> { let exit_signal = exit_signal.shared(); - let metrics_global = GlobalMetrics::default(); - let metrics_msg = MessageLaneLoopMetrics::default(); - let metrics_enabled = metrics_params.is_some(); - metrics_start( - format!( + relay_utils::relay_loop(source_client, target_client) + .with_metrics(format!( "{}_to_{}_MessageLane_{}", P::SOURCE_NAME, P::TARGET_NAME, hex::encode(params.lane) - ), - metrics_params, - &metrics_global, - &metrics_msg, - ); - - relay_utils::relay_loop::run( - params.reconnect_delay, - source_client, - target_client, - |source_client, target_client| { + )) + .loop_metric(MessageLaneLoopMetrics::default())? + .standalone_metric(GlobalMetrics::default())? + .expose(metrics_params) + .await? + .run(|source_client, target_client, metrics| { run_until_connection_lost( params.clone(), source_client, target_client, - if metrics_enabled { - Some(metrics_global.clone()) - } else { - None - }, - if metrics_enabled { - Some(metrics_msg.clone()) - } else { - None - }, + metrics, exit_signal.clone(), ) - }, - ) - .await; + }) + .await } /// Run one-way message delivery loop until connection with target or source node is lost, or exit signal is received. @@ -260,7 +242,6 @@ async fn run_until_connection_lost, TC: Targ params: Params, source_client: SC, target_client: TC, - metrics_global: Option, metrics_msg: Option, exit_signal: impl Future, ) -> Result<(), FailedClient> { @@ -404,10 +385,6 @@ async fn run_until_connection_lost, TC: Targ } } - if let Some(ref metrics_global) = metrics_global { - metrics_global.update().await; - } - if source_client_is_online && source_state_required { log::debug!(target: "bridge", "Asking {} node about its state", P::SOURCE_NAME); source_state.set(source_client.state().fuse()); @@ -707,7 +684,7 @@ pub(crate) mod tests { data: data.clone(), tick: target_tick, }; - run( + let _ = run( Params { lane: [0, 0, 0, 0], source_tick: Duration::from_millis(100), diff --git a/relays/utils/src/lib.rs b/relays/utils/src/lib.rs index f787e8763a7a..0dcc7e9a4f98 100644 --- a/relays/utils/src/lib.rs +++ b/relays/utils/src/lib.rs @@ -16,6 +16,8 @@ //! Utilities used by different relays. +pub use relay_loop::relay_loop; + use backoff::{backoff::Backoff, ExponentialBackoff}; use futures::future::FutureExt; use std::time::Duration; diff --git a/relays/utils/src/metrics.rs b/relays/utils/src/metrics.rs index f38d1bda3a5d..c42b10cad97f 100644 --- a/relays/utils/src/metrics.rs +++ b/relays/utils/src/metrics.rs @@ -14,12 +14,13 @@ // You should have received a copy of the GNU General Public License // along with Parity Bridges Common. If not, see . +pub use global::GlobalMetrics; pub use substrate_prometheus_endpoint::{register, Counter, CounterVec, Gauge, GaugeVec, Opts, Registry, F64, U64}; -use async_std::sync::{Arc, Mutex}; -use std::net::SocketAddr; -use substrate_prometheus_endpoint::init_prometheus; -use sysinfo::{ProcessExt, RefreshKind, System, SystemExt}; +use async_trait::async_trait; +use std::time::Duration; + +mod global; /// Prometheus endpoint MetricsParams. #[derive(Debug, Clone)] @@ -31,62 +32,32 @@ pub struct MetricsParams { } /// Metrics API. -pub trait Metrics { +pub trait Metrics: Clone + Send + Sync + 'static { /// Register metrics in the registry. fn register(&self, registry: &Registry) -> Result<(), String>; } -/// Global Prometheus metrics. -#[derive(Debug, Clone)] -pub struct GlobalMetrics { - system: Arc>, - system_average_load: GaugeVec, - process_cpu_usage_percentage: Gauge, - process_memory_usage_bytes: Gauge, -} - -/// Start Prometheus endpoint with given metrics registry. -pub fn start( - prefix: String, - params: Option, - global_metrics: &GlobalMetrics, - extra_metrics: &impl Metrics, -) { - let params = match params { - Some(params) => params, - None => return, - }; +/// Standalone metrics API. +/// +/// Metrics of this kind know how to update themselves, so we may just spawn and forget the +/// asynchronous self-update task. +#[async_trait] +pub trait StandaloneMetrics: Metrics { + /// Update metric values. + async fn update(&self); - assert!(!prefix.is_empty(), "Metrics prefix can not be empty"); + /// Metrics update interval. + fn update_interval(&self) -> Duration; - let do_start = move || { - let prometheus_socket_addr = SocketAddr::new( - params - .host - .parse() - .map_err(|err| format!("Invalid Prometheus host {}: {}", params.host, err))?, - params.port, - ); - let metrics_registry = - Registry::new_custom(Some(prefix), None).expect("only fails if prefix is empty; prefix is not empty; qed"); - global_metrics.register(&metrics_registry)?; - extra_metrics.register(&metrics_registry)?; + /// Spawn the self update task that will keep update metric value at given intervals. + fn spawn(self) { async_std::task::spawn(async move { - init_prometheus(prometheus_socket_addr, metrics_registry) - .await - .map_err(|err| format!("Error starting Prometheus endpoint: {}", err)) + let update_interval = self.update_interval(); + loop { + self.update().await; + async_std::task::sleep(update_interval).await; + } }); - - Ok(()) - }; - - let result: Result<(), String> = do_start(); - if let Err(err) = result { - log::warn!( - target: "bridge", - "Failed to expose metrics: {}", - err, - ); } } @@ -98,71 +69,3 @@ impl Default for MetricsParams { } } } - -impl Metrics for GlobalMetrics { - fn register(&self, registry: &Registry) -> Result<(), String> { - register(self.system_average_load.clone(), registry).map_err(|e| e.to_string())?; - register(self.process_cpu_usage_percentage.clone(), registry).map_err(|e| e.to_string())?; - register(self.process_memory_usage_bytes.clone(), registry).map_err(|e| e.to_string())?; - Ok(()) - } -} - -impl Default for GlobalMetrics { - fn default() -> Self { - GlobalMetrics { - system: Arc::new(Mutex::new(System::new_with_specifics(RefreshKind::everything()))), - system_average_load: GaugeVec::new(Opts::new("system_average_load", "System load average"), &["over"]) - .expect("metric is static and thus valid; qed"), - process_cpu_usage_percentage: Gauge::new("process_cpu_usage_percentage", "Process CPU usage") - .expect("metric is static and thus valid; qed"), - process_memory_usage_bytes: Gauge::new( - "process_memory_usage_bytes", - "Process memory (resident set size) usage", - ) - .expect("metric is static and thus valid; qed"), - } - } -} - -impl GlobalMetrics { - /// Update metrics. - pub async fn update(&self) { - // update system-wide metrics - let mut system = self.system.lock().await; - let load = system.get_load_average(); - self.system_average_load.with_label_values(&["1min"]).set(load.one); - self.system_average_load.with_label_values(&["5min"]).set(load.five); - self.system_average_load.with_label_values(&["15min"]).set(load.fifteen); - - // update process-related metrics - let pid = sysinfo::get_current_pid().expect( - "only fails where pid is unavailable (os=unknown || arch=wasm32);\ - relay is not supposed to run in such MetricsParamss;\ - qed", - ); - let is_process_refreshed = system.refresh_process(pid); - match (is_process_refreshed, system.get_process(pid)) { - (true, Some(process_info)) => { - let cpu_usage = process_info.cpu_usage() as f64; - let memory_usage = process_info.memory() * 1024; - log::trace!( - target: "bridge-metrics", - "Refreshed process metrics: CPU={}, memory={}", - cpu_usage, - memory_usage, - ); - - self.process_cpu_usage_percentage - .set(if cpu_usage.is_finite() { cpu_usage } else { 0f64 }); - self.process_memory_usage_bytes.set(memory_usage); - } - _ => { - log::warn!( - target: "bridge", - "Failed to refresh process information. Metrics may show obsolete values", - ); - } - } - } -} diff --git a/relays/utils/src/metrics/global.rs b/relays/utils/src/metrics/global.rs new file mode 100644 index 000000000000..c19ac8d46819 --- /dev/null +++ b/relays/utils/src/metrics/global.rs @@ -0,0 +1,109 @@ +// Copyright 2019-2021 Parity Technologies (UK) Ltd. +// This file is part of Parity Bridges Common. + +// Parity Bridges Common is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity Bridges Common is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity Bridges Common. If not, see . + +//! Global system-wide Prometheus metrics exposed by relays. + +use crate::metrics::{Metrics, StandaloneMetrics}; + +use async_std::sync::{Arc, Mutex}; +use async_trait::async_trait; +use std::time::Duration; +use substrate_prometheus_endpoint::{register, Gauge, GaugeVec, Opts, Registry, F64, U64}; +use sysinfo::{ProcessExt, RefreshKind, System, SystemExt}; + +/// Global metrics update interval. +const UPDATE_INTERVAL: Duration = Duration::from_secs(10); + +/// Global Prometheus metrics. +#[derive(Debug, Clone)] +pub struct GlobalMetrics { + system: Arc>, + system_average_load: GaugeVec, + process_cpu_usage_percentage: Gauge, + process_memory_usage_bytes: Gauge, +} + +impl Metrics for GlobalMetrics { + fn register(&self, registry: &Registry) -> Result<(), String> { + register(self.system_average_load.clone(), registry).map_err(|e| e.to_string())?; + register(self.process_cpu_usage_percentage.clone(), registry).map_err(|e| e.to_string())?; + register(self.process_memory_usage_bytes.clone(), registry).map_err(|e| e.to_string())?; + Ok(()) + } +} + +#[async_trait] +impl StandaloneMetrics for GlobalMetrics { + async fn update(&self) { + // update system-wide metrics + let mut system = self.system.lock().await; + let load = system.get_load_average(); + self.system_average_load.with_label_values(&["1min"]).set(load.one); + self.system_average_load.with_label_values(&["5min"]).set(load.five); + self.system_average_load.with_label_values(&["15min"]).set(load.fifteen); + + // update process-related metrics + let pid = sysinfo::get_current_pid().expect( + "only fails where pid is unavailable (os=unknown || arch=wasm32);\ + relay is not supposed to run in such MetricsParamss;\ + qed", + ); + let is_process_refreshed = system.refresh_process(pid); + match (is_process_refreshed, system.get_process(pid)) { + (true, Some(process_info)) => { + let cpu_usage = process_info.cpu_usage() as f64; + let memory_usage = process_info.memory() * 1024; + log::trace!( + target: "bridge-metrics", + "Refreshed process metrics: CPU={}, memory={}", + cpu_usage, + memory_usage, + ); + + self.process_cpu_usage_percentage + .set(if cpu_usage.is_finite() { cpu_usage } else { 0f64 }); + self.process_memory_usage_bytes.set(memory_usage); + } + _ => { + log::warn!( + target: "bridge", + "Failed to refresh process information. Metrics may show obsolete values", + ); + } + } + } + + fn update_interval(&self) -> Duration { + UPDATE_INTERVAL + } +} + +impl Default for GlobalMetrics { + fn default() -> Self { + GlobalMetrics { + system: Arc::new(Mutex::new(System::new_with_specifics(RefreshKind::everything()))), + system_average_load: GaugeVec::new(Opts::new("system_average_load", "System load average"), &["over"]) + .expect("metric is static and thus valid; qed"), + process_cpu_usage_percentage: Gauge::new("process_cpu_usage_percentage", "Process CPU usage") + .expect("metric is static and thus valid; qed"), + process_memory_usage_bytes: Gauge::new( + "process_memory_usage_bytes", + "Process memory (resident set size) usage", + ) + .expect("metric is static and thus valid; qed"), + } + } +} diff --git a/relays/utils/src/relay_loop.rs b/relays/utils/src/relay_loop.rs index 6a61ecd28934..d21f6d28ef5b 100644 --- a/relays/utils/src/relay_loop.rs +++ b/relays/utils/src/relay_loop.rs @@ -14,10 +14,12 @@ // You should have received a copy of the GNU General Public License // along with Parity Bridges Common. If not, see . +use crate::metrics::{Metrics, MetricsParams, StandaloneMetrics}; use crate::{FailedClient, MaybeConnectionError}; use async_trait::async_trait; -use std::{fmt::Debug, future::Future, time::Duration}; +use std::{fmt::Debug, future::Future, net::SocketAddr, time::Duration}; +use substrate_prometheus_endpoint::{init_prometheus, Registry}; /// Default pause between reconnect attempts. pub const RECONNECT_DELAY: Duration = Duration::from_secs(10); @@ -32,60 +34,164 @@ pub trait Client: Clone + Send + Sync { async fn reconnect(&mut self) -> Result<(), Self::Error>; } -/// Run relay loop. -/// -/// This function represents an outer loop, which in turn calls provided `loop_run` function to do -/// actual job. When `loop_run` returns, this outer loop reconnects to failed client (source, -/// target or both) and calls `loop_run` again. -pub async fn run( - reconnect_delay: Duration, - mut source_client: SC, - mut target_client: TC, - loop_run: R, -) where - R: Fn(SC, TC) -> F, - F: Future>, -{ - loop { - let result = loop_run(source_client.clone(), target_client.clone()).await; - - match result { - Ok(()) => break, - Err(failed_client) => loop { - async_std::task::sleep(reconnect_delay).await; - if failed_client == FailedClient::Both || failed_client == FailedClient::Source { - match source_client.reconnect().await { - Ok(()) => (), - Err(error) => { - log::warn!( - target: "bridge", - "Failed to reconnect to source client. Going to retry in {}s: {:?}", - reconnect_delay.as_secs(), - error, - ); - continue; +/// Returns generic loop that may be customized and started. +pub fn relay_loop(source_client: SC, target_client: TC) -> Loop { + Loop { + source_client, + target_client, + loop_metric: None, + } +} + +/// Generic relay loop. +pub struct Loop { + source_client: SC, + target_client: TC, + loop_metric: Option, +} + +/// Relay loop metrics builder. +pub struct LoopMetrics { + relay_loop: Loop, + registry: Registry, + loop_metric: Option, +} + +impl Loop { + /// Start building loop metrics using given prefix. + /// + /// Panics if `prefix` is empty. + pub fn with_metrics(self, prefix: String) -> LoopMetrics { + assert!(!prefix.is_empty(), "Metrics prefix can not be empty"); + + LoopMetrics { + relay_loop: Loop { + source_client: self.source_client, + target_client: self.target_client, + loop_metric: None, + }, + registry: Registry::new_custom(Some(prefix), None) + .expect("only fails if prefix is empty; prefix is not empty; qed"), + loop_metric: None, + } + } + + /// Run relay loop. + /// + /// This function represents an outer loop, which in turn calls provided `run_loop` function to do + /// actual job. When `run_loop` returns, this outer loop reconnects to failed client (source, + /// target or both) and calls `run_loop` again. + pub async fn run(mut self, run_loop: R) -> Result<(), String> + where + R: Fn(SC, TC, Option) -> F, + F: Future>, + SC: Client, + TC: Client, + LM: Clone, + { + loop { + let result = run_loop( + self.source_client.clone(), + self.target_client.clone(), + self.loop_metric.clone(), + ) + .await; + + match result { + Ok(()) => break, + Err(failed_client) => loop { + async_std::task::sleep(RECONNECT_DELAY).await; + if failed_client == FailedClient::Both || failed_client == FailedClient::Source { + match self.source_client.reconnect().await { + Ok(()) => (), + Err(error) => { + log::warn!( + target: "bridge", + "Failed to reconnect to source client. Going to retry in {}s: {:?}", + RECONNECT_DELAY.as_secs(), + error, + ); + continue; + } } } - } - if failed_client == FailedClient::Both || failed_client == FailedClient::Target { - match target_client.reconnect().await { - Ok(()) => (), - Err(error) => { - log::warn!( - target: "bridge", - "Failed to reconnect to target client. Going to retry in {}s: {:?}", - reconnect_delay.as_secs(), - error, - ); - continue; + if failed_client == FailedClient::Both || failed_client == FailedClient::Target { + match self.target_client.reconnect().await { + Ok(()) => (), + Err(error) => { + log::warn!( + target: "bridge", + "Failed to reconnect to target client. Going to retry in {}s: {:?}", + RECONNECT_DELAY.as_secs(), + error, + ); + continue; + } } } - } - break; - }, + break; + }, + } + + log::debug!(target: "bridge", "Restarting relay loop"); + } + + Ok(()) + } +} + +impl LoopMetrics { + /// Add relay loop metrics. + /// + /// Loop metrics will be passed to the loop callback. + pub fn loop_metric(self, loop_metric: NewLM) -> Result, String> { + loop_metric.register(&self.registry)?; + + Ok(LoopMetrics { + relay_loop: self.relay_loop, + registry: self.registry, + loop_metric: Some(loop_metric), + }) + } + + /// Add standalone metrics. + pub fn standalone_metric(self, standalone_metrics: M) -> Result { + standalone_metrics.register(&self.registry)?; + standalone_metrics.spawn(); + Ok(self) + } + + /// Expose metrics using given params. + /// + /// If `params` is `None`, metrics are not exposed. + pub async fn expose(self, params: Option) -> Result, String> { + if let Some(params) = params { + let socket_addr = SocketAddr::new( + params.host.parse().map_err(|err| { + format!( + "Invalid host {} is used to expose Prometheus metrics: {}", + params.host, err, + ) + })?, + params.port, + ); + + let registry = self.registry; + async_std::task::spawn(async move { + let result = init_prometheus(socket_addr, registry).await; + log::trace!( + target: "bridge-metrics", + "Prometheus endpoint has exited with result: {:?}", + result, + ); + }); } - log::debug!(target: "bridge", "Restarting relay loop"); + Ok(Loop { + source_client: self.relay_loop.source_client, + target_client: self.relay_loop.target_client, + loop_metric: self.loop_metric, + }) } }