From 2ae886506da4ab3662bfaeedd72d2e03a34f5937 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Thu, 17 Sep 2020 15:51:33 +0300 Subject: [PATCH] Relay receiving + processing confirmations (#351) * relay receiving + processing confirmations * fmt && clippy * removed message processing race * remove more traces * generic args names --- bridges/relays/ethereum/src/main.rs | 1 + bridges/relays/ethereum/src/message_lane.rs | 16 +- .../relays/ethereum/src/message_lane_loop.rs | 103 ++++++++++-- .../ethereum/src/message_race_delivery.rs | 102 ++++++++---- .../ethereum/src/message_race_receiving.rs | 152 ++++++++++++++++++ 5 files changed, 328 insertions(+), 46 deletions(-) create mode 100644 bridges/relays/ethereum/src/message_race_receiving.rs diff --git a/bridges/relays/ethereum/src/main.rs b/bridges/relays/ethereum/src/main.rs index 82dfb08209c2..16f9d3e79c2f 100644 --- a/bridges/relays/ethereum/src/main.rs +++ b/bridges/relays/ethereum/src/main.rs @@ -31,6 +31,7 @@ mod message_lane; mod message_lane_loop; mod message_race_delivery; mod message_race_loop; +mod message_race_receiving; mod metrics; mod rpc; mod rpc_errors; diff --git a/bridges/relays/ethereum/src/message_lane.rs b/bridges/relays/ethereum/src/message_lane.rs index 7ca8d2e9fb98..0635456c4064 100644 --- a/bridges/relays/ethereum/src/message_lane.rs +++ b/bridges/relays/ethereum/src/message_lane.rs @@ -17,11 +17,11 @@ //! One-way message lane types. Within single one-way lane we have three 'races' where we try to: //! //! 1) relay new messages from source to target node; -//! 2) relay proof-of-receiving from target to source node; -//! 3) relay proof-of-processing from target no source node. +//! 2) relay proof-of-receiving from target to source node. use crate::utils::HeaderId; +use num_traits::{One, Zero}; use std::fmt::Debug; /// One-way message lane. @@ -32,10 +32,20 @@ pub trait MessageLane { const TARGET_NAME: &'static str; /// Message nonce type. - type MessageNonce: Clone + Copy + Debug + Default + From + Ord + std::ops::Add; + type MessageNonce: Clone + + Copy + + Debug + + Default + + From + + Ord + + std::ops::Add + + One + + Zero; /// Messages proof. type MessagesProof: Clone; + /// Messages receiving proof. + type MessagesReceivingProof: Clone; /// Number of the source header. type SourceHeaderNumber: Clone + Debug + Default + Ord + PartialEq; diff --git a/bridges/relays/ethereum/src/message_lane_loop.rs b/bridges/relays/ethereum/src/message_lane_loop.rs index 03cc2929b3a7..0618382b8395 100644 --- a/bridges/relays/ethereum/src/message_lane_loop.rs +++ b/bridges/relays/ethereum/src/message_lane_loop.rs @@ -29,6 +29,7 @@ use crate::message_lane::{MessageLane, SourceHeaderIdOf, TargetHeaderIdOf}; use crate::message_race_delivery::run as run_message_delivery_race; +use crate::message_race_receiving::run as run_message_receiving_race; use crate::utils::{interval, process_future_result, retry_backoff, FailedClient, MaybeConnectionError}; use async_trait::async_trait; @@ -52,6 +53,11 @@ pub trait SourceClient: Clone { &self, id: SourceHeaderIdOf

, ) -> Result<(SourceHeaderIdOf

, P::MessageNonce), Self::Error>; + /// Get nonce of the latest message, which receiving has been confirmed by the target chain. + async fn latest_confirmed_received_nonce( + &self, + id: SourceHeaderIdOf

, + ) -> Result<(SourceHeaderIdOf

, P::MessageNonce), Self::Error>; /// Prove messages in inclusive range [begin; end]. async fn prove_messages( @@ -59,6 +65,13 @@ pub trait SourceClient: Clone { id: SourceHeaderIdOf

, nonces: RangeInclusive, ) -> Result<(SourceHeaderIdOf

, RangeInclusive, P::MessagesProof), Self::Error>; + + /// Submit messages receiving proof. + async fn submit_messages_receiving_proof( + &self, + generated_at_block: TargetHeaderIdOf

