Skip to content

Commit

Permalink
Message relay fixes for nightly stuck (paritytech#532)
Browse files Browse the repository at this point in the history
* fixed missing else

* really wake up when retry timeout is completed

* do not query weights if target nonce is unknown

* fix compilation
  • Loading branch information
svyatonik authored and serban300 committed Apr 9, 2024
1 parent 944a936 commit b8f6e32
Show file tree
Hide file tree
Showing 5 changed files with 175 additions and 47 deletions.
3 changes: 3 additions & 0 deletions bridges/relays/messages-relay/src/message_lane_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ pub trait SourceClient<P: MessageLane>: Clone + Send + Sync {
) -> Result<(SourceHeaderIdOf<P>, MessageNonce), Self::Error>;

/// Returns mapping of message nonces, generated on this client, to their weights.
///
/// Some weights may be missing from returned map, if corresponding messages were pruned at
/// the source chain.
async fn generated_messages_weights(
&self,
id: SourceHeaderIdOf<P>,
Expand Down
4 changes: 2 additions & 2 deletions bridges/relays/messages-relay/src/message_race_delivery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,11 +223,11 @@ impl<P: MessageLane> RaceStrategy<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>, P::M
self.strategy.is_empty()
}

fn best_at_source(&self) -> MessageNonce {
fn best_at_source(&self) -> Option<MessageNonce> {
self.strategy.best_at_source()
}

fn best_at_target(&self) -> MessageNonce {
fn best_at_target(&self) -> Option<MessageNonce> {
self.strategy.best_at_target()
}

Expand Down
31 changes: 24 additions & 7 deletions bridges/relays/messages-relay/src/message_race_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,14 @@ pub trait RaceStrategy<SourceHeaderId, TargetHeaderId, Proof> {
/// Should return true if nothing has to be synced.
fn is_empty(&self) -> bool;
/// Return best nonce at source node.
fn best_at_source(&self) -> MessageNonce;
///
/// `Some` is returned only if we are sure that the value is greater or equal
/// than the result of `best_at_target`.
fn best_at_source(&self) -> Option<MessageNonce>;
/// Return best nonce at target node.
fn best_at_target(&self) -> MessageNonce;
///
/// May return `None` if value is yet unknown.
fn best_at_target(&self) -> Option<MessageNonce>;

/// Called when nonces are updated at source node of the race.
fn source_nonces_updated(&mut self, at_block: SourceHeaderId, nonces: SourceClientNonces<Self::SourceNoncesRange>);
Expand Down Expand Up @@ -334,7 +339,15 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>>(
async_std::task::sleep,
|| format!("Error submitting proof {}", P::target_name()),
).fail_if_connection_error(FailedClient::Target)?;
}
},

// when we're ready to retry request
_ = source_go_offline_future => {
source_client_is_online = true;
},
_ = target_go_offline_future => {
target_client_is_online = true;
},
}

progress_context = print_race_progress::<P, _>(progress_context, &strategy);
Expand All @@ -350,6 +363,7 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>>(
source_client_is_online = false;

let nonces_to_deliver = select_nonces_to_deliver(&race_state, &mut strategy);
let best_at_source = strategy.best_at_source();

if let Some((at_block, nonces_range, proof_parameters)) = nonces_to_deliver {
log::debug!(
Expand All @@ -364,7 +378,7 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>>(
.generate_proof(at_block, nonces_range, proof_parameters)
.fuse(),
);
} else if source_nonces_required {
} else if source_nonces_required && best_at_source.is_some() {
log::debug!(target: "bridge", "Asking {} about message nonces", P::source_name());
let at_block = race_state
.best_finalized_source_header_id_at_source
Expand All @@ -374,7 +388,11 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>>(
best_finalized_source_header_id_at_source is Some; qed",
)
.clone();
source_nonces.set(race_source.nonces(at_block, strategy.best_at_source()).fuse());
source_nonces.set(
race_source
.nonces(at_block, best_at_source.expect("guaranteed by if condition; qed"))
.fuse(),
);
} else {
source_client_is_online = true;
}
Expand All @@ -395,8 +413,7 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>>(
.submit_proof(at_block.clone(), nonces_range.clone(), proof.clone())
.fuse(),
);
}
if target_nonces_required {
} else if target_nonces_required {
log::debug!(target: "bridge", "Asking {} about message nonces", P::target_name());
let at_block = race_state
.best_target_header_id
Expand Down
59 changes: 37 additions & 22 deletions bridges/relays/messages-relay/src/message_race_strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ pub struct BasicStrategy<
> {
/// All queued nonces.
source_queue: VecDeque<(HeaderId<SourceHeaderHash, SourceHeaderNumber>, SourceNoncesRange)>,
/// Best nonce known to target node.
target_nonce: MessageNonce,
/// Best nonce known to target node. `None` if it has not been received yet.
target_nonce: Option<MessageNonce>,
/// Unused generic types dump.
_phantom: PhantomData<(TargetHeaderNumber, TargetHeaderHash, Proof)>,
}
Expand All @@ -52,7 +52,7 @@ where
pub fn new() -> Self {
BasicStrategy {
source_queue: VecDeque::new(),
target_nonce: Default::default(),
target_nonce: None,
_phantom: Default::default(),
}
}
Expand All @@ -74,6 +74,9 @@ where
>,
mut selector: impl FnMut(SourceNoncesRange) -> Option<SourceNoncesRange>,
) -> Option<RangeInclusive<MessageNonce>> {
// if we do not know best nonce at target node, we can't select anything
let target_nonce = self.target_nonce?;

// if we have already selected nonces that we want to submit, do nothing
if race_state.nonces_to_submit.is_some() {
return None;
Expand Down Expand Up @@ -128,7 +131,7 @@ where
}
}

nonces_end.map(|nonces_end| RangeInclusive::new(self.target_nonce + 1, nonces_end))
nonces_end.map(|nonces_end| RangeInclusive::new(target_nonce + 1, nonces_end))
}
}

