From e9ba1689984522790b32d27bd9f3c304d5f6c4e1 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Wed, 26 May 2021 09:23:31 +0300 Subject: [PATCH] Always run on-demand headers relay in complex relay (#975) * always run on-demand headers relay in complex relay * fix compilation --- .../bin-substrate/src/cli/relay_headers.rs | 1 - .../bin-substrate/src/finality_pipeline.rs | 8 +- .../bin-substrate/src/messages_source.rs | 2 +- .../bin-substrate/src/messages_target.rs | 2 +- .../bin-substrate/src/on_demand_headers.rs | 372 +++++++----------- bridges/relays/client-substrate/Cargo.toml | 2 +- .../client-substrate/src/finality_source.rs | 33 +- bridges/relays/finality/src/finality_loop.rs | 3 - .../finality/src/finality_loop_tests.rs | 1 - bridges/relays/utils/src/relay_loop.rs | 23 +- 10 files changed, 166 insertions(+), 281 deletions(-) diff --git a/bridges/relays/bin-substrate/src/cli/relay_headers.rs b/bridges/relays/bin-substrate/src/cli/relay_headers.rs index c0d80f80d36be..abce9b3bfe163 100644 --- a/bridges/relays/bin-substrate/src/cli/relay_headers.rs +++ b/bridges/relays/bin-substrate/src/cli/relay_headers.rs @@ -103,7 +103,6 @@ impl RelayHeaders { Finality::new(target_client.clone(), target_sign), source_client, target_client, - false, metrics_params, ) .await diff --git a/bridges/relays/bin-substrate/src/finality_pipeline.rs b/bridges/relays/bin-substrate/src/finality_pipeline.rs index f1c72184cc080..fd222f1c2981b 100644 --- a/bridges/relays/bin-substrate/src/finality_pipeline.rs +++ b/bridges/relays/bin-substrate/src/finality_pipeline.rs @@ -26,12 +26,12 @@ use sp_core::Bytes; use std::{fmt::Debug, marker::PhantomData, time::Duration}; /// Default synchronization loop timeout. -const STALL_TIMEOUT: Duration = Duration::from_secs(120); +pub(crate) const STALL_TIMEOUT: Duration = Duration::from_secs(120); /// Default limit of recent finality proofs. /// /// Finality delay of 4096 blocks is unlikely to happen in practice in /// Substrate+GRANDPA based chains (good to know). -const RECENT_FINALITY_PROOFS_LIMIT: usize = 4096; +pub(crate) const RECENT_FINALITY_PROOFS_LIMIT: usize = 4096; /// Headers sync pipeline for Substrate <-> Substrate relays. pub trait SubstrateFinalitySyncPipeline: FinalitySyncPipeline { @@ -119,7 +119,6 @@ pub async fn run( pipeline: P, source_client: Client, target_client: Client, - is_on_demand_task: bool, metrics_params: MetricsParams, ) -> anyhow::Result<()> where @@ -142,10 +141,9 @@ where ); finality_relay::run( - FinalitySource::new(source_client), + FinalitySource::new(source_client, None), SubstrateFinalityTarget::new(target_client, pipeline), FinalitySyncParams { - is_on_demand_task, tick: std::cmp::max(SourceChain::AVERAGE_BLOCK_INTERVAL, TargetChain::AVERAGE_BLOCK_INTERVAL), recent_finality_proofs_limit: RECENT_FINALITY_PROOFS_LIMIT, stall_timeout: STALL_TIMEOUT, diff --git a/bridges/relays/bin-substrate/src/messages_source.rs b/bridges/relays/bin-substrate/src/messages_source.rs index 36f8b6d85cdea..49e2c7efeea93 100644 --- a/bridges/relays/bin-substrate/src/messages_source.rs +++ b/bridges/relays/bin-substrate/src/messages_source.rs @@ -239,7 +239,7 @@ where async fn require_target_header_on_source(&self, id: TargetHeaderIdOf
<P>
) { if let Some(ref target_to_source_headers_relay) = self.target_to_source_headers_relay { - target_to_source_headers_relay.require_finalized_header(id); + target_to_source_headers_relay.require_finalized_header(id).await; } } } diff --git a/bridges/relays/bin-substrate/src/messages_target.rs b/bridges/relays/bin-substrate/src/messages_target.rs index 004bb47db518f..715efce88ec28 100644 --- a/bridges/relays/bin-substrate/src/messages_target.rs +++ b/bridges/relays/bin-substrate/src/messages_target.rs @@ -226,7 +226,7 @@ where async fn require_source_header_on_target(&self, id: SourceHeaderIdOf
<P>
) { if let Some(ref source_to_target_headers_relay) = self.source_to_target_headers_relay { - source_to_target_headers_relay.require_finalized_header(id); + source_to_target_headers_relay.require_finalized_header(id).await; } } } diff --git a/bridges/relays/bin-substrate/src/on_demand_headers.rs b/bridges/relays/bin-substrate/src/on_demand_headers.rs index 77d2b37054104..5730fc3869be7 100644 --- a/bridges/relays/bin-substrate/src/on_demand_headers.rs +++ b/bridges/relays/bin-substrate/src/on_demand_headers.rs @@ -16,39 +16,38 @@ //! On-demand Substrate -> Substrate headers relay. -use crate::finality_pipeline::{SubstrateFinalitySyncPipeline, SubstrateFinalityToSubstrate}; +use crate::finality_pipeline::{ + SubstrateFinalitySyncPipeline, SubstrateFinalityToSubstrate, RECENT_FINALITY_PROOFS_LIMIT, STALL_TIMEOUT, +}; use crate::finality_target::SubstrateFinalityTarget; +use async_std::sync::{Arc, Mutex}; use bp_header_chain::justification::GrandpaJustification; use finality_relay::{ - FinalitySyncPipeline, SourceClient as FinalitySourceClient, TargetClient as FinalityTargetClient, -}; -use futures::{ - channel::{mpsc, oneshot}, - select, FutureExt, StreamExt, + FinalitySyncParams, FinalitySyncPipeline, SourceClient as FinalitySourceClient, + TargetClient as FinalityTargetClient, }; +use futures::{select, FutureExt}; use num_traits::{CheckedSub, Zero}; use relay_substrate_client::{ - finality_source::FinalitySource as SubstrateFinalitySource, BlockNumberOf, Chain, Client, HashOf, HeaderIdOf, - SyncHeader, + finality_source::{FinalitySource as SubstrateFinalitySource, RequiredHeaderNumberRef}, + BlockNumberOf, Chain, Client, HashOf, HeaderIdOf, SyncHeader, }; use relay_utils::{ - metrics::MetricsParams, relay_loop::Client as RelayClient, BlockNumberBase, FailedClient, HeaderId, - MaybeConnectionError, + metrics::MetricsParams, relay_loop::Client as RelayClient, BlockNumberBase, FailedClient, MaybeConnectionError, }; use std::fmt::Debug; /// On-demand Substrate <-> Substrate headers relay. /// -/// This relay may be started by messages whenever some other relay (e.g. messages relay) needs more -/// headers to be relayed to continue its regular work. When enough headers are relayed, on-demand -/// relay may be deactivated. +/// This relay may be requested to sync more headers, whenever some other relay (e.g. messages relay) needs +/// it to continue its regular work. When enough headers are relayed, on-demand stops syncing headers. #[derive(Clone)] pub struct OnDemandHeadersRelay { - /// Background task name. - background_task_name: String, - /// Required headers to background sender. - required_header_tx: mpsc::Sender>, + /// Relay task name. + relay_task_name: String, + /// Shared reference to maximal required finalized header number. 
+ required_header_number: RequiredHeaderNumberRef, } impl OnDemandHeadersRelay { @@ -75,49 +74,49 @@ impl OnDemandHeadersRelay { SubstrateFinalityTarget>: FinalityTargetClient>, { - let (required_header_tx, required_header_rx) = mpsc::channel(1); + let required_header_number = Arc::new(Mutex::new(Zero::zero())); + let this = OnDemandHeadersRelay { + relay_task_name: on_demand_headers_relay_name::(), + required_header_number: required_header_number.clone(), + }; async_std::task::spawn(async move { background_task( source_client, target_client, pipeline, maximal_headers_difference, - required_header_rx, + required_header_number, ) .await; }); - let background_task_name = format!( - "{}-background", - on_demand_headers_relay_name::() - ); - OnDemandHeadersRelay { - background_task_name, - required_header_tx, - } + this } /// Someone is asking us to relay given finalized header. - pub fn require_finalized_header(&self, header_id: HeaderIdOf) { - if let Err(error) = self.required_header_tx.clone().try_send(header_id) { - log::error!( + pub async fn require_finalized_header(&self, header_id: HeaderIdOf) { + let mut required_header_number = self.required_header_number.lock().await; + if header_id.0 > *required_header_number { + log::trace!( target: "bridge", - "Failed to send require header id {:?} to {:?}: {:?}", - header_id, - self.background_task_name, - error, + "More {} headers required in {} relay. Going to sync up to the {}", + SourceChain::NAME, + self.relay_task_name, + header_id.0, ); + + *required_header_number = header_id.0; } } } -/// Background task that is responsible for starting and stopping headers relay when required. +/// Background task that is responsible for starting headers relay. async fn background_task( source_client: Client, target_client: Client, pipeline: SubstrateFinalityToSubstrate, maximal_headers_difference: SourceChain::BlockNumber, - mut required_header_rx: mpsc::Receiver>, + required_header_number: RequiredHeaderNumberRef, ) where SourceChain: Chain + Debug, SourceChain::BlockNumber: BlockNumberBase, @@ -138,36 +137,19 @@ async fn background_task( let mut finality_source = SubstrateFinalitySource::< _, SubstrateFinalityToSubstrate, - >::new(source_client.clone()); + >::new(source_client.clone(), Some(required_header_number.clone())); let mut finality_target = SubstrateFinalityTarget::new(target_client.clone(), pipeline.clone()); - let mut active_headers_relay = None; - let mut required_header_number = Zero::zero(); - let mut relay_exited_rx = futures::future::pending().left_future(); + let mut restart_relay = true; + let finality_relay_task = futures::future::Fuse::terminated(); + futures::pin_mut!(finality_relay_task); loop { - // wait for next target block or for new required header select! { _ = async_std::task::sleep(TargetChain::AVERAGE_BLOCK_INTERVAL).fuse() => {}, - required_header_id = required_header_rx.next() => { - match required_header_id { - Some(required_header_id) => { - if required_header_id.0 > required_header_number { - required_header_number = required_header_id.0; - } - }, - None => { - // that's the only way to exit background task - to drop `required_header_tx` - break - }, - } - }, - _ = relay_exited_rx => { - // there could be a situation when we're receiving exit signals after we - // have already stopped relay or when we have already started new relay. 
- // but it isn't critical, because even if we'll accidentally stop new relay - // we'll restart it almost immediately - stop_on_demand_headers_relay(active_headers_relay.take()).await; + _ = finality_relay_task => { + // this should never happen in practice given the current code + restart_relay = true; }, } @@ -199,107 +181,53 @@ async fn background_task( continue; } - // start or stop headers relay if required - let action = select_on_demand_relay_action::( + // update required header + update_required_header_number_if_too_many_are_missing::( best_finalized_source_header_at_source.ok(), best_finalized_source_header_at_target.ok(), - required_header_number, maximal_headers_difference, + &required_header_number, &relay_task_name, - active_headers_relay.is_some(), - ); - match action { - OnDemandRelayAction::Start => { - let (relay_exited_tx, new_relay_exited_rx) = oneshot::channel(); - active_headers_relay = start_on_demand_headers_relay( - relay_task_name.clone(), - relay_exited_tx, - source_client.clone(), - target_client.clone(), - pipeline.clone(), - ); - if active_headers_relay.is_some() { - relay_exited_rx = new_relay_exited_rx.right_future(); - } - } - OnDemandRelayAction::Stop => { - stop_on_demand_headers_relay(active_headers_relay.take()).await; - } - OnDemandRelayAction::None => (), - } - } -} - -/// Read best finalized source block number from source client. -/// -/// Returns `None` if we have failed to read the number. -async fn best_finalized_source_header_at_source( - finality_source: &SubstrateFinalitySource, - relay_task_name: &str, -) -> Result as RelayClient>::Error> -where - SubstrateFinalitySource: FinalitySourceClient
<P>
, - P: FinalitySyncPipeline, -{ - finality_source.best_finalized_block_number().await.map_err(|error| { - log::error!( - target: "bridge", - "Failed to read best finalized source header from source in {} relay: {:?}", - relay_task_name, - error, - ); - - error - }) -} - -/// Read best finalized source block number from target client. -/// -/// Returns `None` if we have failed to read the number. -async fn best_finalized_source_header_at_target( - finality_target: &SubstrateFinalityTarget, - relay_task_name: &str, -) -> Result as RelayClient>::Error> -where - SubstrateFinalityTarget: FinalityTargetClient
<P>
, - P: FinalitySyncPipeline, -{ - finality_target - .best_finalized_source_block_number() - .await - .map_err(|error| { - log::error!( - target: "bridge", - "Failed to read best finalized source header from target in {} relay: {:?}", - relay_task_name, - error, + ) + .await; + + // start/restart relay + if restart_relay { + finality_relay_task.set( + finality_relay::run( + finality_source.clone(), + finality_target.clone(), + FinalitySyncParams { + tick: std::cmp::max(SourceChain::AVERAGE_BLOCK_INTERVAL, TargetChain::AVERAGE_BLOCK_INTERVAL), + recent_finality_proofs_limit: RECENT_FINALITY_PROOFS_LIMIT, + stall_timeout: STALL_TIMEOUT, + }, + MetricsParams::disabled(), + futures::future::pending(), + ) + .fuse(), ); - error - }) -} - -/// What to do with the on-demand relay task? -#[derive(Debug, PartialEq)] -enum OnDemandRelayAction { - Start, - Stop, - None, + restart_relay = false; + } + } } -fn select_on_demand_relay_action( +/// If there are too many source headers missing at target, we ask for syncing more headers. +async fn update_required_header_number_if_too_many_are_missing( best_finalized_source_header_at_source: Option, best_finalized_source_header_at_target: Option, - mut required_source_header_at_target: C::BlockNumber, maximal_headers_difference: C::BlockNumber, + required_header_number: &RequiredHeaderNumberRef, relay_task_name: &str, - is_active: bool, -) -> OnDemandRelayAction { +) { + let mut required_header_number = required_header_number.lock().await; + // if we have been unable to read header number from the target, then let's assume // that it is the same as required header number. Otherwise we risk submitting // unneeded transactions let best_finalized_source_header_at_target = - best_finalized_source_header_at_target.unwrap_or(required_source_header_at_target); + best_finalized_source_header_at_target.unwrap_or(*required_header_number); // if we have been unable to read header number from the source, then let's assume // that it is the same as at the target @@ -318,10 +246,8 @@ fn select_on_demand_relay_action( .checked_sub(&best_finalized_source_header_at_target) .unwrap_or_else(Zero::zero); if current_headers_difference > maximal_headers_difference { - required_source_header_at_target = best_finalized_source_header_at_source; - - // don't log if relay is already running - if !is_active { + // if relay is already asked to sync headers, don't log anything + if *required_header_number <= best_finalized_source_header_at_target { log::trace!( target: "bridge", "Too many {} headers missing at target in {} relay ({} vs {}). Going to sync up to the {}", @@ -331,76 +257,67 @@ fn select_on_demand_relay_action( best_finalized_source_header_at_target, best_finalized_source_header_at_source, ); - } - } - // now let's select what to do with relay - let needs_to_be_active = required_source_header_at_target > best_finalized_source_header_at_target; - match (needs_to_be_active, is_active) { - (true, false) => OnDemandRelayAction::Start, - (false, true) => OnDemandRelayAction::Stop, - _ => OnDemandRelayAction::None, + *required_header_number = best_finalized_source_header_at_source; + } } } -/// On-demand headers relay task name. -fn on_demand_headers_relay_name() -> String { - format!("on-demand-{}-to-{}", SourceChain::NAME, TargetChain::NAME) +/// Read best finalized source block number from source client. +/// +/// Returns `None` if we have failed to read the number. 
+async fn best_finalized_source_header_at_source( + finality_source: &SubstrateFinalitySource, + relay_task_name: &str, +) -> Result +where + SubstrateFinalitySource: FinalitySourceClient
<P>
, + P: FinalitySyncPipeline, +{ + finality_source + .on_chain_best_finalized_block_number() + .await + .map_err(|error| { + log::error!( + target: "bridge", + "Failed to read best finalized source header from source in {} relay: {:?}", + relay_task_name, + error, + ); + + error + }) } -/// Start on-demand headers relay task. -fn start_on_demand_headers_relay( - task_name: String, - relay_exited_tx: oneshot::Sender<()>, - source_client: Client, - target_client: Client, - pipeline: SubstrateFinalityToSubstrate, -) -> Option> +/// Read best finalized source block number from target client. +/// +/// Returns `None` if we have failed to read the number. +async fn best_finalized_source_header_at_target( + finality_target: &SubstrateFinalityTarget, + relay_task_name: &str, +) -> Result as RelayClient>::Error> where - SourceChain::BlockNumber: BlockNumberBase, - SubstrateFinalityToSubstrate: SubstrateFinalitySyncPipeline< - Hash = HashOf, - Number = BlockNumberOf, - Header = SyncHeader, - FinalityProof = GrandpaJustification, - TargetChain = TargetChain, - >, - TargetSign: 'static, + SubstrateFinalityTarget: FinalityTargetClient
<P>
, + P: FinalitySyncPipeline, { - let headers_relay_future = - crate::finality_pipeline::run(pipeline, source_client, target_client, true, MetricsParams::disabled()); - let closure_task_name = task_name.clone(); - async_std::task::Builder::new() - .name(task_name.clone()) - .spawn(async move { - log::info!(target: "bridge", "Starting {} headers relay", closure_task_name); - let result = headers_relay_future.await; - log::trace!(target: "bridge", "{} headers relay has exited. Result: {:?}", closure_task_name, result); - let _ = relay_exited_tx.send(()); - }) + finality_target + .best_finalized_source_block_number() + .await .map_err(|error| { log::error!( target: "bridge", - "Failed to start {} relay: {:?}", - task_name, + "Failed to read best finalized source header from target in {} relay: {:?}", + relay_task_name, error, ); + + error }) - .ok() } -/// Stop on-demand headers relay task. -async fn stop_on_demand_headers_relay(task: Option>) { - if let Some(task) = task { - let task_name = task - .task() - .name() - .expect("on-demand tasks are always started with name; qed") - .to_string(); - log::trace!(target: "bridge", "Cancelling {} headers relay", task_name); - task.cancel().await; - log::info!(target: "bridge", "Cancelled {} headers relay", task_name); - } +/// On-demand headers relay task name. +fn on_demand_headers_relay_name() -> String { + format!("on-demand-{}-to-{}", SourceChain::NAME, TargetChain::NAME) } #[cfg(test)] @@ -412,42 +329,17 @@ mod tests { const AT_SOURCE: Option = Some(10); const AT_TARGET: Option = Some(1); - #[test] - fn starts_relay_when_headers_are_required() { - assert_eq!( - select_on_demand_relay_action::(AT_SOURCE, AT_TARGET, 5, 100, "test", false), - OnDemandRelayAction::Start, - ); - - assert_eq!( - select_on_demand_relay_action::(AT_SOURCE, AT_TARGET, 5, 100, "test", true), - OnDemandRelayAction::None, - ); - } - - #[test] - fn starts_relay_when_too_many_headers_missing() { - assert_eq!( - select_on_demand_relay_action::(AT_SOURCE, AT_TARGET, 0, 5, "test", false), - OnDemandRelayAction::Start, - ); - - assert_eq!( - select_on_demand_relay_action::(AT_SOURCE, AT_TARGET, 0, 5, "test", true), - OnDemandRelayAction::None, - ); - } - - #[test] - fn stops_relay_if_required_header_is_synced() { - assert_eq!( - select_on_demand_relay_action::(AT_SOURCE, AT_TARGET, AT_TARGET.unwrap(), 100, "test", true), - OnDemandRelayAction::Stop, - ); - - assert_eq!( - select_on_demand_relay_action::(AT_SOURCE, AT_TARGET, AT_TARGET.unwrap(), 100, "test", false), - OnDemandRelayAction::None, - ); + #[async_std::test] + async fn updates_required_header_when_too_many_headers_missing() { + let required_header_number = Arc::new(Mutex::new(0)); + update_required_header_number_if_too_many_are_missing::( + AT_SOURCE, + AT_TARGET, + 5, + &required_header_number, + "test", + ) + .await; + assert_eq!(*required_header_number.lock().await, AT_SOURCE.unwrap()); } } diff --git a/bridges/relays/client-substrate/Cargo.toml b/bridges/relays/client-substrate/Cargo.toml index 699c3da400ffe..f5c2e26560593 100644 --- a/bridges/relays/client-substrate/Cargo.toml +++ b/bridges/relays/client-substrate/Cargo.toml @@ -6,7 +6,7 @@ edition = "2018" license = "GPL-3.0-or-later WITH Classpath-exception-2.0" [dependencies] -async-std = "1.6.5" +async-std = { version = "1.6.5", features = ["attributes"] } async-trait = "0.1.40" codec = { package = "parity-scale-codec", version = "2.0.0" } jsonrpsee-proc-macros = "=0.2.0-alpha.6" diff --git a/bridges/relays/client-substrate/src/finality_source.rs 
b/bridges/relays/client-substrate/src/finality_source.rs index 38500934191d2..140e1f17a6ab6 100644 --- a/bridges/relays/client-substrate/src/finality_source.rs +++ b/bridges/relays/client-substrate/src/finality_source.rs @@ -21,6 +21,7 @@ use crate::client::Client; use crate::error::Error; use crate::sync_header::SyncHeader; +use async_std::sync::{Arc, Mutex}; use async_trait::async_trait; use bp_header_chain::justification::GrandpaJustification; use codec::Decode; @@ -30,26 +31,41 @@ use relay_utils::relay_loop::Client as RelayClient; use sp_runtime::traits::Header as HeaderT; use std::{marker::PhantomData, pin::Pin}; +/// Shared updatable reference to the maximal header number that we want to sync from the source. +pub type RequiredHeaderNumberRef = Arc::BlockNumber>>; + /// Substrate node as finality source. pub struct FinalitySource { client: Client, + maximal_header_number: Option>, _phantom: PhantomData
<P>
, } impl FinalitySource { /// Create new headers source using given client. - pub fn new(client: Client) -> Self { + pub fn new(client: Client, maximal_header_number: Option>) -> Self { FinalitySource { client, + maximal_header_number, _phantom: Default::default(), } } + + /// Returns best finalized block number. + pub async fn on_chain_best_finalized_block_number(&self) -> Result { + // we **CAN** continue to relay finality proofs if source node is out of sync, because + // target node may be missing proofs that are already available at the source + let finalized_header_hash = self.client.best_finalized_header_hash().await?; + let finalized_header = self.client.header_by_hash(finalized_header_hash).await?; + Ok(*finalized_header.number()) + } } impl Clone for FinalitySource { fn clone(&self) -> Self { FinalitySource { client: self.client.clone(), + maximal_header_number: self.maximal_header_number.clone(), _phantom: Default::default(), } } @@ -80,11 +96,16 @@ where type FinalityProofsStream = Pin> + Send>>; async fn best_finalized_block_number(&self) -> Result { - // we **CAN** continue to relay finality proofs if source node is out of sync, because - // target node may be missing proofs that are already available at the source - let finalized_header_hash = self.client.best_finalized_header_hash().await?; - let finalized_header = self.client.header_by_hash(finalized_header_hash).await?; - Ok(*finalized_header.number()) + let mut finalized_header_number = self.on_chain_best_finalized_block_number().await?; + // never return block number larger than requested. This way we'll never sync headers + // past `maximal_header_number` + if let Some(ref maximal_header_number) = self.maximal_header_number { + let maximal_header_number = *maximal_header_number.lock().await; + if finalized_header_number > maximal_header_number { + finalized_header_number = maximal_header_number; + } + } + Ok(finalized_header_number) } async fn header_and_finality_proof( diff --git a/bridges/relays/finality/src/finality_loop.rs b/bridges/relays/finality/src/finality_loop.rs index 3aa55a8ac5915..cce32839907d9 100644 --- a/bridges/relays/finality/src/finality_loop.rs +++ b/bridges/relays/finality/src/finality_loop.rs @@ -39,8 +39,6 @@ use std::{ /// Finality proof synchronization loop parameters. #[derive(Debug, Clone)] pub struct FinalitySyncParams { - /// If `true`, then the separate async task for running finality loop is NOT spawned. - pub is_on_demand_task: bool, /// Interval at which we check updates on both clients. Normally should be larger than /// `min(source_block_time, target_block_time)`. /// @@ -107,7 +105,6 @@ pub async fn run( ) -> Result<(), String> { let exit_signal = exit_signal.shared(); relay_utils::relay_loop(source_client, target_client) - .spawn_loop_task(!sync_params.is_on_demand_task) .with_metrics(Some(metrics_prefix::
<P>
()), metrics_params) .loop_metric(|registry, prefix| SyncLoopMetrics::new(registry, prefix))? .standalone_metric(|registry, prefix| GlobalMetrics::new(registry, prefix))? diff --git a/bridges/relays/finality/src/finality_loop_tests.rs b/bridges/relays/finality/src/finality_loop_tests.rs index 645aeb1777c60..f7826ead730c9 100644 --- a/bridges/relays/finality/src/finality_loop_tests.rs +++ b/bridges/relays/finality/src/finality_loop_tests.rs @@ -197,7 +197,6 @@ fn run_sync_loop(state_function: impl Fn(&mut ClientsData) -> bool + Send + Sync data: clients_data.clone(), }; let sync_params = FinalitySyncParams { - is_on_demand_task: false, tick: Duration::from_secs(0), recent_finality_proofs_limit: 1024, stall_timeout: Duration::from_secs(1), diff --git a/bridges/relays/utils/src/relay_loop.rs b/bridges/relays/utils/src/relay_loop.rs index 8fcaabe4430cc..938136658bd31 100644 --- a/bridges/relays/utils/src/relay_loop.rs +++ b/bridges/relays/utils/src/relay_loop.rs @@ -38,7 +38,6 @@ pub trait Client: 'static + Clone + Send + Sync { pub fn relay_loop(source_client: SC, target_client: TC) -> Loop { Loop { reconnect_delay: RECONNECT_DELAY, - spawn_loop_task: true, source_client, target_client, loop_metric: None, @@ -50,7 +49,6 @@ pub fn relay_metrics(prefix: Option, params: MetricsParams) -> LoopMetri LoopMetrics { relay_loop: Loop { reconnect_delay: RECONNECT_DELAY, - spawn_loop_task: true, source_client: (), target_client: (), loop_metric: None, @@ -65,7 +63,6 @@ pub fn relay_metrics(prefix: Option, params: MetricsParams) -> LoopMetri /// Generic relay loop. pub struct Loop { reconnect_delay: Duration, - spawn_loop_task: bool, source_client: SC, target_client: TC, loop_metric: Option, @@ -87,23 +84,11 @@ impl Loop { self } - /// Set spawn-dedicated-loop-task flag. - /// - /// If `true` (default), separate async task is spawned to run relay loop. This is the default - /// behavior for all loops. If `false`, then loop is executed as a part of the current - /// task. The `false` is used for on-demand tasks, which are cancelled from time to time - /// and there's already a dedicated on-demand task for running such loops. - pub fn spawn_loop_task(mut self, spawn_loop_task: bool) -> Self { - self.spawn_loop_task = spawn_loop_task; - self - } - /// Start building loop metrics using given prefix. pub fn with_metrics(self, prefix: Option, params: MetricsParams) -> LoopMetrics { LoopMetrics { relay_loop: Loop { reconnect_delay: self.reconnect_delay, - spawn_loop_task: self.spawn_loop_task, source_client: self.source_client, target_client: self.target_client, loop_metric: None, @@ -128,7 +113,6 @@ impl Loop { TC: 'static + Client, LM: 'static + Send + Clone, { - let spawn_loop_task = self.spawn_loop_task; let run_loop_task = async move { crate::initialize::initialize_loop(loop_name); @@ -156,11 +140,7 @@ impl Loop { Ok(()) }; - if spawn_loop_task { - async_std::task::spawn(run_loop_task).await - } else { - run_loop_task.await - } + async_std::task::spawn(run_loop_task).await } } @@ -236,7 +216,6 @@ impl LoopMetrics { Ok(Loop { reconnect_delay: self.relay_loop.reconnect_delay, - spawn_loop_task: self.relay_loop.spawn_loop_task, source_client: self.relay_loop.source_client, target_client: self.relay_loop.target_client, loop_metric: self.loop_metric,
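
The idea behind this change, in short: instead of spawning and cancelling a separate on-demand headers relay task, the finality loop now runs permanently, and the wrapped finality source simply never reports a best finalized block number beyond the shared `required_header_number` that message relays bump via `require_finalized_header`. Below is a minimal, self-contained sketch of that capping behaviour; it uses plain `u32` block numbers and invented stand-in names (`OnDemandSource`, `RequiredHeaderNumber`, a free `require_finalized_header` function) in place of the real `Chain`/`FinalitySyncPipeline` generics, so it illustrates the mechanism rather than the actual types.

use async_std::sync::{Arc, Mutex};

/// Shared "sync headers up to this number" reference, playing the role of
/// `RequiredHeaderNumberRef` from the patch, specialized to `u32`.
type RequiredHeaderNumber = Arc<Mutex<u32>>;

/// Stand-in for the wrapped finality source: it never reports a best finalized
/// block number beyond what message relays have actually asked for.
struct OnDemandSource {
    on_chain_best_finalized: u32,
    required_header_number: Option<RequiredHeaderNumber>,
}

impl OnDemandSource {
    /// Mirrors the capping done in `FinalitySource::best_finalized_block_number`.
    async fn best_finalized_block_number(&self) -> u32 {
        let mut best = self.on_chain_best_finalized;
        if let Some(ref required) = self.required_header_number {
            let required = *required.lock().await;
            if best > required {
                best = required;
            }
        }
        best
    }
}

/// Mirrors `OnDemandHeadersRelay::require_finalized_header`: the required number
/// only ever moves forward.
async fn require_finalized_header(required: &RequiredHeaderNumber, header_number: u32) {
    let mut required = required.lock().await;
    if header_number > *required {
        *required = header_number;
    }
}

fn main() {
    async_std::task::block_on(async {
        let required: RequiredHeaderNumber = Arc::new(Mutex::new(0));
        let source = OnDemandSource {
            on_chain_best_finalized: 100,
            required_header_number: Some(required.clone()),
        };

        // Nothing has been requested yet, so the always-running loop sees no new headers.
        assert_eq!(source.best_finalized_block_number().await, 0);

        // A messages relay asks for header #42; the loop now syncs up to it, but no further.
        require_finalized_header(&required, 42).await;
        assert_eq!(source.best_finalized_block_number().await, 42);
    });
}

With the loop always running and metrics disabled, the idle cost when no headers are required is just the periodic tick, and if the inner relay ever exits it is simply restarted on the next iteration instead of going through the old start/stop bookkeeping.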