, + proof: P::MessagesReceivingProof, + ) -> Result, Self::Error>; } /// Target client trait. @@ -73,12 +86,18 @@ pub trait TargetClient: Clone { /// Returns state of the client. async fn state(&self) -> Result, Self::Error>; - /// Get nonce of latest message, which receival has been confirmed. + /// Get nonce of latest received message. async fn latest_received_nonce( &self, id: TargetHeaderIdOf

, ) -> Result<(TargetHeaderIdOf

, P::MessageNonce), Self::Error>; + /// Prove messages receiving at given block. + async fn prove_messages_receiving( + &self, + id: TargetHeaderIdOf

, + ) -> Result<(TargetHeaderIdOf

, P::MessagesReceivingProof), Self::Error>; + /// Submit messages proof. async fn submit_messages_proof( &self, @@ -196,6 +215,19 @@ async fn run_until_connection_lost, TC: Targ ) .fuse(); + let ( + (receiving_source_state_sender, receiving_source_state_receiver), + (receiving_target_state_sender, receiving_target_state_receiver), + ) = (unbounded(), unbounded()); + let receiving_race_loop = run_message_receiving_race( + source_client.clone(), + receiving_source_state_receiver, + target_client.clone(), + receiving_target_state_receiver, + stall_timeout, + ) + .fuse(); + let exit_signal = exit_signal.fuse(); futures::pin_mut!( @@ -206,6 +238,7 @@ async fn run_until_connection_lost, TC: Targ target_go_offline_future, target_tick_stream, delivery_race_loop, + receiving_race_loop, exit_signal ); @@ -224,7 +257,8 @@ async fn run_until_connection_lost, TC: Targ P::SOURCE_NAME, new_source_state, ); - let _ = delivery_source_state_sender.unbounded_send(new_source_state); + let _ = delivery_source_state_sender.unbounded_send(new_source_state.clone()); + let _ = receiving_source_state_sender.unbounded_send(new_source_state.clone()); }, &mut source_go_offline_future, |delay| async_std::task::sleep(delay), @@ -250,7 +284,8 @@ async fn run_until_connection_lost, TC: Targ P::TARGET_NAME, new_target_state, ); - let _ = delivery_target_state_sender.unbounded_send(new_target_state); + let _ = delivery_target_state_sender.unbounded_send(new_target_state.clone()); + let _ = receiving_target_state_sender.unbounded_send(new_target_state.clone()); }, &mut target_go_offline_future, |delay| async_std::task::sleep(delay), @@ -270,6 +305,12 @@ async fn run_until_connection_lost, TC: Targ Err(err) => return Err(err), } }, + receiving_error = receiving_race_loop => { + match receiving_error { + Ok(_) => unreachable!("only ends with error; qed"), + Err(err) => return Err(err), + } + }, () = exit_signal => { return Ok(()); @@ -304,6 +345,7 @@ pub(crate) mod tests { pub type TestMessageNonce = u64; pub type TestMessagesProof = RangeInclusive; + pub type TestMessagesReceivingProof = TestMessageNonce; pub type TestSourceHeaderNumber = u64; pub type TestSourceHeaderHash = u64; @@ -333,7 +375,9 @@ pub(crate) mod tests { const TARGET_NAME: &'static str = "TestTarget"; type MessageNonce = TestMessageNonce; + type MessagesProof = TestMessagesProof; + type MessagesReceivingProof = TestMessagesReceivingProof; type SourceHeaderNumber = TestSourceHeaderNumber; type SourceHeaderHash = TestSourceHeaderHash; @@ -348,6 +392,8 @@ pub(crate) mod tests { is_source_reconnected: bool, source_state: SourceClientState, source_latest_generated_nonce: TestMessageNonce, + source_latest_confirmed_received_nonce: TestMessageNonce, + submitted_messages_receiving_proofs: Vec, is_target_fails: bool, is_target_reconnected: bool, target_state: SourceClientState, @@ -383,7 +429,6 @@ pub(crate) mod tests { Ok(data.source_state.clone()) } - /// Get nonce of instance of latest generated message. async fn latest_generated_nonce( &self, id: SourceHeaderIdOf, @@ -396,6 +441,15 @@ pub(crate) mod tests { Ok((id, data.source_latest_generated_nonce)) } + async fn latest_confirmed_received_nonce( + &self, + id: SourceHeaderIdOf, + ) -> Result<(SourceHeaderIdOf, TestMessageNonce), Self::Error> { + let mut data = self.data.lock(); + (self.tick)(&mut *data); + Ok((id, data.source_latest_confirmed_received_nonce)) + } + async fn prove_messages( &self, id: SourceHeaderIdOf, @@ -410,6 +464,18 @@ pub(crate) mod tests { > { Ok((id, nonces.clone(), nonces)) } + + async fn submit_messages_receiving_proof( + &self, + _generated_at_block: TargetHeaderIdOf, + proof: TestMessagesReceivingProof, + ) -> Result, Self::Error> { + let mut data = self.data.lock(); + (self.tick)(&mut *data); + data.submitted_messages_receiving_proofs.push(proof); + data.source_latest_confirmed_received_nonce = proof; + Ok(proof..=proof) + } } #[derive(Clone)] @@ -452,7 +518,13 @@ pub(crate) mod tests { Ok((id, data.target_latest_received_nonce)) } - /// Submit messages proof. + async fn prove_messages_receiving( + &self, + id: TargetHeaderIdOf, + ) -> Result<(TargetHeaderIdOf, TestMessagesReceivingProof), Self::Error> { + Ok((id, self.data.lock().target_latest_received_nonce)) + } + async fn submit_messages_proof( &self, _generated_at_header: SourceHeaderIdOf, @@ -570,16 +642,22 @@ pub(crate) mod tests { }, Arc::new(|_: &mut TestClientData| {}), Arc::new(move |data: &mut TestClientData| { - if data.target_state.best_peer.0 < 10 { + // syncing source headers -> target chain (by one) + if data.target_state.best_peer.0 < data.source_state.best_self.0 { data.target_state.best_peer = HeaderId(data.target_state.best_peer.0 + 1, data.target_state.best_peer.0 + 1); } - if data - .submitted_messages_proofs - .last() - .map(|last| *last.end() == 10) - .unwrap_or(false) - { + // syncing source headers -> target chain (all at once) + if data.source_state.best_peer.0 < data.target_state.best_self.0 { + data.source_state.best_peer = data.target_state.best_self; + } + // if target has received all messages => increase target block so that confirmations may be sent + if data.target_latest_received_nonce == 10 { + data.target_state.best_self = + HeaderId(data.source_state.best_self.0 + 1, data.source_state.best_self.0 + 1); + } + // if source has received all messages receiving confirmations => increase source block so that confirmations may be sent + if data.source_latest_confirmed_received_nonce == 10 { exit_sender.unbounded_send(()).unwrap(); } }), @@ -587,5 +665,6 @@ pub(crate) mod tests { ); assert_eq!(result.submitted_messages_proofs, vec![1..=4, 5..=8, 9..=10],); + assert!(!result.submitted_messages_receiving_proofs.is_empty()); } } diff --git a/bridges/relays/ethereum/src/message_race_delivery.rs b/bridges/relays/ethereum/src/message_race_delivery.rs index a0039909789f..d55b8353ee4d 100644 --- a/bridges/relays/ethereum/src/message_race_delivery.rs +++ b/bridges/relays/ethereum/src/message_race_delivery.rs @@ -19,10 +19,11 @@ use crate::message_lane_loop::{ TargetClientState, }; use crate::message_race_loop::{MessageRace, RaceState, RaceStrategy, SourceClient, TargetClient}; -use crate::utils::FailedClient; +use crate::utils::{FailedClient, HeaderId}; use async_trait::async_trait; use futures::stream::FusedStream; +use num_traits::{One, Zero}; use std::{collections::VecDeque, marker::PhantomData, ops::RangeInclusive, time::Duration}; /// Maximal number of messages to relay in single transaction. @@ -48,7 +49,7 @@ pub async fn run( }, target_state_updates, stall_timeout, - MessageDeliveryStrategy::