Expand All @@ -147,17 +150,16 @@ where
self.source_queue.is_empty()
}

fn best_at_source(&self) -> MessageNonce {
std::cmp::max(
self.source_queue
.back()
.map(|(_, range)| range.end())
.unwrap_or(self.target_nonce),
self.target_nonce,
)
fn best_at_source(&self) -> Option<MessageNonce> {
let best_in_queue = self.source_queue.back().map(|(_, range)| range.end());
match (best_in_queue, self.target_nonce) {
(Some(best_in_queue), Some(target_nonce)) if best_in_queue > target_nonce => Some(best_in_queue),
(_, Some(target_nonce)) => Some(target_nonce),
(_, None) => None,
}
}

fn best_at_target(&self) -> MessageNonce {
fn best_at_target(&self) -> Option<MessageNonce> {
self.target_nonce
}

Expand All @@ -166,11 +168,16 @@ where
at_block: HeaderId<SourceHeaderHash, SourceHeaderNumber>,
nonces: SourceClientNonces<SourceNoncesRange>,
) {
let prev_best_at_source = self.best_at_source();
let best_in_queue = self
.source_queue
.back()
.map(|(_, range)| range.end())
.or(self.target_nonce)
.unwrap_or_default();
self.source_queue.extend(
nonces
.new_nonces
.greater_than(prev_best_at_source)
.greater_than(best_in_queue)
.into_iter()
.map(move |range| (at_block.clone(), range)),
)
Expand All @@ -187,8 +194,10 @@ where
) {
let nonce = nonces.latest_nonce;

if nonce < self.target_nonce {
return;
if let Some(target_nonce) = self.target_nonce {
if nonce < target_nonce {
return;
}
}

while let Some(true) = self.source_queue.front().map(|(_, range)| range.begin() <= nonce) {
Expand Down Expand Up @@ -220,7 +229,7 @@ where
race_state.nonces_submitted = None;
}

self.target_nonce = nonce;
self.target_nonce = Some(nonce);
}

fn select_nonces_to_deliver(
Expand Down Expand Up @@ -278,12 +287,12 @@ mod tests {
#[test]
fn best_at_source_is_never_lower_than_target_nonce() {
let mut strategy = BasicStrategy::<TestMessageLane>::new();
assert_eq!(strategy.best_at_source(), 0);
assert_eq!(strategy.best_at_source(), None);
strategy.source_nonces_updated(header_id(1), source_nonces(1..=5));
assert_eq!(strategy.best_at_source(), 5);
assert_eq!(strategy.best_at_source(), None);
strategy.target_nonces_updated(target_nonces(10), &mut Default::default());
assert_eq!(strategy.source_queue, vec![]);
assert_eq!(strategy.best_at_source(), 10);
assert_eq!(strategy.best_at_source(), Some(10));
}

#[test]
Expand All @@ -306,9 +315,11 @@ mod tests {
#[test]
fn target_nonce_is_never_lower_than_latest_known_target_nonce() {
let mut strategy = BasicStrategy::<TestMessageLane>::new();
assert_eq!(strategy.target_nonce, None);
strategy.target_nonces_updated(target_nonces(10), &mut Default::default());
assert_eq!(strategy.target_nonce, Some(10));
strategy.target_nonces_updated(target_nonces(5), &mut Default::default());
assert_eq!(strategy.target_nonce, 10);
assert_eq!(strategy.target_nonce, Some(10));
}

#[test]
Expand Down Expand Up @@ -351,6 +362,7 @@ mod tests {
let mut state = RaceState::default();
let mut strategy = BasicStrategy::<TestMessageLane>::new();
state.nonces_to_submit = Some((header_id(1), 1..=10, (1..=10, None)));
strategy.target_nonces_updated(target_nonces(0), &mut state);
strategy.source_nonces_updated(header_id(1), source_nonces(1..=10));
assert_eq!(strategy.select_nonces_to_deliver(&state), None);
}
Expand All @@ -360,6 +372,7 @@ mod tests {
let mut state = RaceState::default();
let mut strategy = BasicStrategy::<TestMessageLane>::new();
state.nonces_submitted = Some(1..=10);
strategy.target_nonces_updated(target_nonces(0), &mut state);
strategy.source_nonces_updated(header_id(1), source_nonces(1..=10));
assert_eq!(strategy.select_nonces_to_deliver(&state), None);
}
Expand All @@ -368,6 +381,7 @@ mod tests {
fn select_nonces_to_deliver_works() {
let mut state = RaceState::<_, _, TestMessagesProof>::default();
let mut strategy = BasicStrategy::<TestMessageLane>::new();
strategy.target_nonces_updated(target_nonces(0), &mut state);
strategy.source_nonces_updated(header_id(1), source_nonces(1..=1));
strategy.source_nonces_updated(header_id(2), source_nonces(2..=2));
strategy.source_nonces_updated(header_id(3), source_nonces(3..=6));
Expand All @@ -388,6 +402,7 @@ mod tests {
fn select_nonces_to_deliver_able_to_split_ranges_with_selector() {
let mut state = RaceState::<_, _, TestMessagesProof>::default();
let mut strategy = BasicStrategy::<TestMessageLane>::new();
strategy.target_nonces_updated(target_nonces(0), &mut state);
strategy.source_nonces_updated(header_id(1), source_nonces(1..=100));

state.best_finalized_source_header_id_at_source = Some(header_id(1));
Expand Down
Loading

0 comments on commit b8f6e32

Please sign in to comment.