ReadSyncedHeaders {
+ pub async fn next>(
+ self,
+ target_client: &mut TC,
+ ) -> Result, Self> {
+ match target_client.synced_headers_finality_info(self.target_block_num).await {
+ Ok(synced_headers) =>
+ Ok(ReadContext { target_block_num: self.target_block_num, synced_headers }),
+ Err(e) => {
+ log::error!(
+ target: "bridge",
+ "Could not get {} headers synced to {} at block {}: {e:?}",
+ P::SOURCE_NAME,
+ P::TARGET_NAME,
+ self.target_block_num
+ );
+
+ // Reconnect target client in case of a connection error.
+ handle_client_error(target_client, e).await;
+
+ Err(self)
+ },
+ }
+ }
+}
+
+/// Second step in the block checking state machine.
+///
+/// Reading the equivocation reporting context from the target chain.
+pub struct ReadContext {
+ target_block_num: P::TargetNumber,
+ synced_headers: Vec>,
+}
+
+impl ReadContext {
+ pub async fn next>(
+ self,
+ target_client: &mut TC,
+ ) -> Result>, Self> {
+ match EquivocationReportingContext::try_read_from_target::(
+ target_client,
+ self.target_block_num.saturating_sub(1.into()),
+ )
+ .await
+ {
+ Ok(Some(context)) => Ok(Some(FindEquivocations {
+ target_block_num: self.target_block_num,
+ synced_headers: self.synced_headers,
+ context,
+ })),
+ Ok(None) => Ok(None),
+ Err(e) => {
+ log::error!(
+ target: "bridge",
+ "Could not read {} `EquivocationReportingContext` from {} at block {}: {e:?}",
+ P::SOURCE_NAME,
+ P::TARGET_NAME,
+ self.target_block_num.saturating_sub(1.into()),
+ );
+
+ // Reconnect target client in case of a connection error.
+ handle_client_error(target_client, e).await;
+
+ Err(self)
+ },
+ }
+ }
+}
+
+/// Third step in the block checking state machine.
+///
+/// Searching for equivocations in the source headers synced with the target chain.
+pub struct FindEquivocations {
+ target_block_num: P::TargetNumber,
+ synced_headers: Vec>,
+ context: EquivocationReportingContext,
+}
+
+impl FindEquivocations {
+ pub fn next(
+ mut self,
+ finality_proofs_buf: &mut FinalityProofsBuf
,
+ ) -> Vec> {
+ let mut result = vec![];
+ for synced_header in self.synced_headers {
+ match P::EquivocationsFinder::find_equivocations(
+ &self.context.synced_verification_context,
+ &synced_header.finality_proof,
+ finality_proofs_buf.buf().as_slice(),
+ ) {
+ Ok(equivocations) => result.push(ReportEquivocations {
+ source_block_hash: self.context.synced_header_hash,
+ equivocations,
+ }),
+ Err(e) => {
+ log::error!(
+ target: "bridge",
+ "Could not search for equivocations in the finality proof \
+ for source header {:?} synced at target block {}: {e:?}",
+ synced_header.finality_proof.target_header_hash(),
+ self.target_block_num
+ );
+ },
+ };
+
+ finality_proofs_buf.prune(synced_header.finality_proof.target_header_number(), None);
+ self.context.update(synced_header);
+ }
+
+ result
+ }
+}
+
+/// Fourth step in the block checking state machine.
+///
+/// Reporting the detected equivocations (if any).
+pub struct ReportEquivocations {
+ source_block_hash: P::Hash,
+ equivocations: Vec,
+}
+
+impl ReportEquivocations {
+ pub async fn next>(
+ mut self,
+ source_client: &mut SC,
+ reporter: &mut EquivocationsReporter,
+ ) -> Result<(), Self> {
+ let mut unprocessed_equivocations = vec![];
+ for equivocation in self.equivocations {
+ match reporter
+ .submit_report(source_client, self.source_block_hash, equivocation.clone())
+ .await
+ {
+ Ok(_) => {},
+ Err(e) => {
+ log::error!(
+ target: "bridge",
+ "Could not submit equivocation report to {} for {equivocation:?}: {e:?}",
+ P::SOURCE_NAME,
+ );
+
+ // Mark the equivocation as unprocessed
+ unprocessed_equivocations.push(equivocation);
+ // Reconnect source client in case of a connection error.
+ handle_client_error(source_client, e).await;
+ },
+ }
+ }
+
+ self.equivocations = unprocessed_equivocations;
+ if !self.equivocations.is_empty() {
+ return Err(self)
+ }
+
+ Ok(())
+ }
+}
+
+/// Block checking state machine.
+pub enum BlockChecker {
+ ReadSyncedHeaders(ReadSyncedHeaders),
+ ReadContext(ReadContext
),
+ ReportEquivocations(Vec>),
+}
+
+impl BlockChecker {
+ pub fn new(target_block_num: P::TargetNumber) -> Self {
+ Self::ReadSyncedHeaders(ReadSyncedHeaders { target_block_num })
+ }
+
+ pub fn run<'a, SC: SourceClient
, TC: TargetClient
>(
+ self,
+ source_client: &'a mut SC,
+ target_client: &'a mut TC,
+ finality_proofs_buf: &'a mut FinalityProofsBuf
,
+ reporter: &'a mut EquivocationsReporter
,
+ ) -> BoxFuture<'a, Result<(), Self>> {
+ async move {
+ match self {
+ Self::ReadSyncedHeaders(state) => {
+ let read_context =
+ state.next(target_client).await.map_err(Self::ReadSyncedHeaders)?;
+ Self::ReadContext(read_context)
+ .run(source_client, target_client, finality_proofs_buf, reporter)
+ .await
+ },
+ Self::ReadContext(state) => {
+ let maybe_find_equivocations =
+ state.next(target_client).await.map_err(Self::ReadContext)?;
+ let find_equivocations = match maybe_find_equivocations {
+ Some(find_equivocations) => find_equivocations,
+ None => return Ok(()),
+ };
+ Self::ReportEquivocations(find_equivocations.next(finality_proofs_buf))
+ .run(source_client, target_client, finality_proofs_buf, reporter)
+ .await
+ },
+ Self::ReportEquivocations(state) => {
+ let mut failures = vec![];
+ for report_equivocations in state {
+ if let Err(failure) =
+ report_equivocations.next(source_client, reporter).await
+ {
+ failures.push(failure);
+ }
+ }
+
+ if !failures.is_empty() {
+ return Err(Self::ReportEquivocations(failures))
+ }
+
+ Ok(())
+ },
+ }
+ }
+ .boxed()
+ }
+}
diff --git a/bridges/relays/equivocation/src/equivocation_loop.rs b/bridges/relays/equivocation/src/equivocation_loop.rs
index 61ffa92c8dc4..da3f72b94660 100644
--- a/bridges/relays/equivocation/src/equivocation_loop.rs
+++ b/bridges/relays/equivocation/src/equivocation_loop.rs
@@ -15,55 +15,17 @@
// along with Parity Bridges Common. If not, see .
use crate::{
- reporter::EquivocationsReporter, EquivocationDetectionPipeline, HeaderFinalityInfo,
+ handle_client_error, reporter::EquivocationsReporter, EquivocationDetectionPipeline,
SourceClient, TargetClient,
};
-use bp_header_chain::{FinalityProof, FindEquivocations};
+use crate::block_checker::BlockChecker;
use finality_relay::{FinalityProofsBuf, FinalityProofsStream};
use futures::{select, FutureExt};
use num_traits::Saturating;
-use relay_utils::{
- metrics::MetricsParams,
- relay_loop::{reconnect_failed_client, RECONNECT_DELAY},
- FailedClient, MaybeConnectionError,
-};
+use relay_utils::{metrics::MetricsParams, FailedClient};
use std::{future::Future, time::Duration};
-/// The context needed for finding equivocations inside finality proofs and reporting them.
-struct EquivocationReportingContext {
- synced_header_hash: P::Hash,
- synced_verification_context: P::FinalityVerificationContext,
-}
-
-impl EquivocationReportingContext {
- /// Try to get the `EquivocationReportingContext` used by the target chain
- /// at the provided block.
- async fn try_read_from_target>(
- target_client: &TC,
- at: P::TargetNumber,
- ) -> Result, TC::Error> {
- let maybe_best_synced_header_hash = target_client.best_synced_header_hash(at).await?;
- Ok(match maybe_best_synced_header_hash {
- Some(best_synced_header_hash) => Some(EquivocationReportingContext {
- synced_header_hash: best_synced_header_hash,
- synced_verification_context: target_client
- .finality_verification_context(at)
- .await?,
- }),
- None => None,
- })
- }
-
- /// Update with the new context introduced by the `HeaderFinalityInfo` if any.
- fn update(&mut self, info: HeaderFinalityInfo
) {
- if let Some(new_verification_context) = info.new_verification_context {
- self.synced_header_hash = info.finality_proof.target_header_hash();
- self.synced_verification_context = new_verification_context;
- }
- }
-}
-
/// Equivocations detection loop state.
struct EquivocationDetectionLoop<
P: EquivocationDetectionPipeline,
@@ -85,34 +47,6 @@ struct EquivocationDetectionLoop<
impl, TC: TargetClient>
EquivocationDetectionLoop
{
- async fn handle_source_error(&mut self, e: SC::Error) {
- if e.is_connection_error() {
- reconnect_failed_client(
- FailedClient::Source,
- RECONNECT_DELAY,
- &mut self.source_client,
- &mut self.target_client,
- )
- .await;
- } else {
- async_std::task::sleep(RECONNECT_DELAY).await;
- }
- }
-
- async fn handle_target_error(&mut self, e: TC::Error) {
- if e.is_connection_error() {
- reconnect_failed_client(
- FailedClient::Target,
- RECONNECT_DELAY,
- &mut self.source_client,
- &mut self.target_client,
- )
- .await;
- } else {
- async_std::task::sleep(RECONNECT_DELAY).await;
- }
- }
-
async fn ensure_finality_proofs_stream(&mut self) {
match self.finality_proofs_stream.ensure_stream(&self.source_client).await {
Ok(_) => {},
@@ -124,7 +58,7 @@ impl, TC: TargetClient>
);
// Reconnect to the source client if needed
- self.handle_source_error(e).await
+ handle_client_error(&mut self.source_client, e).await;
},
}
}
@@ -140,116 +74,13 @@ impl, TC: TargetClient>
);
// Reconnect target client and move on
- self.handle_target_error(e).await;
-
- None
- },
- }
- }
-
- async fn build_equivocation_reporting_context(
- &mut self,
- block_num: P::TargetNumber,
- ) -> Option> {
- match EquivocationReportingContext::try_read_from_target(
- &self.target_client,
- block_num.saturating_sub(1.into()),
- )
- .await
- {
- Ok(Some(context)) => Some(context),
- Ok(None) => None,
- Err(e) => {
- log::error!(
- target: "bridge",
- "Could not read {} `EquivocationReportingContext` from {} at block {block_num}: {e:?}",
- P::SOURCE_NAME,
- P::TARGET_NAME,
- );
+ handle_client_error(&mut self.target_client, e).await;
- // Reconnect target client if needed and move on.
- self.handle_target_error(e).await;
None
},
}
}
- /// Try to get the finality info associated to the source headers synced with the target chain
- /// at the specified block.
- async fn synced_source_headers_at_target(
- &mut self,
- at: P::TargetNumber,
- ) -> Vec> {
- match self.target_client.synced_headers_finality_info(at).await {
- Ok(synced_headers) => synced_headers,
- Err(e) => {
- log::error!(
- target: "bridge",
- "Could not get {} headers synced to {} at block {at:?}",
- P::SOURCE_NAME,
- P::TARGET_NAME
- );
-
- // Reconnect in case of a connection error.
- self.handle_target_error(e).await;
- // And move on to the next block.
- vec![]
- },
- }
- }
-
- async fn report_equivocation(&mut self, at: P::Hash, equivocation: P::EquivocationProof) {
- match self.reporter.submit_report(&self.source_client, at, equivocation.clone()).await {
- Ok(_) => {},
- Err(e) => {
- log::error!(
- target: "bridge",
- "Could not submit equivocation report to {} for {equivocation:?}: {e:?}",
- P::SOURCE_NAME,
- );
-
- // Reconnect source client and move on
- self.handle_source_error(e).await;
- },
- }
- }
-
- async fn check_block(
- &mut self,
- block_num: P::TargetNumber,
- context: &mut EquivocationReportingContext,
- ) {
- let synced_headers = self.synced_source_headers_at_target(block_num).await;
-
- for synced_header in synced_headers {
- self.finality_proofs_buf.fill(&mut self.finality_proofs_stream);
-
- let equivocations = match P::EquivocationsFinder::find_equivocations(
- &context.synced_verification_context,
- &synced_header.finality_proof,
- self.finality_proofs_buf.buf().as_slice(),
- ) {
- Ok(equivocations) => equivocations,
- Err(e) => {
- log::error!(
- target: "bridge",
- "Could not search for equivocations in the finality proof \
- for source header {:?} synced at target block {block_num:?}: {e:?}",
- synced_header.finality_proof.target_header_hash()
- );
- continue
- },
- };
- for equivocation in equivocations {
- self.report_equivocation(context.synced_header_hash, equivocation).await;
- }
-
- self.finality_proofs_buf
- .prune(synced_header.finality_proof.target_header_number(), None);
- context.update(synced_header);
- }
- }
-
async fn do_run(&mut self, tick: Duration, exit_signal: impl Future) {
let exit_signal = exit_signal.fuse();
futures::pin_mut!(exit_signal);
@@ -273,15 +104,16 @@ impl, TC: TargetClient>
// Check the available blocks
let mut current_block_number = from;
while current_block_number <= until {
- let mut context =
- match self.build_equivocation_reporting_context(current_block_number).await {
- Some(context) => context,
- None => {
- current_block_number = current_block_number.saturating_add(1.into());
- continue
- },
- };
- self.check_block(current_block_number, &mut context).await;
+ self.finality_proofs_buf.fill(&mut self.finality_proofs_stream);
+ let block_checker = BlockChecker::new(current_block_number);
+ let _ = block_checker
+ .run(
+ &mut self.source_client,
+ &mut self.target_client,
+ &mut self.finality_proofs_buf,
+ &mut self.reporter,
+ )
+ .await;
current_block_number = current_block_number.saturating_add(1.into());
}
self.until_block_num = Some(current_block_number);
diff --git a/bridges/relays/equivocation/src/lib.rs b/bridges/relays/equivocation/src/lib.rs
index 6f9337483fda..bb1f40c13e6d 100644
--- a/bridges/relays/equivocation/src/lib.rs
+++ b/bridges/relays/equivocation/src/lib.rs
@@ -14,13 +14,17 @@
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see .
+mod block_checker;
mod equivocation_loop;
mod reporter;
use async_trait::async_trait;
-use bp_header_chain::FindEquivocations;
+use bp_header_chain::{FinalityProof, FindEquivocations};
use finality_relay::{FinalityPipeline, SourceClientBase};
-use relay_utils::{relay_loop::Client as RelayClient, TransactionTracker};
+use relay_utils::{
+ relay_loop::{Client as RelayClient, RECONNECT_DELAY},
+ MaybeConnectionError, TransactionTracker,
+};
use std::fmt::Debug;
pub use equivocation_loop::run;
@@ -85,3 +89,45 @@ pub trait TargetClient: RelayClient {
at: P::TargetNumber,
) -> Result>, Self::Error>;
}
+
+/// The context needed for finding equivocations inside finality proofs and reporting them.
+struct EquivocationReportingContext {
+ pub synced_header_hash: P::Hash,
+ pub synced_verification_context: P::FinalityVerificationContext,
+}
+
+impl EquivocationReportingContext {
+ /// Try to get the `EquivocationReportingContext` used by the target chain
+ /// at the provided block.
+ pub async fn try_read_from_target>(
+ target_client: &TC,
+ at: P::TargetNumber,
+ ) -> Result, TC::Error> {
+ let maybe_best_synced_header_hash = target_client.best_synced_header_hash(at).await?;
+ Ok(match maybe_best_synced_header_hash {
+ Some(best_synced_header_hash) => Some(EquivocationReportingContext {
+ synced_header_hash: best_synced_header_hash,
+ synced_verification_context: target_client
+ .finality_verification_context(at)
+ .await?,
+ }),
+ None => None,
+ })
+ }
+
+ /// Update with the new context introduced by the `HeaderFinalityInfo` if any.
+ pub fn update(&mut self, info: HeaderFinalityInfo
) {
+ if let Some(new_verification_context) = info.new_verification_context {
+ self.synced_header_hash = info.finality_proof.target_header_hash();
+ self.synced_verification_context = new_verification_context;
+ }
+ }
+}
+
+async fn handle_client_error(client: &mut C, e: C::Error) {
+ if e.is_connection_error() {
+ client.reconnect_until_success(RECONNECT_DELAY).await;
+ } else {
+ async_std::task::sleep(RECONNECT_DELAY).await;
+ }
+}
diff --git a/bridges/relays/utils/src/relay_loop.rs b/bridges/relays/utils/src/relay_loop.rs
index dad7293de6d2..7105190a4583 100644
--- a/bridges/relays/utils/src/relay_loop.rs
+++ b/bridges/relays/utils/src/relay_loop.rs
@@ -35,6 +35,25 @@ pub trait Client: 'static + Clone + Send + Sync {
/// Try to reconnect to source node.
async fn reconnect(&mut self) -> Result<(), Self::Error>;
+
+ /// Try to reconnect to the source node in an infinite loop until it succeeds.
+ async fn reconnect_until_success(&mut self, delay: Duration) {
+ loop {
+ match self.reconnect().await {
+ Ok(()) => break,
+ Err(error) => {
+ log::warn!(
+ target: "bridge",
+ "Failed to reconnect to client. Going to retry in {}s: {:?}",
+ delay.as_secs(),
+ error,
+ );
+
+ async_std::task::sleep(delay).await;
+ },
+ }
+ }
+ }
}
#[async_trait]
@@ -226,44 +245,18 @@ impl LoopMetrics {
}
}
-/// Deal with the client who has returned connection error.
+/// Deal with the clients that have returned connection error.
pub async fn reconnect_failed_client(
failed_client: FailedClient,
reconnect_delay: Duration,
source_client: &mut impl Client,
target_client: &mut impl Client,
) {
- loop {
- async_std::task::sleep(reconnect_delay).await;
- if failed_client == FailedClient::Both || failed_client == FailedClient::Source {
- match source_client.reconnect().await {
- Ok(()) => (),
- Err(error) => {
- log::warn!(
- target: "bridge",
- "Failed to reconnect to source client. Going to retry in {}s: {:?}",
- reconnect_delay.as_secs(),
- error,
- );
- continue
- },
- }
- }
- if failed_client == FailedClient::Both || failed_client == FailedClient::Target {
- match target_client.reconnect().await {
- Ok(()) => (),
- Err(error) => {
- log::warn!(
- target: "bridge",
- "Failed to reconnect to target client. Going to retry in {}s: {:?}",
- reconnect_delay.as_secs(),
- error,
- );
- continue
- },
- }
- }
+ if failed_client == FailedClient::Source || failed_client == FailedClient::Both {
+ source_client.reconnect_until_success(reconnect_delay).await;
+ }
- break
+ if failed_client == FailedClient::Target || failed_client == FailedClient::Both {
+ target_client.reconnect_until_success(reconnect_delay).await;
}
}