::default(), + MessageDeliveryStrategy::

::new(MAX_MESSAGES_TO_RELAY_IN_SINGLE_TX.into()), ) .await } @@ -135,34 +136,60 @@ where } } -/// Message delivery strategy. -struct MessageDeliveryStrategy { +/// Messages delivery strategy. +type MessageDeliveryStrategy

= DeliveryStrategy< +

::SourceHeaderNumber, +

::SourceHeaderHash, +

::TargetHeaderNumber, +

::TargetHeaderHash, +

::MessageNonce, +

::MessagesProof, +>; + +/// Nonces delivery strategy. +#[derive(Debug)] +pub struct DeliveryStrategy { /// All queued nonces. - source_queue: VecDeque<(SourceHeaderIdOf

, P::MessageNonce)>, + source_queue: VecDeque<(HeaderId, Nonce)>, /// Best nonce known to target node. - target_nonce: P::MessageNonce, + target_nonce: Nonce, + /// Max nonces to relay in single transaction. + max_nonces_to_relay_in_single_tx: Nonce, /// Unused generic types dump. - _phantom: PhantomData

, + _phantom: PhantomData<(TargetHeaderNumber, TargetHeaderHash, Proof)>, } -impl Default for MessageDeliveryStrategy

{ - fn default() -> Self { - MessageDeliveryStrategy { +impl + DeliveryStrategy +{ + /// Create new delivery strategy. + pub fn new(max_nonces_to_relay_in_single_tx: Nonce) -> Self { + DeliveryStrategy { source_queue: VecDeque::new(), target_nonce: Default::default(), + max_nonces_to_relay_in_single_tx, _phantom: Default::default(), } } } -impl RaceStrategy, TargetHeaderIdOf

, P::MessageNonce, P::MessagesProof> - for MessageDeliveryStrategy

+impl + RaceStrategy< + HeaderId, + HeaderId, + Nonce, + Proof, + > for DeliveryStrategy +where + SourceHeaderHash: Clone, + SourceHeaderNumber: Clone + Ord, + Nonce: Clone + Copy + From + Ord + std::ops::Add + One + Zero, { fn is_empty(&self) -> bool { self.source_queue.is_empty() } - fn source_nonce_updated(&mut self, at_block: SourceHeaderIdOf

, nonce: P::MessageNonce) { + fn source_nonce_updated(&mut self, at_block: HeaderId, nonce: Nonce) { if nonce <= self.target_nonce { return; } @@ -178,8 +205,13 @@ impl RaceStrategy, TargetHeaderIdOf

, P::M fn target_nonce_updated( &mut self, - nonce: P::MessageNonce, - race_state: &mut RaceState, TargetHeaderIdOf

, P::MessageNonce, P::MessagesProof>, + nonce: Nonce, + race_state: &mut RaceState< + HeaderId, + HeaderId, + Nonce, + Proof, + >, ) { if nonce < self.target_nonce { return; @@ -216,8 +248,13 @@ impl RaceStrategy, TargetHeaderIdOf

, P::M fn select_nonces_to_deliver( &mut self, - race_state: &RaceState, TargetHeaderIdOf

, P::MessageNonce, P::MessagesProof>, - ) -> Option> { + race_state: &RaceState< + HeaderId, + HeaderId, + Nonce, + Proof, + >, + ) -> Option> { // if we have already selected nonces that we want to submit, do nothing if race_state.nonces_to_submit.is_some() { return None; @@ -229,18 +266,19 @@ impl RaceStrategy, TargetHeaderIdOf

, P::M } // 1) we want to deliver all nonces, starting from `target_nonce + 1` - // 2) we want to deliver at most `MAX_MESSAGES_TO_RELAY_IN_SINGLE_TX` nonces in this batch + // 2) we want to deliver at most `self.max_nonces_to_relay_in_single_tx` nonces in this batch // 3) we can't deliver new nonce until header, that has emitted this nonce, is finalized // by target client let nonces_begin = self.target_nonce + 1.into(); let best_header_at_target = &race_state.target_state.as_ref()?.best_peer; let mut nonces_end = None; - for i in 0..MAX_MESSAGES_TO_RELAY_IN_SINGLE_TX { - let nonce = nonces_begin + i.into(); + let mut i = Zero::zero(); + while i < self.max_nonces_to_relay_in_single_tx { + let nonce = nonces_begin + i; // if queue is empty, we don't need to prove anything let (first_queued_at, first_queued_nonce) = match self.source_queue.front() { - Some((first_queued_at, first_queued_nonce)) => (first_queued_at.clone(), *first_queued_nonce), + Some((first_queued_at, first_queued_nonce)) => ((*first_queued_at).clone(), *first_queued_nonce), None => break, }; @@ -257,6 +295,8 @@ impl RaceStrategy, TargetHeaderIdOf

, P::M if nonce == first_queued_nonce { self.source_queue.pop_front(); } + + i = i + One::one(); } nonces_end.map(|nonces_end| RangeInclusive::new(nonces_begin, nonces_end)) @@ -273,7 +313,7 @@ mod tests { #[test] fn strategy_is_empty_works() { - let mut strategy = MessageDeliveryStrategy::::default(); + let mut strategy = MessageDeliveryStrategy::::new(4); assert_eq!(strategy.is_empty(), true); strategy.source_nonce_updated(header_id(1), 1); assert_eq!(strategy.is_empty(), false); @@ -281,7 +321,7 @@ mod tests { #[test] fn source_nonce_is_never_lower_than_known_target_nonce() { - let mut strategy = MessageDeliveryStrategy::::default(); + let mut strategy = MessageDeliveryStrategy::::new(4); strategy.target_nonce_updated(10, &mut Default::default()); strategy.source_nonce_updated(header_id(1), 5); assert_eq!(strategy.source_queue, vec![]); @@ -289,7 +329,7 @@ mod tests { #[test] fn source_nonce_is_never_lower_than_latest_known_source_nonce() { - let mut strategy = MessageDeliveryStrategy::::default(); + let mut strategy = MessageDeliveryStrategy::::new(4); strategy.source_nonce_updated(header_id(1), 5); strategy.source_nonce_updated(header_id(2), 3); strategy.source_nonce_updated(header_id(2), 5); @@ -298,7 +338,7 @@ mod tests { #[test] fn target_nonce_is_never_lower_than_latest_known_target_nonce() { - let mut strategy = MessageDeliveryStrategy::::default(); + let mut strategy = MessageDeliveryStrategy::::new(4); strategy.target_nonce_updated(10, &mut Default::default()); strategy.target_nonce_updated(5, &mut Default::default()); assert_eq!(strategy.target_nonce, 10); @@ -306,7 +346,7 @@ mod tests { #[test] fn updated_target_nonce_removes_queued_entries() { - let mut strategy = MessageDeliveryStrategy::::default(); + let mut strategy = MessageDeliveryStrategy::::new(4); strategy.source_nonce_updated(header_id(1), 5); strategy.source_nonce_updated(header_id(2), 10); strategy.source_nonce_updated(header_id(3), 15); @@ -318,7 +358,7 @@ mod tests { #[test] fn selected_nonces_are_dropped_on_target_nonce_update() { let mut state = RaceState::default(); - let mut strategy = MessageDeliveryStrategy::::default(); + let mut strategy = MessageDeliveryStrategy::::new(4); state.nonces_to_submit = Some((header_id(1), 5..=10, 5..=10)); strategy.target_nonce_updated(7, &mut state); assert!(state.nonces_to_submit.is_some()); @@ -329,7 +369,7 @@ mod tests { #[test] fn submitted_nonces_are_dropped_on_target_nonce_update() { let mut state = RaceState::default(); - let mut strategy = MessageDeliveryStrategy::::default(); + let mut strategy = MessageDeliveryStrategy::::new(4); state.nonces_submitted = Some(5..=10); strategy.target_nonce_updated(7, &mut state); assert!(state.nonces_submitted.is_some()); @@ -340,7 +380,7 @@ mod tests { #[test] fn nothing_is_selected_if_something_is_already_selected() { let mut state = RaceState::default(); - let mut strategy = MessageDeliveryStrategy::::default(); + let mut strategy = MessageDeliveryStrategy::::new(4); state.nonces_to_submit = Some((header_id(1), 1..=10, 1..=10)); strategy.source_nonce_updated(header_id(1), 10); assert_eq!(strategy.select_nonces_to_deliver(&state), None); @@ -349,7 +389,7 @@ mod tests { #[test] fn nothing_is_selected_if_something_is_already_submitted() { let mut state = RaceState::default(); - let mut strategy = MessageDeliveryStrategy::::default(); + let mut strategy = MessageDeliveryStrategy::::new(4); state.nonces_submitted = Some(1..=10); strategy.source_nonce_updated(header_id(1), 10); assert_eq!(strategy.select_nonces_to_deliver(&state), None); @@ -358,7 +398,7 @@ mod tests { #[test] fn select_nonces_to_deliver_works() { let mut state = RaceState::<_, _, TestMessageNonce, TestMessagesProof>::default(); - let mut strategy = MessageDeliveryStrategy::::default(); + let mut strategy = MessageDeliveryStrategy::::new(4); strategy.source_nonce_updated(header_id(1), 1); strategy.source_nonce_updated(header_id(2), 2); strategy.source_nonce_updated(header_id(3), 6); diff --git a/bridges/relays/ethereum/src/message_race_receiving.rs b/bridges/relays/ethereum/src/message_race_receiving.rs new file mode 100644 index 000000000000..a50b713f0bd7 --- /dev/null +++ b/bridges/relays/ethereum/src/message_race_receiving.rs @@ -0,0 +1,152 @@ +// Copyright 2019-2020 Parity Technologies (UK) Ltd. +// This file is part of Parity Bridges Common. + +// Parity Bridges Common is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity Bridges Common is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +use crate::message_lane::{MessageLane, SourceHeaderIdOf, TargetHeaderIdOf}; +use crate::message_lane_loop::{ + SourceClient as MessageLaneSourceClient, SourceClientState, TargetClient as MessageLaneTargetClient, + TargetClientState, +}; +use crate::message_race_delivery::DeliveryStrategy; +use crate::message_race_loop::{MessageRace, SourceClient, TargetClient}; +use crate::utils::FailedClient; + +use async_trait::async_trait; +use futures::stream::FusedStream; +use std::{marker::PhantomData, ops::RangeInclusive, time::Duration}; + +/// Message receiving confirmations delivery strategy. +type ReceivingConfirmationsDeliveryStrategy

= DeliveryStrategy< +

::TargetHeaderNumber, +

::TargetHeaderHash, +

::SourceHeaderNumber, +

::SourceHeaderHash, +

::MessageNonce, +

::MessagesReceivingProof, +>; + +/// Run receiving confirmations race. +pub async fn run( + source_client: impl MessageLaneSourceClient

, + source_state_updates: impl FusedStream>, + target_client: impl MessageLaneTargetClient

, + target_state_updates: impl FusedStream>, + stall_timeout: Duration, +) -> Result<(), FailedClient> { + crate::message_race_loop::run( + ReceivingConfirmationsRaceSource { + client: target_client, + _phantom: Default::default(), + }, + target_state_updates, + ReceivingConfirmationsRaceTarget { + client: source_client, + _phantom: Default::default(), + }, + source_state_updates, + stall_timeout, + ReceivingConfirmationsDeliveryStrategy::

::new(std::u32::MAX.into()), + ) + .await +} + +/// Messages receiving confirmations race. +struct ReceivingConfirmationsRace

(std::marker::PhantomData

); + +impl MessageRace for ReceivingConfirmationsRace

{ + type SourceHeaderId = TargetHeaderIdOf

; + type TargetHeaderId = SourceHeaderIdOf

; + + type MessageNonce = P::MessageNonce; + type Proof = P::MessagesReceivingProof; + + fn source_name() -> String { + format!("{}::ReceivingConfirmationsDelivery", P::SOURCE_NAME) + } + + fn target_name() -> String { + format!("{}::ReceivingConfirmationsDelivery", P::TARGET_NAME) + } +} + +/// Message receiving confirmations race source, which is a target of the lane. +struct ReceivingConfirmationsRaceSource { + client: C, + _phantom: PhantomData

, +} + +#[async_trait(?Send)] +impl SourceClient> for ReceivingConfirmationsRaceSource +where + P: MessageLane, + C: MessageLaneTargetClient

, +{ + type Error = C::Error; + + async fn latest_nonce( + &self, + at_block: TargetHeaderIdOf

, + ) -> Result<(TargetHeaderIdOf

, P::MessageNonce), Self::Error> { + self.client.latest_received_nonce(at_block).await + } + + async fn generate_proof( + &self, + at_block: TargetHeaderIdOf

, + nonces: RangeInclusive, + ) -> Result< + ( + TargetHeaderIdOf

, + RangeInclusive, + P::MessagesReceivingProof, + ), + Self::Error, + > { + self.client + .prove_messages_receiving(at_block) + .await + .map(|(at_block, proof)| (at_block, nonces, proof)) + } +} + +/// Message receiving confirmations race target, which is a source of the lane. +struct ReceivingConfirmationsRaceTarget { + client: C, + _phantom: PhantomData

, +} + +#[async_trait(?Send)] +impl TargetClient> for ReceivingConfirmationsRaceTarget +where + P: MessageLane, + C: MessageLaneSourceClient

, +{ + type Error = C::Error; + + async fn latest_nonce( + &self, + at_block: SourceHeaderIdOf

, + ) -> Result<(SourceHeaderIdOf

, P::MessageNonce), Self::Error> { + self.client.latest_confirmed_received_nonce(at_block).await + } + + async fn submit_proof( + &self, + generated_at_block: TargetHeaderIdOf

, + _nonces: RangeInclusive, + proof: P::MessagesReceivingProof, + ) -> Result, Self::Error> { + self.client + .submit_messages_receiving_proof(generated_at_block, proof) + .await + } +}